Практическое руководство: Dagster-пайплайн с партицированием, CSV IO и небольшой ML-моделью
'Пошаговое руководство по созданию партицированного Dagster-пайплайна: сохранение активов в CSV, проверки качества и обучение простой регрессии.'
Установка и зависимости
Сначала устанавливаем необходимые библиотеки и импортируем ключевые модули. Пример ниже ставит Dagster, Pandas и scikit-learn, затем подготавливает NumPy, Pandas, примитивы Dagster и базовую папку для хранения артефактов.
import sys, subprocess, json, os
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn"])
import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression
BASE = Path("/content/dagstore"); BASE.mkdir(parents=True, exist_ok=True)
START = "2025-08-01" Эти шаги делают окружение воспроизводимым (удобно для Colab) и задают дату START для дневного партицирования.
Кастомный CSV IO Manager и партиционирование
В примере реализован простой CSV/JSON IOManager, который сохраняет DataFrame как CSV, а прочие объекты — в JSON. Менеджер умеет загружать CSV предыдущих активов для downstream-зависимостей. Менеджер регистрируется как ресурс Dagster, а также создается DailyPartitionsDefinition для ежедневной обработки.
class CSVIOManager(IOManager):
def __init__(self, base: Path): self.base = base
def _path(self, key, ext): return self.base / f"{'_'.join(key.path)}.{ext}"
def handle_output(self, context, obj):
if isinstance(obj, pd.DataFrame):
p = self._path(context.asset_key, "csv"); obj.to_csv(p, index=False)
context.log.info(f"Saved {context.asset_key} -> {p}")
else:
p = self._path(context.asset_key, "json"); p.write_text(json.dumps(obj, indent=2))
context.log.info(f"Saved {context.asset_key} -> {p}")
def load_input(self, context):
k = context.upstream_output.asset_key; p = self._path(k, "csv")
df = pd.read_csv(p); context.log.info(f"Loaded {k} <- {p} ({len(df)} rows)"); return df
@io_manager
def csv_io_manager(_): return CSVIOManager(BASE)
daily = DailyPartitionsDefinition(start_date=START)Решение простое, но демонстрирует, как хранить выходы активов в файловой структуре и восстанавливать их для последующих шагов.
Определение активов: генерация, очистка и признаки
Пайплайн разбит на отдельные активы. raw_sales генерирует синтетические дневные продажи с шумом и пропусками. clean_sales удаляет null-ы и урезает выбросы. features добавляет взаимодействия и стандартизированные признаки.
@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
rng = np.random.default_rng(42)
n = 200; day = context.partition_key
x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
sales = 2.5 * x + 30 * promo + noise + 50
x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
return Output(df, metadata=meta)
@asset(description="Clean nulls, clip outliers for robust downstream modeling.")
def clean_sales(context, raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
df = raw_sales.dropna(subset=["units"]).copy()
lo, hi = df["units"].quantile([0.01, 0.99]); df["units"] = df["units"].clip(lo, hi)
meta = {"rows": len(df), "units_min": float(df.units.min()), "units_max": float(df.units.max())}
return Output(df, metadata=meta)
@asset(description="Feature engineering: interactions & standardized columns.")
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
df = clean_sales.copy()
df["units_sq"] = df["units"] ** 2; df["units_promo"] = df["units"] * df["promo"]
for c in ["units", "units_sq", "units_promo"]:
mu, sigma = df[c].mean(), df[c].std(ddof=0) or 1.0
df[f"z_{c}"] = (df[c] - mu) / sigma
return Output(df, metadata={"rows": len(df), "cols": list(df.columns)})Каждый актив возвращает метаданные (количество строк, границы, список столбцов и небольшой превью), и эти метаданные сохраняются вместе с артефактами.
Проверки качества данных и простая модель
asset_check проверяет, что в очищенных данных нет null-ов, promo содержит только 0 и 1, а значения units находятся в допустимом диапазоне. Затем tiny_model_metrics обучает простой линейный регрессор на z-признаках и возвращает R^2 вместе с коэффициентами.
@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units within clipped bounds.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
nulls = int(clean_sales.isna().sum().sum())
promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
units_ok = bool(clean_sales["units"].between(clean_sales["units"].min(), clean_sales["units"].max()).all())
passed = bool((nulls == 0) and promo_ok and units_ok)
return AssetCheckResult(
passed=passed,
metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
)
@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
y = features["sales"].values
model = LinearRegression().fit(X, y)
return {"r2_train": float(model.score(X, y)),
**{n: float(c) for n, c in zip(["z_units","z_units_sq","z_units_promo","promo"], model.coef_)}}Такой подход позволяет автоматически проверить качество данных и затем безопасно обучать модель на проверенном наборе признаков.
Материализация DAG и проверка артефактов
В конце все активы и ресурс IO manager регистрируются в Definitions. Пример материализует весь набор активов для указанной партиции (дня), печатает статус выполнения и показывает размеры сохраненных файлов и содержимое JSON с метриками.
defs = Definitions(
assets=[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
resources={"io_manager": csv_io_manager}
)
if __name__ == "__main__":
run_day = os.environ.get("RUN_DATE") or START
print("Materializing everything for:", run_day)
result = materialize(
[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
partition_key=run_day,
resources={"io_manager": csv_io_manager},
)
print("Run success:", result.success)
for fname in ["raw_sales.csv","clean_sales.csv","features.csv","tiny_model_metrics.json"]:
f = BASE / fname
if f.exists():
print(fname, "->", f.stat().st_size, "bytes")
if fname.endswith(".json"):
print("Metrics:", json.loads(f.read_text()))После запуска вы получите CSV/JSON артефакты в целевой папке, сохраненные метаданные для каждого актива, выполненную проверку качества и метрики модели для анализа.
Рекомендации для продакшена
- Замените простой CSVIOManager на стабильный бэкенд хранения (объектное хранилище, базу данных) для надежности.
- Партиционирование упрощает бэкафиллы и повторную обработку отдельных дат.
- Метаданные активов повышают наблюдаемость; Dagster может отображать их в интерфейсе.
Смотрите репозиторий с полным кодом и ноутбуками, чтобы развить эту базовую архитектуру в более сложные производственные пайплайны.
Switch Language
Read this article in English