核心概念
分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分布在多个节点上的事务。
分布式事务挑战
单机事务(ACID):
- 原子性:通过 Undo/Redo Log 保证
- 一致性:通过约束、触发器保证
- 隔离性:通过锁、MVCC 保证
- 持久性:通过 WAL 保证
分布式事务:
- 跨数据库、跨服务、跨网络
- 无法同时满足 CAP(最多满足两项)
CAP 定理
| 理论 | 含义 | 说明 |
|---|---|---|
| Consistency | 一致性 | 所有节点同一时刻数据一致 |
| Availability | 可用性 | 每个请求都能获得响应 |
| Partition tolerance | 分区容错性 | 网络分区不影响系统运行 |
结论: 分布式系统必须满足 P,只能在 C 和 A 之间权衡。
BASE 理论
| 理论 | 含义 | 说明 |
|---|---|---|
| Basically Available | 基本可用 | 允许损失部分可用性 |
| Soft state | 软状态 | 允许中间状态 |
| Eventually consistent | 最终一致性 | 最终达到一致即可 |
核心思想: 放弃强一致性,追求最终一致性。
分布式事务解决方案
方案 1:2PC(Two-Phase Commit)
原理
阶段 1:准备阶段(Prepare)
1. 协调者询问所有参与者是否可以提交
2. 参与者执行事务操作,但不提交
3. 参与者返回成功/失败
阶段 2:提交阶段(Commit/Rollback)
1. 所有参与者都成功 → 协调者发送 Commit
2. 有参与者失败 → 协调者发送 Rollback
3. 参与者执行提交/回滚,释放资源
流程
sequenceDiagram
participant C as 协调者
participant P1 as 参与者 1
participant P2 as 参与者 2
C->>P1: Prepare
C->>P2: Prepare
P1-->>C: Yes
P2-->>C: Yes
C->>P1: Commit
C->>P2: Commit
P1-->>C: ACK
P2-->>C: ACK
MySQL XA 事务实现
-- 1. 开启 XA 事务
XA START 'xid1';
-- 2. 执行 SQL
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 3. 准备提交
XA END 'xid1';
XA PREPARE 'xid1';
-- 4. 提交
XA COMMIT 'xid1';
-- 或回滚
XA ROLLBACK 'xid1';
2PC 缺点
- 同步阻塞:所有参与者阻塞等待
- 单点故障:协调者故障导致系统不可用
- 数据不一致:协调者在阶段 2 故障可能导致不一致
- 性能差:两次网络往返,多次磁盘 IO
方案 2:3PC(Three-Phase Commit)
原理
阶段 1:CanCommit
- 协调者询问参与者是否可以提交
- 不执行事务,只检查资源
阶段 2:PreCommit
- 执行事务操作,但不提交
- 参与者准备提交
阶段 3:DoCommit
- 正式提交事务
3PC 改进
- 减少阻塞时间:超时自动提交
- 降低单点影响:超时后可继续
3PC 问题
- 网络分区仍可能导致不一致
- 实现复杂,性能提升有限
- 生产环境很少使用
方案 3:TCC(Try-Confirm-Cancel)
原理
Try 阶段:
- 检查资源,预留资源
- 不执行业务操作
Confirm 阶段:
- 执行实际业务
- 使用 Try 预留的资源
Cancel 阶段:
- 释放 Try 预留的资源
- 回滚操作
流程
sequenceDiagram
participant C as 协调者
participant P1 as 参与者 1
participant P2 as 参与者 2
C->>P1: Try
C->>P2: Try
P1-->>C: Success
P2-->>C: Success
C->>P1: Confirm
C->>P2: Confirm
P1-->>C: ACK
P2-->>C: ACK
TCC 实现示例
// 1. Try 接口
@TccTransaction(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
public void transfer(Long fromId, Long toId, BigDecimal amount) {
// Try:冻结资金
accountService.freeze(fromId, amount);
}
// 2. Confirm 方法
public void confirmMethod(Long fromId, Long toId, BigDecimal amount) {
// 实际扣减
accountService.deduct(fromId, amount);
accountService.add(toId, amount);
}
// 3. Cancel 方法
public void cancelMethod(Long fromId, Long toId, BigDecimal amount) {
// 解冻资金
accountService.unfreeze(fromId, amount);
}
// 服务实现
@Service
public class AccountService {
// Try:冻结
public void freeze(Long accountId, BigDecimal amount) {
Account account = accountMapper.selectById(accountId);
if (account.getBalance().compareTo(amount) < 0) {
throw new BusinessException("余额不足");
}
// 冻结金额
accountMapper.freeze(accountId, amount);
}
// Confirm:扣减
public void deduct(Long accountId, BigDecimal amount) {
accountMapper.deduct(accountId, amount);
}
// Cancel:解冻
public void unfreeze(Long accountId, BigDecimal amount) {
accountMapper.unfreeze(accountId, amount);
}
}
TCC 优缺点
优点:
- 性能优于 2PC
- 不长期持有锁
- 支持高并发
缺点:
- 实现复杂(三个接口)
- 需要幂等设计
- 需要防悬挂处理
- 回滚逻辑复杂
TCC 关键问题处理
// 1. 幂等性:防止重复执行
public void confirmMethod(Long accountId, BigDecimal amount) {
// 检查是否已执行
if (tccRecordMapper.exists(accountId, "CONFIRMED")) {
return; // 已执行,直接返回
}
// 执行业务
accountMapper.deduct(accountId, amount);
// 记录状态
tccRecordMapper.record(accountId, "CONFIRMED");
}
// 2. 防悬挂:Cancel 先于 Try 执行
public void cancelMethod(Long accountId, BigDecimal amount) {
// 检查 Try 是否执行
if (!tccRecordMapper.exists(accountId, "TRY")) {
// Try 未执行,记录防止悬挂
tccRecordMapper.record(accountId, "CANCELLED");
return;
}
// 执行回滚
accountMapper.unfreeze(accountId, amount);
}
// 3. 空回滚:Try 失败,Cancel 被调用
public void cancelMethod(Long accountId, BigDecimal amount) {
// Try 未执行,直接返回(空回滚)
if (!tccRecordMapper.exists(accountId, "TRY")) {
return;
}
// 执行回滚
accountMapper.unfreeze(accountId, amount);
}
方案 4:本地消息表
原理
1. 业务数据 + 消息记录在同一本地事务
2. 定时任务轮询消息表,发送消息
3. 消费者处理消息,实现最终一致性
流程
sequenceDiagram
participant App as 应用
participant DB as 数据库
participant MQ as 消息队列
participant Consumer as 消费者
App->>DB: 业务操作 + 插入消息(同一事务)
App->>App: 定时任务轮询消息表
App->>MQ: 发送消息
MQ->>Consumer: 消费消息
Consumer->>Consumer: 执行业务
实现示例
// 1. 消息表
CREATE TABLE local_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
business_id VARCHAR(64), -- 业务 ID
message_type VARCHAR(32), -- 消息类型
message_body TEXT, -- 消息内容
status TINYINT, -- 0:待发送 1:已发送 2:已完成
retry_count INT DEFAULT 0, -- 重试次数
create_time DATETIME,
send_time DATETIME,
INDEX idx_status (status),
INDEX idx_retry (status, retry_count)
);
// 2. 发送消息(本地事务)
@Transactional
public void createOrder(Order order) {
// 1. 创建订单
orderMapper.insert(order);
// 2. 插入消息记录
LocalMessage message = new LocalMessage();
message.setBusinessId(order.getId());
message.setMessageType("ORDER_CREATED");
message.setMessageBody(JSON.toJSONString(order));
message.setStatus(0); // 待发送
messageMapper.insert(message);
}
// 3. 定时任务发送消息
@Component
public class MessageSender {
@Scheduled(fixedDelay = 5000)
public void sendMessages() {
// 查询待发送消息
List<LocalMessage> messages = messageMapper.findPending(100);
for (LocalMessage message : messages) {
try {
// 发送消息
kafkaTemplate.send("order-topic", message.getMessageBody());
// 更新状态
message.setStatus(1);
message.setSendTime(new Date());
messageMapper.update(message);
} catch (Exception e) {
// 重试
if (message.getRetryCount() >= 3) {
message.setStatus(4); // 失败
} else {
message.setRetryCount(message.getRetryCount() + 1);
}
messageMapper.update(message);
}
}
}
}
// 4. 消费者处理
@KafkaListener(topics = "order-topic")
public void consume(String message) {
Order order = JSON.parseObject(message, Order.class);
// 执行业务逻辑
inventoryService.decrease(order.getProductId(), order.getQuantity());
}
本地消息表优缺点
优点:
- 实现简单
- 可靠性高(本地事务保证)
- 最终一致性
缺点:
- 依赖定时任务,有延迟
- 消息表与业务耦合
- 需要处理幂等性
方案 5:事务消息(RocketMQ)
原理
1. 发送 Half 消息(半消息,对消费者不可见)
2. 执行本地事务
3. 根据本地事务结果提交/回滚消息
4. 消费者消费消息
流程
sequenceDiagram
participant Producer as 生产者
participant MQ as RocketMQ
participant Consumer as 消费者
Producer->>MQ: 发送 Half 消息
MQ-->>Producer: Half 消息 OK
Producer->>Producer: 执行本地事务
Producer->>MQ: Commit/Rollback
MQ->>Consumer: 投递消息(仅 Commit)
RocketMQ 事务消息实现
// 1. 生产者配置
@Bean
public TransactionMQProducer transactionMQProducer() {
TransactionMQProducer producer = new TransactionMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
// 事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
Order order = (Order) arg;
orderService.createOrder(order);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务回查(当 MQ 未收到 Commit/Rollback 时)
String orderId = msg.getUserProperty("orderId");
Order order = orderService.getOrder(orderId);
if (order != null) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
return producer;
}
// 2. 发送事务消息
public void sendTransactionMessage(Order order) {
Message message = new Message("order-topic", JSON.toJSONString(order));
message.putUserProperty("orderId", order.getId());
// 发送 Half 消息
TransactionSendResult result = producer.sendMessageInTransaction(message, order);
if (result.getLocalTransactionState() == LocalTransactionState.UNKNOW) {
// 未知状态,等待事务回查
}
}
// 3. 消费者
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
Order order = JSON.parseObject(message, Order.class);
// 执行业务
inventoryService.decrease(order.getProductId(), order.getQuantity());
}
}
事务消息优缺点
优点:
- 解耦业务和消息
- 最终一致性保证
- 性能较好
缺点:
- 依赖 RocketMQ(其他 MQ 不支持)
- 需要实现事务回查
- 实现复杂度中等
方案 6:Saga 模式
原理
长事务拆分为多个本地短事务:
1. 每个本地事务提交后释放资源
2. 如果某步失败,执行前面事务的补偿操作
3. 最终一致性
流程
sequenceDiagram
participant S as Saga 协调者
participant T1 as 事务 1
participant T2 as 事务 2
participant T3 as 事务 3
S->>T1: 执行
T1-->>S: 成功
S->>T2: 执行
T2-->>S: 成功
S->>T3: 执行
T3-->>S: 失败
S->>T2: 补偿
S->>T1: 补偿
Saga 实现
// 1. Saga 流程定义
public class OrderSaga {
public void createOrder(Order order) {
try {
// 步骤 1:创建订单
orderService.create(order);
// 步骤 2:扣减库存
inventoryService.decrease(order.getProductId(), order.getQuantity());
// 步骤 3:扣减余额
accountService.deduct(order.getUserId(), order.getAmount());
} catch (Exception e) {
// 补偿
compensate(order);
}
}
private void compensate(Order order) {
// 补偿顺序与执行顺序相反
try {
accountService.refund(order.getUserId(), order.getAmount());
} catch (Exception e) {
// 记录补偿日志,异步重试
}
try {
inventoryService.increase(order.getProductId(), order.getQuantity());
} catch (Exception e) {
// 记录补偿日志,异步重试
}
try {
orderService.cancel(order.getId());
} catch (Exception e) {
// 记录补偿日志,异步重试
}
}
}
Saga 优缺点
优点:
- 长事务拆分,减少锁持有
- 性能较好
- 适合业务流程长的场景
缺点:
- 补偿逻辑复杂
- 不隔离(可能读到中间状态)
- 需要幂等设计
方案对比与选型
方案对比
| 方案 | 一致性 | 性能 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC | 强一致 | 差 | 低 | 内部系统,低并发 |
| 3PC | 强一致 | 中 | 中 | 较少使用 |
| TCC | 最终一致 | 优 | 高 | 高并发,核心业务 |
| 本地消息表 | 最终一致 | 良 | 低 | 一般业务 |
| 事务消息 | 最终一致 | 优 | 中 | 解耦场景 |
| Saga | 最终一致 | 优 | 中 | 长流程业务 |
选型建议
1. 强一致性需求(金融核心):
→ 2PC / 分布式数据库(TiDB、OceanBase)
2. 高并发核心业务(支付、下单):
→ TCC
3. 一般业务(订单、物流):
→ 本地消息表 / 事务消息
4. 长流程业务(审批流):
→ Saga
5. 解耦场景(异步处理):
→ 事务消息
生产实践案例
案例 1:电商下单流程
业务场景:
1. 创建订单(订单服务)
2. 扣减库存(库存服务)
3. 扣减余额(账户服务)
4. 发送通知(消息服务)
解决方案:TCC
// 订单服务 TCC 实现
@TccTransaction(confirmMethod = "confirm", cancelMethod = "cancel")
public Order createOrder(Order order) {
// Try:预占库存、预占额度
inventoryService.tryLock(order.getProductId(), order.getQuantity());
accountService.tryFreeze(order.getUserId(), order.getAmount());
// 创建订单(状态:处理中)
order.setStatus("PROCESSING");
orderMapper.insert(order);
return order;
}
public void confirm(Order order) {
// Confirm:确认扣减
inventoryService.deduct(order.getProductId(), order.getQuantity());
accountService.deduct(order.getUserId(), order.getAmount());
// 更新订单状态
order.setStatus("SUCCESS");
orderMapper.update(order);
}
public void cancel(Order order) {
// Cancel:释放资源
inventoryService.unlock(order.getProductId(), order.getQuantity());
accountService.unfreeze(order.getUserId(), order.getAmount());
// 取消订单
order.setStatus("CANCELLED");
orderMapper.update(order);
}
案例 2:支付回调处理
业务场景:
1. 接收支付回调
2. 更新订单状态
3. 增加账户余额
4. 发送通知
解决方案:事务消息
// 支付回调处理
public void handlePayCallback(PayCallback callback) {
// 1. 发送事务消息
Message message = new Message("pay-success-topic", JSON.toJSONString(callback));
producer.sendMessageInTransaction(message, callback);
}
// 本地事务执行
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
PayCallback callback = (PayCallback) arg;
// 更新订单状态
orderMapper.updateStatus(callback.getOrderId(), "PAID");
// 增加余额
accountMapper.addBalance(callback.getUserId(), callback.getAmount());
return LocalTransactionState.COMMIT_MESSAGE;
}
// 消费者处理
@RocketMQMessageListener(topic = "pay-success-topic", ...)
public void onMessage(String message) {
PayCallback callback = JSON.parseObject(message, PayCallback.class);
// 发送通知
notificationService.send(callback.getUserId(), "支付成功");
}
案例 3:跨境转账(Saga)
业务场景:
1. 扣减源账户(国内)
2. 汇率转换
3. 增加目标账户(国外)
4. 发送 SWIFT 报文
解决方案:Saga
public class CrossBorderTransferSaga {
public void transfer(TransferRequest request) {
List<CompensableAction> actions = new ArrayList<>();
try {
// 步骤 1:扣减源账户
accountService.deduct(request.getFromId(), request.getAmount());
actions.add(() -> accountService.refund(request.getFromId(), request.getAmount()));
// 步骤 2:汇率转换
BigDecimal targetAmount = exchangeService.convert(request.getAmount(), request.getCurrency());
actions.add(() -> {}); // 无需补偿
// 步骤 3:增加目标账户
accountService.add(request.getToId(), targetAmount);
actions.add(() -> accountService.deduct(request.getToId(), targetAmount));
// 步骤 4:发送 SWIFT
swiftService.send(request, targetAmount);
actions.add(() -> swiftService.cancel(request));
} catch (Exception e) {
// 反向补偿
Collections.reverse(actions);
for (CompensableAction action : actions) {
try {
action.compensate();
} catch (Exception ex) {
// 记录日志,异步重试
}
}
}
}
}
最佳实践
1. 幂等性设计
// 消费者幂等处理
public void consume(Message message) {
String messageId = message.getId();
// 检查是否已处理
if (processedMapper.exists(messageId)) {
return; // 已处理,直接返回
}
// 处理业务
process(message);
// 记录已处理
processedMapper.insert(messageId);
}
2. 重试机制
// 带退避的重试
@Retryable(
value = Exception.class,
maxAttempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void processWithRetry(Message message) {
process(message);
}
3. 对账补偿
-- 定时对账任务
CREATE PROCEDURE reconcile_orders()
BEGIN
-- 查找不一致的订单
SELECT o.id
FROM orders o
LEFT JOIN payments p ON o.id = p.order_id
WHERE o.status = 'PAID' AND p.id IS NULL;
-- 修复不一致数据
UPDATE orders SET status = 'UNPAID' WHERE ...;
END;
4. 监控告警
# Prometheus 监控
groups:
- name: distributed_transaction
rules:
- alert: TransactionTimeout
expr: tcc_pending_transactions > 100
for: 5m
labels:
severity: warning
- alert: CompensationFailed
expr: increase(compensation_failures_total[5m]) > 0
labels:
severity: critical
总结
核心要点
- CAP 定理:分布式系统必须在 C 和 A 之间权衡
- BASE 理论:追求最终一致性
- 主流方案:
- 2PC:强一致,性能差
- TCC:最终一致,性能优,实现复杂
- 本地消息表:最终一致,实现简单
- 事务消息:最终一致,解耦
- Saga:最终一致,适合长流程
选型原则
- 强一致性需求 → 2PC / 分布式数据库
- 高并发核心 → TCC
- 一般业务 → 本地消息表 / 事务消息
- 长流程 → Saga
参考资料
- MySQL XA 事务
- RocketMQ 事务消息
- 《分布式事务:从理论到实践》
- 《数据密集型应用系统设计》第 9 章