
Event-Driven Architecture Design

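Event-driven architecture (EDA) is a core pattern for building loosely coupled, scalable systems. By publishing and subscribing to events, services can be decoupled from one another, data can converge through eventual consistency, and business logic stays flexible. This article walks through the core patterns of EDA: Event Sourcing, CQRS, the event bus, and related techniques.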

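1. Event-Driven Fundamentals

1.1 Core Concepts

mindmap
  root((EDA core concepts))
    Event
      A fact that has already happened
      Immutable / timestamped
      Naming: past tense OrderCreated
    Command
      A request to perform an action
      Can be rejected / carries intent
      Naming: imperative CreateOrder
    Query
      A request to read data
      No side effects
      Naming: question form GetOrder
    EventHandler
      Subscribes to and handles events
      Asynchronous / idempotent
      Business logic / data updates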

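1.2 Comparing Architecture Patterns

| Pattern | Characteristics | Typical use case |
|---|---|---|
| Request-response | Synchronous, tightly coupled | Simple CRUD |
| Publish-subscribe | Asynchronous, loosely coupled | Event notification |
| Event Sourcing | Events are the state | Auditing, replay |
| CQRS | Separate read and write models | High-concurrency reads |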

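2. Event Sourcing

2.1 Core Principle

graph TB
    subgraph Traditional[Traditional approach]
        T1[Current-state database Order: id=1, status=PAID]
        T2[Problems<br/>History is lost / hard to audit / hard to replay]
    end
    
    subgraph ES[Event Sourcing approach]
        E1[Event stream in the Event Store]
        E1 --> E2[OrderCreated]
        E1 --> E3[ItemAdded]
        E1 --> E4[PaymentReceived]
        E1 --> E5[OrderShipped]
        E6[Current state = accumulation of all events]
        E7[Benefits<br/>Full history / easy to audit / replayable / supports multiple views]
    end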
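
The idea that the current state is the accumulation of all events can be sketched as a left fold over the event stream. The example below is a minimal illustration rather than the article's implementation: `FoldDemo`, its `Event` class with plain type strings, and `OrderState` are hypothetical stand-ins for the richer domain events defined in the next section.

```java
import java.util.Arrays;
import java.util.List;

class FoldDemo {

    // Hypothetical, simplified event: a type name only, no payload.
    static final class Event {
        final String type;
        Event(String type) { this.type = type; }
    }

    // Immutable snapshot of an order; applying an event yields a new snapshot.
    static final class OrderState {
        final String status;
        final int itemCount;

        OrderState(String status, int itemCount) {
            this.status = status;
            this.itemCount = itemCount;
        }

        // Pure transition function: (state, event) -> next state.
        OrderState apply(Event e) {
            switch (e.type) {
                case "OrderCreated":    return new OrderState("UNPAID", 0);
                case "ItemAdded":       return new OrderState(status, itemCount + 1);
                case "PaymentReceived": return new OrderState("PAID", itemCount);
                case "OrderShipped":    return new OrderState("SHIPPED", itemCount);
                default:                return this; // unknown events leave the state unchanged
            }
        }
    }

    // Current state = left fold of the transition function over the event stream.
    static OrderState replay(List<Event> events) {
        OrderState state = new OrderState("NONE", 0);
        for (Event e : events) {
            state = state.apply(e);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Event> stream = Arrays.asList(
            new Event("OrderCreated"), new Event("ItemAdded"),
            new Event("ItemAdded"), new Event("PaymentReceived"));

        OrderState now = replay(stream);
        System.out.println(now.status + ", items=" + now.itemCount);

        // Replaying only a prefix of the stream gives the state at an earlier point in time.
        OrderState earlier = replay(stream.subList(0, 2));
        System.out.println(earlier.status + ", items=" + earlier.itemCount);
    }
}
```

Because the transition function is pure, replaying a prefix of the stream is all it takes to answer "what did this order look like before payment?", which is the replay benefit listed in the diagram.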

2.2 Event Definitions

import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;

/**
 * Base class for domain events.
 */
public abstract class DomainEvent {
    private final String eventId;
    private final LocalDateTime occurredOn;
    // Fixed at 1 for every event, so it cannot serve as the event's position in
    // the aggregate stream; that position is the version written by the event store.
    private final int version;
    
    public DomainEvent() {
        this.eventId = UUID.randomUUID().toString();
        this.occurredOn = LocalDateTime.now();
        this.version = 1;
    }
    
    // Getters
    public String getEventId() { return eventId; }
    public LocalDateTime getOccurredOn() { return occurredOn; }
    public int getVersion() { return version; }
}

/**
 * Raised when an order is created.
 */
public class OrderCreated extends DomainEvent {
    private final String orderId;
    private final Long userId;
    private final List<OrderItemEvent> items;
    private final Money totalAmount;
    
    public OrderCreated(String orderId, Long userId, List<OrderItemEvent> items, Money totalAmount) {
        this.orderId = orderId;
        this.userId = userId;
        this.items = items;
        this.totalAmount = totalAmount;
    }
    
    // Getters omitted
}

/**
 * Raised when an order is paid.
 */
public class OrderPaid extends DomainEvent {
    private final String orderId;
    private final Money amount;
    private final String paymentMethod;
    private final String transactionId;
    
    public OrderPaid(String orderId, Money amount, String paymentMethod, String transactionId) {
        this.orderId = orderId;
        this.amount = amount;
        this.paymentMethod = paymentMethod;
        this.transactionId = transactionId;
    }
    
    // Getters omitted
}

/**
 * Raised when an order is cancelled.
 */
public class OrderCancelled extends DomainEvent {
    private final String orderId;
    private final String reason;
    private final LocalDateTime cancelledAt;
    
    public OrderCancelled(String orderId, String reason) {
        this.orderId = orderId;
        this.reason = reason;
        this.cancelledAt = LocalDateTime.now();
    }
    
    // Getters omitted
}

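2.3 Event Store

/**
 * Event store.
 */
public interface EventStore {
    /**
     * Append events to an aggregate's stream. Fails if the stream is not at expectedVersion.
     */
    void appendEvents(String aggregateId, List<DomainEvent> events, int expectedVersion);
    
    /**
     * Read the full event stream of an aggregate.
     */
    List<DomainEvent> getEventsForAggregate(String aggregateId);
    
    /**
     * Read the event stream of an aggregate, starting from the given version.
     */
    List<DomainEvent> getEventsForAggregate(String aggregateId, int fromVersion);
}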

/**
 * Event store implementation backed by a relational database.
 */
@Repository
public class JpaEventStore implements EventStore {
    
    @Autowired
    private EventEntryRepository eventEntryRepository;
    
    @Override
    @Transactional
    public void appendEvents(String aggregateId, List<DomainEvent> events, int expectedVersion) {
        // Optimistic concurrency check. getLastVersion is assumed to return 0 for a new aggregate.
        int currentVersion = eventEntryRepository.getLastVersion(aggregateId);
        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException("Concurrent modification of aggregate " + aggregateId);
        }
        
        // Persist the events, giving each one the next consecutive version.
        int nextVersion = currentVersion;
        for (DomainEvent event : events) {
            EventEntry entry = new EventEntry();
            entry.setAggregateId(aggregateId);
            entry.setEventType(event.getClass().getName());
            entry.setEventData(serialize(event));
            entry.setVersion(++nextVersion);
            entry.setOccurredOn(event.getOccurredOn());
            
            eventEntryRepository.save(entry);
        }
    }
    
    @Override
    public List<DomainEvent> getEventsForAggregate(String aggregateId) {
        List<EventEntry> entries = eventEntryRepository.findByAggregateIdOrderByVersion(aggregateId);
        
        return entries.stream()
            .map(this::deserialize)
            .collect(Collectors.toList());
    }
    
    @Override
    public List<DomainEvent> getEventsForAggregate(String aggregateId, int fromVersion) {
        // Assumes the repository declares a Spring Data derived query with this name.
        return eventEntryRepository
            .findByAggregateIdAndVersionGreaterThanEqualOrderByVersion(aggregateId, fromVersion)
            .stream()
            .map(this::deserialize)
            .collect(Collectors.toList());
    }
    
    private String serialize(DomainEvent event) {
        // JSON serialization
        return JsonUtils.toJson(event);
    }
    
    private DomainEvent deserialize(EventEntry entry) {
        // JSON deserialization
        return JsonUtils.fromJson(entry.getEventData(), getEventClass(entry.getEventType()));
    }
    
    private Class<? extends DomainEvent> getEventClass(String eventTypeName) {
        try {
            return Class.forName(eventTypeName).asSubclass(DomainEvent.class);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Event class not found: " + eventTypeName, e);
        }
    }
}

/**
 * Event table mapping.
 */
@Entity
@Table(name = "t_event_store")
public class EventEntry {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "aggregate_id", length = 100)
    private String aggregateId;
    
    @Column(name = "event_type", length = 200)
    private String eventType;
    
    @Column(name = "event_data", columnDefinition = "TEXT")
    private String eventData;
    
    @Column(name = "version")
    private Integer version;
    
    @Column(name = "occurred_on")
    private LocalDateTime occurredOn;
    
    // Getters and setters omitted
}

-- Unique index: one row per (aggregate, version). It also rejects a second writer
-- that passes the version check concurrently with the first.
CREATE UNIQUE INDEX idx_aggregate_id ON t_event_store(aggregate_id, version);
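
To see the `expectedVersion` check in isolation, here is a minimal in-memory sketch, assuming events are plain strings and the stream version equals the number of stored events. `InMemoryEventStoreDemo` and its nested `ConcurrencyException` are hypothetical names for illustration, not part of the JPA implementation above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InMemoryEventStoreDemo {

    // Hypothetical exception type signalling a version conflict.
    static final class ConcurrencyException extends RuntimeException {
        ConcurrencyException(String message) { super(message); }
    }

    // One append-only list per aggregate; events are plain strings to keep the sketch short.
    private final Map<String, List<String>> streams = new HashMap<>();

    // The append succeeds only if the caller has seen the latest version of the stream.
    synchronized void append(String aggregateId, List<String> events, int expectedVersion) {
        List<String> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        int currentVersion = stream.size(); // version == number of stored events
        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "expected version " + expectedVersion + " but stream is at " + currentVersion);
        }
        stream.addAll(events);
    }

    // Returns a copy so callers cannot mutate the stored stream.
    synchronized List<String> read(String aggregateId) {
        List<String> stream = streams.get(aggregateId);
        return stream == null ? new ArrayList<String>() : new ArrayList<>(stream);
    }

    public static void main(String[] args) {
        InMemoryEventStoreDemo store = new InMemoryEventStoreDemo();
        store.append("order-1", Collections.singletonList("OrderCreated"), 0);

        // Two writers both loaded the aggregate at version 1; only the first append wins.
        store.append("order-1", Collections.singletonList("OrderPaid"), 1);
        try {
            store.append("order-1", Collections.singletonList("OrderCancelled"), 1);
        } catch (ConcurrencyException e) {
            System.out.println("second writer rejected: " + e.getMessage());
        }
        System.out.println(store.read("order-1"));
    }
}
```

The losing writer is expected to reload the aggregate, re-run its business rule against the new state, and retry; here a cancel after payment would then be refused by the domain logic rather than silently overwriting the payment.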

2.4 Aggregate Root Implementation

/**
 * Base class for aggregate roots.
 */
public abstract class AggregateRoot {
    private String id;
    private int version;
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
    
    /**
     * Record a new event and apply it to the aggregate's state.
     */
    protected void registerEvent(DomainEvent event) {
        uncommittedEvents.add(event);
        applyEvent(event);
    }
    
    /**
     * Apply an event (update state).
     */
    protected abstract void applyEvent(DomainEvent event);
    
    /**
     * Load historical events (rebuild state).
     */
    public void loadFromHistory(List<DomainEvent> events) {
        for (DomainEvent event : events) {
            applyEvent(event);
            // DomainEvent.getVersion() is always 1, so count replayed events instead;
            // the result matches the stream version kept by the event store.
            this.version++;
        }
    }
    
    /**
     * Return the events that have not been persisted yet.
     */
    public List<DomainEvent> getUncommittedEvents() {
        return new ArrayList<>(uncommittedEvents);
    }
    
    /**
     * Mark all pending events as committed.
     */
    public void markEventsAsCommitted() {
        uncommittedEvents.clear();
    }
    
    // Getters and setter
    public String getId() { return id; }
    protected void setId(String id) { this.id = id; }
    public int getVersion() { return version; }
}

/**
 * Order aggregate root.
 */
public class Order extends AggregateRoot {
    private Long userId;
    private OrderStatus status;
    private Money totalAmount;
    private List<OrderItem> items = new ArrayList<>();
    
    /**
     * Create an order (factory method).
     */
    public static Order create(Long userId, List<OrderItem> items) {
        Order order = new Order();
        order.setId(IdGenerator.generate());
        
        // Register the creation event
        order.registerEvent(new OrderCreated(
            order.getId(),
            userId,
            items.stream().map(OrderItemEvent::new).collect(Collectors.toList()),
            calculateTotal(items)
        ));
        
        return order;
    }
    
    /**
     * Pay for the order.
     */
    public void pay(String paymentMethod, String transactionId) {
        if (this.status != OrderStatus.UNPAID) {
            throw new DomainException("Only unpaid orders can be paid");
        }
        
        registerEvent(new OrderPaid(getId(), totalAmount, paymentMethod, transactionId));
    }
    
    /**
     * Cancel the order.
     */
    public void cancel(String reason) {
        if (this.status != OrderStatus.UNPAID) {
            throw new DomainException("Only unpaid orders can be cancelled");
        }
        
        registerEvent(new OrderCancelled(getId(), reason));
    }
    
    /**
     * Apply an event by dispatching on its type. Unknown event types are ignored.
     */
    @Override
    protected void applyEvent(DomainEvent event) {
        if (event instanceof OrderCreated) {
            apply((OrderCreated) event);
        } else if (event instanceof OrderPaid) {
            apply((OrderPaid) event);
        } else if (event instanceof OrderCancelled) {
            apply((OrderCancelled) event);
        }
    }
    
    private void apply(OrderCreated event) {
        this.setId(event.getOrderId());
        this.userId = event.getUserId();
        this.items = event.getItems().stream()
            .map(OrderItem::fromEvent)
            .collect(Collectors.toList());
        this.totalAmount = event.getTotalAmount();
        this.status = OrderStatus.UNPAID;
    }
    
    private void apply(OrderPaid event) {
        this.status = OrderStatus.PAID;
    }
    
    private void apply(OrderCancelled event) {
        this.status = OrderStatus.CANCELLED;
    }
    
    private static Money calculateTotal(List<OrderItem> items) {
        return items.stream()
            .map(OrderItem::getSubtotal)
            .reduce(Money.ZERO, Money::add);
    }
    
    // Getters omitted
}

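3. CQRS

3.1 Core Principle

graph TB
    Client[Client] --> Cmd[Command Side<br/>Writes / state changes]
    Client --> Qry[Query Side<br/>Reads / state queries]
    
    Cmd --> ES[EventStore<br/>Source of truth]
    Qry --> RM[ReadModel<br/>Projection]
    
    ES -->|events published| RM
    
    style Cmd fill:#e1f5fe
    style Qry fill:#f3e5f5

Characteristics: the command side handles all writes and appends events to the event store; the query side serves reads from projected read models; events published from the store keep those read models up to date.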
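
The split shown in the diagram can be sketched in a few lines. This is a deliberately simplified, single-process illustration with hypothetical names (`CqrsDemo`, `StatusChanged`); the projection runs synchronously here, whereas in a deployed system that hop is usually asynchronous, which is where eventual consistency comes from.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CqrsDemo {

    // Hypothetical event: an order moved to a new status.
    static final class StatusChanged {
        final String orderId;
        final String status;
        StatusChanged(String orderId, String status) {
            this.orderId = orderId;
            this.status = status;
        }
    }

    // Write side: the append-only event log is the source of truth.
    private final List<StatusChanged> eventLog = new ArrayList<>();

    // Read side: a denormalized view keyed by order ID, derived entirely from the log.
    private final Map<String, String> statusView = new HashMap<>();

    // Commands change state by emitting events; they return no query data.
    void createOrder(String orderId) { emit(new StatusChanged(orderId, "UNPAID")); }
    void payOrder(String orderId)    { emit(new StatusChanged(orderId, "PAID")); }

    // Queries read only from the view and never touch the log.
    String getStatus(String orderId) { return statusView.get(orderId); }
    int eventCount()                 { return eventLog.size(); }

    private void emit(StatusChanged event) {
        eventLog.add(event);
        project(event); // synchronous here; typically asynchronous in production
    }

    // Projection: fold a single event into the read model.
    private void project(StatusChanged event) {
        statusView.put(event.orderId, event.status);
    }

    public static void main(String[] args) {
        CqrsDemo app = new CqrsDemo();
        app.createOrder("order-1");
        app.payOrder("order-1");
        System.out.println(app.getStatus("order-1") + " after " + app.eventCount() + " events");
    }
}
```

Because the view is derived data, it can use whatever shape suits the queries (a flat table, a search index, a cache) without constraining how the write side stores events.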

3.2 Command Side Implementation

/**
 * Base class for commands.
 */
public abstract class Command {
    private final String commandId;
    private final LocalDateTime timestamp;
    
    public Command() {
        this.commandId = UUID.randomUUID().toString();
        this.timestamp = LocalDateTime.now();
    }
    
    // Getters omitted
}

/**
 * Command to create an order.
 */
public class CreateOrderCommand extends Command {
    private final Long userId;
    private final List<OrderItemRequest> items;
    private final Address shippingAddress;
    
    // Constructor and getters omitted
}

/**
 * Command handler.
 */
public interface CommandHandler<C extends Command, R> {
    R handle(C command);
}

/**
 * Handler for CreateOrderCommand.
 */
@Component
public class CreateOrderCommandHandler implements CommandHandler<CreateOrderCommand, String> {
    
    @Autowired
    private EventStore eventStore;
    
    @Autowired
    private MessagePublisher messagePublisher;
    
    @Override
    @Transactional
    public String handle(CreateOrderCommand command) {
        // 1. Create the aggregate root
        Order order = Order.create(
            command.getUserId(),
            command.getItems().stream()
                .map(item -> new OrderItem(item.getProductId(), item.getQuantity()))
                .collect(Collectors.toList())
        );
        
        // 2. Persist the events; a new aggregate starts at version 0
        List<DomainEvent> events = order.getUncommittedEvents();
        eventStore.appendEvents(order.getId(), events, 0);
        
        // 3. Publish the events (the read model is updated asynchronously).
        //    Caveat: the broker send is not part of the database transaction, so a
        //    rollback after this point leaves published events that were never stored.
        //    Publishing after commit, or from an outbox table, avoids this.
        for (DomainEvent event : events) {
            messagePublisher.publish(event);
        }
        
        // 4. Mark the events as committed
        order.markEventsAsCommitted();
        
        return order.getId();
    }
}

/**
 * Command bus.
 */
@Component
public class CommandBus {
    
    private final Map<Class<?>, CommandHandler<?, ?>> handlers = new ConcurrentHashMap<>();
    
    /**
     * Register a handler for a command type.
     */
    public <C extends Command, R> void register(Class<C> commandType, CommandHandler<C, R> handler) {
        handlers.put(commandType, handler);
    }
    
    /**
     * Dispatch a command to its registered handler.
     */
    @SuppressWarnings("unchecked")
    public <C extends Command, R> R send(C command) {
        // Safe in practice: register() only stores a handler under its own command type.
        CommandHandler<C, R> handler = (CommandHandler<C, R>) handlers.get(command.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for command: " + command.getClass());
        }
        return handler.handle(command);
    }
}

3.3 Query Side Implementation

/**
 * Read model: order view.
 */
@Entity
@Table(name = "v_order")
public class OrderView {
    
    @Id
    private String id;
    
    private Long userId;
    
    private String status;
    
    private BigDecimal totalAmount;
    
    private LocalDateTime createdAt;
    
    private LocalDateTime paidAt;
    
    // Getters and setters omitted
}

/**
 * Query service.
 */
@Service
public class OrderQueryService {
    
    @Autowired
    private OrderViewRepository orderViewRepository;
    
    /**
     * Get the details of one order.
     */
    @Transactional(readOnly = true)
    public OrderView getOrderById(String orderId) {
        return orderViewRepository.findById(orderId)
            .orElseThrow(() -> new NotFoundException("Order not found"));
    }
    
    /**
     * List a user's orders.
     */
    @Transactional(readOnly = true)
    public Page<OrderView> getUserOrders(Long userId, Pageable pageable) {
        return orderViewRepository.findByUserId(userId, pageable);
    }
    
    /**
     * Search orders.
     */
    @Transactional(readOnly = true)
    public Page<OrderView> searchOrders(OrderSearchCriteria criteria, Pageable pageable) {
        return orderViewRepository.search(criteria, pageable);
    }
}

/**
 * Event handlers that keep the read model up to date.
 * They use Spring's @EventListener so that events published through
 * ApplicationContext.publishEvent(...) reach them.
 */
@Component
public class OrderEventHandlers {
    
    @Autowired
    private OrderViewRepository orderViewRepository;
    
    /**
     * Handle the order-created event.
     * Saving by ID makes a redelivered event overwrite the same row.
     */
    @EventListener
    @Transactional
    public void handle(OrderCreated event) {
        OrderView view = new OrderView();
        view.setId(event.getOrderId());
        view.setUserId(event.getUserId());
        view.setStatus(OrderStatus.UNPAID.name());
        view.setTotalAmount(event.getTotalAmount().getAmount());
        view.setCreatedAt(event.getOccurredOn());
        
        orderViewRepository.save(view);
    }
    
    /**
     * Handle the order-paid event.
     */
    @EventListener
    @Transactional
    public void handle(OrderPaid event) {
        OrderView view = orderViewRepository.findById(event.getOrderId())
            .orElseThrow(() -> new NotFoundException("Order not found"));
        
        view.setStatus(OrderStatus.PAID.name());
        view.setPaidAt(event.getOccurredOn());
        
        orderViewRepository.save(view);
    }
    
    /**
     * Handle the order-cancelled event.
     */
    @EventListener
    @Transactional
    public void handle(OrderCancelled event) {
        OrderView view = orderViewRepository.findById(event.getOrderId())
            .orElseThrow(() -> new NotFoundException("Order not found"));
        
        view.setStatus(OrderStatus.CANCELLED.name());
        
        orderViewRepository.save(view);
    }
}
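
Section 1.1 asks for idempotent handlers, which matters because message brokers commonly redeliver. The handlers above happen to be idempotent because they overwrite fields, but a handler that accumulates a value is not. One common remedy, sketched below under the assumption that every event carries a unique ID (like `DomainEvent.getEventId()`), is to remember which IDs have been processed. `IdempotentHandlerDemo` and `PaymentRecorded` are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class IdempotentHandlerDemo {

    // Hypothetical event with a unique ID, mirroring DomainEvent.getEventId().
    static final class PaymentRecorded {
        final String eventId;
        final String orderId;
        final long amountCents;

        PaymentRecorded(String eventId, String orderId, long amountCents) {
            this.eventId = eventId;
            this.orderId = orderId;
            this.amountCents = amountCents;
        }
    }

    private final Set<String> processedEventIds = new HashSet<>();
    private final Map<String, Long> paidTotals = new HashMap<>();

    // Returns true if the event was applied, false if it was a duplicate and skipped.
    boolean handle(PaymentRecorded event) {
        // Set.add returns false when the ID is already present.
        if (!processedEventIds.add(event.eventId)) {
            return false;
        }
        // Accumulating update: applying it twice would double-count without the guard.
        paidTotals.merge(event.orderId, event.amountCents, Long::sum);
        return true;
    }

    long paidTotal(String orderId) {
        return paidTotals.getOrDefault(orderId, 0L);
    }

    public static void main(String[] args) {
        IdempotentHandlerDemo handler = new IdempotentHandlerDemo();
        PaymentRecorded payment = new PaymentRecorded("evt-1", "order-1", 2500);
        System.out.println("first delivery applied: " + handler.handle(payment));
        System.out.println("redelivery applied: " + handler.handle(payment));
        System.out.println("total cents: " + handler.paidTotal("order-1"));
    }
}
```

In a database-backed handler, the processed-ID record and the read-model update would need to be written in the same transaction; otherwise a crash between the two can still lose or duplicate an update.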

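3.4 Rebuilding Projections

/**
 * Projection rebuild service.
 */
@Service
public class ProjectionRebuilder {
    
    @Autowired
    private EventStore eventStore;
    
    @Autowired
    private ApplicationContext applicationContext;
    
    @Autowired
    private OrderViewRepository orderViewRepository;
    
    /**
     * Rebuild all projections.
     */
    @Transactional
    public void rebuildAllProjections() {
        // 1. Clear the read model
        orderViewRepository.deleteAll();
        
        // 2. Fetch all aggregate IDs. getAllAggregateIds() is not declared on the
        //    EventStore interface in section 2.3 and is assumed to be added there.
        List<String> aggregateIds = eventStore.getAllAggregateIds();
        
        // 3. Rebuild them one by one. This is a self-invocation, so every rebuild
        //    runs inside this method's single transaction.
        for (String aggregateId : aggregateIds) {
            rebuildProjection(aggregateId);
        }
    }
    
    /**
     * Rebuild the projection of a single aggregate.
     */
    @Transactional
    public void rebuildProjection(String aggregateId) {
        // 1. Read all events
        List<DomainEvent> events = eventStore.getEventsForAggregate(aggregateId);
        
        // 2. Apply them in order
        for (DomainEvent event : events) {
            publishEvent(event);
        }
    }
    
    private void publishEvent(DomainEvent event) {
        // Publishing triggers the projection update. It also reaches every other
        // listener in the context, so listeners with external side effects
        // (notifications, payments) must be idempotent or disabled during a rebuild.
        applicationContext.publishEvent(event);
    }
}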

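4. Event Bus

4.1 In-Memory Event Bus

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory event bus (for simple scenarios).
 */
@Component
public class InMemoryEventBus {
    
    private static final Logger log = LoggerFactory.getLogger(InMemoryEventBus.class);
    
    private final List<EventHandler> handlers = new CopyOnWriteArrayList<>();
    
    /**
     * Subscribe a handler.
     */
    public void subscribe(EventHandler handler) {
        handlers.add(handler);
    }
    
    /**
     * Publish an event to every handler that supports its type.
     */
    public void publish(DomainEvent event) {
        for (EventHandler handler : handlers) {
            if (handler.supports(event.getClass())) {
                try {
                    handler.handle(event);
                } catch (Exception e) {
                    // One failing handler must not stop delivery to the others.
                    log.error("Event handling failed", e);
                }
            }
        }
    }
    
    public interface EventHandler {
        boolean supports(Class<?> eventType);
        void handle(DomainEvent event);
    }
}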
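
An alternative to the `supports(...)` check is to route by event class at subscription time, which lets handlers receive an already-typed event. The sketch below is one possible shape, not the article's bus: `TypedEventBusDemo` is a hypothetical name, it matches on the exact class only (subclasses are not considered), and it reports failures by count instead of logging to stay dependency-free.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class TypedEventBusDemo {

    // Handlers grouped by the exact event class they subscribed to.
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    // Register a handler for one event type; the cast is checked by Class.cast.
    <E> void subscribe(Class<E> type, Consumer<? super E> handler) {
        subscribers
            .computeIfAbsent(type, t -> new CopyOnWriteArrayList<>())
            .add(event -> handler.accept(type.cast(event)));
    }

    // Deliver to every handler of the event's class; returns how many handlers threw.
    int publish(Object event) {
        int failures = 0;
        List<Consumer<Object>> handlers =
            subscribers.getOrDefault(event.getClass(), Collections.<Consumer<Object>>emptyList());
        for (Consumer<Object> handler : handlers) {
            try {
                handler.accept(event);
            } catch (RuntimeException e) {
                failures++; // isolate the failure so the remaining handlers still run
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        TypedEventBusDemo bus = new TypedEventBusDemo();
        List<String> received = new ArrayList<>();

        // String stands in for a real event class such as OrderCreated.
        bus.subscribe(String.class, received::add);
        bus.subscribe(String.class, s -> { throw new IllegalStateException("handler bug"); });

        int failures = bus.publish("OrderCreated:order-1");
        System.out.println("received=" + received + ", failures=" + failures);
    }
}
```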

4.2 Message-Queue-Based Event Bus

/**
 * Event bus backed by RocketMQ.
 */
@Component
public class RocketMQEventBus {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * Publish an event.
     */
    public void publish(DomainEvent event) {
        String topic = getTopicForEvent(event.getClass());
        
        EventMessage message = new EventMessage();
        message.setEventType(event.getClass().getName());
        message.setEventData(JsonUtils.toJson(event));
        message.setOccurredOn(event.getOccurredOn());
        
        // convertAndSend accepts an arbitrary payload; send(...) expects a Spring Message<?>.
        rocketMQTemplate.convertAndSend(topic, message);
    }
    
    private String getTopicForEvent(Class<?> eventType) {
        // Map the event type to a topic
        if (eventType == OrderCreated.class) {
            return "order-created-topic";
        } else if (eventType == OrderPaid.class) {
            return "order-paid-topic";
        }
        return "default-topic";
    }
}

/**
 * Event consumer. The listener has to be a Spring bean itself, so it is declared
 * as a top-level component rather than as an inner class.
 */
@Component
@RocketMQMessageListener(
    topic = "order-created-topic",
    consumerGroup = "event-consumer-group"
)
public class OrderCreatedConsumer implements RocketMQListener<EventMessage> {
    
    @Autowired
    private ApplicationContext applicationContext;
    
    @Override
    public void onMessage(EventMessage message) {
        DomainEvent event = deserialize(message);
        applicationContext.publishEvent(event);
    }
    
    private DomainEvent deserialize(EventMessage message) {
        // JSON deserialization
        return JsonUtils.fromJson(message.getEventData(), getEventClass(message.getEventType()));
    }
    
    private Class<? extends DomainEvent> getEventClass(String eventTypeName) {
        try {
            // asSubclass rejects class names that are not domain events.
            return Class.forName(eventTypeName).asSubclass(DomainEvent.class);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Event class not found: " + eventTypeName, e);
        }
    }
}

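5. Case Study

5.1 A Complete Order System

Order system event flow

Create order
├── Command: CreateOrderCommand
├── Event: OrderCreated
├── Projection update: OrderView
└── Downstream: deduct stock, redeem coupon

Pay order
├── Command: PayOrderCommand
├── Event: OrderPaid
├── Projection update: OrderView (status change)
└── Downstream: create shipment, send notification

Cancel order
├── Command: CancelOrderCommand
├── Event: OrderCancelled
├── Projection update: OrderView (status change)
└── Downstream: restore stock, refund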

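5.2 Saga Distributed Transactions

/**
 * Saga coordinator.
 */
@Component
public class OrderSaga {
    
    @Autowired
    private MessagePublisher publisher;
    
    @Autowired
    private OrderService orderService;
    
    /**
     * Create-order saga.
     */
    @SagaStart
    public void createOrder(CreateOrderCommand command) {
        // 1. Create the order
        String orderId = orderService.create(command);
        
        // 2. Deduct stock (asynchronous)
        publisher.publish(new DeductStockCommand(orderId, command.getItems()));
        
        // 3. Redeem the coupon (asynchronous).
        //    getCouponId() assumes CreateOrderCommand also carries an optional coupon ID.
        if (command.getCouponId() != null) {
            publisher.publish(new UseCouponCommand(orderId, command.getCouponId()));
        }
    }
    
    /**
     * Compensation when the stock deduction fails.
     * StockDeductionFailed is an assumed event published by the inventory service;
     * reacting to OrderCreated here would cancel every order.
     */
    @SagaCompensation
    public void compensateStock(StockDeductionFailed event) {
        // Cancel the order
        orderService.cancel(event.getOrderId(), "Insufficient stock");
    }
}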
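
The compensation idea can be shown without any messaging infrastructure. The sketch below is an in-process, orchestration-style simplification, assuming each step pairs a forward action with an action that undoes it; the asynchronous saga above distributes the same logic across services. `SagaDemo` and `Step` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class SagaDemo {

    // One saga step: a forward action plus the compensation that undoes it.
    static final class Step {
        final String name;
        final Runnable action;
        final Runnable compensation;

        Step(String name, Runnable action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    // Runs the steps in order. If one fails, the steps that already completed are
    // compensated in reverse order and the saga reports failure.
    static boolean run(List<Step> steps, List<String> log) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                log.add("done:" + step.name);
                completed.push(step); // most recent step ends up on top
            } catch (RuntimeException e) {
                log.add("failed:" + step.name);
                while (!completed.isEmpty()) {
                    Step undo = completed.pop();
                    undo.compensation.run();
                    log.add("compensated:" + undo.name);
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Step> steps = Arrays.asList(
            new Step("createOrder", () -> { }, () -> { }),
            new Step("deductStock",
                () -> { throw new IllegalStateException("insufficient stock"); },
                () -> { }));

        boolean succeeded = run(steps, log);
        System.out.println("succeeded=" + succeeded + ", log=" + log);
    }
}
```

A saga gives up isolation: between a step and its compensation, other parts of the system can observe the intermediate state, so compensations have to be written as new business actions (cancel, refund) rather than as rollbacks.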

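6. Summary

6.1 Key Takeaways

  1. Event Sourcing: events are the state, and the full history is preserved
  2. CQRS: separate read and write models that scale independently
  3. Event bus: loose coupling through asynchronous communication
  4. Saga: distributed transactions with eventual consistency
  5. Projection rebuild: flexible views that are easy to extend

6.2 When to Use What

| Scenario | Recommended pattern |
|---|---|
| Audit requirements | Event Sourcing |
| High-concurrency reads | CQRS |
| Service decoupling | Event bus |
| Distributed transactions | Saga |

Event-driven architecture is not a silver bullet, and it adds considerable complexity. For a simple CRUD system, a traditional architecture may be the better fit.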

