Skip to content
清晨的一缕阳光
返回

RocketMQ 生产者发送机制详解

RocketMQ Producer 是消息系统的入口,负责消息的构建和发送。本文将深入探讨 RocketMQ Producer 的发送流程、消息结构以及可靠性保证机制。

一、Producer 发送流程

1.1 整体流程

RocketMQ Producer 的发送流程包括以下几个步骤:

sequenceDiagram
    participant App as 应用程序
    participant P as Producer
    participant NS as NameServer
    participant B as Broker
    
    App->>P: 创建消息
    P->>NS: 获取路由信息
    NS-->>P: 返回 Broker 列表
    P->>P: 选择队列
    P->>B: 发送消息
    B-->>P: 返回结果
    P-->>App: 返回 SendResult

1.2 核心代码

// 1. 创建 Producer 实例
DefaultLitePullProducer producer = new DefaultLitePullProducer("producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 2. 创建消息
Message msg = new Message("topic", "tag", "key", "body".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 3. 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("发送成功:msgId=%s, offset=%d%n", 
    sendResult.getMsgId(), sendResult.getQueueOffset());

// 4. 关闭 Producer
producer.shutdown();

二、消息结构

2.1 消息组成

graph TB
    subgraph Message
        T[Topic]
        TAG[Tag]
        KEY[Key]
        BODY[Body]
        PROP[Properties]
    end
    
    subgraph 系统属性
        MID[MessageId]
        FLAG[Flag]
        WAIT[WAIT_STORE_MSG_OK]
    end

2.2 消息属性

属性说明必填
Topic消息主题
Tag消息标签
Key消息业务键
Body消息体
Properties扩展属性

2.3 消息构建

// 1. 简单消息
Message msg = new Message("topic", "body".getBytes());

// 2. 带 Tag 的消息
Message msg = new Message("topic", "tag", "body".getBytes());

// 3. 带 Key 的消息(用于事务追踪)
Message msg = new Message("topic", "tag", "order_123", "body".getBytes());

// 4. 带扩展属性的消息
Message msg = new Message("topic", "tag", "key", "body".getBytes());
msg.putUserProperty("delayLevel", "3");
msg.putUserProperty("shardingKey", "user_456");

三、发送方式

3.1 同步发送

等待 Broker 响应,可靠性最高:

// 同步发送
SendResult sendResult = producer.send(msg);

// 处理结果
switch (sendResult.getSendStatus()) {
    case SEND_OK:
        System.out.println("发送成功");
        break;
    case FLUSH_DISK_TIMEOUT:
        System.out.println("刷盘超时");
        break;
    case FLUSH_SLAVE_TIMEOUT:
        System.out.println("同步从节点超时");
        break;
    case SLAVE_NOT_AVAILABLE:
        System.out.println("从节点不可用");
        break;
}

适用场景:重要通知、短信通知、金融交易

3.2 异步发送

不等待响应,通过回调处理结果:

// 异步发送
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("发送成功:msgId=%s%n", sendResult.getMsgId());
    }
    
    @Override
    public void onException(Throwable e) {
        System.err.println("发送失败:" + e.getMessage());
        // 可以重试或记录日志
    }
});

适用场景:日志收集、异步通知、耗时不敏感

3.3 单向发送

不等待响应,无回调,性能最高:

// 单向发送
producer.sendOneway(msg);

适用场景:日志采集、监控数据、可丢失的消息

3.4 对比

发送方式吞吐量可靠性延迟适用场景
同步重要业务
异步一般业务
单向日志监控

四、发送可靠性

4.1 重试机制

// 配置重试
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);

4.2 超时控制

// 超时配置
producer.setSendMsgTimeout(3000); // 3 秒超时

4.3 VIP 通道

// 关闭 VIP 通道(避免某些网络问题)
producer.setVipChannelEnabled(false);

五、批量发送

5.1 简单批量

// 构建批量消息
List<Message> messages = Arrays.asList(
    new Message("topic", "tag", "msg1".getBytes()),
    new Message("topic", "tag", "msg2".getBytes()),
    new Message("topic", "tag", "msg3".getBytes())
);

// 批量发送
SendResult sendResult = producer.send(messages);

5.2 分批发送

// 超过 4MB 需要分批
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024; // 1MB
    private final List<Message> messages;
    private int currIdx;
    
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    
    @Override
    public boolean hasNext() {
        return currIdx < messages.size();
    }
    
    @Override
    public List<Message> next() {
        int startIdx = currIdx;
        int size = 0;
        
        while (currIdx < messages.size()) {
            Message msg = messages.get(currIdx);
            int msgSize = msg.getBody().length + 100; // 估算
            
            if (size + msgSize > SIZE_LIMIT) {
                break;
            }
            
            size += msgSize;
            currIdx++;
        }
        
        return messages.subList(startIdx, currIdx);
    }
}

// 使用
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    List<Message> batch = splitter.next();
    producer.send(batch);
}

六、消息路由

6.1 队列选择

// MessageQueueSelector 实现
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 根据订单 ID 选择队列(保证顺序)
        Long orderId = (Long) arg;
        int index = (int) (orderId % mqs.size());
        return mqs.get(index);
    }
}, orderId);

6.2 延迟消息

// 设置延迟级别
Message msg = new Message("topic", "body".getBytes());
msg.setDelayTimeLevel(3); // 10 秒

// 延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SendResult result = producer.send(msg);

6.3 顺序消息

// 发送顺序消息
SendCallback sendCallback = new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("顺序消息发送成功");
    }
    
    @Override
    public void onException(Throwable e) {
        System.out.println("顺序消息发送失败");
    }
};

producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 保证同一订单发送到同一队列
        return mqs.get(((Long) arg).intValue() % mqs.size());
    }
}, orderId, sendCallback);

七、事务消息

7.1 事务消息流程

sequenceDiagram
    participant P as Producer
    participant B as Broker
    participant L as 本地事务
    
    P->>B: 发送半消息
    B-->>P: 返回 PREPARE
    P->>L: 执行本地事务
    L-->>P: 返回结果
    P->>B: 提交/回滚
    B-->>P: 确认结果

7.2 事务消息实现

// 1. 创建事务生产者
TransactionListener transactionListener = new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        try {
            // 业务逻辑
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 回查本地事务状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }
};

TransactionMQProducer producer = new TransactionMQProducer("transaction-group");
producer.setTransactionListener(transactionListener);
producer.start();

// 2. 发送事务消息
Message msg = new Message("topic", "body".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

八、性能优化

8.1 配置优化

// 1. 增加发送线程池
producer.setSendMsgTimeout(3000);
producer.setRetryTimesWhenSendFailed(3);

// 2. 调整批次大小
// 在 Broker 端配置
// sendMessageThreadPoolNums=4
// useDefaultAsyncSendMessage=true

8.2 使用建议

场景发送方式重试次数超时时间
金融交易同步33000ms
订单通知同步22000ms
日志收集单向01000ms
异步通知异步22000ms

九、常见问题排查

9.1 发送超时

原因

解决

producer.setSendMsgTimeout(5000);
producer.setRetryTimesWhenSendFailed(5);

9.2 消息重复

原因

解决

9.3 发送失败

原因

解决

// 1. 检查 NameServer 连接
producer.setNamesrvAddr("ns1:9876;ns2:9876");

// 2. 自动创建 Topic
producer.setCreateTopicKey("topic");

// 3. 配置多个 Broker

总结

RocketMQ Producer 的核心机制:

  1. 发送方式:同步、异步、单向,根据场景选择
  2. 消息结构:Topic、Tag、Key、Body
  3. 可靠性保证:重试机制、超时控制
  4. 高级特性:批量发送、延迟消息、顺序消息、事务消息
  5. 性能优化:配置调优、使用建议

核心要点

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 命名空间详解与多租户实践
下一篇文章
RAG 性能优化实战指南