Skip to content
清晨的一缕阳光
返回

CQRS 模式实战:命令查询职责分离架构设计

引言

在复杂的业务系统中,我们经常面临这样的困境:

CQRS(Command Query Responsibility Segregation,命令查询职责分离) 模式通过将读操作和写操作分离,有效解决了这些问题。

本文基于 Spring Boot + Axon Framework,详解 CQRS 模式的实战落地。


一、什么是 CQRS

核心概念

CQRS 的核心思想是将读操作(Query)和写操作(Command)分离,使用不同的模型来处理。

┌─────────────────────────────────────────────┐
│                 CQRS 架构                    │
├─────────────────────────────────────────────┤
│                                             │
│  ┌──────────┐         ┌──────────┐         │
│  │  Command │         │  Query   │         │
│  │  Model   │         │  Model   │         │
│  │  (写)    │         │  (读)    │         │
│  └────┬─────┘         └────┬─────┘         │
│       │                    │                │
│       ▼                    ▼                │
│  ┌──────────┐         ┌──────────┐         │
│  │  Write   │         │   Read   │         │
│  │ Database │         │ Database │         │
│  │          │         │          │         │
│  │  Event   │─────────▶          │         │
│  │  Store   │  事件同步  │          │         │
│  └──────────┘         └──────────┘         │
│                                             │
└─────────────────────────────────────────────┘

CQRS vs CRUD

对比项CRUD 模式CQRS 模式
数据模型统一模型读写分离模型
数据库单一数据库可分离数据库
扩展性读写耦合独立扩展
适用场景简单业务复杂业务
复杂度

二、何时使用 CQRS

✅ 适合场景

  1. 读写负载不均衡

    • 读多写少(如电商商品详情)
    • 写多读少(如日志记录)
  2. 复杂查询需求

    • 多表关联查询
    • 复杂报表统计
  3. 高性能要求

    • 需要独立优化读/写性能
    • 需要缓存查询结果
  4. 事件驱动架构

    • 需要事件溯源
    • 需要审计日志

❌ 不适合场景

  1. 简单 CRUD 系统

    • 增删改查逻辑简单
    • 无需复杂查询
  2. 数据一致性要求极高

    • 读写延迟可能导致问题
    • 如银行核心系统
  3. 团队规模小

    • 维护成本高
    • 技术复杂度大

三、CQRS 架构设计

整体架构

graph TB
    Client[客户端] --> Command[命令端]
    Client --> Query[查询端]
    
    Command --> CommandHandler[命令处理器]
    CommandHandler --> Aggregate[聚合根]
    Aggregate --> EventStore[事件存储]
    
    EventStore --> EventProcessor[事件处理器]
    EventProcessor --> ReadDB[读数据库]
    
    Query --> QueryHandler[查询处理器]
    QueryHandler --> ReadDB

核心组件

  1. Command(命令)

    • 表示写操作意图
    • 如:CreateOrderCommand、UpdateUserCommand
  2. Command Handler(命令处理器)

    • 处理命令
    • 调用领域模型
  3. Aggregate(聚合根)

    • 领域模型核心
    • 产生领域事件
  4. Event Store(事件存储)

    • 存储领域事件
    • 支持事件溯源
  5. Query(查询)

    • 表示读操作请求
    • 如:GetOrderQuery、ListUsersQuery
  6. Query Handler(查询处理器)

    • 处理查询
    • 返回 DTO/VO

四、Spring Boot 实现

项目结构

src/
├── command/                 # 命令端
│   ├── domain/             # 领域模型
│   │   ├── aggregate/      # 聚合根
│   │   ├── event/          # 领域事件
│   │   └── repository/     # 写端仓库
│   └── handler/            # 命令处理器
├── query/                  # 查询端
│   ├── dto/               # 数据传输对象
│   ├── handler/           # 查询处理器
│   └── repository/        # 读端仓库
└── infrastructure/         # 基础设施
    ├── event/             # 事件总线
    └── persistence/       # 持久化

1. 定义命令

// CreateOrderCommand.java
public class CreateOrderCommand {
    
    private final String orderId;
    private final String userId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
    
    public CreateOrderCommand(String orderId, String userId, 
                              List<OrderItem> items, BigDecimal totalAmount) {
        this.orderId = orderId;
        this.userId = userId;
        this.items = items;
        this.totalAmount = totalAmount;
    }
    
    // Getters
    public String getOrderId() { return orderId; }
    public String getUserId() { return userId; }
    public List<OrderItem> getItems() { return items; }
    public BigDecimal getTotalAmount() { return totalAmount; }
}

2. 定义聚合根

// OrderAggregate.java
@Entity
@Table(name = "orders")
public class OrderAggregate {
    
    @Id
    private String orderId;
    
    private String userId;
    
    @Embedded
    private List<OrderItem> items;
    
    private BigDecimal totalAmount;
    
    private OrderStatus status;
    
    @Version
    private Long version;
    
    protected OrderAggregate() {}
    
    public OrderAggregate(String orderId, String userId, 
                          List<OrderItem> items, BigDecimal totalAmount) {
        this.orderId = orderId;
        this.userId = userId;
        this.items = items;
        this.totalAmount = totalAmount;
        this.status = OrderStatus.CREATED;
        
        // 发布领域事件
        DomainEventPublisher.publish(
            new OrderCreatedEvent(orderId, userId, items, totalAmount)
        );
    }
    
    // 业务方法
    public void pay() {
        if (this.status != OrderStatus.CREATED) {
            throw new IllegalStateException("Order cannot be paid");
        }
        this.status = OrderStatus.PAID;
        DomainEventPublisher.publish(
            new OrderPaidEvent(orderId)
        );
    }
    
    public void ship() {
        if (this.status != OrderStatus.PAID) {
            throw new IllegalStateException("Order cannot be shipped");
        }
        this.status = OrderStatus.SHIPPED;
        DomainEventPublisher.publish(
            new OrderShippedEvent(orderId)
        );
    }
    
    // Getters
    public String getOrderId() { return orderId; }
    public String getUserId() { return userId; }
    public OrderStatus getStatus() { return status; }
}

3. 定义领域事件

// OrderCreatedEvent.java
public class OrderCreatedEvent implements DomainEvent {
    
    private final String eventId;
    private final String orderId;
    private final String userId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
    private final LocalDateTime timestamp;
    
    public OrderCreatedEvent(String orderId, String userId, 
                             List<OrderItem> items, BigDecimal totalAmount) {
        this.eventId = UUID.randomUUID().toString();
        this.orderId = orderId;
        this.userId = userId;
        this.items = items;
        this.totalAmount = totalAmount;
        this.timestamp = LocalDateTime.now();
    }
    
    // Getters
    public String getEventId() { return eventId; }
    public String getOrderId() { return orderId; }
    public String getUserId() { return userId; }
    public List<OrderItem> getItems() { return items; }
    public BigDecimal getTotalAmount() { return totalAmount; }
    public LocalDateTime getTimestamp() { return timestamp; }
}

4. 命令处理器

// OrderCommandHandler.java
@Component
@RequiredArgsConstructor
public class OrderCommandHandler {
    
    private final OrderRepository orderRepository;
    private final EventPublisher eventPublisher;
    
    @Transactional
    public String handle(CreateOrderCommand command) {
        // 创建聚合根
        OrderAggregate order = new OrderAggregate(
            command.getOrderId(),
            command.getUserId(),
            command.getItems(),
            command.getTotalAmount()
        );
        
        // 保存
        orderRepository.save(order);
        
        // 发布事件(异步)
        eventPublisher.publishAsync(order.getEvents());
        
        return order.getOrderId();
    }
    
    @Transactional
    public void handle(PayOrderCommand command) {
        OrderAggregate order = orderRepository.findById(command.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException(command.getOrderId()));
        
        order.pay();
        orderRepository.save(order);
        
        eventPublisher.publishAsync(order.getEvents());
    }
}

5. 查询处理器

// OrderQueryHandler.java
@Component
@RequiredArgsConstructor
public class OrderQueryHandler {
    
    private final OrderReadRepository orderReadRepository;
    
    public OrderDetailVO handle(GetOrderQuery query) {
        return orderReadRepository.findDetailById(query.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException(query.getOrderId()));
    }
    
    public Page<OrderListItemVO> handle(ListOrdersQuery query) {
        return orderReadRepository.findListByUserId(
            query.getUserId(),
            PageRequest.of(query.getPage(), query.getSize())
        );
    }
    
    public OrderStatisticsVO handle(GetOrderStatisticsQuery query) {
        return orderReadRepository.findStatisticsByUserId(query.getUserId());
    }
}

6. 读端 Repository

// OrderReadRepository.java
@Repository
public interface OrderReadRepository extends JpaRepository<OrderReadModel, String> {
    
    @Query("""
        SELECT new com.example.query.dto.OrderDetailVO(
            o.orderId, o.userId, o.items, o.totalAmount, 
            o.status, o.createdAt, o.updatedAt
        )
        FROM OrderReadModel o
        WHERE o.orderId = :orderId
        """)
    Optional<OrderDetailVO> findDetailById(@Param("orderId") String orderId);
    
    @Query("""
        SELECT new com.example.query.dto.OrderListItemVO(
            o.orderId, o.totalAmount, o.status, o.createdAt
        )
        FROM OrderReadModel o
        WHERE o.userId = :userId
        ORDER BY o.createdAt DESC
        """)
    Page<OrderListItemVO> findListByUserId(
        @Param("userId") String userId,
        Pageable pageable
    );
    
    @Query("""
        SELECT new com.example.query.dto.OrderStatisticsVO(
            COUNT(o), SUM(o.totalAmount), AVG(o.totalAmount)
        )
        FROM OrderReadModel o
        WHERE o.userId = :userId
        """)
    OrderStatisticsVO findStatisticsByUserId(@Param("userId") String userId);
}

7. 事件处理器(同步读写数据)

// OrderEventHandler.java
@Component
@RequiredArgsConstructor
public class OrderEventHandler {
    
    private final OrderReadRepository orderReadRepository;
    
    @Transactional
    @EventListener
    public void on(OrderCreatedEvent event) {
        OrderReadModel readModel = new OrderReadModel();
        readModel.setOrderId(event.getOrderId());
        readModel.setUserId(event.getUserId());
        readModel.setItems(event.getItems());
        readModel.setTotalAmount(event.getTotalAmount());
        readModel.setStatus(OrderStatus.CREATED);
        readModel.setCreatedAt(event.getTimestamp());
        
        orderReadRepository.save(readModel);
    }
    
    @Transactional
    @EventListener
    public void on(OrderPaidEvent event) {
        OrderReadModel order = orderReadRepository.findById(event.getOrderId())
            .orElseThrow();
        order.setStatus(OrderStatus.PAID);
        order.setUpdatedAt(LocalDateTime.now());
        orderReadRepository.save(order);
    }
    
    @Transactional
    @EventListener
    public void on(OrderShippedEvent event) {
        OrderReadModel order = orderReadRepository.findById(event.getOrderId())
            .orElseThrow();
        order.setStatus(OrderStatus.SHIPPED);
        order.setUpdatedAt(LocalDateTime.now());
        orderReadRepository.save(order);
    }
}

五、读写数据库分离

配置多数据源

# application.yml
spring:
  datasource:
    write:
      url: jdbc:mysql://localhost:3306/order_write
      username: root
      password: password
    read:
      url: jdbc:mysql://localhost:3306/order_read
      username: root
      password: password
// DataSourceConfig.java
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties("spring.datasource.write")
    public DataSource writeDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties("spring.datasource.read")
    public DataSource readDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public JpaTransactionManager writeTransactionManager() {
        return new JpaTransactionManager(writeDataSource());
    }
}

读写路由

// RoutingDataSource.java
public class RoutingDataSource extends AbstractRoutingDataSource {
    
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
    
    public static void setWrite() {
        CONTEXT_HOLDER.set("write");
    }
    
    public static void setRead() {
        CONTEXT_HOLDER.set("read");
    }
    
    public static void clear() {
        CONTEXT_HOLDER.remove();
    }
    
    @Override
    protected Object determineCurrentLookupKey() {
        return CONTEXT_HOLDER.get();
    }
}

六、性能优化

1. 查询缓存

// OrderQueryHandler.java
@Component
@RequiredArgsConstructor
public class OrderQueryHandler {
    
    private final OrderReadRepository orderReadRepository;
    private final CacheManager cacheManager;
    
    @Cacheable(value = "orders", key = "#query.orderId")
    public OrderDetailVO handle(GetOrderQuery query) {
        return orderReadRepository.findDetailById(query.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException(query.getOrderId()));
    }
    
    @Cacheable(value = "orderList", key = "#query.userId + ':' + #query.page")
    public Page<OrderListItemVO> handle(ListOrdersQuery query) {
        return orderReadRepository.findListByUserId(
            query.getUserId(),
            PageRequest.of(query.getPage(), query.getSize())
        );
    }
}

2. 读库使用 Elasticsearch

// OrderElasticsearchRepository.java
@Repository
public interface OrderElasticsearchRepository 
    extends ElasticsearchRepository<OrderDocument, String> {
    
    Page<OrderDocument> findByUserId(String userId, Pageable pageable);
    
    @Query("""
        {
          "bool": {
            "must": [
              { "term": { "userId": "?0" } },
              { "range": { "createdAt": { "gte": "?1", "lte": "?2" } } }
            ]
          }
        }
        """)
    Page<OrderDocument> findByUserIdAndDateRange(
        String userId, LocalDateTime from, LocalDateTime to, Pageable pageable
    );
}

3. 异步事件处理

// EventPublisher.java
@Component
public class EventPublisher {
    
    private final ApplicationEventPublisher eventPublisher;
    private final Executor executor;
    
    public void publishAsync(List<DomainEvent> events) {
        events.forEach(event -> 
            executor.execute(() -> eventPublisher.publishEvent(event))
        );
    }
}

七、CQRS + 事件溯源

事件存储

// EventStore.java
@Repository
public class EventStore {
    
    @PersistenceContext
    private EntityManager entityManager;
    
    public void append(String aggregateId, List<DomainEvent> events) {
        for (DomainEvent event : events) {
            EventEntry entry = new EventEntry();
            entry.setEventId(UUID.randomUUID().toString());
            entry.setAggregateId(aggregateId);
            entry.setEventType(event.getClass().getName());
            entry.setEventData(toJson(event));
            entry.setTimestamp(LocalDateTime.now());
            
            entityManager.persist(entry);
        }
    }
    
    public List<DomainEvent> read(String aggregateId) {
        List<EventEntry> entries = entityManager.createQuery(
            "SELECT e FROM EventEntry e WHERE e.aggregateId = :aggregateId ORDER BY e.id",
            EventEntry.class
        )
        .setParameter("aggregateId", aggregateId)
        .getResultList();
        
        return entries.stream()
            .map(this::fromEntry)
            .collect(Collectors.toList());
    }
}

聚合根重建

// OrderAggregate.java
public static OrderAggregate reconstitute(List<DomainEvent> events) {
    OrderAggregate order = new OrderAggregate();
    
    for (DomainEvent event : events) {
        if (event instanceof OrderCreatedEvent) {
            OrderCreatedEvent e = (OrderCreatedEvent) event;
            order.orderId = e.getOrderId();
            order.userId = e.getUserId();
            order.items = e.getItems();
            order.totalAmount = e.getTotalAmount();
            order.status = OrderStatus.CREATED;
        } else if (event instanceof OrderPaidEvent) {
            order.status = OrderStatus.PAID;
        } else if (event instanceof OrderShippedEvent) {
            order.status = OrderStatus.SHIPPED;
        }
    }
    
    return order;
}

八、实战案例:电商订单系统

业务场景

性能对比

指标CRUD 架构CQRS 架构
订单详情查询150ms20ms
订单列表查询300ms50ms
销售统计查询2000ms100ms
创建订单 TPS500800
查询 QPS200010000

架构收益

  1. 查询性能提升 5-10 倍
  2. 写操作 TPS 提升 60%
  3. 支持独立扩展
  4. 代码结构更清晰

九、常见问题

Q1: 读写数据不一致怎么办?

A: CQRS 中读写分离会带来最终一致性问题:

写操作 → 事件发布 → 事件处理 → 读库更新

延迟:通常 < 1 秒

解决方案:

  1. 业务层接受最终一致性(推荐)
  2. 关键查询走写库(降级方案)
  3. 前端 loading 状态(用户体验)

Q2: 事件处理器失败怎么办?

A: 需要事件重试机制

@EventListener
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000))
public void on(OrderCreatedEvent event) {
    // 处理事件
}

Q3: 如何保证事件不丢失?

A: 使用事务性发件箱模式

@Transactional
public void handle(CreateOrderCommand command) {
    OrderAggregate order = new OrderAggregate(...);
    orderRepository.save(order);
    
    // 事件与订单在同一事务中保存
    eventStore.append(order.getOrderId(), order.getEvents());
}

十、总结

CQRS 优缺点

优点

缺点

实施建议

  1. 从简单开始

    • 先分离读写模型
    • 逐步引入事件驱动
  2. 选择合适的场景

    • 读写负载不均衡
    • 查询性能要求高
  3. 接受最终一致性

    • 业务层设计考虑延迟
    • 前端做好用户体验
  4. 监控和告警

    • 监控事件处理延迟
    • 设置合理的告警阈值

参考资料

  1. Martin Fowler - CQRS
  2. Greg Young - CQRS Documents
  3. Axon Framework 官方文档
  4. 《实现领域驱动设计》

分享这篇文章到:

上一篇文章
整洁架构实战:构建可维护的 Java 应用
下一篇文章
领域驱动设计实战