Data processing workflows on GCP Dataflow
Dataflow on Google Cloud Platform (GCP) is the de-facto tool for handling and processing large amounts of data (batch mode), including unbounded data (streaming mode). It provides a complete framework to perform real-time computations with built-in auto-scaling of flow graphs.
It is based on Apache Beam. Java and Python can be used to define acyclic graphs that compute your data.
This article walks through practical examples so that you can learn the concepts by practice. The Apache Beam documentation is well written, and I strongly recommend reading it before this page to understand the main concepts.
There are also some good examples available in the Apache Beam GitHub repository.
pipenv
brew update
brew install pipenv
mkdir beam && cd "$_"
pipenv --python 3
pipenv shell
pip install "apache-beam[gcp]"
touch test_01.py
import apache_beam as beam
# Group a small in-memory collection of (season, produce) pairs by season and
# print one (season, [produce, ...]) tuple per key.
with beam.Pipeline() as pipeline:
    produce_counts = (
        pipeline
        | 'Create produce counts' >> beam.Create([
            ('spring', '🍓'),
            ('spring', '🥕'),
            ('spring', '🍆'),
            ('spring', '🍅'),
            ('summer', '🥕'),
            ('summer', '🍅'),
            ('summer', '🌽'),
            ('fall', '🥕'),
            ('fall', '🍅'),
            ('winter', '🍆'),
        ])
        | 'Group counts per produce' >> beam.GroupByKey()
        | beam.Map(print))
python test_01.py
('spring', ['🍓', '🥕', '🍆', '🍅'])
('summer', ['🥕', '🍅', '🌽'])
('fall', ['🥕', '🍅'])
('winter', ['🍆'])
touch test_02.py
import apache_beam as beam
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
# Sample input: nine occurrences spread over three distinct elements.
EVENTS = [
    "elem1", "elem3", "elem2",
    "elem1", "elem1", "elem3", "elem2", "elem2", "elem2",
]
# Number of occurrences expected for each distinct element.
EXPECTED_COUNTS = [("elem1", 3), ("elem2", 4), ("elem3", 2)]

with beam.Pipeline() as p:
    # Named `events` rather than `input` so the builtin is not shadowed.
    events = p | beam.Create(EVENTS)

    # Apply the Count transform: one (element, occurrences) pair per element.
    counts = events | beam.combiners.Count.PerElement()

    # Print the result
    _ = counts | beam.Map(print)

    # Assert the result against the expected counts.
    assert_that(counts, equal_to(EXPECTED_COUNTS))
python test_02.py
('elem1', 3)
('elem3', 2)
('elem2', 4)
touch test_03.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Sample events: each one has an ISO 8601 timestamp string, a type and a name.
EVENTS = [
    {"ts": ts, "type": kind, "name": name}
    for ts, kind, name in (
        ("2020-10-15T07:59:00.000Z", "A", "event1"),
        ("2020-10-15T08:00:00.000Z", "B", "event2"),
        ("2020-10-15T08:01:00.000Z", "B", "event3"),
        ("2020-10-15T08:02:00.000Z", "C", "event4"),
        ("2020-10-15T08:03:00.000Z", "A", "event5"),
    )
]
# Build the pipeline explicitly (no `with` block) using default options.
options = PipelineOptions()
p = beam.Pipeline(options=options)

# Named `events` rather than `input` so the builtin is not shadowed.
events = p | beam.Create(EVENTS)

# Set the key (the event's "type" field) and group by key
grouped = (
    events
    | 'KeyByType' >> beam.Map(lambda event: (event["type"], event))
    | 'GroupByType' >> beam.GroupByKey()
)

# Print the result
_ = grouped | beam.Map(print)

# Execute the pipeline and block until it has finished.
result = p.run()
result.wait_until_finish()
python test_03.py
('A', [{'ts': '2020-10-15T07:59:00.000Z', 'type': 'A', 'name': 'event1'}, {'ts': '2020-10-15T08:03:00.000Z', 'type': 'A', 'name': 'event5'}])
('B', [{'ts': '2020-10-15T08:00:00.000Z', 'type': 'B', 'name': 'event2'}, {'ts': '2020-10-15T08:01:00.000Z', 'type': 'B', 'name': 'event3'}])
('C', [{'ts': '2020-10-15T08:02:00.000Z', 'type': 'C', 'name': 'event4'}])
Important note: grouping by key is possible in batch mode because all elements are put in a single global window. Beam waits until the last element to output the grouping results. On an unbounded collection (streaming), a window is required to specify when Beam needs to compute the grouping.
touch test_04.py
import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window
# Pass-through DoFn that prints every element together with its timestamp
class PrintMessage(beam.DoFn):
    """Print each element with its timestamp, then emit it unchanged."""

    def __init__(self, prefix=""):
        # Text written at the start of every printed line.
        self.text = prefix

    def process(self, elem, timestamp=beam.DoFn.TimestampParam):
        """Print ``elem`` prefixed by its UTC timestamp and yield it as-is."""
        when = timestamp.to_utc_datetime()
        print(f"{self.text}{when} {elem}")
        yield elem
# Build a simulated streaming source
options = StandardOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    # Elements with an explicit business time, e.g. 'a' has business time 1.
    first_batch = [
        TimestampedValue(value, business_time)
        for value, business_time in (('a', 1), ('b', 2), ('c', 4), ('d', 8), ('e', 9))
    ]
    # 'f' and 'g' carry no explicit timestamp: they will get the watermark
    # timestamp, whereas 'h' is explicitly stamped 10.
    second_batch = ['f', 'g', TimestampedValue('h', 10)]

    test_stream = (
        TestStream()
        # The watermark is the current business time (not the processing time)
        .advance_watermark_to(0)
        .add_elements(first_batch)
        .advance_processing_time(10)
        .advance_watermark_to(15)
        .add_elements(second_batch)
        .advance_watermark_to_infinity()
    )

    input = p | test_stream | 'Print message' >> beam.ParDo(PrintMessage("Input: "))
    # The resulting stream is:
    #   1970-01-01 00:00:01 a
    #   1970-01-01 00:00:02 b
    #   1970-01-01 00:00:04 c
    #   1970-01-01 00:00:08 d
    #   1970-01-01 00:00:09 e
    #   1970-01-01 00:00:15 f
    #   1970-01-01 00:00:15 g
    #   1970-01-01 00:00:10 h

    # Count the elements of each 2-second fixed window under a single key.
    _ = (
        input
        | 'Create 2s time window' >> beam.WindowInto(window.FixedWindows(2))
        | 'Add dummy key' >> beam.Map(lambda value: ("dummy_key", value))
        | 'Count per Key per Window' >> beam.combiners.Count.PerKey()
        | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )
python test_04.py
Input: 1970-01-01 00:00:01 a
Input: 1970-01-01 00:00:02 b
Input: 1970-01-01 00:00:04 c
Input: 1970-01-01 00:00:08 d
Input: 1970-01-01 00:00:09 e
Result: 1970-01-01 00:00:01.999999 ('dummy_key', 1)
Result: 1970-01-01 00:00:03.999999 ('dummy_key', 1)
Result: 1970-01-01 00:00:05.999999 ('dummy_key', 1)
Result: 1970-01-01 00:00:09.999999 ('dummy_key', 2)
Input: 1970-01-01 00:00:15 f
Input: 1970-01-01 00:00:15 g
Input: 1970-01-01 00:00:10 h
Result: 1970-01-01 00:00:15.999999 ('dummy_key', 2)
If we group items instead of counting:
# Same windowing as before, but collect the elements of each window
# instead of counting them.
_ = (
    input
    | 'Create 2s time window' >> beam.WindowInto(window.FixedWindows(2))
    | 'Add dummy key' >> beam.Map(lambda value: ("dummy_key", value))
    | 'Group per Key per Window' >> beam.GroupByKey()
    | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
)
We get:
Input: 1970-01-01 00:00:01 a
Input: 1970-01-01 00:00:02 b
Input: 1970-01-01 00:00:04 c
Input: 1970-01-01 00:00:08 d
Input: 1970-01-01 00:00:09 e
Result: 1970-01-01 00:00:01.999999 ('dummy_key', ['a'])
Result: 1970-01-01 00:00:03.999999 ('dummy_key', ['b'])
Result: 1970-01-01 00:00:05.999999 ('dummy_key', ['c'])
Result: 1970-01-01 00:00:09.999999 ('dummy_key', ['d', 'e'])
Input: 1970-01-01 00:00:15 f
Input: 1970-01-01 00:00:15 g
Input: 1970-01-01 00:00:10 h
Result: 1970-01-01 00:00:15.999999 ('dummy_key', ['f', 'g'])
touch test_05.py
pip install iso8601
import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue, Duration
from apache_beam.transforms import window
import time
from iso8601 import parse_date
from pytz import reference
# Convert a string timestamp formatted in ISO 8601 into a UNIX timestamp
def iso2unix(dateTimeTxt):
    """Convert an ISO 8601 date-time string into a UNIX timestamp.

    Args:
        dateTimeTxt: ISO 8601 string, e.g. "2020-10-15T08:00:00.000Z".

    Returns:
        Seconds since the UNIX epoch, as a float.
    """
    # parse_date yields a timezone-aware datetime, so timestamp() converts it
    # directly. Going through timetuple()/time.mktime() would drop fractional
    # seconds and detour through the machine's local timezone.
    return parse_date(dateTimeTxt).timestamp()
# On-time events: UNIX timestamp, type and name.
EVENTS = [
    {"ts": iso2unix(ts), "type": kind, "name": name}
    for ts, kind, name in (
        ("2020-10-15T08:00:00.000Z", "A", "event1"),
        ("2020-10-15T08:01:00.000Z", "B", "event2"),
        ("2020-10-15T08:01:30.000Z", "B", "event3"),
        ("2020-10-15T08:02:00.000Z", "B", "event4"),
        ("2020-10-15T08:03:00.000Z", "C", "event5"),
        ("2020-10-15T08:04:00.000Z", "A", "event7"),
    )
]
# Event added to the stream after all the events above, although its
# timestamp is earlier than the last of them.
LATE_EVENTS = [
    {"ts": iso2unix(ts), "type": kind, "name": name}
    for ts, kind, name in (
        ("2020-10-15T08:03:30.000Z", "C", "event6"),
    )
]
# Pass-through DoFn that prints every element together with its timestamp
class PrintMessage(beam.DoFn):
    """Print each element with its timestamp, then emit it unchanged."""

    def __init__(self, prefix=""):
        # Text written at the start of every printed line.
        self.text = prefix

    def process(self, elem, timestamp=beam.DoFn.TimestampParam):
        """Print ``elem`` prefixed by its UTC timestamp and yield it as-is."""
        when = timestamp.to_utc_datetime()
        print(f"{self.text}{when} {elem}")
        yield elem
# Build a simulated streaming source
options = StandardOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    stream_start = iso2unix("2020-10-15T08:00:00.000Z")
    test_stream = (
        TestStream()
        # The watermark is the current business time (not the processing time)
        .advance_watermark_to(stream_start)
        .add_elements(EVENTS)
        .add_elements(LATE_EVENTS)
        .advance_watermark_to_infinity()
    )

    # Stamp every event with the UNIX time stored in its 'ts' field.
    input = (
        p
        | test_stream
        | 'With timestamps' >> beam.Map(lambda event: TimestampedValue(event, event['ts']))
    )

    # Group event names by type inside 2-minute fixed windows.
    _ = (
        input
        | 'Create 2min time window' >> beam.WindowInto(window.FixedWindows(2 * 60))
        | 'Add type key, keep name prop only' >> beam.Map(lambda event: (event["type"], event["name"]))
        | 'Group per Key per Window' >> beam.GroupByKey()
        | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )
python test_05.py
Result: 2020-10-15 08:01:59.999999 ('A', ['event1'])
Result: 2020-10-15 08:01:59.999999 ('B', ['event2', 'event3'])
Result: 2020-10-15 08:03:59.999999 ('B', ['event4'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5', 'event6'])
Result: 2020-10-15 08:05:59.999999 ('A', ['event7'])
# Variant: move the watermark to 08:05 before the late events are added.
stream_start = iso2unix("2020-10-15T08:00:00.000Z")
late_cutoff = iso2unix("2020-10-15T08:05:00.000Z")
test_stream = (
    TestStream()
    # The watermark is the current business time (not the processing time)
    .advance_watermark_to(stream_start)
    .add_elements(EVENTS)
    .advance_watermark_to(late_cutoff)
    .add_elements(LATE_EVENTS)
    .advance_watermark_to_infinity()
)
In that case, we get:
Result: 2020-10-15 08:01:59.999999 ('A', ['event1'])
Result: 2020-10-15 08:01:59.999999 ('B', ['event2', 'event3'])
Result: 2020-10-15 08:03:59.999999 ('B', ['event4'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5'])
Result: 2020-10-15 08:05:59.999999 ('A', ['event7'])
Notes:
Late events are dropped by default; they can be kept by setting allowed_lateness on the window
# Same grouping as before, but the 2-minute windows are created with
# allowed_lateness set to a 60-second Duration.
_ = (
    input
    | 'Create 2min time window' >> beam.WindowInto(
        window.FixedWindows(2 * 60),
        allowed_lateness=Duration(seconds=60),
    )
    | 'Add type key, keep name prop only' >> beam.Map(lambda event: (event["type"], event["name"]))
    | 'Group per Key per Window' >> beam.GroupByKey()
    | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
)
In that case, we get:
Result: 2020-10-15 08:01:59.999999 ('A', ['event1'])
Result: 2020-10-15 08:01:59.999999 ('B', ['event2', 'event3'])
Result: 2020-10-15 08:03:59.999999 ('B', ['event4'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event6 (late)'])
Result: 2020-10-15 08:05:59.999999 ('A', ['event7'])
To re-emit the elements already output for a window together with the late ones, set accumulation_mode=AccumulationMode.ACCUMULATING (by default it is AccumulationMode.DISCARDING)
: from apache_beam.transforms.trigger import AccumulationMode
# Same 2-minute windows with 60 seconds of allowed lateness, this time with
# the accumulation mode set to ACCUMULATING.
_ = (
    input
    | 'Create 2min time window' >> beam.WindowInto(
        window.FixedWindows(2 * 60),
        allowed_lateness=Duration(seconds=60),
        accumulation_mode=AccumulationMode.ACCUMULATING,
    )
    | 'Add type key, keep name prop only' >> beam.Map(lambda event: (event["type"], event["name"]))
    | 'Group per Key per Window' >> beam.GroupByKey()
    | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
)
In that case, we get:
Result: 2020-10-15 08:01:59.999999 ('A', ['event1'])
Result: 2020-10-15 08:01:59.999999 ('B', ['event2', 'event3'])
Result: 2020-10-15 08:03:59.999999 ('B', ['event4'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5', 'event6 (late)'])
Result: 2020-10-15 08:05:59.999999 ('A', ['event7'])
touch test_06.py
import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue, Duration
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterCount, Repeatedly, AfterAny, AfterWatermark, OrFinally
import time
from iso8601 import parse_date
from pytz import reference
# Convert a string timestamp formatted in ISO 8601 into a UNIX timestamp
def iso2unix(dateTimeTxt):
    """Convert an ISO 8601 date-time string into a UNIX timestamp.

    Args:
        dateTimeTxt: ISO 8601 string, e.g. "2020-10-15T08:00:00.000Z".

    Returns:
        Seconds since the UNIX epoch, as a float.
    """
    # parse_date yields a timezone-aware datetime, so timestamp() converts it
    # directly. Going through timetuple()/time.mktime() would drop fractional
    # seconds and detour through the machine's local timezone.
    return parse_date(dateTimeTxt).timestamp()
# On-time user events; only the check-in events carry a "location".
EVENTS = [
    # user1
    {"ts": iso2unix("2020-10-15T08:00:00.000Z"), "uid": "user1", "type": "check-in", "location": "France"},
    {"ts": iso2unix("2020-10-15T08:01:00.000Z"), "uid": "user1", "type": "action1"},
    {"ts": iso2unix("2020-10-15T08:01:15.000Z"), "uid": "user1", "type": "action2"},
    {"ts": iso2unix("2020-10-15T08:01:20.000Z"), "uid": "user1", "type": "action3"},
    {"ts": iso2unix("2020-10-15T08:01:30.000Z"), "uid": "user1", "type": "check-out"},
    # user2
    {"ts": iso2unix("2020-10-15T08:00:30.000Z"), "uid": "user2", "type": "check-in", "location": "USA"},
    {"ts": iso2unix("2020-10-15T08:00:45.000Z"), "uid": "user2", "type": "action1"},
    {"ts": iso2unix("2020-10-15T08:01:15.000Z"), "uid": "user2", "type": "action2"},
    {"ts": iso2unix("2020-10-15T08:01:20.000Z"), "uid": "user2", "type": "action3"},
]
# user2's check-out, kept in a separate list from the on-time events.
LATE_EVENTS = [
    {"ts": iso2unix("2020-10-15T08:02:30.000Z"), "uid": "user2", "type": "check-out"},
]
# Pass-through DoFn that prints every element together with its timestamp
class PrintMessage(beam.DoFn):
    """Print each element with its timestamp, then emit it unchanged."""

    def __init__(self, prefix=""):
        # Text written at the start of every printed line.
        self.text = prefix

    def process(self, elem, timestamp=beam.DoFn.TimestampParam):
        """Print ``elem`` prefixed by its UTC timestamp and yield it as-is."""
        when = timestamp.to_utc_datetime()
        print(f"{self.text}{when} {elem}")
        yield elem
# Build a simulated streaming source
options = StandardOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    stream_start = iso2unix("2020-10-15T08:00:00.000Z")
    late_cutoff = iso2unix("2020-10-15T08:05:00.000Z")
    test_stream = (
        TestStream()
        # The watermark is the current business time (not the processing time)
        .advance_watermark_to(stream_start)
        .add_elements(EVENTS)
        .advance_watermark_to(late_cutoff)
        .add_elements(LATE_EVENTS)
        .advance_watermark_to_infinity()
    )

    # Stamp every event with the UNIX time stored in its 'ts' field.
    input = (
        p
        | test_stream
        | 'With timestamps' >> beam.Map(lambda event: TimestampedValue(event, event['ts']))
        # To trace the incoming elements, enable the following step:
        # | 'Print message' >> beam.ParDo(PrintMessage("Input: "))
    )

    session_gap_in_seconds = 10 * 60  # 10 minutes

    # Trigger used by the session windows below.
    session_trigger = OrFinally(Repeatedly(AfterCount(2)), AfterWatermark())

    _ = (
        input
        | 'keyed on uid' >> beam.Map(lambda event: (event["uid"], event["type"]))
        | 'Session Window' >> beam.WindowInto(
            window.Sessions(session_gap_in_seconds),
            trigger=session_trigger,
            # Defines the timestamp on the output; the default is OUTPUT_AT_EOW
            timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EARLIEST,
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | 'Group' >> beam.GroupByKey()
        | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )
python test_06.py
Result: 2020-10-15 08:00:00 ('user1', ['check-in', 'action1', 'action2', 'action3', 'check-out'])
Result: 2020-10-15 08:00:30 ('user2', ['check-in', 'action1', 'action2', 'action3'])
Result: 2020-10-15 08:00:00 ('user1', [])
Result: 2020-10-15 08:00:30 ('user2', ['check-out'])
๐ Don't forget to follow me on Twitter to be notified when new posts are available!
Follow @jcbaey