RocketMQ EventBridge 是 RocketMQ 5.0 推出的事件总线服务,支持事件驱动架构(EDA)。本文将深入探讨 EventBridge 的实现原理和实战应用。
一、EventBridge 基础
1.1 什么是 EventBridge?
定义:
EventBridge = 事件路由 + 目标投递 + 事件转换
功能:
- 事件发布/订阅
- 事件路由过滤
- 多目标投递
- 事件转换
1.2 架构
graph TB
subgraph 事件源
S1[应用 A]
S2[应用 B]
S3[云服务]
end
subgraph EventBridge
EB[事件总线]
ER[事件路由]
ET[事件转换]
end
subgraph 事件目标
T1[MQ Topic]
T2[HTTP Endpoint]
T3[函数计算]
T4[数据库]
end
S1 --> EB
S2 --> EB
S3 --> EB
EB --> ER
ER --> ET
ET --> T1
ET --> T2
ET --> T3
ET --> T4
1.3 核心概念
| 概念 | 说明 |
|---|---|
| Event | 事件对象,包含事件源、类型、数据 |
| EventBus | 事件总线,事件的通道 |
| Rule | 事件路由规则,定义过滤条件 |
| Target | 事件目标,定义投递目的地 |
| Transform | 事件转换,定义数据格式转换 |
1.4 事件格式
CloudEvents 标准:
{
"specversion": "1.0",
"id": "event-123",
"source": "order-service",
"type": "com.example.order.created",
"datacontenttype": "application/json",
"time": "2026-07-25T10:00:00Z",
"data": {
"orderId": "order_123",
"userId": "user_456",
"amount": 99.99
}
}
二、配置使用
2.1 创建事件总线
# 创建事件总线
mqadmin createEventBus -n ns1:9876 \
-b order-event-bus \
-d "Order Event Bus"
# 查看事件总线
mqadmin listEventBus -n ns1:9876
# 删除事件总线
mqadmin deleteEventBus -n ns1:9876 -b order-event-bus
2.2 创建事件路由
# 创建事件路由
mqadmin createEventRule -n ns1:9876 \
-b order-event-bus \
-r order-created-rule \
-e '{"type": ["com.example.order.created"]}' \
-t '{"target": "order-topic", "type": "rocketmq"}'
# 查看事件路由
mqadmin listEventRules -n ns1:9876 -b order-event-bus
# 删除事件路由
mqadmin deleteEventRule -n ns1:9876 -b order-event-bus -r order-created-rule
2.3 发布事件
Java Producer:
public class EventPublisher {
private final EventBridgeProducer producer;
public EventPublisher() {
producer = new EventBridgeProducer("event-publisher");
producer.setNamesrvAddr("ns1:9876");
producer.start();
}
public void publishOrderCreated(Order order) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("order-service"))
.withType("com.example.order.created")
.withDataContentType("application/json")
.withData(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
.withTime(OffsetDateTime.now())
.build();
producer.send("order-event-bus", event);
}
}
HTTP 发布:
# 通过 HTTP 发布事件
curl -X POST http://eventbridge:8080/eventbus/order-event-bus/publish \
-H "Content-Type: application/cloudevents+json" \
-d '{
"specversion": "1.0",
"id": "event-123",
"source": "api-gateway",
"type": "com.example.order.created",
"data": {
"orderId": "order_123",
"userId": "user_456",
"amount": 99.99
}
}'
三、事件路由
3.1 精确匹配
{
"rule": {
"type": ["com.example.order.created"]
}
}
3.2 前缀匹配
{
"rule": {
"type": [{
"prefix": "com.example.order."
}]
}
}
3.3 多条件匹配
{
"rule": {
"type": ["com.example.order.created"],
"data": {
"amount": [{
"numeric": [">", 100]
}],
"userId": [{
"prefix": "vip_"
}]
}
}
}
3.4 复杂路由
{
"rule": {
"and": [
{
"type": ["com.example.order.created"]
},
{
"or": [
{
"data": {
"amount": [{
"numeric": [">", 1000]
}]
}
},
{
"data": {
"userId": [{
"prefix": "vip_"
}]
}
}
]
}
]
}
}
四、事件目标
4.1 RocketMQ Topic
{
"target": {
"name": "order-topic-target",
"type": "rocketmq",
"config": {
"topic": "order-topic",
"tags": "order-created",
"keys": "orderId"
}
}
}
4.2 HTTP Endpoint
{
"target": {
"name": "http-target",
"type": "http",
"config": {
"url": "http://order-service:8080/api/events",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer token"
},
"retryPolicy": {
"maxRetries": 3,
"retryInterval": 5000
}
}
}
}
4.3 函数计算
{
"target": {
"name": "fc-target",
"type": "fc",
"config": {
"serviceName": "event-processor",
"functionName": "order-processor",
"region": "cn-beijing"
}
}
}
4.4 多目标投递
{
"targets": [
{
"name": "mq-target",
"type": "rocketmq",
"config": {
"topic": "order-topic"
}
},
{
"name": "http-target",
"type": "http",
"config": {
"url": "http://analytics-service:8080/events"
}
},
{
"name": "db-target",
"type": "jdbc",
"config": {
"url": "jdbc:mysql://localhost:3306/events",
"table": "order_events"
}
}
]
}
五、事件转换
5.1 字段映射
{
"transform": {
"mappings": [
{
"source": "$.data.orderId",
"target": "$.orderId"
},
{
"source": "$.data.userId",
"target": "$.userId"
},
{
"source": "$.data.amount",
"target": "$.price"
}
]
}
}
5.2 模板转换
{
"transform": {
"template": "{\n \"order_id\": \"{{data.orderId}}\",\n \"user_id\": \"{{data.userId}}\",\n \"amount\": {{data.amount}},\n \"event_time\": \"{{time}}\"\n}"
}
}
5.3 条件转换
{
"transform": {
"conditions": [
{
"condition": "$.data.amount > 1000",
"template": "{\n \"type\": \"high_value_order\",\n \"data\": {{data}}\n}"
},
{
"condition": "true",
"template": "{\n \"type\": \"normal_order\",\n \"data\": {{data}}\n}"
}
]
}
}
六、实战应用
6.1 订单事件驱动
graph TB
subgraph 订单服务
OS[订单创建]
end
subgraph EventBridge
EB[订单事件总线]
R1[创建路由]
R2[支付路由]
R3[通知路由]
end
subgraph 事件目标
T1[库存 Topic]
T2[支付 Topic]
T3[通知 HTTP]
T4[分析 Topic]
end
OS --> EB
EB --> R1
EB --> R2
EB --> R3
R1 --> T1
R2 --> T2
R3 --> T3
R1 --> T4
R2 --> T4
R3 --> T4
事件路由配置:
[
{
"name": "inventory-rule",
"rule": {
"type": ["com.example.order.created"]
},
"targets": [
{
"type": "rocketmq",
"config": {
"topic": "inventory-topic",
"tags": "deduct"
}
}
]
},
{
"name": "payment-rule",
"rule": {
"type": ["com.example.order.created"]
},
"targets": [
{
"type": "rocketmq",
"config": {
"topic": "payment-topic",
"tags": "pay"
}
}
]
},
{
"name": "notification-rule",
"rule": {
"type": ["com.example.order.created"]
},
"targets": [
{
"type": "http",
"config": {
"url": "http://notification-service:8080/api/order-created"
}
}
]
}
]
6.2 事件溯源
public class EventSourcingService {
private final EventBridgeProducer producer;
public EventSourcingService() {
producer = new EventBridgeProducer("event-sourcing");
producer.setNamesrvAddr("ns1:9876");
producer.start();
}
/**
* 发布领域事件
*/
public void publishDomainEvent(String aggregateId, String eventType, Object data) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("domain-service"))
.withType("com.example.domain." + eventType)
.withDataContentType("application/json")
.withData(JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8))
.withTime(OffsetDateTime.now())
.withSubject(aggregateId)
.build();
producer.send("domain-event-bus", event);
}
/**
* 发布订单事件
*/
public void publishOrderCreated(Order order) {
publishDomainEvent(order.getId(), "order.created", order);
}
public void publishOrderPaid(Order order) {
publishDomainEvent(order.getId(), "order.paid", order);
}
public void publishOrderShipped(Order order) {
publishDomainEvent(order.getId(), "order.shipped", order);
}
}
6.3 CQRS 架构
graph TB
subgraph 命令端
C1[创建订单]
C2[更新订单]
end
subgraph EventBridge
EB[事件总线]
end
subgraph 查询端
Q1[订单视图]
Q2[用户视图]
Q3[统计视图]
end
C1 --> EB
C2 --> EB
EB --> Q1
EB --> Q2
EB --> Q3
七、监控运维
7.1 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
event_publish_rate | 事件发布速率 | - |
event_delivery_rate | 事件投递速率 | - |
event_delivery_latency | 事件投递延迟 | > 1000ms |
event_delivery_failed | 投递失败数 | > 0 |
rule_match_rate | 规则匹配率 | - |
7.2 死信队列
{
"deadLetterQueue": {
"enabled": true,
"topic": "event-dlq",
"maxRetries": 3,
"retryPolicy": {
"type": "exponential",
"initialInterval": 1000,
"maxInterval": 60000
}
}
}
7.3 事件追踪
public class EventTraceService {
/**
* 记录事件轨迹
*/
public void traceEvent(CloudEvent event, String status, String error) {
EventTrace trace = new EventTrace();
trace.setEventId(event.getId());
trace.setEventType(event.getType());
trace.setEventSource(event.getSource());
trace.setStatus(status);
trace.setError(error);
trace.setTimestamp(System.currentTimeMillis());
// 存储到数据库
eventTraceMapper.insert(trace);
}
/**
* 查询事件轨迹
*/
public List<EventTrace> queryEventTrace(String eventId) {
return eventTraceMapper.selectByEventId(eventId);
}
}
八、最佳实践
8.1 事件设计
// ✅ 好的事件设计
{
"specversion": "1.0",
"id": "uuid",
"source": "service-name",
"type": "com.example.domain.event",
"data": {...}
}
// ❌ 不好的事件设计
{
"id": "123", // 不唯一
"type": "order", // 不明确
"data": {...}
}
8.2 路由设计
// ✅ 推荐:单一职责
{
"name": "order-created-rule",
"rule": {"type": ["com.example.order.created"]},
"targets": [{"topic": "inventory-topic"}]
}
// ❌ 不推荐:过于复杂
{
"name": "all-order-rule",
"rule": {"type": ["com.example.order.*"]},
"targets": [
{"topic": "inventory-topic"},
{"topic": "payment-topic"},
{"topic": "notification-topic"},
{"url": "http://..."}
]
}
8.3 错误处理
public class EventErrorHandler {
/**
* 处理事件投递失败
*/
public void handleDeliveryFailure(CloudEvent event, Throwable error) {
// 1. 记录错误日志
log.error("事件投递失败:eventId={}", event.getId(), error);
// 2. 发送到死信队列
sendToDeadLetterQueue(event, error);
// 3. 发送告警
if (isCriticalError(error)) {
sendAlert("事件投递失败:eventId=" + event.getId());
}
}
/**
* 重试策略
*/
public void retryDelivery(CloudEvent event, int retryCount) {
if (retryCount >= 3) {
handleDeliveryFailure(event, new RuntimeException("Max retries exceeded"));
return;
}
// 指数退避
long delay = (long) (1000 * Math.pow(2, retryCount));
scheduledExecutor.schedule(() -> {
producer.send(event);
}, delay, TimeUnit.MILLISECONDS);
}
}
总结
RocketMQ EventBridge 的核心要点:
- 基础概念:事件、事件总线、路由、目标
- 配置使用:创建总线、路由、发布事件
- 事件路由:精确匹配、前缀匹配、多条件
- 事件目标:MQ、HTTP、函数计算
- 事件转换:字段映射、模板转换
- 实战应用:订单事件、事件溯源、CQRS
核心要点:
- 使用 CloudEvents 标准格式
- 合理设计事件路由规则
- 配置多目标投递
- 实现事件追踪和错误处理
参考资料
- RocketMQ EventBridge 官方文档
- CloudEvents 规范
- 《事件驱动架构设计》