Skip to content
清晨的一缕阳光
返回

RocketMQ 客户端高级用法详解

RocketMQ 客户端提供了丰富的高级特性,满足各种复杂业务场景。本文将深入探讨客户端的高级用法和最佳实践。

一、消息发送高级用法

1.1 批量发送

简单批量

public class BatchProducer {
    
    private final DefaultMQProducer producer;
    
    public BatchProducer() throws MQClientException {
        producer = new DefaultMQProducer("batch-producer");
        producer.setNamesrvAddr("ns1:9876");
        producer.start();
    }
    
    /**
     * 批量发送消息
     */
    public void sendBatch(List<String> bodies) throws Exception {
        List<Message> messages = new ArrayList<>();
        
        for (String body : bodies) {
            Message msg = new Message("batch-topic", "batch", body.getBytes());
            messages.add(msg);
        }
        
        // 批量发送
        SendResult result = producer.send(messages);
        log.info("批量发送成功:msgId={}", result.getMsgId());
    }
}

分批发送

public class BatchSplitProducer {
    
    private final DefaultMQProducer producer;
    
    /**
     * 分批发送(超过 4MB 需要分批)
     */
    public void sendLargeBatch(List<Message> allMessages) throws Exception {
        ListSplitter splitter = new ListSplitter(allMessages);
        
        while (splitter.hasNext()) {
            List<Message> batch = splitter.next();
            
            try {
                SendResult result = producer.send(batch);
                log.info("分批发送成功:size={}", batch.size());
            } catch (Exception e) {
                log.error("分批发送失败", e);
                // 重试或记录失败
            }
        }
    }
}

// 消息分割器
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 4 * 1024 * 1024;  // 4MB
    private final List<Message> messages;
    private int currIdx;
    
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    
    @Override
    public boolean hasNext() {
        return currIdx < messages.size();
    }
    
    @Override
    public List<Message> next() {
        int startIdx = currIdx;
        int size = 0;
        
        while (currIdx < messages.size()) {
            Message msg = messages.get(currIdx);
            int msgSize = msg.getBody().length + 100;  // body + header
            
            if (size + msgSize > SIZE_LIMIT) {
                break;
            }
            
            size += msgSize;
            currIdx++;
        }
        
        return messages.subList(startIdx, currIdx);
    }
}

1.2 异步发送

回调方式

public class AsyncProducer {
    
    private final DefaultMQProducer producer;
    private final AtomicInteger successCount = new AtomicInteger();
    private final AtomicInteger failCount = new AtomicInteger();
    
    public AsyncProducer() throws MQClientException {
        producer = new DefaultMQProducer("async-producer");
        producer.setNamesrvAddr("ns1:9876");
        producer.start();
    }
    
    /**
     * 异步发送
     */
    public void sendAsync(Message message) throws Exception {
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                successCount.incrementAndGet();
                log.info("发送成功:msgId={}", sendResult.getMsgId());
            }
            
            @Override
            public void onException(Throwable e) {
                failCount.incrementAndGet();
                log.error("发送失败", e);
                // 重试或记录失败
            }
        });
    }
    
    /**
     * 批量异步发送
     */
    public void sendBatchAsync(List<Message> messages) throws Exception {
        CountDownLatch latch = new CountDownLatch(messages.size());
        
        for (Message msg : messages) {
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    latch.countDown();
                }
                
                @Override
                public void onException(Throwable e) {
                    latch.countDown();
                    log.error("发送失败", e);
                }
            });
        }
        
        // 等待所有发送完成
        latch.await(30, TimeUnit.SECONDS);
        
        log.info("批量异步发送完成:成功={}, 失败={}", 
            successCount.get(), failCount.get());
    }
}

1.3 单向发送

适用场景

public class OnewayProducer {
    
    private final DefaultMQProducer producer;
    
    public OnewayProducer() throws MQClientException {
        producer = new DefaultMQProducer("oneway-producer");
        producer.setNamesrvAddr("ns1:9876");
        producer.start();
    }
    
    /**
     * 单向发送(日志场景)
     */
    public void sendLog(String logMessage) throws Exception {
        Message msg = new Message("log-topic", "log", logMessage.getBytes());
        
        // 单向发送,不等待响应
        producer.sendOneway(msg);
    }
}

二、消息消费高级用法

2.1 并发消费

配置并发度

public class ConcurrentConsumer {
    
    private final DefaultMQPushConsumer consumer;
    
    public ConcurrentConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer("concurrent-consumer");
        consumer.setNamesrvAddr("ns1:9876");
        consumer.subscribe("concurrent-topic", "*");
        
        // 配置并发度
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(64);
        
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    // 并发处理
                    processMessage(msg);
                } catch (Exception e) {
                    log.error("处理失败", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
    }
}

2.2 顺序消费

顺序消费实现

public class OrderlyConsumer {
    
    private final DefaultMQPushConsumer consumer;
    
    public OrderlyConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer("orderly-consumer");
        consumer.setNamesrvAddr("ns1:9876");
        consumer.subscribe("orderly-topic", "*");
        
        // 注册顺序消费监听器
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    // 顺序处理
                    processOrderly(msg);
                } catch (Exception e) {
                    log.error("顺序处理失败", e);
                    // 返回重试,保持顺序
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        
        consumer.start();
    }
}

2.3 批量消费

public class BatchConsumer {
    
    private final DefaultMQPushConsumer consumer;
    
    public BatchConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer("batch-consumer");
        consumer.setNamesrvAddr("ns1:9876");
        consumer.setPullBatchSize(64);  // 批量拉取 64 条
        consumer.subscribe("batch-topic", "*");
        
        consumer.registerMessageListener((msgs, context) -> {
            // 批量处理
            List<Order> orders = new ArrayList<>();
            
            for (MessageExt msg : msgs) {
                Order order = parseOrder(msg);
                orders.add(order);
            }
            
            // 批量插入数据库
            orderMapper.batchInsert(orders);
            
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
    }
}

三、消息过滤

3.1 Tag 过滤

public class TagFilterConsumer {
    
    private final DefaultMQPushConsumer consumer;
    
    public TagFilterConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer("tag-filter-consumer");
        consumer.setNamesrvAddr("ns1:9876");
        
        // 订阅单个 Tag
        consumer.subscribe("order-topic", "pay");
        
        // 订阅多个 Tag(OR 关系)
        // consumer.subscribe("order-topic", "pay || ship || cancel");
        
        // 订阅所有 Tag
        // consumer.subscribe("order-topic", "*");
        
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                log.info("收到消息:tag={}", msg.getTags());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
    }
}

3.2 SQL92 过滤

public class SqlFilterConsumer {
    
    private final DefaultMQPushConsumer consumer;
    
    public SqlFilterConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer("sql-filter-consumer");
        consumer.setNamesrvAddr("ns1:9876");
        
        // SQL92 过滤
        consumer.subscribe("order-topic", MessageSelector.bySql(
            "amount > 100 AND province = 'beijing'"
        ));
        
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                String amount = msg.getUserProperty("amount");
                String province = msg.getUserProperty("province");
                log.info("过滤后的消息:amount={}, province={}", amount, province);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
    }
}

四、事务消息

4.1 事务监听器

public class TransactionListenerImpl implements TransactionListener {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 1. 解析消息
            Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
            
            // 2. 执行本地事务
            orderService.createOrder(order);
            
            // 3. 返回提交
            return LocalTransactionState.COMMIT_MESSAGE;
            
        } catch (Exception e) {
            log.error("执行本地事务失败", e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 1. 解析消息
        Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
        
        try {
            // 2. 查询本地事务状态
            Order dbOrder = orderService.queryOrder(order.getId());
            
            if (dbOrder != null && dbOrder.getStatus() == OrderStatus.CREATED) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            log.error("回查事务状态失败", e);
            return LocalTransactionState.UNKNOW;
        }
    }
}

4.2 事务生产者

public class TransactionProducer {
    
    private final TransactionMQProducer producer;
    
    public TransactionProducer() throws MQClientException {
        producer = new TransactionMQProducer("transaction-producer");
        producer.setNamesrvAddr("ns1:9876");
        
        // 设置事务监听器
        producer.setTransactionListener(new TransactionListenerImpl());
        
        // 配置线程池
        ExecutorService executorService = new ThreadPoolExecutor(
            2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2000),
            new ThreadFactoryImpl("transaction-thread")
        );
        producer.setExecutorService(executorService);
        
        producer.start();
    }
    
    /**
     * 发送事务消息
     */
    public void sendTransactionMessage(Order order) throws Exception {
        Message msg = new Message("order-topic", "create", 
            JSON.toJSONString(order).getBytes());
        
        TransactionSendResult result = producer.sendMessageInTransaction(msg, order);
        
        switch (result.getSendStatus()) {
            case SEND_OK:
                log.info("事务消息发送成功");
                break;
            case SEND_MESSAGE_ILLEGAL:
                log.error("消息非法");
                break;
            default:
                log.error("发送失败:{}", result.getSendStatus());
        }
    }
}

五、延迟消息

5.1 发送延迟消息

public class DelayProducer {
    
    private final DefaultMQProducer producer;
    
    public DelayProducer() throws MQClientException {
        producer = new DefaultMQProducer("delay-producer");
        producer.setNamesrvAddr("ns1:9876");
        producer.start();
    }
    
    /**
     * 发送延迟消息
     */
    public void sendDelayMessage(String topic, String body, int delayLevel) throws Exception {
        Message msg = new Message(topic, body.getBytes());
        
        // 设置延迟级别(1-18)
        msg.setDelayTimeLevel(delayLevel);
        
        SendResult result = producer.send(msg);
        log.info("延迟消息发送成功:msgId={}", result.getMsgId());
    }
    
    /**
     * 发送订单超时取消消息
     */
    public void sendOrderTimeoutMessage(Long orderId, long timeoutMinutes) throws Exception {
        OrderTimeoutMessage msg = new OrderTimeoutMessage();
        msg.setOrderId(orderId);
        msg.setTimeoutTime(System.currentTimeMillis() + timeoutMinutes * 60 * 1000);
        
        Message message = new Message(
            "order-timeout-topic",
            "timeout",
            orderId.toString(),
            JSON.toJSONString(msg).getBytes()
        );
        
        // 根据超时时间选择延迟级别
        int delayLevel = calculateDelayLevel(timeoutMinutes);
        message.setDelayTimeLevel(delayLevel);
        
        producer.send(message);
    }
    
    private int calculateDelayLevel(long minutes) {
        if (minutes <= 1) return 1;      // 1s
        if (minutes <= 5) return 2;      // 5s
        if (minutes <= 10) return 3;     // 10s
        if (minutes <= 30) return 4;     // 30s
        if (minutes <= 60) return 5;     // 1m
        if (minutes <= 120) return 6;    // 2m
        if (minutes <= 180) return 7;    // 3m
        if (minutes <= 240) return 8;    // 4m
        if (minutes <= 300) return 9;    // 5m
        if (minutes <= 360) return 10;   // 6m
        if (minutes <= 420) return 11;   // 7m
        if (minutes <= 480) return 12;   // 8m
        if (minutes <= 540) return 13;   // 9m
        if (minutes <= 600) return 14;   // 10m
        if (minutes <= 1200) return 15;  // 20m
        if (minutes <= 1800) return 16;  // 30m
        if (minutes <= 3600) return 17;  // 1h
        return 18;                        // 2h
    }
}

六、最佳实践

6.1 Producer 配置

public class ProducerConfig {
    
    public static DefaultMQProducer createProducer(String group) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr("ns1:9876");
        
        // 性能配置
        producer.setSendMsgTimeout(5000);
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(3);
        
        // 关闭 VIP 通道
        producer.setVipChannelEnabled(false);
        
        return producer;
    }
}

6.2 Consumer 配置

public class ConsumerConfig {
    
    public static DefaultMQPushConsumer createConsumer(String group, String topic) 
            throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr("ns1:9876");
        consumer.subscribe(topic, "*");
        
        // 并发配置
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(64);
        
        // 拉取配置
        consumer.setPullBatchSize(32);
        consumer.setPullThresholdForQueue(200);
        
        return consumer;
    }
}

6.3 错误处理

public class MessageErrorHandler {
    
    /**
     * 处理消费失败
     */
    public ConsumeConcurrentlyStatus handleFailedMessage(MessageExt msg, Exception e) {
        // 1. 记录错误日志
        log.error("消费失败:msgId={}", msg.getMsgId(), e);
        
        // 2. 保存失败消息
        saveFailedMessage(msg, e);
        
        // 3. 发送告警
        if (isCriticalError(e)) {
            sendAlert("消息消费失败:msgId=" + msg.getMsgId());
        }
        
        // 4. 返回重试
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    
    private void saveFailedMessage(MessageExt msg, Exception e) {
        // 保存到失败队列或数据库
    }
    
    private boolean isCriticalError(Exception e) {
        // 判断是否是严重错误
        return e instanceof BusinessException;
    }
}

总结

RocketMQ 客户端高级用法的核心要点:

  1. 消息发送:批量、异步、单向
  2. 消息消费:并发、顺序、批量
  3. 消息过滤:Tag、SQL92
  4. 事务消息:事务监听器、回查机制
  5. 延迟消息:延迟级别、超时取消

核心要点

参考资料


分享这篇文章到:

上一篇文章
以日为鉴:少子化浪潮下的教师过剩危机与破局之道
下一篇文章
Prompt 测试框架详解