<НА ГЛАВНУЮ

Практическое руководство: Dagster-пайплайн с партицированием, CSV IO и небольшой ML-моделью

'Пошаговое руководство по созданию партицированного Dagster-пайплайна: сохранение активов в CSV, проверки качества и обучение простой регрессии.'

Установка и зависимости

Сначала устанавливаем необходимые библиотеки и импортируем ключевые модули. Пример ниже ставит Dagster, Pandas и scikit-learn, затем подготавливает NumPy, Pandas, примитивы Dagster и базовую папку для хранения артефактов.

import sys, subprocess, json, os
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn"])
 
 
import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
   asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
   DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression
 
 
BASE = Path("/content/dagstore"); BASE.mkdir(parents=True, exist_ok=True)
START = "2025-08-01" 

Эти шаги делают окружение воспроизводимым (удобно для Colab) и задают дату START для дневного партицирования.

Кастомный CSV IO Manager и партиционирование

В примере реализован простой CSV/JSON IOManager, который сохраняет DataFrame как CSV, а прочие объекты — в JSON. Менеджер умеет загружать CSV предыдущих активов для downstream-зависимостей. Менеджер регистрируется как ресурс Dagster, а также создается DailyPartitionsDefinition для ежедневной обработки.

class CSVIOManager(IOManager):
   def __init__(self, base: Path): self.base = base
   def _path(self, key, ext): return self.base / f"{'_'.join(key.path)}.{ext}"
   def handle_output(self, context, obj):
       if isinstance(obj, pd.DataFrame):
           p = self._path(context.asset_key, "csv"); obj.to_csv(p, index=False)
           context.log.info(f"Saved {context.asset_key} -> {p}")
       else:
           p = self._path(context.asset_key, "json"); p.write_text(json.dumps(obj, indent=2))
           context.log.info(f"Saved {context.asset_key} -> {p}")
   def load_input(self, context):
       k = context.upstream_output.asset_key; p = self._path(k, "csv")
       df = pd.read_csv(p); context.log.info(f"Loaded {k} <- {p} ({len(df)} rows)"); return df
 
 
@io_manager
def csv_io_manager(_): return CSVIOManager(BASE)
 
 
daily = DailyPartitionsDefinition(start_date=START)

Решение простое, но демонстрирует, как хранить выходы активов в файловой структуре и восстанавливать их для последующих шагов.

Определение активов: генерация, очистка и признаки

Пайплайн разбит на отдельные активы. raw_sales генерирует синтетические дневные продажи с шумом и пропусками. clean_sales удаляет null-ы и урезает выбросы. features добавляет взаимодействия и стандартизированные признаки.

@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
   rng = np.random.default_rng(42)
   n = 200; day = context.partition_key
   x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
   sales = 2.5 * x + 30 * promo + noise + 50
   x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
   df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
   meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
   return Output(df, metadata=meta)
 
 
@asset(description="Clean nulls, clip outliers for robust downstream modeling.")
def clean_sales(context, raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
   df = raw_sales.dropna(subset=["units"]).copy()
   lo, hi = df["units"].quantile([0.01, 0.99]); df["units"] = df["units"].clip(lo, hi)
   meta = {"rows": len(df), "units_min": float(df.units.min()), "units_max": float(df.units.max())}
   return Output(df, metadata=meta)
 
 
@asset(description="Feature engineering: interactions & standardized columns.")
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
   df = clean_sales.copy()
   df["units_sq"] = df["units"] ** 2; df["units_promo"] = df["units"] * df["promo"]
   for c in ["units", "units_sq", "units_promo"]:
       mu, sigma = df[c].mean(), df[c].std(ddof=0) or 1.0
       df[f"z_{c}"] = (df[c] - mu) / sigma
   return Output(df, metadata={"rows": len(df), "cols": list(df.columns)})

Каждый актив возвращает метаданные (количество строк, границы, список столбцов и небольшой превью), и эти метаданные сохраняются вместе с артефактами.

Проверки качества данных и простая модель

asset_check проверяет, что в очищенных данных нет null-ов, promo содержит только 0 и 1, а значения units находятся в допустимом диапазоне. Затем tiny_model_metrics обучает простой линейный регрессор на z-признаках и возвращает R^2 вместе с коэффициентами.

@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units within clipped bounds.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
   nulls = int(clean_sales.isna().sum().sum())
   promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
   units_ok = bool(clean_sales["units"].between(clean_sales["units"].min(), clean_sales["units"].max()).all())
   passed = bool((nulls == 0) and promo_ok and units_ok)
   return AssetCheckResult(
       passed=passed,
       metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
   )
 
 
@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
   X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
   y = features["sales"].values
   model = LinearRegression().fit(X, y)
   return {"r2_train": float(model.score(X, y)),
           **{n: float(c) for n, c in zip(["z_units","z_units_sq","z_units_promo","promo"], model.coef_)}}

Такой подход позволяет автоматически проверить качество данных и затем безопасно обучать модель на проверенном наборе признаков.

Материализация DAG и проверка артефактов

В конце все активы и ресурс IO manager регистрируются в Definitions. Пример материализует весь набор активов для указанной партиции (дня), печатает статус выполнения и показывает размеры сохраненных файлов и содержимое JSON с метриками.

defs = Definitions(
   assets=[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
   resources={"io_manager": csv_io_manager}
)
 
 
if __name__ == "__main__":
   run_day = os.environ.get("RUN_DATE") or START
   print("Materializing everything for:", run_day)
   result = materialize(
       [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
       partition_key=run_day,
       resources={"io_manager": csv_io_manager},
   )
   print("Run success:", result.success)
 
 
   for fname in ["raw_sales.csv","clean_sales.csv","features.csv","tiny_model_metrics.json"]:
       f = BASE / fname
       if f.exists():
           print(fname, "->", f.stat().st_size, "bytes")
           if fname.endswith(".json"):
               print("Metrics:", json.loads(f.read_text()))

После запуска вы получите CSV/JSON артефакты в целевой папке, сохраненные метаданные для каждого актива, выполненную проверку качества и метрики модели для анализа.

Рекомендации для продакшена

  • Замените простой CSVIOManager на стабильный бэкенд хранения (объектное хранилище, базу данных) для надежности.
  • Партиционирование упрощает бэкафиллы и повторную обработку отдельных дат.
  • Метаданные активов повышают наблюдаемость; Dagster может отображать их в интерфейсе.

Смотрите репозиторий с полным кодом и ноутбуками, чтобы развить эту базовую архитектуру в более сложные производственные пайплайны.

🇬🇧

Switch Language

Read this article in English

Switch to English