Skip to content
清晨的一缕阳光
返回

RocketMQ 消息过滤详解与实战

RocketMQ 提供灵活的消息过滤机制,允许消费者根据条件精准订阅消息。本文将深入探讨 RocketMQ 的各种消息过滤方式。

一、消息过滤基础

1.1 为什么需要消息过滤?

场景

订单系统 -> 多种事件:
- 订单创建
- 订单支付
- 订单发货
- 订单取消

下游系统只需要特定类型的事件

解决方案

graph TB
    subgraph Producer
        P[订单事件]
    end
    
    subgraph RocketMQ
        T[Topic]
        F[消息过滤]
    end
    
    subgraph Consumer
        C1[支付系统<br/>只接收支付事件]
        C2[物流系统<br/>只接收发货事件]
        C3[分析系统<br/>接收所有事件]
    end
    
    P --> T
    T --> F
    F -->|Tag=pay | C1
    F -->|Tag=ship | C2
    F -->|* | C3

1.2 过滤方式对比

过滤方式说明优点缺点
Tag 过滤基于 Tag 字段过滤简单、性能好功能有限
SQL92 过滤基于属性 SQL 表达式灵活、功能强需要配置
ClassFilter自定义过滤逻辑最灵活复杂度高

二、Tag 过滤

2.1 Tag 基础

Tag 规则

设置 Tag

// 创建消息时设置 Tag
Message msg = new Message(
    "order-topic",           // Topic
    "pay",                   // Tag
    "order_123",             // Key
    "body".getBytes()        // Body
);

// 或通过 setTags 方法
msg.setTags("pay");

2.2 单 Tag 订阅

// 订阅单个 Tag
consumer.subscribe("order-topic", "pay");

// 只接收支付相关的消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pay-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order-topic", "pay");
consumer.start();

2.3 多 Tag 订阅

OR 关系

// 订阅多个 Tag(OR 关系)
consumer.subscribe("order-topic", "pay || ship || cancel");

// 接收支付、发货或取消消息

通配符

// 订阅所有 Tag
consumer.subscribe("order-topic", "*");

// 订阅特定前缀的 Tag(不支持,需要 SQL92)

2.4 Tag 最佳实践

命名规范

// ✅ 好的 Tag 命名
msg.setTags("order_created");
msg.setTags("order_paid");
msg.setTags("order_shipped");

// ❌ 不好的 Tag 命名
msg.setTags("order");  // 太宽泛
msg.setTags("created,paid");  // 多个 Tag

使用场景

// 订单状态流转
public enum OrderEvent {
    CREATED("created"),
    PAID("paid"),
    SHIPPED("shipped"),
    COMPLETED("completed"),
    CANCELLED("cancelled");
    
    private final String tag;
    
    OrderEvent(String tag) {
        this.tag = tag;
    }
    
    public String getTag() {
        return tag;
    }
}

// 发送消息
Message msg = new Message(
    "order-topic",
    OrderEvent.PAID.getTag(),
    order.getId().toString(),
    JSON.toJSONString(order).getBytes()
);

三、SQL92 过滤

3.1 开启 SQL92 过滤

Broker 配置

# broker.conf

# 启用属性过滤
enablePropertyFilter=true

# 内存限制(默认 1MB)
maxPropertyFilterMemory=1048576

# 线程池大小
propertyFilterThreadPoolNums=4

3.2 消息属性

设置属性

Message msg = new Message(
    "order-topic",
    "pay",
    "order_123",
    "body".getBytes()
);

// 添加自定义属性
msg.putUserProperty("user_id", "user_456");
msg.putUserProperty("amount", "99.99");
msg.putUserProperty("province", "beijing");
msg.putUserProperty("vip", "true");

系统属性

属性说明
TAGS消息 Tag
KEYS消息 Key
UNIQ_KEY唯一消息 ID
WAIT是否等待存储

3.3 SQL92 语法

比较运算符

// 等于
consumer.subscribe("order-topic", MessageSelector.bySql("user_id = 'user_456'"));

// 不等于
consumer.subscribe("order-topic", MessageSelector.bySql("province != 'beijing'"));

// 大于
consumer.subscribe("order-topic", MessageSelector.bySql("amount > 100"));

// 大于等于
consumer.subscribe("order-topic", MessageSelector.bySql("amount >= 100"));

// 小于
consumer.subscribe("order-topic", MessageSelector.bySql("amount < 1000"));

// 小于等于
consumer.subscribe("order-topic", MessageSelector.bySql("amount <= 1000"));

逻辑运算符

// AND
consumer.subscribe("order-topic", MessageSelector.bySql(
    "amount > 100 AND province = 'beijing'"
));

// OR
consumer.subscribe("order-topic", MessageSelector.bySql(
    "vip = 'true' OR amount > 1000"
));

// NOT
consumer.subscribe("order-topic", MessageSelector.bySql(
    "NOT province = 'beijing'"
));

// 复杂表达式
consumer.subscribe("order-topic", MessageSelector.bySql(
    "(amount > 100 AND province = 'beijing') OR vip = 'true'"
));

IN 和 BETWEEN

// IN
consumer.subscribe("order-topic", MessageSelector.bySql(
    "province IN ('beijing', 'shanghai', 'guangzhou')"
));

// BETWEEN
consumer.subscribe("order-topic", MessageSelector.bySql(
    "amount BETWEEN 100 AND 1000"
));

// IS NULL
consumer.subscribe("order-topic", MessageSelector.bySql(
    "user_id IS NOT NULL"
));

LIKE

// 模糊匹配
consumer.subscribe("order-topic", MessageSelector.bySql(
    "user_id LIKE 'user_%'"
));

// 前缀匹配
consumer.subscribe("order-topic", MessageSelector.bySql(
    "user_id LIKE 'user_123%'"
));

3.4 实战案例

按用户等级过滤

// VIP 用户专属消费者
consumer.subscribe("order-topic", MessageSelector.bySql(
    "vip = 'true'"
));

// 普通用户消费者
consumer.subscribe("order-topic", MessageSelector.bySql(
    "vip = 'false'"
));

按金额范围过滤

// 大额订单处理
consumer.subscribe("order-topic", MessageSelector.bySql(
    "amount >= 10000"
));

// 普通订单处理
consumer.subscribe("order-topic", MessageSelector.bySql(
    "amount < 10000"
));

按地区过滤

// 华北地区
consumer.subscribe("order-topic", MessageSelector.bySql(
    "province IN ('beijing', 'tianjin', 'hebei')"
));

// 华东地区
consumer.subscribe("order-topic", MessageSelector.bySql(
    "province IN ('shanghai', 'jiangsu', 'zhejiang')"
));

四、ClassFilter 过滤

4.1 自定义过滤逻辑

// 1. 实现 MessageSelector
public class CustomMessageFilter implements MessageSelector {
    
    @Override
    public boolean isSatisfied(MessageExt message) {
        try {
            // 解析消息
            String body = new String(message.getBody());
            Order order = JSON.parseObject(body, Order.class);
            
            // 自定义过滤逻辑
            // 例如:只处理特定商品类目的订单
            return "electronics".equals(order.getCategory());
            
        } catch (Exception e) {
            return false;
        }
    }
}

// 2. 使用过滤器
consumer.subscribe("order-topic", new CustomMessageFilter());

4.2 复杂过滤场景

多条件组合

public class ComplexMessageFilter implements MessageSelector {
    
    @Override
    public boolean isSatisfied(MessageExt message) {
        String tags = message.getTags();
        String userId = message.getUserProperty("user_id");
        String amount = message.getUserProperty("amount");
        
        // 条件 1:Tag 必须是 pay
        if (!"pay".equals(tags)) {
            return false;
        }
        
        // 条件 2:用户 ID 以 VIP 开头
        if (userId == null || !userId.startsWith("VIP")) {
            return false;
        }
        
        // 条件 3:金额大于 1000
        try {
            double amountValue = Double.parseDouble(amount);
            return amountValue > 1000;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}

正则表达式过滤

public class RegexMessageFilter implements MessageSelector {
    
    private static final Pattern EMAIL_PATTERN = Pattern.compile(
        "^[A-Za-z0-9+_.-]+@(.+)$"
    );
    
    @Override
    public boolean isSatisfied(MessageExt message) {
        String email = message.getUserProperty("email");
        
        if (email == null) {
            return false;
        }
        
        return EMAIL_PATTERN.matcher(email).matches();
    }
}

五、过滤性能

5.1 性能对比

过滤方式性能说明
Tag 过滤最快Broker 端过滤
SQL92 过滤中等Broker 端过滤,需解析表达式
ClassFilter最慢Consumer 端过滤,网络传输多

5.2 优化建议

// 1. 优先使用 Tag 过滤
// ✅ 推荐
consumer.subscribe("order-topic", "pay");

// ❌ 不推荐(能用 Tag 就用 Tag)
consumer.subscribe("order-topic", MessageSelector.bySql("TAGS = 'pay'"));

// 2. SQL92 过滤限制字段数量
// ✅ 推荐
consumer.subscribe("order-topic", MessageSelector.bySql("amount > 100"));

// ❌ 不推荐(过多条件)
consumer.subscribe("order-topic", MessageSelector.bySql(
    "amount > 100 AND vip = 'true' AND province = 'beijing' AND ..."
));

// 3. 避免在 Consumer 端过滤
// ❌ 不推荐
consumer.subscribe("order-topic", "*");
// 然后在消费逻辑中过滤
for (MessageExt msg : msgs) {
    if (!shouldProcess(msg)) {
        continue;  // 浪费网络带宽
    }
}

六、最佳实践

6.1 Tag 设计原则

// 1. Tag 要有明确含义
msg.setTags("order_created");  // ✅
msg.setTags("order");          // ❌ 太宽泛

// 2. 使用下划线分隔
msg.setTags("order_created");  // ✅
msg.setTags("orderCreated");   // ❌

// 3. 保持 Tag 数量合理
// 建议:5-20 个 Tag
// 过多考虑使用 SQL92 过滤

6.2 属性命名规范

// 1. 使用小写字母和下划线
msg.putUserProperty("user_id", "123");  // ✅
msg.putUserProperty("UserId", "123");   // ❌

// 2. 避免使用保留字
msg.putUserProperty("TAGS", "pay");     // ❌
msg.putUserProperty("KEYS", "order_1"); // ❌

// 3. 值类型统一
msg.putUserProperty("amount", "99.99");  // ✅ 字符串
msg.putUserProperty("vip", "true");      // ✅ 字符串

6.3 过滤策略

graph TD
    A[消息过滤需求] --> B{简单分类?}
    B -->|是 | C[使用 Tag 过滤]
    B -->|否 | D{需要复杂条件?}
    
    D -->|是 | E[使用 SQL92 过滤]
    D -->|否 | F{需要自定义逻辑?}
    
    F -->|是 | G[使用 ClassFilter]
    F -->|否 | C
    
    style C fill:#9f9
    style E fill:#ff9
    style G fill:#f99

七、常见问题

7.1 Tag 不生效

原因

解决

// 检查 Tag 设置
Message msg = new Message("topic", "tag", "body".getBytes());
System.out.println("Tag: " + msg.getTags());

// 检查订阅
consumer.subscribe("topic", "tag");  // 不要加引号

7.2 SQL92 过滤失败

原因

解决

// 1. 检查 Broker 配置
// enablePropertyFilter=true

// 2. 设置属性
msg.putUserProperty("key", "value");

// 3. 验证 SQL 语法
// 先简单条件测试
consumer.subscribe("topic", MessageSelector.bySql("key = 'value'"));

7.3 消息堆积

原因

解决

// 1. 放宽过滤条件
consumer.subscribe("topic", MessageSelector.bySql("amount > 50"));  // 降低阈值

// 2. 增加消费者
// 启动多个消费者实例

// 3. 优化处理逻辑

总结

RocketMQ 消息过滤的核心要点:

  1. Tag 过滤:简单、快速、推荐使用
  2. SQL92 过滤:灵活、功能强、需要配置
  3. ClassFilter:最灵活、Consumer 端过滤
  4. 性能优化:优先 Tag、限制 SQL 条件
  5. 最佳实践:合理设计 Tag 和属性

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 高可用架构设计与实战
下一篇文章
Tool/Function 设计规范