Apache Spark核心原理
约 1424 字大约 5 分钟
sparkbig-data
2025-09-07
Apache Spark 是大数据处理的事实标准引擎,提供批处理、流处理、机器学习和图计算的统一编程模型。本文深入剖析 Spark 的核心架构和关键技术。
架构概览
- Driver:运行用户程序的 main 函数,创建 SparkContext,协调任务调度
- Executor:运行在集群节点上的进程,执行具体计算任务并缓存数据
- Cluster Manager:分配资源(YARN、K8s、Standalone)
RDD 抽象
RDD(Resilient Distributed Dataset)是 Spark 最底层的抽象——不可变、分区的数据集合,支持两种操作:
- Transformation(惰性):map, filter, flatMap, groupByKey, join 等,返回新 RDD
- Action(触发计算):collect, count, reduce, saveAsTextFile 等
from pyspark import SparkContext
sc = SparkContext("local[*]", "RDD Example")
# 创建 RDD
rdd = sc.parallelize(range(1000), numSlices=10)
# Transformation(惰性,不立即执行)
filtered = rdd.filter(lambda x: x % 2 == 0)
mapped = filtered.map(lambda x: (x % 10, x))
grouped = mapped.groupByKey()
# Action(触发实际计算)
result = grouped.mapValues(list).collect()
# RDD 血缘关系(Lineage)
print(grouped.toDebugString())RDD 容错机制
RDD 通过血缘关系(Lineage)实现容错:记录从父 RDD 到当前 RDD 的所有转换操作。当某个分区丢失时,Spark 可以根据血缘信息从源头重算该分区。
DAG 调度器
Spark 将用户的 RDD 操作链构建为 DAG(有向无环图),然后按照 shuffle 边界划分为多个 Stage。
Stage 划分规则:
- 窄依赖(map, filter, union):父 RDD 的每个分区只被子 RDD 的一个分区使用,可以在同一 Stage 内 pipeline 执行
- 宽依赖(groupByKey, reduceByKey, join):父 RDD 的一个分区被子 RDD 的多个分区使用,需要 shuffle,触发 Stage 边界
Shuffle 机制
Shuffle 是 Spark 中最昂贵的操作,涉及网络传输和磁盘 IO。
Shuffle 优化策略:
- 使用
reduceByKey替代groupByKey(map 端预聚合) - 调整
spark.sql.shuffle.partitions(默认 200) - 使用
broadcast join避免大表 shuffle - 开启 AQE(Adaptive Query Execution)自动优化
Spark SQL 与 Catalyst 优化器
Spark SQL 的 Catalyst 优化器将 SQL 查询或 DataFrame 操作转化为优化的物理执行计划。
关键优化规则:
- 谓词下推(Predicate Pushdown):将 filter 尽早应用
- 列裁剪(Column Pruning):只读取需要的列
- 常量折叠(Constant Folding):编译时计算常量表达式
- Join 重排序:将小表放在 build 端
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("Spark SQL Example") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()
# DataFrame API
orders = spark.read.parquet("/data/orders")
users = spark.read.parquet("/data/users")
result = (
orders
.filter(F.col("order_date") >= "2025-01-01") # 谓词下推到数据源
.join(users, "user_id", "inner") # Join
.groupBy("city")
.agg(
F.count("order_id").alias("order_count"),
F.sum("amount").alias("total_amount"),
F.avg("amount").alias("avg_amount"),
)
.orderBy(F.desc("total_amount"))
)
# 查看执行计划
result.explain(mode="extended")
result.show()DataFrame 与 Dataset
| 特性 | RDD | DataFrame | Dataset (Scala/Java) |
|---|---|---|---|
| 类型安全 | 编译时检查 | 运行时检查 | 编译时检查 |
| 优化器 | 无 | Catalyst | Catalyst |
| 序列化 | Java/Kryo | Tungsten 二进制 | Tungsten 二进制 |
| API 风格 | 函数式 | 声明式 (SQL-like) | 混合 |
| 性能 | 最低 | 高 | 高 |
# DataFrame 操作示例
df = spark.createDataFrame([
(1, "Alice", 30, 80000),
(2, "Bob", 25, 60000),
(3, "Charlie", 35, 100000),
], ["id", "name", "age", "salary"])
# SQL 风格
df.createOrReplaceTempView("employees")
spark.sql("""
SELECT age_group, COUNT(*) as count, AVG(salary) as avg_salary
FROM (
SELECT *,
CASE
WHEN age < 30 THEN 'young'
WHEN age < 40 THEN 'mid'
ELSE 'senior'
END as age_group
FROM employees
)
GROUP BY age_group
""").show()
# DataFrame API 风格(等价)
df.withColumn("age_group",
F.when(F.col("age") < 30, "young")
.when(F.col("age") < 40, "mid")
.otherwise("senior")
).groupBy("age_group").agg(
F.count("*").alias("count"),
F.avg("salary").alias("avg_salary"),
).show()内存管理
Spark 的统一内存管理模型将 Executor 内存分为三个区域:
关键配置:
spark.executor.memory:Executor 总内存spark.memory.fraction:执行+存储的总占比(默认 0.6)spark.memory.storageFraction:存储内存初始占比(默认 0.5),可动态借用
# 缓存策略
df.cache() # 内存(反序列化)
df.persist(StorageLevel.MEMORY_AND_DISK) # 内存优先,溢出到磁盘
df.persist(StorageLevel.MEMORY_ONLY_SER) # 内存(序列化,更紧凑)
df.unpersist() # 释放缓存部署模式
| 模式 | 特点 | 适用场景 |
|---|---|---|
| Local | 单机多线程 | 开发测试 |
| Standalone | Spark 自带集群管理 | 小型独立集群 |
| YARN | Hadoop 生态集成 | 企业 Hadoop 集群 |
| Kubernetes | 容器化部署 | 云原生环境 |
# Kubernetes 提交示例
spark-submit \
--master k8s://https://k8s-cluster:6443 \
--deploy-mode cluster \
--name spark-etl-job \
--conf spark.executor.instances=10 \
--conf spark.executor.memory=8g \
--conf spark.executor.cores=4 \
--conf spark.kubernetes.container.image=my-spark:3.5 \
--conf spark.sql.adaptive.enabled=true \
/path/to/etl_job.py性能调优要点
- 避免数据倾斜:用 salting 或 AQE 的 skew join 优化
- 合理设置并行度:
spark.sql.shuffle.partitions和repartition/coalesce - 使用 Broadcast Join:小表广播避免 shuffle
- 列式存储:使用 Parquet/ORC 格式获得列裁剪和谓词下推
- 启用 AQE:自动合并小分区、优化 join 策略、处理数据倾斜
总结
Spark 的核心优势在于统一的编程模型和强大的 Catalyst 优化器。理解 RDD 的惰性求值、DAG 调度、Shuffle 机制和内存管理是进行有效调优的基础。在现代数据平台中,Spark on Kubernetes + Delta Lake/Iceberg 已成为主流部署架构。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于