Building a Unified Apache Beam Pipeline
Learn how to create an Apache Beam pipeline for batch and stream processing with event-time windowing.
Overview
In this tutorial, we demonstrate how to build a unified Apache Beam pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time–aware data and apply fixed windowing with triggers and allowed lateness to illustrate how Apache Beam consistently handles both on-time and late events. By switching only the input source, we maintain identical core aggregation logic, allowing us to clearly understand Beam’s event-time model, windows, and panes without relying on external streaming infrastructure.
Installation and Setup
We install the required dependencies and ensure version compatibility for Apache Beam. We import the core Beam APIs along with windowing, triggers, and TestStream utilities needed in the pipeline. Additionally, we utilize standard Python modules for time handling and JSON formatting.
!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone
Configuration
We define the global configuration that controls window size, lateness, and execution mode. We create synthetic events with explicit event-time timestamps to ensure deterministic and understandable windowing behavior.
MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120
def make_event(user_id, event_type, amount, event_time_epoch_s):
    return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}
base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())
BATCH_EVENTS = [
    make_event("u1", "purchase", 20, t0 + 5),
    make_event("u1", "purchase", 15, t0 + 20),
    make_event("u2", "purchase", 8, t0 + 35),
    make_event("u1", "refund", -5, t0 + 62),
    make_event("u2", "purchase", 12, t0 + 70),
    make_event("u3", "purchase", 9, t0 + 75),
    make_event("u2", "purchase", 3, t0 + 50),
]
Windowed Aggregation Logic
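Before building the transform, it helps to check by hand which fixed window each synthetic event falls into. The helper below is a hypothetical illustration, not part of Beam's API: it mirrors Beam's fixed-window assignment rule, where an event with timestamp ts lands in the interval [ts - ts % size, ts - ts % size + size). Note that Beam aligns fixed windows to the epoch, so the grouping shown here (offsets expressed relative to t0) holds exactly when t0 is a multiple of 60.

```python
def fixed_window_bounds(ts, size=60):
    # Fixed windows partition the timeline into equal, non-overlapping
    # intervals; an event at timestamp ts lands in [start, start + size).
    start = ts - (ts % size)
    return start, start + size

# Offsets of the synthetic events relative to t0: the first four land in
# window [0, 60), the events at 62, 70, and 75 land in window [60, 120).
for off in [5, 20, 35, 62, 70, 75, 50]:
    print(off, "->", fixed_window_bounds(off))
```

This is why the event at t0 + 50, although created last in the list, is aggregated together with the earlier events of the first window.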
We build a reusable Beam PTransform that encapsulates all windowed aggregation logic. We apply fixed windows, triggers, and accumulation rules, then group events by user to compute counts and sums.
def format_joined_record(kv):
    user_id, d = kv
    return {
        "user_id": user_id,
        "count": int(d["count"][0]) if d["count"] else 0,
        "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
    }
class WindowedUserAgg(beam.PTransform):
    def expand(self, pcoll):
        stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
        windowed = stamped | beam.WindowInto(
            FixedWindows(WINDOW_SIZE_SECS),
            allowed_lateness=ALLOWED_LATENESS_SECS,
            trigger=AfterWatermark(
                early=AfterProcessingTime(10),
                late=AfterProcessingTime(10),
            ),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )
        keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
        counts = keyed | beam.combiners.Count.PerKey()
        sums = keyed | beam.CombinePerKey(sum)
        return (
            {"count": counts, "sum_amount": sums}
            | beam.CoGroupByKey()
            | beam.Map(format_joined_record)
        )
Adding Window Metadata
To enrich each aggregated record with window and pane metadata, we define logic to convert Beam’s internal timestamps into human-readable UTC times.
class AddWindowInfo(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
        ws = float(window.start)
        we = float(window.end)
        yield {
            **element,
            "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
            "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
            "pane_timing": str(pane_info.timing),
            "pane_is_first": pane_info.is_first,
            "pane_is_last": pane_info.is_last,
        }
Executing the Pipeline
Finally, we wire everything into executable batch and stream-like pipelines, toggling between modes by changing a single flag.
def run_batch():
    with beam.Pipeline(options=PipelineOptions([])) as p:
        (
            p
            | beam.Create(BATCH_EVENTS)
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )
def run_stream():
    opts = PipelineOptions([])
    opts.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=opts) as p:
        (
            p
            | build_test_stream()
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )
run_stream() if MODE == "stream" else run_batch()
Key Takeaways
We demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. This foundational understanding enables scaling designs to real streaming runners and production environments.