清晨的一缕阳光

Kafka MirrorMaker: Cross-Cluster Data Replication in Practice

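Kafka MirrorMaker is the cross-cluster replication tool that ships with Apache Kafka. It copies data between clusters and supports multi-cluster replication and disaster recovery. This article walks through how MirrorMaker works and how to apply it in practice.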

1. MirrorMaker Basics

1.1 What Is MirrorMaker?

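Definition

MirrorMaker = a tool for replicating data between Kafka clusters

Capabilities:
- Cross-cluster message replication
- Support for active-active (multi-active) architectures
- Disaster recovery and cluster migration
- Data aggregation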

1.2 Architecture Evolution

MirrorMaker 1.0

graph LR
    subgraph Source Cluster
        S1[Source Topic]
    end
    
    subgraph MirrorMaker
        MM[Consumer + Producer]
    end
    
    subgraph Target Cluster
        T1[Target Topic]
    end
    
    S1 -->|Consumer | MM
    MM -->|Producer | T1

MirrorMaker 2.0

graph TB
    subgraph Source Cluster
        S1[Source Topic]
        S2[Source Topic 2]
    end
    
    subgraph MirrorMaker 2.0 on Kafka Connect
        SC[MirrorSourceConnector]
        CC[MirrorCheckpointConnector]
        HC[MirrorHeartbeatConnector]
    end
    
    subgraph Target Cluster
        T1["Remote topics (source.*)"]
        T2["Checkpoints (source.checkpoints.internal)"]
        T3[heartbeats]
    end
    
    S1 --> SC
    S2 --> SC
    SC --> T1
    CC --> T2
    HC --> T3
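Besides the mirrored data topics, the three MM2 connectors keep their bookkeeping in a few internal topics, which is useful to know when reviewing topic lists or ACLs. The sketch below prints their default names for one replication flow. It assumes DefaultReplicationPolicy with the default "." separator and reflects our understanding of recent Kafka releases; the function mm2_internal_topics is hypothetical.

```shell
#!/bin/bash
# Hypothetical helper: print the default MM2 internal topic names for one flow
# <source-alias> -> <target-alias>, assuming DefaultReplicationPolicy with separator ".".
mm2_internal_topics() {
  local source_alias=$1 target_alias=$2
  # Heartbeats emitted by MirrorHeartbeatConnector into the cluster the worker writes to (the target).
  printf '%s\n' "heartbeats"
  # Consumer-group checkpoints emitted by MirrorCheckpointConnector (target cluster).
  printf '%s\n' "${source_alias}.checkpoints.internal"
  # Upstream/downstream offset mappings emitted by MirrorSourceConnector (source cluster by default).
  printf '%s\n' "mm2-offset-syncs.${target_alias}.internal"
}

mm2_internal_topics source target
```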

1.3 Version Comparison

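| Feature | MM 1.0 | MM 2.0 |
|---|---|---|
| Architecture | Consumer + Producer | Kafka Connect |
| Configuration | Command-line arguments | Connector configuration |
| Monitoring | Limited | JMX + Connect REST API |
| Replication | One-way | Bidirectional |
| Heartbeat detection | No | Yes |
| Recommended | No (deprecated since Kafka 3.0) | Yes |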

2. Deployment and Configuration

2.1 Docker Deployment

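# docker-compose.yml
version: '3'

services:
  # Source Cluster
  source-zk:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  source-kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - source-zk
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: source-zk:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://source-kafka:29092
      # Single broker: internal topics cannot use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  
  # Target Cluster
  target-zk:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  target-kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - target-zk
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: target-zk:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://target-kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  
  # MirrorMaker 2.0 (a Kafka Connect worker attached to the target cluster)
  mirror-maker:
    image: confluentinc/cp-kafka-connect:7.4.0
    depends_on:
      - source-kafka
      - target-kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: target-kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: mirror-maker-group
      CONNECT_CONFIG_STORAGE_TOPIC: mm-configs
      CONNECT_OFFSET_STORAGE_TOPIC: mm-offsets
      CONNECT_STATUS_STORAGE_TOPIC: mm-status
      # Single-broker target cluster: Connect's internal topics need replication factor 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # Copy keys and values as raw bytes; StringConverter would not pass MM2's byte records through unchanged
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: mirror-maker
      CONNECT_PLUGIN_PATH: /usr/share/java
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
    volumes:
      - ./mm2-configs:/etc/kafka-connect
    command:
      - bash
      - -c
      - |
        # The MirrorMaker 2.0 connectors ship with Kafka Connect; no plugin install needed
        echo "Starting Kafka Connect"
        /etc/confluent/docker/run &
        echo "Waiting for the Connect REST API"
        until curl -sf -o /dev/null http://localhost:8083/connectors; do sleep 5; done
        echo "Creating MirrorMaker 2.0 Connector"
        curl -X POST http://localhost:8083/connectors \
          -H "Content-Type: application/json" \
          -d @/etc/kafka-connect/mirror-maker.json
        # Keep the container alive for as long as the Connect worker runs
        wait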
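A startup script like this has to wait for the Connect REST API before it can register connectors. A small retry helper with an upper bound makes that wait explicit and fails loudly instead of hanging or guessing a delay. The helper name retry_until is hypothetical; it uses only portable shell features plus date.

```shell
#!/bin/bash
# Hypothetical helper: rerun a command until it succeeds or a timeout expires.
# Usage: retry_until <timeout-seconds> <interval-seconds> <command> [args...]
retry_until() {
  local timeout=$1 interval=$2 deadline
  shift 2
  deadline=$(( $(date +%s) + timeout ))
  until "$@"; do
    # Give up once the deadline has passed, reporting the command that kept failing.
    if [ "$(date +%s)" -ge "$deadline" ]; then
      echo "timed out after ${timeout}s: $*" >&2
      return 1
    fi
    sleep "$interval"
  done
}

retry_until 3 1 true && echo "ready"
```

For example, retry_until 120 5 curl -sf -o /dev/null http://mirror-maker:8083/connectors waits up to two minutes for the worker before the connector JSON is POSTed.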

2.2 Connector Configuration

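{
  "name": "source-to-target-mirror",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": 3,
    "source.cluster.alias": "source",
    "source.cluster.bootstrap.servers": "source-kafka:29092",
    "target.cluster.alias": "target",
    "target.cluster.bootstrap.servers": "target-kafka:29092",
    "topics": ".*",
    "topics.exclude": ".*-topic-to-exclude",
    "replication.factor": 2,
    "replication.policy.class": "org.apache.kafka.connect.mirror.DefaultReplicationPolicy",
    "replication.policy.separator": ".",
    "offset-syncs.topic.replication.factor": 2,
    "consumer.fetch.max.bytes": 52428800,
    "producer.override.batch.size": 16384,
    "producer.override.linger.ms": 100
  }
}

curl sends this file as-is, so it must be plain JSON without comments. The keys fall into these groups: cluster connections (source/target cluster alias and bootstrap servers), topic selection (topics, topics.exclude), replication (replication.factor, replication.policy.class, replication.policy.separator), offset syncs (offset-syncs.topic.replication.factor) and performance (consumer.* for the consumer that reads the source, producer.override.* for the worker producer that writes to the target). With the single-broker clusters from 2.1, set both replication factors to 1.

Group offset syncing (sync.group.offsets.enabled) is a MirrorCheckpointConnector setting and heartbeats (emit.heartbeats.enabled, emit.heartbeats.interval.seconds) are MirrorHeartbeatConnector settings, so they are configured on those connectors rather than on MirrorSourceConnector.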
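When MM2 runs on Kafka Connect, replication, offset checkpointing and heartbeats are three separate connector classes (MirrorSourceConnector, MirrorCheckpointConnector and MirrorHeartbeatConnector), and each is registered with its own POST /connectors request. The sketch below prints one JSON payload per class for a single flow. The helper mm2_payload, the connector names it produces and the placeholder aliases are assumptions for illustration; values are not JSON-escaped, so they must not contain quotes.

```shell
#!/bin/bash
# Hypothetical helper: print one single-line JSON payload for an MM2 connector class.
# Usage: mm2_payload <name> <class> <src-alias> <src-bootstrap> <tgt-alias> <tgt-bootstrap>
mm2_payload() {
  local name=$1 class=$2 src=$3 src_bs=$4 tgt=$5 tgt_bs=$6
  printf '{"name": "%s", "config": {' "$name"
  printf '"connector.class": "org.apache.kafka.connect.mirror.%s", ' "$class"
  printf '"source.cluster.alias": "%s", "source.cluster.bootstrap.servers": "%s", ' "$src" "$src_bs"
  printf '"target.cluster.alias": "%s", "target.cluster.bootstrap.servers": "%s", ' "$tgt" "$tgt_bs"
  # Pass record keys and values through as raw bytes.
  printf '"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", '
  printf '"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"}}\n'
}

# One payload per connector class for the flow source -> target.
for class in MirrorSourceConnector MirrorCheckpointConnector MirrorHeartbeatConnector; do
  mm2_payload "source-to-target-$class" "$class" source source-kafka:29092 target target-kafka:29092
done
```

To register them, pipe the output into a loop such as while read -r body; do curl -X POST -H "Content-Type: application/json" -d "$body" http://mirror-maker:8083/connectors; done.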

3. Replication Patterns

3.1 One-Way Replication

graph LR
    subgraph Cluster A
        A1[important-topic]
    end
    
    subgraph MirrorMaker
        MM[MirrorSourceConnector]
    end
    
    subgraph Cluster B
        B1["A.important-topic (mirror)"]
    end
    
    A1 --> MM
    MM --> B1

Configuration

{
  "name": "a-to-b-mirror",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "A",
    "source.cluster.bootstrap.servers": "cluster-a:9092",
    "target.cluster.alias": "B",
    "target.cluster.bootstrap.servers": "cluster-b:9092",
    "topics": "important-topic",
    "replication.factor": 3
  }
}
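The topics setting (and topics.exclude) takes a comma-separated list of regular expressions, and to our understanding a topic is mirrored only when its whole name matches an include pattern and no exclude pattern. The sketch approximates that selection in the shell, which is handy for checking a pattern before deploying it. topic_selected and to_alternation are hypothetical names, and grep -E only approximates Java regex syntax.

```shell
#!/bin/bash
# Hypothetical helper: turn "a, b,c" into the alternation "a|b|c",
# trimming spaces around commas as Kafka list-type configs do.
to_alternation() {
  printf '%s' "$1" | sed 's/[[:space:]]*,[[:space:]]*/|/g'
}

# Hypothetical approximation of MM2 topic selection (full-name matches only).
# Usage: topic_selected <topics> <topics.exclude> <topic>
topic_selected() {
  local include exclude topic=$3
  include=$(to_alternation "$1")
  exclude=$(to_alternation "$2")
  printf '%s\n' "$topic" | grep -Eq "^(${include})\$" || return 1
  if [ -n "$exclude" ] && printf '%s\n' "$topic" | grep -Eq "^(${exclude})\$"; then
    return 1
  fi
  return 0
}

for t in important-topic orders orders-topic-to-exclude; do
  if topic_selected ".*" ".*-topic-to-exclude" "$t"; then echo "mirror: $t"; else echo "skip: $t"; fi
done
```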

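3.2 Bidirectional Replication

Each cluster receives the other's topic under a name prefixed with the source cluster alias (A.important-topic on B, B.important-topic on A).

graph TB
    subgraph Cluster A
        A1[important-topic]
        A2[B.important-topic]
    end
    
    subgraph Cluster B
        B1[important-topic]
        B2[A.important-topic]
    end
    
    subgraph MirrorMaker
        MM1[A->B Connector]
        MM2[B->A Connector]
    end
    
    A1 --> MM1
    MM1 --> B2
    B1 --> MM2
    MM2 --> A2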

Configuration (two connectors; each object is created with its own POST /connectors request)

[
  {
    "name": "a-to-b-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "A",
      "source.cluster.bootstrap.servers": "cluster-a:9092",
      "target.cluster.alias": "B",
      "target.cluster.bootstrap.servers": "cluster-b:9092",
      "topics": "important-topic"
    }
  },
  {
    "name": "b-to-a-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "B",
      "source.cluster.bootstrap.servers": "cluster-b:9092",
      "target.cluster.alias": "A",
      "target.cluster.bootstrap.servers": "cluster-a:9092",
      "topics": "important-topic"
    }
  }
]
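The two flows above can mirror the same topic name in both directions without looping because remote topics carry their origin in the name, and MirrorSourceConnector skips any topic whose chain of alias prefixes leads back to the target cluster. The sketch re-implements that check under DefaultReplicationPolicy with the "." separator; topic_source and is_cycle are hypothetical helpers modeled on our reading of MM2's behavior, not its actual code.

```shell
#!/bin/bash
# Hypothetical model of MM2's cycle check under DefaultReplicationPolicy (separator "."):
# a remote topic is named "<source-alias>.<upstream-topic>".

# Print the alias prefix of a remote topic; fail for local (unprefixed) topics.
topic_source() {
  case "$1" in
    *.*) printf '%s\n' "${1%%.*}" ;;
    *) return 1 ;;
  esac
}

# Succeed if mirroring <topic> into cluster <target-alias> would send records
# back to a cluster they already passed through.
is_cycle() {
  local target=$1 topic=$2 src
  while src=$(topic_source "$topic"); do
    [ "$src" = "$target" ] && return 0
    topic=${topic#*.}   # strip one "<alias>." hop and keep walking upstream
  done
  return 1
}

# Topics present on cluster B, as seen by the B->A flow.
for t in important-topic A.important-topic; do
  if is_cycle A "$t"; then echo "B->A skips $t"; else echo "B->A mirrors $t"; fi
done
```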

3.3 Aggregation

graph TB
    subgraph Cluster A
        A1[events-topic]
    end
    
    subgraph Cluster B
        B1[events-topic]
    end
    
    subgraph Cluster C
        C1[A.events-topic]
        C2[B.events-topic]
    end
    
    subgraph MirrorMaker
        MM1[A->C Connector]
        MM2[B->C Connector]
    end
    
    A1 --> MM1
    MM1 --> C1
    B1 --> MM2
    MM2 --> C2

Configuration

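[
  {
    "name": "a-to-c-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "A",
      "source.cluster.bootstrap.servers": "cluster-a:9092",
      "target.cluster.alias": "C",
      "target.cluster.bootstrap.servers": "cluster-c:9092",
      "topics": "events-topic"
    }
  },
  {
    "name": "b-to-c-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "B",
      "source.cluster.bootstrap.servers": "cluster-b:9092",
      "target.cluster.alias": "C",
      "target.cluster.bootstrap.servers": "cluster-c:9092",
      "topics": "events-topic"
    }
  }
]

The source alias prefix already keeps the two copies apart on C (A.events-topic and B.events-topic), so no custom separator is needed. The separator property is replication.policy.separator, and changing it renames every remote topic rather than tagging one flow. If the goal is a single merged events-topic, recent Kafka releases provide IdentityReplicationPolicy, which keeps the original topic names but gives up MM2's name-based cycle detection, so it suits one-way flows only.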

4. Practical Applications

4.1 Disaster Recovery Architecture

graph TB
    subgraph Primary Data Center
        A1[Kafka Cluster A]
        APP1[Application]
    end
    
    subgraph DR Site
        B1[Kafka Cluster B]
        APP2[Standby Application]
    end
    
    subgraph MirrorMaker
        MM[A->B Mirror]
    end
    
    APP1 --> A1
    A1 --> MM
    MM --> B1
    B1 -.->|failover| APP2

Configuration

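[
  {
    "name": "primary-to-dr-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "primary",
      "source.cluster.bootstrap.servers": "primary-dc:9092",
      "target.cluster.alias": "dr",
      "target.cluster.bootstrap.servers": "dr-dc:9092",
      "topics": ".*",
      "replication.factor": 3
    }
  },
  {
    "name": "primary-to-dr-checkpoints",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "source.cluster.alias": "primary",
      "source.cluster.bootstrap.servers": "primary-dc:9092",
      "target.cluster.alias": "dr",
      "target.cluster.bootstrap.servers": "dr-dc:9092",
      "groups": ".*",
      "sync.group.offsets.enabled": true
    }
  }
]

Create each object with its own POST /connectors request. Heartbeats come from a third connector, MirrorHeartbeatConnector, configured with the same cluster settings (emit.heartbeats.enabled defaults to true). Group offsets are written on the DR cluster only for groups that have no active members there.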

Failover Script

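#!/bin/bash
# DR failover script
set -euo pipefail

PRIMARY="primary-dc:9092"
DR="dr-dc:9092"
CONSUMER_GROUP="my-consumer-group"

echo "=== DR failover ==="

# 1. Check the consumer group on the DR cluster
echo "Checking the DR cluster..."
kafka-consumer-groups.sh --bootstrap-server "$DR" --describe --group "$CONSUMER_GROUP"

# 2. Read the group's synced offsets on the DR cluster.
#    Columns: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
#    Matching on the GROUP column skips blank, notice and header lines.
DR_OFFSET=$(kafka-consumer-groups.sh --bootstrap-server "$DR" \
  --describe --group "$CONSUMER_GROUP" | \
  awk -v g="$CONSUMER_GROUP" '$1 == g {print $2 "-" $3 ": " $4}')

echo "DR offsets:"
echo "$DR_OFFSET"

# 3. Update the application configuration
echo "Updating application configuration..."
# Point the application's bootstrap.servers at $DR. With DefaultReplicationPolicy it
# must also subscribe to the remote topic names (primary.<topic>).

# 4. Restart the application
echo "Restarting the application..."
# systemctl restart my-application

echo "Failover complete"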
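In kafka-consumer-groups.sh --describe output the fourth column is CURRENT-OFFSET and the fifth is LOG-END-OFFSET, and notice lines can appear above the header, so parsing by line number is fragile. The sketch instead matches rows on the GROUP column and sums LAG, a quick way to see how far the DR-side group is behind before switching. total_lag is hypothetical, and the sample output is made up for illustration.

```shell
#!/bin/bash
# Hypothetical helper: sum the LAG column (6th) of `kafka-consumer-groups.sh --describe`
# output for one group. Rows are matched on the GROUP column; "-" lag values are skipped.
total_lag() {
  awk -v g="$1" '$1 == g && $6 ~ /^[0-9]+$/ { sum += $6 } END { print sum + 0 }'
}

# Made-up sample output for an inactive group on the DR cluster.
sample_output() {
  cat <<'EOF'

Consumer group 'my-consumer-group' has no active members.

GROUP             TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-consumer-group primary.orders  0          100             120             20   -            -     -
my-consumer-group primary.orders  1          50              55              5    -            -     -
EOF
}

sample_output | total_lag my-consumer-group   # prints 25
```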

4.2 Active-Active Architecture

graph TB
    subgraph Data Center A
        A1[Kafka A]
        APP1[App A]
    end
    
    subgraph Data Center B
        B1[Kafka B]
        APP2[App B]
    end
    
    subgraph MirrorMaker
        MM1[A->B Mirror]
        MM2[B->A Mirror]
    end
    
    APP1 --> A1
    APP2 --> B1
    A1 --> MM1
    MM1 --> B1
    B1 --> MM2
    MM2 --> A1

Configuration

[
  {
    "name": "a-to-b-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "A",
      "source.cluster.bootstrap.servers": "dc-a:9092",
      "target.cluster.alias": "B",
      "target.cluster.bootstrap.servers": "dc-b:9092",
      "topics": "shared-topic"
    }
  },
  {
    "name": "b-to-a-mirror",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "B",
      "source.cluster.bootstrap.servers": "dc-b:9092",
      "target.cluster.alias": "A",
      "target.cluster.bootstrap.servers": "dc-a:9092",
      "topics": "shared-topic"
    }
  }
]

No custom separator is needed: data center A ends up with its own shared-topic plus B.shared-topic, and B with shared-topic plus A.shared-topic. Applications that need the global stream consume both the local topic and the remote copy.

4.3 Data Migration

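#!/bin/bash
# Cluster migration script
set -euo pipefail

OLD_CLUSTER="old-cluster:9092"
NEW_CLUSTER="new-cluster:9092"
TOPICS="topic-1,topic-2,topic-3"
CONNECT="http://localhost:8083"

echo "=== Kafka cluster migration ==="

# 1. Create the MirrorMaker config (unquoted EOF so the variables expand).
#    Consumer offsets are synced by a separate MirrorCheckpointConnector
#    ("sync.group.offsets.enabled": true), not by MirrorSourceConnector.
cat > mm2-migration.json << EOF
{
  "name": "migration-mirror",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "old",
    "source.cluster.bootstrap.servers": "$OLD_CLUSTER",
    "target.cluster.alias": "new",
    "target.cluster.bootstrap.servers": "$NEW_CLUSTER",
    "topics": "$TOPICS",
    "replication.factor": 3
  }
}
EOF

# 2. Start MirrorMaker
echo "Starting MirrorMaker..."
curl -sf -X POST "$CONNECT/connectors" \
  -H "Content-Type: application/json" \
  -d @mm2-migration.json

# 3. Monitor until the operator confirms that replication has caught up
echo "Monitoring sync progress..."
while true; do
  running=$(curl -s "$CONNECT/connectors/migration-mirror/status" | \
    jq '[.tasks[] | select(.state == "RUNNING")] | length')
  echo "Running tasks: $running"
  # Wait up to 10 s for Enter; on timeout, print the status again.
  if read -r -t 10 -p "Press Enter once replication lag has drained... "; then
    break
  fi
done

# 4. Switch traffic
echo "Switching traffic..."
# Point producers and consumers at the new cluster. With DefaultReplicationPolicy
# the mirrored topics are named old.<topic> there.

# 5. Stop MirrorMaker
echo "Stopping MirrorMaker..."
curl -X DELETE "$CONNECT/connectors/migration-mirror"

echo "Migration complete"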
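With DefaultReplicationPolicy the mirrored topics on the new cluster are renamed with the source alias ("old" in the script above), so consumer configurations have to change along with bootstrap.servers. The sketch derives the new names from the TOPICS list; remote_topics_csv is hypothetical. Recent Kafka releases also ship IdentityReplicationPolicy, which keeps the original names and can be a better fit for one-way migrations.

```shell
#!/bin/bash
# Hypothetical helper: map a comma-separated topic list to the names the topics get
# on the target cluster under DefaultReplicationPolicy ("<alias>.<topic>").
# Usage: remote_topics_csv <source-alias> <topic1,topic2,...>
remote_topics_csv() {
  printf '%s\n' "$2" | awk -F, -v alias="$1" '{
    for (i = 1; i <= NF; i++) {
      gsub(/^[ \t]+|[ \t]+$/, "", $i)   # trim spaces around each name
      printf "%s%s.%s", (i > 1 ? "," : ""), alias, $i
    }
    print ""
  }'
}

remote_topics_csv old "topic-1,topic-2,topic-3"   # prints old.topic-1,old.topic-2,old.topic-3
```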

5. Monitoring and Operations

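5.1 Monitoring Metrics

MM2 publishes its metrics over JMX, mainly under kafka.connect.mirror:type=MirrorSourceConnector,target=...,topic=...,partition=... and kafka.connect.mirror:type=MirrorCheckpointConnector,source=...,target=...:

| Metric | Description |
|---|---|
| replication-latency-ms (-min/-max/-avg) | Time for a record to propagate from source to target (replication delay) |
| record-count | Number of records replicated |
| byte-rate | Average bytes per second of replicated records |
| record-age-ms (-min/-max/-avg) | Age of records when they are replicated |
| checkpoint-latency-ms (-min/-max/-avg) | Time taken to replicate consumer group offsets (MirrorCheckpointConnector) |

There is no dedicated heartbeat-lag metric; the replication latency of the mirrored heartbeats topic serves as an end-to-end liveness signal.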

5.2 REST API

# Check connector status
curl http://localhost:8083/connectors/source-to-target-mirror/status

# View connector configuration
curl http://localhost:8083/connectors/source-to-target-mirror/config

# Restart the connector and its tasks (includeTasks requires Kafka 3.0+)
curl -X POST "http://localhost:8083/connectors/source-to-target-mirror/restart?includeTasks=true"

# Pause the connector
curl -X PUT http://localhost:8083/connectors/source-to-target-mirror/pause

# Resume the connector
curl -X PUT http://localhost:8083/connectors/source-to-target-mirror/resume
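The status endpoint returns JSON with a state for the connector and for each task (for example RUNNING, PAUSED or FAILED). For scripts that cannot depend on jq, the sketch lists every state that is not RUNNING. non_running_states is hypothetical, the sample response is abbreviated and made up, and the grep pattern assumes state values stay simple upper-case words.

```shell
#!/bin/bash
# Hypothetical helper: read a GET /connectors/<name>/status response on stdin and
# print every connector or task state other than RUNNING (one per line).
non_running_states() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
    | sed 's/.*"\([A-Z_]*\)"$/\1/' \
    | grep -v '^RUNNING$' || true
}

# Abbreviated, made-up response with one failed task.
status='{"name":"source-to-target-mirror","connector":{"state":"RUNNING","worker_id":"mirror-maker:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"mirror-maker:8083"},{"id":1,"state":"FAILED","worker_id":"mirror-maker:8083"}],"type":"source"}'
printf '%s\n' "$status" | non_running_states   # prints FAILED
```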

5.3 Alert Configuration

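# Prometheus alerting rules
# The metric names are placeholders: actual names depend on the JMX exporter rules
# that export the kafka.connect.mirror and kafka.connect MBeans.
groups:
  - name: kafka-mirror-maker
    rules:
      - alert: MirrorMakerReplicationLag
        # Based on MirrorSourceConnector's replication-latency-ms (milliseconds)
        expr: kafka_connect_mirror_replication_latency_ms > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High replication latency: {{ $value }} ms"
      
      - alert: MirrorMakerConnectorDown
        # Assumes an exporter rule that maps the connector status to 1 when running
        expr: kafka_connect_connector_status != 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Connector down: {{ $labels.connector }}"
      
      - alert: MirrorMakerHeartbeatLag
        # MM2 has no heartbeat-lag metric; watch the replication latency of the mirrored heartbeats topic
        expr: kafka_connect_mirror_replication_latency_ms{topic=~".*heartbeats"} > 60000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Heartbeat replication latency: {{ $value }} ms"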

6. Performance Tuning

6.1 Concurrency Settings

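{
  "tasks.max": 6,
  "consumer.fetch.max.bytes": 52428800,
  "producer.override.batch.size": 32768,
  "producer.override.linger.ms": 100,
  "producer.override.compression.type": "lz4"
}

Replicated records are written by the Connect worker's producer, so producer settings use the producer.override. prefix; the worker's connector.client.config.override.policy must allow this (it defaults to All since Kafka 3.0).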
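tasks.max is an upper bound: as we understand recent versions, MirrorSourceConnector creates at most one task per source topic-partition (counted across all mirrored topics) and deals partitions out round-robin, so raising tasks.max beyond the partition count adds no parallelism. The sketch models that assignment; assign_partitions is a hypothetical helper, not MM2 code.

```shell
#!/bin/bash
# Hypothetical model of MirrorSourceConnector task assignment:
# task count = min(tasks.max, partitions); partition i goes to task (i mod task count).
assign_partitions() {
  local tasks_max=$1 partitions=$2 n i=0
  # Never create more tasks than there are partitions to hand out.
  if [ "$tasks_max" -lt "$partitions" ]; then n=$tasks_max; else n=$partitions; fi
  while [ "$i" -lt "$partitions" ]; do
    echo "partition $i -> task $(( i % n ))"
    i=$(( i + 1 ))
  done
}

assign_partitions 6 4   # 4 lines: with 4 partitions, only 4 of the 6 allowed tasks get work
```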

6.2 Network Tuning

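{
  "consumer.max.poll.records": 1000,
  "consumer.max.poll.interval.ms": 300000,
  "producer.override.buffer.memory": 33554432,
  "producer.override.request.timeout.ms": 30000
}

MirrorSourceTask assigns partitions to its consumer directly rather than joining a consumer group, so consumer.max.poll.interval.ms usually has little effect here.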

6.3 Failure Recovery

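{
  "errors.tolerance": "all",
  "errors.retry.timeout": 60000,
  "errors.retry.backoff.ms": 1000,
  "errors.log.enable": true,
  "errors.log.include.messages": true
}

connector.client.config.override.policy is a worker-level setting (CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY in the cp-kafka-connect Docker image), not a connector property. The dead letter queue settings (errors.deadletterqueue.*) apply only to sink connectors, and MirrorSourceConnector is a source connector, so they are left out here. With errors.tolerance set to all, records that fail (for example during conversion) are skipped and therefore not mirrored; errors.log.* records those skips in the worker log.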

7. Best Practices

7.1 Topic Naming

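# MirrorMaker (DefaultReplicationPolicy) prefixes remote topics with the source cluster alias
# Format: {source-cluster-alias}{separator}{original-topic}

# Examples
# topic-a on cluster "source"        -> source.topic-a on cluster "target"
# replicated again, target -> backup: source.topic-a -> target.source.topic-a

# Custom separator (property: replication.policy.separator)
"replication.policy.separator": "."  # default
"replication.policy.separator": "-"  # custom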
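DefaultReplicationPolicy infers whether a topic is remote purely from its name: everything before the first separator is read as a source cluster alias. Local topics that happen to contain the separator are therefore misread, and a custom separator such as "-" makes this much more likely because hyphens are common in topic names; a misread prefix that equals the target alias can even make MM2's cycle check skip the topic. The sketch shows the inference; topic_source_for is a hypothetical helper.

```shell
#!/bin/bash
# Hypothetical helper: infer the source alias of a topic from its name alone, the way a
# DefaultReplicationPolicy-style rule does. Prints "(local)" if the separator is absent.
# Usage: topic_source_for <separator> <topic>
topic_source_for() {
  local sep=$1 topic=$2
  case "$topic" in
    *"$sep"*) printf '%s\n' "${topic%%"$sep"*}" ;;
    *) printf '%s\n' "(local)" ;;
  esac
}

topic_source_for "." "source.topic-a"   # prints source  (a genuine remote topic)
topic_source_for "." "topic-a"          # prints (local)
topic_source_for "-" "topic-a"          # prints topic   (a local topic misread as remote)
```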

7.2 Offset Synchronization

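{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
  "groups": ".*",
  "sync.group.offsets.enabled": true,
  "sync.group.offsets.interval.seconds": 60,
  "checkpoints.topic.replication.factor": 3
}

Group offset syncing is done by MirrorCheckpointConnector. The offset-syncs topic it reads is written by MirrorSourceConnector, so offset-syncs.topic.replication.factor (3 here as well) belongs on that connector; MM2 creates this topic with a single partition. Offsets are only written to the target for groups that have no active members there.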
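The checkpoint connector translates a group's committed offset on the source cluster using the offset-sync records emitted by the source connector, which pair an upstream offset with its downstream offset. The sketch is a simplified model of the conservative rule that, to our understanding, recent releases (3.5 and later) apply: take the latest sync at or below the committed offset, use its downstream offset on an exact match, and otherwise use one past it. Older releases computed the result differently. The practical consequence is that consumers failing over may re-read some records rather than skip any. translate_offset is a hypothetical helper.

```shell
#!/bin/bash
# Hypothetical, simplified model of MM2 consumer-offset translation.
# stdin: offset-sync pairs "<upstream> <downstream>", sorted by upstream offset.
# $1:    the group's committed offset on the source cluster.
# Prints the translated target offset; prints nothing and fails if no sync is old enough.
translate_offset() {
  awk -v committed="$1" '
    BEGIN { c = committed + 0 }
    $1 + 0 <= c { up = $1 + 0; down = $2 + 0; found = 1 }   # keep the latest usable sync
    END {
      if (!found) exit 1
      print ((c == up) ? down : down + 1)
    }'
}

printf '%s\n' "0 0" "100 90" "200 180" | translate_offset 150   # prints 91
```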

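7.3 Monitoring Checklist

Daily checks:
- [ ] Replication latency (replication-latency-ms) < 10000 ms
- [ ] Heartbeats flowing normally
- [ ] Connector and task states are RUNNING
- [ ] No errors in the logs

Periodic checks:
- [ ] Data consistency verification
- [ ] Performance benchmarking
- [ ] Failover drills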

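Summary

The key points of Kafka MirrorMaker:

  1. Architecture patterns: one-way, bidirectional and aggregation replication
  2. Deployment and configuration: Docker, connector configuration
  3. Practical applications: disaster recovery, active-active, data migration
  4. Monitoring and operations: metrics, REST API, alerting
  5. Performance tuning: concurrency, networking, failure recovery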
