RocketMQ 消费者组是消息系统的核心组件,负责消息的消费和处理。本文将深入探讨消费者组的工作原理、订阅关系管理以及消费模式。
一、消费者组基础
1.1 核心概念
**消费者组(Consumer Group)**是一组消费者的集合,它们共同消费一个或多个 Topic 的消息:
graph TB
subgraph Consumer Group A
C1[Consumer 1]
C2[Consumer 2]
C3[Consumer 3]
end
subgraph Topic
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
Q4[Queue 4]
end
C1 --> Q1
C1 --> Q2
C2 --> Q3
C3 --> Q4
1.2 核心特性
| 特性 | 说明 |
|---|---|
| 一组一消费 | 一条消息只会被组内一个消费者消费 |
| 多组订阅 | 多个消费者组可以订阅同一 Topic |
| 队列独占 | 组内一个队列只能被一个消费者消费 |
| 水平扩展 | 增加消费者数量提高消费能力 |
1.3 基本使用
// 1. 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("localhost:9876");
// 2. 订阅 Topic
consumer.subscribe("topic", "*");
// 3. 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.printf("收到消息:msgId=%s, body=%s%n",
msg.getMsgId(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 4. 启动消费者
consumer.start();
二、订阅关系管理
2.1 订阅方式
Tag 过滤:
// 订阅单个 Tag
consumer.subscribe("topic", "tag1");
// 订阅多个 Tag(OR 关系)
consumer.subscribe("topic", "tag1 || tag2");
// 订阅所有 Tag
consumer.subscribe("topic", "*");
SQL92 过滤:
// 需要 Broker 配置 enablePropertyFilter=true
consumer.subscribe("topic", MessageSelector.bySql("(TAGS is not null and TAGS = 'tag1') and (code is not null and code >= 100)"));
2.2 多 Topic 订阅
// 订阅多个 Topic
consumer.subscribe("topic-1", "tag1");
consumer.subscribe("topic-2", "tag2");
consumer.subscribe("topic-3", "*");
// 或使用 Pattern 订阅
PatternSubscriptionConsumer consumer = new PatternSubscriptionConsumer("group");
consumer.subscribe(Pattern.compile("topic-.*"), "tag1");
2.3 订阅关系存储
graph TB
subgraph Consumer
ST[SubscriptionTable]
PQ[ProcessQueue]
end
ST --> T1[Topic-1]
ST --> T2[Topic-2]
T1 --> MQ1[MessageQueue-1]
T1 --> MQ2[MessageQueue-2]
三、消费模式
3.1 推模式(Push)
默认的消费模式,Broker 主动推送消息:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.subscribe("topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 并发消费
for (MessageExt msg : msgs) {
process(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
特点:
- 实时性好
- 由 Consumer 控制消费速率
- 适合大多数场景
3.2 拉模式(Pull)
消费者主动拉取消息:
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
// 获取消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topic");
for (MessageQueue mq : mqs) {
// 拉取消息
PullResult pullResult = consumer.pull(mq, "*", 0, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
for (MessageExt msg : pullResult.getMsgFoundList()) {
process(msg);
}
break;
case NO_NEW_MSG:
break;
case NO_MATCHED_MSG:
break;
case OFFSET_ILLEGAL:
// 修正偏移量
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
break;
}
}
特点:
- 灵活控制拉取时机
- 适合批量处理
- 需要手动管理偏移量
3.3 顺序消费
并发消费(默认):
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 多线程并发处理
for (MessageExt msg : msgs) {
process(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
顺序消费:
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
// 单线程顺序处理
for (MessageExt msg : msgs) {
process(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
});
| 消费模式 | 吞吐量 | 顺序性 | 适用场景 |
|---|---|---|---|
| 并发消费 | 高 | 无序 | 日志、通知 |
| 顺序消费 | 低 | 有序 | 订单、交易 |
四、消费进度管理
4.1 偏移量存储
graph TB
subgraph Consumer
OFFSET[ConsumeOffset]
end
subgraph Broker
OFFSET_STORE[OffsetStore]
end
subgraph ZooKeeper
ZK[偏移量备份]
end
OFFSET -->|定期提交 | OFFSET_STORE
OFFSET_STORE -.->|备份 | ZK
4.2 偏移量提交
自动提交(默认):
// 默认 5 秒自动提交
consumer.setAutoCommit(true);
consumer.setAutoCommitInterval(5000);
手动提交:
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
process(msg);
}
// 手动提交
context.setAckIndex(-1); // 全部确认
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 部分确认
context.setAckIndex(0); // 只确认第一条
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
4.3 偏移量重置
// 从最早开始消费
consumer.setOffsetStore(new LocalFileOffsetStore(consumer, "group"));
consumer.setOffsetResetStrategy(OffsetResetStrategy.EARLIEST);
// 从最新开始消费
consumer.setOffsetResetStrategy(OffsetResetStrategy.LATEST);
// 从指定时间开始消费
long timestamp = System.currentTimeMillis() - 24 * 60 * 60 * 1000; // 24 小时前
consumer.setOffset(timestamp);
五、重试机制
5.1 消费重试
// 配置重试
consumer.setMaxReconsumeTimes(3); // 最大重试 3 次
consumer.setConsumeTimeout(15, TimeUnit.MINUTES); // 消费超时时间
5.2 死信队列
消费失败超过最大重试次数后进入死信队列:
// 死信队列命名:%DLQ% + 消费者组名
// 例如:%DLQ%consumer-group
// 查看死信消息
DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
adminExt.start();
ClusterInfo clusterInfo = adminExt.examineBrokerClusterInfo();
// 查询死信队列
5.3 重试配置
// 自定义重试 Topic
consumer.setRetryTopicGroupName("retry-group");
// 延迟重试级别
// 级别 1-18: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
六、负载均衡
6.1 分配策略
平均分配(默认):
consumer.setMessageModel(MessageModel.CLUSTERING);
// 队列尽量平均分配给每个消费者
广播消费:
consumer.setMessageModel(MessageModel.BROADCASTING);
// 每个消费者消费所有消息
6.2 重平衡
sequenceDiagram
participant C1 as Consumer 1
participant C2 as Consumer 2
participant B as Broker
participant ZK as NameServer
C1->>ZK: 心跳
C2->>ZK: 加入组
ZK->>C1: 触发重平衡
ZK->>C2: 触发重平衡
C1->>B: 重新分配队列
C2->>B: 重新分配队列
6.3 配置优化
// 心跳间隔
consumer.setHeartbeatBrokerInterval(1000 * 30); // 30 秒
// 消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 拉取配置
consumer.setPullBatchSize(32);
consumer.setPullInterval(0);
七、性能优化
7.1 并发消费优化
// 1. 增加消费线程
consumer.setConsumeThreadMin(40);
consumer.setConsumeThreadMax(100);
// 2. 批量拉取
consumer.setPullBatchSize(64);
// 3. 预取消息
consumer.setPullThresholdForQueue(200);
consumer.setPullThresholdSizeForQueue(100); // MB
7.2 内存优化
// 限制内存中消息数量
consumer.setPullThresholdForQueue(100);
consumer.setPullThresholdSizeForQueue(50);
// 限制进程内消息数量
consumer.setPullThresholdForTopic(1000);
consumer.setPullThresholdSizeForTopic(500);
7.3 网络优化
// 使用 VIP 通道
consumer.setVipChannelEnabled(true);
// 设置超时
consumer.setConsumeTimeout(15, TimeUnit.MINUTES);
八、常见问题排查
8.1 消费滞后
监控指标:
- 消费 TPS
- 消息堆积量
- 消费延迟时间
解决:
// 1. 增加消费者数量
// 2. 提高并发度
consumer.setConsumeThreadMax(100);
// 3. 优化消费逻辑
// 4. 增加分区数
8.2 重复消费
原因:
- 消费成功但提交失败
- 重平衡导致
解决:
// 1. 业务层去重
Set<String> processedIds = new HashSet<>();
if (!processedIds.contains(getBusinessId(msg))) {
process(msg);
processedIds.add(getBusinessId(msg));
}
// 2. 确保提交成功
context.setAckIndex(-1);
8.3 消费失败
原因:
- 业务逻辑异常
- 消息格式错误
- 依赖服务不可用
解决:
consumer.registerMessageListener((msgs, context) -> {
try {
for (MessageExt msg : msgs) {
process(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消费失败", e);
// 记录失败消息
saveFailedMessage(msg);
// 返回重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
九、最佳实践
9.1 配置建议
| 场景 | 消费模式 | 线程数 | 批量大小 | 重试次数 |
|---|---|---|---|---|
| 订单处理 | 顺序消费 | 10 | 1 | 3 |
| 日志收集 | 并发消费 | 40 | 64 | 0 |
| 通知推送 | 并发消费 | 20 | 32 | 3 |
| 数据同步 | 并发消费 | 30 | 48 | 5 |
9.2 监控指标
- 消费 TPS
- 消息堆积量
- 消费延迟
- 重试次数
- 死信数量
9.3 代码模板
public class RocketMQConsumerService {
private DefaultMQPushConsumer consumer;
@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
consumer.subscribe("topic", "tag1 || tag2");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
processMessage(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消费失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
@PreDestroy
public void close() {
if (consumer != null) {
consumer.shutdown();
}
}
}
总结
RocketMQ 消费者组的核心机制:
- 消费者组:实现水平扩展和消息隔离
- 订阅关系:Tag 过滤、SQL92 过滤
- 消费模式:推模式、拉模式、并发/顺序消费
- 偏移量管理:自动/手动提交、重置策略
- 重试机制:消费重试、死信队列
- 性能优化:并发配置、内存控制
核心要点:
- 根据业务选择消费模式(并发/顺序)
- 合理配置消费线程和批量大小
- 监控消费滞后和重试情况
- 实现业务层幂等性防止重复消费
参考资料
- RocketMQ Consumer 官方文档
- RocketMQ 源码
- 《RocketMQ 技术内幕》第 4 章