RocketMQ Proxy 是 RocketMQ 5.0 引入的轻量代理层,简化了客户端、支持多协议、统一鉴权。本文将深入探讨 Proxy 的原理、配置和实战。
一、Proxy 架构
1.1 架构演进
4.x 架构:
graph TB
subgraph 客户端
P[Producer]
C[Consumer]
end
subgraph Broker 集群
B1[Broker 1]
B2[Broker 2]
end
P --> B1
C --> B1
5.0 Proxy 架构:
graph TB
subgraph 客户端
P[Producer]
C[Consumer]
end
subgraph Proxy 集群
PX1[Proxy 1]
PX2[Proxy 2]
end
subgraph Broker 集群
B1[Broker 1]
B2[Broker 2]
end
P --> PX1
C --> PX1
PX1 --> B1
PX1 --> B2
1.2 核心优势
| 优势 | 说明 |
|---|---|
| 简化客户端 | 客户端无需感知 Broker 拓扑 |
| 支持多协议 | gRPC、HTTP、TCP |
| 统一鉴权 | 集中式权限管理 |
| 负载均衡 | 自动负载均衡 |
| 云原生友好 | 容器化部署 |
1.3 组件对比
| 组件 | 4.x | 5.0 Proxy |
|---|---|---|
| 客户端 | 复杂 | 简化 |
| 协议 | 私有 | 多协议 |
| 鉴权 | 分散 | 集中 |
| 运维 | 复杂 | 简单 |
二、Proxy 配置
2.1 基础配置
# proxy.conf
# 基础配置
proxyClusterName=DefaultProxy
proxyName=proxy-1
# RocketMQ 地址
rocketMQProxyNameServers=ns1:9876;ns2:9876
# 监听端口
listenPort=8080
# 线程池配置
remotingWorkerThreads=16
remotingSelectorThreads=4
# 路由配置
routeRuleFilePath=/etc/rocketmq/route-rule.yaml
2.2 gRPC 配置
# gRPC 配置
grpcPort=8081
grpcMaxInboundMessageSize=134217728 # 128MB
grpcIdleTimeMills=90000 # 90 秒
grpcProxyRelayRequestTimeoutInSeconds=5
2.3 认证配置
# 认证配置
authenticationEnabled=true
authenticationProviders=org.apache.rocketmq.proxy.auth.SimpleAuthenticationProvider
# ACL 配置
aclConfigFilePath=/etc/rocketmq/plain_acl.yml
三、部署实战
3.1 Docker 部署
# docker-compose.yml
version: '3'
services:
nameserver:
image: apache/rocketmq:5.0.0
command: sh mqnamesrv
ports:
- "9876:9876"
broker:
image: apache/rocketmq:5.0.0
command: sh mqbroker -n nameserver:9876
ports:
- "10911:10911"
- "10909:10909"
depends_on:
- nameserver
proxy:
image: apache/rocketmq:5.0.0
command: sh mqproxy -n nameserver:9876
ports:
- "8080:8080"
- "8081:8081"
depends_on:
- broker
volumes:
- ./proxy.conf:/opt/rocketmq/conf/proxy.conf
3.2 Kubernetes 部署
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: rocketmq-proxy
spec:
replicas: 2
selector:
matchLabels:
app: rocketmq-proxy
template:
metadata:
labels:
app: rocketmq-proxy
spec:
containers:
- name: proxy
image: apache/rocketmq:5.0.0
command: ["sh", "-c", "mqproxy -n rocketmq-nameserver:9876"]
ports:
- containerPort: 8080
- containerPort: 8081
env:
- name: PROXY_CONFIG
value: "/etc/rocketmq/proxy.conf"
volumeMounts:
- name: config
mountPath: /etc/rocketmq
volumes:
- name: config
configMap:
name: rocketmq-proxy-config
四、协议支持
4.1 gRPC 协议
Proto 定义:
syntax = "proto3";
package apache.rocketmq.v2;
service MessagingService {
// 发送消息
rpc SendMessage(SendMessageRequest) returns (SendMessageResponse);
// 拉取消息
rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse);
// 确认消息
rpc AckMessage(AckMessageRequest) returns (AckMessageResponse);
}
message SendMessageRequest {
string topic = 1;
repeated Message messages = 2;
}
message Message {
string message_id = 1;
bytes body = 2;
map<string, string> properties = 3;
}
Java 客户端:
// gRPC 客户端
public class GrpcProducer {
private final MessagingServiceGrpc.MessagingServiceBlockingStub stub;
public GrpcProducer(String proxyAddress) {
ManagedChannel channel = ManagedChannelBuilder
.forTarget(proxyAddress)
.usePlaintext()
.build();
stub = MessagingServiceGrpc.newBlockingStub(channel);
}
public SendMessageResponse send(String topic, byte[] body) {
Message message = Message.newBuilder()
.setMessageId(UUID.randomUUID().toString())
.setBody(ByteString.copyFrom(body))
.build();
SendMessageRequest request = SendMessageRequest.newBuilder()
.setTopic(topic)
.addMessages(message)
.build();
return stub.sendMessage(request);
}
}
4.2 HTTP 协议
REST API:
# 发送消息
curl -X POST http://proxy:8080/topics/order-topic/messages \
-H "Content-Type: application/json" \
-d '{
"body": "test message",
"tags": "create",
"keys": "order_123"
}'
# 拉取消息
curl -X GET http://proxy:8080/consumers/order-consumer-group/messages \
-H "Consumer-Group: order-consumer-group"
五、负载均衡
5.1 路由策略
public class ProxyLoadBalancer {
private final List<BrokerAddress> brokers;
private final AtomicLong index = new AtomicLong(0);
/**
* 轮询策略
*/
public BrokerAddress roundRobin() {
int size = brokers.size();
if (size == 0) {
return null;
}
return brokers.get((int) (index.getAndIncrement() % size));
}
/**
* 最小负载策略
*/
public BrokerAddress leastLoad() {
return brokers.stream()
.min(Comparator.comparingInt(BrokerAddress::getLoad))
.orElse(null);
}
/**
* 一致性哈希
*/
public BrokerAddress consistentHash(String key) {
int hash = Math.abs(key.hashCode());
return brokers.get(hash % brokers.size());
}
}
5.2 健康检查
public class ProxyHealthChecker {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
private final Map<BrokerAddress, Boolean> healthStatus =
new ConcurrentHashMap<>();
public void start() {
scheduler.scheduleAtFixedRate(this::checkHealth, 0, 30, TimeUnit.SECONDS);
}
private void checkHealth() {
for (BrokerAddress broker : brokers) {
boolean healthy = checkBrokerHealth(broker);
healthStatus.put(broker, healthy);
}
}
private boolean checkBrokerHealth(BrokerAddress broker) {
try {
// 发送心跳
HeartbeatRequest request = new HeartbeatRequest();
HeartbeatResponse response = broker.send(request);
return response.isSuccess();
} catch (Exception e) {
return false;
}
}
}
六、认证鉴权
6.1 认证配置
# 启用认证
authenticationEnabled=true
# 认证提供者
authenticationProviders=org.apache.rocketmq.proxy.auth.SimpleAuthenticationProvider
# ACL 配置
aclConfigFilePath=/etc/rocketmq/plain_acl.yml
6.2 Token 认证
public class TokenAuthentication {
private final String secretKey;
public TokenAuthentication(String secretKey) {
this.secretKey = secretKey;
}
/**
* 生成 Token
*/
public String generateToken(String userId) {
long expiry = System.currentTimeMillis() + 3600000; // 1 小时
String payload = userId + ":" + expiry;
String signature = hmacSHA256(payload, secretKey);
return Base64.encode(payload + ":" + signature);
}
/**
* 验证 Token
*/
public boolean verifyToken(String token) {
try {
String decoded = Base64.decode(token);
String[] parts = decoded.split(":");
String userId = parts[0];
long expiry = Long.parseLong(parts[1]);
String signature = parts[2];
// 检查过期
if (System.currentTimeMillis() > expiry) {
return false;
}
// 验证签名
String expectedSignature = hmacSHA256(userId + ":" + expiry, secretKey);
return signature.equals(expectedSignature);
} catch (Exception e) {
return false;
}
}
private String hmacSHA256(String data, String key) {
// HMAC-SHA256 实现
// ...
}
}
七、最佳实践
7.1 配置建议
Proxy 配置建议:
1. Proxy 数量:2+(高可用)
2. 线程池:根据 CPU 核心数配置
3. 超时时间:5 秒
4. 健康检查:30 秒
5. 认证鉴权:启用
7.2 性能优化
性能优化:
1. 使用 gRPC 协议
2. 启用连接池
3. 配置合理的超时
4. 监控请求延迟
5. 建立监控告警
7.3 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
proxy_request_count | 请求数 | - |
proxy_request_latency | 请求延迟 | > 100ms |
proxy_error_count | 错误数 | > 0 |
proxy_connection_count | 连接数 | > 10000 |
总结
RocketMQ Proxy 的核心要点:
- 架构演进:轻量代理、多协议支持
- 配置方法:基础配置、gRPC 配置、认证配置
- 协议支持:gRPC、HTTP、TCP
- 负载均衡:路由策略、健康检查
- 认证鉴权:Token 认证、ACL 配置
- 部署实战:Docker、Kubernetes
核心要点:
- 理解 Proxy 架构优势
- 掌握 Proxy 配置方法
- 支持多协议访问
- 实现负载均衡
- 建立认证鉴权
参考资料
- RocketMQ Proxy 官方文档
- RocketMQ gRPC 协议
- 《RocketMQ 技术内幕》第 12 章