Airflow工作流编排
约 1355 字大约 5 分钟
airfloworchestration
2025-09-11
Apache Airflow 是最流行的工作流编排平台,用 Python 代码定义数据管道的 DAG(有向无环图),提供丰富的调度、监控和告警能力。本文详细介绍 Airflow 的核心概念、常用 Operator、通信机制和生产实践。
核心架构
- Scheduler:解析 DAG 文件,调度满足条件的任务
- Executor:负责将任务分发给 Worker 执行
- Worker:实际执行任务的进程
- Metadata DB:存储 DAG 定义、任务状态、执行历史
- Web Server:提供 UI 界面用于监控和管理
DAG 定义
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'execution_timeout': timedelta(hours=2),
}
with DAG(
dag_id='daily_data_pipeline',
default_args=default_args,
description='每日数据处理管道',
schedule_interval='0 2 * * *', # 每天凌晨 2 点
start_date=datetime(2025, 1, 1),
catchup=False, # 不回填历史
tags=['production', 'etl'],
max_active_runs=1, # 同时只允许一个实例运行
) as dag:
start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')
extract = BashOperator(
task_id='extract_data',
bash_command='python /scripts/extract.py --date {{ ds }}',
)
def transform_data(**context):
execution_date = context['ds']
# 数据转换逻辑
print(f"Processing data for {execution_date}")
return {"records_processed": 10000}
transform = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
)
load = BashOperator(
task_id='load_to_warehouse',
bash_command='python /scripts/load.py --date {{ ds }}',
)
# 定义依赖关系
start >> extract >> transform >> load >> end常用 Operator
BashOperator
执行 Bash 命令或脚本。
check_file = BashOperator(
task_id='check_file_exists',
bash_command='test -f /data/input/{{ ds }}.csv && echo "File exists"',
)PythonOperator
执行 Python 函数。
def process_data(ds, **kwargs):
ti = kwargs['ti']
# 使用 XCom 获取上游数据
upstream_result = ti.xcom_pull(task_ids='extract_data')
print(f"Upstream result: {upstream_result}")
return {"status": "success", "count": 5000}
process = PythonOperator(
task_id='process_data',
python_callable=process_data,
op_kwargs={'custom_param': 'value'},
)KubernetesPodOperator
在 Kubernetes 集群中启动 Pod 执行任务,实现资源隔离和依赖管理。
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
spark_job = KubernetesPodOperator(
task_id='run_spark_job',
name='spark-etl-job',
namespace='data-processing',
image='my-spark:3.5',
cmds=['spark-submit'],
arguments=[
'--master', 'k8s://https://k8s-api:6443',
'--conf', 'spark.executor.instances=10',
'/app/etl_job.py',
'--date', '{{ ds }}',
],
resources={
'request_cpu': '2',
'request_memory': '4Gi',
'limit_cpu': '4',
'limit_memory': '8Gi',
},
is_delete_operator_pod=True,
get_logs=True,
)其他常用 Operator
| Operator | 用途 |
|---|---|
| SqlSensor | 等待 SQL 查询返回结果 |
| S3KeySensor | 等待 S3 对象出现 |
| BigQueryOperator | 执行 BigQuery SQL |
| SparkSubmitOperator | 提交 Spark 作业 |
| EmailOperator | 发送邮件通知 |
| SlackWebhookOperator | 发送 Slack 通知 |
Sensor(传感器)
Sensor 是特殊的 Operator,它会持续轮询直到某个条件满足。
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# 等待文件出现
wait_for_file = FileSensor(
task_id='wait_for_input_file',
filepath='/data/input/{{ ds }}.csv',
poke_interval=300, # 每 5 分钟检查一次
timeout=3600, # 最多等待 1 小时
mode='reschedule', # 释放 Worker 资源等待
)
# 等待 S3 文件
wait_for_s3 = S3KeySensor(
task_id='wait_for_s3_data',
bucket_name='data-bucket',
bucket_key='raw/events/dt={{ ds }}/_SUCCESS',
aws_conn_id='aws_default',
poke_interval=60,
timeout=7200,
mode='reschedule',
)XCom(跨任务通信)
XCom(Cross-Communication)允许任务之间传递小量数据。
# 推送数据到 XCom
def push_data(**context):
context['ti'].xcom_push(key='row_count', value=42000)
# PythonOperator 的返回值自动推送到 XCom
return {"status": "complete", "records": 42000}
# 拉取 XCom 数据
def pull_data(**context):
ti = context['ti']
row_count = ti.xcom_pull(task_ids='push_task', key='row_count')
result = ti.xcom_pull(task_ids='push_task') # 获取返回值
print(f"Row count: {row_count}, Result: {result}")
# Jinja 模板中使用 XCom
notify = BashOperator(
task_id='notify',
bash_command='echo "Processed {{ ti.xcom_pull(task_ids=\'push_task\', key=\'row_count\') }} rows"',
)注意:XCom 存储在 Metadata DB 中,不适合传递大量数据。大数据应通过文件路径或数据库引用传递。
Connections 与 Hooks
Connections 管理外部系统的连接信息,Hooks 提供与外部系统交互的 Python 接口。
from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
# 获取连接信息
conn = BaseHook.get_connection('my_postgres')
print(f"Host: {conn.host}, Port: {conn.port}, Schema: {conn.schema}")
# 使用 Hook 操作数据库
def query_database(**context):
pg_hook = PostgresHook(postgres_conn_id='my_postgres')
records = pg_hook.get_records("SELECT COUNT(*) FROM orders WHERE date = %s", [context['ds']])
return records[0][0]任务依赖关系
# 链式依赖
task_a >> task_b >> task_c
# 扇出/扇入
task_a >> [task_b, task_c, task_d] >> task_e
# 条件分支
from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
if context['ds_nodash'] >= '20250901':
return 'new_pipeline'
return 'legacy_pipeline'
branch = BranchPythonOperator(
task_id='branch_decision',
python_callable=choose_branch,
)
branch >> [new_pipeline, legacy_pipeline] >> join_task调度策略
| 参数 | 说明 | 示例 |
|---|---|---|
| schedule_interval | Cron 表达式或预设 | '0 2 * * *', '@daily' |
| start_date | DAG 起始日期 | datetime(2025, 1, 1) |
| catchup | 是否回填缺失的运行 | True/False |
| max_active_runs | 同时运行的最大实例数 | 1 |
| depends_on_past | 依赖上一次运行成功 | True/False |
Executor 类型
| Executor | 特点 | 适用场景 |
|---|---|---|
| SequentialExecutor | 单线程串行 | 开发测试 |
| LocalExecutor | 多进程本地执行 | 小规模部署 |
| CeleryExecutor | 分布式 Celery Worker | 生产环境 |
| KubernetesExecutor | 每个任务启一个 Pod | 云原生部署 |
生产实践
# 推荐的 DAG 结构
# dags/
# daily_pipeline/
# __init__.py
# dag.py # DAG 定义
# tasks/
# extract.py # 抽取逻辑
# transform.py # 转换逻辑
# validate.py # 验证逻辑
# sql/
# transform.sql # SQL 模板
# tests/
# test_dag.py # DAG 测试
# DAG 测试
def test_dag_loaded():
"""验证 DAG 可以正常加载"""
from airflow.models import DagBag
dagbag = DagBag()
assert 'daily_data_pipeline' in dagbag.dags
assert dagbag.import_errors == {}
def test_dag_structure():
"""验证 DAG 结构"""
from airflow.models import DagBag
dagbag = DagBag()
dag = dagbag.get_dag('daily_data_pipeline')
assert len(dag.tasks) == 5
assert dag.has_task('extract_data')总结
Airflow 的核心优势在于用 Python 代码定义工作流(DAG as Code),提供了丰富的 Operator 生态和灵活的调度能力。生产部署时建议使用 KubernetesExecutor 实现资源隔离,配合 XCom 进行轻量级任务间通信,使用 Sensor 处理外部依赖。合理的 DAG 结构和完善的测试是保障编排系统稳定性的基础。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于