
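RocketMQ Transactional Messages: Internals and Practice

Transactional messages are one of the core features that set RocketMQ apart from other message queues, and they are widely used in distributed transaction scenarios. This article covers how they are implemented and how to apply them in practice.

1. Transactional Message Basics

1.1 What Is a Transactional Message?

A transactional message ties a local transaction to message delivery so that both take effect or neither does: the message becomes visible to consumers only if the local transaction commits.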

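sequenceDiagram
    participant P as Producer
    participant MQ as RocketMQ
    participant DB as Local database
    participant C as Consumer

    P->>MQ: 1. Send half message
    MQ-->>P: 2. Acknowledge half message (prepared)
    P->>DB: 3. Execute local transaction
    DB-->>P: 4. Return result
    alt Transaction succeeded
        P->>MQ: 5a. Commit message
    else Transaction failed
        P->>MQ: 5b. Roll back message
    end
    MQ-->>C: 6. Deliver message (only after commit)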
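
To make the visibility rule in the diagram concrete, here is a minimal in-memory sketch. `ToyBroker` and its methods are hypothetical names invented for illustration; this is not the RocketMQ API, only a model of "half messages stay hidden until committed".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory broker (hypothetical, not RocketMQ): half messages stay hidden until committed. */
class ToyBroker {
    private final Map<String, String> halfMessages = new LinkedHashMap<>(); // txId -> body
    private final List<String> visible = new ArrayList<>();                 // what consumers can read

    /** Steps 1-2: store the half message; consumers cannot see it yet. */
    void sendHalf(String txId, String body) {
        halfMessages.put(txId, body);
    }

    /** Step 5a: commit moves the message to where consumers can read it. */
    void commit(String txId) {
        String body = halfMessages.remove(txId);
        if (body != null) {
            visible.add(body);
        }
    }

    /** Step 5b: rollback drops the half message; it is never delivered. */
    void rollback(String txId) {
        halfMessages.remove(txId);
    }

    /** Step 6: consumers only ever see committed messages. */
    List<String> poll() {
        return new ArrayList<>(visible);
    }

    /** Number of half messages still waiting for commit or rollback. */
    int pendingCount() {
        return halfMessages.size();
    }
}
```

Sending two half messages, committing one and rolling back the other leaves exactly one message for consumers to read.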

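1.2 Use Cases

| Scenario | Description | Example |
| --- | --- | --- |
| Payment success notification | Notify other systems after a payment succeeds | Payment → points, coupons |
| Order creation | Notify inventory after an order is created | Order → inventory deduction |
| Data synchronization | Propagate database changes to other systems | DB → ES, cache |
| Registration flow | Send email after a user registers | Registration → email, SMS |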

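1.3 Comparison with Traditional Transactions

| Property | 2PC/XA | RocketMQ transactional messages |
| --- | --- | --- |
| Consistency | Strong | Eventual |
| Performance | Lower (resources stay locked across both phases) | Higher (asynchronous) |
| Availability | Lower (participants block on the coordinator) | Higher |
| Use cases | Core financial systems | Most business scenarios |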

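2. How It Works

2.1 Transactional Message Model

graph TB
    subgraph TxFlow [Transaction flow]
        A[Send half message] --> B{MQ acknowledges half message}
        B --> C[Execute local transaction]
        C --> D{Transaction result?}
        D -->|COMMIT| E[Commit message]
        D -->|ROLLBACK| F[Roll back message]
        D -->|UNKNOW| G[Wait for check]
        G --> H[Transaction check]
        H -->|COMMIT| E
        H -->|ROLLBACK| F
        H -->|UNKNOW| G
    end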

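2.2 Half Message

The half message is the core concept behind transactional messages. The snippet below is a simplified sketch of how the Broker rewrites an incoming half message before storing it:

// Half messages are stored under a system topic (RMQ_SYS_TRANS_HALF_TOPIC)
String halfTopic = TransactionalMessageUtil.buildHalfTopic();

// Save the real destination as properties so it can be restored on commit.
// MessageAccessor is used because these are reserved system properties,
// which putUserProperty rejects.
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,
    String.valueOf(msg.getQueueId()));

// Redirect the message to the half topic, which ordinary consumers do not subscribe to
msg.setTopic(halfTopic);
msg.setQueueId(0);

Characteristics: a half message is stored under a special topic rather than its real topic, so consumers cannot see it until it is committed and the real topic is restored.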

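2.3 Transaction States

public enum LocalTransactionState {
    /**
     * Commit the transaction; the message becomes consumable.
     */
    COMMIT_MESSAGE,

    /**
     * Roll back the transaction; the message is discarded.
     */
    ROLLBACK_MESSAGE,

    /**
     * State unknown; wait for the Broker to check.
     * (The constant really is spelled UNKNOW in the RocketMQ API.)
     */
    UNKNOW
}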

3. Implementation

3.1 Transaction Listener

public class OrderTransactionListener implements TransactionListener {

    private static final Logger log = LoggerFactory.getLogger(OrderTransactionListener.class);

    // Injected by Spring; this only works if the listener itself is a Spring-managed bean
    @Autowired
    private OrderService orderService;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 1. Parse the message (decode with an explicit charset)
        String orderJson = new String(msg.getBody(), StandardCharsets.UTF_8);
        Order order = JSON.parseObject(orderJson, Order.class);

        try {
            // 2. Execute the local transaction
            orderService.createOrder(order);

            // 3. Commit the message
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            log.error("Failed to create order", e);
            // 4. Roll back the message
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 1. Parse the message
        String orderJson = new String(msg.getBody(), StandardCharsets.UTF_8);
        Order order = JSON.parseObject(orderJson, Order.class);

        try {
            // 2. Look up the local transaction state
            Order dbOrder = orderService.queryOrder(order.getOrderId());

            if (dbOrder != null && dbOrder.getStatus() == OrderStatus.CREATED) {
                log.info("Order exists, committing message: orderId={}", order.getOrderId());
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                log.warn("Order not found, rolling back message: orderId={}", order.getOrderId());
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            log.error("Failed to check transaction state", e);
            // The state could not be determined; let the Broker check again later
            return LocalTransactionState.UNKNOW;
        }
    }
}
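
A common way to make the check answer reliably is to record the outcome of each local transaction under its transaction ID. The sketch below models that idea with the standard library only. `TxOutcome` and `LocalTxLog` are hypothetical names, the in-memory map stands in for what would normally be a database table written in the same local transaction as the business data, and answering UNKNOWN for an unrecorded ID is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Local stand-in for the three outcomes of LocalTransactionState (not the RocketMQ enum). */
enum TxOutcome { COMMIT, ROLLBACK, UNKNOWN }

/** Records the result of each local transaction so that a later check can answer from it. */
class LocalTxLog {
    private enum Status { PENDING, SUCCEEDED, FAILED }

    private final Map<String, Status> statusByTxId = new ConcurrentHashMap<>();

    /** Runs the business action and records whether it succeeded. */
    TxOutcome execute(String txId, Runnable businessAction) {
        statusByTxId.put(txId, Status.PENDING);
        try {
            businessAction.run();
            statusByTxId.put(txId, Status.SUCCEEDED);
            return TxOutcome.COMMIT;
        } catch (RuntimeException e) {
            statusByTxId.put(txId, Status.FAILED);
            return TxOutcome.ROLLBACK;
        }
    }

    /** Answers a check; in-flight or unrecorded transactions stay UNKNOWN rather than being rolled back. */
    TxOutcome check(String txId) {
        Status status = statusByTxId.get(txId);
        if (status == Status.SUCCEEDED) {
            return TxOutcome.COMMIT;
        }
        if (status == Status.FAILED) {
            return TxOutcome.ROLLBACK;
        }
        return TxOutcome.UNKNOWN;
    }
}
```

Keeping an explicit FAILED state lets the check distinguish "failed" from "not finished yet", which a plain existence query cannot do.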

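3.2 Transactional Producer

@Configuration
public class TransactionProducerConfig {

    private static final Logger log = LoggerFactory.getLogger(TransactionProducerConfig.class);

    // Register the listener as a bean so Spring injects its @Autowired fields;
    // a listener created inline with `new` would be left with a null OrderService
    @Bean
    public OrderTransactionListener orderTransactionListener() {
        return new OrderTransactionListener();
    }

    @Bean
    public TransactionMQProducer transactionProducer(OrderTransactionListener listener)
            throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group");
        producer.setNamesrvAddr("localhost:9876");

        // Set the transaction listener
        producer.setTransactionListener(listener);

        // Thread pool used to handle transaction checks
        ExecutorService executorService = new ThreadPoolExecutor(
            2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2000),
            new ThreadFactoryImpl("transaction-thread")
        );
        producer.setExecutorService(executorService);

        // Send timeout in milliseconds; the check interval and the maximum
        // number of checks are Broker-side settings (see 4.2)
        producer.setSendMsgTimeout(3000);

        producer.start();
        log.info("Transactional producer started");

        return producer;
    }
}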

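3.3 Sending a Transactional Message

@Service
public class OrderService {

    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    // @Lazy avoids a circular dependency: the producer's listener depends on OrderService
    @Lazy
    @Autowired
    private TransactionMQProducer producer;

    /**
     * Creates an order and sends the transactional message.
     */
    public void createOrderWithMessage(Order order) throws MQClientException {
        // 1. Build the message
        Message msg = new Message("order-topic", "create",
            JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8));

        // 2. Send the transactional message; executeLocalTransaction runs during this call
        TransactionSendResult result = producer.sendMessageInTransaction(msg, order);

        // 3. Handle the result. SEND_OK only means the half message was stored;
        //    the local transaction state tells whether the message was committed
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            log.error("Send failed: {}", result.getSendStatus());
            throw new BusinessException("Send failed");
        }
        switch (result.getLocalTransactionState()) {
            case COMMIT_MESSAGE:
                log.info("Transactional message committed: msgId={}", result.getMsgId());
                break;
            case ROLLBACK_MESSAGE:
                log.error("Local transaction failed, message rolled back");
                throw new BusinessException("Local transaction failed");
            default:
                log.warn("Transaction state unknown, waiting for Broker check: msgId={}",
                    result.getMsgId());
        }
    }
}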

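4. Check Mechanism

4.1 When a Check Is Triggered

graph TD
    A[Send half message] --> B{Wait for commit or rollback}
    B -->|Timeout| C[Trigger check]
    B -->|UNKNOW| C
    C --> D[Periodic check]
    D --> E{Check interval?}
    E -->|First| F[After 60 s]
    E -->|Second| G[After 60 s]
    E -->|Later| H[After 60 s]
    H --> I{Max checks exceeded?}
    I -->|Yes| J[Roll back message]
    I -->|No| D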

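4.2 Check Configuration

# Broker configuration
# Comments go on their own lines: in a properties file, text after a value becomes part of the value
# Check interval: 60 seconds
transactionCheckInterval=60000
# Maximum number of checks
transactionCheckMax=15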
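
As a rough back-of-the-envelope estimate, the two settings together bound how long a half message can stay unresolved. The sketch below simply multiplies them; `CheckWindow` is a hypothetical helper, and the estimate assumes one check per interval and ignores any delay before the first check.

```java
import java.time.Duration;

/** Rough arithmetic for the Broker check settings (hypothetical helper, not part of RocketMQ). */
class CheckWindow {
    /** Approximate upper bound on how long the Broker keeps checking: one interval per check. */
    static Duration maxCheckingTime(long checkIntervalMillis, int checkMax) {
        return Duration.ofMillis(checkIntervalMillis * checkMax);
    }
}
```

With the values above, `CheckWindow.maxCheckingTime(60000, 15)` is 15 minutes, so a transaction whose outcome takes longer than that to become known would run out of checks.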

4.3 Check Flow

// Broker side (simplified pseudocode): ask the Producer for the transaction state
public void checkTransactionState(Channel producerChannel, MessageExt msg) throws Exception {
    // 1. Build the check request
    CheckTransactionStateRequestHeader header = new CheckTransactionStateRequestHeader();
    header.setMsgId(msg.getMsgId());
    header.setTransactionId(msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    header.setCommitLogOffset(msg.getCommitLogOffset());

    // 2. Send it one-way to the Producer over the Producer's channel
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.CHECK_TRANSACTION_STATE, header);

    remotingServer.invokeOneway(producerChannel, request, 3000);
}

// Producer side (simplified pseudocode): handle the check request
@Override
public void processRequest(Channel channel, RemotingCommand request) {
    CheckTransactionStateRequestHeader header = ...;
    MessageExt msg = ...;  // the half message carried in the request body

    // 1. Check the local transaction
    LocalTransactionState state = listener.checkLocalTransaction(msg);

    // 2. Report the result
    switch (state) {
        case COMMIT_MESSAGE:
            sendCommitRequest(channel, header);
            break;
        case ROLLBACK_MESSAGE:
            sendRollbackRequest(channel, header);
            break;
        case UNKNOW:
            // Keep waiting for the next check
            log.warn("Transaction state unknown, waiting for the next check");
            break;
    }
}
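
The repeated check can be pictured as a bounded retry loop. The following sketch is a stdlib-only model under stated assumptions: `CheckResult`, `FinalState`, and `CheckLoop` are hypothetical names, the wait between checks is omitted, and "discarded" stands for whatever the Broker does once the check limit is used up.

```java
import java.util.function.Supplier;

/** Answer a producer can give to one check (local stand-in, not the RocketMQ enum). */
enum CheckResult { COMMIT, ROLLBACK, UNKNOWN }

/** Where a half message ends up in this model. */
enum FinalState { COMMITTED, ROLLED_BACK, DISCARDED }

class CheckLoop {
    /** Repeats the check until it gives a definite answer or maxChecks is used up. */
    static FinalState resolve(Supplier<CheckResult> check, int maxChecks) {
        for (int attempt = 1; attempt <= maxChecks; attempt++) {
            CheckResult result = check.get();
            if (result == CheckResult.COMMIT) {
                return FinalState.COMMITTED;
            }
            if (result == CheckResult.ROLLBACK) {
                return FinalState.ROLLED_BACK;
            }
            // UNKNOWN: a real Broker would wait one check interval before asking again
        }
        return FinalState.DISCARDED;
    }
}
```

A producer that answers UNKNOWN twice and then COMMIT resolves on the third check; one that never gives a definite answer ends up discarded after `maxChecks` attempts.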

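5. Best Practices

5.1 Transaction Table

/**
 * Local transaction table.
 */
@Entity
@Table(name = "transaction_message")
public class TransactionMessage {
    @Id
    private String msgId;

    private String topic;
    private String tags;
    private String body;

    private String businessId;  // business ID
    private String status;      // SENDING, COMMITTED, ROLLBACK

    private LocalDateTime createTime;
    private LocalDateTime lastCheckTime;
    private Integer checkTimes;  // number of checks so far
}

/**
 * Transactional message service.
 */
@Service
public class TransactionMessageService {

    @Autowired
    private TransactionMessageMapper transactionMessageMapper;

    @Autowired
    private TransactionMQProducer producer;

    // Caution: executeLocalTransaction runs inside sendMessageInTransaction, that is, before
    // this method's database transaction commits. rollbackFor is needed because
    // MQClientException is a checked exception, which Spring does not roll back on by default.
    @Transactional(rollbackFor = Exception.class)
    public void saveAndSend(TransactionMessage txMsg, Message mqMsg) throws MQClientException {
        // 1. Save the transactional message to the database
        transactionMessageMapper.insert(txMsg);

        // 2. Send the transactional message
        TransactionSendResult result = producer.sendMessageInTransaction(mqMsg, txMsg);

        // 3. Update the status
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            txMsg.setStatus("SENDING");
            transactionMessageMapper.update(txMsg);
        }
    }

    /**
     * Scheduled task: reconcile messages that are still uncommitted.
     */
    @Scheduled(fixedRate = 60000)
    public void checkUncommittedMessages() {
        List<TransactionMessage> messages = transactionMessageMapper
            .selectByStatus("SENDING");

        for (TransactionMessage msg : messages) {
            // Check the business state (checkBusinessStatus is a business-specific lookup)
            boolean businessSuccess = checkBusinessStatus(msg.getBusinessId());

            if (businessSuccess) {
                msg.setStatus("COMMITTED");
            } else {
                msg.setStatus("ROLLBACK");
            }

            transactionMessageMapper.update(msg);
        }
    }
}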

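5.2 Idempotency

/**
 * Consumes order messages idempotently.
 * The annotated class is itself the Spring bean and the listener; a listener
 * nested as a non-static inner class would not be registered as a bean.
 */
@Service
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<Order> {

    private static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    @Autowired
    private OrderMapper orderMapper;

    @Override
    public void onMessage(Order order) {
        // 1. Skip orders that were already processed
        Order existing = orderMapper.selectByOrderId(order.getOrderId());
        if (existing != null) {
            log.info("Order already processed, skipping: orderId={}", order.getOrderId());
            return;
        }

        // 2. Process the order
        try {
            orderMapper.insert(order);
            log.info("Order processed: orderId={}", order.getOrderId());
        } catch (DuplicateKeyException e) {
            // A concurrent delivery inserted it first; safe to ignore.
            // This relies on a unique index on the order ID.
            log.info("Order already exists: orderId={}", order.getOrderId());
        }
    }
}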
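
The essential point of the idempotency check is that "have I seen this ID?" and "mark it as seen" happen as one atomic step. A minimal stdlib-only sketch of that idea follows. `IdempotentHandler` is a hypothetical name, and the in-memory set stands in for a unique index in the database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Processes each order ID at most once, even when the same message is delivered repeatedly. */
class IdempotentHandler {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger handledCount = new AtomicInteger();

    /** Returns true if this call processed the order, false if it was a duplicate. */
    boolean handle(String orderId) {
        // add() is atomic: for a given ID exactly one caller gets true, like a unique index
        if (!processedIds.add(orderId)) {
            return false;
        }
        handledCount.incrementAndGet(); // stand-in for the real business logic
        return true;
    }

    /** Number of orders that were actually processed. */
    int handledCount() {
        return handledCount.get();
    }
}
```

Delivering the same order ID twice processes it once; the second call returns false without touching the business logic.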

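5.3 Compensation

/**
 * Transaction compensation service.
 */
@Service
public class TransactionCompensateService {

    private static final Logger log = LoggerFactory.getLogger(TransactionCompensateService.class);

    @Autowired
    private TransactionMessageMapper transactionMessageMapper;

    @Autowired
    private TransactionMQProducer producer;

    /**
     * Periodically compensates transactions that are still uncommitted.
     */
    @Scheduled(fixedRate = 300000)  // 5 minutes
    public void compensateUncommittedTransactions() {
        // 1. Find transactions that have been uncommitted for more than 5 minutes
        List<TransactionMessage> messages = transactionMessageMapper
            .selectUncommitted(LocalDateTime.now().minusMinutes(5));

        for (TransactionMessage msg : messages) {
            try {
                // 2. Check the business state
                boolean success = checkBusinessStatus(msg.getBusinessId());

                if (success) {
                    // 3. Resend as an ordinary message (the body is stored as a String)
                    Message mqMsg = new Message(msg.getTopic(), msg.getTags(),
                        msg.getBody().getBytes(StandardCharsets.UTF_8));
                    producer.send(mqMsg);

                    msg.setStatus("COMMITTED");
                } else {
                    msg.setStatus("ROLLBACK");
                }

                transactionMessageMapper.update(msg);
            } catch (Exception e) {
                log.error("Compensation failed: msgId={}", msg.getMsgId(), e);
            }
        }
    }

    private boolean checkBusinessStatus(String businessId) {
        // Placeholder: query the business system to confirm the state
        // ...
        return true;
    }
}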
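
The selection step of the compensation job, "still SENDING and older than the cutoff", can be sketched without a database. `PendingMessage` and `CompensationSelector` are hypothetical names used only for this illustration, and passing `now` in as a parameter is a design choice that keeps the logic testable.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Minimal view of a row in the transaction table (hypothetical, for illustration only). */
class PendingMessage {
    final String msgId;
    final String status;      // SENDING, COMMITTED, ROLLBACK
    final Instant createTime;

    PendingMessage(String msgId, String status, Instant createTime) {
        this.msgId = msgId;
        this.status = status;
        this.createTime = createTime;
    }
}

class CompensationSelector {
    /** Returns the messages that are still SENDING and were created more than maxAge before now. */
    static List<PendingMessage> selectStale(List<PendingMessage> all, Instant now, Duration maxAge) {
        Instant cutoff = now.minus(maxAge);
        List<PendingMessage> stale = new ArrayList<>();
        for (PendingMessage message : all) {
            if ("SENDING".equals(message.status) && message.createTime.isBefore(cutoff)) {
                stale.add(message);
            }
        }
        return stale;
    }
}
```

With a 5-minute `maxAge`, a message created 6 minutes ago in status SENDING is selected, while a 1-minute-old SENDING message and an old COMMITTED message are left alone.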

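6. Monitoring and Alerting

6.1 Metrics

| Metric | Description | Alert threshold |
| --- | --- | --- |
| Transactional messages sent | Transactional messages sent per second | - |
| Commit rate | Share of transactions that commit successfully | < 99% |
| Average check count | Average number of checks per transaction | > 3 |
| Uncommitted messages | Messages uncommitted for more than 5 minutes | > 100 |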
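
The thresholds in the table can be turned into a simple evaluation function. This is a sketch under assumptions: `TransactionAlerts` is a hypothetical helper, the commit rate is taken as commits divided by commits plus rollbacks, and the average check count is taken as checks divided by finished transactions.

```java
import java.util.ArrayList;
import java.util.List;

/** Evaluates the alert thresholds from the metrics table (hypothetical helper). */
class TransactionAlerts {
    static List<String> evaluate(long commits, long rollbacks, long checks, long uncommittedOver5Min) {
        List<String> alerts = new ArrayList<>();
        long finished = commits + rollbacks;
        if (finished > 0) {
            double commitRate = (double) commits / finished;
            if (commitRate < 0.99) {
                alerts.add("commit rate below 99%");
            }
            double averageChecks = (double) checks / finished;
            if (averageChecks > 3) {
                alerts.add("average check count above 3");
            }
        }
        if (uncommittedOver5Min > 100) {
            alerts.add("more than 100 uncommitted messages");
        }
        return alerts;
    }
}
```

For example, 900 commits, 100 rollbacks, 5000 checks, and 150 stale messages trip all three alerts, while 999 commits, 1 rollback, 100 checks, and no stale messages trip none.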

6.2 Monitoring Implementation

@Component
public class TransactionMetrics {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private final AtomicLong sendCount = new AtomicLong();
    private final AtomicLong commitCount = new AtomicLong();
    private final AtomicLong rollbackCount = new AtomicLong();
    private final AtomicLong checkCount = new AtomicLong();
    
    @PostConstruct
    public void init() {
        meterRegistry.gauge("transaction.send.count", sendCount);
        meterRegistry.gauge("transaction.commit.count", commitCount);
        meterRegistry.gauge("transaction.rollback.count", rollbackCount);
        meterRegistry.gauge("transaction.check.count", checkCount);
    }
    
    public void recordSend() {
        sendCount.incrementAndGet();
    }
    
    public void recordCommit() {
        commitCount.incrementAndGet();
    }
    
    public void recordRollback() {
        rollbackCount.incrementAndGet();
    }
    
    public void recordCheck() {
        checkCount.incrementAndGet();
    }
    
    public double getCommitRate() {
        long total = commitCount.get() + rollbackCount.get();
        return total > 0 ? (double) commitCount.get() / total : 0;
    }
}

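7. Troubleshooting

7.1 Transaction Stuck in the UNKNOW State

Cause: typically the local transaction runs too long, or the check logic cannot determine its outcome.

Solution

// 1. Keep executeLocalTransaction short
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    // Run the slow work asynchronously
    CompletableFuture.runAsync(() -> {
        // slow operation
    });

    // Return UNKNOW right away and let the check decide
    return LocalTransactionState.UNKNOW;
}

// 2. Make the check logic complete
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    // Look up the outcome in the database.
    // Assumes the producer set the business ID as the message key (msg.setKeys).
    boolean exists = orderService.exists(msg.getKeys());
    // Rolling back on "not found" is only safe once the async work has finished;
    // if it may still be running, record an in-progress state and return UNKNOW for it.
    return exists ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}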

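7.2 Duplicate Consumption

Cause: RocketMQ delivers messages at least once, so retries can hand the same message to a consumer more than once.

Solution

// Make processing idempotent.
// @RedissonLock is assumed to be a project-specific distributed-lock annotation.
@RedissonLock(key = "#order.orderId")
public void processOrder(Order order) {
    // 1. Skip orders that were already processed
    if (orderMapper.exists(order.getOrderId())) {
        return;
    }

    // 2. Process the order
    orderMapper.insert(order);
}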

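7.3 Checks Are Too Frequent

Cause: typically the check interval is too short for how long local transactions take.

Solution

# Increase the check interval (2 minutes)
transactionCheckInterval=120000

# Optimize the local transaction:
# - use asynchronous processing
# - reduce database operations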

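Summary

Key points of RocketMQ transactional messages:

  1. Half messages: send a half message first, then commit it after the local transaction has run
  2. Transaction listener: executeLocalTransaction + checkLocalTransaction
  3. Check mechanism: the Broker periodically asks the Producer to confirm the transaction state
  4. Eventual consistency: the local transaction and message delivery end up consistent
  5. Idempotency: consumers must be idempotent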
