清晨的一缕阳光

RocketMQ Monitoring and Observability in Practice

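A monitoring system is key to keeping a RocketMQ cluster running reliably. This article covers observability practices in depth: metrics collection, log collection, distributed tracing, and alerting.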

1. Monitoring Architecture

1.1 Monitoring Layers

graph TB
    subgraph infra [Infrastructure layer]
        CPU[CPU]
        MEM[Memory]
        DISK[Disk]
        NET[Network]
    end
    
    subgraph rmq [RocketMQ layer]
        NAMESRV[NameServer]
        BROKER[Broker]
        TOPIC[Topic]
        CONSUMER[Consumer]
    end
    
    subgraph biz [Business layer]
        TPS[TPS]
        LAG[Consumer Lag]
        ERROR[Error rate]
    end
    
    subgraph view [Presentation layer]
        PROM[Prometheus]
        GRAF[Grafana]
        ALERT[Alerting]
    end
    
    CPU --> BROKER
    BROKER --> TPS
    TPS --> PROM
    PROM --> GRAF
    PROM --> ALERT

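1.2 Metric Categories

| Category | Key metrics | What it reflects |
| --- | --- | --- |
| Infrastructure | CPU, memory, disk, network | Server resources |
| NameServer | Connections, requests | NameServer status |
| Broker | TPS, latency, connections | Broker performance |
| Topic | Message volume, queue count | Topic status |
| Consumer | Lag, TPS, rebalancing | Consumption status |
| Business | Success rate, error rate, latency | Business metrics |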
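Most of the alerts later on depend on the Lag metric in the Consumer row, so it helps to define it precisely. A common definition, and the one RocketMQ's `ConsumeStats.computeTotalDiff()` uses, works per message queue: take the Broker's max offset, subtract the group's committed consumer offset, then sum the results over all queues. The Java sketch below reproduces that arithmetic on an in-memory snapshot. The `QueueOffsets` record, the group names, and the offsets are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-queue snapshot for one consumer group on one topic
record QueueOffsets(String group, String topic, int queueId,
                    long brokerOffset, long consumerOffset) {
    // Messages written to this queue that the group has not consumed yet
    long lag() {
        return brokerOffset - consumerOffset;
    }
}

class ConsumerLag {
    // Sum per-queue lag by group, similar in spirit to `sum(rocketmq_group_diff) by (group)`
    static Map<String, Long> lagByGroup(List<QueueOffsets> queues) {
        Map<String, Long> result = new TreeMap<>();
        for (QueueOffsets q : queues) {
            result.merge(q.group(), q.lag(), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<QueueOffsets> snapshot = List.of(
            new QueueOffsets("order-group", "OrderTopic", 0, 1_200, 1_150),
            new QueueOffsets("order-group", "OrderTopic", 1, 980, 900),
            new QueueOffsets("audit-group", "OrderTopic", 0, 1_200, 1_200));
        System.out.println(lagByGroup(snapshot)); // {audit-group=0, order-group=130}
    }
}
```

Summing per group hides which queue is behind. Keep the per-queue values when you need to find a single stuck queue.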

2. Metrics Collection

2.1 RocketMQ Exporter

Configuration

# rocketmq-exporter configuration (application.yml)
# One exporter per cluster: it discovers every Broker through the NameServer
server:
  port: 5557
rocketmq:
  config:
    namesrvAddr: ns1:9876;ns2:9876
    webTelemetryPath: /metrics

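Deployment

#!/bin/bash
# Deploy the RocketMQ Exporter

# 1. Download the exporter
wget https://github.com/apache/rocketmq-exporter/releases/download/v0.0.2/rocketmq-exporter-0.0.2-SNAPSHOT-exec.jar

# 2. Start the exporter in the background
#    Quote the NameServer list: an unquoted ';' ends the command in the shell
nohup java -jar rocketmq-exporter-0.0.2-SNAPSHOT-exec.jar \
  --rocketmq.config.namesrvAddr="ns1:9876;ns2:9876" \
  --server.port=5557 > rocketmq-exporter.log 2>&1 &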
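Once the exporter is running, a quick smoke test is to fetch its `/metrics` endpoint and check that RocketMQ series are present. The response uses the Prometheus text exposition format: one `name{label="value",...} value` sample per line, plus `#` comment lines. Below is a minimal Java sketch that extracts one metric from such a response. It assumes samples carry no trailing timestamp. The `ExporterMetrics` class and the sample text are made up for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.LinkedHashMap;
import java.util.Map;

class ExporterMetrics {
    // Fetch the raw exposition text, e.g. from http://<exporter-host>:5557/metrics
    static String fetch(String url) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        return HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString())
            .body();
    }

    // Return label-set -> value for every sample of one metric.
    // Simplified: assumes samples have no trailing timestamp.
    static Map<String, Double> samples(String body, String metric) {
        Map<String, Double> out = new LinkedHashMap<>();
        for (String raw : body.split("\n")) {
            String line = raw.strip();
            if (line.isEmpty() || line.startsWith("#")) continue; // skip HELP/TYPE comments
            int space = line.lastIndexOf(' ');                     // value follows the last space
            if (space < 0) continue;
            String series = line.substring(0, space);
            int brace = series.indexOf('{');
            String name = brace < 0 ? series : series.substring(0, brace);
            if (!name.equals(metric)) continue;
            String labels = brace < 0 ? "" : series.substring(brace);
            out.put(labels, parseValue(line.substring(space + 1)));
        }
        return out;
    }

    // The exposition format writes infinities as +Inf/-Inf, which Double.parseDouble rejects
    private static double parseValue(String v) {
        switch (v) {
            case "+Inf": return Double.POSITIVE_INFINITY;
            case "-Inf": return Double.NEGATIVE_INFINITY;
            default: return Double.parseDouble(v);
        }
    }

    public static void main(String[] args) {
        // Made-up sample response; in practice use fetch("http://exporter-host:5557/metrics")
        String body = """
            # TYPE rocketmq_group_diff gauge
            rocketmq_group_diff{group="order-group",topic="OrderTopic"} 130.0
            rocketmq_broker_tps{cluster="DefaultCluster"} 2500.0
            """;
        System.out.println(samples(body, "rocketmq_group_diff"));
    }
}
```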

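2.2 Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

# Load the alert rules from section 5 and send firing alerts to Alertmanager
rule_files:
  - 'alerting_rules.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

scrape_configs:
  # The exporter reaches every Broker through the NameServer and reports
  # cluster-wide metrics (Brokers, Topics, consumer groups), so one target per
  # cluster is enough; scraping several copies would duplicate every series
  - job_name: 'rocketmq-broker'
    metrics_path: '/metrics'
    static_configs:
      - targets:
          - 'exporter-host:5557'

  # node_exporter on each Broker host (CPU, memory, disk, network)
  - job_name: 'node'
    static_configs:
      - targets:
          - 'broker-1:9100'
          - 'broker-2:9100'
          - 'broker-3:9100'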
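Once the targets are being scraped, the same data can be read back through Prometheus's HTTP API (`GET /api/v1/query?query=<PromQL>`), for example from a health-check job. Below is a sketch using the JDK's `java.net.http.HttpClient`. The address `http://prometheus:9090` and the `PromQuery` class are assumptions, and JSON parsing of the response is left out. The main point is that PromQL contains spaces, parentheses and quotes, so the query must be URL-encoded.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class PromQuery {
    // Build an instant-query URL with the PromQL expression URL-encoded
    static URI instantQueryUri(String baseUrl, String promql) {
        String encoded = URLEncoder.encode(promql, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/api/v1/query?query=" + encoded);
    }

    // Run the query and return the raw JSON body; non-200 answers become exceptions
    static String query(HttpClient client, String baseUrl, String promql)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(instantQueryUri(baseUrl, promql)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Prometheus returned HTTP " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Assumed address; point this at your own Prometheus server
        String json = query(HttpClient.newHttpClient(), "http://prometheus:9090",
                "sum(rocketmq_group_diff) by (group)");
        System.out.println(json);
    }
}
```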

3. Log Collection

3.1 Filebeat Configuration

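# filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  # RocketMQ logs to ~/logs/rocketmqlogs by default; adjust to where your logs actually are
  paths:
    - /var/log/rocketmq/*.log
  fields:
    service: rocketmq
  # RocketMQ log records start with a date (e.g. "2024-05-01 10:00:00 ..."); lines that
  # do not, such as stack-trace frames, are appended to the previous event
  multiline.pattern: '^\d{4}-\d{2}-\d{2}'
  multiline.negate: true
  multiline.match: after

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "rocketmq-%{+yyyy.MM.dd}"

# A custom index name also needs a matching template name/pattern,
# and on Filebeat 7.x it only takes effect with ILM disabled
setup.ilm.enabled: false
setup.template.name: "rocketmq"
setup.template.pattern: "rocketmq-*"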
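The three `multiline.*` settings decide whether a Java stack trace stays in one event. With `negate: true` and `match: after`, every line that does not match the pattern is appended to the most recent line that does. Below is a small Java sketch of that grouping rule, assuming each new log record starts with a `yyyy-MM-dd` date. It ignores Filebeat's other limits, such as `multiline.max_lines` and flush timeouts, and the sample lines are invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class MultilineGrouper {
    // Plays the role of multiline.pattern: a matching line starts a new event
    private static final Pattern START = Pattern.compile("^\\d{4}-\\d{2}-\\d{2}");

    // negate: true + match: after -> non-matching lines join the previous event
    static List<String> group(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            if (START.matcher(line).find()) {
                if (current != null) events.add(current.toString());
                current = new StringBuilder(line);
            } else if (current != null) {
                current.append('\n').append(line);
            } else {
                // Continuation lines before the first record have nothing to attach to;
                // this sketch keeps them as separate events
                events.add(line);
            }
        }
        if (current != null) events.add(current.toString());
        return events;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "2024-05-01 10:00:00 INFO main - broker started",
            "2024-05-01 10:00:05 ERROR main - send failed",
            "java.lang.IllegalStateException: boom",
            "\tat Foo.bar(Foo.java:10)",
            "2024-05-01 10:00:06 INFO main - recovered");
        System.out.println(group(lines).size()); // 3
    }
}
```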

3.2 Log Analysis

Error log analysis

#!/bin/bash
# Error log analysis script
# Assumes RocketMQ's default layout: "<date> <time> <LEVEL> <thread> - <message>"

LOG_FILE="/var/log/rocketmq/broker.log"

echo "=== Error statistics (by message prefix) ==="
grep " ERROR " "$LOG_FILE" | \
  awk -F' - ' '{print $2}' | \
  awk -F':' '{print $1}' | \
  sort | uniq -c | sort -rn

echo -e "\n=== Warning statistics (by message prefix) ==="
grep " WARN " "$LOG_FILE" | \
  awk -F' - ' '{print $2}' | \
  awk -F':' '{print $1}' | \
  sort | uniq -c | sort -rn

echo -e "\n=== Most recent errors ==="
grep " ERROR " "$LOG_FILE" | tail -20

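Slow request analysis

#!/bin/bash
# Slow request analysis script

LOG_FILE="/var/log/rocketmq/broker.log"

echo "=== Slowest requests (ms) ==="
# Take the first number after "cost" on each line (e.g. "cost time(ms)=123"),
# then sort numerically and keep the 20 largest values
grep -oE 'cost[^0-9]*[0-9]+' "$LOG_FILE" | \
  grep -oE '[0-9]+$' | \
  sort -n | tail -20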
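Sorting the raw values shows only the single worst requests. To see how slow a typical slow request is, the same `cost` numbers can be summarized as percentiles. Below is a Java sketch using the nearest-rank method. Like the script, it assumes that the first number after the word `cost` on a line is the elapsed time in milliseconds. The log lines in `main` are invented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CostPercentiles {
    // Assumed log shape: the first number after "cost" is the elapsed time in ms
    private static final Pattern COST = Pattern.compile("cost[^0-9]*([0-9]+)");

    static List<Long> extractCosts(List<String> logLines) {
        List<Long> costs = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = COST.matcher(line);
            if (m.find()) costs.add(Long.parseLong(m.group(1)));
        }
        return costs;
    }

    // Nearest-rank percentile: the smallest value with at least p% of samples <= it
    static long percentile(List<Long> values, double p) {
        if (values.isEmpty()) throw new IllegalArgumentException("no samples");
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.size() / 100.0);
        return sorted.get(Math.max(rank, 1) - 1);
    }

    public static void main(String[] args) {
        List<Long> costs = extractCosts(List.of(
            "request cost time(ms)=12, bodyLength=512",
            "request cost time(ms)=250, bodyLength=1024",
            "no timing here"));
        System.out.println("p50=" + percentile(costs, 50) + " p99=" + percentile(costs, 99));
    }
}
```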

4. Distributed Tracing

4.1 OpenTelemetry Integration

Configuration

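// Java application configuration
// Tracing is switched on at JVM startup with the OpenTelemetry Java agent, which
// also supports automatic instrumentation of the RocketMQ client, for example:
//   java -javaagent:opentelemetry-javaagent.jar \
//        -Dotel.service.name=order-service \
//        -Dotel.traces.exporter=otlp \
//        -Dotel.exporter.otlp.protocol=grpc \
//        -Dotel.exporter.otlp.endpoint=http://jaeger:4317 \
//        -jar app.jar
// The agent reads these settings before main() runs, so System.setProperty()
// calls inside the application come too late. Jaeger ingests OTLP directly
// (all-in-one 1.35+ with COLLECTOR_OTLP_ENABLED=true and port 4317 exposed).
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("ns1:9876");
producer.start();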

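4.2 Manual Trace Propagation

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapSetter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

// Manual producer instrumentation with the OpenTelemetry API; not needed when
// the OpenTelemetry Java agent already instruments the RocketMQ client
public class TracedProducer {

    // Writes propagation headers (W3C "traceparent", ...) into message user properties
    private static final TextMapSetter<Message> SETTER =
        (message, key, value) -> message.putUserProperty(key, value);

    private final DefaultMQProducer producer;
    private final Tracer tracer;

    public TracedProducer(DefaultMQProducer producer, Tracer tracer) {
        this.producer = producer;   // already configured and started
        this.tracer = tracer;
    }

    public SendResult send(Message message) throws Exception {
        Span span = tracer.spanBuilder(message.getTopic() + " publish")
            .setSpanKind(SpanKind.PRODUCER)
            .setAttribute("topic", message.getTopic())
            .startSpan();
        if (message.getTags() != null) span.setAttribute("tags", message.getTags());
        if (message.getKeys() != null) span.setAttribute("keys", message.getKeys());

        try (Scope ignored = span.makeCurrent()) {
            // Inject the current trace context so the consumer can continue the trace
            GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
                .inject(Context.current(), message, SETTER);
            SendResult result = producer.send(message);
            span.setAttribute("msgId", result.getMsgId());
            return result;
        } catch (Exception e) {
            span.recordException(e);
            span.setStatus(StatusCode.ERROR);
            throw e;
        } finally {
            span.end();   // always end the span, on success or failure
        }
    }
}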
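The `traceparent` property written by the producer is a W3C Trace Context header. It consists of version `00`, a 32-hex-digit trace ID, a 16-hex-digit parent span ID, and 2 hex digits of flags, joined by `-`. A consumer continues the trace by reading that property back from the message. The dependency-free sketch below formats and validates the header to show what travels with each message. The `TraceParent` record is hypothetical and only accepts version `00`; in a real service, the OpenTelemetry propagator does this work.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value type for the fields carried in a W3C traceparent header
record TraceParent(String traceId, String spanId, boolean sampled) {

    private static final Pattern FORMAT =
        Pattern.compile("^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$");

    // Format as version 00; only the "sampled" flag bit is kept in this sketch
    String toHeader() {
        return "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00");
    }

    // Parse a header value; all-zero trace or span IDs are invalid
    static Optional<TraceParent> parse(String header) {
        if (header == null) return Optional.empty();
        Matcher m = FORMAT.matcher(header.trim());
        if (!m.matches()) return Optional.empty();
        String traceId = m.group(1);
        String spanId = m.group(2);
        if (traceId.chars().allMatch(c -> c == '0') || spanId.chars().allMatch(c -> c == '0')) {
            return Optional.empty();
        }
        int flags = Integer.parseInt(m.group(3), 16);
        return Optional.of(new TraceParent(traceId, spanId, (flags & 0x01) != 0));
    }

    public static void main(String[] args) {
        String header = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        System.out.println(parse(header).map(TraceParent::toHeader).orElse("invalid"));
    }
}
```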

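4.3 Jaeger Configuration

# docker-compose.yml
version: '3'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.37
    ports:
      - "6831:6831/udp"   # agent, compact Thrift
      - "6832:6832/udp"   # agent, binary Thrift
      - "16686:16686"     # web UI
      - "14268:14268"     # collector, Thrift over HTTP
      - "4317:4317"       # OTLP over gRPC
      - "4318:4318"       # OTLP over HTTP
      - "9411:9411"       # Zipkin-compatible endpoint
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - COLLECTOR_ZIPKIN_HOST_PORT=:9411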

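5. Alerting

5.1 Alert Rules

# alerting_rules.yml
# Metric names vary between rocketmq-exporter versions; check them against
# the exporter's /metrics output before relying on these rules
groups:
  - name: rocketmq
    rules:
      # Broker down. `up` only shows whether the scrape target answered,
      # so pair it with Broker-level checks
      - alert: RocketMQBrokerDown
        expr: up{job="rocketmq-broker"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Broker down: {{ $labels.instance }}"

      # Consumer lag
      - alert: RocketMQConsumerLag
        expr: rocketmq_group_diff > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag: {{ $labels.group }} - {{ $value }}"

      # Disk usage (the ratio is reported as a fraction between 0 and 1)
      - alert: RocketMQDiskHigh
        expr: rocketmq_commitlog_disk_ratio > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Disk usage too high: {{ $value | humanizePercentage }}"

      # Produce TPS too low. rocketmq_broker_tps is already a per-second gauge,
      # so compare it directly instead of wrapping it in rate()
      - alert: RocketMQProducerTPSLow
        expr: rocketmq_broker_tps < 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Produce TPS too low: {{ $value }}"

      # Consume failures
      - alert: RocketMQConsumerFailed
        expr: rate(rocketmq_consumer_failed_count[5m]) > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consume failures: {{ $labels.group }}"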
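The `for` clause on each rule is what turns a threshold into an alert. Prometheus evaluates the expression every `evaluation_interval` (15s in the Prometheus configuration above). When the expression first returns a result, the alert becomes pending. It only fires once the condition has held at every evaluation for the whole `for` duration, and a single evaluation without a result resets it. Below is a simplified Java sketch of that state machine. It ignores `keep_firing_for` and resolve notifications, and `AlertRuleState` is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;

class AlertRuleState {
    enum State { INACTIVE, PENDING, FIRING }

    private final Duration holdFor;  // the rule's `for` value
    private Instant activeSince;     // first evaluation at which the expression was true
    private State state = State.INACTIVE;

    AlertRuleState(Duration holdFor) {
        this.holdFor = holdFor;
    }

    // Called once per evaluation with its timestamp and whether the expression held
    State evaluate(Instant now, boolean conditionTrue) {
        if (!conditionTrue) {
            activeSince = null;      // any false evaluation resets the timer
            state = State.INACTIVE;
        } else {
            if (activeSince == null) activeSince = now;
            state = Duration.between(activeSince, now).compareTo(holdFor) >= 0
                ? State.FIRING : State.PENDING;
        }
        return state;
    }

    public static void main(String[] args) {
        AlertRuleState lag = new AlertRuleState(Duration.ofMinutes(5));
        Instant t0 = Instant.parse("2024-05-01T10:00:00Z");
        // Condition true at every 15s evaluation: pending for 5 minutes, then firing
        for (int i = 0; i <= 20; i++) {
            State s = lag.evaluate(t0.plusSeconds(15L * i), true);
            if (i == 0 || i == 19 || i == 20) System.out.println(i * 15 + "s: " + s);
        }
    }
}
```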

5.2 Alert Notifications

# alertmanager.yml
global:
  smtp_smarthost: 'smtp.example.com:587'
  smtp_from: 'alert@example.com'

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'email'
  
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty'
    - match:
        severity: warning
      receiver: 'email'

receivers:
  - name: 'email'
    email_configs:
      - to: 'ops-team@example.com'
  
  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: 'your-pagerduty-key'
  
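  # DingTalk robots do not accept Alertmanager's webhook payload directly, so point
  # this at a converter such as prometheus-webhook-dingtalk (example address below).
  # A route must also reference this receiver, otherwise it never gets any alerts.
  - name: 'dingtalk'
    webhook_configs:
      - url: 'http://dingtalk-webhook:8060/dingtalk/webhook1/send'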
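Routing determines which of these receivers an alert reaches. Alertmanager checks the child `routes` in order and, unless a route sets `continue: true`, stops at the first one whose matchers all match. If no child matches, the top-level `receiver` handles the alert. A receiver that no route references, such as `dingtalk` here, therefore never receives anything. Below is a simplified Java sketch of that first-match rule, without nested routes, regex matchers or grouping. The `Route` record and `AlertRouter` class are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical flattened route: equality matchers, a receiver, and the `continue` flag
record Route(Map<String, String> match, String receiver, boolean continueMatching) {
    boolean matches(Map<String, String> alertLabels) {
        return match.entrySet().stream()
            .allMatch(e -> e.getValue().equals(alertLabels.get(e.getKey())));
    }
}

class AlertRouter {
    // Returns the receivers an alert with these labels is delivered to
    static List<String> receiversFor(Map<String, String> labels,
                                     List<Route> routes, String defaultReceiver) {
        List<String> receivers = new ArrayList<>();
        for (Route route : routes) {
            if (route.matches(labels)) {
                receivers.add(route.receiver());
                if (!route.continueMatching()) break;  // first match wins
            }
        }
        if (receivers.isEmpty()) receivers.add(defaultReceiver);  // top-level receiver
        return receivers;
    }

    public static void main(String[] args) {
        List<Route> routes = List.of(
            new Route(Map.of("severity", "critical"), "pagerduty", false),
            new Route(Map.of("severity", "warning"), "email", false));
        System.out.println(receiversFor(
            Map.of("alertname", "RocketMQBrokerDown", "severity", "critical"), routes, "email")); // [pagerduty]
        System.out.println(receiversFor(Map.of("alertname", "Unlabeled"), routes, "email"));     // [email]
    }
}
```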

6. Grafana Dashboards

6.1 Core Panels

{
  "dashboard": {
    "title": "RocketMQ Monitoring",
    "panels": [
      {
        "title": "Produce/Consume TPS",
        "targets": [
          { "expr": "sum(rocketmq_broker_tps)", "legendFormat": "produce" },
          { "expr": "sum(rocketmq_consumer_tps)", "legendFormat": "consume" }
        ]
      },
      {
        "title": "Consumer Lag",
        "targets": [{
          "expr": "sum(rocketmq_group_diff) by (group)"
        }]
      },
      {
        "title": "Disk Usage",
        "targets": [{
          "expr": "rocketmq_commitlog_disk_ratio"
        }]
      },
      {
        "title": "Broker Status",
        "targets": [{
          "expr": "rocketmq_brokeruntime_broker_membership"
        }]
      }
    ]
  }
}

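6.2 Recommended Dashboards

| Dashboard | Contents |
| --- | --- |
| RocketMQ Overview | Cluster overview, TPS, latency |
| Broker Metrics | Detailed Broker metrics |
| Consumer Lag | Consumer Lag monitoring |
| Topic Metrics | Detailed Topic metrics |
| JVM Metrics | JVM metrics, GC |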

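7. Best Practices

7.1 Monitoring Recommendations

Monitoring recommendations:
1. Scrape every 15 seconds
2. Retain data for 30 days
3. Configure multi-level alerts
4. Set up an on-call rotation
5. Run regular drills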
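Points 1 and 2 together determine how much disk Prometheus needs. The Prometheus storage documentation gives this rule of thumb: needed_disk_space = retention_time_seconds × ingested_samples_per_second × bytes_per_sample, at roughly 1 to 2 bytes per sample after compression. At a 15-second interval, each active series contributes 1/15 of a sample per second. Prometheus keeps only 15 days by default, so 30 days must be set explicitly, for example with `--storage.tsdb.retention.time=30d`. Below is a Java sketch of the arithmetic. The series count is an assumed input that you would read from your own server.

```java
class PrometheusDiskEstimate {
    // needed_disk = retention_seconds * samples_per_second * bytes_per_sample
    static double neededBytes(long activeSeries, int scrapeIntervalSeconds,
                              int retentionDays, double bytesPerSample) {
        double samplesPerSecond = (double) activeSeries / scrapeIntervalSeconds;
        long retentionSeconds = retentionDays * 86_400L;
        return retentionSeconds * samplesPerSecond * bytesPerSample;
    }

    public static void main(String[] args) {
        long series = 100_000;  // assumed number of active series
        for (double bytes : new double[] {1.0, 2.0}) {
            double gib = neededBytes(series, 15, 30, bytes) / (1024.0 * 1024 * 1024);
            System.out.printf("%.1f bytes/sample -> %.1f GiB%n", bytes, gib);
        }
    }
}
```

With the assumed 100,000 active series, this comes to roughly 16 to 32 GiB for 30 days.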

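7.2 Alert Thresholds

Suggested alert thresholds (condition: how long it must hold before firing):
- Broker down: 1 minute
- Consumer Lag > 10000: 5 minutes
- Disk usage > 80%: 5 minutes
- Produce TPS < 1000 (tune to your normal traffic): 5 minutes
- Consume failures > 0: 5 minutes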

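7.3 Checklist

Monitoring checks:
- [ ] Exporter is running
- [ ] Prometheus is scraping successfully
- [ ] Grafana dashboards show data
- [ ] Alert rules are configured
- [ ] Alert notifications are delivered
- [ ] Logs are being collected
- [ ] Traces are being reported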

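Summary

Key points of a RocketMQ monitoring system:

  1. Monitoring architecture: infrastructure, RocketMQ, and business layers
  2. Metrics collection: RocketMQ Exporter, Prometheus
  3. Log collection: Filebeat, ELK
  4. Distributed tracing: OpenTelemetry, Jaeger
  5. Alerting: alert rules, notification routing
  6. Dashboards: Grafana, core panels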
