RocketMQ 提供灵活的消息过滤机制,允许消费者根据条件精准订阅消息。本文将深入探讨 RocketMQ 的各种消息过滤方式。
一、消息过滤基础
1.1 为什么需要消息过滤?
场景:
订单系统 -> 多种事件:
- 订单创建
- 订单支付
- 订单发货
- 订单取消
下游系统只需要特定类型的事件
解决方案:
graph TB
subgraph Producer
P[订单事件]
end
subgraph RocketMQ
T[Topic]
F[消息过滤]
end
subgraph Consumer
C1[支付系统<br/>只接收支付事件]
C2[物流系统<br/>只接收发货事件]
C3[分析系统<br/>接收所有事件]
end
P --> T
T --> F
F -->|Tag=pay | C1
F -->|Tag=ship | C2
F -->|* | C3
1.2 过滤方式对比
| 过滤方式 | 说明 | 优点 | 缺点 |
|---|---|---|---|
| Tag 过滤 | 基于 Tag 字段过滤 | 简单、性能好 | 功能有限 |
| SQL92 过滤 | 基于属性 SQL 表达式 | 灵活、功能强 | 需要配置 |
| ClassFilter | 自定义过滤逻辑 | 最灵活 | 复杂度高 |
二、Tag 过滤
2.1 Tag 基础
Tag 规则:
- 每条消息只能设置一个 Tag
- Tag 在发送时设置
- Tag 用于消息分类
设置 Tag:
// 创建消息时设置 Tag
Message msg = new Message(
"order-topic", // Topic
"pay", // Tag
"order_123", // Key
"body".getBytes() // Body
);
// 或通过 setTags 方法
msg.setTags("pay");
2.2 单 Tag 订阅
// 订阅单个 Tag
consumer.subscribe("order-topic", "pay");
// 只接收支付相关的消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pay-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order-topic", "pay");
consumer.start();
2.3 多 Tag 订阅
OR 关系:
// 订阅多个 Tag(OR 关系)
consumer.subscribe("order-topic", "pay || ship || cancel");
// 接收支付、发货或取消消息
通配符:
// 订阅所有 Tag
consumer.subscribe("order-topic", "*");
// 订阅特定前缀的 Tag(不支持,需要 SQL92)
2.4 Tag 最佳实践
命名规范:
// ✅ 好的 Tag 命名
msg.setTags("order_created");
msg.setTags("order_paid");
msg.setTags("order_shipped");
// ❌ 不好的 Tag 命名
msg.setTags("order"); // 太宽泛
msg.setTags("created,paid"); // 多个 Tag
使用场景:
// 订单状态流转
public enum OrderEvent {
CREATED("created"),
PAID("paid"),
SHIPPED("shipped"),
COMPLETED("completed"),
CANCELLED("cancelled");
private final String tag;
OrderEvent(String tag) {
this.tag = tag;
}
public String getTag() {
return tag;
}
}
// 发送消息
Message msg = new Message(
"order-topic",
OrderEvent.PAID.getTag(),
order.getId().toString(),
JSON.toJSONString(order).getBytes()
);
三、SQL92 过滤
3.1 开启 SQL92 过滤
Broker 配置:
# broker.conf
# 启用属性过滤
enablePropertyFilter=true
# 内存限制(默认 1MB)
maxPropertyFilterMemory=1048576
# 线程池大小
propertyFilterThreadPoolNums=4
3.2 消息属性
设置属性:
Message msg = new Message(
"order-topic",
"pay",
"order_123",
"body".getBytes()
);
// 添加自定义属性
msg.putUserProperty("user_id", "user_456");
msg.putUserProperty("amount", "99.99");
msg.putUserProperty("province", "beijing");
msg.putUserProperty("vip", "true");
系统属性:
| 属性 | 说明 |
|---|---|
TAGS | 消息 Tag |
KEYS | 消息 Key |
UNIQ_KEY | 唯一消息 ID |
WAIT | 是否等待存储 |
3.3 SQL92 语法
比较运算符:
// 等于
consumer.subscribe("order-topic", MessageSelector.bySql("user_id = 'user_456'"));
// 不等于
consumer.subscribe("order-topic", MessageSelector.bySql("province != 'beijing'"));
// 大于
consumer.subscribe("order-topic", MessageSelector.bySql("amount > 100"));
// 大于等于
consumer.subscribe("order-topic", MessageSelector.bySql("amount >= 100"));
// 小于
consumer.subscribe("order-topic", MessageSelector.bySql("amount < 1000"));
// 小于等于
consumer.subscribe("order-topic", MessageSelector.bySql("amount <= 1000"));
逻辑运算符:
// AND
consumer.subscribe("order-topic", MessageSelector.bySql(
"amount > 100 AND province = 'beijing'"
));
// OR
consumer.subscribe("order-topic", MessageSelector.bySql(
"vip = 'true' OR amount > 1000"
));
// NOT
consumer.subscribe("order-topic", MessageSelector.bySql(
"NOT province = 'beijing'"
));
// 复杂表达式
consumer.subscribe("order-topic", MessageSelector.bySql(
"(amount > 100 AND province = 'beijing') OR vip = 'true'"
));
IN 和 BETWEEN:
// IN
consumer.subscribe("order-topic", MessageSelector.bySql(
"province IN ('beijing', 'shanghai', 'guangzhou')"
));
// BETWEEN
consumer.subscribe("order-topic", MessageSelector.bySql(
"amount BETWEEN 100 AND 1000"
));
// IS NULL
consumer.subscribe("order-topic", MessageSelector.bySql(
"user_id IS NOT NULL"
));
LIKE:
// 模糊匹配
consumer.subscribe("order-topic", MessageSelector.bySql(
"user_id LIKE 'user_%'"
));
// 前缀匹配
consumer.subscribe("order-topic", MessageSelector.bySql(
"user_id LIKE 'user_123%'"
));
3.4 实战案例
按用户等级过滤:
// VIP 用户专属消费者
consumer.subscribe("order-topic", MessageSelector.bySql(
"vip = 'true'"
));
// 普通用户消费者
consumer.subscribe("order-topic", MessageSelector.bySql(
"vip = 'false'"
));
按金额范围过滤:
// 大额订单处理
consumer.subscribe("order-topic", MessageSelector.bySql(
"amount >= 10000"
));
// 普通订单处理
consumer.subscribe("order-topic", MessageSelector.bySql(
"amount < 10000"
));
按地区过滤:
// 华北地区
consumer.subscribe("order-topic", MessageSelector.bySql(
"province IN ('beijing', 'tianjin', 'hebei')"
));
// 华东地区
consumer.subscribe("order-topic", MessageSelector.bySql(
"province IN ('shanghai', 'jiangsu', 'zhejiang')"
));
四、ClassFilter 过滤
4.1 自定义过滤逻辑
// 1. 实现 MessageSelector
public class CustomMessageFilter implements MessageSelector {
@Override
public boolean isSatisfied(MessageExt message) {
try {
// 解析消息
String body = new String(message.getBody());
Order order = JSON.parseObject(body, Order.class);
// 自定义过滤逻辑
// 例如:只处理特定商品类目的订单
return "electronics".equals(order.getCategory());
} catch (Exception e) {
return false;
}
}
}
// 2. 使用过滤器
consumer.subscribe("order-topic", new CustomMessageFilter());
4.2 复杂过滤场景
多条件组合:
public class ComplexMessageFilter implements MessageSelector {
@Override
public boolean isSatisfied(MessageExt message) {
String tags = message.getTags();
String userId = message.getUserProperty("user_id");
String amount = message.getUserProperty("amount");
// 条件 1:Tag 必须是 pay
if (!"pay".equals(tags)) {
return false;
}
// 条件 2:用户 ID 以 VIP 开头
if (userId == null || !userId.startsWith("VIP")) {
return false;
}
// 条件 3:金额大于 1000
try {
double amountValue = Double.parseDouble(amount);
return amountValue > 1000;
} catch (NumberFormatException e) {
return false;
}
}
}
正则表达式过滤:
public class RegexMessageFilter implements MessageSelector {
private static final Pattern EMAIL_PATTERN = Pattern.compile(
"^[A-Za-z0-9+_.-]+@(.+)$"
);
@Override
public boolean isSatisfied(MessageExt message) {
String email = message.getUserProperty("email");
if (email == null) {
return false;
}
return EMAIL_PATTERN.matcher(email).matches();
}
}
五、过滤性能
5.1 性能对比
| 过滤方式 | 性能 | 说明 |
|---|---|---|
| Tag 过滤 | 最快 | Broker 端过滤 |
| SQL92 过滤 | 中等 | Broker 端过滤,需解析表达式 |
| ClassFilter | 最慢 | Consumer 端过滤,网络传输多 |
5.2 优化建议
// 1. 优先使用 Tag 过滤
// ✅ 推荐
consumer.subscribe("order-topic", "pay");
// ❌ 不推荐(能用 Tag 就用 Tag)
consumer.subscribe("order-topic", MessageSelector.bySql("TAGS = 'pay'"));
// 2. SQL92 过滤限制字段数量
// ✅ 推荐
consumer.subscribe("order-topic", MessageSelector.bySql("amount > 100"));
// ❌ 不推荐(过多条件)
consumer.subscribe("order-topic", MessageSelector.bySql(
"amount > 100 AND vip = 'true' AND province = 'beijing' AND ..."
));
// 3. 避免在 Consumer 端过滤
// ❌ 不推荐
consumer.subscribe("order-topic", "*");
// 然后在消费逻辑中过滤
for (MessageExt msg : msgs) {
if (!shouldProcess(msg)) {
continue; // 浪费网络带宽
}
}
六、最佳实践
6.1 Tag 设计原则
// 1. Tag 要有明确含义
msg.setTags("order_created"); // ✅
msg.setTags("order"); // ❌ 太宽泛
// 2. 使用下划线分隔
msg.setTags("order_created"); // ✅
msg.setTags("orderCreated"); // ❌
// 3. 保持 Tag 数量合理
// 建议:5-20 个 Tag
// 过多考虑使用 SQL92 过滤
6.2 属性命名规范
// 1. 使用小写字母和下划线
msg.putUserProperty("user_id", "123"); // ✅
msg.putUserProperty("UserId", "123"); // ❌
// 2. 避免使用保留字
msg.putUserProperty("TAGS", "pay"); // ❌
msg.putUserProperty("KEYS", "order_1"); // ❌
// 3. 值类型统一
msg.putUserProperty("amount", "99.99"); // ✅ 字符串
msg.putUserProperty("vip", "true"); // ✅ 字符串
6.3 过滤策略
graph TD
A[消息过滤需求] --> B{简单分类?}
B -->|是 | C[使用 Tag 过滤]
B -->|否 | D{需要复杂条件?}
D -->|是 | E[使用 SQL92 过滤]
D -->|否 | F{需要自定义逻辑?}
F -->|是 | G[使用 ClassFilter]
F -->|否 | C
style C fill:#9f9
style E fill:#ff9
style G fill:#f99
七、常见问题
7.1 Tag 不生效
原因:
- Tag 设置错误
- 订阅表达式错误
解决:
// 检查 Tag 设置
Message msg = new Message("topic", "tag", "body".getBytes());
System.out.println("Tag: " + msg.getTags());
// 检查订阅
consumer.subscribe("topic", "tag"); // 不要加引号
7.2 SQL92 过滤失败
原因:
- Broker 未启用属性过滤
- 属性未设置
- SQL 语法错误
解决:
// 1. 检查 Broker 配置
// enablePropertyFilter=true
// 2. 设置属性
msg.putUserProperty("key", "value");
// 3. 验证 SQL 语法
// 先简单条件测试
consumer.subscribe("topic", MessageSelector.bySql("key = 'value'"));
7.3 消息堆积
原因:
- 过滤条件过严
- 消费者处理慢
解决:
// 1. 放宽过滤条件
consumer.subscribe("topic", MessageSelector.bySql("amount > 50")); // 降低阈值
// 2. 增加消费者
// 启动多个消费者实例
// 3. 优化处理逻辑
总结
RocketMQ 消息过滤的核心要点:
- Tag 过滤:简单、快速、推荐使用
- SQL92 过滤:灵活、功能强、需要配置
- ClassFilter:最灵活、Consumer 端过滤
- 性能优化:优先 Tag、限制 SQL 条件
- 最佳实践:合理设计 Tag 和属性
核心要点:
- 优先使用 Tag 过滤
- 复杂场景使用 SQL92
- 特殊需求使用 ClassFilter
- 注意过滤性能影响
参考资料
- RocketMQ 消息过滤官方文档
- RocketMQ 源码
- 《RocketMQ 技术内幕》第 6 章