数据管道设计模式
约 1520 字大约 5 分钟
data-pipelineetl
2025-09-06
数据管道是将原始数据转化为可用信息的核心基础设施。本文介绍 ETL/ELT 模式、批处理与流处理架构、数据质量保障以及编排策略。
ETL vs ELT
| 维度 | ETL | ELT |
|---|---|---|
| 转换位置 | 管道中间(外部工具) | 目标存储内(SQL/Spark) |
| 适用场景 | 传统数据仓库 | 云数据湖/Lakehouse |
| 灵活性 | 低(schema-on-write) | 高(schema-on-read) |
| 存储成本 | 低(只存转换后数据) | 较高(保留原始数据) |
| 代表工具 | Informatica, SSIS | dbt, Spark, BigQuery |
现代数据平台普遍采用 ELT 模式:先将原始数据加载到数据湖/仓库,再在其中进行转换。这样保留了原始数据的完整性,且能利用目标存储的计算能力。
批处理 vs 流处理
| 维度 | 批处理 | 流处理 |
|---|---|---|
| 延迟 | 分钟到小时 | 毫秒到秒 |
| 吞吐量 | 高 | 中等 |
| 复杂度 | 低 | 高 |
| 容错 | 简单(重跑) | 复杂(checkpoint) |
| 典型工具 | Spark, Hive, dbt | Flink, Kafka Streams |
| 适用场景 | 报表、模型训练、历史分析 | 实时监控、推荐、风控 |
Lambda 架构
Lambda 架构同时维护批处理层和速度层,将两者的结果合并提供给服务层。
优点:兼顾完整性和实时性。缺点:需要维护两套代码逻辑,复杂度高。
Kappa 架构
Kappa 架构用统一的流处理引擎处理所有数据,消除了 Lambda 架构中两套代码的问题。
通过消息队列(如 Kafka)的长期存储能力,可以重放历史数据来修正或重建结果。
CDC(Change Data Capture)
CDC 捕获数据库中的增量变更,避免全量扫描。
# Debezium + Kafka CDC 架构
# docker-compose.yml 片段
debezium_config = {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "${DB_PASSWORD}",
"database.server.id": "1",
"database.server.name": "myserver",
"table.include.list": "mydb.orders,mydb.users",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes",
}
# CDC 消息示例(Kafka 中的消息)
cdc_message = {
"op": "u", # c=create, u=update, d=delete, r=read(snapshot)
"before": {"id": 1, "name": "old_name", "amount": 100},
"after": {"id": 1, "name": "new_name", "amount": 150},
"source": {
"db": "mydb",
"table": "orders",
"ts_ms": 1693958400000,
}
}数据质量检查
在管道的关键节点嵌入数据质量检查,确保下游数据可靠。
from great_expectations import get_context
import great_expectations as gx
context = get_context()
# 定义数据质量期望
validator = context.sources.pandas_default.read_dataframe(df)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])
# 验证
results = validator.validate()
if not results.success:
raise DataQualityError(f"数据质量检查失败: {results}")编排(Orchestration)
数据管道编排负责调度和协调各个任务的执行顺序、依赖关系和错误处理。
# Airflow DAG 示例
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
}
with DAG(
'daily_etl_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *', # 每天凌晨2点
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:
extract = PythonOperator(
task_id='extract_data',
python_callable=extract_from_sources,
)
transform = SparkSubmitOperator(
task_id='transform_data',
application='/path/to/transform.py',
conn_id='spark_default',
)
quality_check = PythonOperator(
task_id='data_quality_check',
python_callable=run_quality_checks,
)
load = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_to_dwh,
)
extract >> transform >> quality_check >> load幂等性(Idempotency)
幂等性确保管道重复执行不会产生重复或错误的数据。实现方式:
# 1. Upsert(INSERT OR UPDATE)
def idempotent_write_sql(df, table_name, key_columns):
"""使用 MERGE 实现幂等写入"""
merge_sql = f"""
MERGE INTO {table_name} AS target
USING staging_table AS source
ON {' AND '.join(f'target.{k} = source.{k}' for k in key_columns)}
WHEN MATCHED THEN
UPDATE SET {', '.join(f'target.{c} = source.{c}' for c in df.columns)}
WHEN NOT MATCHED THEN
INSERT ({', '.join(df.columns)})
VALUES ({', '.join(f'source.{c}' for c in df.columns)})
"""
return merge_sql
# 2. 分区覆写
def idempotent_write_partition(df, table_name, partition_col, partition_value):
"""先删除目标分区,再写入"""
spark.sql(f"ALTER TABLE {table_name} DROP IF EXISTS PARTITION ({partition_col}='{partition_value}')")
df.write.partitionBy(partition_col).mode("append").saveAsTable(table_name)
# 3. 临时表 + 交换
def atomic_table_swap(staging_table, production_table):
"""原子性地交换表"""
spark.sql(f"ALTER TABLE {production_table} RENAME TO {production_table}_backup")
spark.sql(f"ALTER TABLE {staging_table} RENAME TO {production_table}")
spark.sql(f"DROP TABLE {production_table}_backup")回填策略(Backfill)
当管道逻辑变更或修复错误时,需要重新处理历史数据。
关键考虑:
- 资源隔离:回填任务不应影响日常管道的运行
- 顺序执行:有依赖关系的任务应按时间顺序回填
- 断点续传:支持从失败点恢复,而非从头开始
- 结果验证:回填后对比新旧结果,确认正确性
# Airflow 回填命令
# airflow dags backfill daily_etl_pipeline \
# --start-date 2025-01-01 \
# --end-date 2025-06-30 \
# --reset-dagruns监控与告警
# 数据管道监控指标
pipeline_metrics = {
"数据新鲜度": "最新数据的时间戳距当前时间的延迟",
"数据量波动": "与历史同期相比的记录数变化百分比",
"任务执行时间": "每个任务的运行时长趋势",
"失败率": "任务失败次数 / 总执行次数",
"数据完整性": "NULL 值比例、主键唯一性",
}总结
数据管道设计需要在实时性、复杂度和可靠性之间权衡。ELT 模式配合 Kappa 架构是现代数据平台的主流选择。幂等性、数据质量检查和完善的监控是保障管道稳定运行的关键要素。CDC 和流处理技术使近实时数据管道成为可能。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于