Skip to content
清晨的一缕阳光
返回

Seata AT 模式实战

Seata AT 模式实战

AT 模式原理

两阶段提交

阶段一(Prepare)

  1. 执行 SQL 业务操作
  2. 记录数据前镜像(Before Image)
  3. 执行 SQL,记录数据后镜像(After Image)
  4. 提交本地事务
  5. 释放本地锁

阶段二(Commit)

  1. TC 发送 Commit 请求
  2. RM 异步删除 undo_log
  3. 提交成功

阶段二(Rollback)

  1. TC 发送 Rollback 请求
  2. RM 根据 undo_log 补偿数据
  3. 提交补偿事务

工作流程

阶段一:
┌─────────────────────────────────────────────┐
│  1. 执行 SQL:UPDATE account SET money=100  │
│  2. 查询前镜像:money=200                   │
│  3. 查询后镜像:money=100                   │
│  4. 插入 undo_log                           │
│  5. 提交本地事务                            │
│  6. 释放本地锁                              │
└─────────────────────────────────────────────┘

阶段二(回滚):
┌─────────────────────────────────────────────┐
│  1. 接收回滚请求                            │
│  2. 读取 undo_log                           │
│  3. 生成回滚 SQL:UPDATE account SET        │
│                money=200 WHERE money=100    │
│  4. 执行回滚 SQL                            │
│  5. 删除 undo_log                           │
│  6. 提交回滚事务                            │
└─────────────────────────────────────────────┘

隔离性保证

写-写隔离

读-写隔离

快速开始

1. 环境准备

-- 创建业务数据库
CREATE DATABASE seata_demo;

-- 创建 undo_log 表
CREATE TABLE `undo_log` (
  `branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
  `xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
  `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context',
  `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
  `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
  `log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`),
  KEY `ix_log_created` (`log_created`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 创建账户表
CREATE TABLE `account` (
  `id` BIGINT PRIMARY KEY AUTO_INCREMENT,
  `user_id` BIGINT NOT NULL,
  `money` DECIMAL(10,2) NOT NULL DEFAULT 0
);

-- 创建订单表
CREATE TABLE `order` (
  `id` BIGINT PRIMARY KEY AUTO_INCREMENT,
  `user_id` BIGINT NOT NULL,
  `product_id` BIGINT NOT NULL,
  `amount` INT NOT NULL,
  `money` DECIMAL(10,2) NOT NULL
);

-- 创建库存表
CREATE TABLE `storage` (
  `id` BIGINT PRIMARY KEY AUTO_INCREMENT,
  `product_id` BIGINT NOT NULL,
  `count` INT NOT NULL
);

2. 添加依赖

<dependencies>
    <!-- Seata 分布式事务 -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    </dependency>
    
    <!-- Nacos 服务注册发现 -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    
    <!-- MyBatis Plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.3</version>
    </dependency>
</dependencies>

3. 配置 Seata

spring:
  application:
    name: order-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    sentinel:
      transport:
        dashboard: localhost:8080
  datasource:
    url: jdbc:mysql://localhost:3306/seata_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver

seata:
  enabled: true
  tx-service-group: default_tx_group
  service:
    vgroup-mapping:
      default_tx_group: default
    grouplist:
      default: 127.0.0.1:8091
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: public
      group: SEATA_GROUP
      application: seata-server
  config:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: public
      group: SEATA_GROUP
      data-id: seataServer.properties

4. 数据源配置

@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
        return new DruidDataSource();
    }
    
    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        
        // 配置 MyBatis
        factoryBean.setMapperLocations(
            new PathMatchingResourcePatternResolver()
                .getResources("classpath*:mapper/*.xml")
        );
        
        return factoryBean.getObject();
    }
}

5. 使用全局事务

@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private AccountClient accountClient;
    
    @Autowired
    private StorageClient storageClient;
    
    @GlobalTransactional  // 开启全局事务
    public void createOrder(Order order) {
        // 1. 创建订单
        orderMapper.insert(order);
        
        // 2. 扣减库存
        storageClient.decrease(order.getProductId(), order.getCount());
        
        // 3. 扣减账户余额
        accountClient.decrease(order.getUserId(), order.getAmount());
        
        // 4. 模拟异常
        if (order.getAmount() > 1000) {
            throw new BusinessException("订单金额过大");
        }
    }
}

6. Feign 客户端

@FeignClient(name = "account-service")
public interface AccountClient {
    
    @PostMapping("/account/decrease")
    Result<Void> decrease(@RequestParam("userId") Long userId,
                         @RequestParam("amount") BigDecimal amount);
}

@FeignClient(name = "storage-service")
public interface StorageClient {
    
    @PostMapping("/storage/decrease")
    Result<Void> decrease(@RequestParam("productId") Long productId,
                         @RequestParam("count") Integer count);
}

进阶配置

1. 事务超时配置

@GlobalTransactional(timeoutMills = 60000, name = "create-order-tx")
public void createOrder(Order order) {
    // 业务逻辑
}

2. 回滚例外配置

@GlobalTransactional(
    rollbackFor = Exception.class,
    noRollbackFor = BusinessException.class
)
public void createOrder(Order order) {
    // 业务异常不回滚,其他异常回滚
}

3. 传播行为配置

@GlobalTransactional(propagation = Propagation.REQUIRED)
public void method1() {
    // 需要新事务
}

@GlobalTransactional(propagation = Propagation.NOT_SUPPORTED)
public void method2() {
    // 以非事务方式执行
}

隔离级别

1. 读未提交(默认)

seata:
  data-source-proxy-mode: XA  # 或 AT

2. 读已提交

@Configuration
public class SeataConfig {
    
    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        DataSourceProxy dataSourceProxy = new DataSourceProxy(dataSource);
        // 设置隔离级别为读已提交
        dataSourceProxy.setDefaultTransactionIsolation(
            Connection.TRANSACTION_READ_COMMITTED
        );
        return dataSourceProxy;
    }
}

性能优化

1. 批量操作优化

@Service
public class BatchOrderService {
    
    @GlobalTransactional
    public void batchCreateOrders(List<Order> orders) {
        // 批量插入订单
        orderMapper.batchInsert(orders);
        
        // 批量扣减库存
        for (Order order : orders) {
            storageClient.decrease(order.getProductId(), order.getCount());
        }
        
        // 批量扣减账户
        for (Order order : orders) {
            accountClient.decrease(order.getUserId(), order.getAmount());
        }
    }
}

2. 异步提交

@Service
public class AsyncCommitService {
    
    @GlobalTransactional(asyncCommit = true)
    public void createOrder(Order order) {
        // 阶段二异步提交
        orderMapper.insert(order);
        storageClient.decrease(order.getProductId(), order.getCount());
        accountClient.decrease(order.getUserId(), order.getAmount());
    }
}

3. 分支事务合并

@Service
public class MergeBranchService {
    
    @GlobalTransactional
    public void process(Order order) {
        // 多个操作合并为一个分支事务
        doOperation1(order);
        doOperation2(order);
        doOperation3(order);
    }
    
    private void doOperation1(Order order) {
        // 操作 1
    }
    
    private void doOperation2(Order order) {
        // 操作 2
    }
    
    private void doOperation3(Order order) {
        // 操作 3
    }
}

监控与排查

1. 事务日志查询

@RestController
@RequestMapping("/seata")
public class SeataController {
    
    @Autowired
    private TransactionMapper transactionMapper;
    
    @GetMapping("/transactions")
    public List<Transaction> listTransactions() {
        return transactionMapper.selectList(null);
    }
    
    @GetMapping("/transactions/{xid}")
    public Transaction getTransaction(@PathVariable String xid) {
        return transactionMapper.selectOne(
            new QueryWrapper<Transaction>().eq("xid", xid)
        );
    }
}

2. undo_log 查询

@RestController
@RequestMapping("/seata")
public class UndoLogController {
    
    @Autowired
    private UndoLogMapper undoLogMapper;
    
    @GetMapping("/undo-logs")
    public List<UndoLog> listUndoLogs(@RequestParam String xid) {
        return undoLogMapper.selectList(
            new QueryWrapper<UndoLog>().eq("xid", xid)
        );
    }
}

3. 事务状态监控

@Component
public class TransactionMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    @Scheduled(fixedRate = 60000)
    public void monitorTransactions() {
        // 统计活跃事务数
        long activeCount = getActiveTransactionCount();
        meterRegistry.gauge("seata.transaction.active", activeCount);
        
        // 统计回滚事务数
        long rollbackCount = getRollbackTransactionCount();
        meterRegistry.counter("seata.transaction.rollback", rollbackCount);
        
        // 统计平均事务耗时
        double avgDuration = getAverageTransactionDuration();
        meterRegistry.timer("seata.transaction.duration")
            .record(avgDuration, TimeUnit.MILLISECONDS);
    }
}

常见问题

1. 事务不生效

问题:@GlobalTransactional 注解不生效

排查步骤

解决方案

seata:
  enabled: true
  tx-service-group: default_tx_group

2. 死锁问题

问题:出现死锁异常

解决方案

3. 性能问题

问题:使用 Seata 后性能下降

解决方案

4. 数据不一致

问题:分布式事务后数据不一致

解决方案

最佳实践

1. 事务粒度

2. 异常处理

@GlobalTransactional
public void createOrder(Order order) {
    try {
        // 业务逻辑
        orderMapper.insert(order);
        storageClient.decrease(order.getProductId(), order.getCount());
        accountClient.decrease(order.getUserId(), order.getAmount());
    } catch (BusinessException e) {
        // 业务异常,记录日志
        log.error("业务异常:{}", e.getMessage());
        throw e;
    } catch (Exception e) {
        // 系统异常,记录日志并抛出
        log.error("系统异常", e);
        throw new SystemException("系统异常", e);
    }
}

3. 超时配置

// 根据业务复杂度设置合理的超时时间
@GlobalTransactional(timeoutMills = 30000)  // 30 秒
public void simpleOperation() {
    // 简单操作
}

@GlobalTransactional(timeoutMills = 60000)  // 60 秒
public void complexOperation() {
    // 复杂操作
}

4. 监控告警

@Component
public class TransactionAlert {
    
    @Autowired
    private AlertService alertService;
    
    @Scheduled(fixedRate = 60000)
    public void checkLongTransactions() {
        List<Transaction> longTx = getLongTransactions(60000);
        
        if (!longTx.isEmpty()) {
            alertService.sendAlert(
                String.format("发现 %d 个长事务,最长 %d 秒",
                    longTx.size(),
                    longTx.get(0).getDuration() / 1000)
            );
        }
    }
}

总结

Seata AT 模式是一种无侵入的分布式事务解决方案,基于两阶段提交协议,无需修改业务代码即可实现分布式事务。

合理配置事务参数,优化事务粒度,可以有效提升系统性能和可靠性。

在生产环境中,建议建立完善的监控告警机制,并定期 review 事务配置。


分享这篇文章到:

上一篇文章
Spring Boot 请求与响应处理
下一篇文章
Java 设计模式实战