Skip to content
清晨的一缕阳光
返回

RocketMQ 消息设计最佳实践与案例

RocketMQ 消息设计直接影响系统性能和可维护性。本文将深入探讨消息格式、Key 设计、Tag 使用、批量优化等最佳实践。

一、消息格式设计

1.1 消息结构

推荐格式

{
  "version": "1.0",
  "timestamp": 1694329200000,
  "event_type": "order.created",
  "event_id": "uuid-123",
  "source": "order-service",
  "data": {
    "order_id": "order_123",
    "user_id": "user_456",
    "amount": 99.99
  },
  "metadata": {
    "trace_id": "trace_789",
    "span_id": "span_012"
  }
}

1.2 字段说明

字段说明必填
version消息格式版本
timestamp事件时间戳
event_type事件类型
event_id事件唯一 ID
source消息来源
data业务数据
metadata元数据

1.3 版本演进

// 版本 1.0
{
  "version": "1.0",
  "data": {...}
}

// 版本 2.0(向后兼容)
{
  "version": "2.0",
  "data": {...},
  "extensions": {...}  // 新增字段
}

二、Key 设计

2.1 Key 的作用

Key 的作用:
1. 消息去重
2. 消息查询
3. 事务消息回查
4. 消息追踪

2.2 Key 设计原则

设计原则:
1. 业务含义明确
2. 唯一性
3. 长度适中(建议<100 字符)
4. 避免敏感信息

2.3 Key 设计案例

订单场景

// ✅ 推荐:使用订单 ID
message.setKeys(order.getId());

// ✅ 推荐:组合 Key
message.setKeys(order.getId() + "_" + order.getUserId());

// ❌ 不推荐:使用敏感信息
message.setKeys(user.getPhone());

用户场景

// ✅ 推荐:使用用户 ID
message.setKeys(user.getId());

// ✅ 推荐:使用 UUID
message.setKeys(UUID.randomUUID().toString());

2.4 多 Key 处理

// 使用逗号分隔多个 Key
String keys = orderId + "," + userId + "," + traceId;
message.setKeys(keys);

// 查询时可以使用任意 Key
QueryResult result = consumer.queryMessage(topic, orderId, 32, start, end);

三、Tag 使用

3.1 Tag 的作用

Tag 的作用:
1. 消息分类
2. 消息过滤
3. 订阅选择

3.2 Tag 设计原则

设计原则:
1. 语义明确
2. 数量适中(建议<20 个)
3. 层级清晰
4. 避免频繁变更

3.3 Tag 设计案例

订单场景

// ✅ 推荐:按操作类型
message.setTags("create");
message.setTags("pay");
message.setTags("ship");
message.setTags("cancel");

// ✅ 推荐:按状态
message.setTags("created");
message.setTags("paid");
message.setTags("shipped");

// ❌ 不推荐:过于宽泛
message.setTags("order");

// ❌ 不推荐:多个 Tag
message.setTags("create,pay");  // 只能设置一个 Tag

日志场景

// ✅ 推荐:按日志级别
message.setTags("info");
message.setTags("warn");
message.setTags("error");

// ✅ 推荐:按服务
message.setTags("order-service");
message.setTags("pay-service");

3.4 Tag 过滤

// 订阅单个 Tag
consumer.subscribe("order-topic", "pay");

// 订阅多个 Tag(OR 关系)
consumer.subscribe("order-topic", "pay || ship || cancel");

// 订阅所有 Tag
consumer.subscribe("order-topic", "*");

四、消息大小优化

4.1 大小限制

RocketMQ 消息大小限制:
- 默认最大:4MB
- 推荐大小:< 100KB
- 最优大小:< 10KB

4.2 大消息处理

方案 1:消息分片

public void sendLargeMessage(String topic, byte[] largeData) {
    int chunkSize = 100 * 1024;  // 100KB
    
    for (int i = 0; i < largeData.length; i += chunkSize) {
        int end = Math.min(i + chunkSize, largeData.length);
        byte[] chunk = Arrays.copyOfRange(largeData, i, end);
        
        MessageChunk chunkMsg = new MessageChunk();
        chunkMsg.setId(UUID.randomUUID().toString());
        chunkMsg.setIndex(i / chunkSize);
        chunkMsg.setTotal((largeData.length + chunkSize - 1) / chunkSize);
        chunkMsg.setData(chunk);
        
        Message message = new Message(topic, "chunk", 
            chunkMsg.getId(), JSON.toJSONString(chunkMsg).getBytes());
        producer.send(message);
    }
}

方案 2:外部存储

public void sendLargeMessage(String topic, byte[] largeData) {
    // 1. 存储到外部系统
    String url = objectStorage.save(largeData);
    
    // 2. 发送引用消息
    MessageRef ref = new MessageRef();
    ref.setUrl(url);
    ref.setSize(largeData.length);
    
    Message message = new Message(topic, "ref", 
        ref.getId(), JSON.toJSONString(ref).getBytes());
    producer.send(message);
}

五、批量优化

5.1 批量发送

public void sendBatch(List<Order> orders) {
    List<Message> messages = new ArrayList<>();
    
    for (Order order : orders) {
        Message msg = new Message("order-topic", "create", 
            order.getId().toString(), 
            JSON.toJSONString(order).getBytes());
        messages.add(msg);
    }
    
    // 批量发送
    SendResult result = producer.send(messages);
}

5.2 分批处理

public void sendLargeBatch(List<Order> allOrders) {
    final int BATCH_SIZE = 128;  // 每批 128 条
    
    for (int i = 0; i < allOrders.size(); i += BATCH_SIZE) {
        int end = Math.min(i + BATCH_SIZE, allOrders.size());
        List<Order> batch = allOrders.subList(i, end);
        
        try {
            sendBatch(batch);
        } catch (Exception e) {
            log.error("批量发送失败", e);
            // 重试或记录失败
        }
    }
}

六、消息序列化

6.1 序列化格式对比

格式大小性能可读性适用场景
JSON调试、跨语言
Protobuf最小最高高性能场景
MessagePack跨语言场景
HessianJava 内部

6.2 JSON 序列化

// 使用 FastJSON
Order order = new Order();
order.setId("order_123");
order.setAmount(99.99);

String json = JSON.toJSONString(order);
Message message = new Message("order-topic", "create", 
    order.getId().toString(), json.getBytes());

6.3 Protobuf 序列化

// Proto 定义
syntax = "proto3";
package orders;

message Order {
  string order_id = 1;
  string user_id = 2;
  double amount = 3;
}
// Java 使用
Order order = Order.newBuilder()
    .setOrderId("order_123")
    .setUserId("user_456")
    .setAmount(99.99)
    .build();

Message message = new Message("order-topic", "create", 
    order.getOrderId(), order.toByteArray());

七、最佳实践总结

7.1 消息设计检查清单

消息设计检查:
- [ ] 消息格式规范
- [ ] 包含版本号
- [ ] 包含时间戳
- [ ] 包含事件类型
- [ ] 包含唯一 ID
- [ ] Key 设计合理
- [ ] Tag 使用正确
- [ ] 消息大小合理
- [ ] 序列化格式合适

7.2 性能优化建议

性能优化:
1. 使用 Protobuf 序列化
2. 控制消息大小
3. 批量发送
4. 合理设计 Key
5. 正确使用 Tag

7.3 可维护性建议

可维护性:
1. 定义消息格式规范
2. 使用版本号管理
3. 保持向后兼容
4. 添加消息文档
5. 建立消息注册表

总结

RocketMQ 消息设计的核心要点:

  1. 消息格式:版本、时间戳、事件类型、唯一 ID
  2. Key 设计:业务含义、唯一性、长度适中
  3. Tag 使用:语义明确、数量适中、层级清晰
  4. 消息大小:控制大小、分片处理、外部存储
  5. 批量优化:批量发送、分批处理
  6. 序列化:JSON 通用、Protobuf 高性能

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka Streams 实战案例精选
下一篇文章
Redis 哨兵模式详解