Skip to content
清晨的一缕阳光
返回

RocketMQ Proxy 深度解析与实战

RocketMQ Proxy 是 RocketMQ 5.0 引入的轻量代理层,简化了客户端、支持多协议、统一鉴权。本文将深入探讨 Proxy 的原理、配置和实战。

一、Proxy 架构

1.1 架构演进

4.x 架构

graph TB
    subgraph 客户端
        P[Producer]
        C[Consumer]
    end
    
    subgraph Broker 集群
        B1[Broker 1]
        B2[Broker 2]
    end
    
    P --> B1
    C --> B1

5.0 Proxy 架构

graph TB
    subgraph 客户端
        P[Producer]
        C[Consumer]
    end
    
    subgraph Proxy 集群
        PX1[Proxy 1]
        PX2[Proxy 2]
    end
    
    subgraph Broker 集群
        B1[Broker 1]
        B2[Broker 2]
    end
    
    P --> PX1
    C --> PX1
    PX1 --> B1
    PX1 --> B2

1.2 核心优势

优势说明
简化客户端客户端无需感知 Broker 拓扑
支持多协议gRPC、HTTP、TCP
统一鉴权集中式权限管理
负载均衡自动负载均衡
云原生友好容器化部署

1.3 组件对比

组件4.x5.0 Proxy
客户端复杂简化
协议私有多协议
鉴权分散集中
运维复杂简单

二、Proxy 配置

2.1 基础配置

# proxy.conf

# 基础配置
proxyClusterName=DefaultProxy
proxyName=proxy-1

# RocketMQ 地址
rocketMQProxyNameServers=ns1:9876;ns2:9876

# 监听端口
listenPort=8080

# 线程池配置
remotingWorkerThreads=16
remotingSelectorThreads=4

# 路由配置
routeRuleFilePath=/etc/rocketmq/route-rule.yaml

2.2 gRPC 配置

# gRPC 配置
grpcPort=8081
grpcMaxInboundMessageSize=134217728  # 128MB
grpcIdleTimeMills=90000  # 90 秒
grpcProxyRelayRequestTimeoutInSeconds=5

2.3 认证配置

# 认证配置
authenticationEnabled=true
authenticationProviders=org.apache.rocketmq.proxy.auth.SimpleAuthenticationProvider

# ACL 配置
aclConfigFilePath=/etc/rocketmq/plain_acl.yml

三、部署实战

3.1 Docker 部署

# docker-compose.yml
version: '3'
services:
  nameserver:
    image: apache/rocketmq:5.0.0
    command: sh mqnamesrv
    ports:
      - "9876:9876"
  
  broker:
    image: apache/rocketmq:5.0.0
    command: sh mqbroker -n nameserver:9876
    ports:
      - "10911:10911"
      - "10909:10909"
    depends_on:
      - nameserver
  
  proxy:
    image: apache/rocketmq:5.0.0
    command: sh mqproxy -n nameserver:9876
    ports:
      - "8080:8080"
      - "8081:8081"
    depends_on:
      - broker
    volumes:
      - ./proxy.conf:/opt/rocketmq/conf/proxy.conf

3.2 Kubernetes 部署

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rocketmq-proxy
spec:
  replicas: 2
  selector:
    matchLabels:
      app: rocketmq-proxy
  template:
    metadata:
      labels:
        app: rocketmq-proxy
    spec:
      containers:
      - name: proxy
        image: apache/rocketmq:5.0.0
        command: ["sh", "-c", "mqproxy -n rocketmq-nameserver:9876"]
        ports:
        - containerPort: 8080
        - containerPort: 8081
        env:
        - name: PROXY_CONFIG
          value: "/etc/rocketmq/proxy.conf"
        volumeMounts:
        - name: config
          mountPath: /etc/rocketmq
      volumes:
      - name: config
        configMap:
          name: rocketmq-proxy-config

四、协议支持

4.1 gRPC 协议

Proto 定义

syntax = "proto3";

package apache.rocketmq.v2;

service MessagingService {
  // 发送消息
  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse);
  
  // 拉取消息
  rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse);
  
  // 确认消息
  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse);
}

message SendMessageRequest {
  string topic = 1;
  repeated Message messages = 2;
}

message Message {
  string message_id = 1;
  bytes body = 2;
  map<string, string> properties = 3;
}

Java 客户端

// gRPC 客户端
public class GrpcProducer {
    
    private final MessagingServiceGrpc.MessagingServiceBlockingStub stub;
    
    public GrpcProducer(String proxyAddress) {
        ManagedChannel channel = ManagedChannelBuilder
            .forTarget(proxyAddress)
            .usePlaintext()
            .build();
        
        stub = MessagingServiceGrpc.newBlockingStub(channel);
    }
    
    public SendMessageResponse send(String topic, byte[] body) {
        Message message = Message.newBuilder()
            .setMessageId(UUID.randomUUID().toString())
            .setBody(ByteString.copyFrom(body))
            .build();
        
        SendMessageRequest request = SendMessageRequest.newBuilder()
            .setTopic(topic)
            .addMessages(message)
            .build();
        
        return stub.sendMessage(request);
    }
}

4.2 HTTP 协议

REST API

# 发送消息
curl -X POST http://proxy:8080/topics/order-topic/messages \
  -H "Content-Type: application/json" \
  -d '{
    "body": "test message",
    "tags": "create",
    "keys": "order_123"
  }'

# 拉取消息
curl -X GET http://proxy:8080/consumers/order-consumer-group/messages \
  -H "Consumer-Group: order-consumer-group"

五、负载均衡

5.1 路由策略

public class ProxyLoadBalancer {
    
    private final List<BrokerAddress> brokers;
    private final AtomicLong index = new AtomicLong(0);
    
    /**
     * 轮询策略
     */
    public BrokerAddress roundRobin() {
        int size = brokers.size();
        if (size == 0) {
            return null;
        }
        return brokers.get((int) (index.getAndIncrement() % size));
    }
    
    /**
     * 最小负载策略
     */
    public BrokerAddress leastLoad() {
        return brokers.stream()
            .min(Comparator.comparingInt(BrokerAddress::getLoad))
            .orElse(null);
    }
    
    /**
     * 一致性哈希
     */
    public BrokerAddress consistentHash(String key) {
        int hash = Math.abs(key.hashCode());
        return brokers.get(hash % brokers.size());
    }
}

5.2 健康检查

public class ProxyHealthChecker {
    
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(1);
    
    private final Map<BrokerAddress, Boolean> healthStatus = 
        new ConcurrentHashMap<>();
    
    public void start() {
        scheduler.scheduleAtFixedRate(this::checkHealth, 0, 30, TimeUnit.SECONDS);
    }
    
    private void checkHealth() {
        for (BrokerAddress broker : brokers) {
            boolean healthy = checkBrokerHealth(broker);
            healthStatus.put(broker, healthy);
        }
    }
    
    private boolean checkBrokerHealth(BrokerAddress broker) {
        try {
            // 发送心跳
            HeartbeatRequest request = new HeartbeatRequest();
            HeartbeatResponse response = broker.send(request);
            return response.isSuccess();
        } catch (Exception e) {
            return false;
        }
    }
}

六、认证鉴权

6.1 认证配置

# 启用认证
authenticationEnabled=true

# 认证提供者
authenticationProviders=org.apache.rocketmq.proxy.auth.SimpleAuthenticationProvider

# ACL 配置
aclConfigFilePath=/etc/rocketmq/plain_acl.yml

6.2 Token 认证

public class TokenAuthentication {
    
    private final String secretKey;
    
    public TokenAuthentication(String secretKey) {
        this.secretKey = secretKey;
    }
    
    /**
     * 生成 Token
     */
    public String generateToken(String userId) {
        long expiry = System.currentTimeMillis() + 3600000;  // 1 小时
        String payload = userId + ":" + expiry;
        
        String signature = hmacSHA256(payload, secretKey);
        return Base64.encode(payload + ":" + signature);
    }
    
    /**
     * 验证 Token
     */
    public boolean verifyToken(String token) {
        try {
            String decoded = Base64.decode(token);
            String[] parts = decoded.split(":");
            
            String userId = parts[0];
            long expiry = Long.parseLong(parts[1]);
            String signature = parts[2];
            
            // 检查过期
            if (System.currentTimeMillis() > expiry) {
                return false;
            }
            
            // 验证签名
            String expectedSignature = hmacSHA256(userId + ":" + expiry, secretKey);
            return signature.equals(expectedSignature);
            
        } catch (Exception e) {
            return false;
        }
    }
    
    private String hmacSHA256(String data, String key) {
        // HMAC-SHA256 实现
        // ...
    }
}

七、最佳实践

7.1 配置建议

Proxy 配置建议:
1. Proxy 数量:2+(高可用)
2. 线程池:根据 CPU 核心数配置
3. 超时时间:5 秒
4. 健康检查:30 秒
5. 认证鉴权:启用

7.2 性能优化

性能优化:
1. 使用 gRPC 协议
2. 启用连接池
3. 配置合理的超时
4. 监控请求延迟
5. 建立监控告警

7.3 监控指标

指标说明告警阈值
proxy_request_count请求数-
proxy_request_latency请求延迟> 100ms
proxy_error_count错误数> 0
proxy_connection_count连接数> 10000

总结

RocketMQ Proxy 的核心要点:

  1. 架构演进:轻量代理、多协议支持
  2. 配置方法:基础配置、gRPC 配置、认证配置
  3. 协议支持:gRPC、HTTP、TCP
  4. 负载均衡:路由策略、健康检查
  5. 认证鉴权:Token 认证、ACL 配置
  6. 部署实战:Docker、Kubernetes

核心要点

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 批量消息详解与实战
下一篇文章
RocketMQ 监控体系与可观测性实战