Skip to content
清晨的一缕阳光
返回

Spring Boot RocketMQ 消息队列集成

前言

RocketMQ 是阿里开源的分布式消息中间件,具有高吞吐、高可用、低延迟等特点。Spring Boot 通过 rocketmq-spring-boot-starter 可以方便地集成 RocketMQ。

快速开始

1. 添加依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>

2. 基础配置

# Base RocketMQ settings for the Spring Boot starter.
rocketmq:
  # Address of the NameServer the client connects to.
  name-server: localhost:9876
  producer:
    # Producer group used by the application's producer.
    group: demo-producer-group
    # Send timeout (presumably milliseconds — confirm against the starter docs).
    send-message-timeout: 3000
    # Retry count when a synchronous send fails.
    retry-times-when-send-failed: 2
    # Retry count when an asynchronous send fails.
    retry-times-when-send-async-failed: 2

3. 发送消息

/**
 * Shows the basic ways of publishing a message through {@code RocketMQTemplate}:
 * convert-and-send, tagged, synchronous, asynchronous and one-way.
 */
@Service
@RequiredArgsConstructor
public class MessageService {

    /** Topic that every example in this class publishes to. */
    private static final String TOPIC = "demo-topic";

    private final RocketMQTemplate rocketMQTemplate;

    /** Publishes a plain string payload. */
    public void sendNormalMessage() {
        rocketMQTemplate.convertAndSend(TOPIC, "Hello RocketMQ");
    }

    /**
     * Publishes an object payload.
     *
     * @param user the object handed to the template as the message payload
     */
    public void sendObjectMessage(UserDTO user) {
        rocketMQTemplate.convertAndSend(TOPIC, user);
    }

    /** Publishes a message whose destination carries a tag ({@code topic:tag}). */
    public void sendWithTag() {
        rocketMQTemplate.convertAndSend(TOPIC + ":tag1", "Message with tag");
    }

    /** Publishes with {@code syncSend} and logs the status of the returned result. */
    public void sendSync() {
        SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, "Sync message");
        log.info("发送结果:{}", sendResult.getSendStatus());
    }

    /** Publishes with {@code asyncSend}; the callback logs success or failure. */
    public void sendAsync() {
        SendCallback loggingCallback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送成功:{}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error("发送失败", e);
            }
        };

        rocketMQTemplate.asyncSend(TOPIC, "Async message", loggingCallback);
    }

    /** Publishes with {@code sendOneWay}; no result is returned to inspect. */
    public void sendOneWay() {
        rocketMQTemplate.sendOneWay(TOPIC, "One way message");
    }
}

4. 消费消息

/**
 * Listener for string messages on {@code demo-topic}, registered under
 * {@code demo-consumer-group} with a {@code "*"} selector expression and the
 * {@code CLUSTERING} message model.
 */
@Component
@RocketMQMessageListener(
    consumerGroup = "demo-consumer-group",
    topic = "demo-topic",
    messageModel = MessageModel.CLUSTERING,
    selectorExpression = "*"
)
public class DemoConsumer implements RocketMQListener<String> {

    /**
     * Logs the received payload, then passes it to {@link #processMessage(String)}.
     *
     * @param message the received message body
     */
    @Override
    public void onMessage(String message) {
        log.info("收到消息:{}", message);
        processMessage(message);
    }

    /** Placeholder for the business handling; intentionally empty in this example. */
    private void processMessage(String message) {
        // business logic goes here
    }
}

5. 消费对象消息

/**
 * Listener that receives {@code UserDTO} payloads from {@code demo-topic} under the
 * {@code demo-consumer-group-object} consumer group.
 *
 * <p>NOTE(review): {@code userService} is referenced below but not declared in this
 * snippet; it is presumably an injected collaborator — confirm and declare it.
 */
@Component
@RocketMQMessageListener(
    consumerGroup = "demo-consumer-group-object",
    topic = "demo-topic"
)
public class UserConsumer implements RocketMQListener<UserDTO> {

    /**
     * Logs the received user and passes it to {@code userService.process}.
     *
     * @param user the received payload
     */
    @Override
    public void onMessage(UserDTO user) {
        log.info("收到用户消息:{}", user);
        userService.process(user);
    }
}

高级特性

1. 顺序消息

/**
 * Publishes order messages with {@code syncSendOrderly}.
 */
@Service
@RequiredArgsConstructor
public class OrderMessageService {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Sends the order to {@code order-topic}, using the order id as the hash key.
     * The intent is that all messages of one order keep their relative order.
     *
     * @param order the order to publish; its id becomes the hash key
     */
    public void sendOrderMessage(Order order) {
        String hashKey = String.valueOf(order.getId());
        rocketMQTemplate.syncSendOrderly("order-topic", order, hashKey);
    }
}
/**
 * Listener on {@code order-topic} registered with {@code ConsumeMode.ORDERLY}.
 *
 * <p>NOTE(review): {@code processOrder} is called below but not defined in this
 * snippet — it has to be supplied by the surrounding class.
 */
@Component
@RocketMQMessageListener(
    consumerGroup = "order-consumer-group",
    topic = "order-topic",
    consumeMode = ConsumeMode.ORDERLY
)
public class OrderConsumer implements RocketMQListener<Order> {

    /**
     * Logs the order id and then processes the order.
     *
     * @param order the received order
     */
    @Override
    public void onMessage(Order order) {
        log.info("处理订单:{}", order.getId());
        processOrder(order);
    }
}

2. 延迟消息

/**
 * Publishes delayed messages through {@code RocketMQTemplate}.
 */
@Service
@RequiredArgsConstructor
public class DelayMessageService {

    /** Send timeout handed to {@code syncSend}, in milliseconds. */
    private static final long SEND_TIMEOUT_MILLIS = 3000;

    /**
     * Broker delay level for 30 minutes, taken from the level table:
     * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (level 16 = 30m).
     */
    private static final int DELAY_LEVEL_30_MINUTES = 16;

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Sends the order to {@code order-delay-topic} so that it is delivered after
     * 30 minutes, when the payment status can be checked.
     *
     * <p>The delay level is passed through the dedicated {@code syncSend} overload.
     * Putting it into the Spring message header under
     * {@code MessageConst.PROPERTY_DELAY_TIME_LEVEL} does not work: rocketmq-spring
     * skips reserved property names when it copies headers onto the RocketMQ message,
     * so the message would be delivered immediately.
     *
     * @param order the order whose payment status is to be checked later
     */
    public void sendDelayMessage(Order order) {
        rocketMQTemplate.syncSend(
            "order-delay-topic",
            MessageBuilder.withPayload(order).build(),
            SEND_TIMEOUT_MILLIS,
            DELAY_LEVEL_30_MINUTES
        );
    }
}
/**
 * Listener on {@code order-delay-topic} that cancels orders which are still unpaid
 * when their message arrives.
 */
@Component
@RocketMQMessageListener(
    consumerGroup = "order-delay-consumer-group",
    topic = "order-delay-topic"
)
public class OrderDelayConsumer implements RocketMQListener<Order> {

    @Autowired
    private OrderService orderService;

    /**
     * Cancels the order unless {@code orderService} reports it as paid.
     *
     * @param order the order carried by the message
     */
    @Override
    public void onMessage(Order order) {
        // Nothing to do for orders that have been paid in the meantime.
        if (orderService.isPaid(order.getId())) {
            return;
        }
        orderService.cancelOrder(order.getId());
    }
}

3. 事务消息

/**
 * Publishes orders as RocketMQ transactional messages.
 */
@Service
@RequiredArgsConstructor
public class TransactionMessageService {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Sends the order as a transactional (half) message to
     * {@code order-transaction-topic} and logs the send status.
     *
     * <p>The order id is attached as the
     * {@link OrderTransactionListener#ORDER_ID_HEADER} header so that the listener's
     * check-back can identify the order the message belongs to.
     *
     * <p>NOTE(review): the local transaction listener appears to be invoked
     * synchronously inside {@code sendMessageInTransaction}. If so, a transaction
     * opened by {@code @Transactional} here commits only after the broker has been
     * told COMMIT/ROLLBACK — confirm that this ordering is acceptable.
     *
     * @param order the order to publish; also passed as the listener argument
     */
    @Transactional
    public void sendTransactionMessage(Order order) {
        SendResult result = rocketMQTemplate.sendMessageInTransaction(
            "order-transaction-topic",
            MessageBuilder.withPayload(order)
                .setHeader(OrderTransactionListener.ORDER_ID_HEADER, String.valueOf(order.getId()))
                .build(),
            order
        );

        log.info("事务消息发送结果:{}", result.getSendStatus());
    }
}
/**
 * Local-transaction listener for order messages: creates the order when the half
 * message has been sent, and answers broker check-backs.
 */
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    /** Header under which the sender stores the order id for check-backs. */
    public static final String ORDER_ID_HEADER = "orderId";

    @Autowired
    private OrderService orderService;

    /**
     * Runs the local transaction by creating the order.
     *
     * @param message the half message
     * @param arg     the argument given to {@code sendMessageInTransaction}; cast to {@code Order}
     * @return {@code COMMIT} when the order was created, {@code ROLLBACK} when an
     *         exception was thrown
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
        Message message,
        Object arg
    ) {
        try {
            Order order = (Order) arg;
            orderService.createOrder(order);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("执行本地事务失败", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Answers a broker check-back by looking up whether the order exists.
     *
     * <p>The order is identified by the {@link #ORDER_ID_HEADER} header. The Spring
     * {@code id} header must not be used for this: it is generated for each
     * {@code Message} instance, so the value seen here is unrelated to anything the
     * local transaction stored and the lookup could never succeed.
     *
     * @param message the half message being checked
     * @return {@code COMMIT} if the order exists; {@code ROLLBACK} if it does not or
     *         if the message carries no order id
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        Object orderId = message.getHeaders().get(ORDER_ID_HEADER);
        if (orderId == null) {
            // Without an order id the local state cannot be determined.
            log.warn("事务回查缺少 {} 消息头,无法确认本地事务,回滚消息", ORDER_ID_HEADER);
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        boolean isCommitted = orderService.isOrderCreated(orderId.toString());

        return isCommitted
            ? RocketMQLocalTransactionState.COMMIT
            : RocketMQLocalTransactionState.ROLLBACK;
    }
}

4. 批量消息

/**
 * Publishes several orders in one batch send.
 */
@Service
@RequiredArgsConstructor
public class BatchMessageService {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Wraps every order in a message keyed by its order id and sends the whole list
     * to {@code batch-topic} in a single {@code syncSend} call.
     *
     * @param orders the orders to publish
     */
    public void sendBatchMessages(List<Order> orders) {
        // Parameterized Message<Order> instead of the raw Message type, so the
        // payload type stays checked by the compiler.
        List<Message<Order>> messages = orders.stream()
            .map(order -> MessageBuilder.withPayload(order)
                .setHeader(MessageConst.PROPERTY_KEYS, String.valueOf(order.getId()))
                .build()
            )
            .collect(Collectors.toList());

        rocketMQTemplate.syncSend("batch-topic", messages);
    }
}

5. 消息过滤

/**
 * Listener on {@code filter-topic} (group {@code filter-consumer-group}) that only
 * subscribes to messages matching the selector expression below and logs them.
 */
@Component
@RocketMQMessageListener(
    topic = "filter-topic",
    consumerGroup = "filter-consumer-group",
    // NOTE(review): no selectorType is set, so this is presumably evaluated as a tag
    // expression (matches tag1 or tag2), not as SQL92. SQL92 filtering would need
    // selectorType = SelectorType.SQL92 and a property expression — confirm.
    selectorExpression = "tag1 || tag2"  // tag expression
)
public class FilterConsumer implements RocketMQListener<String> {
    
    /** Logs every message that passed the filter. */
    @Override
    public void onMessage(String message) {
        log.info("收到过滤后的消息:{}", message);
    }
}
/**
 * Publishes a tagged message that also carries a custom {@code type} header.
 */
@Service
@RequiredArgsConstructor
public class FilterMessageService {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Sends the payload {@code "test"} with header {@code type=vip} to
     * {@code filter-topic} under tag {@code tag1}.
     *
     * <p>The header is set on the builder: Spring's {@code Message} has no
     * {@code setHeader} method and its headers cannot be changed after
     * {@code build()}.
     */
    public void sendWithFilter() {
        Message<String> message = MessageBuilder.withPayload("test")
            .setHeader("type", "vip")
            .build();

        rocketMQTemplate.send(
            "filter-topic:tag1",
            message
        );
    }
}

最佳实践

1. 消息幂等性

/**
 * Listener on {@code idempotent-topic} that uses a Redis marker key to skip orders
 * that have already been handled.
 *
 * <p>NOTE(review): {@code processOrder} is called below but not defined in this
 * snippet — it has to be supplied by the surrounding class.
 */
@Component
@RocketMQMessageListener(
    topic = "idempotent-topic",
    consumerGroup = "idempotent-consumer-group"
)
public class IdempotentConsumer implements RocketMQListener<Order> {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Processes the order at most once per marker lifetime (24 hours).
     *
     * <p>The marker is claimed with {@code setIfAbsent} before processing. If
     * processing fails, the marker is removed again before the exception propagates;
     * otherwise the redelivered message would be treated as "already processed" and
     * silently dropped.
     *
     * @param order the received order
     */
    @Override
    public void onMessage(Order order) {
        String key = "order:processed:" + order.getId();

        Boolean isNew = redisTemplate.opsForValue()
            .setIfAbsent(key, "processed", 24, TimeUnit.HOURS);

        if (Boolean.FALSE.equals(isNew)) {
            log.warn("消息已处理,跳过:{}", order.getId());
            return;
        }

        try {
            processOrder(order);
        } catch (RuntimeException e) {
            // Release the marker so that a retry can process the order again.
            redisTemplate.delete(key);
            throw e;
        }
    }
}

2. 消息重试

# NOTE(review): these two keys do not appear among the `rocketmq.consumer.*`
# properties documented for rocketmq-spring-boot-starter — verify that they are
# actually bound. Per-listener retry limits are set on the annotation instead
# (see `maxReconsumeTimes` on @RocketMQMessageListener).
rocketmq:
  consumer:
    retry-times-when-consume-failed: 16  # number of retries
    suspend-current-queue-time-millis: 1000  # retry interval
/**
 * Listener on {@code retry-topic} that signals a failed consumption by throwing,
 * with {@code maxReconsumeTimes} set to 5 on the listener annotation.
 *
 * <p>NOTE(review): {@code processMessage} is called below but not defined in this
 * snippet — it has to be supplied by the surrounding class.
 */
@Component
@RocketMQMessageListener(
    consumerGroup = "retry-consumer-group",
    topic = "retry-topic",
    maxReconsumeTimes = 5  // upper bound on re-consumption attempts
)
public class RetryConsumer implements RocketMQListener<String> {

    /**
     * Processes the message; any exception is logged and rethrown wrapped in a
     * {@code RuntimeException}, with the intent of triggering a retry.
     *
     * @param message the received message body
     */
    @Override
    public void onMessage(String message) {
        try {
            processMessage(message);
        } catch (Exception e) {
            log.error("处理失败,将重试", e);
            // Rethrow so the failure is visible to the caller of onMessage.
            throw new RuntimeException(e);
        }
    }
}

3. 死信队列

/**
 * Listener intended for dead-letter messages: logs each one and raises an alert.
 *
 * <p>NOTE(review): this subscribes to a topic literally named {@code dlq-topic}.
 * RocketMQ's own dead-letter topics are presumably named after the consumer group
 * ({@code %DLQ%<consumerGroup>}) — confirm which topic is meant. {@code alertService}
 * is also referenced but not declared in this snippet.
 */
@Component
@RocketMQMessageListener(
    consumerGroup = "dlq-consumer-group",
    topic = "dlq-topic"
)
public class DLQConsumer implements RocketMQListener<String> {

    /**
     * Logs the message at WARN level and sends an alert containing its body.
     *
     * @param message the received message body
     */
    @Override
    public void onMessage(String message) {
        log.warn("处理死信消息:{}", message);

        String alertText = "死信消息:" + message;
        alertService.sendAlert(alertText);
    }
}

4. 消息追踪

/**
 * Listener on {@code trace-topic} that logs the {@code traceId} found in the MDC
 * before processing each message.
 *
 * <p>NOTE(review): {@code processMessage} is called below but not defined in this
 * snippet, and nothing here puts {@code traceId} into the MDC — confirm where it is
 * populated.
 */
@Component
@RocketMQMessageListener(
    consumerGroup = "trace-consumer-group",
    topic = "trace-topic"
)
public class TraceConsumer implements RocketMQListener<String> {

    /**
     * Logs the current trace id and processes the message.
     *
     * @param message the received message body
     */
    @Override
    public void onMessage(String message) {
        String traceId = MDC.get("traceId");
        log.info("收到消息,TraceId: {}", traceId);

        processMessage(message);
    }
}

5. 监控告警

/**
 * Records send and consume metrics for RocketMQ messages in a {@code MeterRegistry}.
 */
@Component
public class RocketMQMonitor {

    @Autowired
    private MeterRegistry meterRegistry;

    /**
     * Records one send attempt: increments the {@code rocketmq.message.send} counter
     * and records the duration in the {@code rocketmq.message.send.time} timer.
     *
     * @param topic   topic the message was sent to (used as a tag)
     * @param success whether the send succeeded (tag {@code status} = success/failed)
     * @param cost    elapsed time in milliseconds
     */
    public void recordSend(String topic, boolean success, long cost) {
        meterRegistry.counter("rocketmq.message.send",
            "topic", topic,
            "status", success ? "success" : "failed"
        ).increment();

        meterRegistry.timer("rocketmq.message.send.time",
            "topic", topic
        ).record(cost, TimeUnit.MILLISECONDS);
    }

    /**
     * Records one consume attempt: increments the {@code rocketmq.message.consume}
     * counter and records the duration in the {@code rocketmq.message.consume.time}
     * timer, mirroring {@link #recordSend(String, boolean, long)}.
     *
     * @param topic   topic the message was consumed from (used as a tag)
     * @param success whether consumption succeeded (tag {@code status} = success/failed)
     * @param cost    elapsed time in milliseconds
     */
    public void recordConsume(String topic, boolean success, long cost) {
        meterRegistry.counter("rocketmq.message.consume",
            "topic", topic,
            "status", success ? "success" : "failed"
        ).increment();

        // The cost argument was previously accepted but never recorded.
        meterRegistry.timer("rocketmq.message.consume.time",
            "topic", topic
        ).record(cost, TimeUnit.MILLISECONDS);
    }
}

总结

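RocketMQ 集成要点:

- 快速开始:引入 rocketmq-spring-boot-starter,配置 name-server 与生产者组,使用 RocketMQTemplate 发送消息,使用 @RocketMQMessageListener 消费消息。
- 高级特性:顺序消息、延迟消息、事务消息、批量消息、消息过滤。
- 最佳实践:消息幂等性、消息重试、死信队列、消息追踪、监控告警。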

RocketMQ 是企业级消息中间件的优秀选择。


分享这篇文章到:

上一篇文章
Seata TCC 模式实战
下一篇文章
Prompt 工程基础与方法论