Skip to content
清晨的一缕阳光
返回

RocketMQ POP 消费模式详解与实战

RocketMQ 5.0 引入了全新的 POP 消费模式,解决了传统 Pull/Push 模式的重复消费问题。本文将深入探讨 POP 消费的实现原理和实战应用。

一、POP 消费基础

1.1 为什么需要 POP 模式?

Pull/Push 模式问题

问题场景:
1. 消费者宕机,消息重新投递
2. 消费处理时间长,触发重试
3. 网络抖动,Ack 丢失

结果:消息重复消费

POP 模式优势

POP 模式 = 消息可见性锁 + 定时续期

优势:
- 消息独占消费
- 自动续期机制
- 避免重复消费
- 支持负载均衡

1.2 消费模式对比

graph TB
    subgraph Pull 模式
        P1[消费者主动拉取]
        P2[无锁机制]
        P3[可能重复]
    end
    
    subgraph Push 模式
        S1[Broker 主动推送]
        S2[无锁机制]
        S3[可能重复]
    end
    
    subgraph POP 模式
        C1[获取消息锁]
        C2[独占消费]
        C3[定时续期]
        C4[避免重复]
    end
特性PullPushPOP
消息获取主动拉取Broker 推送获取锁
重复消费可能可能避免
负载均衡
实现复杂度
适用场景批量消费实时消费精确消费

1.3 核心概念

概念说明
InvisibleTime消息不可见时间(默认 30 秒)
PopLock消息消费锁
Renew锁续期机制
Ack消费确认

二、实现原理

2.1 POP 流程

sequenceDiagram
    participant C as Consumer
    participant B as Broker
    
    C->>B: 1. Pop 请求
    B->>B: 2. 加锁(InvisibleTime)
    B-->>C: 3. 返回消息
    
    loop 消费处理
        C->>C: 4. 处理消息
    end
    
    alt 消费成功
        C->>B: 5. Ack 确认
        B->>B: 6. 删除消息
    else 消费失败/超时
        B->>B: 7. 锁过期
        B->>B: 8. 消息可见
    end

2.2 消息锁机制

加锁

// Broker 端加锁逻辑
public PopMessageResult popMessage(String topic, String group, int queueId, 
                                    int batchSize, long invisibleTime) {
    // 1. 检查锁
    if (isLocked(queueId, offset)) {
        return null;  // 消息被锁
    }
    
    // 2. 加锁
    long expireTime = System.currentTimeMillis() + invisibleTime;
    setLock(queueId, offset, expireTime);
    
    // 3. 返回消息
    return getMessage(queueId, offset, batchSize);
}

续期

// Consumer 端续期逻辑
public void renewLock() {
    scheduledExecutor.scheduleAtFixedRate(() -> {
        for (MessageExt msg : processingMessages) {
            if (needRenew(msg)) {
                // 请求 Broker 续期
                renewMessage(msg.getMsgId(), newInvisibleTime);
            }
        }
    }, 10, 10, TimeUnit.SECONDS);
}

2.3 Ack 机制

正常 Ack

// 消费成功后 Ack
public void ackMessage(MessageExt msg) {
    // 发送 Ack 请求到 Broker
    AckMessageRequestHeader header = new AckMessageRequestHeader();
    header.setMsgId(msg.getMsgId());
    header.setOffset(msg.getQueueOffset());
    
    remotingClient.invokeOneway(brokerAddr, request, 3000);
}

超时处理

// Broker 端超时检查
public void checkTimeout() {
    long currentTime = System.currentTimeMillis();
    
    for (LockEntry lock : lockTable.values()) {
        if (currentTime > lock.expireTime) {
            // 锁过期,释放消息
            releaseLock(lock);
        }
    }
}

三、配置使用

3.1 Broker 配置

# broker.conf

# 启用 POP 消费
enablePop=true

# 默认不可见时间(毫秒)
defaultInvisibleTime=30000

# 最大不可见时间
maxInvisibleTime=43200000  # 12 小时

# 最小不可见时间
minInvisibleTime=10000  # 10 秒

# 锁表大小
popLockTableSize=100000

3.2 Consumer 配置

LitePullConsumer POP 模式

public class PopConsumer {
    
    public static void main(String[] args) throws Exception {
        // 创建 LitePullConsumer
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pop-consumer-group");
        consumer.setNamesrvAddr("ns1:9876");
        
        // 启用 POP 模式
        consumer.setPop(true);
        
        // 设置不可见时间
        consumer.setInvisibleTime(30000);  // 30 秒
        
        // 设置批量大小
        consumer.setPullBatchSize(32);
        
        consumer.start();
        consumer.subscribe("pop-topic", "*");
        
        // 消费消息
        while (true) {
            List<MessageExt> messages = consumer.poll();
            for (MessageExt msg : messages) {
                try {
                    // 处理消息
                    processMessage(msg);
                    
                    // Ack 确认
                    consumer.ack(msg);
                } catch (Exception e) {
                    // 不 Ack,消息会重新可见
                    log.error("处理消息失败", e);
                }
            }
        }
    }
}

PushConsumer POP 模式

public class PopPushConsumer {
    
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pop-push-group");
        consumer.setNamesrvAddr("ns1:9876");
        
        // 启用 POP 模式
        consumer.setPop(true);
        
        // 设置不可见时间
        consumer.setInvisibleTime(30000);
        
        // 设置消费线程
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(64);
        
        consumer.subscribe("pop-topic", "*");
        
        // 注册消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    processMessage(msg);
                    // 自动 Ack
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.error("处理消息失败", e);
                    // 不 Ack,消息会重新可见
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
    }
}

四、实战应用

4.1 订单处理

@Service
public class OrderPopConsumer {
    
    @Autowired
    private DefaultMQPushConsumer consumer;
    
    @Autowired
    private OrderService orderService;
    
    @PostConstruct
    public void init() throws MQClientException {
        consumer.setNamesrvAddr("ns1:9876");
        consumer.setPop(true);
        consumer.setInvisibleTime(60000);  // 订单处理时间长,设置 60 秒
        consumer.subscribe("order-topic", "*");
        
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    Order order = parseOrder(msg);
                    
                    // 处理订单(可能耗时较长)
                    orderService.processOrder(order);
                    
                    // 处理成功,自动 Ack
                    log.info("订单处理成功:orderId={}", order.getId());
                    
                } catch (Exception e) {
                    log.error("订单处理失败:orderId={}", getOrderIds(msg), e);
                    // 不 Ack,消息会重新可见
                    // 但不会立即重试,要等不可见时间过期
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
    }
}

4.2 长时间任务

public class LongTaskConsumer {
    
    private final DefaultLitePullConsumer consumer;
    private final ExecutorService executor;
    
    public LongTaskConsumer() throws Exception {
        consumer = new DefaultLitePullConsumer("long-task-group");
        consumer.setNamesrvAddr("ns1:9876");
        consumer.setPop(true);
        consumer.setInvisibleTime(300000);  // 5 分钟
        consumer.subscribe("long-task-topic", "*");
        
        executor = Executors.newFixedThreadPool(10);
        
        consumer.start();
    }
    
    public void consume() {
        while (true) {
            List<MessageExt> messages = consumer.poll();
            
            for (MessageExt msg : messages) {
                // 提交到线程池处理
                executor.submit(() -> {
                    try {
                        // 长时间任务
                        processLongTask(msg);
                        
                        // 任务完成,Ack
                        consumer.ack(msg);
                        
                    } catch (Exception e) {
                        log.error("任务处理失败", e);
                        // 不 Ack,消息会重新可见
                    }
                });
            }
        }
    }
}

4.3 分布式锁替代

public class PopDistributedLock {
    
    private final DefaultLitePullConsumer consumer;
    
    public PopDistributedLock() throws Exception {
        consumer = new DefaultLitePullConsumer("lock-group");
        consumer.setNamesrvAddr("ns1:9876");
        consumer.setPop(true);
        consumer.setInvisibleTime(30000);  // 锁有效期 30 秒
        consumer.subscribe("lock-topic", "*");
        consumer.start();
    }
    
    /**
     * 尝试获取锁
     */
    public boolean tryLock(String lockKey, long timeout, TimeUnit unit) {
        long expireTime = System.currentTimeMillis() + unit.toMillis(timeout);
        
        while (System.currentTimeMillis() < expireTime) {
            // 尝试获取锁消息
            List<MessageExt> messages = consumer.poll(1000);
            
            for (MessageExt msg : messages) {
                if (lockKey.equals(msg.getKeys())) {
                    // 获取到锁
                    log.info("获取锁成功:{}", lockKey);
                    
                    // 启动续期线程
                    startRenewThread(msg);
                    
                    return true;
                }
            }
        }
        
        return false;
    }
    
    /**
     * 释放锁
     */
    public void unlock(MessageExt msg) {
        // Ack 确认,释放锁
        consumer.ack(msg);
        log.info("释放锁:{}", msg.getKeys());
    }
    
    private void startRenewThread(MessageExt msg) {
        // 定时续期,保持锁有效
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            consumer.renew(msg);
        }, 10, 10, TimeUnit.SECONDS);
    }
}

五、性能优化

5.1 不可见时间配置

场景不可见时间说明
快速处理10-30 秒简单消息处理
一般业务30-60 秒数据库操作
长时间任务1-5 分钟外部调用、复杂计算
分布式锁30 秒锁续期机制

5.2 批量配置

// 批量 POP
consumer.setPop(true);
consumer.setPullBatchSize(64);  // 批量获取 64 条
consumer.setInvisibleTime(30000);

// 批量 Ack
List<MessageExt> messages = consumer.poll();
List<String> msgIds = messages.stream()
    .map(MessageExt::getMsgId)
    .collect(Collectors.toList());

consumer.ack(msgIds);  // 批量 Ack

5.3 续期优化

// 智能续期
public class SmartRenewService {
    
    private final Map<String, RenewTask> renewTasks = new ConcurrentHashMap<>();
    
    public void addRenewTask(MessageExt msg, long invisibleTime) {
        String msgId = msg.getMsgId();
        
        RenewTask task = new RenewTask(msg, invisibleTime);
        renewTasks.put(msgId, task);
        
        // 在不可见时间过半时续期
        long renewDelay = invisibleTime / 2;
        scheduledExecutor.schedule(() -> {
            if (renewTasks.containsKey(msgId)) {
                consumer.renew(msg);
                log.info("消息续期:{}", msgId);
            }
        }, renewDelay, TimeUnit.MILLISECONDS);
    }
    
    public void removeRenewTask(String msgId) {
        renewTasks.remove(msgId);
    }
}

六、监控告警

6.1 监控指标

指标说明告警阈值
pop_lock_count当前锁数量> 10000
pop_lock_expire_rate锁过期率> 5%
pop_ack_rateAck 确认率< 95%
pop_renew_count续期次数-

6.2 告警配置

# Prometheus 告警规则
groups:
  - name: rocketmq-pop
    rules:
      - alert: PopLockHigh
        expr: rocketmq_pop_lock_count > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "POP 锁数量过高:{{ $value }}"
      
      - alert: PopLockExpireHigh
        expr: rocketmq_pop_lock_expire_rate > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "POP 锁过期率过高:{{ $value }}"
      
      - alert: PopAckRateLow
        expr: rocketmq_pop_ack_rate < 0.95
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "POP Ack 确认率过低:{{ $value }}"

七、最佳实践

7.1 使用建议

建议说明
合理设置不可见时间根据业务处理时间设置
实现续期机制长时间任务需要续期
监控锁状态及时发现锁问题
处理幂等性即使 POP 也要幂等

7.2 代码模板

public class PopConsumerTemplate {
    
    private final DefaultMQPushConsumer consumer;
    
    public PopConsumerTemplate(String group, String topic) throws MQClientException {
        consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr("ns1:9876");
        consumer.setPop(true);
        consumer.setInvisibleTime(30000);
        consumer.subscribe(topic, "*");
        
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    // 1. 业务处理
                    processBusiness(msg);
                    
                    // 2. 自动 Ack
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    
                } catch (BusinessException e) {
                    // 业务异常,记录日志,不重试
                    log.error("业务异常:msgId={}", msg.getMsgId(), e);
                    saveFailedMessage(msg);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    
                } catch (Exception e) {
                    // 系统异常,不 Ack,等待重试
                    log.error("系统异常:msgId={}", msg.getMsgId(), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
    }
    
    private void processBusiness(MessageExt msg) {
        // 业务处理逻辑
    }
    
    private void saveFailedMessage(MessageExt msg) {
        // 保存失败消息
    }
}

7.3 常见问题

问题 1:消息长时间不 Ack

原因:
- 消费者宕机
- 处理逻辑卡住
- 网络问题

解决:
- 设置合理不可见时间
- 实现续期机制
- 监控 Ack 率

问题 2:锁竞争严重

原因:
- 消息处理慢
- 消费者过多

解决:
- 优化处理逻辑
- 增加不可见时间
- 减少消费者数量

总结

RocketMQ POP 消费的核心要点:

  1. 实现原理:消息锁、续期机制、Ack 确认
  2. 配置使用:Broker 配置、Consumer 配置
  3. 实战应用:订单处理、长时间任务、分布式锁
  4. 性能优化:不可见时间、批量配置、续期优化
  5. 监控告警:锁数量、过期率、Ack 率

核心要点

参考资料


分享这篇文章到:

上一篇文章
Agent 反思与自修正机制
下一篇文章
RocketMQ 运维自动化与 DevOps 实践