
Building a Unified Apache Beam Pipeline

Learn how to create an Apache Beam pipeline for batch and stream processing with event-time windowing.

Overview

In this tutorial, we demonstrate how to build a unified Apache Beam pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time–aware data and apply fixed windowing with triggers and allowed lateness to illustrate how Apache Beam consistently handles both on-time and late events. By switching only the input source, we maintain identical core aggregation logic, allowing us to clearly understand Beam’s event-time model, windows, and panes without relying on external streaming infrastructure.

Installation and Setup

We install the required dependencies and ensure version compatibility for Apache Beam. We import the core Beam APIs along with the windowing, trigger, and TestStream utilities needed in the pipeline, plus standard Python modules for time handling and JSON formatting.

!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod
 
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

Configuration

We define the global configuration that controls window size, lateness, and execution mode. We create synthetic events with explicit event-time timestamps to ensure deterministic and understandable windowing behavior.

MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120
 
 
def make_event(user_id, event_type, amount, event_time_epoch_s):
   return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}
 
base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())
 
BATCH_EVENTS = [
   make_event("u1", "purchase", 20, t0 + 5),
   make_event("u1", "purchase", 15, t0 + 20),
   make_event("u2", "purchase",  8, t0 + 35),
   make_event("u1", "refund",   -5, t0 + 62),
   make_event("u2", "purchase", 12, t0 + 70),
   make_event("u3", "purchase",  9, t0 + 75),
   make_event("u2", "purchase",  3, t0 + 50),
]
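Note that Beam's FixedWindows are aligned to the epoch, not to t0: an event's window start is its timestamp rounded down to the nearest multiple of the window size. A small stand-alone sketch of that assignment rule (fixed_window_start is a hypothetical helper for illustration, not part of the pipeline):

```python
WINDOW_SIZE_SECS = 60  # same window size as the tutorial config

def fixed_window_start(event_time_s):
    """Start of the epoch-aligned fixed window containing the event,
    mirroring Beam's FixedWindows(WINDOW_SIZE_SECS) assignment."""
    return event_time_s - (event_time_s % WINDOW_SIZE_SECS)

# Two events 35 seconds apart can share a window...
assert fixed_window_start(120) == fixed_window_start(155) == 120
# ...while events on either side of a boundary land in different windows.
assert fixed_window_start(185) == 180
```

This is why the out-of-order event at t0 + 50 can arrive after the watermark has passed its window's end: it still belongs to the earlier window and is therefore handled as a late firing.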

Windowed Aggregation Logic

We build a reusable Beam PTransform that encapsulates all windowed aggregation logic. We apply fixed windows, triggers, and accumulation rules, then group events by user to compute counts and sums.

def format_joined_record(kv):
   user_id, d = kv
   return {
       "user_id": user_id,
       "count": int(d["count"][0]) if d["count"] else 0,
       "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
   }
 
class WindowedUserAgg(beam.PTransform):
   def expand(self, pcoll):
       stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
       windowed = stamped | beam.WindowInto(
           FixedWindows(WINDOW_SIZE_SECS),
           allowed_lateness=ALLOWED_LATENESS_SECS,
           trigger=AfterWatermark(
               early=AfterProcessingTime(10),
               late=AfterProcessingTime(10),
           ),
           accumulation_mode=AccumulationMode.ACCUMULATING,
       )
       keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
       counts = keyed | beam.combiners.Count.PerKey()
       sums = keyed | beam.CombinePerKey(sum)
       return (
           {"count": counts, "sum_amount": sums}
           | beam.CoGroupByKey()
           | beam.Map(format_joined_record)
       )

Adding Window Metadata

To enrich each aggregated record with window and pane metadata, we define logic to convert Beam’s internal timestamps into human-readable UTC times.

class AddWindowInfo(beam.DoFn):
   def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
       ws = float(window.start)
       we = float(window.end)
       yield {
           **element,
           "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
           "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
           "pane_timing": str(pane_info.timing),
           "pane_is_first": pane_info.is_first,
           "pane_is_last": pane_info.is_last,
       }
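Beam's window.start and window.end are Timestamp objects representing seconds since the epoch, so float() plus datetime.fromtimestamp is enough to render readable UTC bounds. The same conversion outside a DoFn (fmt_utc is an illustrative helper, not part of the pipeline):

```python
from datetime import datetime, timezone

def fmt_utc(epoch_s):
    """Format epoch seconds as HH:MM:SS UTC, as AddWindowInfo does."""
    return datetime.fromtimestamp(float(epoch_s), tz=timezone.utc).strftime("%H:%M:%S")

# A window covering [60, 120) seconds after the epoch:
assert fmt_utc(60) == "00:01:00"
assert fmt_utc(120) == "00:02:00"
```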

Executing the Pipeline

Finally, we wire everything into executable batch and stream-like pipelines, toggling between modes by changing a single flag.

def run_batch():
   with beam.Pipeline(options=PipelineOptions([])) as p:
       (
           p
           | beam.Create(BATCH_EVENTS)
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )
 
 
def run_stream():
   opts = PipelineOptions([])
   opts.view_as(StandardOptions).streaming = True
   with beam.Pipeline(options=opts) as p:
       (
           p
           | build_test_stream()
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )
 
run_stream() if MODE == "stream" else run_batch()

Key Takeaways

We demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. This foundational understanding enables scaling designs to real streaming runners and production environments.
