Seata AT 模式实战
AT 模式原理
两阶段提交
阶段一(Prepare):
- 执行 SQL 业务操作
- 记录数据前镜像(Before Image)
- 执行 SQL,记录数据后镜像(After Image)
- 提交本地事务
- 释放本地锁
阶段二(Commit):
- TC 发送 Commit 请求
- RM 异步删除 undo_log
- 提交成功
阶段二(Rollback):
- TC 发送 Rollback 请求
- RM 根据 undo_log 补偿数据
- 提交补偿事务
工作流程
阶段一:
┌─────────────────────────────────────────────┐
│ 1. 执行 SQL:UPDATE account SET money=100 │
│ 2. 查询前镜像:money=200 │
│ 3. 查询后镜像:money=100 │
│ 4. 插入 undo_log │
│ 5. 提交本地事务 │
│ 6. 释放本地锁 │
└─────────────────────────────────────────────┘
阶段二(回滚):
┌─────────────────────────────────────────────┐
│ 1. 接收回滚请求 │
│ 2. 读取 undo_log │
│ 3. 生成回滚 SQL:UPDATE account SET │
│ money=200 WHERE money=100 │
│ 4. 执行回滚 SQL │
│ 5. 删除 undo_log │
│ 6. 提交回滚事务 │
└─────────────────────────────────────────────┘
隔离性保证
写-写隔离:
- 阶段一提交本地事务,释放本地锁
- 全局锁保证写操作互斥
- 防止脏写
读-写隔离:
- 默认不隔离(读未提交)
- 可配置为读已提交
- 通过全局锁实现
快速开始
1. 环境准备
-- 创建业务数据库
CREATE DATABASE seata_demo;
-- 创建 undo_log 表
CREATE TABLE `undo_log` (
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`),
KEY `ix_log_created` (`log_created`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 创建账户表
CREATE TABLE `account` (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`user_id` BIGINT NOT NULL,
`money` DECIMAL(10,2) NOT NULL DEFAULT 0
);
-- 创建订单表
CREATE TABLE `order` (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`user_id` BIGINT NOT NULL,
`product_id` BIGINT NOT NULL,
`amount` INT NOT NULL,
`money` DECIMAL(10,2) NOT NULL
);
-- 创建库存表
CREATE TABLE `storage` (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`product_id` BIGINT NOT NULL,
`count` INT NOT NULL
);
2. 添加依赖
<dependencies>
<!-- Seata 分布式事务 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!-- Nacos 服务注册发现 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- MyBatis Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3</version>
</dependency>
</dependencies>
3. 配置 Seata
spring:
application:
name: order-service
cloud:
nacos:
discovery:
server-addr: localhost:8848
sentinel:
transport:
dashboard: localhost:8080
datasource:
url: jdbc:mysql://localhost:3306/seata_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
seata:
enabled: true
tx-service-group: default_tx_group
service:
vgroup-mapping:
default_tx_group: default
grouplist:
default: 127.0.0.1:8091
registry:
type: nacos
nacos:
server-addr: localhost:8848
namespace: public
group: SEATA_GROUP
application: seata-server
config:
type: nacos
nacos:
server-addr: localhost:8848
namespace: public
group: SEATA_GROUP
data-id: seataServer.properties
4. 数据源配置
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
return new DruidDataSource();
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSource);
// 配置 MyBatis
factoryBean.setMapperLocations(
new PathMatchingResourcePatternResolver()
.getResources("classpath*:mapper/*.xml")
);
return factoryBean.getObject();
}
}
5. 使用全局事务
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountClient accountClient;
@Autowired
private StorageClient storageClient;
@GlobalTransactional // 开启全局事务
public void createOrder(Order order) {
// 1. 创建订单
orderMapper.insert(order);
// 2. 扣减库存
storageClient.decrease(order.getProductId(), order.getCount());
// 3. 扣减账户余额
accountClient.decrease(order.getUserId(), order.getAmount());
// 4. 模拟异常
if (order.getAmount() > 1000) {
throw new BusinessException("订单金额过大");
}
}
}
6. Feign 客户端
@FeignClient(name = "account-service")
public interface AccountClient {
@PostMapping("/account/decrease")
Result<Void> decrease(@RequestParam("userId") Long userId,
@RequestParam("amount") BigDecimal amount);
}
@FeignClient(name = "storage-service")
public interface StorageClient {
@PostMapping("/storage/decrease")
Result<Void> decrease(@RequestParam("productId") Long productId,
@RequestParam("count") Integer count);
}
进阶配置
1. 事务超时配置
@GlobalTransactional(timeoutMills = 60000, name = "create-order-tx")
public void createOrder(Order order) {
// 业务逻辑
}
2. 回滚例外配置
@GlobalTransactional(
rollbackFor = Exception.class,
noRollbackFor = BusinessException.class
)
public void createOrder(Order order) {
// 业务异常不回滚,其他异常回滚
}
3. 传播行为配置
@GlobalTransactional(propagation = Propagation.REQUIRED)
public void method1() {
// 需要新事务
}
@GlobalTransactional(propagation = Propagation.NOT_SUPPORTED)
public void method2() {
// 以非事务方式执行
}
隔离级别
1. 读未提交(默认)
seata:
data-source-proxy-mode: XA # 或 AT
2. 读已提交
@Configuration
public class SeataConfig {
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
DataSourceProxy dataSourceProxy = new DataSourceProxy(dataSource);
// 设置隔离级别为读已提交
dataSourceProxy.setDefaultTransactionIsolation(
Connection.TRANSACTION_READ_COMMITTED
);
return dataSourceProxy;
}
}
性能优化
1. 批量操作优化
@Service
public class BatchOrderService {
@GlobalTransactional
public void batchCreateOrders(List<Order> orders) {
// 批量插入订单
orderMapper.batchInsert(orders);
// 批量扣减库存
for (Order order : orders) {
storageClient.decrease(order.getProductId(), order.getCount());
}
// 批量扣减账户
for (Order order : orders) {
accountClient.decrease(order.getUserId(), order.getAmount());
}
}
}
2. 异步提交
@Service
public class AsyncCommitService {
@GlobalTransactional(asyncCommit = true)
public void createOrder(Order order) {
// 阶段二异步提交
orderMapper.insert(order);
storageClient.decrease(order.getProductId(), order.getCount());
accountClient.decrease(order.getUserId(), order.getAmount());
}
}
3. 分支事务合并
@Service
public class MergeBranchService {
@GlobalTransactional
public void process(Order order) {
// 多个操作合并为一个分支事务
doOperation1(order);
doOperation2(order);
doOperation3(order);
}
private void doOperation1(Order order) {
// 操作 1
}
private void doOperation2(Order order) {
// 操作 2
}
private void doOperation3(Order order) {
// 操作 3
}
}
监控与排查
1. 事务日志查询
@RestController
@RequestMapping("/seata")
public class SeataController {
@Autowired
private TransactionMapper transactionMapper;
@GetMapping("/transactions")
public List<Transaction> listTransactions() {
return transactionMapper.selectList(null);
}
@GetMapping("/transactions/{xid}")
public Transaction getTransaction(@PathVariable String xid) {
return transactionMapper.selectOne(
new QueryWrapper<Transaction>().eq("xid", xid)
);
}
}
2. undo_log 查询
@RestController
@RequestMapping("/seata")
public class UndoLogController {
@Autowired
private UndoLogMapper undoLogMapper;
@GetMapping("/undo-logs")
public List<UndoLog> listUndoLogs(@RequestParam String xid) {
return undoLogMapper.selectList(
new QueryWrapper<UndoLog>().eq("xid", xid)
);
}
}
3. 事务状态监控
@Component
public class TransactionMonitor {
@Autowired
private MeterRegistry meterRegistry;
@Scheduled(fixedRate = 60000)
public void monitorTransactions() {
// 统计活跃事务数
long activeCount = getActiveTransactionCount();
meterRegistry.gauge("seata.transaction.active", activeCount);
// 统计回滚事务数
long rollbackCount = getRollbackTransactionCount();
meterRegistry.counter("seata.transaction.rollback", rollbackCount);
// 统计平均事务耗时
double avgDuration = getAverageTransactionDuration();
meterRegistry.timer("seata.transaction.duration")
.record(avgDuration, TimeUnit.MILLISECONDS);
}
}
常见问题
1. 事务不生效
问题:@GlobalTransactional 注解不生效
排查步骤:
- 检查是否启用 Seata
- 检查数据源配置
- 检查 undo_log 表是否存在
- 查看 Seata 日志
解决方案:
seata:
enabled: true
tx-service-group: default_tx_group
2. 死锁问题
问题:出现死锁异常
解决方案:
- 调整事务执行顺序
- 减少事务持有时间
- 使用读已提交隔离级别
3. 性能问题
问题:使用 Seata 后性能下降
解决方案:
- 优化事务粒度
- 配置异步提交
- 调整 undo_log 清理策略
4. 数据不一致
问题:分布式事务后数据不一致
解决方案:
- 检查网络是否稳定
- 查看 undo_log 记录
- 检查 TC 服务器状态
最佳实践
1. 事务粒度
- 小事务原则:事务范围尽可能小
- 避免长事务:减少事务持有时间
- 避免跨服务大事务:尽量在单服务内完成
2. 异常处理
@GlobalTransactional
public void createOrder(Order order) {
try {
// 业务逻辑
orderMapper.insert(order);
storageClient.decrease(order.getProductId(), order.getCount());
accountClient.decrease(order.getUserId(), order.getAmount());
} catch (BusinessException e) {
// 业务异常,记录日志
log.error("业务异常:{}", e.getMessage());
throw e;
} catch (Exception e) {
// 系统异常,记录日志并抛出
log.error("系统异常", e);
throw new SystemException("系统异常", e);
}
}
3. 超时配置
// 根据业务复杂度设置合理的超时时间
@GlobalTransactional(timeoutMills = 30000) // 30 秒
public void simpleOperation() {
// 简单操作
}
@GlobalTransactional(timeoutMills = 60000) // 60 秒
public void complexOperation() {
// 复杂操作
}
4. 监控告警
@Component
public class TransactionAlert {
@Autowired
private AlertService alertService;
@Scheduled(fixedRate = 60000)
public void checkLongTransactions() {
List<Transaction> longTx = getLongTransactions(60000);
if (!longTx.isEmpty()) {
alertService.sendAlert(
String.format("发现 %d 个长事务,最长 %d 秒",
longTx.size(),
longTx.get(0).getDuration() / 1000)
);
}
}
}
总结
Seata AT 模式是一种无侵入的分布式事务解决方案,基于两阶段提交协议,无需修改业务代码即可实现分布式事务。
合理配置事务参数,优化事务粒度,可以有效提升系统性能和可靠性。
在生产环境中,建议建立完善的监控告警机制,并定期 review 事务配置。