Skip to content
清晨的一缕阳光
返回

RocketMQ 最佳实践与生产经验总结

本文总结了 RocketMQ 在生产环境中的最佳实践和经验教训,涵盖架构设计、配置优化、故障处理等方面。

一、架构设计

1.1 部署架构

推荐架构

graph TB
    subgraph 可用区 A
        NS1[NameServer 1]
        B1M[Broker 1 Master]
        B2M[Broker 2 Master]
    end
    
    subgraph 可用区 B
        NS2[NameServer 2]
        B1S[Broker 1 Slave]
        B2S[Broker 2 Slave]
    end
    
    subgraph 客户端
        P[Producer]
        C[Consumer]
    end
    
    NS1 -.-> NS2
    B1M -.-> B1S
    B2M -.-> B2S
    
    P --> NS1
    P --> NS2
    C --> NS1
    C --> NS2

部署建议

组件数量部署方式
NameServer2-4跨可用区
Broker Master按需多实例
Broker Slave1:1 或 1:0异步复制

1.2 Topic 设计

命名规范

# 格式
{业务线}.{子系统}.{消息类型}

# 示例
order.trade.created      # 订单 - 交易 - 创建
pay.transaction.completed # 支付 - 交易 - 完成
user.registered.success   # 用户 - 注册 - 成功

分区规划

// 分区数计算公式
int queueNum = max(
    expectedTPS / singleQueueTPS,  // 基于吞吐量
    maxConsumerConcurrency         // 基于消费并发
);

// 推荐配置
| 场景 | TPS | 分区数 |
|------|-----|--------|
| 低并发 | < 1000 | 4-8 |
| 中并发 | 1000-5000 | 8-16 |
| 高并发 | 5000-20000 | 16-32 |
| 超高并发 | > 20000 | 32-64 |

1.3 消息设计

消息体大小

// 推荐:< 4KB
// 最大:4MB(默认)

// 大消息处理方案
public void sendLargeMessage(LargeObject obj) {
    // 方案 1:压缩
    byte[] compressed = compress(obj);
    
    // 方案 2:分片
    List<byte[]> shards = split(obj);
    for (byte[] shard : shards) {
        send(shard);
    }
    
    // 方案 3:存储外部,消息传引用
    String url = storage.save(obj);
    send(new ReferenceMessage(url));
}

Tag 和 Key 设计

// Tag:用于过滤,建议不超过 20 个
message.setTags("order-create");      // ✅
message.setTags("order-create,pay");  // ❌ 多个 Tag

// Key:用于查询和去重,保持唯一性
message.setKeys(orderId.toString());  // ✅ 订单 ID
message.setKeys(uuid);                // ✅ UUID

// 多 Key 场景(用逗号分隔)
message.setKeys(orderId + "," + userId);

二、配置优化

2.1 Producer 配置

public class ProducerConfig {
    
    public static DefaultMQProducer createProducer(String group) {
        DefaultMQProducer producer = new DefaultMQProducer(group);
        
        // 基础配置
        producer.setNamesrvAddr("ns1:9876;ns2:9876");
        
        // 性能优化
        producer.setVipChannelEnabled(false);  // 关闭 VIP 通道
        producer.setSendMsgTimeout(5000);       // 5 秒超时
        
        // 可靠性配置
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(3);
        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
        
        return producer;
    }
}

2.2 Consumer 配置

public class ConsumerConfig {
    
    public static DefaultMQPushConsumer createConsumer(String group, String topic) 
            throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        
        // 基础配置
        consumer.setNamesrvAddr("ns1:9876;ns2:9876");
        consumer.subscribe(topic, "*");
        
        // 并发配置
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(64);
        
        // 拉取配置
        consumer.setPullBatchSize(32);
        consumer.setPullThresholdForQueue(200);
        consumer.setPullThresholdSizeForQueue(100);
        
        // 消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        
        return consumer;
    }
}

2.3 Broker 配置

# broker.conf

# 基础配置
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=ns1:9876;ns2:9876

# 性能配置
listenPort=10911
brokerIP1=192.168.1.100
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

# 存储配置
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue
storePathIndex=/data/rocketmq/store/index

# 文件配置
fileReservedTime=72
deleteWhen=02
maxMessageSize=4194304

# 刷盘配置
flushCommitLogLeastPages=4
flushCommitLogThoroughInterval=200
flushSlaveTimeoutMillis=3000

三、消息可靠性

3.1 发送可靠性

public class ReliableProducer {
    
    /**
     * 可靠发送(同步 + 重试)
     */
    public boolean sendReliable(Message msg) {
        int maxRetries = 3;
        for (int i = 0; i < maxRetries; i++) {
            try {
                SendResult result = producer.send(msg);
                if (result.getSendStatus() == SendStatus.SEND_OK) {
                    return true;
                }
            } catch (Exception e) {
                log.warn("发送失败,第 {} 次重试", i + 1, e);
            }
        }
        return false;
    }
    
    /**
     * 事务消息(最终一致性)
     */
    public void sendTransactionMessage(Message msg, Object businessArg) {
        TransactionSendResult result = 
            transactionProducer.sendMessageInTransaction(msg, businessArg);
        
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            log.error("事务消息发送失败");
            // 记录日志,人工处理
        }
    }
}

3.2 消费可靠性

public class ReliableConsumer {
    
    /**
     * 可靠消费(幂等 + 重试)
     */
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs, 
            ConsumeConcurrentlyContext context) {
        
        for (MessageExt msg : msgs) {
            String msgId = msg.getMsgId();
            
            // 1. 幂等检查
            if (isProcessed(msgId)) {
                log.info("消息已处理:msgId={}", msgId);
                continue;
            }
            
            try {
                // 2. 业务处理
                processBusiness(msg);
                
                // 3. 标记已处理
                markAsProcessed(msgId);
                
            } catch (BusinessException e) {
                // 业务异常,记录日志,跳过
                log.error("业务异常:msgId={}", msgId, e);
                saveFailedMessage(msg);
                
            } catch (Exception e) {
                // 系统异常,返回重试
                log.error("系统异常:msgId={}", msgId, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

3.3 消息追踪

public class MessageTrace {
    
    /**
     * 发送追踪
     */
    public void traceSend(Message msg, SendResult result) {
        TraceMessage trace = new TraceMessage();
        trace.setMsgId(result.getMsgId());
        trace.setTopic(msg.getTopic());
        trace.setTags(msg.getTags());
        trace.setKeys(msg.getKeys());
        trace.setBodyLength(msg.getBody().length);
        trace.setSendTime(System.currentTimeMillis());
        trace.setStatus("SENT");
        
        traceService.save(trace);
    }
    
    /**
     * 消费追踪
     */
    public void traceConsume(MessageExt msg, ConsumeResult result) {
        TraceMessage trace = traceService.findByMsgId(msg.getMsgId());
        if (trace != null) {
            trace.setConsumeTime(System.currentTimeMillis());
            trace.setConsumeResult(result.name());
            trace.setConsumerGroup(msg.getConsumerGroup());
            traceService.update(trace);
        }
    }
}

四、故障处理

4.1 常见问题

问题原因解决方案
消息丢失acks 未确认开启事务消息、持久化
消息重复网络重试实现幂等性
消息堆积消费慢增加消费者、优化逻辑
发送失败Broker 故障重试、切换 Broker
消费失败业务异常重试、死信队列

4.2 消息堆积处理

#!/bin/bash
# 消息堆积应急处理脚本

TOPIC="order-topic"
GROUP="order-consumer-group"

# 1. 查看堆积情况
echo "=== 消费组状态 ==="
mqadmin consumerProgress -n ns1:9876 -g $GROUP

# 2. 查看消费者状态
echo "=== 消费者状态 ==="
mqadmin consumerStatus -n ns1:9876 -g $GROUP

# 3. 临时增加消费者
echo "=== 启动临时消费者 ==="
nohup java -jar consumer.jar --group=$GROUP-temp &

# 4. 如需要,跳过部分消息
echo "=== 重置偏移量(慎用)==="
mqadmin resetOffset -n ns1:9876 -t $TOPIC -g $GROUP -s now -f true

4.3 Broker 故障处理

#!/bin/bash
# Broker 故障切换脚本

FAILEDBROKER="broker-a"
NEWBROKER="broker-c"

# 1. 停止故障 Broker
echo "停止故障 Broker..."
ssh $FAILEDBROKER "kill -9 $(ps -ef | grep BrokerStartup | awk '{print $2}')"

# 2. 启动备用 Broker
echo "启动备用 Broker..."
ssh $NEWBROKER "/opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker-c.conf &"

# 3. 检查状态
echo "检查 Broker 状态..."
mqadmin brokerStatus -n ns1:9876 -b $NEWBROKER:10911

# 4. 发送告警
sendAlert "Broker 故障切换完成:$FAILEDBROKER -> $NEWBROKER"

五、性能优化

5.1 吞吐量优化

// 高吞吐配置
public class HighThroughputConfig {
    
    public static Properties getProducerProps() {
        Properties props = new Properties();
        
        // 异步发送
        props.put("sendMsgTimeout", "3000");
        props.put("retryTimesWhenSendAsyncFailed", "2");
        
        // 批量发送
        // 在应用层实现批量
        
        return props;
    }
    
    public static Properties getConsumerProps() {
        Properties props = new Properties();
        
        // 增加并发
        props.put("consumeThreadMin", "40");
        props.put("consumeThreadMax", "100");
        
        // 批量拉取
        props.put("pullBatchSize", "64");
        
        return props;
    }
}

5.2 延迟优化

// 低延迟配置
public class LowLatencyConfig {
    
    public static Properties getProducerProps() {
        Properties props = new Properties();
        
        // 同步发送(保证可靠性)
        props.put("sendMsgTimeout", "2000");
        
        // 不重试(快速失败)
        props.put("retryTimesWhenSendFailed", "0");
        
        return props;
    }
    
    public static Properties getConsumerProps() {
        Properties props = new Properties();
        
        // 减少批量
        props.put("pullBatchSize", "16");
        props.put("pullInterval", "0");
        
        return props;
    }
}

六、监控告警

6.1 关键指标

指标告警阈值说明
生产 TPS< 1000写入速率过低
消费 TPS< 1000消费速率过低
消费延迟> 10000消息堆积
发送失败率> 1%发送异常
消费失败率> 1%消费异常
Broker 宕机= 0节点不可用

6.2 监控配置

# Prometheus 告警规则
groups:
  - name: rocketmq
    rules:
      - alert: RocketMQBrokerDown
        expr: up{job="rocketmq-broker"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Broker 宕机:{{ $labels.instance }}"
      
      - alert: RocketMQConsumerLag
        expr: rocketmq_consumer_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "消费滞后:{{ $labels.group }} - {{ $value }}"
      
      - alert: RocketMQSendFailed
        expr: rate(rocketmq_send_failed[5m]) > 0.01
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "发送失败率过高"

七、运维管理

7.1 日常巡检

#!/bin/bash
# 日常巡检脚本

echo "=== RocketMQ 日常巡检 ==="
echo "时间:$(date)"

# 1. 检查 NameServer
echo -e "\n=== NameServer 状态 ==="
mqadmin namesrvStatus -n ns1:9876

# 2. 检查 Broker
echo -e "\n=== Broker 状态 ==="
mqadmin brokerStatus -n ns1:9876 -b broker-1:10911

# 3. 检查 Topic
echo -e "\n=== Topic 统计 ==="
mqadmin topicList -n ns1:9876

# 4. 检查消费组
echo -e "\n=== 消费组进度 ==="
mqadmin consumerProgress -n ns1:9876

# 5. 检查磁盘
echo -e "\n=== 磁盘使用 ==="
df -h /data/rocketmq/store

# 6. 检查集群
echo -e "\n=== 集群信息 ==="
mqadmin clusterList -n ns1:9876

7.2 备份策略

#!/bin/bash
# 元数据备份脚本

BACKUP_DIR="/backup/rocketmq"
DATE=$(date +%Y%m%d)

# 备份 Topic 配置
mqadmin topicList -n ns1:9876 > $BACKUP_DIR/topics_$DATE.txt

# 备份消费组配置
mqadmin consumerList -n ns1:9876 > $BACKUP_DIR/consumers_$DATE.txt

# 备份 Broker 配置
for broker in broker-1 broker-2 broker-3; do
    mqadmin brokerStatus -n ns1:9876 -b $broker:10911 > \
        $BACKUP_DIR/broker_${broker}_$DATE.txt
done

# 保留 30 天
find $BACKUP_DIR -name "*.txt" -mtime +30 -delete

八、经验总结

8.1 设计原则

  1. 消息要轻量:消息体尽量小于 4KB
  2. Tag 要精简:不超过 20 个 Tag
  3. Key 要唯一:用于去重和查询
  4. 消费要幂等:防止重复消费
  5. 监控要完善:关键指标告警

8.2 避坑指南

表现避免方法
消息重复数据重复实现幂等性
消息丢失数据不一致事务消息
消息堆积延迟高监控 + 扩容
顺序错乱业务异常顺序消息
大消息性能差压缩或分片

8.3 检查清单

上线前检查

运维检查

总结

RocketMQ 最佳实践的核心要点:

  1. 架构设计:合理部署、Topic 规划、消息设计
  2. 配置优化:Producer、Consumer、Broker 配置
  3. 消息可靠:发送可靠、消费可靠、消息追踪
  4. 故障处理:常见问题、应急处理
  5. 运维管理:日常巡检、备份策略

核心要点

参考资料


分享这篇文章到:

上一篇文章
Redis 系列完整学习指南
下一篇文章
OpenSpec 规范体系详解