Flink流处理架构
约 1401 字大约 5 分钟
flinkstreaming
2025-09-08
Apache Flink 是分布式流处理领域的领先引擎,以"流优先"的设计理念同时支持有界和无界数据处理。本文深入介绍 Flink 的核心架构、时间语义、窗口机制、状态管理和容错保证。
架构概览
- JobManager:协调分布式执行,管理 checkpoint、故障恢复
- TaskManager:执行实际的数据处理任务,每个 TM 包含若干 Task Slot
- Task Slot:资源隔离单元,代表 TM 的一部分资源(内存)
Dataflow Graph
Flink 程序被编译为 Dataflow Graph,由 Source、Transformation 和 Sink 三类算子组成。
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
# Source: 从 Kafka 读取
kafka_consumer = FlinkKafkaConsumer(
topics='user_events',
deserialization_schema=SimpleStringSchema(),
properties={
'bootstrap.servers': 'kafka:9092',
'group.id': 'flink-consumer',
}
)
stream = env.add_source(kafka_consumer)
# Transformation
result = (
stream
.map(parse_json)
.filter(lambda event: event['type'] == 'click')
.key_by(lambda event: event['user_id'])
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(ClickCountAggregator())
)
# Sink: 写入结果
result.add_sink(jdbc_sink)
env.execute("Click Count Job")时间语义
Flink 支持三种时间概念:
| 时间类型 | 定义 | 适用场景 |
|---|---|---|
| Event Time | 事件实际发生的时间 | 大多数业务场景 |
| Processing Time | 事件被处理的时间 | 延迟不敏感场景 |
| Ingestion Time | 事件进入 Flink 的时间 | 折中方案 |
Event Time 处理需要配合 Watermark 机制来处理乱序数据。
Watermark(水位线)
Watermark 是 Flink 处理乱序事件的核心机制。Watermark(t) 表示"所有时间戳 <= t 的事件都已经到达"。
from pyflink.datastream import WatermarkStrategy
from pyflink.common import Duration
# 有界乱序水位线策略
watermark_strategy = (
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_seconds(5))
.with_timestamp_assigner(
lambda event, _: event['timestamp']
)
)
stream_with_watermark = stream.assign_timestamps_and_watermarks(watermark_strategy)窗口(Windows)
滚动窗口(Tumbling Window)
固定大小、不重叠的窗口。
|---Window 1---|---Window 2---|---Window 3---|
0 5 10 15 (minutes)滑动窗口(Sliding Window)
固定大小、有重叠的窗口。由窗口大小和滑动步长定义。
|---Window 1(0-10)---|
|---Window 2(5-15)---|
|---Window 3(10-20)---|
0 5 10 15 20 (minutes)会话窗口(Session Window)
基于活动间隔的动态窗口。如果两个事件间隔超过阈值,则认为属于不同会话。
|--Session 1--| gap |----Session 2----| gap |--Session 3--|from pyflink.datastream.window import (
TumblingEventTimeWindows,
SlidingEventTimeWindows,
EventTimeSessionWindows,
)
from pyflink.common import Time
# 滚动窗口:每5分钟
stream.key_by(...).window(
TumblingEventTimeWindows.of(Time.minutes(5))
)
# 滑动窗口:窗口10分钟,每5分钟滑动一次
stream.key_by(...).window(
SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5))
)
# 会话窗口:30分钟无活动则关闭会话
stream.key_by(...).window(
EventTimeSessionWindows.with_gap(Time.minutes(30))
)状态管理
Flink 的有状态流处理是其最强大的特性之一。状态存储在本地(每个算子实例),并通过 checkpoint 实现容错。
状态类型
| 状态类型 | 用途 | 示例 |
|---|---|---|
| ValueState | 存储单个值 | 最新事件 |
| ListState | 存储列表 | 事件缓冲 |
| MapState | 存储键值对 | 计数器集合 |
| ReducingState | 聚合值 | 累加求和 |
| AggregatingState | 自定义聚合 | 复杂聚合 |
from pyflink.datastream import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
class StatefulCounter(MapFunction):
def open(self, runtime_context: RuntimeContext):
# 声明状态
self.count_state = runtime_context.get_state(
ValueStateDescriptor("count", Types.LONG())
)
def map(self, event):
current_count = self.count_state.value() or 0
new_count = current_count + 1
self.count_state.update(new_count)
return (event['user_id'], new_count)State Backend
| Backend | 存储 | 特点 |
|---|---|---|
| HashMapStateBackend | JVM 堆内存 | 快,受内存限制 |
| EmbeddedRocksDBStateBackend | RocksDB(磁盘) | 状态可超过内存,增量 checkpoint |
Checkpoint 与 Exactly-Once
Flink 使用 Chandy-Lamport 分布式快照算法的变体实现 checkpoint,保证 exactly-once 处理语义。
关键配置:
env.enable_checkpointing(60000) # 每60秒一次 checkpoint
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
env.get_checkpoint_config().set_checkpoint_timeout(120000)
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
# 使用 RocksDB State Backend
env.set_state_backend(EmbeddedRocksDBStateBackend())
env.get_checkpoint_config().set_checkpoint_storage("hdfs:///flink/checkpoints")Exactly-Once 端到端保证
Flink 内部通过 checkpoint 保证 exactly-once,但端到端 exactly-once 还需要 source 和 sink 的配合:
- Source:需要可重放(如 Kafka 的 offset 回滚)
- Sink:需要支持事务写入(如 Kafka 事务、两阶段提交)或幂等写入
Flink SQL
Flink SQL 让用户可以用标准 SQL 处理流数据:
-- 创建 Kafka Source 表
CREATE TABLE user_clicks (
user_id STRING,
url STRING,
click_time TIMESTAMP(3),
WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_clicks',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- 创建 Sink 表
CREATE TABLE click_stats (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
user_id STRING,
click_count BIGINT,
PRIMARY KEY (window_start, user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysql:3306/analytics',
'table-name' = 'click_stats'
);
-- 窗口聚合查询
INSERT INTO click_stats
SELECT
window_start,
window_end,
user_id,
COUNT(*) as click_count
FROM TABLE(
TUMBLE(TABLE user_clicks, DESCRIPTOR(click_time), INTERVAL '5' MINUTES)
)
GROUP BY window_start, window_end, user_id;总结
Flink 的核心竞争力在于:原生流处理架构、精确的 Event Time 处理(Watermark)、灵活的窗口机制、强大的有状态流处理,以及基于 Chandy-Lamport 算法的 exactly-once 保证。Flink SQL 进一步降低了流处理的使用门槛。在实时数仓、实时风控、实时推荐等场景中,Flink 已成为标准选择。
贡献者
更新日志
9f6c2-feat: organize wiki content and refresh site setup于