本文总结了 RocketMQ 在生产环境中的最佳实践和经验教训,涵盖架构设计、配置优化、故障处理等方面。
一、架构设计
1.1 部署架构
推荐架构:
graph TB
subgraph 可用区 A
NS1[NameServer 1]
B1M[Broker 1 Master]
B2M[Broker 2 Master]
end
subgraph 可用区 B
NS2[NameServer 2]
B1S[Broker 1 Slave]
B2S[Broker 2 Slave]
end
subgraph 客户端
P[Producer]
C[Consumer]
end
NS1 -.-> NS2
B1M -.-> B1S
B2M -.-> B2S
P --> NS1
P --> NS2
C --> NS1
C --> NS2
部署建议:
| 组件 | 数量 | 部署方式 |
|---|---|---|
| NameServer | 2-4 | 跨可用区 |
| Broker Master | 按需 | 多实例 |
| Broker Slave | 1:1 或 1:0 | 异步复制 |
1.2 Topic 设计
命名规范:
# 格式
{业务线}.{子系统}.{消息类型}
# 示例
order.trade.created # 订单 - 交易 - 创建
pay.transaction.completed # 支付 - 交易 - 完成
user.registered.success # 用户 - 注册 - 成功
分区规划:
// 分区数计算公式
int queueNum = max(
expectedTPS / singleQueueTPS, // 基于吞吐量
maxConsumerConcurrency // 基于消费并发
);
// 推荐配置
| 场景 | TPS | 分区数 |
|------|-----|--------|
| 低并发 | < 1000 | 4-8 |
| 中并发 | 1000-5000 | 8-16 |
| 高并发 | 5000-20000 | 16-32 |
| 超高并发 | > 20000 | 32-64 |
1.3 消息设计
消息体大小:
// 推荐:< 4KB
// 最大:4MB(默认)
// 大消息处理方案
public void sendLargeMessage(LargeObject obj) {
// 方案 1:压缩
byte[] compressed = compress(obj);
// 方案 2:分片
List<byte[]> shards = split(obj);
for (byte[] shard : shards) {
send(shard);
}
// 方案 3:存储外部,消息传引用
String url = storage.save(obj);
send(new ReferenceMessage(url));
}
Tag 和 Key 设计:
// Tag:用于过滤,建议不超过 20 个
message.setTags("order-create"); // ✅
message.setTags("order-create,pay"); // ❌ 多个 Tag
// Key:用于查询和去重,保持唯一性
message.setKeys(orderId.toString()); // ✅ 订单 ID
message.setKeys(uuid); // ✅ UUID
// 多 Key 场景(用逗号分隔)
message.setKeys(orderId + "," + userId);
二、配置优化
2.1 Producer 配置
public class ProducerConfig {
public static DefaultMQProducer createProducer(String group) {
DefaultMQProducer producer = new DefaultMQProducer(group);
// 基础配置
producer.setNamesrvAddr("ns1:9876;ns2:9876");
// 性能优化
producer.setVipChannelEnabled(false); // 关闭 VIP 通道
producer.setSendMsgTimeout(5000); // 5 秒超时
// 可靠性配置
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
return producer;
}
}
2.2 Consumer 配置
public class ConsumerConfig {
public static DefaultMQPushConsumer createConsumer(String group, String topic)
throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
// 基础配置
consumer.setNamesrvAddr("ns1:9876;ns2:9876");
consumer.subscribe(topic, "*");
// 并发配置
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 拉取配置
consumer.setPullBatchSize(32);
consumer.setPullThresholdForQueue(200);
consumer.setPullThresholdSizeForQueue(100);
// 消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
return consumer;
}
}
2.3 Broker 配置
# broker.conf
# 基础配置
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=ns1:9876;ns2:9876
# 性能配置
listenPort=10911
brokerIP1=192.168.1.100
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
# 存储配置
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue
storePathIndex=/data/rocketmq/store/index
# 文件配置
fileReservedTime=72
deleteWhen=02
maxMessageSize=4194304
# 刷盘配置
flushCommitLogLeastPages=4
flushCommitLogThoroughInterval=200
flushSlaveTimeoutMillis=3000
三、消息可靠性
3.1 发送可靠性
public class ReliableProducer {
/**
* 可靠发送(同步 + 重试)
*/
public boolean sendReliable(Message msg) {
int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
try {
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
return true;
}
} catch (Exception e) {
log.warn("发送失败,第 {} 次重试", i + 1, e);
}
}
return false;
}
/**
* 事务消息(最终一致性)
*/
public void sendTransactionMessage(Message msg, Object businessArg) {
TransactionSendResult result =
transactionProducer.sendMessageInTransaction(msg, businessArg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
log.error("事务消息发送失败");
// 记录日志,人工处理
}
}
}
3.2 消费可靠性
public class ReliableConsumer {
/**
* 可靠消费(幂等 + 重试)
*/
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// 1. 幂等检查
if (isProcessed(msgId)) {
log.info("消息已处理:msgId={}", msgId);
continue;
}
try {
// 2. 业务处理
processBusiness(msg);
// 3. 标记已处理
markAsProcessed(msgId);
} catch (BusinessException e) {
// 业务异常,记录日志,跳过
log.error("业务异常:msgId={}", msgId, e);
saveFailedMessage(msg);
} catch (Exception e) {
// 系统异常,返回重试
log.error("系统异常:msgId={}", msgId, e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
3.3 消息追踪
public class MessageTrace {
/**
* 发送追踪
*/
public void traceSend(Message msg, SendResult result) {
TraceMessage trace = new TraceMessage();
trace.setMsgId(result.getMsgId());
trace.setTopic(msg.getTopic());
trace.setTags(msg.getTags());
trace.setKeys(msg.getKeys());
trace.setBodyLength(msg.getBody().length);
trace.setSendTime(System.currentTimeMillis());
trace.setStatus("SENT");
traceService.save(trace);
}
/**
* 消费追踪
*/
public void traceConsume(MessageExt msg, ConsumeResult result) {
TraceMessage trace = traceService.findByMsgId(msg.getMsgId());
if (trace != null) {
trace.setConsumeTime(System.currentTimeMillis());
trace.setConsumeResult(result.name());
trace.setConsumerGroup(msg.getConsumerGroup());
traceService.update(trace);
}
}
}
四、故障处理
4.1 常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 消息丢失 | acks 未确认 | 开启事务消息、持久化 |
| 消息重复 | 网络重试 | 实现幂等性 |
| 消息堆积 | 消费慢 | 增加消费者、优化逻辑 |
| 发送失败 | Broker 故障 | 重试、切换 Broker |
| 消费失败 | 业务异常 | 重试、死信队列 |
4.2 消息堆积处理
#!/bin/bash
# 消息堆积应急处理脚本
TOPIC="order-topic"
GROUP="order-consumer-group"
# 1. 查看堆积情况
echo "=== 消费组状态 ==="
mqadmin consumerProgress -n ns1:9876 -g $GROUP
# 2. 查看消费者状态
echo "=== 消费者状态 ==="
mqadmin consumerStatus -n ns1:9876 -g $GROUP
# 3. 临时增加消费者
echo "=== 启动临时消费者 ==="
nohup java -jar consumer.jar --group=$GROUP-temp &
# 4. 如需要,跳过部分消息
echo "=== 重置偏移量(慎用)==="
mqadmin resetOffset -n ns1:9876 -t $TOPIC -g $GROUP -s now -f true
4.3 Broker 故障处理
#!/bin/bash
# Broker 故障切换脚本
FAILEDBROKER="broker-a"
NEWBROKER="broker-c"
# 1. 停止故障 Broker
echo "停止故障 Broker..."
ssh $FAILEDBROKER "kill -9 $(ps -ef | grep BrokerStartup | awk '{print $2}')"
# 2. 启动备用 Broker
echo "启动备用 Broker..."
ssh $NEWBROKER "/opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker-c.conf &"
# 3. 检查状态
echo "检查 Broker 状态..."
mqadmin brokerStatus -n ns1:9876 -b $NEWBROKER:10911
# 4. 发送告警
sendAlert "Broker 故障切换完成:$FAILEDBROKER -> $NEWBROKER"
五、性能优化
5.1 吞吐量优化
// 高吞吐配置
public class HighThroughputConfig {
public static Properties getProducerProps() {
Properties props = new Properties();
// 异步发送
props.put("sendMsgTimeout", "3000");
props.put("retryTimesWhenSendAsyncFailed", "2");
// 批量发送
// 在应用层实现批量
return props;
}
public static Properties getConsumerProps() {
Properties props = new Properties();
// 增加并发
props.put("consumeThreadMin", "40");
props.put("consumeThreadMax", "100");
// 批量拉取
props.put("pullBatchSize", "64");
return props;
}
}
5.2 延迟优化
// 低延迟配置
public class LowLatencyConfig {
public static Properties getProducerProps() {
Properties props = new Properties();
// 同步发送(保证可靠性)
props.put("sendMsgTimeout", "2000");
// 不重试(快速失败)
props.put("retryTimesWhenSendFailed", "0");
return props;
}
public static Properties getConsumerProps() {
Properties props = new Properties();
// 减少批量
props.put("pullBatchSize", "16");
props.put("pullInterval", "0");
return props;
}
}
六、监控告警
6.1 关键指标
| 指标 | 告警阈值 | 说明 |
|---|---|---|
| 生产 TPS | < 1000 | 写入速率过低 |
| 消费 TPS | < 1000 | 消费速率过低 |
| 消费延迟 | > 10000 | 消息堆积 |
| 发送失败率 | > 1% | 发送异常 |
| 消费失败率 | > 1% | 消费异常 |
| Broker 宕机 | = 0 | 节点不可用 |
6.2 监控配置
# Prometheus 告警规则
groups:
- name: rocketmq
rules:
- alert: RocketMQBrokerDown
expr: up{job="rocketmq-broker"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Broker 宕机:{{ $labels.instance }}"
- alert: RocketMQConsumerLag
expr: rocketmq_consumer_lag > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "消费滞后:{{ $labels.group }} - {{ $value }}"
- alert: RocketMQSendFailed
expr: rate(rocketmq_send_failed[5m]) > 0.01
for: 5m
labels:
severity: warning
annotations:
summary: "发送失败率过高"
七、运维管理
7.1 日常巡检
#!/bin/bash
# 日常巡检脚本
echo "=== RocketMQ 日常巡检 ==="
echo "时间:$(date)"
# 1. 检查 NameServer
echo -e "\n=== NameServer 状态 ==="
mqadmin namesrvStatus -n ns1:9876
# 2. 检查 Broker
echo -e "\n=== Broker 状态 ==="
mqadmin brokerStatus -n ns1:9876 -b broker-1:10911
# 3. 检查 Topic
echo -e "\n=== Topic 统计 ==="
mqadmin topicList -n ns1:9876
# 4. 检查消费组
echo -e "\n=== 消费组进度 ==="
mqadmin consumerProgress -n ns1:9876
# 5. 检查磁盘
echo -e "\n=== 磁盘使用 ==="
df -h /data/rocketmq/store
# 6. 检查集群
echo -e "\n=== 集群信息 ==="
mqadmin clusterList -n ns1:9876
7.2 备份策略
#!/bin/bash
# 元数据备份脚本
BACKUP_DIR="/backup/rocketmq"
DATE=$(date +%Y%m%d)
# 备份 Topic 配置
mqadmin topicList -n ns1:9876 > $BACKUP_DIR/topics_$DATE.txt
# 备份消费组配置
mqadmin consumerList -n ns1:9876 > $BACKUP_DIR/consumers_$DATE.txt
# 备份 Broker 配置
for broker in broker-1 broker-2 broker-3; do
mqadmin brokerStatus -n ns1:9876 -b $broker:10911 > \
$BACKUP_DIR/broker_${broker}_$DATE.txt
done
# 保留 30 天
find $BACKUP_DIR -name "*.txt" -mtime +30 -delete
八、经验总结
8.1 设计原则
- 消息要轻量:消息体尽量小于 4KB
- Tag 要精简:不超过 20 个 Tag
- Key 要唯一:用于去重和查询
- 消费要幂等:防止重复消费
- 监控要完善:关键指标告警
8.2 避坑指南
| 坑 | 表现 | 避免方法 |
|---|---|---|
| 消息重复 | 数据重复 | 实现幂等性 |
| 消息丢失 | 数据不一致 | 事务消息 |
| 消息堆积 | 延迟高 | 监控 + 扩容 |
| 顺序错乱 | 业务异常 | 顺序消息 |
| 大消息 | 性能差 | 压缩或分片 |
8.3 检查清单
上线前检查:
- Topic 和 Group 已创建
- 分区数合理
- 监控告警已配置
- 幂等性已实现
- 异常处理完善
- 日志记录完整
运维检查:
- 每日巡检
- 每周备份
- 每月容量评估
- 每季度演练
总结
RocketMQ 最佳实践的核心要点:
- 架构设计:合理部署、Topic 规划、消息设计
- 配置优化:Producer、Consumer、Broker 配置
- 消息可靠:发送可靠、消费可靠、消息追踪
- 故障处理:常见问题、应急处理
- 运维管理:日常巡检、备份策略
核心要点:
- 遵循命名规范和设计原则
- 实现消息幂等性和可靠性
- 建立完善的监控告警体系
- 定期巡检和备份
参考资料
- RocketMQ 最佳实践官方文档
- RocketMQ 运维指南
- 《RocketMQ 技术内幕》第 12 章