Skip to content
清晨的一缕阳光
返回

事件驱动架构

事件驱动架构

事件驱动基础

核心概念

事件(Event)

事件生产者(Event Producer)

事件消费者(Event Consumer)

事件通道(Event Channel)

架构模式

┌─────────────┐      ┌─────────────┐
│  事件生产者  │      │  事件消费者  │
│   Service A │      │  Service B  │
└──────┬──────┘      └──────▲──────┘
       │                    │
       │ 发布事件            │ 消费事件
       ▼                    │
┌─────────────────────────────────┐
│         事件通道                 │
│   (Message Queue / Event Bus)   │
└─────────────────────────────────┘
       │                    │
       ▼                    │
┌─────────────┐      ┌─────────────┐
│  事件消费者  │      │  事件消费者  │
│  Service C  │      │  Service D  │
└─────────────┘      └─────────────┘

事件类型

领域事件(Domain Event)

集成事件(Integration Event)

系统事件(System Event)

Spring 事件机制

1. 应用事件

定义事件

public class OrderCreatedEvent extends ApplicationEvent {
    
    private final Order order;
    
    public OrderCreatedEvent(Object source, Order order) {
        super(source);
        this.order = order;
    }
    
    public Order getOrder() {
        return order;
    }
}

发布事件

@Service
public class OrderService {
    
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    public Order createOrder(CreateOrderRequest request) {
        // 创建订单
        Order order = orderRepository.save(request.toOrder());
        
        // 发布事件
        eventPublisher.publishEvent(new OrderCreatedEvent(this, order));
        
        return order;
    }
}

监听事件

@Component
public class OrderEventListener {
    
    @EventListener
    @Async
    public void handleOrderCreated(OrderCreatedEvent event) {
        Order order = event.getOrder();
        
        log.info("收到订单创建事件:{}", order.getId());
        
        // 发送通知邮件
        sendEmail(order);
        
        // 扣减库存
        decreaseInventory(order);
    }
    
    private void sendEmail(Order order) {
        // 发送邮件逻辑
    }
    
    private void decreaseInventory(Order order) {
        // 扣减库存逻辑
    }
}

条件监听

@Component
public class ConditionalEventListener {
    
    @EventListener(condition = "#event.order.amount > 1000")
    public void handleLargeOrder(OrderCreatedEvent event) {
        // 只处理大额订单
        log.info("大额订单:{}", event.getOrder().getId());
    }
}

多个事件监听

@EventListener
public void handleEvents(ApplicationEvent event) {
    if (event instanceof OrderCreatedEvent) {
        // 处理订单创建
    } else if (event instanceof OrderPaidEvent) {
        // 处理订单支付
    }
}

2. 事务事件

事务事件监听

@Component
public class TransactionalEventListener {
    
    // 事务提交前执行
    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void handleBeforeCommit(OrderCreatedEvent event) {
        log.info("事务提交前:{}", event.getOrder().getId());
    }
    
    // 事务提交后执行
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleAfterCommit(OrderCreatedEvent event) {
        log.info("事务提交后:{}", event.getOrder().getId());
        
        // 发送消息到 MQ
        kafkaTemplate.send("order-created", event.getOrder());
    }
    
    // 事务回滚后执行
    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void handleAfterRollback(OrderCreatedEvent event) {
        log.warn("事务回滚:{}", event.getOrder().getId());
        
        // 清理资源
        cleanup(event.getOrder());
    }
}

基于 MQ 的事件驱动

1. 事件发布

定义事件

@Data
@Builder
public class OrderCreatedEvent {
    
    private String eventId;
    private String eventType;
    private LocalDateTime timestamp;
    private OrderData data;
    
    @Data
    public static class OrderData {
        private Long orderId;
        private Long userId;
        private BigDecimal amount;
        private List<OrderItem> items;
    }
}

发布事件

@Service
public class OrderEventPublisher {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    // 使用 Kafka 发布
    public void publishWithKafka(OrderCreatedEvent event) {
        event.setEventId(UUID.randomUUID().toString());
        event.setEventType("ORDER_CREATED");
        event.setTimestamp(LocalDateTime.now());
        
        kafkaTemplate.send(
            "order-events",
            event.getEventId(),
            event
        );
        
        log.info("Kafka 事件已发布:{}", event.getEventId());
    }
    
    // 使用 RocketMQ 发布
    public void publishWithRocketMQ(OrderCreatedEvent event) {
        event.setEventId(UUID.randomUUID().toString());
        event.setEventType("ORDER_CREATED");
        event.setTimestamp(LocalDateTime.now());
        
        rocketMQTemplate.convertAndSend(
            "TOPIC_ORDER_EVENTS:ORDER_CREATED",
            event
        );
        
        log.info("RocketMQ 事件已发布:{}", event.getEventId());
    }
}

2. 事件消费

事件消费者

@Component
public class OrderEventConsumer {
    
    @KafkaListener(
        topics = "order-events",
        groupId = "order-event-consumer-group"
    )
    public void consumeOrderEvent(OrderCreatedEvent event) {
        log.info("收到订单事件:{}", event.getEventId());
        
        try {
            // 处理事件
            handleEvent(event);
        } catch (Exception e) {
            log.error("处理事件失败", e);
            
            // 发送死信
            sendToDeadLetterQueue(event, e);
        }
    }
    
    private void handleEvent(OrderCreatedEvent event) {
        switch (event.getEventType()) {
            case "ORDER_CREATED":
                handleOrderCreated(event);
                break;
            case "ORDER_PAID":
                handleOrderPaid(event);
                break;
            case "ORDER_SHIPPED":
                handleOrderShipped(event);
                break;
        }
    }
    
    private void handleOrderCreated(OrderCreatedEvent event) {
        OrderData data = event.getData();
        
        // 发送通知
        notificationService.sendOrderCreated(data.getOrderId());
        
        // 扣减库存
        inventoryService.decrease(data.getItems());
        
        // 更新统计
        analyticsService.recordOrder(data);
    }
}

事件过滤

@Component
public class FilteredEventConsumer {
    
    @KafkaListener(
        topics = "order-events",
        groupId = "large-order-consumer-group",
        containerFactory = "filterFactory"
    )
    public void consumeLargeOrder(OrderCreatedEvent event) {
        // 只处理大额订单
        if (event.getData().getAmount().compareTo(new BigDecimal("1000")) > 0) {
            log.info("大额订单事件:{}", event.getEventId());
            handleLargeOrder(event);
        }
    }
}

3. 事件溯源

事件存储

@Entity
@Table(name = "event_store")
public class EventStore {
    
    @Id
    @GeneratedValue
    private Long id;
    
    @Column(nullable = false)
    private String aggregateId;
    
    @Column(nullable = false)
    private String aggregateType;
    
    @Column(nullable = false)
    private String eventType;
    
    @Column(nullable = false)
    @Lob
    private String eventData;
    
    @Column(nullable = false)
    private Integer version;
    
    @Column(nullable = false)
    private LocalDateTime timestamp;
}

事件存储库

@Repository
public class EventStoreRepository {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    public void save(String aggregateId, String aggregateType, 
                    String eventType, Object eventData, int version) {
        String sql = """
            INSERT INTO event_store 
            (aggregate_id, aggregate_type, event_type, event_data, version, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
            """;
        
        jdbcTemplate.update(sql,
            aggregateId,
            aggregateType,
            eventType,
            JSON.toJSONString(eventData),
            version,
            LocalDateTime.now()
        );
    }
    
    public List<EventStore> findByAggregateId(String aggregateId) {
        String sql = "SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version";
        
        return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(EventStore.class), aggregateId);
    }
}

聚合根重建

public class Order {
    
    private Long id;
    private Long userId;
    private BigDecimal amount;
    private OrderStatus status;
    private List<OrderItem> items;
    
    // 从事件重建
    public static Order reconstruct(List<EventStore> events) {
        Order order = new Order();
        
        for (EventStore eventStore : events) {
            switch (eventStore.getEventType()) {
                case "OrderCreated":
                    order.apply((OrderCreated) parseEvent(eventStore));
                    break;
                case "OrderPaid":
                    order.apply((OrderPaid) parseEvent(eventStore));
                    break;
                case "OrderShipped":
                    order.apply((OrderShipped) parseEvent(eventStore));
                    break;
            }
        }
        
        return order;
    }
    
    private void apply(OrderCreated event) {
        this.id = event.getOrderId();
        this.userId = event.getUserId();
        this.amount = event.getAmount();
        this.items = event.getItems();
        this.status = OrderStatus.CREATED;
    }
    
    private void apply(OrderPaid event) {
        this.status = OrderStatus.PAID;
    }
    
    private void apply(OrderShipped event) {
        this.status = OrderStatus.SHIPPED;
    }
}

CQRS 模式

1. 命令端

命令定义

public class CreateOrderCommand {
    
    private final Long userId;
    private final List<OrderItem> items;
    private final String address;
    
    // constructor, getters
}

命令处理器

@Component
public class CreateOrderCommandHandler {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    @Transactional
    public Order handle(CreateOrderCommand command) {
        // 创建订单
        Order order = new Order();
        order.setUserId(command.getUserId());
        order.setItems(command.getItems());
        order.setAddress(command.getAddress());
        order.setStatus(OrderStatus.CREATED);
        
        orderRepository.save(order);
        
        // 发布事件
        eventPublisher.publishEvent(new OrderCreatedEvent(this, order));
        
        return order;
    }
}

2. 查询端

查询定义

public class GetOrderQuery {
    
    private final Long orderId;
    
    // constructor, getter
}

查询处理器

@Component
public class GetOrderQueryHandler {
    
    @Autowired
    private OrderReadRepository orderReadRepository;
    
    public OrderDTO handle(GetOrderQuery query) {
        return orderReadRepository.findById(query.getOrderId())
            .map(this::toDTO)
            .orElseThrow(() -> new ResourceNotFoundException("订单不存在"));
    }
    
    private OrderDTO toDTO(OrderReadModel model) {
        return OrderDTO.builder()
            .id(model.getId())
            .userId(model.getUserId())
            .amount(model.getAmount())
            .status(model.getStatus())
            .build();
    }
}

读模型投影

@Component
public class OrderProjection {
    
    @Autowired
    private OrderReadRepository orderReadRepository;
    
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void on(OrderCreatedEvent event) {
        Order order = event.getOrder();
        
        OrderReadModel readModel = new OrderReadModel();
        readModel.setId(order.getId());
        readModel.setUserId(order.getUserId());
        readModel.setAmount(order.getAmount());
        readModel.setStatus(order.getStatus().name());
        
        orderReadRepository.save(readModel);
    }
    
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void on(OrderPaidEvent event) {
        orderReadRepository.updateStatus(event.getOrderId(), "PAID");
    }
}

事件编排

1. 流程编排

Saga 编排

@Component
public class OrderSaga {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private PaymentService paymentService;
    
    @Autowired
    private InventoryService inventoryService;
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @SagaStart
    public void createOrder(CreateOrderRequest request) {
        try {
            // 1. 创建订单
            Order order = orderService.create(request);
            kafkaTemplate.send("order-created", order);
            
            // 2. 扣减库存
            inventoryService.decrease(order.getItems());
            kafkaTemplate.send("inventory-decreased", order);
            
            // 3. 发起支付
            Payment payment = paymentService.pay(order);
            kafkaTemplate.send("payment-completed", payment);
            
            // 4. 订单完成
            orderService.complete(order.getId());
            kafkaTemplate.send("order-completed", order);
            
        } catch (Exception e) {
            // 补偿
            compensate(request, e);
        }
    }
    
    private void compensate(CreateOrderRequest request, Exception e) {
        // 执行补偿操作
        orderService.cancel(request.getOrderId());
        inventoryService.increase(request.getItems());
        
        kafkaTemplate.send("order-compensated", request);
    }
}

2. 事件 choreography

Choreography 模式

// 订单服务
@Component
public class OrderService {
    
    @KafkaListener(topics = "payment-completed")
    public void onPaymentCompleted(PaymentCompletedEvent event) {
        // 支付完成,更新订单状态
        orderService.updateStatus(event.getOrderId(), OrderStatus.PAID);
        
        // 发布订单支付事件
        kafkaTemplate.send("order-paid", new OrderPaidEvent(event.getOrderId()));
    }
}

// 库存服务
@Component
public class InventoryService {
    
    @KafkaListener(topics = "order-created")
    public void onOrderCreated(OrderCreatedEvent event) {
        // 扣减库存
        inventoryService.decrease(event.getOrder().getItems());
        
        // 发布库存扣减事件
        kafkaTemplate.send("inventory-decreased", event.getOrder());
    }
}

// 物流服务
@Component
public class ShippingService {
    
    @KafkaListener(topics = "order-paid")
    public void onOrderPaid(OrderPaidEvent event) {
        // 订单已支付,安排发货
        shippingService.schedule(event.getOrderId());
        
        // 发布发货事件
        kafkaTemplate.send("order-shipped", new OrderShippedEvent(event.getOrderId()));
    }
}

最佳实践

1. 事件设计

事件命名

事件内容

{
  "eventId": "uuid",
  "eventType": "OrderCreated",
  "aggregateId": "order-123",
  "aggregateType": "Order",
  "version": 1,
  "timestamp": "2026-04-10T10:00:00Z",
  "data": {
    "orderId": "123",
    "userId": "456",
    "amount": 100.00
  },
  "metadata": {
    "correlationId": "corr-123",
    "causationId": "cause-456"
  }
}

2. 事件版本控制

版本管理

public class OrderCreatedEventV2 extends OrderCreatedEvent {
    
    private String couponCode;  // 新增字段
    
    // 向后兼容
    public static OrderCreatedEvent migrate(OrderCreatedEventV2 v2) {
        OrderCreatedEvent v1 = new OrderCreatedEvent();
        v1.setOrderId(v2.getOrderId());
        v1.setUserId(v2.getUserId());
        // ...
        return v1;
    }
}

3. 事件处理

幂等性

@Component
public class IdempotentEventConsumer {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @KafkaListener(topics = "order-events")
    public void consume(OrderCreatedEvent event) {
        String key = "event:processed:" + event.getEventId();
        
        Boolean isNew = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", 24, TimeUnit.HOURS);
        
        if (Boolean.FALSE.equals(isNew)) {
            log.warn("事件已处理:{}", event.getEventId());
            return;
        }
        
        // 处理事件
        handleEvent(event);
    }
}

顺序保证

spring:
  kafka:
    listener:
      concurrency: 1  # 单线程保证顺序
      ack-mode: record  # 逐条确认

4. 错误处理

重试机制

@Retryable(
    value = Exception.class,
    maxAttempts = 3,
    backoff = @Backoff(delay = 1000)
)
@KafkaListener(topics = "order-events")
public void consume(OrderCreatedEvent event) {
    handleEvent(event);
}

@Recover
public void recover(Exception e, OrderCreatedEvent event) {
    // 重试失败处理
    log.error("事件处理失败", e);
    sendToDeadLetterQueue(event, e);
}

死信队列

@Component
public class DeadLetterQueueHandler {
    
    @KafkaListener(topics = "order-events-dlq")
    public void handleDlq(ConsumerRecord<String, Object> record) {
        // 记录失败事件
        log.error("死信事件:topic={}, key={}, value={}",
            record.topic(), record.key(), record.value()
        );
        
        // 告警
        alertService.sendAlert("Kafka 死信事件", record.value());
        
        // 存储到数据库,人工处理
        failedEventRepository.save(toEntity(record));
    }
}

总结

事件驱动架构通过事件进行服务间通信,实现服务解耦、异步处理、事件溯源等能力。

Spring 提供完善的事件机制,结合 Kafka、RocketMQ 等消息中间件,可以构建可靠的事件驱动系统。

在实际应用中,需要根据业务场景选择合适的模式(编排 vs 协作),并做好事件的版本控制、幂等处理和错误处理。


分享这篇文章到:

上一篇文章
Spring Boot 自动配置原理详解
下一篇文章
Spring Boot 异步编程实战