RocketMQ 事务消息是其区别于其他消息队列的核心特性之一,广泛应用于分布式事务场景。本文将深入探讨事务消息的实现原理和实战应用。
一、事务消息基础
1.1 什么是事务消息?
事务消息是一种保证本地事务和消息发送要么都成功、要么都失败的机制:
sequenceDiagram
participant P as Producer
participant MQ as RocketMQ
participant DB as 本地数据库
participant C as Consumer
P->>MQ: 1. 发送半消息
MQ-->>P: 2. 返回 PREPARE
P->>DB: 3. 执行本地事务
DB-->>P: 4. 返回结果
alt 事务成功
P->>MQ: 5a. 提交消息
else 事务失败
P->>MQ: 5b. 回滚消息
end
MQ-->>C: 6. 投递消息(仅提交后)
1.2 应用场景
| 场景 | 说明 | 示例 |
|---|---|---|
| 支付成功通知 | 支付成功后通知其他系统 | 支付→积分、优惠券 |
| 订单创建 | 订单创建后通知库存 | 订单→库存扣减 |
| 数据同步 | 数据库变更后同步到其他系统 | DB→ES、缓存 |
| 注册流程 | 用户注册后发送邮件 | 注册→邮件、短信 |
1.3 与传统事务对比
| 特性 | 2PC/XA | RocketMQ 事务消息 |
|---|---|---|
| 一致性 | 强一致 | 最终一致 |
| 性能 | 低 | 高 |
| 可用性 | 低 | 高 |
| 适用场景 | 金融核心 | 大部分业务 |
二、实现原理
2.1 事务消息模型
graph TB
subgraph 事务流程
A[发送半消息] --> B{MQ 返回 PREPARE}
B --> C[执行本地事务]
C --> D{事务结果?}
D -->|COMMIT | E[提交消息]
D -->|ROLLBACK | F[回滚消息]
D -->|UNKNOWN | G[等待回查]
G --> H[事务回查]
H --> E
end
2.2 半消息(Half Message)
半消息是事务消息的核心概念:
// 半消息主题
String halfTopic = MessageConst.TOPIC_HALF_MESSAGE;
// 半消息格式
MessageExt msg = new MessageExt();
msg.setTopic(halfTopic); // 特殊主题
msg.setBody(body);
msg.putUserProperty(MessageConst.PROPERTY_REAL_TOPIC, realTopic);
msg.putUserProperty(MessageConst.PROPERTY_REAL_MESSAGE_ID, realMsgId);
特点:
- 存储在特殊的 RMQ_SYS_TRANS_HALF_TOPIC
- 对消费者不可见
- 只有提交后才对消费者可见
2.3 事务状态
public enum LocalTransactionState {
/**
* 提交事务,消息可被消费
*/
COMMIT_MESSAGE,
/**
* 回滚事务,消息被删除
*/
ROLLBACK_MESSAGE,
/**
* 状态未知,等待回查
*/
UNKNOW
}
三、代码实现
3.1 事务监听器
public class OrderTransactionListener implements TransactionListener {
@Autowired
private OrderService orderService;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 1. 解析消息
String orderJson = new String(msg.getBody());
Order order = JSON.parseObject(orderJson, Order.class);
try {
// 2. 执行本地事务
orderService.createOrder(order);
// 3. 返回提交
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("创建订单失败", e);
// 4. 返回回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 1. 解析消息
String orderJson = new String(msg.getBody());
Order order = JSON.parseObject(orderJson, Order.class);
try {
// 2. 查询本地事务状态
Order dbOrder = orderService.queryOrder(order.getOrderId());
if (dbOrder != null && dbOrder.getStatus() == OrderStatus.CREATED) {
log.info("订单已创建,提交消息:orderId={}", order.getOrderId());
return LocalTransactionState.COMMIT_MESSAGE;
} else {
log.warn("订单不存在,回滚消息:orderId={}", order.getOrderId());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.error("回查事务状态失败", e);
return LocalTransactionState.UNKNOW;
}
}
}
3.2 事务生产者
@Configuration
public class TransactionProducerConfig {
@Bean
public TransactionMQProducer transactionProducer() throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new OrderTransactionListener());
// 配置线程池
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
new ThreadFactoryImpl("transaction-thread")
);
producer.setExecutorService(executorService);
// 配置回查参数
producer.setSendMsgTimeout(3000);
producer.start();
log.info("事务生产者启动成功");
return producer;
}
}
3.3 发送事务消息
@Service
public class OrderService {
@Autowired
private TransactionMQProducer producer;
/**
* 创建订单并发送事务消息
*/
public void createOrderWithMessage(Order order) {
// 1. 构建消息
Message msg = new Message("order-topic", "create",
JSON.toJSONString(order).getBytes());
// 2. 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(msg, order);
// 3. 处理结果
switch (result.getSendStatus()) {
case SEND_OK:
log.info("事务消息发送成功:msgId={}", result.getMsgId());
break;
case SEND_MESSAGE_ILLEGAL:
log.error("消息非法");
throw new BusinessException("消息非法");
default:
log.error("发送失败:{}", result.getSendStatus());
throw new BusinessException("发送失败");
}
}
}
四、回查机制
4.1 回查触发条件
graph TD
A[发送半消息] --> B{等待响应}
B -->|超时 | C[触发回查]
B -->|UNKNOW | C
C --> D[定时回查]
D --> E{回查间隔?}
E -->|第一次 | F[60 秒后]
E -->|第二次 | G[60 秒后]
E -->|后续 | H[60 秒后]
H --> I{超过回查上限?}
I -->|是 | J[回滚消息]
I -->|否 | D
4.2 回查配置
# Broker 配置
transactionCheckInterval=60000 # 回查间隔 60 秒
transactionCheckMax=15 # 最大回查次数
4.3 回查流程
// Broker 端回查逻辑
public void checkTransactionState(String brokerAddr, MessageExt msg) {
// 1. 构建回查请求
CheckTransactionStateRequestHeader header = new CheckTransactionStateRequestHeader();
header.setMsgId(msg.getMsgId());
header.setTransactionId(msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
header.setCommitLogOffset(msg.getCommitLogOffset());
// 2. 发送给 Producer
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.CHECK_TRANSACTION_STATE, header);
remotingClient.invokeOneway(brokerAddr, request, 3000);
}
// Producer 端处理回查
@Override
public void processRequest(Channel channel, RemotingCommand request) {
CheckTransactionStateRequestHeader header = ...;
// 1. 回查本地事务
LocalTransactionState state = listener.checkLocalTransaction(msg);
// 2. 返回结果
switch (state) {
case COMMIT_MESSAGE:
sendCommitRequest(channel, header);
break;
case ROLLBACK_MESSAGE:
sendRollbackRequest(channel, header);
break;
case UNKNOW:
// 继续等待下次回查
log.warn("事务状态未知,等待下次回查");
break;
}
}
五、最佳实践
5.1 事务表方案
/**
* 本地事务表
*/
@Entity
@Table(name = "transaction_message")
public class TransactionMessage {
@Id
private String msgId;
private String topic;
private String tags;
private String body;
private String businessId; // 业务 ID
private String status; // SENDING, COMMITTED, ROLLBACK
private LocalDateTime createTime;
private LocalDateTime lastCheckTime;
private Integer checkTimes; // 回查次数
}
/**
* 事务消息服务
*/
@Service
public class TransactionMessageService {
@Transactional
public void saveAndSend(TransactionMessage txMsg, Message mqMsg) {
// 1. 保存事务消息到数据库
transactionMessageMapper.insert(txMsg);
// 2. 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(mqMsg, txMsg);
// 3. 更新状态
if (result.getSendStatus() == SendStatus.SEND_OK) {
txMsg.setStatus("SENDING");
transactionMessageMapper.update(txMsg);
}
}
/**
* 定时任务:检查未提交的消息
*/
@Scheduled(fixedRate = 60000)
public void checkUncommittedMessages() {
List<TransactionMessage> messages = transactionMessageMapper
.selectByStatus("SENDING");
for (TransactionMessage msg : messages) {
// 检查业务状态
boolean businessSuccess = checkBusinessStatus(msg.getBusinessId());
if (businessSuccess) {
msg.setStatus("COMMITTED");
} else {
msg.setStatus("ROLLBACK");
}
transactionMessageMapper.update(msg);
}
}
}
5.2 幂等性保证
@Service
public class OrderConsumer {
@Autowired
private OrderMapper orderMapper;
/**
* 消费订单消息(保证幂等性)
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group"
)
public class OrderListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 1. 检查是否已处理
Order existing = orderMapper.selectByOrderId(order.getOrderId());
if (existing != null) {
log.info("订单已处理,跳过:orderId={}", order.getOrderId());
return;
}
// 2. 处理订单
try {
orderMapper.insert(order);
log.info("订单处理成功:orderId={}", order.getOrderId());
} catch (DuplicateKeyException e) {
// 并发情况下可能重复,忽略
log.info("订单已存在:orderId={}", order.getOrderId());
}
}
}
}
5.3 补偿机制
/**
* 事务补偿服务
*/
@Service
public class TransactionCompensateService {
/**
* 定时补偿未提交的事务
*/
@Scheduled(fixedRate = 300000) // 5 分钟
public void compensateUncommittedTransactions() {
// 1. 查询超过 5 分钟未提交的事务
List<TransactionMessage> messages = transactionMessageMapper
.selectUncommitted(LocalDateTime.now().minusMinutes(5));
for (TransactionMessage msg : messages) {
try {
// 2. 检查业务状态
boolean success = checkBusinessStatus(msg.getBusinessId());
if (success) {
// 3. 重新发送消息
Message mqMsg = new Message(msg.getTopic(), msg.getTags(), msg.getBody());
producer.send(mqMsg);
msg.setStatus("COMMITTED");
} else {
msg.setStatus("ROLLBACK");
}
transactionMessageMapper.update(msg);
} catch (Exception e) {
log.error("补偿失败:msgId={}", msg.getMsgId(), e);
}
}
}
private boolean checkBusinessStatus(String businessId) {
// 查询业务系统确认状态
// ...
return true;
}
}
六、监控与告警
6.1 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| 事务消息发送量 | 每秒发送的事务消息数 | - |
| 事务提交率 | 提交成功的事务比例 | < 99% |
| 平均回查次数 | 每次事务的平均回查次数 | > 3 |
| 未提交消息数 | 超过 5 分钟未提交的消息数 | > 100 |
6.2 监控实现
@Component
public class TransactionMetrics {
@Autowired
private MeterRegistry meterRegistry;
private final AtomicLong sendCount = new AtomicLong();
private final AtomicLong commitCount = new AtomicLong();
private final AtomicLong rollbackCount = new AtomicLong();
private final AtomicLong checkCount = new AtomicLong();
@PostConstruct
public void init() {
meterRegistry.gauge("transaction.send.count", sendCount);
meterRegistry.gauge("transaction.commit.count", commitCount);
meterRegistry.gauge("transaction.rollback.count", rollbackCount);
meterRegistry.gauge("transaction.check.count", checkCount);
}
public void recordSend() {
sendCount.incrementAndGet();
}
public void recordCommit() {
commitCount.incrementAndGet();
}
public void recordRollback() {
rollbackCount.incrementAndGet();
}
public void recordCheck() {
checkCount.incrementAndGet();
}
public double getCommitRate() {
long total = commitCount.get() + rollbackCount.get();
return total > 0 ? (double) commitCount.get() / total : 0;
}
}
七、常见问题排查
7.1 事务一直处于 UNKNOW 状态
原因:
- 本地事务执行时间过长
- 回查逻辑有问题
解决:
// 1. 优化本地事务执行时间
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 异步处理耗时操作
CompletableFuture.runAsync(() -> {
// 耗时操作
});
// 立即返回 UNKNOW,等待回查
return LocalTransactionState.UNKNOW;
}
// 2. 完善回查逻辑
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 查询数据库确认状态
boolean exists = orderService.exists(msg.getBusinessId());
return exists ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
7.2 消息重复消费
原因:
- 网络抖动导致重试
- 消费者重复处理
解决:
// 实现幂等性
@RedissonLock(key = "#order.orderId")
public void processOrder(Order order) {
// 1. 检查是否已处理
if (orderMapper.exists(order.getOrderId())) {
return;
}
// 2. 处理订单
orderMapper.insert(order);
}
7.3 回查过于频繁
原因:
- 本地事务执行慢
- 回查间隔设置过短
解决:
# 调整回查间隔
transactionCheckInterval=120000 # 2 分钟
# 优化本地事务
# 使用异步处理
# 减少数据库操作
总结
RocketMQ 事务消息的核心要点:
- 半消息机制:先发送半消息,执行本地事务后再提交
- 事务监听器:executeLocalTransaction + checkLocalTransaction
- 回查机制:Broker 定时回查 Producer 确认事务状态
- 最终一致性:保证本地事务和消息发送的最终一致
- 幂等性保证:消费者必须实现幂等性
核心要点:
- 理解事务消息的两阶段提交流程
- 正确实现事务监听器的两个方法
- 完善回查逻辑,避免消息丢失
- 消费者必须实现幂等性
- 监控事务提交率和未提交消息
参考资料
- RocketMQ 事务消息官方文档
- RocketMQ 事务消息源码
- 《RocketMQ 技术内幕》第 7 章