RocketMQ 消息设计直接影响系统性能和可维护性。本文将深入探讨消息格式、Key 设计、Tag 使用、批量优化等最佳实践。
一、消息格式设计
1.1 消息结构
推荐格式:
{
"version": "1.0",
"timestamp": 1694329200000,
"event_type": "order.created",
"event_id": "uuid-123",
"source": "order-service",
"data": {
"order_id": "order_123",
"user_id": "user_456",
"amount": 99.99
},
"metadata": {
"trace_id": "trace_789",
"span_id": "span_012"
}
}
1.2 字段说明
| 字段 | 说明 | 必填 |
|---|---|---|
| version | 消息格式版本 | 是 |
| timestamp | 事件时间戳 | 是 |
| event_type | 事件类型 | 是 |
| event_id | 事件唯一 ID | 是 |
| source | 消息来源 | 是 |
| data | 业务数据 | 是 |
| metadata | 元数据 | 否 |
1.3 版本演进
// 版本 1.0
{
"version": "1.0",
"data": {...}
}
// 版本 2.0(向后兼容)
{
"version": "2.0",
"data": {...},
"extensions": {...} // 新增字段
}
二、Key 设计
2.1 Key 的作用
Key 的作用:
1. 消息去重
2. 消息查询
3. 事务消息回查
4. 消息追踪
2.2 Key 设计原则
设计原则:
1. 业务含义明确
2. 唯一性
3. 长度适中(建议<100 字符)
4. 避免敏感信息
2.3 Key 设计案例
订单场景:
// ✅ 推荐:使用订单 ID
message.setKeys(order.getId());
// ✅ 推荐:组合 Key
message.setKeys(order.getId() + "_" + order.getUserId());
// ❌ 不推荐:使用敏感信息
message.setKeys(user.getPhone());
用户场景:
// ✅ 推荐:使用用户 ID
message.setKeys(user.getId());
// ✅ 推荐:使用 UUID
message.setKeys(UUID.randomUUID().toString());
2.4 多 Key 处理
// 使用逗号分隔多个 Key
String keys = orderId + "," + userId + "," + traceId;
message.setKeys(keys);
// 查询时可以使用任意 Key
QueryResult result = consumer.queryMessage(topic, orderId, 32, start, end);
三、Tag 使用
3.1 Tag 的作用
Tag 的作用:
1. 消息分类
2. 消息过滤
3. 订阅选择
3.2 Tag 设计原则
设计原则:
1. 语义明确
2. 数量适中(建议<20 个)
3. 层级清晰
4. 避免频繁变更
3.3 Tag 设计案例
订单场景:
// ✅ 推荐:按操作类型
message.setTags("create");
message.setTags("pay");
message.setTags("ship");
message.setTags("cancel");
// ✅ 推荐:按状态
message.setTags("created");
message.setTags("paid");
message.setTags("shipped");
// ❌ 不推荐:过于宽泛
message.setTags("order");
// ❌ 不推荐:多个 Tag
message.setTags("create,pay"); // 只能设置一个 Tag
日志场景:
// ✅ 推荐:按日志级别
message.setTags("info");
message.setTags("warn");
message.setTags("error");
// ✅ 推荐:按服务
message.setTags("order-service");
message.setTags("pay-service");
3.4 Tag 过滤
// 订阅单个 Tag
consumer.subscribe("order-topic", "pay");
// 订阅多个 Tag(OR 关系)
consumer.subscribe("order-topic", "pay || ship || cancel");
// 订阅所有 Tag
consumer.subscribe("order-topic", "*");
四、消息大小优化
4.1 大小限制
RocketMQ 消息大小限制:
- 默认最大:4MB
- 推荐大小:< 100KB
- 最优大小:< 10KB
4.2 大消息处理
方案 1:消息分片:
public void sendLargeMessage(String topic, byte[] largeData) {
int chunkSize = 100 * 1024; // 100KB
for (int i = 0; i < largeData.length; i += chunkSize) {
int end = Math.min(i + chunkSize, largeData.length);
byte[] chunk = Arrays.copyOfRange(largeData, i, end);
MessageChunk chunkMsg = new MessageChunk();
chunkMsg.setId(UUID.randomUUID().toString());
chunkMsg.setIndex(i / chunkSize);
chunkMsg.setTotal((largeData.length + chunkSize - 1) / chunkSize);
chunkMsg.setData(chunk);
Message message = new Message(topic, "chunk",
chunkMsg.getId(), JSON.toJSONString(chunkMsg).getBytes());
producer.send(message);
}
}
方案 2:外部存储:
public void sendLargeMessage(String topic, byte[] largeData) {
// 1. 存储到外部系统
String url = objectStorage.save(largeData);
// 2. 发送引用消息
MessageRef ref = new MessageRef();
ref.setUrl(url);
ref.setSize(largeData.length);
Message message = new Message(topic, "ref",
ref.getId(), JSON.toJSONString(ref).getBytes());
producer.send(message);
}
五、批量优化
5.1 批量发送
public void sendBatch(List<Order> orders) {
List<Message> messages = new ArrayList<>();
for (Order order : orders) {
Message msg = new Message("order-topic", "create",
order.getId().toString(),
JSON.toJSONString(order).getBytes());
messages.add(msg);
}
// 批量发送
SendResult result = producer.send(messages);
}
5.2 分批处理
public void sendLargeBatch(List<Order> allOrders) {
final int BATCH_SIZE = 128; // 每批 128 条
for (int i = 0; i < allOrders.size(); i += BATCH_SIZE) {
int end = Math.min(i + BATCH_SIZE, allOrders.size());
List<Order> batch = allOrders.subList(i, end);
try {
sendBatch(batch);
} catch (Exception e) {
log.error("批量发送失败", e);
// 重试或记录失败
}
}
}
六、消息序列化
6.1 序列化格式对比
| 格式 | 大小 | 性能 | 可读性 | 适用场景 |
|---|---|---|---|---|
| JSON | 大 | 中 | 高 | 调试、跨语言 |
| Protobuf | 最小 | 最高 | 低 | 高性能场景 |
| MessagePack | 小 | 高 | 低 | 跨语言场景 |
| Hessian | 中 | 中 | 低 | Java 内部 |
6.2 JSON 序列化
// 使用 FastJSON
Order order = new Order();
order.setId("order_123");
order.setAmount(99.99);
String json = JSON.toJSONString(order);
Message message = new Message("order-topic", "create",
order.getId().toString(), json.getBytes());
6.3 Protobuf 序列化
// Proto 定义
syntax = "proto3";
package orders;
message Order {
string order_id = 1;
string user_id = 2;
double amount = 3;
}
// Java 使用
Order order = Order.newBuilder()
.setOrderId("order_123")
.setUserId("user_456")
.setAmount(99.99)
.build();
Message message = new Message("order-topic", "create",
order.getOrderId(), order.toByteArray());
七、最佳实践总结
7.1 消息设计检查清单
消息设计检查:
- [ ] 消息格式规范
- [ ] 包含版本号
- [ ] 包含时间戳
- [ ] 包含事件类型
- [ ] 包含唯一 ID
- [ ] Key 设计合理
- [ ] Tag 使用正确
- [ ] 消息大小合理
- [ ] 序列化格式合适
7.2 性能优化建议
性能优化:
1. 使用 Protobuf 序列化
2. 控制消息大小
3. 批量发送
4. 合理设计 Key
5. 正确使用 Tag
7.3 可维护性建议
可维护性:
1. 定义消息格式规范
2. 使用版本号管理
3. 保持向后兼容
4. 添加消息文档
5. 建立消息注册表
总结
RocketMQ 消息设计的核心要点:
- 消息格式:版本、时间戳、事件类型、唯一 ID
- Key 设计:业务含义、唯一性、长度适中
- Tag 使用:语义明确、数量适中、层级清晰
- 消息大小:控制大小、分片处理、外部存储
- 批量优化:批量发送、分批处理
- 序列化:JSON 通用、Protobuf 高性能
核心要点:
- 设计合理的消息格式
- 正确使用 Key 和 Tag
- 控制消息大小
- 批量发送提升性能
- 选择合适的序列化格式
参考资料
- RocketMQ 消息官方文档
- RocketMQ 最佳实践
- 《RocketMQ 技术内幕》第 3 章