RocketMQ 命名空间(Namespace)功能提供了多租户隔离能力,支持多环境、多项目的资源管理。本文将深入探讨命名空间的实现原理和实战应用。
一、命名空间基础
1.1 什么是命名空间?
定义:
命名空间 = 资源隔离的逻辑单元
作用:
- 多租户隔离
- 多环境管理
- 资源配额控制
- 权限管理
1.2 架构
graph TB
subgraph RocketMQ 集群
NS[NameServer]
B1[Broker 1]
B2[Broker 2]
end
subgraph 命名空间 A
T1[Topic A1]
T2[Topic A2]
G1[Group A1]
end
subgraph 命名空间 B
T3[Topic B1]
T4[Topic B2]
G2[Group B1]
end
NS --> T1
NS --> T3
B1 --> T1
B1 --> T3
1.3 使用场景
| 场景 | 说明 | 示例 |
|---|---|---|
| 多环境 | 开发、测试、生产隔离 | dev、test、prod |
| 多租户 | 不同业务/部门隔离 | order、pay、user |
| 多项目 | 不同项目隔离 | project-a、project-b |
| 多云 | 不同云厂商隔离 | aws、aliyun、tencent |
二、配置方法
2.1 Broker 配置
# broker.conf
# 启用命名空间
enableNamespace=true
# 默认命名空间(可选)
defaultNamespace=Default
# 命名空间隔离
namespaceIsolationEnable=true
2.2 Topic 配置
# 创建带命名空间的 Topic
mqadmin updateTopic -n ns1:9876 \
-c DefaultCluster \
-t order-topic \
-n dev # 命名空间
# 查看 Topic(带命名空间)
mqadmin updateTopic -n ns1:9876 -t dev%order-topic
# 删除 Topic
mqadmin deleteTopic -n ns1:9876 -t dev%order-topic -c DefaultCluster
2.3 客户端配置
Producer:
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("ns1:9876");
// 设置命名空间
producer.setNamespace("dev");
producer.start();
// 发送消息(自动添加命名空间前缀)
Message msg = new Message("order-topic", "tag", "body".getBytes());
producer.send(msg);
// 实际 Topic: dev%order-topic
Consumer:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("ns1:9876");
// 设置命名空间
consumer.setNamespace("dev");
consumer.start();
// 订阅 Topic(自动添加命名空间前缀)
consumer.subscribe("order-topic", "*");
// 实际 Topic: dev%order-topic
三、多租户实践
3.1 环境隔离
# 创建不同环境的命名空间
# 开发环境
mqadmin updateNamespace -n ns1:9876 -n dev -d "Development Environment"
# 测试环境
mqadmin updateNamespace -n ns1:9876 -n test -d "Test Environment"
# 生产环境
mqadmin updateNamespace -n ns1:9876 -n prod -d "Production Environment"
客户端配置:
public class MQClientFactory {
private static final String NAMESRV_ADDR = "ns1:9876;ns2:9876";
public static DefaultMQProducer createProducer(String env, String group) {
DefaultMQProducer producer = new DefaultMQProducer(group);
producer.setNamesrvAddr(NAMESRV_ADDR);
producer.setNamespace(env); // 设置环境
return producer;
}
public static DefaultMQPushConsumer createConsumer(String env, String group) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.setNamespace(env); // 设置环境
return consumer;
}
}
// 使用
DefaultMQProducer devProducer = MQClientFactory.createProducer("dev", "dev-producer");
DefaultMQProducer prodProducer = MQClientFactory.createProducer("prod", "prod-producer");
3.2 业务隔离
// 按业务划分命名空间
public enum BusinessNamespace {
ORDER("order", "订单业务"),
PAYMENT("payment", "支付业务"),
USER("user", "用户业务"),
LOGISTICS("logistics", "物流业务");
private final String code;
private final String description;
BusinessNamespace(String code, String description) {
this.code = code;
this.description = description;
}
}
// 创建业务 Producer
public DefaultMQProducer createBusinessProducer(BusinessNamespace business) {
DefaultMQProducer producer = new DefaultMQProducer(business.code + "-producer");
producer.setNamesrvAddr("ns1:9876");
producer.setNamespace(business.code);
return producer;
}
3.3 租户配额
public class TenantQuota {
private final String tenantId;
private final int maxTopics;
private final int maxGroups;
private final long maxMessages;
public TenantQuota(String tenantId, int maxTopics, int maxGroups, long maxMessages) {
this.tenantId = tenantId;
this.maxTopics = maxTopics;
this.maxGroups = maxGroups;
this.maxMessages = maxMessages;
}
// 检查配额
public boolean canCreateTopic(int currentTopicCount) {
return currentTopicCount < maxTopics;
}
public boolean canCreateGroup(int currentGroupCount) {
return currentGroupCount < maxGroups;
}
}
四、权限控制
4.1 ACL 配置
# plain_acl.yml
accounts:
- accessKey: tenant-a-key
secretKey: tenant-a-secret
whiteRemoteAddress: "*"
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: DENY
topicPerms:
- dev%order-topic=PUB|SUB
- dev%pay-topic=PUB|SUB
groupPerms:
- dev%order-group=SUB
- accessKey: tenant-b-key
secretKey: tenant-b-secret
whiteRemoteAddress: "*"
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: DENY
topicPerms:
- test%order-topic=PUB|SUB
- test%pay-topic=PUB|SUB
groupPerms:
- test%order-group=SUB
4.2 权限验证
public class AclClient {
public static DefaultMQProducer createAclProducer(String accessKey, String secretKey,
String namespace, String group) {
DefaultMQProducer producer = new DefaultMQProducer(group);
producer.setNamesrvAddr("ns1:9876");
producer.setNamespace(namespace);
// ACL 配置
producer.setVipChannelEnabled(false);
producer.setInstanceName(namespace + "_" + group);
// 添加认证信息
Map<String, String> headers = new HashMap<>();
headers.put(RemotingHelper.ROCKETMQ_SECURITY_HEADER, accessKey + ":" + secretKey);
return producer;
}
}
五、实战案例
5.1 SaaS 多租户
public class SaasMQService {
private final Map<String, DefaultMQProducer> producerCache = new ConcurrentHashMap<>();
private final Map<String, DefaultMQPushConsumer> consumerCache = new ConcurrentHashMap<>();
/**
* 为租户创建 MQ 资源
*/
public void createTenantResources(String tenantId, TenantConfig config) {
// 1. 创建命名空间
createNamespace(tenantId);
// 2. 创建 Topic
for (String topic : config.getTopics()) {
createTopic(tenantId, topic);
}
// 3. 创建 Producer
DefaultMQProducer producer = new DefaultMQProducer(tenantId + "-producer");
producer.setNamesrvAddr("ns1:9876");
producer.setNamespace(tenantId);
producer.start();
producerCache.put(tenantId, producer);
// 4. 配置 ACL
configureAcl(tenantId, config.getAccessKey(), config.getSecretKey());
}
/**
* 发送租户消息
*/
public void sendTenantMessage(String tenantId, String topic, Message msg) {
DefaultMQProducer producer = producerCache.get(tenantId);
if (producer == null) {
throw new BusinessException("租户不存在:" + tenantId);
}
producer.send(msg);
}
}
5.2 微服务环境管理
@Configuration
public class MQConfig {
@Value("${spring.profiles.active:dev}")
private String activeProfile;
@Bean
public DefaultMQProducer mqProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("app-producer");
producer.setNamesrvAddr("ns1:9876");
producer.setNamespace(activeProfile); // 根据环境设置命名空间
producer.start();
return producer;
}
@Bean
public DefaultMQPushConsumer mqConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("app-consumer");
consumer.setNamesrvAddr("ns1:9876");
consumer.setNamespace(activeProfile); // 根据环境设置命名空间
consumer.subscribe("app-topic", "*");
consumer.start();
return consumer;
}
}
5.3 资源管理
public class NamespaceResourceManager {
private final DefaultMQAdminExt adminExt;
public NamespaceResourceManager() throws MQClientException {
adminExt = new DefaultMQAdminExt();
adminExt.setNamesrvAddr("ns1:9876");
adminExt.start();
}
/**
* 获取命名空间资源使用情况
*/
public NamespaceUsage getNamespaceUsage(String namespace) {
NamespaceUsage usage = new NamespaceUsage();
usage.setNamespace(namespace);
try {
// 获取 Topic 列表
ClusterInfo clusterInfo = adminExt.examineBrokerClusterInfo();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
TopicList topicList = adminExt.fetchAllTopicList();
long topicCount = topicList.getTopicList().stream()
.filter(t -> t.startsWith(namespace + "%"))
.count();
usage.setTopicCount(topicCount);
}
// 获取消费组列表
GroupList groupList = adminExt.examineSubscriptionGroupConfig(namespace);
usage.setGroupCount(groupList.getGroupList().size());
} catch (Exception e) {
throw new RuntimeException("获取命名空间使用失败", e);
}
return usage;
}
/**
* 清理命名空间
*/
public void cleanupNamespace(String namespace) {
try {
// 删除所有 Topic
TopicList topicList = adminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
if (topic.startsWith(namespace + "%")) {
adminExt.deleteTopicInBroker(clusterInfo.getBrokerAddrTable().values(), topic);
adminExt.deleteTopicInNameServer(topic);
}
}
// 删除所有消费组
// ...
} catch (Exception e) {
throw new RuntimeException("清理命名空间失败", e);
}
}
}
六、监控运维
6.1 监控指标
| 指标 | 说明 |
|---|---|
namespace_topic_count | 命名空间 Topic 数量 |
namespace_group_count | 命名空间消费组数量 |
namespace_message_count | 命名空间消息数量 |
namespace_quota_usage | 配额使用率 |
6.2 运维脚本
#!/bin/bash
# 命名空间管理脚本
NAMESRV="ns1:9876"
# 列出所有命名空间
list_namespaces() {
echo "=== 命名空间列表 ==="
mqadmin updateNamespace -n $NAMESRV -l
}
# 创建命名空间
create_namespace() {
local ns=$1
local desc=$2
echo "创建命名空间:$ns"
mqadmin updateNamespace -n $NAMESRV -n $ns -d "$desc"
}
# 查看命名空间使用情况
usage_namespace() {
local ns=$1
echo "=== 命名空间 $ns 使用情况 ==="
# Topic 数量
topic_count=$(mqadmin updateTopic -n $NAMESRV | grep -c "^$ns%")
echo "Topic 数量:$topic_count"
# 消费组数量
# ...
}
# 删除命名空间
delete_namespace() {
local ns=$1
echo "删除命名空间:$ns"
mqadmin updateNamespace -n $NAMESRV -n $ns -o delete
}
# 主程序
case $1 in
list) list_namespaces ;;
create) create_namespace $2 $3 ;;
usage) usage_namespace $2 ;;
delete) delete_namespace $2 ;;
*) echo "用法:$0 {list|create|usage|delete}" ;;
esac
七、最佳实践
7.1 命名规范
# 命名空间命名
# ✅ 推荐
dev # 开发
test # 测试
prod # 生产
tenant-a # 租户 A
project-x # 项目 X
# ❌ 不推荐
Dev # 大小写不一致
dev-env # 带特殊字符
123 # 纯数字
7.2 配额管理
// 租户配额配置
public class TenantQuotaConfig {
// 开发环境
public static final TenantQuota DEV_QUOTA = new TenantQuota(
"dev",
10, // 最大 Topic 数
5, // 最大消费组数
1000000 // 最大消息数
);
// 生产环境
public static final TenantQuota PROD_QUOTA = new TenantQuota(
"prod",
50, // 最大 Topic 数
20, // 最大消费组数
10000000 // 最大消息数
);
}
7.3 安全检查清单
创建租户前检查:
- [ ] 命名空间已创建
- [ ] 配额已配置
- [ ] ACL 已配置
- [ ] 监控已配置
定期检查:
- [ ] 配额使用情况
- [ ] 权限是否正确
- [ ] 资源是否浪费
总结
RocketMQ 命名空间的核心要点:
- 基础概念:资源隔离、多租户、多环境
- 配置方法:Broker 配置、Topic 配置、客户端配置
- 多租户实践:环境隔离、业务隔离、配额管理
- 权限控制:ACL 配置、权限验证
- 监控运维:监控指标、运维脚本
核心要点:
- 理解命名空间隔离机制
- 合理设计多租户架构
- 配置配额和权限
- 建立监控体系
参考资料
- RocketMQ 命名空间官方文档
- RocketMQ ACL 文档
- 《RocketMQ 技术内幕》第 10 章