RocketMQ 监控体系是保障集群稳定运行的关键。本文将深入探讨指标采集、日志收集、链路追踪、告警配置等可观测性实践。
一、监控架构
1.1 监控层次
graph TB
subgraph 基础设施层
CPU[CPU]
MEM[内存]
DISK[磁盘]
NET[网络]
end
subgraph RocketMQ 层
NAMESRV[NameServer]
BROKER[Broker]
TOPIC[Topic]
CONSUMER[Consumer]
end
subgraph 业务层
TPS[TPS]
LAG[Consumer Lag]
ERROR[错误率]
end
subgraph 展示层
PROM[Prometheus]
GRAF[Grafana]
ALERT[告警]
end
CPU --> BROKER
BROKER --> TPS
TPS --> PROM
PROM --> GRAF
PROM --> ALERT
1.2 监控指标分类
| 分类 | 指标 | 说明 |
|---|---|---|
| 基础设施 | CPU、内存、磁盘、网络 | 服务器资源 |
| NameServer | 连接数、请求数 | NameServer 状态 |
| Broker | TPS、延迟、连接数 | Broker 性能 |
| Topic | 消息量、队列数 | Topic 状态 |
| Consumer | Lag、TPS、重平衡 | 消费状态 |
| 业务 | 成功率、错误率、延迟 | 业务指标 |
二、指标采集
2.1 RocketMQ Exporter
配置:
# rocketmq-exporter 配置
rocketmq.config.namesrvAddr: ns1:9876;ns2:9876
rocketmq.config.webTelemetryPath: /metrics
server.port: 5557
部署:
#!/bin/bash
# 部署 RocketMQ Exporter
# 1. 下载 Exporter
wget https://github.com/apache/rocketmq-exporter/releases/download/v0.0.2/rocketmq-exporter-0.0.2-SNAPSHOT-exec.jar
# 2. 启动 Exporter
java -jar rocketmq-exporter-0.0.2-SNAPSHOT-exec.jar \
--rocketmq.config.namesrvAddr=ns1:9876;ns2:9876 \
--server.port=5557 &
2.2 Prometheus 配置
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'rocketmq-broker'
static_configs:
- targets:
- 'broker-1:5557'
- 'broker-2:5557'
- 'broker-3:5557'
metrics_path: '/metrics'
- job_name: 'rocketmq-nameserver'
static_configs:
- targets:
- 'ns1:5558'
- 'ns2:5558'
- job_name: 'node'
static_configs:
- targets:
- 'broker-1:9100'
- 'broker-2:9100'
- 'broker-3:9100'
三、日志收集
3.1 Filebeat 配置
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/rocketmq/*.log
fields:
service: rocketmq
multiline.pattern: '^\['
multiline.negate: true
multiline.match: after
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
output.elasticsearch:
hosts: ["elasticsearch:9200"]
indices:
- index: "rocketmq-%{+yyyy.MM.dd}"
3.2 日志分析
错误日志分析:
#!/bin/bash
# 错误日志分析脚本
LOG_FILE="/var/log/rocketmq/broker.log"
echo "=== 错误统计 ==="
grep -i "error" $LOG_FILE | \
awk -F'ERROR' '{print $2}' | \
awk -F':' '{print $1}' | \
sort | uniq -c | sort -rn
echo -e "\n=== 警告统计 ==="
grep -i "warn" $LOG_FILE | \
awk -F'WARN' '{print $2}' | \
awk -F':' '{print $1}' | \
sort | uniq -c | sort -rn
echo -e "\n=== 最近错误 ==="
grep -i "error" $LOG_FILE | tail -20
慢查询分析:
#!/bin/bash
# 慢查询分析脚本
LOG_FILE="/var/log/rocketmq/broker.log"
echo "=== 慢请求统计 ==="
grep "cost" $LOG_FILE | \
awk -F'cost' '{print $2}' | \
awk '{print $1}' | \
sort -n | tail -20
四、链路追踪
4.1 OpenTelemetry 集成
配置:
// Java 应用配置
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("ns1:9876");
// OpenTelemetry 配置
System.setProperty("otel.traces.exporter", "jaeger");
System.setProperty("otel.exporter.jaeger.endpoint", "http://jaeger:14268");
producer.start();
4.2 链路追踪实现
public class TracedProducer {
private final DefaultMQProducer producer;
private final Tracer tracer;
public TracedProducer(Tracer tracer) {
this.tracer = tracer;
this.producer = createProducer();
}
public void send(Message message) throws Exception {
Span span = tracer.buildSpan("rocketmq-produce")
.withTag("topic", message.getTopic())
.withTag("tags", message.getTags())
.withTag("keys", message.getKeys())
.start();
try {
// 注入追踪上下文
message.putUserProperty("traceparent",
tracer.inject(span.context()));
SendResult result = producer.send(message);
span.finish();
} catch (Exception e) {
span.setTag("error", true);
span.finish();
throw e;
}
}
}
4.3 Jaeger 配置
# docker-compose.yml
version: '3'
services:
jaeger:
image: jaegertracing/all-in-one:1.37
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "16686:16686"
- "14268:14268"
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
五、告警配置
5.1 告警规则
# alerting_rules.yml
groups:
- name: rocketmq
rules:
# Broker 宕机
- alert: RocketMQBrokerDown
expr: up{job="rocketmq-broker"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Broker 宕机:{{ $labels.instance }}"
# Consumer Lag
- alert: RocketMQConsumerLag
expr: rocketmq_group_diff > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "消费滞后:{{ $labels.group }} - {{ $value }}"
# 磁盘使用率
- alert: RocketMQDiskHigh
expr: rocketmq_commitlog_disk_ratio > 80
for: 5m
labels:
severity: warning
annotations:
summary: "磁盘使用率过高:{{ $value }}%"
# 生产 TPS 过低
- alert: RocketMQProducerTPSLow
expr: rate(rocketmq_broker_tps[5m]) < 1000
for: 5m
labels:
severity: warning
annotations:
summary: "生产 TPS 过低:{{ $value }}"
# 消费失败
- alert: RocketMQConsumerFailed
expr: rate(rocketmq_consumer_failed_count[5m]) > 0
for: 5m
labels:
severity: warning
annotations:
summary: "消费失败:{{ $labels.group }}"
5.2 告警通知
# alertmanager.yml
global:
smtp_smarthost: 'smtp.example.com:587'
smtp_from: 'alert@example.com'
route:
group_by: ['alertname']
group_wait: 10s
group_interval: 10s
repeat_interval: 1h
receiver: 'email'
routes:
- match:
severity: critical
receiver: 'pagerduty'
- match:
severity: warning
receiver: 'email'
receivers:
- name: 'email'
email_configs:
- to: 'ops-team@example.com'
- name: 'pagerduty'
pagerduty_configs:
- service_key: 'your-pagerduty-key'
- name: 'dingtalk'
webhook_configs:
- url: 'https://oapi.dingtalk.com/robot/send?access_token=xxx'
六、Grafana 仪表盘
6.1 核心面板
{
"dashboard": {
"title": "RocketMQ 监控",
"panels": [
{
"title": "生产/消费 TPS",
"targets": [{
"expr": "sum(rate(rocketmq_broker_tps[1m]))"
}]
},
{
"title": "消费堆积",
"targets": [{
"expr": "sum(rocketmq_group_diff) by (group)"
}]
},
{
"title": "磁盘使用率",
"targets": [{
"expr": "rocketmq_commitlog_disk_ratio"
}]
},
{
"title": "Broker 状态",
"targets": [{
"expr": "rocketmq_brokeruntime_broker_membership"
}]
}
]
}
}
6.2 推荐仪表盘
| 仪表盘 | 说明 |
|---|---|
| RocketMQ Overview | 集群概览、TPS、延迟 |
| Broker Metrics | Broker 详细指标 |
| Consumer Lag | Consumer Lag 监控 |
| Topic Metrics | Topic 详细指标 |
| JVM Metrics | JVM 指标、GC |
七、最佳实践
7.1 监控建议
监控建议:
1. 采集间隔 15 秒
2. 保留数据 30 天
3. 配置多级告警
4. 建立值班制度
5. 定期演练
7.2 告警阈值
告警阈值建议:
- Broker 宕机:1 分钟
- Consumer Lag > 10000:5 分钟
- 磁盘使用率 > 80%:5 分钟
- 生产 TPS < 1000:5 分钟
- 消费失败 > 0:5 分钟
7.3 检查清单
监控检查:
- [ ] Exporter 正常运行
- [ ] Prometheus 正常采集
- [ ] Grafana 仪表盘正常
- [ ] 告警规则配置
- [ ] 告警通知正常
- [ ] 日志收集正常
- [ ] 链路追踪正常
总结
RocketMQ 监控体系的核心要点:
- 监控架构:基础设施、RocketMQ 层、业务层
- 指标采集:RocketMQ Exporter、Prometheus
- 日志收集:Filebeat、ELK
- 链路追踪:OpenTelemetry、Jaeger
- 告警配置:告警规则、通知配置
- 仪表盘:Grafana、核心面板
核心要点:
- 建立完整的监控体系
- 配置合理的告警阈值
- 实现链路追踪
- 建立值班制度
- 定期演练和优化