RocketMQ架构详解
约 1680 字大约 6 分钟
rocketmqmessage-queue
2025-06-01
概述
Apache RocketMQ是阿里巴巴开源的分布式消息中间件,最初用于支撑"双十一"等高并发场景。RocketMQ以其高吞吐量、低延迟、高可靠性和丰富的消息类型,成为国内使用最广泛的消息队列之一。本文深入解析RocketMQ的架构设计和核心概念。
整体架构
四大核心组件
NameServer
NameServer是一个轻量级的注册中心,功能类似于简化版的ZooKeeper,但更为简单高效。
// NameServer核心特点
// 1. 无状态:各NameServer之间不通信,独立工作
// 2. Broker每30秒向所有NameServer发送心跳
// 3. NameServer每10秒检查Broker是否存活(120秒无心跳则移除)
// 4. Producer/Consumer每30秒从NameServer拉取路由信息
// 与ZooKeeper对比
// NameServer: 无状态,AP模型,最终一致性
// ZooKeeper: 有状态,CP模型,强一致性Broker
Broker是消息存储和传递的核心服务。
Producer
// Producer发送消息示例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");
producer.start();
// 同步发送(可靠性最高)
Message msg = new Message("TopicOrder", "TagA", "OrderID001",
"Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
SendResult result = producer.send(msg);
System.out.println("msgId=" + result.getMsgId()
+ ", status=" + result.getSendStatus());
// 异步发送(高吞吐)
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Send success: " + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.println("Send failed: " + e.getMessage());
}
});
// 单向发送(最高性能,不可靠)
producer.sendOneway(msg);Consumer
// Push模式(实际是长轮询)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicOrder", "TagA || TagB");
// 并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
// Pull模式(手动控制拉取)
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer("pull_group");
pullConsumer.subscribe("TopicOrder", "*");
pullConsumer.start();
while (true) {
List<MessageExt> msgs = pullConsumer.poll(1000);
for (MessageExt msg : msgs) {
processMessage(msg);
}
pullConsumer.commitSync();
}消息存储
CommitLog
所有Topic的消息都顺序写入同一个CommitLog文件,这是RocketMQ高性能的关键设计。
ConsumeQueue
ConsumeQueue是逻辑消费队列,每个Topic的每个Queue对应一个ConsumeQueue文件。它存储了消息在CommitLog中的物理偏移量、消息大小和Tag哈希值。
ConsumeQueue条目格式(固定20字节):
+---------------------+
| CommitLog Offset 8B | 消息在CommitLog中的物理偏移量
+---------------------+
| Message Size 4B | 消息大小
+---------------------+
| Tag HashCode 8B | 用于Tag过滤
+---------------------+IndexFile
IndexFile支持按Message Key或时间范围查询消息,使用哈希索引结构。
IndexFile结构:
+------------------+
| Index Header | 40字节:包含起止时间、偏移量等元信息
+------------------+
| Hash Slot Table | 500万个Slot × 4字节 = 20MB
+------------------+
| Index Items | 2000万条 × 20字节 = 400MB
+------------------+
每个Index Item (20字节):
- Key Hash: 4B
- CommitLog Offset: 8B
- Timestamp Diff: 4B
- Next Index Offset: 4B (哈希冲突链)消息类型
普通消息
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());
producer.send(msg);顺序消息
保证同一个Queue中的消息有序消费。
// 发送顺序消息:相同orderId的消息发到同一Queue
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);
// 顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
// 同一Queue的消息串行消费
processOrderMessage(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});延迟消息
// RocketMQ支持固定级别的延迟(开源版本)
// 延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Message msg = new Message("TopicTest", "Delay", "Delayed message".getBytes());
msg.setDelayTimeLevel(3); // 延迟级别3 = 10s
producer.send(msg);
// 消息会在10秒后才能被Consumer消费事务消息
// 事务消息示例
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务(如数据库操作)
orderService.createOrder(msg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查:检查本地事务是否执行成功
boolean exists = orderService.checkOrderExists(msg.getKeys());
return exists ?
LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
Message msg = new Message("TopicOrder", "Order creation".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);Push vs Pull Consumer
总结
RocketMQ的架构设计体现了多项工程智慧:无状态的NameServer简化了注册中心的运维;CommitLog的顺序写入保证了高吞吐量;ConsumeQueue实现了高效的消息检索;丰富的消息类型(普通、顺序、延迟、事务)覆盖了绝大多数业务场景。与Kafka相比,RocketMQ在事务消息、延迟消息、消息过滤等方面提供了更多开箱即用的特性,尤其适合电商、金融等需要可靠消息传递的业务场景。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于