Kafka MirrorMaker is the official Kafka tool for replicating data across clusters, supporting multi-cluster replication and disaster recovery. This article looks at how MirrorMaker works and how to apply it in practice.
## 1. MirrorMaker Basics
### 1.1 What Is MirrorMaker?
Definition:
MirrorMaker = Kafka's tool for replicating data between clusters
Capabilities:
- Cross-cluster message replication
- Active-active (multi-datacenter) architectures
- Disaster recovery and cluster migration
- Data aggregation
### 1.2 Architecture Evolution
MirrorMaker 1.0:

```mermaid
graph LR
    subgraph "Source Cluster"
        S1[Source Topic]
    end
    subgraph "MirrorMaker"
        MM[Consumer + Producer]
    end
    subgraph "Target Cluster"
        T1[Target Topic]
    end
    S1 -->|consume| MM
    MM -->|produce| T1
```
MirrorMaker 2.0:

```mermaid
graph TB
    subgraph "Source Cluster"
        S1[Source Topic]
        S2[Source Topic 2]
    end
    subgraph "MirrorMaker 2.0"
        SC[MirrorSourceConnector]
        CC[MirrorCheckpointConnector]
        HC[MirrorHeartbeatConnector]
    end
    subgraph "Target Cluster"
        T1[Mirror Topics]
        T2[Heartbeat Topic]
        T3[Checkpoint Topic]
    end
    S1 --> SC
    S2 --> SC
    SC --> T1
    CC --> T3
    HC --> T2
```
### 1.3 Version Comparison
| Feature | MM 1.0 | MM 2.0 |
|---|---|---|
| Architecture | Consumer + Producer | Kafka Connect |
| Configuration | Command-line flags | Connector configs |
| Monitoring | Limited | JMX + Connect REST API |
| Replication | One-way | Bidirectional |
| Heartbeats | No | Yes |
| Recommended | ❌ | ✅ |
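Besides running MM2 as connectors on an existing Kafka Connect cluster (the approach used in this article), Kafka also ships a dedicated driver, `connect-mirror-maker.sh`, which reads a single properties file. A minimal sketch — the cluster addresses are placeholders, and production deployments should raise the replication factors:

```properties
# mm2.properties — run with: bin/connect-mirror-maker.sh mm2.properties
clusters = source, target
source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = target-kafka:9092

# Enable replication in one direction
source->target.enabled = true
source->target.topics = .*

# Internal topic replication factors (use 3 in production)
replication.factor = 1
checkpoints.topic.replication.factor = 1
heartbeats.topic.replication.factor = 1
offset-syncs.topic.replication.factor = 1
```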
## 2. Deployment and Configuration
### 2.1 Docker Deployment

```yaml
# docker-compose.yml (ZooKeeper services omitted for brevity)
version: '3'
services:
  # Source cluster
  source-kafka:
    image: confluentinc/cp-kafka:7.4.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: source-zk:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://source-kafka:29092

  # Target cluster
  target-kafka:
    image: confluentinc/cp-kafka:7.4.0
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: target-zk:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://target-kafka:29092

  # MirrorMaker 2.0 (runs as a Kafka Connect worker)
  mirror-maker:
    image: confluentinc/cp-kafka-connect:7.4.0
    depends_on:
      - source-kafka
      - target-kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: target-kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: mirror-maker-group
      CONNECT_CONFIG_STORAGE_TOPIC: mm-configs
      CONNECT_OFFSET_STORAGE_TOPIC: mm-offsets
      CONNECT_STATUS_STORAGE_TOPIC: mm-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: mirror-maker
      CONNECT_PLUGIN_PATH: /usr/share/java
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
    volumes:
      - ./mm2-configs:/etc/kafka-connect
    command:
      - bash
      - -c
      - |
        # The MirrorMaker 2.0 connectors ship with Kafka Connect; no plugin install needed
        echo "Starting Kafka Connect"
        /etc/confluent/docker/run &
        sleep 30
        echo "Creating MirrorMaker 2.0 connector"
        curl -X POST http://mirror-maker:8083/connectors \
          -H "Content-Type: application/json" \
          -d @/etc/kafka-connect/mirror-maker.json
        sleep infinity
```
### 2.2 Connector Configuration
JSON does not allow comments, so the file POSTed to the Connect REST API must be comment-free. Also note that group-offset syncing (`sync.group.offsets.enabled`) is performed by the companion `MirrorCheckpointConnector`, and heartbeats by `MirrorHeartbeatConnector`.

```json
{
  "name": "source-to-target-mirror",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": 3,

    "source.cluster.alias": "source",
    "source.cluster.bootstrap.servers": "source-kafka:29092",
    "target.cluster.alias": "target",
    "target.cluster.bootstrap.servers": "target-kafka:29092",

    "topics": ".*",
    "topics.exclude": ".*-topic-to-exclude",

    "replication.factor": 2,
    "replication.policy.class": "org.apache.kafka.connect.mirror.DefaultReplicationPolicy",
    "replication.policy.separator": ".",

    "sync.group.offsets.enabled": true,
    "offset-syncs.topic.replication.factor": 2,

    "emit.heartbeats.enabled": true,
    "emit.heartbeats.interval.seconds": 10,

    "consumer.fetch.max.bytes": 52428800,
    "producer.batch.size": 16384,
    "producer.linger.ms": 100
  }
}
```
## 3. Replication Topologies
### 3.1 One-Way Replication

```mermaid
graph LR
    subgraph "Cluster A"
        A1[Topic A]
    end
    subgraph "MirrorMaker"
        MM[Mirror Connector]
    end
    subgraph "Cluster B"
        B1["Topic A (mirror)"]
    end
    A1 --> MM
    MM --> B1
```
Configuration:

```json
{
  "name": "a-to-b-mirror",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "A",
    "source.cluster.bootstrap.servers": "cluster-a:9092",
    "target.cluster.alias": "B",
    "target.cluster.bootstrap.servers": "cluster-b:9092",
    "topics": "important-topic",
    "replication.factor": 3
  }
}
```
### 3.2 Bidirectional Replication

```mermaid
graph TB
    subgraph "Cluster A"
        A1[Topic A]
    end
    subgraph "Cluster B"
        B1[Topic B]
    end
    subgraph "MirrorMaker"
        MM1["A->B Connector"]
        MM2["B->A Connector"]
    end
    A1 --> MM1
    MM1 --> B1
    B1 --> MM2
    MM2 --> A1
```
Configuration (each entry is POSTed to the Connect REST API as a separate connector):

```json
[
  {
    "name": "a-to-b-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "A",
      "source.cluster.bootstrap.servers": "cluster-a:9092",
      "target.cluster.alias": "B",
      "target.cluster.bootstrap.servers": "cluster-b:9092",
      "topics": "important-topic"
    }
  },
  {
    "name": "b-to-a-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "B",
      "source.cluster.bootstrap.servers": "cluster-b:9092",
      "target.cluster.alias": "A",
      "target.cluster.bootstrap.servers": "cluster-a:9092",
      "topics": "important-topic"
    }
  }
]
```
### 3.3 Aggregation

```mermaid
graph TB
    subgraph "Cluster A"
        A1[Topic A]
    end
    subgraph "Cluster B"
        B1[Topic B]
    end
    subgraph "Cluster C"
        C1[Aggregated Topics]
    end
    subgraph "MirrorMaker"
        MM1["A->C Connector"]
        MM2["B->C Connector"]
    end
    A1 --> MM1
    MM1 --> C1
    B1 --> MM2
    MM2 --> C1
```
Configuration (two separate connectors). With `DefaultReplicationPolicy`, the mirrored topics arrive on cluster C as `A.events-topic` and `B.events-topic`, so the alias prefix alone keeps the two streams distinct and no separator override is needed:

```json
[
  {
    "name": "a-to-c-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "A",
      "source.cluster.bootstrap.servers": "cluster-a:9092",
      "target.cluster.alias": "C",
      "target.cluster.bootstrap.servers": "cluster-c:9092",
      "topics": "events-topic"
    }
  },
  {
    "name": "b-to-c-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "B",
      "source.cluster.bootstrap.servers": "cluster-b:9092",
      "target.cluster.alias": "C",
      "target.cluster.bootstrap.servers": "cluster-c:9092",
      "topics": "events-topic"
    }
  }
]
```
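On cluster C, a consumer can read both mirrored streams by subscribing with a pattern rather than hard-coding each prefixed name. A sketch of the matching logic — the topic names assume the default alias-prefix policy, and the single-letter alias pattern is an example:

```python
import re

# Matches A.events-topic, B.events-topic, ... but not unrelated topics
pattern = re.compile(r"^[A-Z]\.events-topic$")

topics = ["A.events-topic", "B.events-topic", "events-topic", "A.other"]
matched = [t for t in topics if pattern.match(t)]
print(matched)  # ['A.events-topic', 'B.events-topic']
```

Kafka consumer clients support this directly via pattern subscription (e.g. `subscribe(pattern=...)` in kafka-python), so newly added source clusters are picked up without a config change.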
## 4. Practical Applications
### 4.1 Disaster Recovery

```mermaid
graph TB
    subgraph "Primary Datacenter"
        A1[Kafka Cluster A]
        APP1[Application]
    end
    subgraph "DR Datacenter"
        B1[Kafka Cluster B]
        APP2[Standby Application]
    end
    subgraph "MirrorMaker"
        MM["A->B Mirror"]
    end
    APP1 --> A1
    A1 --> MM
    MM --> B1
    B1 -.->|failover| APP2
```
Configuration:

```json
{
  "name": "primary-to-dr-mirror",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "primary",
    "source.cluster.bootstrap.servers": "primary-dc:9092",
    "target.cluster.alias": "dr",
    "target.cluster.bootstrap.servers": "dr-dc:9092",
    "topics": ".*",
    "replication.factor": 3,
    "sync.group.offsets.enabled": true,
    "emit.heartbeats.enabled": true
  }
}
```
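Behind `sync.group.offsets.enabled`, MM2 records (source offset, target offset) pairs in an offset-syncs topic and uses them to translate consumer-group offsets for the DR cluster. A simplified model of that translation — this is a sketch of the idea, not the real `MirrorCheckpointConnector` code, which is more conservative about extrapolating:

```python
import bisect

def translate_offset(sync_points: list[tuple[int, int]], source_offset: int) -> int:
    """Translate a source-cluster offset into a target-cluster offset using
    the most recent sync point at or before it. sync_points is a sorted list
    of (source_offset, target_offset) pairs."""
    sources = [s for s, _ in sync_points]
    i = bisect.bisect_right(sources, source_offset) - 1
    if i < 0:
        return 0  # no sync point yet: start from the beginning
    src, tgt = sync_points[i]
    return tgt + (source_offset - src)

syncs = [(0, 0), (100, 90), (200, 185)]
print(translate_offset(syncs, 150))  # 90 + (150 - 100) = 140
```

The offsets differ between clusters because the mirror topic starts empty and may compact or skip records, which is why a naive copy of the source offset would be wrong after failover.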
Failover script:

```bash
#!/bin/bash
# DR failover script
PRIMARY="primary-dc:9092"
DR="dr-dc:9092"
CONSUMER_GROUP="my-consumer-group"

echo "=== DR failover ==="

# 1. Check the DR cluster's view of the consumer group
echo "Checking DR cluster..."
kafka-consumer-groups.sh --bootstrap-server "$DR" --describe --group "$CONSUMER_GROUP"

# 2. Capture the synced offsets on the DR side
#    (CURRENT-OFFSET column; one line per partition)
DR_OFFSETS=$(kafka-consumer-groups.sh --bootstrap-server "$DR" \
  --describe --group "$CONSUMER_GROUP" | awk 'NR>1 && NF {print $4}')
echo "DR offsets: $DR_OFFSETS"

# 3. Repoint applications
echo "Updating application config..."
# Update the application's bootstrap.servers to the DR cluster

# 4. Restart applications
echo "Restarting applications..."
# systemctl restart my-application

echo "Failover complete"
```
### 4.2 Active-Active Architecture

```mermaid
graph TB
    subgraph "Datacenter A"
        A1[Kafka A]
        APP1[Application A]
    end
    subgraph "Datacenter B"
        B1[Kafka B]
        APP2[Application B]
    end
    subgraph "MirrorMaker"
        MM1["A->B Mirror"]
        MM2["B->A Mirror"]
    end
    APP1 --> A1
    APP2 --> B1
    A1 --> MM1
    MM1 --> B1
    B1 --> MM2
    MM2 --> A1
```
Configuration. The alias prefix added by `DefaultReplicationPolicy` means DC A sees `B.shared-topic` and DC B sees `A.shared-topic`; since mirrored topics keep their prefix and are not re-mirrored, this also prevents replication loops:

```json
[
  {
    "name": "a-to-b-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "A",
      "source.cluster.bootstrap.servers": "dc-a:9092",
      "target.cluster.alias": "B",
      "target.cluster.bootstrap.servers": "dc-b:9092",
      "topics": "shared-topic"
    }
  },
  {
    "name": "b-to-a-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "B",
      "source.cluster.bootstrap.servers": "dc-b:9092",
      "target.cluster.alias": "A",
      "target.cluster.bootstrap.servers": "dc-a:9092",
      "topics": "shared-topic"
    }
  }
]
```
### 4.3 Data Migration

```bash
#!/bin/bash
# Cluster migration script
OLD_CLUSTER="old-cluster:9092"
NEW_CLUSTER="new-cluster:9092"
TOPICS="topic-1,topic-2,topic-3"

echo "=== Kafka cluster migration ==="

# 1. Create the MirrorMaker connector config
cat > mm2-migration.json << EOF
{
  "name": "migration-mirror",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "old",
    "source.cluster.bootstrap.servers": "$OLD_CLUSTER",
    "target.cluster.alias": "new",
    "target.cluster.bootstrap.servers": "$NEW_CLUSTER",
    "topics": "$TOPICS",
    "replication.factor": 3,
    "sync.group.offsets.enabled": true
  }
}
EOF

# 2. Start MirrorMaker
echo "Starting MirrorMaker..."
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mm2-migration.json

# 3. Wait until all tasks are RUNNING; replication lag must be checked
#    separately (e.g. via JMX or kafka-consumer-groups.sh)
echo "Waiting for tasks to start..."
while true; do
  running=$(curl -s http://localhost:8083/connectors/migration-mirror/status | \
    jq '[.tasks[] | select(.state == "RUNNING")] | length')
  echo "Running tasks: $running"
  [ "$running" -gt 0 ] && break
  sleep 10
done

# 4. Cut over traffic
echo "Cutting over traffic..."
# Point producers at the new cluster

# 5. Stop MirrorMaker once consumers have drained the old cluster
echo "Stopping MirrorMaker..."
curl -X DELETE http://localhost:8083/connectors/migration-mirror

echo "Migration complete"
```
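Step 3 only confirms that tasks are running; whether the mirror has caught up is a separate question. One common criterion, sketched as a pure function: treat the migration as drained when the observed lag stays under a threshold for several consecutive polls. The lag samples themselves would come from JMX or `kafka-consumer-groups.sh`:

```python
def is_caught_up(lag_samples: list[int], threshold: int = 100, stable_polls: int = 3) -> bool:
    """True if the last `stable_polls` lag samples are all below `threshold`.

    Requiring several consecutive low samples avoids cutting over on a
    momentary dip while the mirror is still catching up.
    """
    if len(lag_samples) < stable_polls:
        return False
    return all(lag < threshold for lag in lag_samples[-stable_polls:])

print(is_caught_up([5000, 800, 40, 12, 3]))   # True  (last 3 samples all < 100)
print(is_caught_up([5000, 800, 40, 120, 3]))  # False (a recent spike)
```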
## 5. Monitoring and Operations
### 5.1 Key Metrics

| Metric | Description |
|---|---|
| `connect-mirror:replication-lag` | Replication lag |
| `connect-mirror:messages-replicated` | Messages replicated |
| `connect-mirror:bytes-replicated` | Bytes replicated |
| `connect-mirror:heartbeat-lag` | Heartbeat lag |
### 5.2 REST API

```bash
# Connector status
curl http://localhost:8083/connectors/source-to-target-mirror/status

# Connector config
curl http://localhost:8083/connectors/source-to-target-mirror/config

# Restart the connector
curl -X POST http://localhost:8083/connectors/source-to-target-mirror/restart

# Pause the connector
curl -X PUT http://localhost:8083/connectors/source-to-target-mirror/pause

# Resume the connector
curl -X PUT http://localhost:8083/connectors/source-to-target-mirror/resume
```
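When scripting these calls, a small helper that maps actions to (HTTP method, URL) pairs keeps the endpoints in one place. A sketch — the host and connector name are examples, and the actual request would be issued with any HTTP client:

```python
def connect_endpoint(action: str, connector: str,
                     host: str = "http://localhost:8083") -> tuple[str, str]:
    """Map a Kafka Connect REST action to its (HTTP method, URL) pair."""
    base = f"{host}/connectors/{connector}"
    actions = {
        "status":  ("GET",    f"{base}/status"),
        "config":  ("GET",    f"{base}/config"),
        "restart": ("POST",   f"{base}/restart"),
        "pause":   ("PUT",    f"{base}/pause"),
        "resume":  ("PUT",    f"{base}/resume"),
        "delete":  ("DELETE", base),
    }
    return actions[action]

print(connect_endpoint("pause", "source-to-target-mirror"))
# ('PUT', 'http://localhost:8083/connectors/source-to-target-mirror/pause')
```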
### 5.3 Alerting

```yaml
# Prometheus alerting rules (metric names depend on your JMX exporter mapping)
groups:
  - name: kafka-mirror-maker
    rules:
      - alert: MirrorMakerReplicationLag
        expr: kafka_connect_mirror_replication_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Replication lag too high: {{ $value }}"
      - alert: MirrorMakerConnectorDown
        expr: kafka_connect_connector_status != 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Connector down: {{ $labels.connector }}"
      - alert: MirrorMakerHeartbeatLag
        expr: kafka_connect_mirror_heartbeat_lag > 60
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Heartbeat lag: {{ $value }}s"
```
## 6. Performance Tuning
### 6.1 Concurrency

```json
{
  "tasks.max": 6,
  "consumer.fetch.max.bytes": 52428800,
  "producer.batch.size": 32768,
  "producer.linger.ms": 100,
  "producer.compression.type": "lz4"
}
```
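To sanity-check batching settings, it helps to estimate how many records fit in one batch and the worst-case latency that `linger.ms` adds. A back-of-the-envelope sketch, where the average record size is an assumption you would replace with your own measurement:

```python
def batch_estimate(batch_size_bytes: int, avg_record_bytes: int, linger_ms: int) -> dict:
    """Rough producer batching estimate: records per full batch, and the extra
    latency a record may wait before its batch is sent."""
    return {
        "records_per_batch": batch_size_bytes // avg_record_bytes,
        # A batch is sent at the earlier of "batch full" or linger.ms elapsed
        "max_added_latency_ms": linger_ms,
    }

# Settings from the example above (batch.size=32768, linger.ms=100), assuming ~1 KiB records
print(batch_estimate(32768, 1024, 100))
# {'records_per_batch': 32, 'max_added_latency_ms': 100}
```

If your records are much larger than the batch size allows, raising `batch.size` (and enabling compression, as above) usually improves mirroring throughput more than adding tasks.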
### 6.2 Network

```json
{
  "consumer.max.poll.records": 1000,
  "consumer.max.poll.interval.ms": 300000,
  "producer.buffer.memory": 33554432,
  "producer.request.timeout.ms": 30000
}
```
### 6.3 Fault Recovery
Note that the `errors.deadletterqueue.*` properties only take effect on sink connectors; for the MirrorMaker source connectors, only the tolerance and retry settings apply.

```json
{
  "connector.client.config.override.policy": "All",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "mm2-dlq",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.retry.timeout": 60000,
  "errors.retry.backoff.ms": 1000
}
```
## 7. Best Practices
### 7.1 Topic Naming

```properties
# DefaultReplicationPolicy automatically prefixes mirrored topics
# Format: {source-cluster-alias}{separator}{original-topic}
# Example: topic-a from cluster "source" becomes source.topic-a on the target

# Separator configuration:
replication.policy.separator=.   # default
replication.policy.separator=-   # custom
```
### 7.2 Offset Sync
These settings enable consumer-group offset translation (carried out by `MirrorCheckpointConnector`):

```json
{
  "sync.group.offsets.enabled": true,
  "offset-syncs.topic.replication.factor": 3,
  "offset-syncs.topic.partitions": 10
}
```
### 7.3 Monitoring Checklist
Daily checks:
- [ ] Replication lag below threshold (e.g. < 10,000 messages)
- [ ] Heartbeats arriving
- [ ] Connector state RUNNING
- [ ] No errors in logs
Periodic checks:
- [ ] Data-consistency verification
- [ ] Performance benchmarking
- [ ] Failover drills
## Summary
Key points for Kafka MirrorMaker:
- Topologies: one-way, bidirectional, and aggregation replication
- Deployment: Docker and connector configuration
- Applications: disaster recovery, active-active, and cluster migration
- Operations: metrics, REST API, and alerting
- Performance: concurrency, network, and fault-recovery tuning
Core recommendations:
- Prefer MirrorMaker 2.0
- Enable heartbeats
- Monitor replication lag
- Rehearse failover regularly