Skip to content
清晨的一缕阳光
返回

消息队列实战

消息队列实战

消息队列是分布式系统的核心组件,用于解耦、异步、削峰。Kafka、RocketMQ、RabbitMQ 是主流选择。本文详解消息队列的应用场景、最佳实践和常见问题解决方案。

一、消息队列选型

1.1 主流 MQ 对比

特性KafkaRocketMQRabbitMQPulsar
吞吐量百万 +十万 +万级百万 +
延迟ms 级ms 级μs 级ms 级
可靠性很高很高高很高
消息顺序分区有序支持支持支持
事务消息支持(生产者事务)支持不支持支持
延迟消息不支持支持支持支持
适用场景日志、大数据交易、订单低延迟云原生

1.2 选型建议

graph TD
    A[高吞吐量需求?] -->|是 | B[Kafka/Pulsar]
    A -->|否 | C[需要事务消息?]
    C -->|是 | D[RocketMQ]
    C -->|否 | E[需要低延迟μs 级?]
    E -->|是 | F[RabbitMQ]
    E -->|否 | G[RocketMQ]
    G --> H[云原生部署?]
    H -->|是 | I[Pulsar]
    H -->|否 | J[Kafka/RocketMQ]

二、Kafka 实战

2.1 基础配置

# Kafka configuration example
spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    
    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all                    # wait for all in-sync replicas to ack
      retries: 3                   # send retries
      batch-size: 16384            # batch size (bytes)
      buffer-memory: 33554432      # total producer buffer (32 MB)
      compression-type: snappy     # compression codec
      properties:
        enable.idempotence: true   # idempotent producer: no duplicates on retry
    
    # Consumer settings
    consumer:
      group-id: myapp-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest    # start from newest offset when no committed offset exists
      enable-auto-commit: false    # commit offsets manually
      max-poll-records: 500        # records per poll
      properties:
        session.timeout.ms: 30000
        heartbeat.interval.ms: 10000   # kept at <= 1/3 of session.timeout.ms

2.2 生产者实现

/**
 * Kafka producer service: async, sync and transactional send variants.
 */
@Service
public class KafkaProducerService {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    /**
     * Send a message asynchronously; the outcome is logged in a callback.
     *
     * @param topic target topic
     * @param key   partition key (same key routes to the same partition)
     * @param data  message payload
     */
    public void send(String topic, String key, Object data) {
        kafkaTemplate.send(topic, key, data)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Fixed: Spring's SendResult has no getRecord(); topic/partition/offset
                    // come from getRecordMetadata() (getProducerRecord().partition() may be null).
                    log.info("发送成功:topic={}, partition={}, offset={}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                } else {
                    log.error("发送失败:topic={}", topic, ex);
                }
            });
    }
    
    /**
     * Send a message synchronously, waiting up to 5 seconds for the broker ack.
     *
     * @throws BusinessException when the send fails or times out (cause is logged;
     *                           BusinessException's ctor signature is project-defined,
     *                           so the cause is not chained here — TODO confirm a
     *                           (String, Throwable) ctor exists and chain it)
     */
    public void sendSync(String topic, String key, Object data) {
        try {
            // Parameterized type instead of the original raw SendResult.
            SendResult<String, Object> result =
                kafkaTemplate.send(topic, key, data).get(5, TimeUnit.SECONDS);
            log.info("发送成功:offset={}", result.getRecordMetadata().offset());
        } catch (Exception e) {
            log.error("发送失败", e);
            throw new BusinessException("消息发送失败");
        }
    }
    
    /**
     * Send a message inside a Kafka transaction.
     * NOTE(review): executeInTransaction runs its own Kafka transaction and does not
     * join the Spring-managed transaction opened by @Transactional — confirm a
     * KafkaTransactionManager is configured before relying on atomicity with DB work.
     */
    @Transactional
    public void sendTransactional(String topic, Object data) {
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send(topic, "tx-key", data);
            // Other database operations...
            return true;
        });
    }
}

/**
 * Collected message-sending best practices for the order topic.
 */
@Component
public class BestPracticeProducer {
    
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    /**
     * 1. Compression for high-volume payloads.
     * Configured broker-side via compression-type: snappy/lz4/zstd.
     */
    public void sendCompressed(OrderEvent event) {
        kafkaTemplate.send("orders", event.getOrderId(), event);
    }
    
    /**
     * 2. Partition-key routing: events carrying the same orderId land on the
     * same partition, which preserves their relative order.
     */
    public void sendOrdered(OrderEvent event) {
        kafkaTemplate.send("orders", event.getOrderId(), event);
    }
    
    /**
     * 3. Idempotent delivery: enable.idempotence=true on the producer, combined
     * with business-level dedup (e.g. processed-message IDs in Redis).
     */
    public void sendIdempotent(OrderEvent event) {
        kafkaTemplate.send("orders", event.getOrderId(), event);
    }
    
    /**
     * 4. Delivery confirmation: a failed send is logged and parked for retry.
     */
    public void sendWithAck(OrderEvent event) {
        kafkaTemplate.send("orders", event.getOrderId(), event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    return;  // success needs no further action
                }
                log.error("发送失败", ex);
                saveToFailedQueue(event);  // park the event for a later retry
            });
    }
    
    /** Persist a message that could not be sent, for later replay. */
    private void saveToFailedQueue(OrderEvent event) {
        // Failed-message storage goes here.
    }
}

2.3 消费者实现

/**
 * Kafka consumer service: basic, manual-ack, batch, retry and DLQ consumers.
 */
@Component
public class KafkaConsumerService {
    
    /**
     * Basic consumption; offsets are managed by the container.
     */
    @KafkaListener(topics = "orders", groupId = "order-group")
    public void consumeOrder(ConsumerRecord<String, OrderEvent> record) {
        log.info("收到消息:key={}, value={}, partition={}, offset={}",
            record.key(), record.value(), record.partition(), record.offset());
        
        processOrder(record.value());
    }
    
    /**
     * Manual ack: the offset is committed only after business processing succeeds,
     * so a crash before acknowledge() leads to redelivery, not loss.
     * Fixed: this listener now has its own group id — the original shared
     * "order-group" with consumeOrder, so the two listeners formed one consumer
     * group and split the topic's partitions instead of each seeing every message.
     */
    @KafkaListener(topics = "orders", groupId = "order-manual-group")
    public void consumeWithManualAck(
            ConsumerRecord<String, OrderEvent> record,
            Acknowledgment ack) {
        
        try {
            processOrder(record.value());
            
            ack.acknowledge();  // commit only on success
            
        } catch (Exception e) {
            log.error("处理失败", e);
            throw e;  // no ack: rethrow so the container retries / redelivers
        }
    }
    
    /**
     * Batch consumption.
     * NOTE(review): requires a batch-enabled listener container factory — confirm
     * the container configuration before deploying.
     */
    @KafkaListener(topics = "orders", groupId = "batch-group")
    public void consumeBatch(List<ConsumerRecord<String, OrderEvent>> records) {
        log.info("批量消费:{} 条", records.size());
        
        for (ConsumerRecord<String, OrderEvent> record : records) {
            processOrder(record.value());
        }
    }
    
    /**
     * Retry with exponential backoff (1s, 2s, 4s).
     * NOTE(review): @Retryable needs @EnableRetry and AOP proxying to wrap the
     * listener invocation — verify it actually intercepts this listener method.
     */
    @Retryable(
        value = Exception.class,
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    @KafkaListener(topics = "orders", groupId = "retry-group")
    public void consumeWithRetry(ConsumerRecord<String, OrderEvent> record) {
        processOrder(record.value());
    }
    
    /**
     * Dead-letter consumer: messages that exhausted all retries land here.
     */
    @KafkaListener(topics = "orders.dlq", groupId = "dlq-group")
    public void consumeDLQ(ConsumerRecord<String, OrderEvent> record) {
        log.error("死信消息:key={}, value={}", record.key(), record.value());
        // Alert operators or handle manually.
    }
    
    private void processOrder(OrderEvent event) {
        // Business processing logic.
    }
}

/**
 * Listener container configuration: batch consumption plus retry/DLQ error handling.
 */
@Configuration
public class KafkaConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
        
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        
        // Enable batch delivery.
        // NOTE(review): this is the default factory bean name, so EVERY @KafkaListener
        // using it becomes a batch listener — single-record listeners in this file
        // would then receive lists; confirm or register a separate batch factory.
        factory.setBatchListener(true);
        
        // Error handling: after retries are exhausted, the recoverer routes the
        // failed record to the dead-letter queue.
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            (record, ex) -> {
                // Recoverer: invoked once all retries are used up.
                log.error("消息处理失败,发送到死信队列", ex);
                sendToDLQ(record);
            },
            new FixedBackOff(1000L, 3L)  // 3 retries, 1 second apart
        );
        
        // Exceptions that should fail fast instead of being retried.
        errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
        
        factory.setCommonErrorHandler(errorHandler);
        
        return factory;
    }
    
    private void sendToDLQ(ConsumerRecord<?, ?> record) {
        // Publish to the dead-letter topic here.
    }
}

三、RocketMQ 实战

3.1 基础配置

# RocketMQ configuration
rocketmq:
  name-server: 192.168.1.100:9876;192.168.1.101:9876
  
  producer:
    group: order-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
  
  consumer:
    order-consumer:
      group: order-consumer-group
      topic: order-topic
      consume-mode: CONCURRENTLY  # concurrent consumption
      message-model: CLUSTERING   # clustering: each message consumed once per group
      pull-batch-size: 32

3.2 生产者实现

/**
 * RocketMQ producer: plain, sync, async, ordered, delayed and transactional sends.
 */
@Component
public class RocketMQProducerService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * Fire-and-forget send using Spring message conversion.
     */
    public void send(String topic, Object data) {
        rocketMQTemplate.convertAndSend(topic, data);
    }
    
    /**
     * Synchronous send: blocks until the broker acks.
     */
    public void sendSync(String topic, Object data) {
        SendResult result = rocketMQTemplate.syncSend(topic, data);
        log.info("发送成功:msgId={}", result.getMsgId());
    }
    
    /**
     * Asynchronous send with success/failure callbacks.
     */
    public void sendAsync(String topic, Object data) {
        rocketMQTemplate.asyncSend(topic, data, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送成功:msgId={}", sendResult.getMsgId());
            }
            
            @Override
            public void onException(Throwable e) {
                log.error("发送失败", e);
            }
        });
    }
    
    /**
     * Ordered send: messages sharing the same hash key (orderId) are routed to
     * the same queue, so RocketMQ delivers them in send order.
     */
    public void sendOrderly(String topic, Object data, String orderId) {
        rocketMQTemplate.syncSendOrderly(topic, data, orderId);
    }
    
    /**
     * Delayed send.
     * delayLevel 1-18 maps to: 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h.
     */
    public void sendDelay(String topic, Object data, int delayLevel) {
        rocketMQTemplate.syncSend(topic, 
            MessageBuilder.withPayload(data)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
                .build());
    }
    
    /**
     * Transaction listener: runs the local transaction and answers broker check-backs.
     * NOTE(review): this is a non-static inner class of another @Component —
     * @RocketMQTransactionListener beans are normally top-level Spring components;
     * confirm this listener is actually registered.
     */
    @RocketMQTransactionListener
    public class OrderTransactionListener implements RocketMQLocalTransactionListener {
        
        @Autowired
        private OrderService orderService;
        
        @Override
        @Transactional
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                Order order = (Order) arg;
                
                // 1. Run the local transaction.
                orderService.create(order);
                
                // 2. Commit the half message.
                return RocketMQLocalTransactionState.COMMIT;
                
            } catch (Exception e) {
                log.error("事务执行失败", e);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
        
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // Broker check-back, invoked when no commit/rollback was received.
            // NOTE(review): getHeaders().get("orderId") would NPE if the header is
            // missing — a null check before toString() would be safer.
            String orderId = msg.getHeaders().get("orderId").toString();
            
            Order order = orderService.getById(orderId);
            if (order != null) {
                return RocketMQLocalTransactionState.COMMIT;
            }
            
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    /**
     * Send a transactional (half) message; the listener above decides commit/rollback.
     */
    public void sendTransactionMessage(String topic, Order order) {
        rocketMQTemplate.sendMessageInTransaction(
            topic,
            MessageBuilder.withPayload(order)
                .setHeader("orderId", order.getId())
                .build(),
            order  // argument handed to executeLocalTransaction
        );
    }
}

3.3 消费者实现

/**
 * RocketMQ consumers: concurrent, orderly, batch and delayed-message examples.
 * NOTE(review): the listener classes below are non-static inner classes of a
 * single @Component — @RocketMQMessageListener normally requires each listener
 * to be its own Spring bean; confirm these are actually registered.
 */
@Component
public class RocketMQConsumerService {
    
    /**
     * Concurrent consumption in clustering mode: each message is handled by one
     * instance of the group; ordering is not guaranteed.
     */
    @RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-consumer-group",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING
    )
    public class OrderConsumer implements RocketMQListener<OrderEvent> {
        
        @Override
        public void onMessage(OrderEvent message) {
            log.info("收到订单消息:{}", message.getOrderId());
            processOrder(message);
        }
    }
    
    /**
     * Orderly consumption: one queue's messages are processed sequentially.
     */
    @RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-sequence-group",
        consumeMode = ConsumeMode.ORDERLY  // sequential per queue
    )
    public class OrderlyConsumer implements RocketMQListener<OrderEvent> {
        
        @Override
        public void onMessage(OrderEvent message) {
            // Messages of the same order are handled in send order.
            processOrder(message);
        }
    }
    
    /**
     * Batch consumption: raise the per-delivery batch size via the lifecycle hook.
     */
    @RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-batch-group"
    )
    public class BatchConsumer implements RocketMQPushConsumerLifecycleListener {
        
        @Override
        public void prepareStart(DefaultMQPushConsumer consumer) {
            consumer.setConsumeMessageBatchMaxSize(32);
        }
    }
    
    /**
     * Delayed-message consumer (e.g. closing timed-out orders).
     */
    @RocketMQMessageListener(
        topic = "order-delay-topic",
        consumerGroup = "order-delay-group"
    )
    public class DelayConsumer implements RocketMQListener<OrderEvent> {
        
        @Override
        public void onMessage(OrderEvent message) {
            // Handle the delayed event (e.g. close an unpaid order).
            closeOrder(message.getOrderId());
        }
    }
    
    private void processOrder(OrderEvent message) {
        // Business processing logic.
    }
    
    private void closeOrder(String orderId) {
        // Order-closing logic.
    }
}

3.4 订单超时关闭实战

/**
 * Order timeout auto-close implemented with RocketMQ delayed messages.
 */
@Service
public class OrderTimeoutService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * Create an order and schedule a delayed timeout check.
     * Fixed: the original passed delay level 18 (= 2 hours) although the comment
     * promised a 30-minute check; level 16 is 30 minutes in RocketMQ's default
     * delay-level table (1s..2h).
     *
     * @param request order creation request
     * @return the persisted order in UNPAID state
     */
    @Transactional
    public Order createOrder(OrderRequest request) {
        // 1. Persist the order in UNPAID state.
        Order order = new Order();
        order.setId(IdGenerator.generate());
        order.setStatus(OrderStatus.UNPAID);
        orderMapper.insert(order);
        
        // 2. Schedule the delayed check 30 minutes out.
        OrderTimeoutMessage timeoutMsg = new OrderTimeoutMessage();
        timeoutMsg.setOrderId(order.getId());
        timeoutMsg.setCreateTime(System.currentTimeMillis());
        
        // NOTE(review): the delay-level overload of syncSend expects a Spring
        // Message<?> payload in some rocketmq-spring versions — confirm this
        // signature matches the version in use.
        rocketMQTemplate.syncSend(
            "order-timeout-topic",
            timeoutMsg,
            3000,  // send timeout (ms)
            16     // delay level 16 = 30 minutes (was 18 = 2h, contradicting the intent)
        );
        
        return order;
    }
    
    /**
     * Consume the delayed message and close the order if it is still unpaid.
     * NOTE(review): non-static inner listener class without its own component
     * annotation — confirm it is actually registered as a listener bean.
     */
    @RocketMQMessageListener(
        topic = "order-timeout-topic",
        consumerGroup = "order-timeout-group"
    )
    public class TimeoutConsumer implements RocketMQListener<OrderTimeoutMessage> {
        
        @Autowired
        private OrderService orderService;
        
        @Override
        @Transactional
        public void onMessage(OrderTimeoutMessage message) {
            String orderId = message.getOrderId();
            
            // Re-check state: the user may have paid during the delay window.
            Order order = orderService.getById(orderId);
            
            if (order != null && order.getStatus() == OrderStatus.UNPAID) {
                // Still unpaid after the timeout window: close it.
                orderService.closeOrder(orderId);
                
                log.info("订单超时关闭:orderId={}", orderId);
            }
        }
    }
}

四、常见问题

4.1 消息丢失

mindmap
  root((消息丢失场景))
    生产者发送失败
      原因:网络问题/Broker 宕机
      解决
        同步发送 + 重试
        事务消息
        本地消息表
    Broker 存储失败
      原因:磁盘满/宕机
      解决
        同步刷盘
        同步复制
        Dledger 高可用
    消费者消费失败
      原因:处理异常/宕机
      解决
        手动 ack
        重试机制
        死信队列

4.2 消息重复

/**
 * Idempotent consumption: a Redis SETNX key marks a message id as processed.
 */
@Service
public class IdempotentConsumer {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-idempotent-group"
    )
    public class IdempotentOrderConsumer implements RocketMQListener<OrderEvent> {
        
        @Override
        @Transactional
        public void onMessage(OrderEvent message) {
            String msgId = message.getMsgId();
            String key = "msg:processed:" + msgId;
            
            // 1. Atomically claim the message (SETNX with a 24h TTL).
            Boolean added = redisTemplate.opsForValue()
                .setIfAbsent(key, "1", 24, TimeUnit.HOURS);
            
            if (Boolean.FALSE.equals(added)) {
                log.warn("消息已处理:msgId={}", msgId);
                return;
            }
            
            // 2. Process; on failure release the claim and rethrow.
            // Fixed: the original left the key set when processing threw, so the
            // redelivered message was treated as "already processed" and lost.
            try {
                processOrder(message);
            } catch (Exception e) {
                redisTemplate.delete(key);
                throw e;
            }
            
            // NOTE(review): the Redis claim and the business write are still not
            // atomic — a crash between them can drop or duplicate work; a Lua
            // script or a transactional outbox closes that gap.
        }
    }
    
    private void processOrder(OrderEvent message) {
        // Business processing logic.
    }
}

4.3 消息积压

/**
 * Message-backlog handling: lag monitoring, alerting, scale-out and fast-drain consumption.
 * NOTE(review): alertService and getConsumerBacklog(...) are not defined in this
 * snippet — presumably injected/implemented elsewhere; verify before reuse.
 */
@Component
public class BacklogHandler {
    
    /**
     * Poll the consumer lag every minute; alert past 10k, auto-scale past 50k.
     */
    @Scheduled(fixedRate = 60000)
    public void monitorBacklog() {
        // Current lag of the group on this topic.
        long backlog = getConsumerBacklog("order-topic", "order-consumer-group");
        
        if (backlog > 10000) {
            log.warn("消息积压:{}", backlog);
            
            // Alert operators.
            alertService.send("消息积压超过阈值:" + backlog);
            
            // Scale out automatically past the hard threshold.
            if (backlog > 50000) {
                scaleUpConsumers();
            }
        }
    }
    
    /**
     * Temporarily add consumer instances.
     */
    private void scaleUpConsumers() {
        // Via K8s HPA auto-scaling,
        // or by manually adding consumer instances.
        log.info("扩容消费者");
    }
    
    /**
     * Drain a backlog quickly with an enlarged consumer thread pool.
     */
    @RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-backlog-group",
        consumeThreadNumber = 64  // more consumer threads
    )
    public class BacklogConsumer implements RocketMQListener<OrderEvent> {
        
        @Override
        public void onMessage(OrderEvent message) {
            // Keep the handler minimal to maximize throughput.
            processSimple(message);
        }
        
        private void processSimple(OrderEvent message) {
            // Fast-path processing.
        }
    }
}

4.4 顺序消息

/**
 * Ordered-message guarantee: sharded send plus orderly consumption.
 * NOTE(review): processOrder(...) called below is not defined in this snippet —
 * presumably provided elsewhere; verify before reuse.
 */
@Service
public class OrderMessageService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * Send with orderId as the sharding key so that all messages of one order
     * are routed to the same queue.
     */
    public void sendOrderMessage(OrderEvent event) {
        rocketMQTemplate.syncSendOrderly(
            "order-topic",
            event,
            event.getOrderId()
        );
    }
    
    /**
     * Orderly consumption.
     * NOTE(review): non-static inner listener class — confirm it is registered
     * as a Spring bean; @RocketMQMessageListener normally requires one.
     */
    @RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-orderly-group",
        consumeMode = ConsumeMode.ORDERLY  // sequential consumption
    )
    public class OrderlyConsumer implements RocketMQListener<OrderEvent> {
        
        @Override
        public void onMessage(OrderEvent message) {
            // RocketMQ delivers one queue's messages in order, but the handler
            // must succeed (not throw) for the queue to advance.
            
            try {
                processOrder(message);
            } catch (Exception e) {
                log.error("顺序消费失败", e);
                // Rethrow to trigger retry: RocketMQ retries the queue head until
                // success, preserving order at the cost of possibly stalling the queue.
                throw e;
            }
        }
    }
}

五、最佳实践

5.1 生产环境配置

# Kafka production settings
producer:
  acks: all                    # wait for all in-sync replicas
  retries: 3                   # send retries
  max.in.flight.requests.per.connection: 5  # in-flight requests per connection
  enable.idempotence: true     # idempotent producer
  compression.type: snappy     # compression codec
  
# RocketMQ production settings
producer:
  send-message-timeout: 3000
  retry-times-when-send-failed: 2
  vip-channel-enabled: false   # disable the VIP channel
  
# Consumer settings
consumer:
  consume-thread-min: 20
  consume-thread-max: 64
  pull-batch-size: 32
  consume-message-batch-max-size: 32

5.2 监控告警

/**
 * MQ metrics reporter built on Micrometer.
 */
@Component
public class MQMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    /**
     * Record one send attempt: a counter tagged by topic/outcome plus a latency timer.
     *
     * @param topic   destination topic
     * @param latency send latency in milliseconds
     * @param success whether the send succeeded
     */
    public void recordSend(String topic, long latency, boolean success) {
        Counter sendCounter = Counter.builder("mq.send.total")
            .tag("topic", topic)
            .tag("success", Boolean.toString(success))
            .register(meterRegistry);
        sendCounter.increment();
        
        Timer sendTimer = Timer.builder("mq.send.latency")
            .tag("topic", topic)
            .register(meterRegistry);
        sendTimer.record(latency, TimeUnit.MILLISECONDS);
    }
    
    /**
     * Record one consume attempt.
     */
    public void recordConsume(String topic, long latency, boolean success) {
        // Mirrors the send metrics.
    }
}

5.3 设计原则

mindmap
  root((MQ 设计原则))
    推荐做法
      消息体尽量小<10KB
      重要消息持久化
      开启手动 ack
      实现消息去重
      监控积压量
      设置合理重试次数
    避免做法
      大消息>1MB
      同步等待消费结果
      消费者处理时间过长
      忽略消息失败
      无监控告警

六、总结

6.1 核心要点

  1. 选型:根据场景选择 Kafka/RocketMQ/RabbitMQ
  2. 可靠性:ack 机制、事务消息、持久化
  3. 幂等性:消息去重是必须的
  4. 顺序性:分区/队列有序 + 顺序消费
  5. 监控:积压量、发送延迟、消费延迟

6.2 应用场景

场景推荐方案
日志收集Kafka
订单处理RocketMQ
实时计算Kafka
异步解耦RocketMQ/Kafka
延迟任务RocketMQ
事件驱动Kafka/RocketMQ

消息队列是分布式系统的血管,承载着系统间的通信。合理使用可以提升系统性能,滥用会带来灾难。


分享这篇文章到:

上一篇文章
云原生迁移实战:传统应用上云指南
下一篇文章
服务网格实战:Istio 微服务治理指南