Skip to content
清晨的一缕阳光
返回

RocketMQ 消费者组与订阅关系详解

RocketMQ 消费者组是消息系统的核心组件,负责消息的消费和处理。本文将深入探讨消费者组的工作原理、订阅关系管理以及消费模式。

一、消费者组基础

1.1 核心概念

**消费者组(Consumer Group)**是一组消费者的集合,它们共同消费一个或多个 Topic 的消息:

graph TB
    subgraph Consumer Group A
        C1[Consumer 1]
        C2[Consumer 2]
        C3[Consumer 3]
    end
    
    subgraph Topic
        Q1[Queue 1]
        Q2[Queue 2]
        Q3[Queue 3]
        Q4[Queue 4]
    end
    
    C1 --> Q1
    C1 --> Q2
    C2 --> Q3
    C3 --> Q4

1.2 核心特性

特性说明
一组一消费一条消息只会被组内一个消费者消费
多组订阅多个消费者组可以订阅同一 Topic
队列独占组内一个队列只能被一个消费者消费
水平扩展增加消费者数量提高消费能力

1.3 基本使用

// 1. 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("localhost:9876");

// 2. 订阅 Topic
consumer.subscribe("topic", "*");

// 3. 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("收到消息:msgId=%s, body=%s%n", 
            msg.getMsgId(), new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 4. 启动消费者
consumer.start();

二、订阅关系管理

2.1 订阅方式

Tag 过滤

// 订阅单个 Tag
consumer.subscribe("topic", "tag1");

// 订阅多个 Tag(OR 关系)
consumer.subscribe("topic", "tag1 || tag2");

// 订阅所有 Tag
consumer.subscribe("topic", "*");

SQL92 过滤

// 需要 Broker 配置 enablePropertyFilter=true
consumer.subscribe("topic", MessageSelector.bySql("(TAGS is not null and TAGS = 'tag1') and (code is not null and code >= 100)"));

2.2 多 Topic 订阅

// 订阅多个 Topic
consumer.subscribe("topic-1", "tag1");
consumer.subscribe("topic-2", "tag2");
consumer.subscribe("topic-3", "*");

// 或使用 Pattern 订阅
PatternSubscriptionConsumer consumer = new PatternSubscriptionConsumer("group");
consumer.subscribe(Pattern.compile("topic-.*"), "tag1");

2.3 订阅关系存储

graph TB
    subgraph Consumer
        ST[SubscriptionTable]
        PQ[ProcessQueue]
    end
    
    ST --> T1[Topic-1]
    ST --> T2[Topic-2]
    
    T1 --> MQ1[MessageQueue-1]
    T1 --> MQ2[MessageQueue-2]

三、消费模式

3.1 推模式(Push)

默认的消费模式,Broker 主动推送消息:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.subscribe("topic", "*");

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 并发消费
    for (MessageExt msg : msgs) {
        process(msg);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

consumer.start();

特点

3.2 拉模式(Pull)

消费者主动拉取消息:

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();

// 获取消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topic");

for (MessageQueue mq : mqs) {
    // 拉取消息
    PullResult pullResult = consumer.pull(mq, "*", 0, 32);
    
    switch (pullResult.getPullStatus()) {
        case FOUND:
            for (MessageExt msg : pullResult.getMsgFoundList()) {
                process(msg);
            }
            break;
        case NO_NEW_MSG:
            break;
        case NO_MATCHED_MSG:
            break;
        case OFFSET_ILLEGAL:
            // 修正偏移量
            consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
            break;
    }
}

特点

3.3 顺序消费

并发消费(默认):

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 多线程并发处理
    for (MessageExt msg : msgs) {
        process(msg);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

顺序消费

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    // 单线程顺序处理
    for (MessageExt msg : msgs) {
        process(msg);
    }
    return ConsumeOrderlyStatus.SUCCESS;
});
消费模式吞吐量顺序性适用场景
并发消费无序日志、通知
顺序消费有序订单、交易

四、消费进度管理

4.1 偏移量存储

graph TB
    subgraph Consumer
        OFFSET[ConsumeOffset]
    end
    
    subgraph Broker
        OFFSET_STORE[OffsetStore]
    end
    
    subgraph ZooKeeper
        ZK[偏移量备份]
    end
    
    OFFSET -->|定期提交 | OFFSET_STORE
    OFFSET_STORE -.->|备份 | ZK

4.2 偏移量提交

自动提交(默认):

// 默认 5 秒自动提交
consumer.setAutoCommit(true);
consumer.setAutoCommitInterval(5000);

手动提交

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    try {
        for (MessageExt msg : msgs) {
            process(msg);
        }
        // 手动提交
        context.setAckIndex(-1); // 全部确认
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        // 部分确认
        context.setAckIndex(0); // 只确认第一条
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

4.3 偏移量重置

// 从最早开始消费
consumer.setOffsetStore(new LocalFileOffsetStore(consumer, "group"));
consumer.setOffsetResetStrategy(OffsetResetStrategy.EARLIEST);

// 从最新开始消费
consumer.setOffsetResetStrategy(OffsetResetStrategy.LATEST);

// 从指定时间开始消费
long timestamp = System.currentTimeMillis() - 24 * 60 * 60 * 1000; // 24 小时前
consumer.setOffset(timestamp);

五、重试机制

5.1 消费重试

// 配置重试
consumer.setMaxReconsumeTimes(3); // 最大重试 3 次
consumer.setConsumeTimeout(15, TimeUnit.MINUTES); // 消费超时时间

5.2 死信队列

消费失败超过最大重试次数后进入死信队列:

// 死信队列命名:%DLQ% + 消费者组名
// 例如:%DLQ%consumer-group

// 查看死信消息
DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
adminExt.start();

ClusterInfo clusterInfo = adminExt.examineBrokerClusterInfo();
// 查询死信队列

5.3 重试配置

// 自定义重试 Topic
consumer.setRetryTopicGroupName("retry-group");

// 延迟重试级别
// 级别 1-18: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

六、负载均衡

6.1 分配策略

平均分配(默认):

consumer.setMessageModel(MessageModel.CLUSTERING);
// 队列尽量平均分配给每个消费者

广播消费

consumer.setMessageModel(MessageModel.BROADCASTING);
// 每个消费者消费所有消息

6.2 重平衡

sequenceDiagram
    participant C1 as Consumer 1
    participant C2 as Consumer 2
    participant B as Broker
    participant ZK as NameServer
    
    C1->>ZK: 心跳
    C2->>ZK: 加入组
    ZK->>C1: 触发重平衡
    ZK->>C2: 触发重平衡
    C1->>B: 重新分配队列
    C2->>B: 重新分配队列

6.3 配置优化

// 心跳间隔
consumer.setHeartbeatBrokerInterval(1000 * 30); // 30 秒

// 消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);

// 拉取配置
consumer.setPullBatchSize(32);
consumer.setPullInterval(0);

七、性能优化

7.1 并发消费优化

// 1. 增加消费线程
consumer.setConsumeThreadMin(40);
consumer.setConsumeThreadMax(100);

// 2. 批量拉取
consumer.setPullBatchSize(64);

// 3. 预取消息
consumer.setPullThresholdForQueue(200);
consumer.setPullThresholdSizeForQueue(100); // MB

7.2 内存优化

// 限制内存中消息数量
consumer.setPullThresholdForQueue(100);
consumer.setPullThresholdSizeForQueue(50);

// 限制进程内消息数量
consumer.setPullThresholdForTopic(1000);
consumer.setPullThresholdSizeForTopic(500);

7.3 网络优化

// 使用 VIP 通道
consumer.setVipChannelEnabled(true);

// 设置超时
consumer.setConsumeTimeout(15, TimeUnit.MINUTES);

八、常见问题排查

8.1 消费滞后

监控指标

解决

// 1. 增加消费者数量
// 2. 提高并发度
consumer.setConsumeThreadMax(100);
// 3. 优化消费逻辑
// 4. 增加分区数

8.2 重复消费

原因

解决

// 1. 业务层去重
Set<String> processedIds = new HashSet<>();
if (!processedIds.contains(getBusinessId(msg))) {
    process(msg);
    processedIds.add(getBusinessId(msg));
}

// 2. 确保提交成功
context.setAckIndex(-1);

8.3 消费失败

原因

解决

consumer.registerMessageListener((msgs, context) -> {
    try {
        for (MessageExt msg : msgs) {
            process(msg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        log.error("消费失败", e);
        
        // 记录失败消息
        saveFailedMessage(msg);
        
        // 返回重试
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

九、最佳实践

9.1 配置建议

场景消费模式线程数批量大小重试次数
订单处理顺序消费1013
日志收集并发消费40640
通知推送并发消费20323
数据同步并发消费30485

9.2 监控指标

9.3 代码模板

public class RocketMQConsumerService {
    private DefaultMQPushConsumer consumer;
    
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer("consumer-group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(64);
        
        consumer.subscribe("topic", "tag1 || tag2");
        
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                for (MessageExt msg : msgs) {
                    processMessage(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                log.error("消费失败", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        
        consumer.start();
    }
    
    @PreDestroy
    public void close() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}

总结

RocketMQ 消费者组的核心机制:

  1. 消费者组:实现水平扩展和消息隔离
  2. 订阅关系:Tag 过滤、SQL92 过滤
  3. 消费模式:推模式、拉模式、并发/顺序消费
  4. 偏移量管理:自动/手动提交、重置策略
  5. 重试机制:消费重试、死信队列
  6. 性能优化:并发配置、内存控制

核心要点

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 生产问题排查案例集
下一篇文章
RocketMQ 运维脚本与工具集