From Colab to Production: Build an End-to-End Spark + PySpark Pipeline
Hands-on guide to run PySpark in Colab, perform ETL, run SQL and window functions, train a logistic regression model, and save results to Parquet.
Running PySpark in Google Colab
Start by installing and importing the necessary PySpark packages, then initialize a local Spark session configured to run in the Colab environment. This session acts as the entry point for DataFrame operations, SQL queries and ML workflows.
!pip install -q pyspark==3.5.1
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, FloatType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = (SparkSession.builder.appName("ColabSparkAdvancedTutorial")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "4")
         .getOrCreate())
print("Spark version:", spark.version)
data = [
(1, "Alice", "IN", "2025-10-01", 56000.0, "premium"),
(2, "Bob", "US", "2025-10-03", 43000.0, "standard"),
(3, "Carlos", "IN", "2025-09-27", 72000.0, "premium"),
(4, "Diana", "UK", "2025-09-30", 39000.0, "standard"),
(5, "Esha", "IN", "2025-10-02", 85000.0, "premium"),
(6, "Farid", "AE", "2025-10-02", 31000.0, "basic"),
(7, "Gita", "IN", "2025-09-29", 46000.0, "standard"),
(8, "Hassan", "PK", "2025-10-01", 52000.0, "premium"),
]
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("country", StringType(), True),
    StructField("signup_date", StringType(), True),
    StructField("income", FloatType(), True),
    StructField("plan", StringType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()
Preparing and enriching the dataset
Convert timestamp-like fields, extract date parts and add simple derived columns. This is a typical ETL step where raw input is normalized for downstream analytics and ML.
df2 = (df.withColumn("signup_ts", F.to_timestamp("signup_date"))
         .withColumn("year", F.year("signup_ts"))
         .withColumn("month", F.month("signup_ts"))
         .withColumn("is_india", (F.col("country") == "IN").cast("int")))
df2.show()
df2.createOrReplaceTempView("users")
spark.sql("""
SELECT country, COUNT(*) AS cnt, AVG(income) AS avg_income
FROM users
GROUP BY country
ORDER BY cnt DESC
""").show()
w = Window.partitionBy("country").orderBy(F.col("income").desc())
df_ranked = df2.withColumn("income_rank_in_country", F.rank().over(w))
df_ranked.show()
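Note that rank() leaves gaps after tied incomes. If gap-free ranks or unique row positions are needed, dense_rank() and row_number() work over the same window spec, as in this small sketch (the extra column names are only for illustration):
# Same window spec, different ranking semantics: dense_rank() leaves no gaps
# after ties, and row_number() always assigns a unique sequential position.
(df2.withColumn("dense_rank_in_country", F.dense_rank().over(w))
    .withColumn("row_number_in_country", F.row_number().over(w))
    .show())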
def plan_priority(plan):
    if plan == "premium": return 3
    if plan == "standard": return 2
    if plan == "basic": return 1
    return 0
plan_priority_udf = F.udf(plan_priority, IntegerType())
df_udf = df_ranked.withColumn("plan_priority", plan_priority_udf(F.col("plan")))
df_udf.show()
These steps demonstrate the DataFrame API, registering a temp view for Spark SQL queries, applying window functions to rank values within partitions, and using a Python UDF to derive categorical priorities.
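As a side note, the same mapping can be expressed with built-in column functions instead of a Python UDF, which keeps the logic inside Spark's optimizer and avoids Python serialization overhead. A minimal sketch (the df_builtin name is just for illustration):
# Built-in alternative to the Python UDF: a chained CASE expression.
df_builtin = df_ranked.withColumn(
    "plan_priority",
    F.when(F.col("plan") == "premium", 3)
     .when(F.col("plan") == "standard", 2)
     .when(F.col("plan") == "basic", 1)
     .otherwise(0)
)
df_builtin.show()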
Joining external metadata and aggregating
Enrich the user table by joining country-level metadata, then compute aggregation statistics by region and plan type — a common analytical pattern.
country_data = [
("IN", "Asia", 1.42), ("US", "North America", 0.33),
("UK", "Europe", 0.07), ("AE", "Asia", 0.01), ("PK", "Asia", 0.24),
]
country_schema = StructType([
    StructField("country", StringType(), True),
    StructField("region", StringType(), True),
    StructField("population_bn", FloatType(), True),
])
country_df = spark.createDataFrame(country_data, country_schema)
joined = df_udf.alias("u").join(country_df.alias("c"), on="country", how="left")
joined.show()
region_stats = (joined.groupBy("region", "plan")
                .agg(F.count("*").alias("users"),
                     F.round(F.avg("income"), 2).alias("avg_income"))
                .orderBy("region", "plan"))
region_stats.show()
This demonstrates scalable joins and group-by aggregations in Spark, which work the same on larger clusters as they do in Colab's single-node session.
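Because the country table is tiny compared with a realistic user table, a broadcast hint is a common refinement. This sketch assumes country_df comfortably fits in executor memory (the joined_bc name is only for this example):
# Broadcast the small dimension table to every executor so the join
# avoids shuffling the larger user DataFrame.
joined_bc = df_udf.join(F.broadcast(country_df), on="country", how="left")
joined_bc.explain()  # the physical plan should show a broadcast hash join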
Feature engineering and model training with Spark MLlib
Prepare labels and features, encode categorical columns, assemble feature vectors and train a logistic regression model. Evaluate predictions with a standard evaluator.
ml_df = joined.withColumn("label", (F.col("plan") == "premium").cast("int")).na.drop()
country_indexer = StringIndexer(inputCol="country", outputCol="country_idx", handleInvalid="keep")
country_fitted = country_indexer.fit(ml_df)
ml_df2 = country_fitted.transform(ml_df)
assembler = VectorAssembler(inputCols=["income", "country_idx", "plan_priority"], outputCol="features")
ml_final = assembler.transform(ml_df2)
train_df, test_df = ml_final.randomSplit([0.7, 0.3], seed=42)
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)
lr_model = lr.fit(train_df)
preds = lr_model.transform(test_df)
preds.select("name", "country", "income", "plan", "label", "prediction", "probability").show(truncate=False)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = evaluator.evaluate(preds)
print("Classification accuracy:", acc)This example highlights how Spark MLlib integrates with DataFrame pipelines: indexing, vector assembly and model training can be combined seamlessly.
Persisting results and inspecting query plans
Write processed data to Parquet, read it back for verification, run SQL queries to extract slices, and inspect the physical plan for optimization insights.
output_path = "/content/spark_users_parquet"
joined.write.mode("overwrite").parquet(output_path)
parquet_df = spark.read.parquet(output_path)
print("Parquet reloaded:")
parquet_df.show()
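For larger datasets it is common to partition the Parquet output by a query-relevant column. This sketch assumes region is a useful partition key and uses a hypothetical output path:
partitioned_path = "/content/spark_users_parquet_by_region"  # hypothetical path
# One sub-directory per region; filters on region can then prune whole
# directories instead of scanning every file.
joined.write.mode("overwrite").partitionBy("region").parquet(partitioned_path)
spark.read.parquet(partitioned_path).where(F.col("region") == "Asia").show()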
recent = spark.sql("""
SELECT name, country, income, signup_ts
FROM users
WHERE signup_ts >= '2025-10-01'
ORDER BY signup_ts DESC
""")
recent.show()
recent.explain()
spark.stop()
The code shows how to persist datasets in columnar format and re-use them, and how to examine execution plans to reason about performance.
Practical notes
- Running Spark in Colab is a great way to prototype Spark workflows locally before deploying to a cluster. Configuration differences exist between single-node and multi-node environments, but APIs remain consistent.
- Keep transformations lazy until an action is actually needed, so Spark can optimize the full plan at once. Use explain() to check how a query is planned (see the sketch after this list).
- For ML tasks, remember to handle categorical variables and missing values explicitly before training.
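As an illustration of the laziness point (run before spark.stop() above; the lazy_df name is only for this sketch), the following builds a plan without executing it, inspects it with explain(), and only triggers work with an action:
# No job runs here: filter/withColumn only extend the logical plan.
lazy_df = (df2.filter(F.col("income") > 40000)
              .withColumn("income_k", F.col("income") / 1000))
# explain() prints the plan without executing; mode="formatted" gives a
# more readable breakdown in Spark 3.x.
lazy_df.explain(mode="formatted")
# An action such as count() or show() finally triggers execution.
print(lazy_df.count())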
This end-to-end flow demonstrates how simple DataFrame operations lead into SQL analytics, feature engineering and predictive modeling — all using the same Spark ecosystem.