Skip to content
清晨的一缕阳光
返回

Spring Boot 集成 RocketMQ 实战指南

RocketMQ 提供了 Spring Boot Starter,简化了 RocketMQ 应用的开发。本文将深入探讨 Spring Boot 集成 RocketMQ 的配置、使用和最佳实践。

一、基础配置

1.1 依赖配置

Maven 依赖

<dependencies>
    <!-- RocketMQ Spring Boot Starter -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
    
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>

1.2 应用配置

application.yml

rocketmq:
  name-server: 192.168.1.100:9876;192.168.1.101:9876
  producer:
    group: my-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
    max-message-size: 4194304
    compress-message-body-threshold: 1048576
  consumer:
    listeners:
      - topic: order-topic
        group: order-consumer-group

二、Producer 开发

2.1 基础 Producer

@Service
public class RocketMQProducerService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 同步发送
     */
    public void sendSync(String topic, Object payload) {
        SendResult result = rocketMQTemplate.syncSend(topic, payload);
        log.info("发送成功:msgId={}", result.getMsgId());
    }
    
    /**
     * 异步发送
     */
    public void sendAsync(String topic, Object payload) {
        rocketMQTemplate.asyncSend(topic, payload, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送成功:msgId={}", sendResult.getMsgId());
            }
            
            @Override
            public void onException(Throwable e) {
                log.error("发送失败", e);
            }
        });
    }
    
    /**
     * 单向发送
     */
    public void sendOneway(String topic, Object payload) {
        rocketMQTemplate.sendOneWay(topic, payload);
    }
    
    /**
     * 发送延迟消息
     */
    public void sendDelay(String topic, Object payload, int delayLevel) {
        rocketMQTemplate.syncSend(topic, payload, 3000, delayLevel);
    }
}

2.2 自定义 Producer

@Configuration
public class RocketMQProducerConfig {
    
    @Bean
    public DefaultMQProducer defaultMQProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("custom-producer");
        producer.setNamesrvAddr("ns1:9876;ns2:9876");
        producer.setSendMsgTimeout(5000);
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(3);
        producer.setVipChannelEnabled(false);
        producer.start();
        return producer;
    }
}

2.3 批量发送

@Service
public class BatchProducerService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 批量发送
     */
    public void sendBatch(String topic, List<Object> payloads) {
        List<Message> messages = payloads.stream()
            .map(payload -> {
                Message msg = new Message(topic, JSON.toJSONString(payload).getBytes());
                return msg;
            })
            .collect(Collectors.toList());
        
        SendResult result = rocketMQTemplate.getProducer().send(messages);
        log.info("批量发送成功:size={}", messages.size());
    }
}

三、Consumer 开发

3.1 基础 Consumer

@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        log.info("收到订单:{}", order);
        processOrder(order);
    }
    
    private void processOrder(Order order) {
        // 处理订单
    }
}

3.2 并发消费

@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group",
    consumeThreadMax = 64,
    messageModel = MessageModel.CLUSTERING
)
public class ConcurrentOrderConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        // 并发处理
    }
}

3.3 顺序消费

@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group",
    consumeMode = ConsumeMode.ORDERLY
)
public class OrderlyOrderConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        // 顺序处理
    }
}

3.4 事务消息 Consumer

@Component
@RocketMQMessageListener(
    topic = "transaction-topic",
    consumerGroup = "transaction-consumer-group"
)
public class TransactionConsumer implements RocketMQListener<Order> {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    public void onMessage(Order order) {
        // 处理事务消息
        orderService.processTransaction(order);
    }
}

四、事务消息

4.1 事务监听器

@Component
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 1. 解析消息
            Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
            
            // 2. 执行本地事务
            orderService.createOrder(order);
            
            // 3. 返回提交
            return RocketMQLocalTransactionState.COMMIT;
            
        } catch (Exception e) {
            log.error("执行本地事务失败", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 1. 解析消息
        Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
        
        try {
            // 2. 查询本地事务状态
            Order dbOrder = orderService.queryOrder(order.getId());
            
            if (dbOrder != null && dbOrder.getStatus() == OrderStatus.CREATED) {
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            log.error("回查事务状态失败", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

4.2 事务 Producer

@Service
public class TransactionProducerService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送事务消息
     */
    public void sendTransactionMessage(String topic, Order order) {
        Message msg = MessageBuilder.withPayload(order)
            .setHeader(MessageConst.PROPERTY_TRANSACTION_ID, UUID.randomUUID().toString())
            .build();
        
        SendResult result = rocketMQTemplate.sendMessageInTransaction(
            topic, 
            msg, 
            order  // 业务参数
        );
        
        switch (result.getSendStatus()) {
            case SEND_OK:
                log.info("事务消息发送成功");
                break;
            case SEND_MESSAGE_ILLEGAL:
                log.error("消息非法");
                break;
            default:
                log.error("发送失败:{}", result.getSendStatus());
        }
    }
}

五、实战案例

5.1 订单处理

@Service
public class OrderService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    /**
     * 创建订单
     */
    @Transactional
    public Order createOrder(OrderDTO dto) {
        // 1. 创建订单
        Order order = new Order();
        order.setId(UUID.randomUUID().toString());
        order.setUserId(dto.getUserId());
        order.setAmount(dto.getAmount());
        order.setStatus(OrderStatus.CREATED);
        
        orderRepository.save(order);
        
        // 2. 发送订单创建事件
        rocketMQTemplate.convertAndSend("order-events", 
            OrderEvent.builder()
                .type("ORDER_CREATED")
                .orderId(order.getId())
                .build());
        
        return order;
    }
}

// 订单事件消费者
@Component
@RocketMQMessageListener(
    topic = "order-events",
    consumerGroup = "order-event-consumer"
)
public class OrderEventConsumer implements RocketMQListener<OrderEvent> {
    
    @Override
    public void onMessage(OrderEvent event) {
        switch (event.getType()) {
            case "ORDER_CREATED":
                handleOrderCreated(event.getOrderId());
                break;
            case "ORDER_PAID":
                handleOrderPaid(event.getOrderId());
                break;
            case "ORDER_SHIPPED":
                handleOrderShipped(event.getOrderId());
                break;
        }
    }
}

5.2 日志收集

@Service
public class LogCollector {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 收集日志
     */
    public void collectLog(LogEntry log) {
        rocketMQTemplate.convertAndSend("logs-topic", log);
    }
}

// 日志拦截器
@Component
public class LogInterceptor implements HandlerInterceptor {
    
    @Autowired
    private LogCollector logCollector;
    
    @Override
    public void afterCompletion(HttpServletRequest request, 
                               HttpServletResponse response,
                               Object handler, Exception ex) {
        LogEntry log = LogEntry.builder()
            .service("api-gateway")
            .method(request.getMethod())
            .url(request.getRequestURI())
            .status(response.getStatus())
            .timestamp(System.currentTimeMillis())
            .build();
        
        // 异步发送
        CompletableFuture.runAsync(() -> logCollector.collectLog(log));
    }
}

5.3 配置中心

@Component
public class ConfigCenter {
    
    private final Map<String, Object> configs = new ConcurrentHashMap<>();
    
    @RocketMQMessageListener(
        topic = "config-center",
        consumerGroup = "config-subscriber"
    )
    public class ConfigListener implements RocketMQListener<ConfigUpdateEvent> {
        
        @Override
        public void onMessage(ConfigUpdateEvent event) {
            log.info("配置更新:key={}, value={}", event.getKey(), event.getValue());
            configs.put(event.getKey(), event.getValue());
        }
    }
    
    public Object getConfig(String key) {
        return configs.get(key);
    }
}

六、错误处理

6.1 重试配置

@Configuration
public class RocketMQRetryConfig {
    
    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer");
        consumer.setNamesrvAddr("ns1:9876");
        
        // 重试配置
        consumer.setMaxReconsumeTimes(3);
        consumer.setConsumeTimeout(15, TimeUnit.MINUTES);
        
        return consumer;
    }
}

6.2 死信队列

@Component
@RocketMQMessageListener(
    topic = "%DLQ%order-consumer-group",
    consumerGroup = "dead-letter-consumer-group"
)
public class DeadLetterConsumer implements RocketMQListener<MessageExt> {
    
    @Override
    public void onMessage(MessageExt msg) {
        // 处理死信消息
        log.error("死信消息:msgId={}", msg.getMsgId());
        
        // 保存失败消息
        saveFailedMessage(msg);
        
        // 发送告警
        sendAlert("死信消息:msgId=" + msg.getMsgId());
    }
}

6.3 异常处理

@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        try {
            processOrder(order);
        } catch (BusinessException e) {
            // 业务异常,记录日志
            log.error("业务异常:orderId={}", order.getId(), e);
            saveFailedMessage(order, e);
            
        } catch (Exception e) {
            // 系统异常,抛出重试
            log.error("系统异常:orderId={}", order.getId(), e);
            throw e;
        }
    }
}

七、监控运维

7.1 健康检查

@Component
public class RocketMQHealthIndicator implements HealthIndicator {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Override
    public Health health() {
        try {
            // 发送测试消息
            rocketMQTemplate.syncSend("health-check", "test");
            return Health.up().build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}

7.2 监控指标

@Component
public class RocketMQMetrics {
    
    private final MeterRegistry meterRegistry;
    
    public RocketMQMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        
        // 注册指标
        meterRegistry.gauge("rocketmq.producer.send.count", sendCount);
        meterRegistry.gauge("rocketmq.consumer.consume.count", consumeCount);
        meterRegistry.gauge("rocketmq.consumer.failed.count", failedCount);
    }
    
    private final AtomicLong sendCount = new AtomicLong();
    private final AtomicLong consumeCount = new AtomicLong();
    private final AtomicLong failedCount = new AtomicLong();
}

八、最佳实践

8.1 配置建议

# 生产环境配置
rocketmq:
  name-server: ns1:9876;ns2:9876
  
  producer:
    group: prod-producer-group
    send-message-timeout: 5000
    retry-times-when-send-failed: 3
    retry-times-when-send-async-failed: 3
    max-message-size: 4194304
    vip-channel-enabled: false
  
  consumer:
    listeners:
      - topic: order-topic
        group: order-consumer-group
        consume-thread-max: 64
        message-model: CLUSTERING

8.2 代码规范

// ✅ 推荐
@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        process(order);
    }
}

// ❌ 不推荐
@Component
@RocketMQMessageListener(topic = "order-topic")  // 缺少 consumerGroup
public class OrderConsumer {
    // ...
}

8.3 检查清单

开发检查:
- [ ] 配置 NameServer 地址
- [ ] 配置 Producer Group
- [ ] 配置 Consumer Group
- [ ] 实现错误处理
- [ ] 配置重试机制

运维检查:
- [ ] 监控发送成功率
- [ ] 监控消费滞后
- [ ] 配置告警规则
- [ ] 定期备份配置

总结

Spring Boot 集成 RocketMQ 的核心要点:

  1. 基础配置:依赖、YAML 配置、Bean 配置
  2. Producer 开发:RocketMQTemplate、自定义配置、批量发送
  3. Consumer 开发:@RocketMQMessageListener、并发消费、顺序消费
  4. 事务消息:事务监听器、事务 Producer
  5. 错误处理:重试配置、死信队列、异常处理
  6. 监控运维:健康检查、指标监控

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 系列完整学习指南
下一篇文章
Redis 系列完整学习指南