事件驱动架构设计
事件驱动架构(EDA)是构建松耦合、可扩展系统的核心模式。通过事件的发布和订阅,可以实现服务解耦、数据最终一致、业务灵活性。本文详解事件驱动架构的核心模式:Event Sourcing、CQRS、事件总线等。
一、事件驱动基础
1.1 核心概念
mindmap
root((EDA 核心概念))
事件 Event
已经发生的事实
不可变/有时间戳
命名:过去时 OrderCreated
命令 Command
要求执行某个动作
可被拒绝/有意图
命名:祈使句 CreateOrder
查询 Query
请求读取数据
无副作用
命名:疑问句 GetOrder
事件处理器 EventHandler
订阅并处理事件
异步/幂等
业务逻辑/数据更新
1.2 架构模式对比
| 模式 | 特点 | 适用场景 |
|---|---|---|
| 请求 - 响应 | 同步、紧耦合 | 简单 CRUD |
| 发布 - 订阅 | 异步、松耦合 | 事件通知 |
| Event Sourcing | 事件即状态 | 审计、回溯 |
| CQRS | 读写分离 | 高并发读 |
二、Event Sourcing
2.1 核心原理
graph TB
subgraph Traditional[传统方式]
T1[当前状态数据库 Order: id=1, status=PAID]
T2[问题<br/>丢失历史/难以审计/难以回溯]
end
subgraph ES[Event Sourcing 方式]
E1[事件流 Event Store]
E1 --> E2[OrderCreated]
E1 --> E3[ItemAdded]
E1 --> E4[PaymentReceived]
E1 --> E5[OrderShipped]
E6[当前状态=所有事件累加]
E7[优势<br/>完整历史/易于审计/可以回溯/支持多种视图]
end
2.2 事件定义
/**
* 领域事件基类
*/
public abstract class DomainEvent {
private final String eventId;
private final LocalDateTime occurredOn;
private final int version;
public DomainEvent() {
this.eventId = UUID.randomUUID().toString();
this.occurredOn = LocalDateTime.now();
this.version = 1;
}
// Getter 方法
public String getEventId() { return eventId; }
public LocalDateTime getOccurredOn() { return occurredOn; }
public int getVersion() { return version; }
}
/**
* 订单创建事件
*/
public class OrderCreated extends DomainEvent {
private final String orderId;
private final Long userId;
private final List<OrderItemEvent> items;
private final Money totalAmount;
public OrderCreated(String orderId, Long userId, List<OrderItemEvent> items, Money totalAmount) {
this.orderId = orderId;
this.userId = userId;
this.items = items;
this.totalAmount = totalAmount;
}
// Getter 方法
}
/**
* 订单支付事件
*/
public class OrderPaid extends DomainEvent {
private final String orderId;
private final Money amount;
private final String paymentMethod;
private final String transactionId;
public OrderPaid(String orderId, Money amount, String paymentMethod, String transactionId) {
this.orderId = orderId;
this.amount = amount;
this.paymentMethod = paymentMethod;
this.transactionId = transactionId;
}
// Getter 方法
}
/**
* 订单取消事件
*/
public class OrderCancelled extends DomainEvent {
private final String orderId;
private final String reason;
private final LocalDateTime cancelledAt;
public OrderCancelled(String orderId, String reason) {
this.orderId = orderId;
this.reason = reason;
this.cancelledAt = LocalDateTime.now();
}
// Getter 方法
}
2.3 事件存储
/**
* 事件存储库
*/
public interface EventStore {
/**
* 追加事件
*/
void appendEvents(String aggregateId, List<DomainEvent> events, int expectedVersion);
/**
* 读取事件流
*/
List<DomainEvent> getEventsForAggregate(String aggregateId);
/**
* 读取事件流(从指定版本开始)
*/
List<DomainEvent> getEventsForAggregate(String aggregateId, int fromVersion);
}
/**
* 事件存储实现(基于数据库)
*/
@Repository
public class JpaEventStore implements EventStore {
@Autowired
private EventEntryRepository eventEntryRepository;
@Override
@Transactional
public void appendEvents(String aggregateId, List<DomainEvent> events, int expectedVersion) {
// 乐观锁检查
int currentVersion = eventEntryRepository.getLastVersion(aggregateId);
if (currentVersion != expectedVersion) {
throw new ConcurrencyException("并发冲突");
}
// 保存事件
for (DomainEvent event : events) {
EventEntry entry = new EventEntry();
entry.setAggregateId(aggregateId);
entry.setEventType(event.getClass().getName());
entry.setEventData(serialize(event));
entry.setVersion(currentVersion + 1);
entry.setOccurredOn(event.getOccurredOn());
eventEntryRepository.save(entry);
}
}
@Override
public List<DomainEvent> getEventsForAggregate(String aggregateId) {
List<EventEntry> entries = eventEntryRepository.findByAggregateIdOrderByVersion(aggregateId);
return entries.stream()
.map(this::deserialize)
.collect(Collectors.toList());
}
private String serialize(DomainEvent event) {
// JSON 序列化
return JsonUtils.toJson(event);
}
private DomainEvent deserialize(EventEntry entry) {
// JSON 反序列化
return JsonUtils.fromJson(entry.getEventData(), getEventClass(entry.getEventType()));
}
private Class<?> getEventClass(String eventTypeName) {
try {
return Class.forName(eventTypeName);
} catch (ClassNotFoundException e) {
throw new RuntimeException("事件类不存在", e);
}
}
}
/**
* 事件表结构
*/
@Entity
@Table(name = "t_event_store")
public class EventEntry {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "aggregate_id", length = 100)
private String aggregateId;
@Column(name = "event_type", length = 200)
private String eventType;
@Column(name = "event_data", columnDefinition = "TEXT")
private String eventData;
@Column(name = "version")
private Integer version;
@Column(name = "occurred_on")
private LocalDateTime occurredOn;
// Getter/Setter
}
-- 创建索引
CREATE INDEX idx_aggregate_id ON t_event_store(aggregate_id, version);
2.4 聚合根实现
/**
* 聚合根基类
*/
public abstract class AggregateRoot {
private String id;
private int version;
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
/**
* 注册事件
*/
protected void registerEvent(DomainEvent event) {
uncommittedEvents.add(event);
applyEvent(event);
}
/**
* 应用事件(更新状态)
*/
protected abstract void applyEvent(DomainEvent event);
/**
* 加载历史事件(重建状态)
*/
public void loadFromHistory(List<DomainEvent> events) {
for (DomainEvent event : events) {
applyEvent(event);
this.version = event.getVersion();
}
}
/**
* 获取未提交事件
*/
public List<DomainEvent> getUncommittedEvents() {
return new ArrayList<>(uncommittedEvents);
}
/**
* 标记事件已提交
*/
public void markEventsAsCommitted() {
uncommittedEvents.clear();
}
// Getter/Setter
public String getId() { return id; }
public int getVersion() { return version; }
}
/**
* 订单聚合根
*/
public class Order extends AggregateRoot {
private Long userId;
private OrderStatus status;
private Money totalAmount;
private List<OrderItem> items = new ArrayList<>();
/**
* 创建订单(工厂方法)
*/
public static Order create(Long userId, List<OrderItem> items) {
Order order = new Order();
order.setId(IdGenerator.generate());
// 注册创建事件
order.registerEvent(new OrderCreated(
order.getId(),
userId,
items.stream().map(OrderItemEvent::new).collect(Collectors.toList()),
calculateTotal(items)
));
return order;
}
/**
* 支付订单
*/
public void pay(String paymentMethod, String transactionId) {
if (this.status != OrderStatus.UNPAID) {
throw new DomainException("只有未支付订单才能支付");
}
registerEvent(new OrderPaid(getId(), totalAmount, paymentMethod, transactionId));
}
/**
* 取消订单
*/
public void cancel(String reason) {
if (this.status != OrderStatus.UNPAID) {
throw new DomainException("只有未支付订单才能取消");
}
registerEvent(new OrderCancelled(getId(), reason));
}
/**
* 应用事件
*/
@Override
protected void applyEvent(DomainEvent event) {
if (event instanceof OrderCreated) {
apply((OrderCreated) event);
} else if (event instanceof OrderPaid) {
apply((OrderPaid) event);
} else if (event instanceof OrderCancelled) {
apply((OrderCancelled) event);
}
}
private void apply(OrderCreated event) {
this.setId(event.getOrderId());
this.userId = event.getUserId();
this.items = event.getItems().stream()
.map(OrderItem::fromEvent)
.collect(Collectors.toList());
this.totalAmount = event.getTotalAmount();
this.status = OrderStatus.UNPAID;
}
private void apply(OrderPaid event) {
this.status = OrderStatus.PAID;
}
private void apply(OrderCancelled event) {
this.status = OrderStatus.CANCELLED;
}
private static Money calculateTotal(List<OrderItem> items) {
return items.stream()
.map(OrderItem::getSubtotal)
.reduce(Money.ZERO, Money::add);
}
// Getter 方法
}
三、CQRS
3.1 核心原理
graph TB
Client[Client] --> Cmd[Command Side<br/>写操作/更新状态]
Client --> Qry[Query Side<br/>读操作/查询状态]
Cmd --> ES[EventStore<br/>事件源]
Qry --> RM[ReadModel<br/>投影]
ES -->|事件发布 | RM
style Cmd fill:#e1f5fe
style Qry fill:#f3e5f5
特点:
- 读写分离:不同的数据模型
- 最终一致:读模型异步更新
- 可扩展:独立扩展读/写
- 灵活性:多种读视图
3.2 命令端实现
/**
* 命令基类
*/
public abstract class Command {
private final String commandId;
private final LocalDateTime timestamp;
public Command() {
this.commandId = UUID.randomUUID().toString();
this.timestamp = LocalDateTime.now();
}
// Getter 方法
}
/**
* 创建订单命令
*/
public class CreateOrderCommand extends Command {
private final Long userId;
private final List<OrderItemRequest> items;
private final Address shippingAddress;
// Constructor + Getter
}
/**
* 命令处理器
*/
public interface CommandHandler<C extends Command, R> {
R handle(C command);
}
/**
* 创建订单命令处理器
*/
@Component
public class CreateOrderCommandHandler implements CommandHandler<CreateOrderCommand, String> {
@Autowired
private EventStore eventStore;
@Autowired
private MessagePublisher messagePublisher;
@Override
@Transactional
public String handle(CreateOrderCommand command) {
// 1. 创建聚合根
Order order = Order.create(
command.getUserId(),
command.getItems().stream()
.map(item -> new OrderItem(item.getProductId(), item.getQuantity()))
.collect(Collectors.toList())
);
// 2. 保存事件
List<DomainEvent> events = order.getUncommittedEvents();
eventStore.appendEvents(order.getId(), events, 0);
// 3. 发布事件(异步更新读模型)
for (DomainEvent event : events) {
messagePublisher.publish(event);
}
// 4. 标记事件已提交
order.markEventsAsCommitted();
return order.getId();
}
}
/**
* 命令总线
*/
@Component
public class CommandBus {
private final Map<Class<?>, CommandHandler> handlers = new ConcurrentHashMap<>();
/**
* 注册处理器
*/
public <C extends Command, R> void register(Class<C> commandType, CommandHandler<C, R> handler) {
handlers.put(commandType, handler);
}
/**
* 发送命令
*/
@SuppressWarnings("unchecked")
public <C extends Command, R> R send(C command) {
CommandHandler<C, ?> handler = handlers.get(command.getClass());
if (handler == null) {
throw new IllegalArgumentException("未找到命令处理器:" + command.getClass());
}
return (R) handler.handle(command);
}
}
3.3 查询端实现
/**
* 读模型:订单视图
*/
@Entity
@Table(name = "v_order")
public class OrderView {
@Id
private String id;
private Long userId;
private String status;
private BigDecimal totalAmount;
private LocalDateTime createdAt;
private LocalDateTime paidAt;
// Getter/Setter
}
/**
* 查询服务
*/
@Service
public class OrderQueryService {
@Autowired
private OrderViewRepository orderViewRepository;
/**
* 获取订单详情
*/
@Transactional(readOnly = true)
public OrderView getOrderById(String orderId) {
return orderViewRepository.findById(orderId)
.orElseThrow(() -> new NotFoundException("订单不存在"));
}
/**
* 获取用户订单列表
*/
@Transactional(readOnly = true)
public Page<OrderView> getUserOrders(Long userId, Pageable pageable) {
return orderViewRepository.findByUserId(userId, pageable);
}
/**
* 搜索订单
*/
@Transactional(readOnly = true)
public Page<OrderView> searchOrders(OrderSearchCriteria criteria, Pageable pageable) {
return orderViewRepository.search(criteria, pageable);
}
}
/**
* 事件处理器(更新读模型)
*/
@Component
public class OrderEventHandlers {
@Autowired
private OrderViewRepository orderViewRepository;
/**
* 处理订单创建事件
*/
@EventHandler
@Transactional
public void handle(OrderCreated event) {
OrderView view = new OrderView();
view.setId(event.getOrderId());
view.setUserId(event.getUserId());
view.setStatus(OrderStatus.UNPAID.name());
view.setTotalAmount(event.getTotalAmount().getAmount());
view.setCreatedAt(event.getOccurredOn());
orderViewRepository.save(view);
}
/**
* 处理订单支付事件
*/
@EventHandler
@Transactional
public void handle(OrderPaid event) {
OrderView view = orderViewRepository.findById(event.getOrderId())
.orElseThrow(() -> new NotFoundException("订单不存在"));
view.setStatus(OrderStatus.PAID.name());
view.setPaidAt(event.getOccurredOn());
orderViewRepository.save(view);
}
/**
* 处理订单取消事件
*/
@EventHandler
@Transactional
public void handle(OrderCancelled event) {
OrderView view = orderViewRepository.findById(event.getOrderId())
.orElseThrow(() -> new NotFoundException("订单不存在"));
view.setStatus(OrderStatus.CANCELLED.name());
orderViewRepository.save(view);
}
}
3.4 投影重建
/**
* 投影重建服务
*/
@Service
public class ProjectionRebuilder {
@Autowired
private EventStore eventStore;
@Autowired
private ApplicationContext applicationContext;
/**
* 重建所有投影
*/
@Transactional
public void rebuildAllProjections() {
// 1. 清空读模型
orderViewRepository.deleteAll();
// 2. 获取所有聚合 ID
List<String> aggregateIds = eventStore.getAllAggregateIds();
// 3. 逐个重建
for (String aggregateId : aggregateIds) {
rebuildProjection(aggregateId);
}
}
/**
* 重建单个聚合的投影
*/
@Transactional
public void rebuildProjection(String aggregateId) {
// 1. 读取所有事件
List<DomainEvent> events = eventStore.getEventsForAggregate(aggregateId);
// 2. 依次应用事件
for (DomainEvent event : events) {
publishEvent(event);
}
}
private void publishEvent(DomainEvent event) {
// 发布事件,触发投影更新
applicationContext.publishEvent(event);
}
@Autowired
private OrderViewRepository orderViewRepository;
}
四、事件总线
4.1 内存事件总线
/**
* 内存事件总线(简单场景)
*/
@Component
public class InMemoryEventBus {
private final List<EventHandler> handlers = new CopyOnWriteArrayList<>();
/**
* 订阅事件
*/
public void subscribe(EventHandler handler) {
handlers.add(handler);
}
/**
* 发布事件
*/
public void publish(DomainEvent event) {
for (EventHandler handler : handlers) {
if (handler.supports(event.getClass())) {
try {
handler.handle(event);
} catch (Exception e) {
log.error("事件处理失败", e);
}
}
}
}
public interface EventHandler {
boolean supports(Class<?> eventType);
void handle(DomainEvent event);
}
}
4.2 基于消息队列的事件总线
/**
* 基于 RocketMQ 的事件总线
*/
@Component
public class RocketMQEventBus {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发布事件
*/
public void publish(DomainEvent event) {
String topic = getTopicForEvent(event.getClass());
EventMessage message = new EventMessage();
message.setEventType(event.getClass().getName());
message.setEventData(JsonUtils.toJson(event));
message.setOccurredOn(event.getOccurredOn());
rocketMQTemplate.send(topic, message);
}
private String getTopicForEvent(Class<?> eventType) {
// 根据事件类型映射到 Topic
if (eventType == OrderCreated.class) {
return "order-created-topic";
} else if (eventType == OrderPaid.class) {
return "order-paid-topic";
}
return "default-topic";
}
}
/**
* 事件消费者
*/
@Component
public class EventConsumer {
@Autowired
private ApplicationContext applicationContext;
@RocketMQMessageListener(
topic = "order-created-topic",
consumerGroup = "event-consumer-group"
)
public class OrderCreatedConsumer implements RocketMQListener<EventMessage> {
@Override
public void onMessage(EventMessage message) {
DomainEvent event = deserialize(message);
applicationContext.publishEvent(event);
}
}
private DomainEvent deserialize(EventMessage message) {
// JSON 反序列化
return JsonUtils.fromJson(message.getEventData(), getEventClass(message.getEventType()));
}
private Class<?> getEventClass(String eventTypeName) {
try {
return Class.forName(eventTypeName);
} catch (ClassNotFoundException e) {
throw new RuntimeException("事件类不存在", e);
}
}
}
五、实战案例
5.1 订单系统完整实现
订单系统事件流
创建订单
├── 命令:CreateOrderCommand
├── 事件:OrderCreated
├── 投影更新:OrderView
└── 下游:库存扣减、优惠券使用
支付订单
├── 命令:PayOrderCommand
├── 事件:OrderPaid
├── 投影更新:OrderView(状态变更)
└── 下游:创建发货单、发送通知
取消订单
├── 命令:CancelOrderCommand
├── 事件:OrderCancelled
├── 投影更新:OrderView(状态变更)
└── 下游:库存恢复、退款
5.2 Saga 分布式事务
/**
* Saga 协调器
*/
@Component
public class OrderSaga {
@Autowired
private MessagePublisher publisher;
/**
* 创建订单 Saga
*/
@SagaStart
public void createOrder(CreateOrderCommand command) {
// 1. 创建订单
String orderId = orderService.create(command);
// 2. 扣减库存(异步)
publisher.publish(new DeductStockCommand(orderId, command.getItems()));
// 3. 使用优惠券(异步)
if (command.getCouponId() != null) {
publisher.publish(new UseCouponCommand(orderId, command.getCouponId()));
}
}
/**
* 库存扣减失败补偿
*/
@SagaCompensation
public void compensateStock(OrderCreated event) {
// 取消订单
orderService.cancel(event.getOrderId(), "库存不足");
}
}
六、总结
6.1 核心要点
- Event Sourcing:事件即状态,完整历史
- CQRS:读写分离,独立扩展
- 事件总线:松耦合,异步通信
- Saga:分布式事务,最终一致
- 投影重建:灵活视图,易于扩展
6.2 适用场景
| 场景 | 推荐模式 |
|---|---|
| 审计需求 | Event Sourcing |
| 高并发读 | CQRS |
| 服务解耦 | 事件总线 |
| 分布式事务 | Saga |
事件驱动架构不是银弹,复杂度较高。对于简单 CRUD 系统,传统架构可能更合适。