RocketMQ Producer 是消息系统的入口,负责消息的构建和发送。本文将深入探讨 RocketMQ Producer 的发送流程、消息结构以及可靠性保证机制。
一、Producer 发送流程
1.1 整体流程
RocketMQ Producer 的发送流程包括以下几个步骤:
sequenceDiagram
participant App as 应用程序
participant P as Producer
participant NS as NameServer
participant B as Broker
App->>P: 创建消息
P->>NS: 获取路由信息
NS-->>P: 返回 Broker 列表
P->>P: 选择队列
P->>B: 发送消息
B-->>P: 返回结果
P-->>App: 返回 SendResult
1.2 核心代码
// 1. 创建 Producer 实例
DefaultLitePullProducer producer = new DefaultLitePullProducer("producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 2. 创建消息
Message msg = new Message("topic", "tag", "key", "body".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 3. 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("发送成功:msgId=%s, offset=%d%n",
sendResult.getMsgId(), sendResult.getQueueOffset());
// 4. 关闭 Producer
producer.shutdown();
二、消息结构
2.1 消息组成
graph TB
subgraph Message
T[Topic]
TAG[Tag]
KEY[Key]
BODY[Body]
PROP[Properties]
end
subgraph 系统属性
MID[MessageId]
FLAG[Flag]
WAIT[WAIT_STORE_MSG_OK]
end
2.2 消息属性
| 属性 | 说明 | 必填 |
|---|---|---|
| Topic | 消息主题 | 是 |
| Tag | 消息标签 | 否 |
| Key | 消息业务键 | 否 |
| Body | 消息体 | 是 |
| Properties | 扩展属性 | 否 |
2.3 消息构建
// 1. 简单消息
Message msg = new Message("topic", "body".getBytes());
// 2. 带 Tag 的消息
Message msg = new Message("topic", "tag", "body".getBytes());
// 3. 带 Key 的消息(用于事务追踪)
Message msg = new Message("topic", "tag", "order_123", "body".getBytes());
// 4. 带扩展属性的消息
Message msg = new Message("topic", "tag", "key", "body".getBytes());
msg.putUserProperty("delayLevel", "3");
msg.putUserProperty("shardingKey", "user_456");
三、发送方式
3.1 同步发送
等待 Broker 响应,可靠性最高:
// 同步发送
SendResult sendResult = producer.send(msg);
// 处理结果
switch (sendResult.getSendStatus()) {
case SEND_OK:
System.out.println("发送成功");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("刷盘超时");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("同步从节点超时");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("从节点不可用");
break;
}
适用场景:重要通知、短信通知、金融交易
3.2 异步发送
不等待响应,通过回调处理结果:
// 异步发送
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("发送成功:msgId=%s%n", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.err.println("发送失败:" + e.getMessage());
// 可以重试或记录日志
}
});
适用场景:日志收集、异步通知、耗时不敏感
3.3 单向发送
不等待响应,无回调,性能最高:
// 单向发送
producer.sendOneway(msg);
适用场景:日志采集、监控数据、可丢失的消息
3.4 对比
| 发送方式 | 吞吐量 | 可靠性 | 延迟 | 适用场景 |
|---|---|---|---|---|
| 同步 | 低 | 高 | 高 | 重要业务 |
| 异步 | 中 | 中 | 中 | 一般业务 |
| 单向 | 高 | 低 | 低 | 日志监控 |
四、发送可靠性
4.1 重试机制
// 配置重试
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
4.2 超时控制
// 超时配置
producer.setSendMsgTimeout(3000); // 3 秒超时
4.3 VIP 通道
// 关闭 VIP 通道(避免某些网络问题)
producer.setVipChannelEnabled(false);
五、批量发送
5.1 简单批量
// 构建批量消息
List<Message> messages = Arrays.asList(
new Message("topic", "tag", "msg1".getBytes()),
new Message("topic", "tag", "msg2".getBytes()),
new Message("topic", "tag", "msg3".getBytes())
);
// 批量发送
SendResult sendResult = producer.send(messages);
5.2 分批发送
// 超过 4MB 需要分批
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024; // 1MB
private final List<Message> messages;
private int currIdx;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIdx < messages.size();
}
@Override
public List<Message> next() {
int startIdx = currIdx;
int size = 0;
while (currIdx < messages.size()) {
Message msg = messages.get(currIdx);
int msgSize = msg.getBody().length + 100; // 估算
if (size + msgSize > SIZE_LIMIT) {
break;
}
size += msgSize;
currIdx++;
}
return messages.subList(startIdx, currIdx);
}
}
// 使用
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> batch = splitter.next();
producer.send(batch);
}
六、消息路由
6.1 队列选择
// MessageQueueSelector 实现
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据订单 ID 选择队列(保证顺序)
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);
6.2 延迟消息
// 设置延迟级别
Message msg = new Message("topic", "body".getBytes());
msg.setDelayTimeLevel(3); // 10 秒
// 延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SendResult result = producer.send(msg);
6.3 顺序消息
// 发送顺序消息
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("顺序消息发送成功");
}
@Override
public void onException(Throwable e) {
System.out.println("顺序消息发送失败");
}
};
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 保证同一订单发送到同一队列
return mqs.get(((Long) arg).intValue() % mqs.size());
}
}, orderId, sendCallback);
七、事务消息
7.1 事务消息流程
sequenceDiagram
participant P as Producer
participant B as Broker
participant L as 本地事务
P->>B: 发送半消息
B-->>P: 返回 PREPARE
P->>L: 执行本地事务
L-->>P: 返回结果
P->>B: 提交/回滚
B-->>P: 确认结果
7.2 事务消息实现
// 1. 创建事务生产者
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 业务逻辑
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
};
TransactionMQProducer producer = new TransactionMQProducer("transaction-group");
producer.setTransactionListener(transactionListener);
producer.start();
// 2. 发送事务消息
Message msg = new Message("topic", "body".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
八、性能优化
8.1 配置优化
// 1. 增加发送线程池
producer.setSendMsgTimeout(3000);
producer.setRetryTimesWhenSendFailed(3);
// 2. 调整批次大小
// 在 Broker 端配置
// sendMessageThreadPoolNums=4
// useDefaultAsyncSendMessage=true
8.2 使用建议
| 场景 | 发送方式 | 重试次数 | 超时时间 |
|---|---|---|---|
| 金融交易 | 同步 | 3 | 3000ms |
| 订单通知 | 同步 | 2 | 2000ms |
| 日志收集 | 单向 | 0 | 1000ms |
| 异步通知 | 异步 | 2 | 2000ms |
九、常见问题排查
9.1 发送超时
原因:
- Broker 负载高
- 网络延迟
- 超时设置过短
解决:
producer.setSendMsgTimeout(5000);
producer.setRetryTimesWhenSendFailed(5);
9.2 消息重复
原因:
- 网络抖动导致重试
- Producer 重复发送
解决:
- 业务层实现幂等性
- 使用消息去重表
9.3 发送失败
原因:
- NameServer 不可用
- Topic 不存在
- Broker 故障
解决:
// 1. 检查 NameServer 连接
producer.setNamesrvAddr("ns1:9876;ns2:9876");
// 2. 自动创建 Topic
producer.setCreateTopicKey("topic");
// 3. 配置多个 Broker
总结
RocketMQ Producer 的核心机制:
- 发送方式:同步、异步、单向,根据场景选择
- 消息结构:Topic、Tag、Key、Body
- 可靠性保证:重试机制、超时控制
- 高级特性:批量发送、延迟消息、顺序消息、事务消息
- 性能优化:配置调优、使用建议
核心要点:
- 根据业务重要性选择发送方式
- 合理配置重试和超时参数
- 使用 Key 实现消息追踪
- 事务消息保证最终一致性
参考资料
- RocketMQ Producer 官方文档
- RocketMQ 源码
- 《RocketMQ 技术内幕》第 3 章