Skip to content
清晨的一缕阳光
返回

RocketMQ 命名空间详解与多租户实践

RocketMQ 命名空间(Namespace)功能提供了多租户隔离能力,支持多环境、多项目的资源管理。本文将深入探讨命名空间的实现原理和实战应用。

一、命名空间基础

1.1 什么是命名空间?

定义

命名空间 = 资源隔离的逻辑单元

作用:
- 多租户隔离
- 多环境管理
- 资源配额控制
- 权限管理

1.2 架构

graph TB
    subgraph RocketMQ 集群
        NS[NameServer]
        B1[Broker 1]
        B2[Broker 2]
    end
    
    subgraph 命名空间 A
        T1[Topic A1]
        T2[Topic A2]
        G1[Group A1]
    end
    
    subgraph 命名空间 B
        T3[Topic B1]
        T4[Topic B2]
        G2[Group B1]
    end
    
    NS --> T1
    NS --> T3
    B1 --> T1
    B1 --> T3

1.3 使用场景

场景说明示例
多环境开发、测试、生产隔离dev、test、prod
多租户不同业务/部门隔离order、pay、user
多项目不同项目隔离project-a、project-b
多云不同云厂商隔离aws、aliyun、tencent

二、配置方法

2.1 Broker 配置

# broker.conf

# 启用命名空间
enableNamespace=true

# 默认命名空间(可选)
defaultNamespace=Default

# 命名空间隔离
namespaceIsolationEnable=true

2.2 Topic 配置

# 创建带命名空间的 Topic
mqadmin updateTopic -n ns1:9876 \
  -c DefaultCluster \
  -t order-topic \
  -n dev  # 命名空间

# 查看 Topic(带命名空间)
mqadmin updateTopic -n ns1:9876 -t dev%order-topic

# 删除 Topic
mqadmin deleteTopic -n ns1:9876 -t dev%order-topic -c DefaultCluster

2.3 客户端配置

Producer

DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("ns1:9876");

// 设置命名空间
producer.setNamespace("dev");

producer.start();

// 发送消息(自动添加命名空间前缀)
Message msg = new Message("order-topic", "tag", "body".getBytes());
producer.send(msg);
// 实际 Topic: dev%order-topic

Consumer

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("ns1:9876");

// 设置命名空间
consumer.setNamespace("dev");

consumer.start();

// 订阅 Topic(自动添加命名空间前缀)
consumer.subscribe("order-topic", "*");
// 实际 Topic: dev%order-topic

三、多租户实践

3.1 环境隔离

# 创建不同环境的命名空间
# 开发环境
mqadmin updateNamespace -n ns1:9876 -n dev -d "Development Environment"

# 测试环境
mqadmin updateNamespace -n ns1:9876 -n test -d "Test Environment"

# 生产环境
mqadmin updateNamespace -n ns1:9876 -n prod -d "Production Environment"

客户端配置

public class MQClientFactory {
    
    private static final String NAMESRV_ADDR = "ns1:9876;ns2:9876";
    
    public static DefaultMQProducer createProducer(String env, String group) {
        DefaultMQProducer producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.setNamespace(env);  // 设置环境
        return producer;
    }
    
    public static DefaultMQPushConsumer createConsumer(String env, String group) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setNamespace(env);  // 设置环境
        return consumer;
    }
}

// 使用
DefaultMQProducer devProducer = MQClientFactory.createProducer("dev", "dev-producer");
DefaultMQProducer prodProducer = MQClientFactory.createProducer("prod", "prod-producer");

3.2 业务隔离

// 按业务划分命名空间
public enum BusinessNamespace {
    ORDER("order", "订单业务"),
    PAYMENT("payment", "支付业务"),
    USER("user", "用户业务"),
    LOGISTICS("logistics", "物流业务");
    
    private final String code;
    private final String description;
    
    BusinessNamespace(String code, String description) {
        this.code = code;
        this.description = description;
    }
}

// 创建业务 Producer
public DefaultMQProducer createBusinessProducer(BusinessNamespace business) {
    DefaultMQProducer producer = new DefaultMQProducer(business.code + "-producer");
    producer.setNamesrvAddr("ns1:9876");
    producer.setNamespace(business.code);
    return producer;
}

3.3 租户配额

public class TenantQuota {
    
    private final String tenantId;
    private final int maxTopics;
    private final int maxGroups;
    private final long maxMessages;
    
    public TenantQuota(String tenantId, int maxTopics, int maxGroups, long maxMessages) {
        this.tenantId = tenantId;
        this.maxTopics = maxTopics;
        this.maxGroups = maxGroups;
        this.maxMessages = maxMessages;
    }
    
    // 检查配额
    public boolean canCreateTopic(int currentTopicCount) {
        return currentTopicCount < maxTopics;
    }
    
    public boolean canCreateGroup(int currentGroupCount) {
        return currentGroupCount < maxGroups;
    }
}

四、权限控制

4.1 ACL 配置

# plain_acl.yml
accounts:
  - accessKey: tenant-a-key
    secretKey: tenant-a-secret
    whiteRemoteAddress: "*"
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: DENY
    topicPerms:
      - dev%order-topic=PUB|SUB
      - dev%pay-topic=PUB|SUB
    groupPerms:
      - dev%order-group=SUB
    
  - accessKey: tenant-b-key
    secretKey: tenant-b-secret
    whiteRemoteAddress: "*"
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: DENY
    topicPerms:
      - test%order-topic=PUB|SUB
      - test%pay-topic=PUB|SUB
    groupPerms:
      - test%order-group=SUB

4.2 权限验证

public class AclClient {
    
    public static DefaultMQProducer createAclProducer(String accessKey, String secretKey, 
                                                       String namespace, String group) {
        DefaultMQProducer producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr("ns1:9876");
        producer.setNamespace(namespace);
        
        // ACL 配置
        producer.setVipChannelEnabled(false);
        producer.setInstanceName(namespace + "_" + group);
        
        // 添加认证信息
        Map<String, String> headers = new HashMap<>();
        headers.put(RemotingHelper.ROCKETMQ_SECURITY_HEADER, accessKey + ":" + secretKey);
        
        return producer;
    }
}

五、实战案例

5.1 SaaS 多租户

public class SaasMQService {
    
    private final Map<String, DefaultMQProducer> producerCache = new ConcurrentHashMap<>();
    private final Map<String, DefaultMQPushConsumer> consumerCache = new ConcurrentHashMap<>();
    
    /**
     * 为租户创建 MQ 资源
     */
    public void createTenantResources(String tenantId, TenantConfig config) {
        // 1. 创建命名空间
        createNamespace(tenantId);
        
        // 2. 创建 Topic
        for (String topic : config.getTopics()) {
            createTopic(tenantId, topic);
        }
        
        // 3. 创建 Producer
        DefaultMQProducer producer = new DefaultMQProducer(tenantId + "-producer");
        producer.setNamesrvAddr("ns1:9876");
        producer.setNamespace(tenantId);
        producer.start();
        producerCache.put(tenantId, producer);
        
        // 4. 配置 ACL
        configureAcl(tenantId, config.getAccessKey(), config.getSecretKey());
    }
    
    /**
     * 发送租户消息
     */
    public void sendTenantMessage(String tenantId, String topic, Message msg) {
        DefaultMQProducer producer = producerCache.get(tenantId);
        if (producer == null) {
            throw new BusinessException("租户不存在:" + tenantId);
        }
        
        producer.send(msg);
    }
}

5.2 微服务环境管理

@Configuration
public class MQConfig {
    
    @Value("${spring.profiles.active:dev}")
    private String activeProfile;
    
    @Bean
    public DefaultMQProducer mqProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("app-producer");
        producer.setNamesrvAddr("ns1:9876");
        producer.setNamespace(activeProfile);  // 根据环境设置命名空间
        producer.start();
        return producer;
    }
    
    @Bean
    public DefaultMQPushConsumer mqConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("app-consumer");
        consumer.setNamesrvAddr("ns1:9876");
        consumer.setNamespace(activeProfile);  // 根据环境设置命名空间
        consumer.subscribe("app-topic", "*");
        consumer.start();
        return consumer;
    }
}

5.3 资源管理

public class NamespaceResourceManager {
    
    private final DefaultMQAdminExt adminExt;
    
    public NamespaceResourceManager() throws MQClientException {
        adminExt = new DefaultMQAdminExt();
        adminExt.setNamesrvAddr("ns1:9876");
        adminExt.start();
    }
    
    /**
     * 获取命名空间资源使用情况
     */
    public NamespaceUsage getNamespaceUsage(String namespace) {
        NamespaceUsage usage = new NamespaceUsage();
        usage.setNamespace(namespace);
        
        try {
            // 获取 Topic 列表
            ClusterInfo clusterInfo = adminExt.examineBrokerClusterInfo();
            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
                TopicList topicList = adminExt.fetchAllTopicList();
                long topicCount = topicList.getTopicList().stream()
                    .filter(t -> t.startsWith(namespace + "%"))
                    .count();
                usage.setTopicCount(topicCount);
            }
            
            // 获取消费组列表
            GroupList groupList = adminExt.examineSubscriptionGroupConfig(namespace);
            usage.setGroupCount(groupList.getGroupList().size());
            
        } catch (Exception e) {
            throw new RuntimeException("获取命名空间使用失败", e);
        }
        
        return usage;
    }
    
    /**
     * 清理命名空间
     */
    public void cleanupNamespace(String namespace) {
        try {
            // 删除所有 Topic
            TopicList topicList = adminExt.fetchAllTopicList();
            for (String topic : topicList.getTopicList()) {
                if (topic.startsWith(namespace + "%")) {
                    adminExt.deleteTopicInBroker(clusterInfo.getBrokerAddrTable().values(), topic);
                    adminExt.deleteTopicInNameServer(topic);
                }
            }
            
            // 删除所有消费组
            // ...
            
        } catch (Exception e) {
            throw new RuntimeException("清理命名空间失败", e);
        }
    }
}

六、监控运维

6.1 监控指标

指标说明
namespace_topic_count命名空间 Topic 数量
namespace_group_count命名空间消费组数量
namespace_message_count命名空间消息数量
namespace_quota_usage配额使用率

6.2 运维脚本

#!/bin/bash
# 命名空间管理脚本

NAMESRV="ns1:9876"

# 列出所有命名空间
list_namespaces() {
    echo "=== 命名空间列表 ==="
    mqadmin updateNamespace -n $NAMESRV -l
}

# 创建命名空间
create_namespace() {
    local ns=$1
    local desc=$2
    echo "创建命名空间:$ns"
    mqadmin updateNamespace -n $NAMESRV -n $ns -d "$desc"
}

# 查看命名空间使用情况
usage_namespace() {
    local ns=$1
    echo "=== 命名空间 $ns 使用情况 ==="
    
    # Topic 数量
    topic_count=$(mqadmin updateTopic -n $NAMESRV | grep -c "^$ns%")
    echo "Topic 数量:$topic_count"
    
    # 消费组数量
    # ...
}

# 删除命名空间
delete_namespace() {
    local ns=$1
    echo "删除命名空间:$ns"
    mqadmin updateNamespace -n $NAMESRV -n $ns -o delete
}

# 主程序
case $1 in
    list) list_namespaces ;;
    create) create_namespace $2 $3 ;;
    usage) usage_namespace $2 ;;
    delete) delete_namespace $2 ;;
    *) echo "用法:$0 {list|create|usage|delete}" ;;
esac

七、最佳实践

7.1 命名规范

# 命名空间命名
# ✅ 推荐
dev          # 开发
test         # 测试
prod         # 生产
tenant-a     # 租户 A
project-x    # 项目 X

# ❌ 不推荐
Dev          # 大小写不一致
dev-env      # 带特殊字符
123          # 纯数字

7.2 配额管理

// 租户配额配置
public class TenantQuotaConfig {
    
    // 开发环境
    public static final TenantQuota DEV_QUOTA = new TenantQuota(
        "dev",
        10,   // 最大 Topic 数
        5,    // 最大消费组数
        1000000  // 最大消息数
    );
    
    // 生产环境
    public static final TenantQuota PROD_QUOTA = new TenantQuota(
        "prod",
        50,   // 最大 Topic 数
        20,   // 最大消费组数
        10000000  // 最大消息数
    );
}

7.3 安全检查清单

创建租户前检查:
- [ ] 命名空间已创建
- [ ] 配额已配置
- [ ] ACL 已配置
- [ ] 监控已配置

定期检查:
- [ ] 配额使用情况
- [ ] 权限是否正确
- [ ] 资源是否浪费

总结

RocketMQ 命名空间的核心要点:

  1. 基础概念:资源隔离、多租户、多环境
  2. 配置方法:Broker 配置、Topic 配置、客户端配置
  3. 多租户实践:环境隔离、业务隔离、配额管理
  4. 权限控制:ACL 配置、权限验证
  5. 监控运维:监控指标、运维脚本

核心要点

参考资料


分享这篇文章到:

上一篇文章
AI 应用监控与告警体系
下一篇文章
RocketMQ 生产者发送机制详解