消息队列实战
消息队列是分布式系统的核心组件,用于解耦、异步、削峰。Kafka、RocketMQ、RabbitMQ 是主流选择。本文详解消息队列的应用场景、最佳实践和常见问题解决方案。
一、消息队列选型
1.1 主流 MQ 对比
| 特性 | Kafka | RocketMQ | RabbitMQ | Pulsar |
|---|---|---|---|---|
| 吞吐量 | 百万 + | 十万 + | 万级 | 百万 + |
| 延迟 | ms 级 | ms 级 | μs 级 | ms 级 |
| 可靠性 | 高 | 很高 | 高 | 高 |
| 消息顺序 | 分区有序 | 支持 | 支持 | 支持 |
| 事务消息 | 不支持 | 支持 | 不支持 | 支持 |
| 延迟消息 | 不支持 | 支持 | 支持 | 支持 |
| 适用场景 | 日志、大数据 | 交易、订单 | 低延迟 | 云原生 |
1.2 选型建议
graph TD
A[高吞吐量需求?] -->|是 | B[Kafka/Pulsar]
A -->|否 | C[需要事务消息?]
C -->|是 | D[RocketMQ]
C -->|否 | E[需要低延迟μs 级?]
E -->|是 | F[RabbitMQ]
E -->|否 | G[RocketMQ]
G --> H[云原生部署?]
H -->|是 | I[Pulsar]
H -->|否 | J[Kafka/RocketMQ]
二、Kafka 实战
2.1 基础配置
# Kafka 配置示例
spring:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all # 所有副本确认
retries: 3 # 重试次数
batch-size: 16384 # 批次大小
buffer-memory: 33554432 # 缓冲区大小
compression-type: snappy # 压缩方式
properties:
enable.idempotence: true # 开启幂等
# 消费者配置
consumer:
group-id: myapp-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest # 从最新消费
enable-auto-commit: false # 手动提交
max-poll-records: 500 # 每次拉取数量
properties:
session.timeout.ms: 30000
heartbeat.interval.ms: 10000
2.2 生产者实现
/**
* Kafka 生产者服务
*/
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 发送消息(异步)
*/
public void send(String topic, String key, Object data) {
kafkaTemplate.send(topic, key, data)
.whenComplete((result, ex) -> {
if (ex == null) {
log.info("发送成功:topic={}, partition={}, offset={}",
result.getRecord().topic(),
result.getRecord().partition(),
result.getRecord().offset());
} else {
log.error("发送失败:topic={}", topic, ex);
}
});
}
/**
* 发送消息(同步)
*/
public void sendSync(String topic, String key, Object data) {
try {
SendResult result = kafkaTemplate.send(topic, key, data).get(5, TimeUnit.SECONDS);
log.info("发送成功:offset={}", result.getRecordMetadata().offset());
} catch (Exception e) {
log.error("发送失败", e);
throw new BusinessException("消息发送失败");
}
}
/**
* 发送事务消息
*/
@Transactional
public void sendTransactional(String topic, Object data) {
kafkaTemplate.executeInTransaction(operations -> {
operations.send(topic, "tx-key", data);
// 其他数据库操作...
return true;
});
}
}
/**
* 消息发送最佳实践
*/
@Component
public class BestPracticeProducer {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
/**
* 1. 消息压缩(大数据量)
*/
public void sendCompressed(OrderEvent event) {
// Kafka 配置 compression-type: snappy/lz4/zstd
kafkaTemplate.send("orders", event.getOrderId(), event);
}
/**
* 2. 消息分区(保证顺序)
*/
public void sendOrdered(OrderEvent event) {
// 相同 orderId 发送到同一分区
String key = event.getOrderId();
kafkaTemplate.send("orders", key, event);
}
/**
* 3. 消息去重(幂等性)
*/
public void sendIdempotent(OrderEvent event) {
// 开启 enable.idempotence: true
// 配合业务去重(Redis 记录已处理消息 ID)
kafkaTemplate.send("orders", event.getOrderId(), event);
}
/**
* 4. 消息确认(可靠性)
*/
public void sendWithAck(OrderEvent event) {
kafkaTemplate.send("orders", event.getOrderId(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
// 发送失败,记录日志或重试
log.error("发送失败", ex);
// 存入失败队列,稍后重试
saveToFailedQueue(event);
}
});
}
private void saveToFailedQueue(OrderEvent event) {
// 实现失败消息存储
}
}
2.3 消费者实现
/**
* Kafka 消费者服务
*/
@Component
public class KafkaConsumerService {
/**
* 基础消费
*/
@KafkaListener(topics = "orders", groupId = "order-group")
public void consumeOrder(ConsumerRecord<String, OrderEvent> record) {
log.info("收到消息:key={}, value={}, partition={}, offset={}",
record.key(), record.value(), record.partition(), record.offset());
// 处理业务
processOrder(record.value());
}
/**
* 手动 ack(保证消息不丢失)
*/
@KafkaListener(topics = "orders", groupId = "order-group")
public void consumeWithManualAck(
ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack) {
try {
// 处理业务
processOrder(record.value());
// 手动提交
ack.acknowledge();
} catch (Exception e) {
log.error("处理失败", e);
// 不提交,稍后重试
throw e; // 触发重试
}
}
/**
* 批量消费
*/
@KafkaListener(topics = "orders", groupId = "batch-group")
public void consumeBatch(List<ConsumerRecord<String, OrderEvent>> records) {
log.info("批量消费:{} 条", records.size());
for (ConsumerRecord<String, OrderEvent> record : records) {
processOrder(record.value());
}
}
/**
* 重试机制
*/
@Retryable(
value = Exception.class,
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
@KafkaListener(topics = "orders", groupId = "retry-group")
public void consumeWithRetry(ConsumerRecord<String, OrderEvent> record) {
processOrder(record.value());
}
/**
* 死信队列
*/
@KafkaListener(topics = "orders.dlq", groupId = "dlq-group")
public void consumeDLQ(ConsumerRecord<String, OrderEvent> record) {
log.error("死信消息:key={}, value={}", record.key(), record.value());
// 人工处理或告警
}
private void processOrder(OrderEvent event) {
// 业务处理逻辑
}
}
/**
* 重试和死信配置
*/
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 开启批量
factory.setBatchListener(true);
// 错误处理(死信队列)
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(record, ex) -> {
// 死信处理
log.error("消息处理失败,发送到死信队列", ex);
sendToDLQ(record);
},
new FixedBackOff(1000L, 3L) // 重试 3 次,间隔 1 秒
);
// 不重试的异常
errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
private void sendToDLQ(ConsumerRecord<?, ?> record) {
// 发送到死信队列
}
}
三、RocketMQ 实战
3.1 基础配置
# RocketMQ 配置
rocketmq:
name-server: 192.168.1.100:9876;192.168.1.101:9876
producer:
group: order-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
consumer:
order-consumer:
group: order-consumer-group
topic: order-topic
consume-mode: CONCURRENTLY # 并发消费
message-model: CLUSTERING # 集群消费
pull-batch-size: 32
3.2 生产者实现
/**
* RocketMQ 生产者
*/
@Component
public class RocketMQProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*/
public void send(String topic, Object data) {
rocketMQTemplate.convertAndSend(topic, data);
}
/**
* 发送同步消息
*/
public void sendSync(String topic, Object data) {
SendResult result = rocketMQTemplate.syncSend(topic, data);
log.info("发送成功:msgId={}", result.getMsgId());
}
/**
* 发送异步消息
*/
public void sendAsync(String topic, Object data) {
rocketMQTemplate.asyncSend(topic, data, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功:msgId={}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("发送失败", e);
}
});
}
/**
* 发送顺序消息
*/
public void sendOrderly(String topic, Object data, String orderId) {
// 相同 orderId 发送到同一队列(保证顺序)
rocketMQTemplate.syncSendOrderly(topic, data, orderId);
}
/**
* 发送延迟消息
*/
public void sendDelay(String topic, Object data, int delayLevel) {
// delayLevel: 1-18 (1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h)
rocketMQTemplate.syncSend(topic,
MessageBuilder.withPayload(data)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
.build());
}
/**
* 发送事务消息
*/
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) arg;
// 1. 执行本地事务
orderService.create(order);
// 2. 返回提交
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("事务执行失败", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 事务回查(当 Broker 未收到确认时调用)
String orderId = msg.getHeaders().get("orderId").toString();
Order order = orderService.getById(orderId);
if (order != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 发送事务消息
*/
public void sendTransactionMessage(String topic, Order order) {
rocketMQTemplate.sendMessageInTransaction(
topic,
MessageBuilder.withPayload(order)
.setHeader("orderId", order.getId())
.build(),
order // 本地事务参数
);
}
}
3.3 消费者实现
/**
* RocketMQ 消费者
*/
@Component
public class RocketMQConsumerService {
/**
* 并发消费
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING
)
public class OrderConsumer implements RocketMQListener<OrderEvent> {
@Override
public void onMessage(OrderEvent message) {
log.info("收到订单消息:{}", message.getOrderId());
processOrder(message);
}
}
/**
* 顺序消费
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-sequence-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费
)
public class OrderlyConsumer implements RocketMQListener<OrderEvent> {
@Override
public void onMessage(OrderEvent message) {
// 同一订单的消息按顺序处理
processOrder(message);
}
}
/**
* 批量消费
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-batch-group"
)
public class BatchConsumer implements RocketMQPushConsumerLifecycleListener {
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeMessageBatchMaxSize(32);
}
}
/**
* 延迟消息消费
*/
@RocketMQMessageListener(
topic = "order-delay-topic",
consumerGroup = "order-delay-group"
)
public class DelayConsumer implements RocketMQListener<OrderEvent> {
@Override
public void onMessage(OrderEvent message) {
// 处理延迟消息(如订单超时关闭)
closeOrder(message.getOrderId());
}
}
private void processOrder(OrderEvent message) {
// 业务处理
}
private void closeOrder(String orderId) {
// 关闭订单逻辑
}
}
3.4 订单超时关闭实战
/**
* 订单超时关闭(延迟消息实战)
*/
@Service
public class OrderTimeoutService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 创建订单时发送延迟消息
*/
@Transactional
public Order createOrder(OrderRequest request) {
// 1. 创建订单
Order order = new Order();
order.setId(IdGenerator.generate());
order.setStatus(OrderStatus.UNPAID);
orderMapper.insert(order);
// 2. 发送延迟消息(30 分钟后检查)
OrderTimeoutMessage timeoutMsg = new OrderTimeoutMessage();
timeoutMsg.setOrderId(order.getId());
timeoutMsg.setCreateTime(System.currentTimeMillis());
rocketMQTemplate.syncSend(
"order-timeout-topic",
timeoutMsg,
3000, // 超时时间
18 // 延迟级别(2 小时)
);
return order;
}
/**
* 消费延迟消息
*/
@RocketMQMessageListener(
topic = "order-timeout-topic",
consumerGroup = "order-timeout-group"
)
public class TimeoutConsumer implements RocketMQListener<OrderTimeoutMessage> {
@Autowired
private OrderService orderService;
@Override
@Transactional
public void onMessage(OrderTimeoutMessage message) {
String orderId = message.getOrderId();
// 检查订单状态
Order order = orderService.getById(orderId);
if (order != null && order.getStatus() == OrderStatus.UNPAID) {
// 关闭订单
orderService.closeOrder(orderId);
log.info("订单超时关闭:orderId={}", orderId);
}
}
}
}
四、常见问题
4.1 消息丢失
mindmap
root((消息丢失场景))
生产者发送失败
原因:网络问题/Broker 宕机
解决
同步发送 + 重试
事务消息
本地消息表
Broker 存储失败
原因:磁盘满/宕机
解决
同步刷盘
同步复制
Dledger 高可用
消费者消费失败
原因:处理异常/宕机
解决
手动 ack
重试机制
死信队列
4.2 消息重复
/**
* 消息去重(幂等性)
*/
@Service
public class IdempotentConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-idempotent-group"
)
public class IdempotentOrderConsumer implements RocketMQListener<OrderEvent> {
@Override
@Transactional
public void onMessage(OrderEvent message) {
String msgId = message.getMsgId();
String key = "msg:processed:" + msgId;
// 1. 检查是否已处理(Redis setnx)
Boolean added = redisTemplate.opsForValue()
.setIfAbsent(key, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(added)) {
log.warn("消息已处理:msgId={}", msgId);
return;
}
// 2. 处理业务
processOrder(message);
// 注意:Redis 写入和业务操作需保证原子性
// 可使用 Lua 脚本或分布式锁
}
}
private void processOrder(OrderEvent message) {
// 业务处理
}
}
4.3 消息积压
/**
* 消息积压处理
*/
@Component
public class BacklogHandler {
/**
* 监控积压量
*/
@Scheduled(fixedRate = 60000)
public void monitorBacklog() {
// 获取消费者积压量
long backlog = getConsumerBacklog("order-topic", "order-consumer-group");
if (backlog > 10000) {
log.warn("消息积压:{}", backlog);
// 告警
alertService.send("消息积压超过阈值:" + backlog);
// 自动扩容
if (backlog > 50000) {
scaleUpConsumers();
}
}
}
/**
* 临时扩容消费者
*/
private void scaleUpConsumers() {
// K8s HPA 自动扩容
// 或手动增加消费者实例
log.info("扩容消费者");
}
/**
* 积压消息处理(批量消费)
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-backlog-group",
consumeThreadNumber = 64 // 增加消费线程
)
public class BacklogConsumer implements RocketMQListener<OrderEvent> {
@Override
public void onMessage(OrderEvent message) {
// 简化处理逻辑,提升消费速度
processSimple(message);
}
private void processSimple(OrderEvent message) {
// 快速处理
}
}
}
4.4 顺序消息
/**
* 顺序消息保证
*/
@Service
public class OrderMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送顺序消息
*/
public void sendOrderMessage(OrderEvent event) {
// 使用 orderId 作为 sharding key
// 相同 orderId 的消息发送到同一队列
rocketMQTemplate.syncSendOrderly(
"order-topic",
event,
event.getOrderId()
);
}
/**
* 顺序消费
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-orderly-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费
)
public class OrderlyConsumer implements RocketMQListener<OrderEvent> {
@Override
public void onMessage(OrderEvent message) {
// RocketMQ 保证同一队列的消息按顺序投递
// 但需要保证处理成功(不抛异常)
try {
processOrder(message);
} catch (Exception e) {
log.error("顺序消费失败", e);
// 抛出异常,触发重试
// RocketMQ 会重试直到成功
throw e;
}
}
}
}
五、最佳实践
5.1 生产环境配置
# Kafka 生产配置
producer:
acks: all # 所有副本确认
retries: 3 # 重试次数
max.in.flight.requests.per.connection: 5 # 飞行请求数
enable.idempotence: true # 开启幂等
compression.type: snappy # 压缩
# RocketMQ 生产配置
producer:
send-message-timeout: 3000
retry-times-when-send-failed: 2
vip-channel-enabled: false # 关闭 VIP 通道
# 消费者配置
consumer:
consume-thread-min: 20
consume-thread-max: 64
pull-batch-size: 32
consume-message-batch-max-size: 32
5.2 监控告警
/**
* MQ 监控
*/
@Component
public class MQMonitor {
@Autowired
private MeterRegistry meterRegistry;
/**
* 记录发送指标
*/
public void recordSend(String topic, long latency, boolean success) {
Counter.builder("mq.send.total")
.tag("topic", topic)
.tag("success", String.valueOf(success))
.register(meterRegistry)
.increment();
Timer.builder("mq.send.latency")
.tag("topic", topic)
.register(meterRegistry)
.record(latency, TimeUnit.MILLISECONDS);
}
/**
* 记录消费指标
*/
public void recordConsume(String topic, long latency, boolean success) {
// 类似发送指标
}
}
5.3 设计原则
mindmap
root((MQ 设计原则))
推荐做法
消息体尽量小<10KB
重要消息持久化
开启手动 ack
实现消息去重
监控积压量
设置合理重试次数
避免做法
大消息>1MB
同步等待消费结果
消费者处理时间过长
忽略消息失败
无监控告警
六、总结
6.1 核心要点
- 选型:根据场景选择 Kafka/RocketMQ/RabbitMQ
- 可靠性:ack 机制、事务消息、持久化
- 幂等性:消息去重是必须的
- 顺序性:分区/队列有序 + 顺序消费
- 监控:积压量、发送延迟、消费延迟
6.2 应用场景
| 场景 | 推荐方案 |
|---|---|
| 日志收集 | Kafka |
| 订单处理 | RocketMQ |
| 实时计算 | Kafka |
| 异步解耦 | RocketMQ/Kafka |
| 延迟任务 | RocketMQ |
| 事件驱动 | Kafka/RocketMQ |
消息队列是分布式系统的血管,承载着系统间的通信。合理使用可以提升系统性能,滥用会带来灾难。