RocketMQ 客户端提供了丰富的高级特性,满足各种复杂业务场景。本文将深入探讨客户端的高级用法和最佳实践。
一、消息发送高级用法
1.1 批量发送
简单批量:
public class BatchProducer {
private final DefaultMQProducer producer;
public BatchProducer() throws MQClientException {
producer = new DefaultMQProducer("batch-producer");
producer.setNamesrvAddr("ns1:9876");
producer.start();
}
/**
* 批量发送消息
*/
public void sendBatch(List<String> bodies) throws Exception {
List<Message> messages = new ArrayList<>();
for (String body : bodies) {
Message msg = new Message("batch-topic", "batch", body.getBytes());
messages.add(msg);
}
// 批量发送
SendResult result = producer.send(messages);
log.info("批量发送成功:msgId={}", result.getMsgId());
}
}
分批发送:
public class BatchSplitProducer {
private final DefaultMQProducer producer;
/**
* 分批发送(超过 4MB 需要分批)
*/
public void sendLargeBatch(List<Message> allMessages) throws Exception {
ListSplitter splitter = new ListSplitter(allMessages);
while (splitter.hasNext()) {
List<Message> batch = splitter.next();
try {
SendResult result = producer.send(batch);
log.info("分批发送成功:size={}", batch.size());
} catch (Exception e) {
log.error("分批发送失败", e);
// 重试或记录失败
}
}
}
}
// 消息分割器
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 4 * 1024 * 1024; // 4MB
private final List<Message> messages;
private int currIdx;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIdx < messages.size();
}
@Override
public List<Message> next() {
int startIdx = currIdx;
int size = 0;
while (currIdx < messages.size()) {
Message msg = messages.get(currIdx);
int msgSize = msg.getBody().length + 100; // body + header
if (size + msgSize > SIZE_LIMIT) {
break;
}
size += msgSize;
currIdx++;
}
return messages.subList(startIdx, currIdx);
}
}
1.2 异步发送
回调方式:
public class AsyncProducer {
private final DefaultMQProducer producer;
private final AtomicInteger successCount = new AtomicInteger();
private final AtomicInteger failCount = new AtomicInteger();
public AsyncProducer() throws MQClientException {
producer = new DefaultMQProducer("async-producer");
producer.setNamesrvAddr("ns1:9876");
producer.start();
}
/**
* 异步发送
*/
public void sendAsync(Message message) throws Exception {
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
successCount.incrementAndGet();
log.info("发送成功:msgId={}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
failCount.incrementAndGet();
log.error("发送失败", e);
// 重试或记录失败
}
});
}
/**
* 批量异步发送
*/
public void sendBatchAsync(List<Message> messages) throws Exception {
CountDownLatch latch = new CountDownLatch(messages.size());
for (Message msg : messages) {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
latch.countDown();
}
@Override
public void onException(Throwable e) {
latch.countDown();
log.error("发送失败", e);
}
});
}
// 等待所有发送完成
latch.await(30, TimeUnit.SECONDS);
log.info("批量异步发送完成:成功={}, 失败={}",
successCount.get(), failCount.get());
}
}
1.3 单向发送
适用场景:
public class OnewayProducer {
private final DefaultMQProducer producer;
public OnewayProducer() throws MQClientException {
producer = new DefaultMQProducer("oneway-producer");
producer.setNamesrvAddr("ns1:9876");
producer.start();
}
/**
* 单向发送(日志场景)
*/
public void sendLog(String logMessage) throws Exception {
Message msg = new Message("log-topic", "log", logMessage.getBytes());
// 单向发送,不等待响应
producer.sendOneway(msg);
}
}
二、消息消费高级用法
2.1 并发消费
配置并发度:
public class ConcurrentConsumer {
private final DefaultMQPushConsumer consumer;
public ConcurrentConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer("concurrent-consumer");
consumer.setNamesrvAddr("ns1:9876");
consumer.subscribe("concurrent-topic", "*");
// 配置并发度
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 并发处理
processMessage(msg);
} catch (Exception e) {
log.error("处理失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
2.2 顺序消费
顺序消费实现:
public class OrderlyConsumer {
private final DefaultMQPushConsumer consumer;
public OrderlyConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer("orderly-consumer");
consumer.setNamesrvAddr("ns1:9876");
consumer.subscribe("orderly-topic", "*");
// 注册顺序消费监听器
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 顺序处理
processOrderly(msg);
} catch (Exception e) {
log.error("顺序处理失败", e);
// 返回重试,保持顺序
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
}
}
2.3 批量消费
public class BatchConsumer {
private final DefaultMQPushConsumer consumer;
public BatchConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer("batch-consumer");
consumer.setNamesrvAddr("ns1:9876");
consumer.setPullBatchSize(64); // 批量拉取 64 条
consumer.subscribe("batch-topic", "*");
consumer.registerMessageListener((msgs, context) -> {
// 批量处理
List<Order> orders = new ArrayList<>();
for (MessageExt msg : msgs) {
Order order = parseOrder(msg);
orders.add(order);
}
// 批量插入数据库
orderMapper.batchInsert(orders);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
三、消息过滤
3.1 Tag 过滤
public class TagFilterConsumer {
private final DefaultMQPushConsumer consumer;
public TagFilterConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer("tag-filter-consumer");
consumer.setNamesrvAddr("ns1:9876");
// 订阅单个 Tag
consumer.subscribe("order-topic", "pay");
// 订阅多个 Tag(OR 关系)
// consumer.subscribe("order-topic", "pay || ship || cancel");
// 订阅所有 Tag
// consumer.subscribe("order-topic", "*");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
log.info("收到消息:tag={}", msg.getTags());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
3.2 SQL92 过滤
public class SqlFilterConsumer {
private final DefaultMQPushConsumer consumer;
public SqlFilterConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer("sql-filter-consumer");
consumer.setNamesrvAddr("ns1:9876");
// SQL92 过滤
consumer.subscribe("order-topic", MessageSelector.bySql(
"amount > 100 AND province = 'beijing'"
));
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String amount = msg.getUserProperty("amount");
String province = msg.getUserProperty("province");
log.info("过滤后的消息:amount={}, province={}", amount, province);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
四、事务消息
4.1 事务监听器
public class TransactionListenerImpl implements TransactionListener {
@Autowired
private OrderService orderService;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 1. 解析消息
Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
// 2. 执行本地事务
orderService.createOrder(order);
// 3. 返回提交
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("执行本地事务失败", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 1. 解析消息
Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
try {
// 2. 查询本地事务状态
Order dbOrder = orderService.queryOrder(order.getId());
if (dbOrder != null && dbOrder.getStatus() == OrderStatus.CREATED) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.error("回查事务状态失败", e);
return LocalTransactionState.UNKNOW;
}
}
}
4.2 事务生产者
public class TransactionProducer {
private final TransactionMQProducer producer;
public TransactionProducer() throws MQClientException {
producer = new TransactionMQProducer("transaction-producer");
producer.setNamesrvAddr("ns1:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
// 配置线程池
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
new ThreadFactoryImpl("transaction-thread")
);
producer.setExecutorService(executorService);
producer.start();
}
/**
* 发送事务消息
*/
public void sendTransactionMessage(Order order) throws Exception {
Message msg = new Message("order-topic", "create",
JSON.toJSONString(order).getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, order);
switch (result.getSendStatus()) {
case SEND_OK:
log.info("事务消息发送成功");
break;
case SEND_MESSAGE_ILLEGAL:
log.error("消息非法");
break;
default:
log.error("发送失败:{}", result.getSendStatus());
}
}
}
五、延迟消息
5.1 发送延迟消息
public class DelayProducer {
private final DefaultMQProducer producer;
public DelayProducer() throws MQClientException {
producer = new DefaultMQProducer("delay-producer");
producer.setNamesrvAddr("ns1:9876");
producer.start();
}
/**
* 发送延迟消息
*/
public void sendDelayMessage(String topic, String body, int delayLevel) throws Exception {
Message msg = new Message(topic, body.getBytes());
// 设置延迟级别(1-18)
msg.setDelayTimeLevel(delayLevel);
SendResult result = producer.send(msg);
log.info("延迟消息发送成功:msgId={}", result.getMsgId());
}
/**
* 发送订单超时取消消息
*/
public void sendOrderTimeoutMessage(Long orderId, long timeoutMinutes) throws Exception {
OrderTimeoutMessage msg = new OrderTimeoutMessage();
msg.setOrderId(orderId);
msg.setTimeoutTime(System.currentTimeMillis() + timeoutMinutes * 60 * 1000);
Message message = new Message(
"order-timeout-topic",
"timeout",
orderId.toString(),
JSON.toJSONString(msg).getBytes()
);
// 根据超时时间选择延迟级别
int delayLevel = calculateDelayLevel(timeoutMinutes);
message.setDelayTimeLevel(delayLevel);
producer.send(message);
}
private int calculateDelayLevel(long minutes) {
if (minutes <= 1) return 1; // 1s
if (minutes <= 5) return 2; // 5s
if (minutes <= 10) return 3; // 10s
if (minutes <= 30) return 4; // 30s
if (minutes <= 60) return 5; // 1m
if (minutes <= 120) return 6; // 2m
if (minutes <= 180) return 7; // 3m
if (minutes <= 240) return 8; // 4m
if (minutes <= 300) return 9; // 5m
if (minutes <= 360) return 10; // 6m
if (minutes <= 420) return 11; // 7m
if (minutes <= 480) return 12; // 8m
if (minutes <= 540) return 13; // 9m
if (minutes <= 600) return 14; // 10m
if (minutes <= 1200) return 15; // 20m
if (minutes <= 1800) return 16; // 30m
if (minutes <= 3600) return 17; // 1h
return 18; // 2h
}
}
六、最佳实践
6.1 Producer 配置
public class ProducerConfig {
public static DefaultMQProducer createProducer(String group) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer(group);
producer.setNamesrvAddr("ns1:9876");
// 性能配置
producer.setSendMsgTimeout(5000);
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
// 关闭 VIP 通道
producer.setVipChannelEnabled(false);
return producer;
}
}
6.2 Consumer 配置
public class ConsumerConfig {
public static DefaultMQPushConsumer createConsumer(String group, String topic)
throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr("ns1:9876");
consumer.subscribe(topic, "*");
// 并发配置
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 拉取配置
consumer.setPullBatchSize(32);
consumer.setPullThresholdForQueue(200);
return consumer;
}
}
6.3 错误处理
public class MessageErrorHandler {
/**
* 处理消费失败
*/
public ConsumeConcurrentlyStatus handleFailedMessage(MessageExt msg, Exception e) {
// 1. 记录错误日志
log.error("消费失败:msgId={}", msg.getMsgId(), e);
// 2. 保存失败消息
saveFailedMessage(msg, e);
// 3. 发送告警
if (isCriticalError(e)) {
sendAlert("消息消费失败:msgId=" + msg.getMsgId());
}
// 4. 返回重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
private void saveFailedMessage(MessageExt msg, Exception e) {
// 保存到失败队列或数据库
}
private boolean isCriticalError(Exception e) {
// 判断是否是严重错误
return e instanceof BusinessException;
}
}
总结
RocketMQ 客户端高级用法的核心要点:
- 消息发送:批量、异步、单向
- 消息消费:并发、顺序、批量
- 消息过滤:Tag、SQL92
- 事务消息:事务监听器、回查机制
- 延迟消息:延迟级别、超时取消
核心要点:
- 根据场景选择合适的发送方式
- 合理配置并发度
- 实现消息过滤
- 处理事务和延迟消息
参考资料
- RocketMQ 客户端官方文档
- RocketMQ 示例代码
- 《RocketMQ 技术内幕》第 7 章