От Colab до продакшена: полноценный конвейер на Spark и PySpark
Пошаговое руководство по запуску PySpark в Colab: от подготовки данных и SQL-аналитики до обучения модели и сохранения в Parquet.
Запуск PySpark в Google Colab
Установите PySpark и импортируйте необходимые модули, затем инициализируйте локальную сессию Spark, настроенную для работы в Colab. Эта сессия служит точкой входа для операций с DataFrame, SQL-запросов и ML-пайплайнов.
!pip install -q pyspark==3.5.1
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, FloatType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = (SparkSession.builder.appName("ColabSparkAdvancedTutorial")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate())
print("Spark version:", spark.version)
data = [
(1, "Alice", "IN", "2025-10-01", 56000.0, "premium"),
(2, "Bob", "US", "2025-10-03", 43000.0, "standard"),
(3, "Carlos", "IN", "2025-09-27", 72000.0, "premium"),
(4, "Diana", "UK", "2025-09-30", 39000.0, "standard"),
(5, "Esha", "IN", "2025-10-02", 85000.0, "premium"),
(6, "Farid", "AE", "2025-10-02", 31000.0, "basic"),
(7, "Gita", "IN", "2025-09-29", 46000.0, "standard"),
(8, "Hassan", "PK", "2025-10-01", 52000.0, "premium"),
]
schema = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), True),
StructField("country", StringType(), True),
StructField("signup_date", StringType(), True),
StructField("income", FloatType(), True),
StructField("plan", StringType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()Подготовка и обогащение данных
Преобразуйте поля даты/времени, извлеките год и месяц, добавьте простые производные столбцы. Это стандартный шаг ETL для нормализации данных перед аналитикой и ML.
df2 = (df.withColumn("signup_ts", F.to_timestamp("signup_date"))
.withColumn("year", F.year("signup_ts"))
.withColumn("month", F.month("signup_ts"))
.withColumn("is_india", (F.col("country") == "IN").cast("int")))
df2.show()
df2.createOrReplaceTempView("users")
spark.sql("""
SELECT country, COUNT(*) AS cnt, AVG(income) AS avg_income
FROM users
GROUP BY country
ORDER BY cnt DESC
""").show()
w = Window.partitionBy("country").orderBy(F.col("income").desc())
df_ranked = df2.withColumn("income_rank_in_country", F.rank().over(w))
df_ranked.show()
def plan_priority(plan):
if plan == "premium": return 3
if plan == "standard": return 2
if plan == "basic": return 1
return 0
plan_priority_udf = F.udf(plan_priority, IntegerType())
df_udf = df_ranked.withColumn("plan_priority", plan_priority_udf(F.col("plan")))
df_udf.show()Эти шаги демонстрируют работу с DataFrame API, регистрацию временной таблицы для SQL-запросов, применение оконных функций для ранжирования и использование Python-UDF для вычисления приоритетов подписок.
Объединение с метаданными и агрегации
Обогатите таблицу пользователей данными по странам, затем подсчитайте агрегаты по регионам и типам планов — типичный аналитический сценарий.
country_data = [
("IN", "Asia", 1.42), ("US", "North America", 0.33),
("UK", "Europe", 0.07), ("AE", "Asia", 0.01), ("PK", "Asia", 0.24),
]
country_schema = StructType([
StructField("country", StringType(), True),
StructField("region", StringType(), True),
StructField("population_bn", FloatType(), True),
])
country_df = spark.createDataFrame(country_data, country_schema)
joined = df_udf.alias("u").join(country_df.alias("c"), on="country", how="left")
joined.show()
region_stats = (joined.groupBy("region", "plan")
.agg(F.count("*").alias("users"),
F.round(F.avg("income"), 2).alias("avg_income"))
.orderBy("region", "plan"))
region_stats.show()Это показывает, как в Spark выполняются соединения и группировки, подходящие для масштабных наборов данных.
Признаковое преобразование и обучение модели в Spark MLlib
Подготовьте метки и признаки, закодируйте категориальные столбцы, соберите в вектор признаков и обучите логистическую регрессию. Оцените предсказания с помощью стандартного метрика.
ml_df = joined.withColumn("label", (F.col("plan") == "premium").cast("int")).na.drop()
country_indexer = StringIndexer(inputCol="country", outputCol="country_idx", handleInvalid="keep")
country_fitted = country_indexer.fit(ml_df)
ml_df2 = country_fitted.transform(ml_df)
assembler = VectorAssembler(inputCols=["income", "country_idx", "plan_priority"], outputCol="features")
ml_final = assembler.transform(ml_df2)
train_df, test_df = ml_final.randomSplit([0.7, 0.3], seed=42)
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)
lr_model = lr.fit(train_df)
preds = lr_model.transform(test_df)
preds.select("name", "country", "income", "plan", "label", "prediction", "probability").show(truncate=False)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = evaluator.evaluate(preds)
print("Classification accuracy:", acc)Пример показывает интеграцию MLlib в поток данных: индексация, сборка векторных признаков и обучение модели выполняются в одном пайплайне.
Сохранение результатов и анализ плана выполнения
Запишите обработанные данные в Parquet, загрузите их обратно для проверки, выполните SQL-запросы для выборки и изучите план выполнения для оптимизации.
output_path = "/content/spark_users_parquet"
joined.write.mode("overwrite").parquet(output_path)
parquet_df = spark.read.parquet(output_path)
print("Parquet reloaded:")
parquet_df.show()
recent = spark.sql("""
SELECT name, country, income, signup_ts
FROM users
WHERE signup_ts >= '2025-10-01'
ORDER BY signup_ts DESC
""")
recent.show()
recent.explain()
spark.stop()Данный код показывает, как сохранять и повторно использовать колоночные данные, а также как анализировать план выполнения для улучшения производительности.
Практические замечания
- Запуск Spark в Colab удобен для прототипирования пайплайнов перед деплоем на кластер. Между одиночной сессией и кластерным развёртыванием есть нюансы конфигурации, но API остаются одинаковыми.
- Используйте ленивость вычислений Spark и вызывайте действия только при необходимости; explain() помогает понять план выполнения.
- Для ML всегда обрабатывайте категориальные признаки и пропуски перед обучением.
Этот пример демонстрирует, как трансформации DataFrame приводят к SQL-аналитике, признаковому преобразованию и моделированию, используя одну и ту же экосистему Spark.
Switch Language
Read this article in English