Delta Lake数据湖格式
约 1466 字大约 5 分钟
delta-lakelakehouse
2025-09-10
Delta Lake 是由 Databricks 开源的存储层,为数据湖带来了 ACID 事务、Schema 管理和时间旅行等数据仓库级别的能力。本文详细介绍 Delta Lake 的核心机制和实践用法。
核心架构
Delta Lake 本质上是在 Parquet 文件之上添加了一个事务日志层(Transaction Log),所有对表的读写操作都通过这个日志层协调。
Transaction Log(_delta_log)
事务日志是 Delta Lake 的核心,记录了表的所有变更操作。每次写操作生成一个 JSON 日志文件。
// _delta_log/00000000000000000001.json
{
"commitInfo": {
"timestamp": 1693958400000,
"operation": "WRITE",
"operationParameters": {"mode": "Append", "partitionBy": "[date]"}
}
}
{
"add": {
"path": "date=2025-09-10/part-00000.parquet",
"size": 1048576,
"modificationTime": 1693958400000,
"dataChange": true,
"stats": "{\"numRecords\":10000,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":10000}}"
}
}
{
"remove": {
"path": "date=2025-09-10/part-old.parquet",
"deletionTimestamp": 1693958400000,
"dataChange": true
}
}日志中的核心 Action 类型:
- add:添加新的数据文件
- remove:标记旧文件为删除(逻辑删除)
- metaData:更新表的元数据(schema、分区等)
- protocol:记录读写协议版本
- commitInfo:提交信息
Checkpoint
每 10 次提交(默认),Delta Lake 生成一个 Parquet 格式的 checkpoint 文件,汇总之前所有日志的状态,加速表状态重建。
ACID 事务
Delta Lake 通过乐观并发控制实现 ACID 事务:
- Atomicity:每次提交要么完全成功,要么完全回滚
- Consistency:Schema enforcement 保证数据一致性
- Isolation:使用乐观并发控制,写冲突时自动重试
- Durability:数据和日志持久化到对象存储
from delta import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 创建 Delta 表
df = spark.createDataFrame([
(1, "Alice", 100),
(2, "Bob", 200),
(3, "Charlie", 300),
], ["id", "name", "amount"])
df.write.format("delta").save("/data/delta/accounts")
# MERGE(Upsert)操作
delta_table = DeltaTable.forPath(spark, "/data/delta/accounts")
new_data = spark.createDataFrame([
(2, "Bob", 250), # 更新
(4, "David", 400), # 插入
], ["id", "name", "amount"])
delta_table.alias("target").merge(
new_data.alias("source"),
"target.id = source.id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()时间旅行(Time Travel)
Delta Lake 保留了表的所有历史版本,支持按版本号或时间戳查询历史数据。
# 按版本号查询
df_v0 = spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("/data/delta/accounts")
# 按时间戳查询
df_hist = spark.read.format("delta") \
.option("timestampAsOf", "2025-09-10 00:00:00") \
.load("/data/delta/accounts")
# 查看表历史
delta_table = DeltaTable.forPath(spark, "/data/delta/accounts")
delta_table.history().show()
# 版本回滚
delta_table.restoreToVersion(5)
# 或按时间回滚
delta_table.restoreToTimestamp("2025-09-01")Schema Evolution 与 Enforcement
Schema Enforcement
默认情况下,Delta Lake 拒绝写入与表 schema 不匹配的数据,防止数据损坏。
# 如果新数据缺少列或类型不匹配,会报错
try:
bad_df = spark.createDataFrame([(1, "Alice")], ["id", "name"])
bad_df.write.format("delta").mode("append").save("/data/delta/accounts")
except Exception as e:
print(f"Schema enforcement error: {e}")Schema Evolution
允许安全地演化表 schema:
# 添加新列
new_df = spark.createDataFrame([
(5, "Eve", 500, "2025-09-10"),
], ["id", "name", "amount", "created_date"])
new_df.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/data/delta/accounts")Z-Ordering
Z-Ordering 是一种多维数据组织技术,将经常一起查询的列的值在物理上聚集存储,从而提高数据跳过(data skipping)的效率。
# Z-Order 优化
delta_table.optimize().executeZOrderBy("user_id", "date")
# SQL 语法
spark.sql("""
OPTIMIZE delta.`/data/delta/events`
ZORDER BY (user_id, date)
""")查询 WHERE user_id = 50 AND date = '2025-09-05' 时,Z-Ordered 表只需要扫描文件 1,而未优化的表需要扫描所有文件。
OPTIMIZE 与 VACUUM
OPTIMIZE
合并小文件为较大的文件,提高查询性能:
# 合并小文件
delta_table.optimize().executeCompaction()
# SQL
spark.sql("OPTIMIZE delta.`/data/delta/events`")
# 带 Z-Order 的优化
spark.sql("OPTIMIZE delta.`/data/delta/events` ZORDER BY (user_id)")VACUUM
物理删除已被标记为 remove 的旧数据文件,释放存储空间:
# 删除超过 7 天的旧文件(默认保留 7 天)
delta_table.vacuum(retentionHours=168)
# 注意:VACUUM 后无法再时间旅行到被清理的版本
# 设置更短的保留期需要显式禁用安全检查
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_table.vacuum(retentionHours=0) # 危险操作Change Data Feed(CDF)
CDF 让下游消费者可以增量读取 Delta 表的变更数据。
# 启用 CDF
spark.sql("""
ALTER TABLE delta.`/data/delta/events`
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# 读取变更数据
changes = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 5) \
.option("endingVersion", 10) \
.load("/data/delta/events")
# 变更数据包含额外列
# _change_type: insert, update_preimage, update_postimage, delete
# _commit_version: 变更的版本号
# _commit_timestamp: 变更的时间戳
changes.select("id", "_change_type", "_commit_version").show()UniForm
UniForm 让 Delta Lake 表自动生成 Iceberg 和 Hudi 兼容的元数据,使其他引擎(如 Trino、Snowflake)可以直接以 Iceberg 或 Hudi 格式读取 Delta 表。
-- 创建支持 UniForm 的表
CREATE TABLE events (
id BIGINT,
event_type STRING,
event_time TIMESTAMP
)
USING DELTA
TBLPROPERTIES (
'delta.universalFormat.enabledFormats' = 'iceberg'
);最佳实践
| 场景 | 建议 |
|---|---|
| 分区策略 | 按日期分区,避免过多小分区 |
| 文件大小 | 目标 128MB-1GB(可通过 OPTIMIZE 调整) |
| VACUUM | 定期执行,保留 7-30 天 |
| Z-Order | 对高基数过滤列使用 |
| CDF | 增量 ETL 场景必备 |
| Schema Evolution | 使用 mergeSchema 而非 overwriteSchema |
总结
Delta Lake 通过事务日志在 Parquet 之上构建了完整的 ACID 事务、时间旅行和 Schema 管理能力。OPTIMIZE + Z-Ordering 优化查询性能,VACUUM 管理存储空间,Change Data Feed 支持增量数据消费。UniForm 的推出进一步增强了 Delta Lake 的互操作性。在 Databricks 和 Spark 生态中,Delta Lake 是数据湖格式的首选。
贡献者
更新日志
9f6c2-feat: organize wiki content and refresh site setup于