Skip to content
清晨的一缕阳光
返回

RocketMQ 集成

RocketMQ 集成

RocketMQ 简介

核心特性

高吞吐

可靠性

丰富功能

架构设计

┌─────────────┐      ┌─────────────┐
│  Producer   │      │   Consumer  │
│  (生产者)    │      │   (消费者)   │
└──────┬──────┘      └──────▲──────┘
       │                    │
       │ 发送消息            │ 消费消息
       ▼                    │
┌─────────────────────────────────┐
│         NameServer              │
│      (名字服务,无状态)          │
└─────────────────────────────────┘
       │                    │
       ▼                    │
┌─────────────────────────────────┐
│         Broker                  │
│    (消息存储和转发)              │
│  ┌─────────┐  ┌─────────┐      │
│  │ Master  │  │  Slave  │      │
│  └─────────┘  └─────────┘      │
└─────────────────────────────────┘

核心概念

主题(Topic)

标签(Tag)

消息队列(MessageQueue)

生产组(ProducerGroup)

消费组(ConsumerGroup)

快速开始

1. 部署 RocketMQ

Docker 部署

version: '3.8'

services:
  namesrv:
    image: apache/rocketmq:4.9.4
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    command: sh mqnamesrv
    environment:
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m

  broker:
    image: apache/rocketmq:4.9.4
    container_name: rmqbroker
    ports:
      - 10911:10911
      - 10909:10909
    depends_on:
      - namesrv
    environment:
      - NAMESRV_ADDR=namesrv:9876
      - JAVA_OPT_EXT=-server -Xms1g -Xmx1g
    command: sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-a.properties
    volumes:
      - ./broker.conf:/home/rocketmq/conf/2m-2s-sync/broker-a.conf

2. 添加依赖

<dependencies>
    <!-- RocketMQ Spring Boot Starter -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
</dependencies>

3. 基础配置

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: order-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
  consumer:
    listeners: order-consumer-listener

4. 发送消息

同步发送

@Service
public class OrderProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public SendResult sendOrderCreated(Order order) {
        String topic = "TOPIC_ORDER";
        String tag = "CREATE";
        String key = order.getId().toString();
        
        // 发送消息
        SendResult result = rocketMQTemplate.syncSend(
            topic + ":" + tag,
            order,
            3000  // 超时时间
        );
        
        log.info("订单消息发送成功:{}", result);
        return result;
    }
}

异步发送

@Service
public class AsyncOrderProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendOrderCreated(Order order) {
        String topic = "TOPIC_ORDER";
        String tag = "CREATE";
        
        rocketMQTemplate.asyncSend(
            topic + ":" + tag,
            order,
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("消息发送成功:{}", sendResult);
                }
                
                @Override
                public void onError(Throwable e) {
                    log.error("消息发送失败", e);
                }
            },
            3000
        );
    }
}

单向发送

@Service
public class OneWayProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendLog(Log log) {
        // 不关心发送结果,只发送一次
        rocketMQTemplate.sendOneWay("TOPIC_LOG", log);
    }
}

5. 消费消息

基础消费者

@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER",
    consumerGroup = "order-consumer-group",
    selectorExpression = "*",  // 订阅所有标签
    messageModel = MessageModel.CLUSTERING  // 集群消费
)
public class OrderConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        log.info("收到订单消息:{}", order);
        
        // 处理业务逻辑
        processOrder(order);
    }
    
    private void processOrder(Order order) {
        // 业务处理
    }
}

消息过滤

@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER",
    consumerGroup = "order-consumer-group",
    selectorExpression = "CREATE"  // 只订阅 CREATE 标签
)
public class OrderCreateConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        // 处理订单创建
    }
}

SQL 过滤

@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER",
    consumerGroup = "order-consumer-group",
    selectorType = SelectorType.SQL92,
    selectorExpression = "amount > 100 and status = 'PAID'"
)
public class OrderFilterConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        // 处理符合条件的订单
    }
}

高级特性

1. 事务消息

事务消息配置

@Component
@RocketMQTransactionListener(txProducerGroup = "order-tx-group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
        Message msg, Object arg) {
        try {
            Order order = JSON.parseObject(
                new String(msg.getPayload()), 
                Order.class
            );
            
            // 执行本地事务
            orderService.createOrder(order);
            
            // 提交事务
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("执行本地事务失败", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 检查本地事务状态
        Order order = JSON.parseObject(
            new String(msg.getPayload()), 
            Order.class
        );
        
        boolean exists = orderService.exists(order.getId());
        
        if (exists) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

发送事务消息

@Service
public class OrderTransactionProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Transactional
    public void sendOrderMessage(Order order) {
        Message<String> message = MessageBuilder.withPayload(
            JSON.toJSONString(order)
        ).build();
        
        // 发送事务消息
        rocketMQTemplate.sendMessageInTransaction(
            "order-tx-group",
            "TOPIC_ORDER:CREATE",
            message,
            null
        );
    }
}

2. 顺序消息

顺序消息发送

@Service
public class OrderSequenceProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendOrderSequence(Order order) {
        // 使用订单 ID 作为 shardingKey,保证同一订单的消息顺序
        String shardingKey = order.getId().toString();
        
        rocketMQTemplate.syncSendOrderly(
            "TOPIC_ORDER_SEQUENCE",
            order,
            shardingKey
        );
    }
}

顺序消息消费

@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER_SEQUENCE",
    consumerGroup = "order-sequence-consumer-group",
    consumeMode = ConsumeMode.ORDERLY  // 顺序消费
)
public class OrderSequenceConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        // 按顺序处理订单消息
        processOrder(order);
    }
}

3. 延迟消息

发送延迟消息

@Service
public class DelayProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendCancelOrder(Long orderId) {
        // 延迟级别:1-18,对应不同的延迟时间
        // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        int delayLevel = 4;  // 30 秒后
        
        Message<String> message = MessageBuilder.withPayload(
            orderId.toString()
        ).build();
        
        rocketMQTemplate.syncSend(
            "TOPIC_ORDER_DELAY",
            message,
            3000,
            delayLevel
        );
    }
}

消费延迟消息

@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER_DELAY",
    consumerGroup = "order-delay-consumer-group"
)
public class DelayConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String orderId) {
        // 取消订单
        orderService.cancelOrder(Long.parseLong(orderId));
    }
}

4. 批量消息

发送批量消息

@Service
public class BatchProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendBatchOrders(List<Order> orders) {
        // 批量发送(不超过 4MB)
        List<Message<Order>> messages = orders.stream()
            .map(order -> MessageBuilder.withPayload(order).build())
            .collect(Collectors.toList());
        
        rocketMQTemplate.syncSend("TOPIC_ORDER_BATCH", messages);
    }
}

5. 消息回溯

配置回溯

@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER",
    consumerGroup = "order-consumer-group",
    consumeTimestamp = "20260410100000"  // 回溯到指定时间
)
public class OrderBacktrackConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        // 消费历史消息
    }
}

消息可靠性

1. 消息不丢失

生产者保证

// 同步发送,等待 Broker 确认
SendResult result = rocketMQTemplate.syncSend(topic, message);
if (result.getSendStatus() == SendStatus.SEND_OK) {
    // 发送成功
} else {
    // 发送失败,重试或记录
}

// 失败重试
rocketMQTemplate.syncSend(
    topic, 
    message,
    3000,  // 超时时间
    3      // 重试次数
);

Broker 保证

# broker.conf
# 同步刷盘
flushDiskType=SYNC_FLUSH

# 同步复制
brokerRole=SYNC_MASTER

# 刷盘落盘间隔
flushInterval=1

消费者保证

@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER",
    consumerGroup = "order-consumer-group",
    ackTimeout = 3000  // 确认超时
)
public class ReliableConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        try {
            // 业务处理
            processOrder(order);
            
            // 自动 ack
        } catch (Exception e) {
            log.error("消费失败", e);
            // 抛出异常,消息会重新投递
            throw e;
        }
    }
}

2. 消息重复消费

幂等性处理

@Component
public class IdempotentConsumer implements RocketMQListener<Order> {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Override
    public void onMessage(Order order) {
        String key = "order:processed:" + order.getId();
        
        // 使用 Redis 实现幂等
        Boolean isNew = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", 24, TimeUnit.HOURS);
        
        if (Boolean.FALSE.equals(isNew)) {
            log.warn("订单已处理,跳过:{}", order.getId());
            return;
        }
        
        // 处理业务
        processOrder(order);
    }
}

数据库唯一约束

CREATE TABLE `order` (
  `id` BIGINT PRIMARY KEY,
  `order_no` VARCHAR(64) UNIQUE NOT NULL,  -- 唯一约束
  ...
);

3. 消息积压处理

临时扩容

// 部署多个消费者实例
// 增加消费线程数
@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER",
    consumerGroup = "order-consumer-group",
    consumeThreadMax = 64  // 增加消费线程
)
public class OrderConsumer implements RocketMQListener<Order> {
    // ...
}

批量消费

@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER",
    consumerGroup = "order-consumer-group",
    consumeMode = ConsumeMode.CONCURRENTLY
)
public class BatchConsumer implements RocketMQMessageListenerConcurrently<Order> {
    
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
        List<MessageExt> msgs,
        ConsumeConcurrentlyContext context
    ) {
        // 批量处理消息
        for (MessageExt msg : msgs) {
            processMessage(msg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

监控告警

1. 监控指标

生产者指标

消费者指标

Broker 指标

2. RocketMQ Dashboard

部署 Dashboard

docker run -d \
  -p 8080:8080 \
  -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876" \
  --name rocketmq-dashboard \
  apache/rocketmq-dashboard:latest

监控功能

3. 告警配置

# Prometheus 告警规则
groups:
  - name: rocketmq
    rules:
      - alert: RocketMQConsumerLag
        expr: rocketmq_consumer_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RocketMQ 消费积压"
          description: "消费积压超过 10000 条"
      
      - alert: RocketMQSendFailed
        expr: rate(rocketmq_send_failed[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "RocketMQ 发送失败率高"
          description: "发送失败率超过 10%"

最佳实践

1. Topic 设计

按业务划分

TOPIC_ORDER - 订单相关
TOPIC_PAYMENT - 支付相关
TOPIC_INVENTORY - 库存相关
TOPIC_LOGISTICS - 物流相关

按重要性划分

TOPIC_CRITICAL - 核心业务
TOPIC_NORMAL - 普通业务
TOPIC_LOG - 日志类

2. 消息设计

消息体大小

消息格式

{
  "id": "123456",
  "type": "ORDER_CREATE",
  "timestamp": 1680000000000,
  "data": {
    "orderId": "123456",
    "userId": "789",
    "amount": 100.00
  }
}

3. 消费优化

批量消费

// 批量处理,减少数据库交互
List<Order> batch = new ArrayList<>();
for (MessageExt msg : msgs) {
    batch.add(parseOrder(msg));
    if (batch.size() >= 100) {
        batchProcess(batch);
        batch.clear();
    }
}

异步处理

@Override
public void onMessage(Order order) {
    // 快速 ack,异步处理
    CompletableFuture.runAsync(() -> {
        processOrder(order);
    });
}

4. 故障处理

死信队列

// 消息重试多次失败后进入死信队列
// %DLQ%consumerGroup 是死信队列名称
@Component
@RocketMQMessageListener(
    topic = "%DLQ%order-consumer-group",
    consumerGroup = "dlq-consumer-group"
)
public class DeadLetterConsumer implements RocketMQListener<Order> {
    
    @Override
    public void onMessage(Order order) {
        // 记录死信消息,人工处理
        log.error("死信消息:{}", order);
    }
}

重试策略

// 配置重试次数
@Component
@RocketMQMessageListener(
    topic = "TOPIC_ORDER",
    consumerGroup = "order-consumer-group",
    maxReconsumeTimes = 5  // 最多重试 5 次
)
public class OrderConsumer implements RocketMQListener<Order> {
    // ...
}

总结

RocketMQ 是高性能、高可靠的消息中间件,支持事务消息、顺序消息、延迟消息等丰富特性。

与 Spring Cloud 集成后,可以构建高可靠的异步通信架构,实现服务解耦、流量削峰、数据同步等功能。

在生产环境中,需要做好消息可靠性保障、监控告警和故障处理,确保消息系统的稳定运行。


分享这篇文章到:

上一篇文章
Spring Boot Redis 缓存集成实战
下一篇文章
Spring Boot RBAC 权限控制