Skip to content
清晨的一缕阳光
返回

Kafka 高可用架构设计与实战

Kafka 的高可用架构是其作为企业级消息队列的核心竞争力。本文将深入探讨 Kafka 的副本机制、Leader 选举、多机房部署等高可用方案。

一、高可用架构概览

1.1 整体架构

graph TB
    subgraph 数据中心 1
        subgraph Kafka 集群 1
            B1[Broker 1<br/>Leader]
            B2[Broker 2<br/>Follower]
            B3[Broker 3<br/>Follower]
        end
        ZK1[ZooKeeper 集群]
    end
    
    subgraph 数据中心 2
        subgraph Kafka 集群 2
            B4[Broker 4<br/>Follower]
            B5[Broker 5<br/>Follower]
        end
        ZK2[ZooKeeper 集群]
    end
    
    B1 -.->|副本复制 | B4
    B2 -.->|副本复制 | B5

1.2 高可用特性

特性说明实现方式
副本冗余数据多副本存储Replication Factor
故障转移Broker 故障自动切换Leader Election
数据同步副本间数据一致ISR 机制
多机房跨机房容灾Rack Awareness

二、副本机制

2.1 副本类型

graph TB
    subgraph Partition
        L[Leader Replica]
        F1[Follower Replica 1]
        F2[Follower Replica 2]
    end
    
    L -->|读写 | P[Producer/Consumer]
    F1 -->|同步 | L
    F2 -->|同步 | L
    
    style L fill:#9f9,stroke:#333
    style F1 fill:#ff9,stroke:#333
    style F2 fill:#ff9,stroke:#333
副本类型说明读写权限
Leader主副本可读写
Follower从副本只读(同步用)

2.2 ISR(In-Sync Replicas)

sequenceDiagram
    participant L as Leader
    participant F1 as Follower 1
    participant F2 as Follower 2
    participant P as Producer
    
    P->>L: 写入消息
    L->>F1: 复制消息
    L->>F2: 复制消息
    F1-->>L: ACK
    F2-->>L: ACK
    L-->>P: 返回成功
    
    note over L,F2: 所有 ISR 完成复制

ISR 动态调整

ISR = {Leader} ∪ {Follower | lag < max.lag.time}

2.3 副本配置

# Broker 配置
default.replication.factor=3  # 默认副本数
min.insync.replicas=2  # 最小 ISR 数量
unclean.leader.election.enable=false  # 禁止非 ISR 选举
replica.lag.time.max.ms=30000  # 副本最大延迟

2.4 HW(High Watermark)

graph LR
    subgraph Leader
        L1[消息 1 ✓]
        L2[消息 2 ✓]
        L3[消息 3 ✓]
        L4[消息 4 ⏳]
    end
    
    subgraph Follower 1
        F1[消息 1 ✓]
        F2[消息 2 ✓]
        F3[消息 3 ✓]
        F4[消息 4 ⏳]
    end
    
    subgraph Follower 2
        FF1[消息 1 ✓]
        FF2[消息 2 ✓]
        FF3[消息 3 ⏳]
    end
    
    HW[HW=3<br/>消费者可见]
    
    L3 -.-> HW

HW 作用

三、Leader 选举

3.1 选举触发条件

条件说明示例
Leader 故障Broker 宕机网络分区、硬件故障
Leader 离线Broker 主动关闭维护、升级
ISR 变化当前 Leader 离开 ISR同步延迟

3.2 选举流程

sequenceDiagram
    participant ZK as ZooKeeper
    participant C as Controller
    participant B1 as Broker 1
    participant B2 as Broker 2
    
    B1->>ZK: 心跳停止
    ZK->>C: 通知 Broker 下线
    C->>C: 选择新 Leader
    C->>B2: 通知成为 Leader
    B2->>C: 确认
    C->>ZK: 更新元数据

3.3 选举策略

优先选择 ISR 内副本

// Controller 选举逻辑
public void electLeader(TopicPartition tp) {
    List<Replica> isr = partition.getISR();
    
    if (isr.isEmpty()) {
        if (uncleanLeaderElectionEnabled) {
            // 从非 ISR 选举(可能丢失数据)
            electUncleanLeader(tp);
        } else {
            // 等待 ISR 恢复
            log.warn("No ISR available, waiting...");
        }
    } else {
        // 选择 ISR 中第一个副本作为 Leader
        Replica newLeader = isr.get(0);
        partition.setLeader(newLeader);
    }
}

3.4 选举配置

# 禁止非 ISR 选举(推荐)
unclean.leader.election.enable=false

# Controller 配置
controller.socket.timeout.ms=30000
controller.message.max.bytes=10485760

四、多机房部署

4.1 Rack Awareness

graph TB
    subgraph Rack A
        B1[Broker 1<br/>Leader]
        B2[Broker 2<br/>Follower]
    end
    
    subgraph Rack B
        B3[Broker 3<br/>Follower]
        B4[Broker 4<br/>Follower]
    end
    
    subgraph Rack C
        B5[Broker 5<br/>Follower]
        B6[Broker 6<br/>Follower]
    end
    
    B1 -.-> B3
    B1 -.-> B5

4.2 配置方式

# Broker 配置
broker.rack=rack-1  # 指定机架

# Topic 配置
# 创建 Topic 时指定副本分配
kafka-topics.sh --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 3 \
  --config min.insync.replicas=2

4.3 副本分配策略

// Rack 感知副本分配
public class RackAwareReplicaAssigner {
    public Map<Integer, List<Integer>> assignReplicas(
        int numBrokers, 
        List<String> racks) {
        
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        
        for (int partition = 0; partition < numPartitions; partition++) {
            List<Integer> replicas = new ArrayList<>();
            
            // 选择不同 Rack 的 Broker
            Set<String> selectedRacks = new HashSet<>();
            for (int broker : brokers) {
                String rack = racks.get(broker);
                if (!selectedRacks.contains(rack)) {
                    replicas.add(broker);
                    selectedRacks.add(rack);
                }
            }
            
            assignment.put(partition, replicas);
        }
        
        return assignment;
    }
}

五、多活架构

5.1 MirrorMaker 2.0

graph TB
    subgraph 数据中心 1
        K1[Kafka 集群 1]
        MM1[MirrorMaker 2]
    end
    
    subgraph 数据中心 2
        K2[Kafka 集群 2]
        MM2[MirrorMaker 2]
    end
    
    K1 -->|复制 | MM1
    MM1 -->|同步 | K2
    K2 -->|复制 | MM2
    MM2 -->|同步 | K1

5.2 配置示例

# clusters.properties
clusters = dc1, dc2
dc1.bootstrap.servers = dc1-kafka:9092
dc2.bootstrap.servers = dc2-kafka:9092

# connector 配置
connectors = dc1-to-dc2, dc2-to-dc1
dc1-to-dc2.source.cluster.alias = dc1
dc1-to-dc2.target.cluster.alias = dc2
dc1-to-dc2.topics = .*
dc2-to-dc1.source.cluster.alias = dc2
dc2-to-dc1.target.cluster.alias = dc1
dc2-to-dc1.topics = .*

5.3 故障切换

sequenceDiagram
    participant App as 应用程序
    participant DNS as DNS
    participant DC1 as 数据中心 1
    participant DC2 as 数据中心 2
    
    App->>DNS: 请求 Kafka 地址
    DNS-->>App: dc1.kafka.com
    
    DC1 --> DC1: 故障
    App->>DC1: 连接失败
    App->>DNS: 重新解析
    DNS-->>App: dc2.kafka.com
    App->>DC2: 连接成功

六、监控与告警

6.1 关键指标

指标说明告警阈值
UnderReplicatedPartitions未同步副本数> 0
OfflinePartitionsCount离线分区数> 0
ActiveControllerCountController 数量!= 1
IsrShrinkRateISR 收缩速率> 1/分钟
IsrExpandRateISR 扩展速率> 1/分钟

6.2 监控配置

# Prometheus 配置
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-1:9090', 'kafka-2:9090']
    metrics_path: '/metrics'

# 告警规则
groups:
  - name: kafka
    rules:
      - alert: UnderReplicatedPartitions
        expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 有未同步副本"

6.3 健康检查

#!/bin/bash
# Kafka 集群健康检查脚本

# 检查 Controller
controller=$(kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/00000000000000000000.log | grep Leader)
if [ -z "$controller" ]; then
    echo "CRITICAL: No Controller"
    exit 2
fi

# 检查未同步副本
under_replicated=$(kafka-topics.sh --describe | grep -c "Isr:")
if [ "$under_replicated" -gt 0 ]; then
    echo "WARNING: $under_replicated 未同步副本"
    exit 1
fi

echo "OK: 集群健康"
exit 0

七、容灾演练

7.1 演练场景

场景预期结果恢复时间
Broker 宕机Leader 切换,数据不丢失< 30 秒
机房故障流量切换到备用机房< 1 分钟
ZooKeeper 故障Controller 重新选举< 1 分钟

7.2 演练步骤

# 1. 模拟 Broker 宕机
kill -9 $(ps -ef | grep kafka | grep -v grep | awk '{print $2}')

# 2. 观察 Leader 选举
kafka-topics.sh --describe --topic my-topic

# 3. 验证数据完整性
kafka-console-consumer.sh --topic my-topic --from-beginning

# 4. 恢复 Broker
./kafka-server-start.sh config/server.properties

八、最佳实践

8.1 副本配置建议

场景副本数min.insync.replicasunclean.leader.election
开发环境11true
测试环境21false
生产环境32false
金融场景53false

8.2 部署建议

# 推荐部署架构
- 3 个 Broker(跨 3 个机架)
- 3 个 ZooKeeper(跨 3 个机架)
- 副本因子 = 3
- min.insync.replicas = 2

8.3 运维建议

  1. 定期演练:每季度进行一次故障演练
  2. 监控告警:配置关键指标告警
  3. 备份策略:定期备份元数据
  4. 升级方案:滚动升级,避免服务中断

总结

Kafka 高可用架构的核心机制:

  1. 副本机制:Leader/Follower、ISR、HW
  2. Leader 选举:Controller 负责、ISR 优先
  3. 多机房部署:Rack Awareness、MirrorMaker
  4. 监控告警:关键指标、健康检查
  5. 容灾演练:定期演练、快速恢复

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 日志存储结构详解
下一篇文章
RocketMQ 消息过滤详解与实战