事件驱动架构
事件驱动基础
核心概念
事件(Event):
- 系统中发生的有意义的事情
- 不可变的,表示过去发生的事情
- 通常包含时间戳和事件数据
事件生产者(Event Producer):
- 产生事件的服务
- 不关心事件的消费者
- 只负责发布事件
事件消费者(Event Consumer):
- 消费事件的服务
- 订阅感兴趣的事件
- 处理事件并产生业务逻辑
事件通道(Event Channel):
- 事件传输的媒介
- 消息队列、事件总线等
- 支持发布订阅模式
架构模式
┌─────────────┐ ┌─────────────┐
│ 事件生产者 │ │ 事件消费者 │
│ Service A │ │ Service B │
└──────┬──────┘ └──────▲──────┘
│ │
│ 发布事件 │ 消费事件
▼ │
┌─────────────────────────────────┐
│ 事件通道 │
│ (Message Queue / Event Bus) │
└─────────────────────────────────┘
│ │
▼ │
┌─────────────┐ ┌─────────────┐
│ 事件消费者 │ │ 事件消费者 │
│ Service C │ │ Service D │
└─────────────┘ └─────────────┘
事件类型
领域事件(Domain Event):
- 业务领域内发生的事件
- 如:OrderCreated, PaymentCompleted
集成事件(Integration Event):
- 跨服务通信的事件
- 如:UserRegistered, OrderShipped
系统事件(System Event):
- 系统层面的事件
- 如:ServiceStarted, ConfigChanged
Spring 事件机制
1. 应用事件
定义事件:
public class OrderCreatedEvent extends ApplicationEvent {
private final Order order;
public OrderCreatedEvent(Object source, Order order) {
super(source);
this.order = order;
}
public Order getOrder() {
return order;
}
}
发布事件:
@Service
public class OrderService {
@Autowired
private ApplicationEventPublisher eventPublisher;
public Order createOrder(CreateOrderRequest request) {
// 创建订单
Order order = orderRepository.save(request.toOrder());
// 发布事件
eventPublisher.publishEvent(new OrderCreatedEvent(this, order));
return order;
}
}
监听事件:
@Component
public class OrderEventListener {
@EventListener
@Async
public void handleOrderCreated(OrderCreatedEvent event) {
Order order = event.getOrder();
log.info("收到订单创建事件:{}", order.getId());
// 发送通知邮件
sendEmail(order);
// 扣减库存
decreaseInventory(order);
}
private void sendEmail(Order order) {
// 发送邮件逻辑
}
private void decreaseInventory(Order order) {
// 扣减库存逻辑
}
}
条件监听:
@Component
public class ConditionalEventListener {
@EventListener(condition = "#event.order.amount > 1000")
public void handleLargeOrder(OrderCreatedEvent event) {
// 只处理大额订单
log.info("大额订单:{}", event.getOrder().getId());
}
}
多个事件监听:
@EventListener
public void handleEvents(ApplicationEvent event) {
if (event instanceof OrderCreatedEvent) {
// 处理订单创建
} else if (event instanceof OrderPaidEvent) {
// 处理订单支付
}
}
2. 事务事件
事务事件监听:
@Component
public class TransactionalEventListener {
// 事务提交前执行
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void handleBeforeCommit(OrderCreatedEvent event) {
log.info("事务提交前:{}", event.getOrder().getId());
}
// 事务提交后执行
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleAfterCommit(OrderCreatedEvent event) {
log.info("事务提交后:{}", event.getOrder().getId());
// 发送消息到 MQ
kafkaTemplate.send("order-created", event.getOrder());
}
// 事务回滚后执行
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void handleAfterRollback(OrderCreatedEvent event) {
log.warn("事务回滚:{}", event.getOrder().getId());
// 清理资源
cleanup(event.getOrder());
}
}
基于 MQ 的事件驱动
1. 事件发布
定义事件:
@Data
@Builder
public class OrderCreatedEvent {
private String eventId;
private String eventType;
private LocalDateTime timestamp;
private OrderData data;
@Data
public static class OrderData {
private Long orderId;
private Long userId;
private BigDecimal amount;
private List<OrderItem> items;
}
}
发布事件:
@Service
public class OrderEventPublisher {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 使用 Kafka 发布
public void publishWithKafka(OrderCreatedEvent event) {
event.setEventId(UUID.randomUUID().toString());
event.setEventType("ORDER_CREATED");
event.setTimestamp(LocalDateTime.now());
kafkaTemplate.send(
"order-events",
event.getEventId(),
event
);
log.info("Kafka 事件已发布:{}", event.getEventId());
}
// 使用 RocketMQ 发布
public void publishWithRocketMQ(OrderCreatedEvent event) {
event.setEventId(UUID.randomUUID().toString());
event.setEventType("ORDER_CREATED");
event.setTimestamp(LocalDateTime.now());
rocketMQTemplate.convertAndSend(
"TOPIC_ORDER_EVENTS:ORDER_CREATED",
event
);
log.info("RocketMQ 事件已发布:{}", event.getEventId());
}
}
2. 事件消费
事件消费者:
@Component
public class OrderEventConsumer {
@KafkaListener(
topics = "order-events",
groupId = "order-event-consumer-group"
)
public void consumeOrderEvent(OrderCreatedEvent event) {
log.info("收到订单事件:{}", event.getEventId());
try {
// 处理事件
handleEvent(event);
} catch (Exception e) {
log.error("处理事件失败", e);
// 发送死信
sendToDeadLetterQueue(event, e);
}
}
private void handleEvent(OrderCreatedEvent event) {
switch (event.getEventType()) {
case "ORDER_CREATED":
handleOrderCreated(event);
break;
case "ORDER_PAID":
handleOrderPaid(event);
break;
case "ORDER_SHIPPED":
handleOrderShipped(event);
break;
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
OrderData data = event.getData();
// 发送通知
notificationService.sendOrderCreated(data.getOrderId());
// 扣减库存
inventoryService.decrease(data.getItems());
// 更新统计
analyticsService.recordOrder(data);
}
}
事件过滤:
@Component
public class FilteredEventConsumer {
@KafkaListener(
topics = "order-events",
groupId = "large-order-consumer-group",
containerFactory = "filterFactory"
)
public void consumeLargeOrder(OrderCreatedEvent event) {
// 只处理大额订单
if (event.getData().getAmount().compareTo(new BigDecimal("1000")) > 0) {
log.info("大额订单事件:{}", event.getEventId());
handleLargeOrder(event);
}
}
}
3. 事件溯源
事件存储:
@Entity
@Table(name = "event_store")
public class EventStore {
@Id
@GeneratedValue
private Long id;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String eventType;
@Column(nullable = false)
@Lob
private String eventData;
@Column(nullable = false)
private Integer version;
@Column(nullable = false)
private LocalDateTime timestamp;
}
事件存储库:
@Repository
public class EventStoreRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
public void save(String aggregateId, String aggregateType,
String eventType, Object eventData, int version) {
String sql = """
INSERT INTO event_store
(aggregate_id, aggregate_type, event_type, event_data, version, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
""";
jdbcTemplate.update(sql,
aggregateId,
aggregateType,
eventType,
JSON.toJSONString(eventData),
version,
LocalDateTime.now()
);
}
public List<EventStore> findByAggregateId(String aggregateId) {
String sql = "SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version";
return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(EventStore.class), aggregateId);
}
}
聚合根重建:
public class Order {
private Long id;
private Long userId;
private BigDecimal amount;
private OrderStatus status;
private List<OrderItem> items;
// 从事件重建
public static Order reconstruct(List<EventStore> events) {
Order order = new Order();
for (EventStore eventStore : events) {
switch (eventStore.getEventType()) {
case "OrderCreated":
order.apply((OrderCreated) parseEvent(eventStore));
break;
case "OrderPaid":
order.apply((OrderPaid) parseEvent(eventStore));
break;
case "OrderShipped":
order.apply((OrderShipped) parseEvent(eventStore));
break;
}
}
return order;
}
private void apply(OrderCreated event) {
this.id = event.getOrderId();
this.userId = event.getUserId();
this.amount = event.getAmount();
this.items = event.getItems();
this.status = OrderStatus.CREATED;
}
private void apply(OrderPaid event) {
this.status = OrderStatus.PAID;
}
private void apply(OrderShipped event) {
this.status = OrderStatus.SHIPPED;
}
}
CQRS 模式
1. 命令端
命令定义:
public class CreateOrderCommand {
private final Long userId;
private final List<OrderItem> items;
private final String address;
// constructor, getters
}
命令处理器:
@Component
public class CreateOrderCommandHandler {
@Autowired
private OrderRepository orderRepository;
@Autowired
private ApplicationEventPublisher eventPublisher;
@Transactional
public Order handle(CreateOrderCommand command) {
// 创建订单
Order order = new Order();
order.setUserId(command.getUserId());
order.setItems(command.getItems());
order.setAddress(command.getAddress());
order.setStatus(OrderStatus.CREATED);
orderRepository.save(order);
// 发布事件
eventPublisher.publishEvent(new OrderCreatedEvent(this, order));
return order;
}
}
2. 查询端
查询定义:
public class GetOrderQuery {
private final Long orderId;
// constructor, getter
}
查询处理器:
@Component
public class GetOrderQueryHandler {
@Autowired
private OrderReadRepository orderReadRepository;
public OrderDTO handle(GetOrderQuery query) {
return orderReadRepository.findById(query.getOrderId())
.map(this::toDTO)
.orElseThrow(() -> new ResourceNotFoundException("订单不存在"));
}
private OrderDTO toDTO(OrderReadModel model) {
return OrderDTO.builder()
.id(model.getId())
.userId(model.getUserId())
.amount(model.getAmount())
.status(model.getStatus())
.build();
}
}
读模型投影:
@Component
public class OrderProjection {
@Autowired
private OrderReadRepository orderReadRepository;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void on(OrderCreatedEvent event) {
Order order = event.getOrder();
OrderReadModel readModel = new OrderReadModel();
readModel.setId(order.getId());
readModel.setUserId(order.getUserId());
readModel.setAmount(order.getAmount());
readModel.setStatus(order.getStatus().name());
orderReadRepository.save(readModel);
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void on(OrderPaidEvent event) {
orderReadRepository.updateStatus(event.getOrderId(), "PAID");
}
}
事件编排
1. 流程编排
Saga 编排:
@Component
public class OrderSaga {
@Autowired
private OrderService orderService;
@Autowired
private PaymentService paymentService;
@Autowired
private InventoryService inventoryService;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@SagaStart
public void createOrder(CreateOrderRequest request) {
try {
// 1. 创建订单
Order order = orderService.create(request);
kafkaTemplate.send("order-created", order);
// 2. 扣减库存
inventoryService.decrease(order.getItems());
kafkaTemplate.send("inventory-decreased", order);
// 3. 发起支付
Payment payment = paymentService.pay(order);
kafkaTemplate.send("payment-completed", payment);
// 4. 订单完成
orderService.complete(order.getId());
kafkaTemplate.send("order-completed", order);
} catch (Exception e) {
// 补偿
compensate(request, e);
}
}
private void compensate(CreateOrderRequest request, Exception e) {
// 执行补偿操作
orderService.cancel(request.getOrderId());
inventoryService.increase(request.getItems());
kafkaTemplate.send("order-compensated", request);
}
}
2. 事件 choreography
Choreography 模式:
// 订单服务
@Component
public class OrderService {
@KafkaListener(topics = "payment-completed")
public void onPaymentCompleted(PaymentCompletedEvent event) {
// 支付完成,更新订单状态
orderService.updateStatus(event.getOrderId(), OrderStatus.PAID);
// 发布订单支付事件
kafkaTemplate.send("order-paid", new OrderPaidEvent(event.getOrderId()));
}
}
// 库存服务
@Component
public class InventoryService {
@KafkaListener(topics = "order-created")
public void onOrderCreated(OrderCreatedEvent event) {
// 扣减库存
inventoryService.decrease(event.getOrder().getItems());
// 发布库存扣减事件
kafkaTemplate.send("inventory-decreased", event.getOrder());
}
}
// 物流服务
@Component
public class ShippingService {
@KafkaListener(topics = "order-paid")
public void onOrderPaid(OrderPaidEvent event) {
// 订单已支付,安排发货
shippingService.schedule(event.getOrderId());
// 发布发货事件
kafkaTemplate.send("order-shipped", new OrderShippedEvent(event.getOrderId()));
}
}
最佳实践
1. 事件设计
事件命名:
- 使用过去时态:OrderCreated, PaymentCompleted
- 体现业务含义
- 避免技术术语
事件内容:
{
"eventId": "uuid",
"eventType": "OrderCreated",
"aggregateId": "order-123",
"aggregateType": "Order",
"version": 1,
"timestamp": "2026-04-10T10:00:00Z",
"data": {
"orderId": "123",
"userId": "456",
"amount": 100.00
},
"metadata": {
"correlationId": "corr-123",
"causationId": "cause-456"
}
}
2. 事件版本控制
版本管理:
public class OrderCreatedEventV2 extends OrderCreatedEvent {
private String couponCode; // 新增字段
// 向后兼容
public static OrderCreatedEvent migrate(OrderCreatedEventV2 v2) {
OrderCreatedEvent v1 = new OrderCreatedEvent();
v1.setOrderId(v2.getOrderId());
v1.setUserId(v2.getUserId());
// ...
return v1;
}
}
3. 事件处理
幂等性:
@Component
public class IdempotentEventConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@KafkaListener(topics = "order-events")
public void consume(OrderCreatedEvent event) {
String key = "event:processed:" + event.getEventId();
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(key, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isNew)) {
log.warn("事件已处理:{}", event.getEventId());
return;
}
// 处理事件
handleEvent(event);
}
}
顺序保证:
spring:
kafka:
listener:
concurrency: 1 # 单线程保证顺序
ack-mode: record # 逐条确认
4. 错误处理
重试机制:
@Retryable(
value = Exception.class,
maxAttempts = 3,
backoff = @Backoff(delay = 1000)
)
@KafkaListener(topics = "order-events")
public void consume(OrderCreatedEvent event) {
handleEvent(event);
}
@Recover
public void recover(Exception e, OrderCreatedEvent event) {
// 重试失败处理
log.error("事件处理失败", e);
sendToDeadLetterQueue(event, e);
}
死信队列:
@Component
public class DeadLetterQueueHandler {
@KafkaListener(topics = "order-events-dlq")
public void handleDlq(ConsumerRecord<String, Object> record) {
// 记录失败事件
log.error("死信事件:topic={}, key={}, value={}",
record.topic(), record.key(), record.value()
);
// 告警
alertService.sendAlert("Kafka 死信事件", record.value());
// 存储到数据库,人工处理
failedEventRepository.save(toEntity(record));
}
}
总结
事件驱动架构通过事件进行服务间通信,实现服务解耦、异步处理、事件溯源等能力。
Spring 提供完善的事件机制,结合 Kafka、RocketMQ 等消息中间件,可以构建可靠的事件驱动系统。
在实际应用中,需要根据业务场景选择合适的模式(编排 vs 协作),并做好事件的版本控制、幂等处理和错误处理。