Skip to content
清晨的一缕阳光
返回

RocketMQ 批量消息详解与实战

RocketMQ 支持批量消息发送和消费,可以显著提升系统吞吐量。本文将深入探讨批量消息的实现原理和实战应用。

一、批量消息基础

1.1 为什么需要批量消息?

场景

日志系统:每秒产生 1 万条日志
订单系统:批量导入 1000 个订单
数据同步:批量同步数据库记录

问题

单条发送:1 万条消息 = 1 万次网络请求
批量发送:1 万条消息 = 100 次网络请求(每批 100 条)

性能提升:100 倍

1.2 批量限制

限制项默认值说明
单次最大条数128一次批量最多 128 条
单次最大大小4MB批量消息总大小
发送超时3 秒批量发送超时时间

二、批量发送

2.1 简单批量发送

// 1. 构建批量消息
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    Message msg = new Message(
        "batch-topic",
        "batch",
        "key_" + i,
        ("body_" + i).getBytes()
    );
    messages.add(msg);
}

// 2. 批量发送
SendResult sendResult = producer.send(messages);

System.out.println("批量发送成功:" + sendResult.getMsgId());

2.2 批量发送限制

大小限制

// 批量消息总大小不能超过 4MB
public void sendLargeBatch() {
    List<Message> messages = new ArrayList<>();
    int totalSize = 0;
    final int MAX_SIZE = 4 * 1024 * 1024;  // 4MB
    
    for (int i = 0; i < 1000; i++) {
        byte[] body = new byte[1024];  // 1KB
        Message msg = new Message("topic", "tag", body);
        
        // 估算大小
        int msgSize = body.length + 100;  // body + header
        if (totalSize + msgSize > MAX_SIZE) {
            break;  // 超过限制
        }
        
        messages.add(msg);
        totalSize += msgSize;
    }
    
    producer.send(messages);
}

分批发送

// 超过限制需要分批发送
public void sendBatchInChunks(List<Message> allMessages) {
    final int BATCH_SIZE = 128;  // 每批最多 128 条
    
    for (int i = 0; i < allMessages.size(); i += BATCH_SIZE) {
        int end = Math.min(i + BATCH_SIZE, allMessages.size());
        List<Message> batch = allMessages.subList(i, end);
        
        try {
            producer.send(batch);
            log.info("批量发送成功:{} - {}", i, end);
        } catch (Exception e) {
            log.error("批量发送失败:{} - {}", i, end, e);
            // 重试或记录失败
        }
    }
}

2.3 批量发送工具类

public class BatchMessageSender {
    
    private final DefaultMQProducer producer;
    private final int batchSize;
    private final long flushIntervalMs;
    
    private List<Message> buffer = new ArrayList<>();
    private long lastFlushTime = System.currentTimeMillis();
    
    public BatchMessageSender(DefaultMQProducer producer, int batchSize, long flushIntervalMs) {
        this.producer = producer;
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
    }
    
    /**
     * 添加消息到缓冲区
     */
    public synchronized void addMessage(Message message) {
        buffer.add(message);
        
        // 达到批量条件
        if (buffer.size() >= batchSize || 
            System.currentTimeMillis() - lastFlushTime >= flushIntervalMs) {
            flush();
        }
    }
    
    /**
     * 强制刷新
     */
    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        
        try {
            List<Message> batch = new ArrayList<>(buffer);
            producer.send(batch);
            buffer.clear();
            lastFlushTime = System.currentTimeMillis();
        } catch (Exception e) {
            log.error("批量发送失败", e);
        }
    }
    
    /**
     * 关闭
     */
    public void close() {
        flush();  // 刷新剩余消息
    }
}

// 使用示例
BatchMessageSender sender = new BatchMessageSender(producer, 100, 1000);

for (int i = 0; i < 10000; i++) {
    Message msg = new Message("topic", "body".getBytes());
    sender.addMessage(msg);
}

sender.close();

三、批量消费

3.1 并发批量消费

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    System.out.println("收到批量消息:" + msgs.size() + "");
    
    // 批量处理
    for (MessageExt msg : msgs) {
        try {
            processMessage(msg);
        } catch (Exception e) {
            log.error("处理消息失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

3.2 批量数据库操作

consumer.registerMessageListener((msgs, context) -> {
    // 1. 解析消息
    List<Order> orders = new ArrayList<>();
    for (MessageExt msg : msgs) {
        Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
        orders.add(order);
    }
    
    // 2. 批量插入数据库
    try {
        orderMapper.batchInsert(orders);
        log.info("批量插入成功:{} 条", orders.size());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        log.error("批量插入失败", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

// Mapper 批量插入
@Insert("<script>" +
    "INSERT INTO orders (order_id, user_id, amount) VALUES " +
    "<foreach item='order' collection='list' separator=','>" +
    "(#{order.orderId}, #{order.userId}, #{order.amount})" +
    "</foreach>" +
    "</script>")
int batchInsert(@Param("list") List<Order> orders);

3.3 批量外部调用

consumer.registerMessageListener((msgs, context) -> {
    // 1. 收集请求
    List<NotifyRequest> requests = new ArrayList<>();
    for (MessageExt msg : msgs) {
        NotifyRequest req = parseMessage(msg);
        requests.add(req);
    }
    
    // 2. 批量调用外部接口
    try {
        NotifyResponse response = httpClient.batchNotify(requests);
        
        if (response.isSuccess()) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } else {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    } catch (Exception e) {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

四、性能优化

4.1 Producer 优化

// 批量发送配置
Properties props = new Properties();

// 增加批次大小
props.put("maxMessageSize", 4194304");  // 4MB

// 异步发送
producer.setSendMsgTimeout(5000);

// 重试配置
producer.setRetryTimesWhenSendFailed(3);

4.2 Consumer 优化

// 拉取配置
consumer.setPullBatchSize(64);           // 每次拉取 64 条
consumer.setConsumeThreadMin(20);        // 最小消费线程
consumer.setConsumeThreadMax(64);        // 最大消费线程

// 流控配置
consumer.setPullThresholdForQueue(200);  // 队列消息数阈值
consumer.setPullThresholdSizeForQueue(100);  // 队列大小阈值

4.3 批量处理优化

分批处理

public void processBatch(List<MessageExt> msgs) {
    final int SUB_BATCH_SIZE = 50;
    
    // 分成更小的批次处理
    for (int i = 0; i < msgs.size(); i += SUB_BATCH_SIZE) {
        int end = Math.min(i + SUB_BATCH_SIZE, msgs.size());
        List<MessageExt> subBatch = msgs.subList(i, end);
        
        processSubBatch(subBatch);
    }
}

并行处理

public void processBatchParallel(List<MessageExt> msgs) {
    // 使用并行流处理
    msgs.parallelStream().forEach(msg -> {
        try {
            processMessage(msg);
        } catch (Exception e) {
            log.error("处理消息失败", e);
            throw new RuntimeException(e);
        }
    });
}

异步处理

private final ExecutorService executor = Executors.newFixedThreadPool(10);

public void processBatchAsync(List<MessageExt> msgs) {
    List<Future<?>> futures = new ArrayList<>();
    
    for (MessageExt msg : msgs) {
        futures.add(executor.submit(() -> processMessage(msg)));
    }
    
    // 等待所有任务完成
    for (Future<?> future : futures) {
        try {
            future.get();
        } catch (Exception e) {
            log.error("异步处理失败", e);
        }
    }
}

五、实战案例

5.1 日志批量上报

public class LogBatchReporter {
    
    private final DefaultMQProducer producer;
    private final BlockingQueue<LogEntry> queue = new LinkedBlockingQueue<>(10000);
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    public LogBatchReporter(DefaultMQProducer producer) {
        this.producer = producer;
        
        // 定时刷新
        scheduler.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.SECONDS);
    }
    
    /**
     * 添加日志
     */
    public void addLog(LogEntry log) {
        if (!queue.offer(log)) {
            // 队列满,强制刷新
            flush();
            queue.offer(log);
        }
    }
    
    /**
     * 批量刷新
     */
    public void flush() {
        List<LogEntry> logs = new ArrayList<>();
        queue.drainTo(logs, 100);  // 最多 100 条
        
        if (logs.isEmpty()) {
            return;
        }
        
        // 构建批量消息
        List<Message> messages = new ArrayList<>();
        for (LogEntry log : logs) {
            Message msg = new Message(
                "log-topic",
                log.getLevel(),
                log.getService(),
                JSON.toJSONString(log).getBytes()
            );
            messages.add(msg);
        }
        
        // 批量发送
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("批量发送日志失败", e);
        }
    }
}

5.2 订单批量导入

@Service
public class OrderBatchImportService {
    
    @Autowired
    private DefaultMQProducer producer;
    
    @Autowired
    private OrderMapper orderMapper;
    
    /**
     * 批量导入订单
     */
    @Transactional(rollbackFor = Exception.class)
    public void batchImport(List<OrderImportDTO> orders) {
        // 1. 分批处理(每批 100 条)
        final int BATCH_SIZE = 100;
        
        for (int i = 0; i < orders.size(); i += BATCH_SIZE) {
            int end = Math.min(i + BATCH_SIZE, orders.size());
            List<OrderImportDTO> batch = orders.subList(i, end);
            
            // 2. 批量插入数据库
            List<Order> orderList = batch.stream()
                .map(this::convertToOrder)
                .collect(Collectors.toList());
            
            orderMapper.batchInsert(orderList);
            
            // 3. 批量发送消息
            List<Message> messages = batch.stream()
                .map(this::convertToMessage)
                .collect(Collectors.toList());
            
            try {
                producer.send(messages);
            } catch (Exception e) {
                log.error("批量发送消息失败", e);
                throw new BusinessException("发送消息失败");
            }
        }
    }
}

5.3 数据批量同步

public class DataBatchSyncService {
    
    private final DefaultMQPushConsumer consumer;
    private final JdbcTemplate jdbcTemplate;
    
    public DataBatchSyncService() {
        consumer = new DefaultMQPushConsumer("data-sync-group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("data-sync-topic", "*");
        
        // 批量消费配置
        consumer.setPullBatchSize(64);
        consumer.setConsumeThreadMax(32);
        
        consumer.registerMessageListener(this::consumeBatch);
    }
    
    private ConsumeConcurrentlyStatus consumeBatch(List<MessageExt> msgs, 
                                                    ConsumeConcurrentlyContext context) {
        // 1. 解析消息
        List<DataSyncRecord> records = msgs.stream()
            .map(this::parseMessage)
            .collect(Collectors.toList());
        
        // 2. 按表分组
        Map<String, List<DataSyncRecord>> grouped = records.stream()
            .collect(Collectors.groupingBy(DataSyncRecord::getTableName));
        
        // 3. 批量同步
        for (Map.Entry<String, List<DataSyncRecord>> entry : grouped.entrySet()) {
            String table = entry.getKey();
            List<DataSyncRecord> tableRecords = entry.getValue();
            
            try {
                batchSyncTable(table, tableRecords);
            } catch (Exception e) {
                log.error("同步表 {} 失败", table, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    
    private void batchSyncTable(String table, List<DataSyncRecord> records) {
        // 批量 SQL
        String sql = buildBatchSql(table, records);
        jdbcTemplate.batchUpdate(sql, records);
    }
}

六、监控告警

6.1 批量发送监控

@Component
public class BatchMessageMetrics {
    
    private final MeterRegistry meterRegistry;
    
    private final AtomicLong sendCount = new AtomicLong();
    private final AtomicLong sendBatchCount = new AtomicLong();
    private final AtomicLong sendFailedCount = new AtomicLong();
    
    public BatchMessageMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        
        meterRegistry.gauge("batch.message.send.count", sendCount);
        meterRegistry.gauge("batch.message.send.batch.count", sendBatchCount);
        meterRegistry.gauge("batch.message.send.failed.count", sendFailedCount);
    }
    
    public void recordSend(int batchSize) {
        sendCount.addAndGet(batchSize);
        sendBatchCount.incrementAndGet();
    }
    
    public void recordFailed() {
        sendFailedCount.incrementAndGet();
    }
}

6.2 批量消费监控

@Component
public class BatchConsumeMetrics {
    
    private final MeterRegistry meterRegistry;
    
    public BatchConsumeMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    public void recordConsume(List<MessageExt> msgs, long duration) {
        // 消费数量
        meterRegistry.counter("batch.consume.message.count")
            .increment(msgs.size());
        
        // 消费批次
        meterRegistry.counter("batch.consume.batch.count")
            .increment();
        
        // 消费延迟
        meterRegistry.timer("batch.consume.duration")
            .record(duration, TimeUnit.MILLISECONDS);
        
        // 批量大小分布
        meterRegistry.summary("batch.consume.batch.size")
            .record(msgs.size());
    }
}

七、最佳实践

7.1 批量大小选择

场景批量大小说明
日志上报100-200小消息,大批量
订单处理20-50中等消息,中批量
数据同步50-100大消息,小批量

7.2 错误处理

public ConsumeConcurrentlyStatus consumeBatch(List<MessageExt> msgs, 
                                               ConsumeConcurrentlyContext context) {
    // 1. 记录失败消息
    List<Integer> failedIndices = new ArrayList<>();
    
    for (int i = 0; i < msgs.size(); i++) {
        MessageExt msg = msgs.get(i);
        try {
            processMessage(msg);
        } catch (Exception e) {
            log.error("处理消息失败:index={}", i, e);
            failedIndices.add(i);
        }
    }
    
    // 2. 部分成功处理
    if (failedIndices.isEmpty()) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } else if (failedIndices.size() == msgs.size()) {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    } else {
        // 部分失败,记录失败消息
        for (int index : failedIndices) {
            saveFailedMessage(msgs.get(index));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

7.3 幂等性保证

// 批量去重
public void processBatch(List<MessageExt> msgs) {
    // 1. 提取消息 ID
    Set<String> msgIds = msgs.stream()
        .map(MessageExt::getMsgId)
        .collect(Collectors.toSet());
    
    // 2. 批量检查是否已处理
    Set<String> processedIds = checkProcessed(msgIds);
    
    // 3. 过滤已处理消息
    List<MessageExt> toProcess = msgs.stream()
        .filter(msg -> !processedIds.contains(msg.getMsgId()))
        .collect(Collectors.toList());
    
    // 4. 处理未处理消息
    if (!toProcess.isEmpty()) {
        processMessages(toProcess);
        markAsProcessed(toProcess);
    }
}

总结

RocketMQ 批量消息的核心要点:

  1. 批量发送:构建批量消息、分批处理、工具类封装
  2. 批量消费:并发消费、批量数据库操作、批量外部调用
  3. 性能优化:Producer 配置、Consumer 配置、并行处理
  4. 实战应用:日志上报、订单导入、数据同步
  5. 监控告警:发送监控、消费监控、错误处理

核心要点

参考资料


分享这篇文章到:

上一篇文章
Tool/Function 设计规范
下一篇文章
RocketMQ Proxy 深度解析与实战