Skip to content
清晨的一缕阳光
返回

RocketMQ EventBridge 事件驱动架构实战

RocketMQ EventBridge 是 RocketMQ 5.0 推出的事件总线服务,支持事件驱动架构(EDA)。本文将深入探讨 EventBridge 的实现原理和实战应用。

一、EventBridge 基础

1.1 什么是 EventBridge?

定义

EventBridge = 事件路由 + 目标投递 + 事件转换

功能:
- 事件发布/订阅
- 事件路由过滤
- 多目标投递
- 事件转换

1.2 架构

graph TB
    subgraph 事件源
        S1[应用 A]
        S2[应用 B]
        S3[云服务]
    end
    
    subgraph EventBridge
        EB[事件总线]
        ER[事件路由]
        ET[事件转换]
    end
    
    subgraph 事件目标
        T1[MQ Topic]
        T2[HTTP Endpoint]
        T3[函数计算]
        T4[数据库]
    end
    
    S1 --> EB
    S2 --> EB
    S3 --> EB
    EB --> ER
    ER --> ET
    ET --> T1
    ET --> T2
    ET --> T3
    ET --> T4

1.3 核心概念

概念说明
Event事件对象,包含事件源、类型、数据
EventBus事件总线,事件的通道
Rule事件路由规则,定义过滤条件
Target事件目标,定义投递目的地
Transform事件转换,定义数据格式转换

1.4 事件格式

CloudEvents 标准

{
  "specversion": "1.0",
  "id": "event-123",
  "source": "order-service",
  "type": "com.example.order.created",
  "datacontenttype": "application/json",
  "time": "2026-07-25T10:00:00Z",
  "data": {
    "orderId": "order_123",
    "userId": "user_456",
    "amount": 99.99
  }
}

二、配置使用

2.1 创建事件总线

# 创建事件总线
mqadmin createEventBus -n ns1:9876 \
  -b order-event-bus \
  -d "Order Event Bus"

# 查看事件总线
mqadmin listEventBus -n ns1:9876

# 删除事件总线
mqadmin deleteEventBus -n ns1:9876 -b order-event-bus

2.2 创建事件路由

# 创建事件路由
mqadmin createEventRule -n ns1:9876 \
  -b order-event-bus \
  -r order-created-rule \
  -e '{"type": ["com.example.order.created"]}' \
  -t '{"target": "order-topic", "type": "rocketmq"}'

# 查看事件路由
mqadmin listEventRules -n ns1:9876 -b order-event-bus

# 删除事件路由
mqadmin deleteEventRule -n ns1:9876 -b order-event-bus -r order-created-rule

2.3 发布事件

Java Producer

public class EventPublisher {
    
    private final EventBridgeProducer producer;
    
    public EventPublisher() {
        producer = new EventBridgeProducer("event-publisher");
        producer.setNamesrvAddr("ns1:9876");
        producer.start();
    }
    
    public void publishOrderCreated(Order order) {
        CloudEvent event = CloudEventBuilder.v1()
            .withId(UUID.randomUUID().toString())
            .withSource(URI.create("order-service"))
            .withType("com.example.order.created")
            .withDataContentType("application/json")
            .withData(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
            .withTime(OffsetDateTime.now())
            .build();
        
        producer.send("order-event-bus", event);
    }
}

HTTP 发布

# 通过 HTTP 发布事件
curl -X POST http://eventbridge:8080/eventbus/order-event-bus/publish \
  -H "Content-Type: application/cloudevents+json" \
  -d '{
    "specversion": "1.0",
    "id": "event-123",
    "source": "api-gateway",
    "type": "com.example.order.created",
    "data": {
      "orderId": "order_123",
      "userId": "user_456",
      "amount": 99.99
    }
  }'

三、事件路由

3.1 精确匹配

{
  "rule": {
    "type": ["com.example.order.created"]
  }
}

3.2 前缀匹配

{
  "rule": {
    "type": [{
      "prefix": "com.example.order."
    }]
  }
}

3.3 多条件匹配

{
  "rule": {
    "type": ["com.example.order.created"],
    "data": {
      "amount": [{
        "numeric": [">", 100]
      }],
      "userId": [{
        "prefix": "vip_"
      }]
    }
  }
}

3.4 复杂路由

{
  "rule": {
    "and": [
      {
        "type": ["com.example.order.created"]
      },
      {
        "or": [
          {
            "data": {
              "amount": [{
                "numeric": [">", 1000]
              }]
            }
          },
          {
            "data": {
              "userId": [{
                "prefix": "vip_"
              }]
            }
          }
        ]
      }
    ]
  }
}

四、事件目标

4.1 RocketMQ Topic

{
  "target": {
    "name": "order-topic-target",
    "type": "rocketmq",
    "config": {
      "topic": "order-topic",
      "tags": "order-created",
      "keys": "orderId"
    }
  }
}

4.2 HTTP Endpoint

{
  "target": {
    "name": "http-target",
    "type": "http",
    "config": {
      "url": "http://order-service:8080/api/events",
      "method": "POST",
      "headers": {
        "Content-Type": "application/json",
        "Authorization": "Bearer token"
      },
      "retryPolicy": {
        "maxRetries": 3,
        "retryInterval": 5000
      }
    }
  }
}

4.3 函数计算

{
  "target": {
    "name": "fc-target",
    "type": "fc",
    "config": {
      "serviceName": "event-processor",
      "functionName": "order-processor",
      "region": "cn-beijing"
    }
  }
}

4.4 多目标投递

{
  "targets": [
    {
      "name": "mq-target",
      "type": "rocketmq",
      "config": {
        "topic": "order-topic"
      }
    },
    {
      "name": "http-target",
      "type": "http",
      "config": {
        "url": "http://analytics-service:8080/events"
      }
    },
    {
      "name": "db-target",
      "type": "jdbc",
      "config": {
        "url": "jdbc:mysql://localhost:3306/events",
        "table": "order_events"
      }
    }
  ]
}

五、事件转换

5.1 字段映射

{
  "transform": {
    "mappings": [
      {
        "source": "$.data.orderId",
        "target": "$.orderId"
      },
      {
        "source": "$.data.userId",
        "target": "$.userId"
      },
      {
        "source": "$.data.amount",
        "target": "$.price"
      }
    ]
  }
}

5.2 模板转换

{
  "transform": {
    "template": "{\n  \"order_id\": \"{{data.orderId}}\",\n  \"user_id\": \"{{data.userId}}\",\n  \"amount\": {{data.amount}},\n  \"event_time\": \"{{time}}\"\n}"
  }
}

5.3 条件转换

{
  "transform": {
    "conditions": [
      {
        "condition": "$.data.amount > 1000",
        "template": "{\n  \"type\": \"high_value_order\",\n  \"data\": {{data}}\n}"
      },
      {
        "condition": "true",
        "template": "{\n  \"type\": \"normal_order\",\n  \"data\": {{data}}\n}"
      }
    ]
  }
}

六、实战应用

6.1 订单事件驱动

graph TB
    subgraph 订单服务
        OS[订单创建]
    end
    
    subgraph EventBridge
        EB[订单事件总线]
        R1[创建路由]
        R2[支付路由]
        R3[通知路由]
    end
    
    subgraph 事件目标
        T1[库存 Topic]
        T2[支付 Topic]
        T3[通知 HTTP]
        T4[分析 Topic]
    end
    
    OS --> EB
    EB --> R1
    EB --> R2
    EB --> R3
    R1 --> T1
    R2 --> T2
    R3 --> T3
    R1 --> T4
    R2 --> T4
    R3 --> T4

事件路由配置

[
  {
    "name": "inventory-rule",
    "rule": {
      "type": ["com.example.order.created"]
    },
    "targets": [
      {
        "type": "rocketmq",
        "config": {
          "topic": "inventory-topic",
          "tags": "deduct"
        }
      }
    ]
  },
  {
    "name": "payment-rule",
    "rule": {
      "type": ["com.example.order.created"]
    },
    "targets": [
      {
        "type": "rocketmq",
        "config": {
          "topic": "payment-topic",
          "tags": "pay"
        }
      }
    ]
  },
  {
    "name": "notification-rule",
    "rule": {
      "type": ["com.example.order.created"]
    },
    "targets": [
      {
        "type": "http",
        "config": {
          "url": "http://notification-service:8080/api/order-created"
        }
      }
    ]
  }
]

6.2 事件溯源

public class EventSourcingService {
    
    private final EventBridgeProducer producer;
    
    public EventSourcingService() {
        producer = new EventBridgeProducer("event-sourcing");
        producer.setNamesrvAddr("ns1:9876");
        producer.start();
    }
    
    /**
     * 发布领域事件
     */
    public void publishDomainEvent(String aggregateId, String eventType, Object data) {
        CloudEvent event = CloudEventBuilder.v1()
            .withId(UUID.randomUUID().toString())
            .withSource(URI.create("domain-service"))
            .withType("com.example.domain." + eventType)
            .withDataContentType("application/json")
            .withData(JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8))
            .withTime(OffsetDateTime.now())
            .withSubject(aggregateId)
            .build();
        
        producer.send("domain-event-bus", event);
    }
    
    /**
     * 发布订单事件
     */
    public void publishOrderCreated(Order order) {
        publishDomainEvent(order.getId(), "order.created", order);
    }
    
    public void publishOrderPaid(Order order) {
        publishDomainEvent(order.getId(), "order.paid", order);
    }
    
    public void publishOrderShipped(Order order) {
        publishDomainEvent(order.getId(), "order.shipped", order);
    }
}

6.3 CQRS 架构

graph TB
    subgraph 命令端
        C1[创建订单]
        C2[更新订单]
    end
    
    subgraph EventBridge
        EB[事件总线]
    end
    
    subgraph 查询端
        Q1[订单视图]
        Q2[用户视图]
        Q3[统计视图]
    end
    
    C1 --> EB
    C2 --> EB
    EB --> Q1
    EB --> Q2
    EB --> Q3

七、监控运维

7.1 监控指标

指标说明告警阈值
event_publish_rate事件发布速率-
event_delivery_rate事件投递速率-
event_delivery_latency事件投递延迟> 1000ms
event_delivery_failed投递失败数> 0
rule_match_rate规则匹配率-

7.2 死信队列

{
  "deadLetterQueue": {
    "enabled": true,
    "topic": "event-dlq",
    "maxRetries": 3,
    "retryPolicy": {
      "type": "exponential",
      "initialInterval": 1000,
      "maxInterval": 60000
    }
  }
}

7.3 事件追踪

public class EventTraceService {
    
    /**
     * 记录事件轨迹
     */
    public void traceEvent(CloudEvent event, String status, String error) {
        EventTrace trace = new EventTrace();
        trace.setEventId(event.getId());
        trace.setEventType(event.getType());
        trace.setEventSource(event.getSource());
        trace.setStatus(status);
        trace.setError(error);
        trace.setTimestamp(System.currentTimeMillis());
        
        // 存储到数据库
        eventTraceMapper.insert(trace);
    }
    
    /**
     * 查询事件轨迹
     */
    public List<EventTrace> queryEventTrace(String eventId) {
        return eventTraceMapper.selectByEventId(eventId);
    }
}

八、最佳实践

8.1 事件设计

// ✅ 好的事件设计
{
  "specversion": "1.0",
  "id": "uuid",
  "source": "service-name",
  "type": "com.example.domain.event",
  "data": {...}
}

// ❌ 不好的事件设计
{
  "id": "123",  // 不唯一
  "type": "order",  // 不明确
  "data": {...}
}

8.2 路由设计

// ✅ 推荐:单一职责
{
  "name": "order-created-rule",
  "rule": {"type": ["com.example.order.created"]},
  "targets": [{"topic": "inventory-topic"}]
}

// ❌ 不推荐:过于复杂
{
  "name": "all-order-rule",
  "rule": {"type": ["com.example.order.*"]},
  "targets": [
    {"topic": "inventory-topic"},
    {"topic": "payment-topic"},
    {"topic": "notification-topic"},
    {"url": "http://..."}
  ]
}

8.3 错误处理

public class EventErrorHandler {
    
    /**
     * 处理事件投递失败
     */
    public void handleDeliveryFailure(CloudEvent event, Throwable error) {
        // 1. 记录错误日志
        log.error("事件投递失败:eventId={}", event.getId(), error);
        
        // 2. 发送到死信队列
        sendToDeadLetterQueue(event, error);
        
        // 3. 发送告警
        if (isCriticalError(error)) {
            sendAlert("事件投递失败:eventId=" + event.getId());
        }
    }
    
    /**
     * 重试策略
     */
    public void retryDelivery(CloudEvent event, int retryCount) {
        if (retryCount >= 3) {
            handleDeliveryFailure(event, new RuntimeException("Max retries exceeded"));
            return;
        }
        
        // 指数退避
        long delay = (long) (1000 * Math.pow(2, retryCount));
        scheduledExecutor.schedule(() -> {
            producer.send(event);
        }, delay, TimeUnit.MILLISECONDS);
    }
}

总结

RocketMQ EventBridge 的核心要点:

  1. 基础概念:事件、事件总线、路由、目标
  2. 配置使用:创建总线、路由、发布事件
  3. 事件路由:精确匹配、前缀匹配、多条件
  4. 事件目标:MQ、HTTP、函数计算
  5. 事件转换:字段映射、模板转换
  6. 实战应用:订单事件、事件溯源、CQRS

核心要点

参考资料


分享这篇文章到:

上一篇文章
Agent 评估与调试实战
下一篇文章
Redis Cluster 原理详解