Redis发布订阅机制
约 1548 字大约 5 分钟
redispub-sub
2025-05-10
Redis 的发布订阅(Pub/Sub)是一种消息通信模式,发布者(Publisher)将消息发送到频道(Channel),订阅者(Subscriber)接收频道中的消息。本文深入分析 Pub/Sub 的实现原理、使用方式及其局限性。
基本架构
核心特点:
- 发布者和订阅者之间完全解耦
- 消息是实时推送的(push model)
- 消息不持久化,不保证送达
基础命令
频道订阅与发布
# 订阅频道(阻塞等待消息)
SUBSCRIBE news sports weather
# 发布消息到频道
PUBLISH news "Breaking: Redis 8.0 released!"
# 返回值:收到消息的订阅者数量
# 取消订阅
UNSUBSCRIBE news
UNSUBSCRIBE # 取消所有订阅模式订阅 (Pattern Subscribe)
# 订阅匹配模式的所有频道
PSUBSCRIBE news.* # 匹配 news.tech, news.sports 等
PSUBSCRIBE user.*.login # 匹配 user.123.login 等
# 取消模式订阅
PUNSUBSCRIBE news.*消息格式
订阅者收到的消息是一个数组,格式因订阅方式不同而有所区别:
# SUBSCRIBE 收到的消息
1) "message" # 消息类型
2) "news" # 频道名
3) "Hello World" # 消息内容
# PSUBSCRIBE 收到的消息
1) "pmessage" # 消息类型
2) "news.*" # 匹配的模式
3) "news.tech" # 实际频道名
4) "Hello World" # 消息内容
# 订阅确认消息
1) "subscribe" # 消息类型
2) "news" # 频道名
3) (integer) 1 # 当前订阅数量内部实现
Channel 订阅
Redis 内部使用一个字典(dict)维护频道与订阅者的关系:
- 键(key)是频道名称
- 值(value)是订阅该频道的客户端链表
- PUBLISH 时遍历对应频道的客户端链表,逐个发送消息
Pattern 订阅
模式订阅使用一个链表(list)维护:
PUBLISH 时需要遍历所有 pattern 进行匹配,时间复杂度 O(N+M),N 是精确订阅数,M 是模式订阅数。
PUBLISH 执行流程
使用场景
实时通知系统
# 用户上线通知
SUBSCRIBE user.online
# 发布上线事件
PUBLISH user.online '{"userId": 1001, "time": "2025-05-10T10:00:00Z"}'配置变更广播
import redis
# 发布方
r = redis.Redis()
r.publish('config.update', '{"key": "max_connections", "value": 1000}')
# 订阅方
pubsub = r.pubsub()
pubsub.subscribe('config.update')
for message in pubsub.listen():
if message['type'] == 'message':
config = json.loads(message['data'])
apply_config(config)聊天室
# 订阅聊天室
pubsub.subscribe(f'chatroom:{room_id}')
# 发送消息
r.publish(f'chatroom:{room_id}', json.dumps({
'sender': username,
'content': message,
'timestamp': time.time()
}))Redis 内部使用
# Sentinel 通过 Pub/Sub 通信
SUBSCRIBE __sentinel__:hello
# Keyspace 通知(需要开启)
CONFIG SET notify-keyspace-events KEA
# 订阅所有键的过期事件
PSUBSCRIBE __keyevent@0__:expiredPub/Sub 的局限性
# 客户端输出缓冲区限制(防止慢消费者占满内存)
# 默认配置:32MB 硬限制,8MB 软限制/60秒
client-output-buffer-limit pubsub 32mb 8mb 60Redis Streams 作为替代方案
Redis 5.0 引入的 Streams 解决了 Pub/Sub 的主要局限:
| 特性 | Pub/Sub | Streams |
|---|---|---|
| 消息持久化 | 否 | 是 |
| 历史消息 | 不支持 | 支持 |
| 消费者组 | 不支持 | 支持 |
| 确认机制(ACK) | 不支持 | 支持 |
| 消息回溯 | 不支持 | 支持 |
| 阻塞读取 | SUBSCRIBE | XREAD BLOCK |
Streams 基本用法
# 写入消息
XADD mystream * user "Alice" action "login"
# 返回消息 ID: 1683705600000-0
# 读取消息
XREAD COUNT 10 STREAMS mystream 0
# 阻塞读取新消息
XREAD BLOCK 5000 COUNT 1 STREAMS mystream $
# 创建消费者组
XGROUP CREATE mystream mygroup 0
# 消费者组读取
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
# 确认消息
XACK mystream mygroup 1683705600000-0
# 查看未确认消息
XPENDING mystream mygroupStreams 消费者组模型
Pub/Sub 监控
# 查看活跃频道(至少一个订阅者的频道)
PUBSUB CHANNELS
# 匹配模式的活跃频道
PUBSUB CHANNELS news.*
# 查看频道的订阅者数量
PUBSUB NUMSUB news sports
# 查看模式订阅数量
PUBSUB NUMPAT
# 查看 Pub/Sub 相关统计
INFO stats
# pubsub_channels: 活跃频道数
# pubsub_patterns: 模式订阅数选择建议
总结
- Redis Pub/Sub 是轻量级的消息广播机制,适合实时通知和事件推送
- 消息不持久化、不保证送达,属于"最多一次"语义
- 模式订阅(PSUBSCRIBE)的匹配开销与模式数量成正比
- 对于需要可靠消息传递的场景,Redis Streams 是更好的选择
- 在生产环境中注意配置
client-output-buffer-limit防止慢消费者导致内存问题
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于