RocketMQ supports sending and consuming messages in batches, which can significantly raise system throughput. This article digs into how batch messages work and how to apply them in practice.
1. Batch Message Basics
1.1 Why Batch Messages?
Typical scenarios:
- Logging: a service produces 10,000 log entries per second
- Orders: bulk-importing 1,000 orders
- Data sync: replicating database records in batches
The problem:
- Sending one at a time: 10,000 messages = 10,000 network requests
- Sending in batches: 10,000 messages = 100 network requests (100 messages per batch)
- Result: roughly 100x fewer network round trips
1.2 Batch Limits
| Limit | Default | Notes |
|---|---|---|
| Max messages per batch | 128 | Upper bound on messages in one batch send |
| Max batch size | 4 MB | Total serialized size of one batch |
| Send timeout | 3 s | Timeout for a batch send |
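To respect both caps at once, a batch has to be split by count and by estimated size. Below is a minimal, hypothetical splitter sketch: it works on plain `byte[]` bodies as stand-ins for `Message` objects, and the 20-byte per-message overhead is an assumed estimate, not an official figure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits message bodies into batches that respect both a
// count cap and a total-size cap (mirroring the 128-message / 4MB limits above).
public class BatchSplitter {
    static final int MAX_COUNT = 128;
    static final int MAX_BYTES = 4 * 1024 * 1024;
    static final int OVERHEAD = 20; // rough per-message header estimate (assumption)

    public static List<List<byte[]>> split(List<byte[]> bodies) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] body : bodies) {
            int size = body.length + OVERHEAD;
            // start a new batch when either cap would be exceeded
            if (!current.isEmpty()
                    && (current.size() >= MAX_COUNT || currentBytes + size > MAX_BYTES)) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(body);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each resulting sub-list can then be passed to `producer.send(...)` as one batch.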
2. Batch Sending
2.1 Simple Batch Send

```java
// 1. Build the list of messages
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    Message msg = new Message(
        "batch-topic",      // topic
        "batch",            // tag
        "key_" + i,         // key
        ("body_" + i).getBytes()
    );
    messages.add(msg);
}
// 2. Send them in one call
SendResult sendResult = producer.send(messages);
System.out.println("Batch sent: " + sendResult.getMsgId());
```
2.2 Batch Send Limits
Size limit:

```java
// The total size of one batch must stay under 4MB
public void sendLargeBatch() throws Exception {
    List<Message> messages = new ArrayList<>();
    int totalSize = 0;
    final int MAX_SIZE = 4 * 1024 * 1024; // 4MB
    for (int i = 0; i < 1000; i++) {
        byte[] body = new byte[1024]; // 1KB
        Message msg = new Message("topic", "tag", body);
        // Rough size estimate: body plus header overhead
        int msgSize = body.length + 100;
        if (totalSize + msgSize > MAX_SIZE) {
            break; // would exceed the limit
        }
        messages.add(msg);
        totalSize += msgSize;
    }
    producer.send(messages);
}
```
Sending in chunks:

```java
// Lists over the limit must be sent in chunks
public void sendBatchInChunks(List<Message> allMessages) {
    final int BATCH_SIZE = 128; // at most 128 messages per batch
    for (int i = 0; i < allMessages.size(); i += BATCH_SIZE) {
        int end = Math.min(i + BATCH_SIZE, allMessages.size());
        List<Message> batch = allMessages.subList(i, end);
        try {
            producer.send(batch);
            log.info("Batch sent: {} - {}", i, end);
        } catch (Exception e) {
            log.error("Batch send failed: {} - {}", i, end, e);
            // retry, or record the failure for later handling
        }
    }
}
```
2.3 A Batch Sender Utility

```java
public class BatchMessageSender {
    private final DefaultMQProducer producer;
    private final int batchSize;
    private final long flushIntervalMs;
    private List<Message> buffer = new ArrayList<>();
    private long lastFlushTime = System.currentTimeMillis();

    public BatchMessageSender(DefaultMQProducer producer, int batchSize, long flushIntervalMs) {
        this.producer = producer;
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
    }

    /**
     * Add a message to the buffer. Note the time-based condition is only
     * checked here, so an idle buffer is not flushed until close().
     */
    public synchronized void addMessage(Message message) {
        buffer.add(message);
        if (buffer.size() >= batchSize ||
                System.currentTimeMillis() - lastFlushTime >= flushIntervalMs) {
            flush();
        }
    }

    /**
     * Force a flush. On failure the buffer is kept, so the same
     * messages are retried on the next flush.
     */
    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        try {
            List<Message> batch = new ArrayList<>(buffer);
            producer.send(batch);
            buffer.clear();
            lastFlushTime = System.currentTimeMillis();
        } catch (Exception e) {
            log.error("Batch send failed", e);
        }
    }

    /**
     * Close, flushing any remaining messages.
     */
    public void close() {
        flush();
    }
}

// Usage
BatchMessageSender sender = new BatchMessageSender(producer, 100, 1000);
for (int i = 0; i < 10000; i++) {
    Message msg = new Message("topic", "body".getBytes());
    sender.addMessage(msg);
}
sender.close();
```
3. Batch Consumption
3.1 Concurrent Batch Consumption

```java
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    System.out.println("Received a batch of " + msgs.size() + " messages");
    for (MessageExt msg : msgs) {
        try {
            processMessage(msg);
        } catch (Exception e) {
            log.error("Failed to process message", e);
            // note: this schedules the whole batch for redelivery
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
```
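One detail worth calling out: by default, `DefaultMQPushConsumer` hands the listener only one message per invocation even when more were pulled, so `msgs.size()` stays 1. Raising `consumeMessageBatchMaxSize` is what actually enables real batches in the listener:

```java
// Without this, the listener receives 1 message at a time regardless of pullBatchSize
consumer.setConsumeMessageBatchMaxSize(32); // up to 32 messages per listener call
```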
3.2 Batch Database Operations

```java
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 1. Parse the messages
    List<Order> orders = new ArrayList<>();
    for (MessageExt msg : msgs) {
        Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
        orders.add(order);
    }
    // 2. Insert them in one database round trip
    try {
        orderMapper.batchInsert(orders);
        log.info("Batch insert succeeded: {} rows", orders.size());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        log.error("Batch insert failed", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

// MyBatis mapper for the batch insert
@Insert("<script>" +
        "INSERT INTO orders (order_id, user_id, amount) VALUES " +
        "<foreach item='order' collection='list' separator=','>" +
        "(#{order.orderId}, #{order.userId}, #{order.amount})" +
        "</foreach>" +
        "</script>")
int batchInsert(@Param("list") List<Order> orders);
```
3.3 Batch External Calls

```java
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 1. Collect the requests
    List<NotifyRequest> requests = new ArrayList<>();
    for (MessageExt msg : msgs) {
        NotifyRequest req = parseMessage(msg);
        requests.add(req);
    }
    // 2. Call the external API once for the whole batch
    try {
        NotifyResponse response = httpClient.batchNotify(requests);
        if (response.isSuccess()) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } else {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    } catch (Exception e) {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});
```
4. Performance Tuning
4.1 Producer Tuning

```java
// Batch-send tuning
producer.setMaxMessageSize(4 * 1024 * 1024);  // per-send size cap: 4MB
producer.setSendMsgTimeout(5000);             // larger batches need a longer timeout
producer.setRetryTimesWhenSendFailed(3);      // retry failed sends
```
4.2 Consumer Tuning

```java
// Pull configuration
consumer.setPullBatchSize(64);        // pull up to 64 messages per request
consumer.setConsumeThreadMin(20);     // minimum consumer threads
consumer.setConsumeThreadMax(64);     // maximum consumer threads
// Flow control
consumer.setPullThresholdForQueue(200);     // max cached messages per queue
consumer.setPullThresholdSizeForQueue(100); // max cached size per queue, in MB
```
4.3 Batch Processing Tuning
Sub-batching:

```java
public void processBatch(List<MessageExt> msgs) {
    final int SUB_BATCH_SIZE = 50;
    // Split the batch into smaller sub-batches
    for (int i = 0; i < msgs.size(); i += SUB_BATCH_SIZE) {
        int end = Math.min(i + SUB_BATCH_SIZE, msgs.size());
        List<MessageExt> subBatch = msgs.subList(i, end);
        processSubBatch(subBatch);
    }
}
```
Parallel processing:

```java
public void processBatchParallel(List<MessageExt> msgs) {
    // Process with a parallel stream; any failure aborts the batch
    msgs.parallelStream().forEach(msg -> {
        try {
            processMessage(msg);
        } catch (Exception e) {
            log.error("Failed to process message", e);
            throw new RuntimeException(e);
        }
    });
}
```
Asynchronous processing:

```java
private final ExecutorService executor = Executors.newFixedThreadPool(10);

public void processBatchAsync(List<MessageExt> msgs) {
    List<Future<?>> futures = new ArrayList<>();
    for (MessageExt msg : msgs) {
        futures.add(executor.submit(() -> processMessage(msg)));
    }
    // Wait for every task to finish
    for (Future<?> future : futures) {
        try {
            future.get();
        } catch (Exception e) {
            log.error("Async processing failed", e);
        }
    }
}
```
5. Case Studies
5.1 Batch Log Reporting

```java
public class LogBatchReporter {
    private final DefaultMQProducer producer;
    private final BlockingQueue<LogEntry> queue = new LinkedBlockingQueue<>(10000);
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public LogBatchReporter(DefaultMQProducer producer) {
        this.producer = producer;
        // Flush once a second
        scheduler.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.SECONDS);
    }

    /**
     * Enqueue a log entry; if the queue is full, flush and retry once.
     */
    public void addLog(LogEntry entry) {
        if (!queue.offer(entry)) {
            flush();
            queue.offer(entry);
        }
    }

    /**
     * Drain up to 100 entries and send them as one batch.
     */
    public void flush() {
        List<LogEntry> logs = new ArrayList<>();
        queue.drainTo(logs, 100); // at most 100 per batch
        if (logs.isEmpty()) {
            return;
        }
        List<Message> messages = new ArrayList<>();
        for (LogEntry entry : logs) {
            Message msg = new Message(
                "log-topic",
                entry.getLevel(),     // tag
                entry.getService(),   // key
                JSON.toJSONString(entry).getBytes()
            );
            messages.add(msg);
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Failed to send log batch", e);
        }
    }
}
```
5.2 Bulk Order Import

```java
@Service
public class OrderBatchImportService {
    @Autowired
    private DefaultMQProducer producer;
    @Autowired
    private OrderMapper orderMapper;

    /**
     * Import orders in batches of 100.
     */
    @Transactional(rollbackFor = Exception.class)
    public void batchImport(List<OrderImportDTO> orders) {
        final int BATCH_SIZE = 100;
        for (int i = 0; i < orders.size(); i += BATCH_SIZE) {
            int end = Math.min(i + BATCH_SIZE, orders.size());
            List<OrderImportDTO> batch = orders.subList(i, end);
            // 1. Batch insert into the database
            List<Order> orderList = batch.stream()
                    .map(this::convertToOrder)
                    .collect(Collectors.toList());
            orderMapper.batchInsert(orderList);
            // 2. Batch publish the corresponding messages
            List<Message> messages = batch.stream()
                    .map(this::convertToMessage)
                    .collect(Collectors.toList());
            try {
                producer.send(messages);
            } catch (Exception e) {
                log.error("Failed to send message batch", e);
                throw new BusinessException("Failed to send messages");
            }
        }
    }
}
```
5.3 Batch Data Sync

```java
public class DataBatchSyncService {
    private final DefaultMQPushConsumer consumer;
    private final JdbcTemplate jdbcTemplate;

    public DataBatchSyncService(JdbcTemplate jdbcTemplate) throws MQClientException {
        this.jdbcTemplate = jdbcTemplate;
        consumer = new DefaultMQPushConsumer("data-sync-group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("data-sync-topic", "*");
        // Batch consumption settings
        consumer.setPullBatchSize(64);
        consumer.setConsumeThreadMax(32);
        consumer.registerMessageListener((MessageListenerConcurrently) this::consumeBatch);
        consumer.start();
    }

    private ConsumeConcurrentlyStatus consumeBatch(List<MessageExt> msgs,
                                                   ConsumeConcurrentlyContext context) {
        // 1. Parse the messages
        List<DataSyncRecord> records = msgs.stream()
                .map(this::parseMessage)
                .collect(Collectors.toList());
        // 2. Group by target table
        Map<String, List<DataSyncRecord>> grouped = records.stream()
                .collect(Collectors.groupingBy(DataSyncRecord::getTableName));
        // 3. Sync each table in one batch
        for (Map.Entry<String, List<DataSyncRecord>> entry : grouped.entrySet()) {
            String table = entry.getKey();
            List<DataSyncRecord> tableRecords = entry.getValue();
            try {
                batchSyncTable(table, tableRecords);
            } catch (Exception e) {
                log.error("Failed to sync table {}", table, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private void batchSyncTable(String table, List<DataSyncRecord> records) {
        String sql = buildBatchSql(table, records);
        // batchUpdate takes one Object[] of bind parameters per row
        List<Object[]> args = records.stream()
                .map(DataSyncRecord::toSqlArgs) // record-to-parameters mapping (application-specific)
                .collect(Collectors.toList());
        jdbcTemplate.batchUpdate(sql, args);
    }
}
```
6. Monitoring and Alerting
6.1 Send-Side Metrics

```java
@Component
public class BatchMessageMetrics {
    private final MeterRegistry meterRegistry;
    private final AtomicLong sendCount = new AtomicLong();
    private final AtomicLong sendBatchCount = new AtomicLong();
    private final AtomicLong sendFailedCount = new AtomicLong();

    public BatchMessageMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        meterRegistry.gauge("batch.message.send.count", sendCount);
        meterRegistry.gauge("batch.message.send.batch.count", sendBatchCount);
        meterRegistry.gauge("batch.message.send.failed.count", sendFailedCount);
    }

    public void recordSend(int batchSize) {
        sendCount.addAndGet(batchSize);
        sendBatchCount.incrementAndGet();
    }

    public void recordFailed() {
        sendFailedCount.incrementAndGet();
    }
}
```
6.2 Consume-Side Metrics

```java
@Component
public class BatchConsumeMetrics {
    private final MeterRegistry meterRegistry;

    public BatchConsumeMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void recordConsume(List<MessageExt> msgs, long duration) {
        // Messages consumed
        meterRegistry.counter("batch.consume.message.count")
                .increment(msgs.size());
        // Batches consumed
        meterRegistry.counter("batch.consume.batch.count")
                .increment();
        // Processing time
        meterRegistry.timer("batch.consume.duration")
                .record(duration, TimeUnit.MILLISECONDS);
        // Batch size distribution
        meterRegistry.summary("batch.consume.batch.size")
                .record(msgs.size());
    }
}
```
7. Best Practices
7.1 Choosing a Batch Size
| Scenario | Batch size | Notes |
|---|---|---|
| Log reporting | 100-200 | Small messages, large batches |
| Order processing | 20-50 | Medium messages, medium batches |
| Data sync | 50-100 | Large messages, smaller batches |
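One way to pick a number systematically is to derive it from the average message size and the caps in section 1.2. The helper below is a hypothetical sketch; the 20% headroom reserved for headers and properties is an assumption, not an official figure.

```java
// Hypothetical sizing helper: derive a batch count from the average message
// size, staying under the 4MB batch cap (with headroom) and the 128-count cap.
public class BatchSizing {
    static final int MAX_BYTES = 4 * 1024 * 1024;
    static final int MAX_COUNT = 128;

    public static int suggestBatchSize(int avgMsgBytes) {
        // keep ~20% headroom for headers/properties (assumption)
        int bySize = (int) (MAX_BYTES * 0.8) / Math.max(avgMsgBytes, 1);
        return Math.max(1, Math.min(bySize, MAX_COUNT));
    }
}
```

For small messages the count cap dominates; for large messages the size cap takes over and the suggested batch shrinks accordingly.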
7.2 Error Handling

```java
public ConsumeConcurrentlyStatus consumeBatch(List<MessageExt> msgs,
                                              ConsumeConcurrentlyContext context) {
    // 1. Track which messages failed
    List<Integer> failedIndices = new ArrayList<>();
    for (int i = 0; i < msgs.size(); i++) {
        MessageExt msg = msgs.get(i);
        try {
            processMessage(msg);
        } catch (Exception e) {
            log.error("Failed to process message: index={}", i, e);
            failedIndices.add(i);
        }
    }
    // 2. Handle partial success
    if (failedIndices.isEmpty()) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } else if (failedIndices.size() == msgs.size()) {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    } else {
        // Some failed: persist them for a separate retry path, ack the rest
        for (int index : failedIndices) {
            saveFailedMessage(msgs.get(index));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
```
7.3 Idempotency

```java
// Batch deduplication
public void processBatch(List<MessageExt> msgs) {
    // 1. Collect the message IDs
    Set<String> msgIds = msgs.stream()
            .map(MessageExt::getMsgId)
            .collect(Collectors.toSet());
    // 2. Check which are already processed, in one lookup
    Set<String> processedIds = checkProcessed(msgIds);
    // 3. Filter out the already-processed ones
    List<MessageExt> toProcess = msgs.stream()
            .filter(msg -> !processedIds.contains(msg.getMsgId()))
            .collect(Collectors.toList());
    // 4. Process the remainder and mark them done
    if (!toProcess.isEmpty()) {
        processMessages(toProcess);
        markAsProcessed(toProcess);
    }
}
```
Summary
Key takeaways for RocketMQ batch messages:
- Batch sending: building batches, chunking oversized lists, wrapping the logic in a utility
- Batch consumption: concurrent consumption, batch database writes, batch external calls
- Performance tuning: producer settings, consumer settings, parallel processing
- Case studies: log reporting, order import, data sync
- Monitoring and alerting: send-side metrics, consume-side metrics, error handling
Core points:
- Choose the batch size deliberately
- Implement genuinely batched processing logic
- Handle errors and guarantee idempotency
- Monitor batch throughput and latency