Apache Beam hands on

Data processing workflows on GCP Dataflow

Jean-Christophe Baey, October 15, 2020

TL;DR

Dataflow on Google Cloud Platform (GCP) is the de facto tool for processing large amounts of data, whether bounded (batch mode) or unbounded (streaming mode). It provides a complete framework for real-time computation, with built-in auto-scaling of the flow graphs.

It is based on Apache Beam. You describe your processing as a directed acyclic graph, written in Java or Python.

This article walks through practical examples so you can learn the concepts by practice. The Apache Beam documentation is well written, and I strongly recommend reading it before this page to understand the main concepts.

There are also some good examples in the Apache Beam GitHub repository.

Local installation

Create a Python virtual environment

  • If you don't already have pipenv, install it:
brew update
brew install pipenv
  • In a dedicated folder, create a dedicated environment:
mkdir beam && cd "$_"
pipenv --python 3
  • Start the Python virtual environment:
pipenv shell

Install Beam with the GCP extensions

pip install "apache-beam[gcp]"

Create your first batch flow (test_01.py)

touch test_01.py
import apache_beam as beam

with beam.Pipeline() as pipeline:
    produce_counts = (
        pipeline
        | 'Create produce counts' >> beam.Create([
            ('spring', '🍓'),
            ('spring', '🥕'),
            ('spring', '🍆'),
            ('spring', '🍅'),
            ('summer', '🥕'),
            ('summer', '🍅'),
            ('summer', '🌽'),
            ('fall', '🥕'),
            ('fall', '🍅'),
            ('winter', '🍆'),
        ])
        | 'Group counts per produce' >> beam.GroupByKey()
        | beam.Map(print))
python test_01.py
('spring', ['🍓', '🥕', '🍆', '🍅'])
('summer', ['🥕', '🍅', '🌽'])
('fall', ['🥕', '🍅'])
('winter', ['🍆'])
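
To make the grouping step concrete, here is a minimal plain-Python sketch of what GroupByKey computes on a small bounded input. It is only an illustration, not how Beam implements the transform: the helper name group_by_key and the produce names (used as stand-ins for the icons) are assumptions, and Beam itself does not guarantee the order of the grouped values.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect the values of (key, value) pairs into one list per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


# Hypothetical stand-in data mirroring the shape of the pipeline input.
produce = [
    ('spring', 'strawberry'),
    ('spring', 'carrot'),
    ('summer', 'carrot'),
    ('summer', 'corn'),
    ('winter', 'eggplant'),
]

for season, items in group_by_key(produce).items():
    print((season, items))
```

In a real pipeline the same work is spread across workers, which is why the order of the printed groups can change from one run to the next.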

Create your 2nd batch flow (test_02.py)

touch test_02.py
import apache_beam as beam
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

EVENTS = [
      "elem1", "elem3", "elem2",
      "elem1", "elem1", "elem3", "elem2", "elem2", "elem2"
  ]

EXPECTED_COUNTS = [("elem1", 3), ("elem2", 4), ("elem3", 2)]

with beam.Pipeline() as p:
    input = (p | beam.Create(EVENTS))

    # Apply the Count transform
    output = input | beam.combiners.Count.PerElement()

    # Print the result
    _ = (output | beam.Map(print))

    # Assert the result
    assert_that(
        output,
        equal_to(EXPECTED_COUNTS))
python test_02.py
('elem1', 3)
('elem3', 2)
('elem2', 4)
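
For comparison, the same count can be sketched in plain Python with collections.Counter. This is only an illustration of what Count.PerElement computes, not Beam code. Note that the printed order above differs from EXPECTED_COUNTS while the assertion still passes, which suggests equal_to compares the results without regard to order; the sketch sorts the pairs to get a stable order.

```python
from collections import Counter

events = [
    "elem1", "elem3", "elem2",
    "elem1", "elem1", "elem3", "elem2", "elem2", "elem2",
]

# Count occurrences of each element, then sort for a deterministic order.
counts = sorted(Counter(events).items())
print(counts)  # [('elem1', 3), ('elem2', 4), ('elem3', 2)]
```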

Create your 3rd batch flow (test_03.py)

touch test_03.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

EVENTS = [
      {
          "ts": "2020-10-15T07:59:00.000Z",
          "type": "A",
          "name": "event1"
      },
      {
          "ts": "2020-10-15T08:00:00.000Z",
          "type": "B",
          "name": "event2"
      },
      {
          "ts": "2020-10-15T08:01:00.000Z",
          "type": "B",
          "name": "event3"
      },
      {
          "ts": "2020-10-15T08:02:00.000Z",
          "type": "C",
          "name": "event4"
      },
      {
          "ts": "2020-10-15T08:03:00.000Z",
          "type": "A",
          "name": "event5"
      },
  ]

options = PipelineOptions()
p = beam.Pipeline(options=options)
 
input = (p | beam.Create(EVENTS))

# Set the key and group by key
output = (input 
    | 'KeyByType' >> beam.Map(lambda x: (x["type"], x))
    | 'GroupByType' >> beam.GroupByKey())
        
# Print the result
_ = (output | beam.Map(print))

result = p.run()
result.wait_until_finish()
python test_03.py
('A', [{'ts': '2020-10-15T07:59:00.000Z', 'type': 'A', 'name': 'event1'}, {'ts': '2020-10-15T08:03:00.000Z', 'type': 'A', 'name': 'event5'}])
('B', [{'ts': '2020-10-15T08:00:00.000Z', 'type': 'B', 'name': 'event2'}, {'ts': '2020-10-15T08:01:00.000Z', 'type': 'B', 'name': 'event3'}])
('C', [{'ts': '2020-10-15T08:02:00.000Z', 'type': 'C', 'name': 'event4'}])

Important note: grouping by key works in batch mode because all elements are placed in a single global window, and Beam waits for the last element before emitting the grouped results. On an unbounded collection (streaming) there is no last element, so a window is required to tell Beam when to compute the grouping.

Create your 1st streaming flow (test_04.py)

touch test_04.py
import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window

# Pass-through DoFn that prints each element with its timestamp
class PrintMessage(beam.DoFn):
    def __init__(self, prefix=""):
        self.text = prefix
    def process(self, elem, timestamp=beam.DoFn.TimestampParam):
        print('{}{} {}'.format(self.text, timestamp.to_utc_datetime(), elem))
        yield elem

# Create a fake streaming input
options = StandardOptions(streaming=True)
with beam.Pipeline(options=options) as p:

    test_stream = (TestStream()
                    # the watermark tracks event (business) time, which differs from processing time
                    .advance_watermark_to(0)
                    .add_elements([
                        TimestampedValue('a', 1), # We defined an element 'a' with business time = '1'
                        TimestampedValue('b', 2),
                        TimestampedValue('c', 4),
                        TimestampedValue('d', 8),
                        TimestampedValue('e', 9),
                    ])
                    .advance_processing_time(10)
                    .advance_watermark_to(15)
                    # f, g will get the watermark timestamp
                    .add_elements(['f', 'g', TimestampedValue('h', 10)])
                    .advance_watermark_to_infinity())

    input = p | test_stream | 'Print message' >> beam.ParDo(PrintMessage("Input: "))

    # It will produce the stream:
        # 1970-01-01 00:00:01 a
        # 1970-01-01 00:00:02 b
        # 1970-01-01 00:00:04 c
        # 1970-01-01 00:00:08 d
        # 1970-01-01 00:00:09 e
        # 1970-01-01 00:00:15 f
        # 1970-01-01 00:00:15 g
        # 1970-01-01 00:00:10 h

    _ = (input 
                | 'Create 2s time window' >> beam.WindowInto(window.FixedWindows(2))
                | 'Add dummy key' >> beam.Map(lambda x: ("dummy_key", x))
                | 'Count per Key per Window' >> beam.combiners.Count.PerKey()
                | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )
python test_04.py
Input: 1970-01-01 00:00:01 a
Input: 1970-01-01 00:00:02 b
Input: 1970-01-01 00:00:04 c
Input: 1970-01-01 00:00:08 d
Input: 1970-01-01 00:00:09 e
Result: 1970-01-01 00:00:01.999999 ('dummy_key', 1)
Result: 1970-01-01 00:00:03.999999 ('dummy_key', 1)
Result: 1970-01-01 00:00:05.999999 ('dummy_key', 1)
Result: 1970-01-01 00:00:09.999999 ('dummy_key', 2)
Input: 1970-01-01 00:00:15 f
Input: 1970-01-01 00:00:15 g
Input: 1970-01-01 00:00:10 h
Result: 1970-01-01 00:00:15.999999 ('dummy_key', 2)
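
The counts above can be checked by hand with a small plain-Python sketch of fixed-window assignment. The helper fixed_window is a hypothetical name and this is not Beam's implementation; it only assumes that a fixed window of size s covers [start, start + s) with start aligned on a multiple of s. Element 'h' is left out on purpose: it arrives once the watermark is already at 15, after its window [10, 12) has ended, which appears to be why no result is printed for that window.

```python
from collections import Counter


def fixed_window(timestamp, size):
    """Return the [start, end) bounds of the fixed window containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


# (element, event time in seconds) pairs taken from the TestStream above.
on_time = [('a', 1), ('b', 2), ('c', 4), ('d', 8), ('e', 9), ('f', 15), ('g', 15)]

counts = Counter(fixed_window(ts, 2) for _, ts in on_time)
for (start, end), n in sorted(counts.items()):
    print(f"[{start}, {end}) -> {n}")
```

Each result in the output is stamped with the end of its window minus one microsecond, for example 00:00:09.999999 for the window [8, 10).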

If we group items instead of counting:

    _ = (input
                | 'Create 2s time window' >> beam.WindowInto(window.FixedWindows(2))
                | 'Add dummy key' >> beam.Map(lambda x: ("dummy_key", x))
                | 'Group per Key per Window' >> beam.GroupByKey()
                | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )

We get:

Input: 1970-01-01 00:00:01 a
Input: 1970-01-01 00:00:02 b
Input: 1970-01-01 00:00:04 c
Input: 1970-01-01 00:00:08 d
Input: 1970-01-01 00:00:09 e
Result: 1970-01-01 00:00:01.999999 ('dummy_key', ['a'])
Result: 1970-01-01 00:00:03.999999 ('dummy_key', ['b'])
Result: 1970-01-01 00:00:05.999999 ('dummy_key', ['c'])
Result: 1970-01-01 00:00:09.999999 ('dummy_key', ['d', 'e'])
Input: 1970-01-01 00:00:15 f
Input: 1970-01-01 00:00:15 g
Input: 1970-01-01 00:00:10 h
Result: 1970-01-01 00:00:15.999999 ('dummy_key', ['f', 'g'])

Create your 2nd streaming flow (test_05.py)

touch test_05.py
pip install iso8601
import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue, Duration
from apache_beam.transforms import window

import time
from iso8601 import parse_date
from pytz import reference

# Convert a string timestamp formatted in ISO 8601 into a UNIX timestamp
def iso2unix(dateTimeTxt):
    parsed = parse_date(dateTimeTxt).astimezone(reference.LocalTimezone())
    timetuple = parsed.timetuple()
    return time.mktime(timetuple)

EVENTS = [
    {
        "ts": iso2unix("2020-10-15T08:00:00.000Z"),
        "type": "A",
        "name": "event1"
    },
    {
        "ts": iso2unix("2020-10-15T08:01:00.000Z"),
        "type": "B",
        "name": "event2"
    },
    {
        "ts": iso2unix("2020-10-15T08:01:30.000Z"),
        "type": "B",
        "name": "event3"
    },
    {
        "ts": iso2unix("2020-10-15T08:02:00.000Z"),
        "type": "B",
        "name": "event4"
    },
    {
        "ts": iso2unix("2020-10-15T08:03:00.000Z"),
        "type": "C",
        "name": "event5"
    },
    {
        "ts": iso2unix("2020-10-15T08:04:00.000Z"),
        "type": "A",
        "name": "event7"
    },
  ]

LATE_EVENTS = [
    {
        "ts": iso2unix("2020-10-15T08:03:30.000Z"),
        "type": "C",
        "name": "event6"
    },
]

# Pass-through DoFn that prints each element with its timestamp
class PrintMessage(beam.DoFn):
    def __init__(self, prefix=""):
        self.text = prefix
    def process(self, elem, timestamp=beam.DoFn.TimestampParam):
        print('{}{} {}'.format(self.text, timestamp.to_utc_datetime(), elem))
        yield elem

# Create a fake streaming input
options = StandardOptions(streaming=True)
with beam.Pipeline(options=options) as p:

    test_stream = (TestStream()
                    # the watermark tracks event (business) time, which differs from processing time
                    .advance_watermark_to(iso2unix("2020-10-15T08:00:00.000Z"))
                    .add_elements(EVENTS)
                    .add_elements(LATE_EVENTS)
                    .advance_watermark_to_infinity())

    input = (p | test_stream 
                | 'With timestamps' >> beam.Map(lambda x: TimestampedValue(x, x['ts']))
    )

    _ = (input 
                | 'Create 2min time window' >> beam.WindowInto(
                    window.FixedWindows(2 * 60))
                | 'Add type key, keep name prop only' >> beam.Map(lambda x: (x["type"], x["name"]))
                | 'Group per Key per Window' >> beam.GroupByKey()
                | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )
python test_05.py
Result: 2020-10-15 08:01:59.999999 ('A', ['event1'])
Result: 2020-10-15 08:01:59.999999 ('B', ['event2', 'event3'])
Result: 2020-10-15 08:03:59.999999 ('B', ['event4'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5', 'event6'])
Result: 2020-10-15 08:05:59.999999 ('A', ['event7'])
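
As a side note, the iso2unix helper can also be sketched with the standard library only. The variant below is an assumption for illustration (the name iso2unix_stdlib is hypothetical, and it only accepts timestamps shaped like the ones in EVENTS); it relies on strptime's %z directive accepting a trailing 'Z', which requires Python 3.7 or later.

```python
from datetime import datetime


def iso2unix_stdlib(date_time_txt):
    """Convert e.g. '2020-10-15T08:00:00.000Z' into a UNIX timestamp (float)."""
    parsed = datetime.strptime(date_time_txt, "%Y-%m-%dT%H:%M:%S.%f%z")
    # parsed is timezone-aware, so timestamp() does not depend on the local zone.
    return parsed.timestamp()


print(iso2unix_stdlib("2020-10-15T08:00:00.000Z"))  # 1602748800.0
```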
  • We can simulate a late-arriving message (one that arrives well after its event time) by advancing the watermark before adding it:
    test_stream = (TestStream()
                    # the watermark tracks event (business) time, which differs from processing time
                    .advance_watermark_to(iso2unix("2020-10-15T08:00:00.000Z"))
                    .add_elements(EVENTS)
                    .advance_watermark_to(iso2unix("2020-10-15T08:05:00.000Z"))
                    .add_elements(LATE_EVENTS)
                    .advance_watermark_to_infinity())

In that case, we get:

Result: 2020-10-15 08:01:59.999999 ('A', ['event1'])
Result: 2020-10-15 08:01:59.999999 ('B', ['event2', 'event3'])
Result: 2020-10-15 08:03:59.999999 ('B', ['event4'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5'])
Result: 2020-10-15 08:05:59.999999 ('A', ['event7'])

Notes:

  • event6 is missing because the window it belongs to was already closed when it arrived.
  • To accept late arrivals in the window, we can add the allowed_lateness option:
    _ = (input 
                | 'Create 2min time window' >> beam.WindowInto(
                    window.FixedWindows(2 * 60),
                    allowed_lateness=Duration(seconds=60))
                | 'Add type key, keep name prop only' >> beam.Map(lambda x: (x["type"], x["name"]))
                | 'Group per Key per Window' >> beam.GroupByKey()
                | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )

In that case, we get:

Result: 2020-10-15 08:01:59.999999 ('A', ['event1'])
Result: 2020-10-15 08:01:59.999999 ('B', ['event2', 'event3'])
Result: 2020-10-15 08:03:59.999999 ('B', ['event4'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event6 (late)'])
Result: 2020-10-15 08:05:59.999999 ('A', ['event7'])
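
The two outcomes for event6 (dropped without allowed_lateness, kept with 60 seconds) can be modelled with a simplified rule. This is a plain-Python sketch, not Beam's implementation: the helper name is_dropped and the exact boundary condition are assumptions chosen to reproduce the results shown in this article. Times are expressed in seconds after 08:00:00.

```python
def is_dropped(window_end, watermark, allowed_lateness=0):
    """Simplified model: late data is dropped once the watermark
    has moved past the end of the window plus the allowed lateness."""
    return watermark > window_end + allowed_lateness


WINDOW_END = 4 * 60   # event6 (08:03:30) falls in the window ending at 08:04:00
WATERMARK = 5 * 60    # the watermark was advanced to 08:05:00 before event6 arrived

print(is_dropped(WINDOW_END, WATERMARK))                       # True
print(is_dropped(WINDOW_END, WATERMARK, allowed_lateness=60))  # False
```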
  • We can also ask Beam to re-emit the complete window content with the option accumulation_mode=AccumulationMode.ACCUMULATING. The default is AccumulationMode.DISCARDING:
    from apache_beam.transforms.trigger import AccumulationMode

    _ = (input 
                | 'Create 2min time window' >> beam.WindowInto(
                    window.FixedWindows(2 * 60),
                    allowed_lateness=Duration(seconds=60),
                    accumulation_mode=AccumulationMode.ACCUMULATING
                )
                | 'Add type key, keep name prop only' >> beam.Map(lambda x: (x["type"], x["name"]))
                | 'Group per Key per Window' >> beam.GroupByKey()
                | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )

In that case, we get:

Result: 2020-10-15 08:01:59.999999 ('A', ['event1'])
Result: 2020-10-15 08:01:59.999999 ('B', ['event2', 'event3'])
Result: 2020-10-15 08:03:59.999999 ('B', ['event4'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5'])
Result: 2020-10-15 08:03:59.999999 ('C', ['event5', 'event6 (late)'])
Result: 2020-10-15 08:05:59.999999 ('A', ['event7'])
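
The difference between the two accumulation modes can be sketched in a few lines of plain Python. The function fire_panes is a hypothetical helper, not a Beam API: it takes the elements that arrived between successive firings of one window and returns what each firing would emit under each mode.

```python
def fire_panes(batches, accumulating):
    """Return the content emitted at each firing of a single window.

    batches: list of lists, the new elements seen before each firing.
    accumulating: True to emit everything seen so far, False to emit
    only the elements that arrived since the previous firing.
    """
    seen = []
    panes = []
    for batch in batches:
        seen.extend(batch)
        panes.append(list(seen) if accumulating else list(batch))
    return panes


# Window 08:02-08:04 for key 'C': event5 on time, then event6 late.
firings = [['event5'], ['event6']]

print(fire_panes(firings, accumulating=False))  # [['event5'], ['event6']]
print(fire_panes(firings, accumulating=True))   # [['event5'], ['event5', 'event6']]
```

With ACCUMULATING, downstream consumers have to be prepared to see the same element more than once, typically by overwriting the previous result for that window.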

Create your 3rd streaming flow (test_06.py)

touch test_06.py
import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue, Duration
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterCount, Repeatedly, AfterAny, AfterWatermark, OrFinally

import time
from iso8601 import parse_date
from pytz import reference

# Convert a string timestamp formatted in ISO 8601 into a UNIX timestamp
def iso2unix(dateTimeTxt):
    parsed = parse_date(dateTimeTxt).astimezone(reference.LocalTimezone())
    timetuple = parsed.timetuple()
    return time.mktime(timetuple)

EVENTS = [
    {
        "ts": iso2unix("2020-10-15T08:00:00.000Z"),
        "uid": "user1",
        "type": "check-in",
        "location": "France"
    },
    {
        "ts": iso2unix("2020-10-15T08:01:00.000Z"),
        "uid": "user1",
        "type": "action1",
    },
    {
        "ts": iso2unix("2020-10-15T08:01:15.000Z"),
        "uid": "user1",
        "type": "action2",
    },
    {
        "ts": iso2unix("2020-10-15T08:01:20.000Z"),
        "uid": "user1",
        "type": "action3",
    },
    {
        "ts": iso2unix("2020-10-15T08:01:30.000Z"),
        "uid": "user1",
        "type": "check-out",
    },
     {
        "ts": iso2unix("2020-10-15T08:00:30.000Z"),
        "uid": "user2",
        "type": "check-in",
        "location": "USA"
    },
    {
        "ts": iso2unix("2020-10-15T08:00:45.000Z"),
        "uid": "user2",
        "type": "action1",
    },
    {
        "ts": iso2unix("2020-10-15T08:01:15.000Z"),
        "uid": "user2",
        "type": "action2",
    },
    {
        "ts": iso2unix("2020-10-15T08:01:20.000Z"),
        "uid": "user2",
        "type": "action3",
    }, 
  ]

LATE_EVENTS = [
    {
        "ts": iso2unix("2020-10-15T08:02:30.000Z"),
        "uid": "user2",
        "type": "check-out",
    },  
]

# Pass-through DoFn that prints each element with its timestamp
class PrintMessage(beam.DoFn):
    def __init__(self, prefix=""):
        self.text = prefix
    def process(self, elem, timestamp=beam.DoFn.TimestampParam):
        print('{}{} {}'.format(self.text, timestamp.to_utc_datetime(), elem))
        yield elem

# Create a fake streaming input
options = StandardOptions(streaming=True)
with beam.Pipeline(options=options) as p:

    test_stream = (TestStream()
                    # the watermark tracks event (business) time, which differs from processing time
                    .advance_watermark_to(iso2unix("2020-10-15T08:00:00.000Z"))
                    .add_elements(EVENTS)
                    .advance_watermark_to(iso2unix("2020-10-15T08:05:00.000Z"))
                    .add_elements(LATE_EVENTS)
                    .advance_watermark_to_infinity())

    input = (p | test_stream 
                | 'With timestamps' >> beam.Map(lambda x: TimestampedValue(x, x['ts']))
                # | 'Print message' >> beam.ParDo(PrintMessage("Input: "))
    )

    session_gap_in_seconds = 10*60 # 10 minutes
    _ = (input 
                | 'keyed on uid' >> beam.Map(lambda x: (x["uid"], x["type"]))
                | 'Session Window' >> beam.WindowInto(window.Sessions(session_gap_in_seconds),
                                        trigger=OrFinally(Repeatedly(AfterCount(2)), AfterWatermark()),
                                        timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EARLIEST, # Define the timestamp on the output, default is OUTPUT_AT_EOW
                                        accumulation_mode=AccumulationMode.DISCARDING,
                                        )
                | 'Group' >> beam.GroupByKey()
                | 'Print result' >> beam.ParDo(PrintMessage("Result: "))
    )
python test_06.py
Result: 2020-10-15 08:00:00 ('user1', ['check-in', 'action1', 'action2', 'action3', 'check-out'])
Result: 2020-10-15 08:00:30 ('user2', ['check-in', 'action1', 'action2', 'action3'])
Result: 2020-10-15 08:00:00 ('user1', [])
Result: 2020-10-15 08:00:30 ('user2', ['check-out'])
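
To see how a session window groups the events of one user, here is a minimal plain-Python sketch of gap-based session merging. It is not Beam's implementation and the helper name sessions is an assumption. It relies on one idea: each element opens a window [ts, ts + gap), and overlapping windows for the same key are merged into one session. Timestamps are in seconds after 08:00:00.

```python
def sessions(timestamps, gap):
    """Split timestamps into sessions; a new session starts when the
    distance to the previous timestamp is at least `gap` seconds."""
    result = []
    for ts in sorted(timestamps):
        if result and ts - result[-1][-1] < gap:
            result[-1].append(ts)   # still inside the current session
        else:
            result.append([ts])     # gap reached: open a new session
    return result


# user2: check-in, action1, action2, action3, check-out
user2 = [30, 45, 75, 80, 150]

print(sessions(user2, gap=10 * 60))  # [[30, 45, 75, 80, 150]]
print(sessions(user2, gap=60))       # [[30, 45, 75, 80], [150]]
```

With the 10-minute gap used in test_06.py, all of user2's events belong to the same session; a shorter gap would split the late check-out into a session of its own.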
