Hands-On: Build Partitioned Dagster Data Pipelines with CSV IO and a Tiny ML Model
'A practical Dagster tutorial that shows how to build daily-partitioned pipelines, persist assets with a custom CSV IO manager, enforce data-quality checks, and train a small regression model.'
Setup and dependencies
Start by installing the runtime dependencies and importing the core libraries used in the tutorial. The snippet below installs Dagster, Pandas and scikit-learn, then prepares NumPy, Pandas, Dagster primitives, and a base storage folder for persisted assets.
import sys, subprocess, json, os

# Install runtime dependencies. tabulate is needed by DataFrame.to_markdown, used in raw_sales.
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn", "tabulate"])

import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
    asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
    DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression

# Base folder for persisted assets and the first day of the partition range
BASE = Path("/content/dagstore"); BASE.mkdir(parents=True, exist_ok=True)
START = "2025-08-01"

These steps set up a working environment (suitable for Colab or other ephemeral environments) and define the START date used for daily partitioning. Package versions are not pinned here, so pin them if you need strictly reproducible runs.
Custom CSV IO Manager and partitioning
To persist assets to disk and reload them across runs, the example provides a simple CSV/JSON IOManager. It writes DataFrame assets as CSV and other objects as JSON, and supports loading upstream CSV outputs for downstream assets. The manager is registered as a Dagster resource and a DailyPartitionsDefinition is created for day-level partitioning.
class CSVIOManager(IOManager):
    def __init__(self, base: Path):
        self.base = base

    def _path(self, key, ext):
        return self.base / f"{'_'.join(key.path)}.{ext}"

    def handle_output(self, context, obj):
        if isinstance(obj, pd.DataFrame):
            p = self._path(context.asset_key, "csv")
            obj.to_csv(p, index=False)
        else:
            p = self._path(context.asset_key, "json")
            p.write_text(json.dumps(obj, indent=2))
        context.log.info(f"Saved {context.asset_key} -> {p}")

    def load_input(self, context):
        k = context.upstream_output.asset_key
        p = self._path(k, "csv")
        df = pd.read_csv(p)
        context.log.info(f"Loaded {k} <- {p} ({len(df)} rows)")
        return df

@io_manager
def csv_io_manager(_):
    return CSVIOManager(BASE)

daily = DailyPartitionsDefinition(start_date=START)

This is intentionally lightweight but demonstrates how to persist asset outputs to a simple directory structure and restore them for dependent computations.
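One consequence of the `_path` helper above is that the file name depends only on the asset key, so materializing a second day overwrites the first day's file. The sketch below shows one possible way to nest files under a per-partition folder. It is plain Python with no Dagster dependency; `partitioned_path` and its parameters are hypothetical names, and wiring it into `handle_output` (for example by reading the partition key from the context, and creating the parent folder before writing) is left as an assumption.

```python
from pathlib import PurePosixPath
from typing import Optional, Sequence


def partitioned_path(base: PurePosixPath, key_path: Sequence[str], ext: str,
                     partition_key: Optional[str] = None) -> PurePosixPath:
    """Build a file path from an asset key, optionally nested under a partition folder."""
    name = "_".join(key_path) + "." + ext
    if partition_key is None:
        # Unpartitioned assets keep the flat layout used in the tutorial
        return base / name
    return base / partition_key / name


base = PurePosixPath("/content/dagstore")
print(partitioned_path(base, ["raw_sales"], "csv"))                # /content/dagstore/raw_sales.csv
print(partitioned_path(base, ["raw_sales"], "csv", "2025-08-01"))  # /content/dagstore/2025-08-01/raw_sales.csv
```

With a layout like this, each day's output would sit in its own folder, at the cost of the loader needing to know which partition to read.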
Asset definitions: raw generation, cleaning, and features
The pipeline is organized into modular assets. First, raw_sales synthesizes a noisy daily sales dataset with occasional missing values. clean_sales removes nulls and clips outliers to stabilize downstream statistics. features performs small feature engineering steps and standardizes selected columns.
@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
    rng = np.random.default_rng(42)
    n = 200; day = context.partition_key
    x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
    sales = 2.5 * x + 30 * promo + noise + 50
    # Blank out a few unit values after sales has been computed
    x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
    df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
    meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
    return Output(df, metadata=meta)

@asset(description="Clean nulls, clip outliers for robust downstream modeling.")
def clean_sales(context, raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = raw_sales.dropna(subset=["units"]).copy()
    # Clip units to the 1st and 99th percentiles
    lo, hi = df["units"].quantile([0.01, 0.99]); df["units"] = df["units"].clip(lo, hi)
    meta = {"rows": len(df), "units_min": float(df.units.min()), "units_max": float(df.units.max())}
    return Output(df, metadata=meta)

@asset(description="Feature engineering: interactions & standardized columns.")
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = clean_sales.copy()
    df["units_sq"] = df["units"] ** 2; df["units_promo"] = df["units"] * df["promo"]
    for c in ["units", "units_sq", "units_promo"]:
        # Population std (ddof=0); fall back to 1.0 so a constant column does not divide by zero
        mu, sigma = df[c].mean(), df[c].std(ddof=0) or 1.0
        df[f"z_{c}"] = (df[c] - mu) / sigma
    return Output(df, metadata={"rows": len(df), "cols": list(df.columns)})

Each asset attaches metadata to its Output (row counts, clipped bounds, column lists, and a small DataFrame preview). Dagster records this metadata with the materialization event; the CSV IO manager shown above writes only the data itself. This makes debugging and auditing runs easier.
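The standardization step in `features` can be illustrated in isolation. The sketch below is a dependency-free approximation of the same idea, using the population standard deviation (the equivalent of `ddof=0`) and falling back to 1.0 when a column is constant. The `zscore` helper is a hypothetical name used only for illustration.

```python
from statistics import fmean, pstdev
from typing import List


def zscore(values: List[float]) -> List[float]:
    """Standardize with the population std; fall back to 1.0 for constant columns."""
    mu = fmean(values)
    sigma = pstdev(values) or 1.0  # pstdev is 0.0 when all values are equal
    return [(v - mu) / sigma for v in values]


print(zscore([2.0, 4.0, 6.0]))  # symmetric around 0
print(zscore([5.0, 5.0, 5.0]))  # [0.0, 0.0, 0.0]
```

The fallback only guards against a zero standard deviation; it does not handle missing values, which the tutorial removes earlier in `clean_sales`.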
Data-quality checks and a tiny model
To enforce data integrity, an asset_check verifies that the cleaned data contains no nulls, that the promo column contains only 0 and 1, and that units fall within the clipped bounds. After validation, a tiny linear regression is trained on the z-scored features plus the promo flag, and returns the training R^2 and the coefficients as a JSON-like dictionary.
@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units within clipped bounds.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
    nulls = int(clean_sales.isna().sum().sum())
    promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
    units_ok = bool(clean_sales["units"].between(clean_sales["units"].min(), clean_sales["units"].max()).all())
    passed = bool((nulls == 0) and promo_ok and units_ok)
    return AssetCheckResult(
        passed=passed,
        metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
    )

@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
    X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
    y = features["sales"].values
    model = LinearRegression().fit(X, y)
    return {"r2_train": float(model.score(X, y)),
            **{n: float(c) for n, c in zip(["z_units","z_units_sq","z_units_promo","promo"], model.coef_)}}

This pattern keeps validation and modeling within the same reproducible workflow, enabling automated checks before or after model training.
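In the check above, `units_ok` compares the column against its own minimum and maximum, so it holds for any non-null data. A stricter variant would compare against bounds supplied from outside, for instance the quantiles computed in `clean_sales`. The sketch below shows that idea in plain Python over a list of row dictionaries. `check_clean_rows`, `lo`, and `hi` are hypothetical names, and the bounds in the usage lines are made-up illustration values, not figures from the tutorial.

```python
import math
from typing import Dict, List


def check_clean_rows(rows: List[Dict[str, float]], lo: float, hi: float) -> Dict[str, object]:
    """Return pass/fail plus per-rule details for cleaned sales rows."""
    # Count NaN floats across all fields
    nulls = sum(1 for r in rows for v in r.values() if isinstance(v, float) and math.isnan(v))
    promo_ok = all(r["promo"] in (0, 1) for r in rows)
    # Compare against externally supplied bounds, not the data's own min/max
    units_ok = all(lo <= r["units"] <= hi for r in rows)
    return {"passed": nulls == 0 and promo_ok and units_ok,
            "nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok}


good = [{"units": 95.0, "promo": 0}, {"units": 120.0, "promo": 1}]
bad = good + [{"units": 400.0, "promo": 2}]
print(check_clean_rows(good, lo=50.0, hi=150.0)["passed"])  # True
print(check_clean_rows(bad, lo=50.0, hi=150.0)["passed"])   # False
```

The returned dictionary mirrors the metadata keys used by `clean_sales_quality`, so the same fields could feed an `AssetCheckResult` if the logic were ported to pandas.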
Materializing the DAG and inspecting artifacts
Finally, assets and the IO manager are registered in Definitions, and the example materializes the full DAG for a chosen partition key (a single day) and prints a success status along with saved file sizes and metric contents.
defs = Definitions(
    assets=[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
    resources={"io_manager": csv_io_manager}
)

if __name__ == "__main__":
    # RUN_DATE selects the partition; default to the first day
    run_day = os.environ.get("RUN_DATE") or START
    print("Materializing everything for:", run_day)
    result = materialize(
        [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
        partition_key=run_day,
        resources={"io_manager": csv_io_manager},
    )
    print("Run success:", result.success)
    # Report the size of each persisted artifact and print the stored metrics
    for fname in ["raw_sales.csv","clean_sales.csv","features.csv","tiny_model_metrics.json"]:
        f = BASE / fname
        if f.exists():
            print(fname, "->", f.stat().st_size, "bytes")
            if fname.endswith(".json"):
                print("Metrics:", json.loads(f.read_text()))

Running this end-to-end example produces CSV/JSON artifacts in the target folder, records metadata with each asset materialization, runs the data-quality check, and stores model metrics for inspection. The modular asset structure and daily partitioning make the pipeline easy to extend to richer data sources, additional checks, or more elaborate models.
Notes and next steps
- The CSVIOManager used here is intentionally simple; for production, replace it with a robust object store or database-backed IO manager.
- Partitioning enables efficient backfills and reprocessing of specific dates.
- Asset-level metadata is helpful for observability; the Dagster UI can display these values alongside each materialization.
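To make the backfill point concrete, the sketch below enumerates the daily keys for a date range. It assumes keys in the YYYY-MM-DD form used by START above; `daily_partition_keys` is a hypothetical helper, not a Dagster function.

```python
from datetime import date, timedelta
from typing import List


def daily_partition_keys(start: str, end: str) -> List[str]:
    """List ISO date keys from start to end, inclusive."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    if last < first:
        raise ValueError("end must not precede start")
    return [(first + timedelta(days=i)).isoformat() for i in range((last - first).days + 1)]


keys = daily_partition_keys("2025-08-01", "2025-08-03")
print(keys)  # ['2025-08-01', '2025-08-02', '2025-08-03']
# Each key could then be passed as partition_key to the materialize(...) call shown earlier.
```

Looping over keys like this is only a simple stand-in for a backfill; with the flat file layout used by the tutorial's IO manager, each iteration would overwrite the previous day's files.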
Check the original repository for the full code, notebooks and extended examples if you want to build on this pattern.