Skip to content
清晨的一缕阳光
返回

MySQL 分布式事务解决方案

核心概念

分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分布在多个节点上的事务。

分布式事务挑战

单机事务(ACID):
- 原子性:通过 Undo/Redo Log 保证
- 一致性:通过约束、触发器保证
- 隔离性:通过锁、MVCC 保证
- 持久性:通过 WAL 保证

分布式事务:
- 跨数据库、跨服务、跨网络
- 无法同时满足 CAP(最多满足两项)

CAP 定理

理论含义说明
Consistency一致性所有节点同一时刻数据一致
Availability可用性每个请求都能获得响应
Partition tolerance分区容错性网络分区不影响系统运行

结论: 分布式系统必须满足 P,只能在 C 和 A 之间权衡。

BASE 理论

理论含义说明
Basically Available基本可用允许损失部分可用性
Soft state软状态允许中间状态
Eventually consistent最终一致性最终达到一致即可

核心思想: 放弃强一致性,追求最终一致性。

分布式事务解决方案

方案 1:2PC(Two-Phase Commit)

原理

阶段 1:准备阶段(Prepare)
1. 协调者询问所有参与者是否可以提交
2. 参与者执行事务操作,但不提交
3. 参与者返回成功/失败

阶段 2:提交阶段(Commit/Rollback)
1. 所有参与者都成功 → 协调者发送 Commit
2. 有参与者失败 → 协调者发送 Rollback
3. 参与者执行提交/回滚,释放资源

流程

sequenceDiagram
    participant C as 协调者
    participant P1 as 参与者 1
    participant P2 as 参与者 2
    
    C->>P1: Prepare
    C->>P2: Prepare
    P1-->>C: Yes
    P2-->>C: Yes
    C->>P1: Commit
    C->>P2: Commit
    P1-->>C: ACK
    P2-->>C: ACK

MySQL XA 事务实现

-- 1. 开启 XA 事务
XA START 'xid1';

-- 2. 执行 SQL
UPDATE accounts SET balance = balance - 100 WHERE id = 1;

-- 3. 准备提交
XA END 'xid1';
XA PREPARE 'xid1';

-- 4. 提交
XA COMMIT 'xid1';

-- 或回滚
XA ROLLBACK 'xid1';

2PC 缺点

  1. 同步阻塞:所有参与者阻塞等待
  2. 单点故障:协调者故障导致系统不可用
  3. 数据不一致:协调者在阶段 2 故障可能导致不一致
  4. 性能差:两次网络往返,多次磁盘 IO

方案 2:3PC(Three-Phase Commit)

原理

阶段 1:CanCommit
- 协调者询问参与者是否可以提交
- 不执行事务,只检查资源

阶段 2:PreCommit
- 执行事务操作,但不提交
- 参与者准备提交

阶段 3:DoCommit
- 正式提交事务

3PC 改进

3PC 问题

方案 3:TCC(Try-Confirm-Cancel)

原理

Try 阶段:
- 检查资源,预留资源
- 不执行业务操作

Confirm 阶段:
- 执行实际业务
- 使用 Try 预留的资源

Cancel 阶段:
- 释放 Try 预留的资源
- 回滚操作

流程

sequenceDiagram
    participant C as 协调者
    participant P1 as 参与者 1
    participant P2 as 参与者 2
    
    C->>P1: Try
    C->>P2: Try
    P1-->>C: Success
    P2-->>C: Success
    C->>P1: Confirm
    C->>P2: Confirm
    P1-->>C: ACK
    P2-->>C: ACK

TCC 实现示例

// 1. Try 接口
@TccTransaction(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
public void transfer(Long fromId, Long toId, BigDecimal amount) {
    // Try:冻结资金
    accountService.freeze(fromId, amount);
}

// 2. Confirm 方法
public void confirmMethod(Long fromId, Long toId, BigDecimal amount) {
    // 实际扣减
    accountService.deduct(fromId, amount);
    accountService.add(toId, amount);
}

// 3. Cancel 方法
public void cancelMethod(Long fromId, Long toId, BigDecimal amount) {
    // 解冻资金
    accountService.unfreeze(fromId, amount);
}

// 服务实现
@Service
public class AccountService {
    
    // Try:冻结
    public void freeze(Long accountId, BigDecimal amount) {
        Account account = accountMapper.selectById(accountId);
        if (account.getBalance().compareTo(amount) < 0) {
            throw new BusinessException("余额不足");
        }
        // 冻结金额
        accountMapper.freeze(accountId, amount);
    }
    
    // Confirm:扣减
    public void deduct(Long accountId, BigDecimal amount) {
        accountMapper.deduct(accountId, amount);
    }
    
    // Cancel:解冻
    public void unfreeze(Long accountId, BigDecimal amount) {
        accountMapper.unfreeze(accountId, amount);
    }
}

TCC 优缺点

优点:

缺点:

TCC 关键问题处理

// 1. 幂等性:防止重复执行
public void confirmMethod(Long accountId, BigDecimal amount) {
    // 检查是否已执行
    if (tccRecordMapper.exists(accountId, "CONFIRMED")) {
        return;  // 已执行,直接返回
    }
    // 执行业务
    accountMapper.deduct(accountId, amount);
    // 记录状态
    tccRecordMapper.record(accountId, "CONFIRMED");
}

// 2. 防悬挂:Cancel 先于 Try 执行
public void cancelMethod(Long accountId, BigDecimal amount) {
    // 检查 Try 是否执行
    if (!tccRecordMapper.exists(accountId, "TRY")) {
        // Try 未执行,记录防止悬挂
        tccRecordMapper.record(accountId, "CANCELLED");
        return;
    }
    // 执行回滚
    accountMapper.unfreeze(accountId, amount);
}

// 3. 空回滚:Try 失败,Cancel 被调用
public void cancelMethod(Long accountId, BigDecimal amount) {
    // Try 未执行,直接返回(空回滚)
    if (!tccRecordMapper.exists(accountId, "TRY")) {
        return;
    }
    // 执行回滚
    accountMapper.unfreeze(accountId, amount);
}

方案 4:本地消息表

原理

1. 业务数据 + 消息记录在同一本地事务
2. 定时任务轮询消息表,发送消息
3. 消费者处理消息,实现最终一致性

流程

sequenceDiagram
    participant App as 应用
    participant DB as 数据库
    participant MQ as 消息队列
    participant Consumer as 消费者
    
    App->>DB: 业务操作 + 插入消息(同一事务)
    App->>App: 定时任务轮询消息表
    App->>MQ: 发送消息
    MQ->>Consumer: 消费消息
    Consumer->>Consumer: 执行业务

实现示例

// 1. 消息表
CREATE TABLE local_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    business_id VARCHAR(64),      -- 业务 ID
    message_type VARCHAR(32),     -- 消息类型
    message_body TEXT,            -- 消息内容
    status TINYINT,               -- 0:待发送 1:已发送 2:已完成
    retry_count INT DEFAULT 0,    -- 重试次数
    create_time DATETIME,
    send_time DATETIME,
    INDEX idx_status (status),
    INDEX idx_retry (status, retry_count)
);

// 2. 发送消息(本地事务)
@Transactional
public void createOrder(Order order) {
    // 1. 创建订单
    orderMapper.insert(order);
    
    // 2. 插入消息记录
    LocalMessage message = new LocalMessage();
    message.setBusinessId(order.getId());
    message.setMessageType("ORDER_CREATED");
    message.setMessageBody(JSON.toJSONString(order));
    message.setStatus(0);  // 待发送
    messageMapper.insert(message);
}

// 3. 定时任务发送消息
@Component
public class MessageSender {
    
    @Scheduled(fixedDelay = 5000)
    public void sendMessages() {
        // 查询待发送消息
        List<LocalMessage> messages = messageMapper.findPending(100);
        
        for (LocalMessage message : messages) {
            try {
                // 发送消息
                kafkaTemplate.send("order-topic", message.getMessageBody());
                
                // 更新状态
                message.setStatus(1);
                message.setSendTime(new Date());
                messageMapper.update(message);
            } catch (Exception e) {
                // 重试
                if (message.getRetryCount() >= 3) {
                    message.setStatus(4);  // 失败
                } else {
                    message.setRetryCount(message.getRetryCount() + 1);
                }
                messageMapper.update(message);
            }
        }
    }
}

// 4. 消费者处理
@KafkaListener(topics = "order-topic")
public void consume(String message) {
    Order order = JSON.parseObject(message, Order.class);
    // 执行业务逻辑
    inventoryService.decrease(order.getProductId(), order.getQuantity());
}

本地消息表优缺点

优点:

缺点:

方案 5:事务消息(RocketMQ)

原理

1. 发送 Half 消息(半消息,对消费者不可见)
2. 执行本地事务
3. 根据本地事务结果提交/回滚消息
4. 消费者消费消息

流程

sequenceDiagram
    participant Producer as 生产者
    participant MQ as RocketMQ
    participant Consumer as 消费者
    
    Producer->>MQ: 发送 Half 消息
    MQ-->>Producer: Half 消息 OK
    Producer->>Producer: 执行本地事务
    Producer->>MQ: Commit/Rollback
    MQ->>Consumer: 投递消息(仅 Commit)

RocketMQ 事务消息实现

// 1. 生产者配置
@Bean
public TransactionMQProducer transactionMQProducer() {
    TransactionMQProducer producer = new TransactionMQProducer("producer-group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    
    // 事务监听器
    producer.setTransactionListener(new TransactionListener() {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // 执行本地事务
            try {
                Order order = (Order) arg;
                orderService.createOrder(order);
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Exception e) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 事务回查(当 MQ 未收到 Commit/Rollback 时)
            String orderId = msg.getUserProperty("orderId");
            Order order = orderService.getOrder(orderId);
            if (order != null) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    });
    
    return producer;
}

// 2. 发送事务消息
public void sendTransactionMessage(Order order) {
    Message message = new Message("order-topic", JSON.toJSONString(order));
    message.putUserProperty("orderId", order.getId());
    
    // 发送 Half 消息
    TransactionSendResult result = producer.sendMessageInTransaction(message, order);
    
    if (result.getLocalTransactionState() == LocalTransactionState.UNKNOW) {
        // 未知状态,等待事务回查
    }
}

// 3. 消费者
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        Order order = JSON.parseObject(message, Order.class);
        // 执行业务
        inventoryService.decrease(order.getProductId(), order.getQuantity());
    }
}

事务消息优缺点

优点:

缺点:

方案 6:Saga 模式

原理

长事务拆分为多个本地短事务:
1. 每个本地事务提交后释放资源
2. 如果某步失败,执行前面事务的补偿操作
3. 最终一致性

流程

sequenceDiagram
    participant S as Saga 协调者
    participant T1 as 事务 1
    participant T2 as 事务 2
    participant T3 as 事务 3
    
    S->>T1: 执行
    T1-->>S: 成功
    S->>T2: 执行
    T2-->>S: 成功
    S->>T3: 执行
    T3-->>S: 失败
    S->>T2: 补偿
    S->>T1: 补偿

Saga 实现

// 1. Saga 流程定义
public class OrderSaga {
    
    public void createOrder(Order order) {
        try {
            // 步骤 1:创建订单
            orderService.create(order);
            
            // 步骤 2:扣减库存
            inventoryService.decrease(order.getProductId(), order.getQuantity());
            
            // 步骤 3:扣减余额
            accountService.deduct(order.getUserId(), order.getAmount());
            
        } catch (Exception e) {
            // 补偿
            compensate(order);
        }
    }
    
    private void compensate(Order order) {
        // 补偿顺序与执行顺序相反
        try {
            accountService.refund(order.getUserId(), order.getAmount());
        } catch (Exception e) {
            // 记录补偿日志,异步重试
        }
        
        try {
            inventoryService.increase(order.getProductId(), order.getQuantity());
        } catch (Exception e) {
            // 记录补偿日志,异步重试
        }
        
        try {
            orderService.cancel(order.getId());
        } catch (Exception e) {
            // 记录补偿日志,异步重试
        }
    }
}

Saga 优缺点

优点:

缺点:

方案对比与选型

方案对比

方案一致性性能实现复杂度适用场景
2PC强一致内部系统,低并发
3PC强一致较少使用
TCC最终一致高并发,核心业务
本地消息表最终一致一般业务
事务消息最终一致解耦场景
Saga最终一致长流程业务

选型建议

1. 强一致性需求(金融核心):
   → 2PC / 分布式数据库(TiDB、OceanBase)

2. 高并发核心业务(支付、下单):
   → TCC

3. 一般业务(订单、物流):
   → 本地消息表 / 事务消息

4. 长流程业务(审批流):
   → Saga

5. 解耦场景(异步处理):
   → 事务消息

生产实践案例

案例 1:电商下单流程

业务场景:
1. 创建订单(订单服务)
2. 扣减库存(库存服务)
3. 扣减余额(账户服务)
4. 发送通知(消息服务)

解决方案:TCC
// 订单服务 TCC 实现
@TccTransaction(confirmMethod = "confirm", cancelMethod = "cancel")
public Order createOrder(Order order) {
    // Try:预占库存、预占额度
    inventoryService.tryLock(order.getProductId(), order.getQuantity());
    accountService.tryFreeze(order.getUserId(), order.getAmount());
    
    // 创建订单(状态:处理中)
    order.setStatus("PROCESSING");
    orderMapper.insert(order);
    
    return order;
}

public void confirm(Order order) {
    // Confirm:确认扣减
    inventoryService.deduct(order.getProductId(), order.getQuantity());
    accountService.deduct(order.getUserId(), order.getAmount());
    
    // 更新订单状态
    order.setStatus("SUCCESS");
    orderMapper.update(order);
}

public void cancel(Order order) {
    // Cancel:释放资源
    inventoryService.unlock(order.getProductId(), order.getQuantity());
    accountService.unfreeze(order.getUserId(), order.getAmount());
    
    // 取消订单
    order.setStatus("CANCELLED");
    orderMapper.update(order);
}

案例 2:支付回调处理

业务场景:
1. 接收支付回调
2. 更新订单状态
3. 增加账户余额
4. 发送通知

解决方案:事务消息
// 支付回调处理
public void handlePayCallback(PayCallback callback) {
    // 1. 发送事务消息
    Message message = new Message("pay-success-topic", JSON.toJSONString(callback));
    producer.sendMessageInTransaction(message, callback);
}

// 本地事务执行
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    PayCallback callback = (PayCallback) arg;
    
    // 更新订单状态
    orderMapper.updateStatus(callback.getOrderId(), "PAID");
    
    // 增加余额
    accountMapper.addBalance(callback.getUserId(), callback.getAmount());
    
    return LocalTransactionState.COMMIT_MESSAGE;
}

// 消费者处理
@RocketMQMessageListener(topic = "pay-success-topic", ...)
public void onMessage(String message) {
    PayCallback callback = JSON.parseObject(message, PayCallback.class);
    // 发送通知
    notificationService.send(callback.getUserId(), "支付成功");
}

案例 3:跨境转账(Saga)

业务场景:
1. 扣减源账户(国内)
2. 汇率转换
3. 增加目标账户(国外)
4. 发送 SWIFT 报文

解决方案:Saga
public class CrossBorderTransferSaga {
    
    public void transfer(TransferRequest request) {
        List<CompensableAction> actions = new ArrayList<>();
        
        try {
            // 步骤 1:扣减源账户
            accountService.deduct(request.getFromId(), request.getAmount());
            actions.add(() -> accountService.refund(request.getFromId(), request.getAmount()));
            
            // 步骤 2:汇率转换
            BigDecimal targetAmount = exchangeService.convert(request.getAmount(), request.getCurrency());
            actions.add(() -> {});  // 无需补偿
            
            // 步骤 3:增加目标账户
            accountService.add(request.getToId(), targetAmount);
            actions.add(() -> accountService.deduct(request.getToId(), targetAmount));
            
            // 步骤 4:发送 SWIFT
            swiftService.send(request, targetAmount);
            actions.add(() -> swiftService.cancel(request));
            
        } catch (Exception e) {
            // 反向补偿
            Collections.reverse(actions);
            for (CompensableAction action : actions) {
                try {
                    action.compensate();
                } catch (Exception ex) {
                    // 记录日志,异步重试
                }
            }
        }
    }
}

最佳实践

1. 幂等性设计

// 消费者幂等处理
public void consume(Message message) {
    String messageId = message.getId();
    
    // 检查是否已处理
    if (processedMapper.exists(messageId)) {
        return;  // 已处理,直接返回
    }
    
    // 处理业务
    process(message);
    
    // 记录已处理
    processedMapper.insert(messageId);
}

2. 重试机制

// 带退避的重试
@Retryable(
    value = Exception.class,
    maxAttempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void processWithRetry(Message message) {
    process(message);
}

3. 对账补偿

-- 定时对账任务
CREATE PROCEDURE reconcile_orders()
BEGIN
    -- 查找不一致的订单
    SELECT o.id
    FROM orders o
    LEFT JOIN payments p ON o.id = p.order_id
    WHERE o.status = 'PAID' AND p.id IS NULL;
    
    -- 修复不一致数据
    UPDATE orders SET status = 'UNPAID' WHERE ...;
END;

4. 监控告警

# Prometheus 监控
groups:
  - name: distributed_transaction
    rules:
      - alert: TransactionTimeout
        expr: tcc_pending_transactions > 100
        for: 5m
        labels:
          severity: warning
          
      - alert: CompensationFailed
        expr: increase(compensation_failures_total[5m]) > 0
        labels:
          severity: critical

总结

核心要点

  1. CAP 定理:分布式系统必须在 C 和 A 之间权衡
  2. BASE 理论:追求最终一致性
  3. 主流方案
    • 2PC:强一致,性能差
    • TCC:最终一致,性能优,实现复杂
    • 本地消息表:最终一致,实现简单
    • 事务消息:最终一致,解耦
    • Saga:最终一致,适合长流程

选型原则

参考资料


分享这篇文章到:

上一篇文章
Java 序列化机制详解
下一篇文章
Collections 工具类详解