
Kafka KRaft Deep Dive and Hands-On Practice

Kafka KRaft (Kafka Raft) is the major architectural change, introduced in Kafka 2.8 as early access and marked production-ready in 3.3, that removes Kafka's dependency on ZooKeeper. This article digs into KRaft's principles, configuration, and hands-on deployment.

1. KRaft Architecture

1.1 Architecture Evolution

ZooKeeper Mode

graph TB
    subgraph "Kafka Cluster"
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
    end

    subgraph "ZooKeeper Cluster"
        ZK1[ZK 1]
        ZK2[ZK 2]
        ZK3[ZK 3]
    end

    B1 --> ZK1
    B2 --> ZK2
    B3 --> ZK3

KRaft Mode

graph TB
    subgraph "Kafka Cluster"
        C1[Controller 1]
        C2[Controller 2]
        C3[Controller 3]
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
    end

    C1 -.->|Raft| C2
    C1 -.->|Raft| C3
    C1 -->|manages| B1
    C1 -->|manages| B2
    C1 -->|manages| B3

1.2 Core Components

| Component | Description |
|-----------|-------------|
| Controller | Cluster manager; owns and serves the metadata |
| Broker | Stores data and handles messages |
| Raft Quorum | The controller cluster; guarantees metadata consistency |

1.3 Mode Comparison

| Feature | ZooKeeper mode | KRaft mode |
|---------|----------------|------------|
| Dependency | External ZooKeeper | Built-in Raft |
| Metadata storage | ZooKeeper | Kafka itself |
| Consistency | Guaranteed by ZK | Guaranteed by Raft |
| Scalability | Limited | Excellent |
| Operations | Complex | Simple |

2. KRaft Configuration

2.1 Generating a Cluster ID

# Generate a cluster ID
KAFKA_CLUSTER_ID=$(kafka-storage.sh random-uuid)
echo $KAFKA_CLUSTER_ID
# Output: XwFp2... (a random UUID)
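
To my understanding, the printed ID is a URL-safe base64 encoding of a random UUID's 16 bytes with the padding stripped, which yields 22 characters. A minimal Python sketch of that encoding (the function name is mine, not Kafka's):

```python
import base64
import uuid

def random_cluster_id() -> str:
    """URL-safe base64 of a random UUID's 16 raw bytes, '=' padding
    stripped, mirroring the 22-character IDs random-uuid prints."""
    raw = uuid.uuid4().bytes
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")

print(random_cluster_id())  # a 22-character ID
```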

2.2 Formatting the Storage Directories

# Format the storage directories
kafka-storage.sh format \
  -t $KAFKA_CLUSTER_ID \
  -c /etc/kafka/kraft/server.properties

2.3 Controller Configuration

# server.properties

# Process roles (combined broker + controller)
process.roles=broker,controller

# Node ID
node.id=1

# Controller quorum voters
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093

# Listeners
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka-1:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

# Controller settings
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT

# Metadata and data storage
metadata.log.dir=/var/kafka/metadata
log.dirs=/var/kafka/data

2.4 Dedicated-Role Configuration

# Controller node (controller only)
process.roles=controller
node.id=1
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
listeners=CONTROLLER://:9093
# (no advertised.listeners: controller listeners must not be advertised)
controller.listener.names=CONTROLLER
metadata.log.dir=/var/kafka/metadata

# Broker node (broker only)
process.roles=broker
node.id=4
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-4:9092
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
log.dirs=/var/kafka/data

3. The Raft Protocol

3.1 Raft Basics

The Raft protocol has three core parts:
1. Leader election
2. Log replication
3. Safety guarantees
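
To make the first point concrete, here is a deliberately simplified, single-threaded Python sketch of one election round (no networking, timeouts, or log-freshness checks; all names are illustrative, not Kafka's):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Node:
    node_id: int
    term: int = 0
    voted_for: Optional[int] = None

def request_votes(candidate: Node, peers: list) -> bool:
    """Candidate bumps its term, votes for itself, and asks each peer.
    A peer grants its vote if it has not yet voted in that term."""
    candidate.term += 1
    candidate.voted_for = candidate.node_id
    votes = 1  # its own vote
    for peer in peers:
        if peer.term < candidate.term:
            peer.term = candidate.term
            peer.voted_for = candidate.node_id
            votes += 1
    # Becoming leader requires a strict majority of the full cluster.
    return votes > (len(peers) + 1) // 2

nodes = [Node(1), Node(2), Node(3)]
print(request_votes(nodes[0], nodes[1:]))  # True: 3 of 3 votes in term 1
```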

3.2 Leader Election

sequenceDiagram
    participant C1 as Controller 1
    participant C2 as Controller 2
    participant C3 as Controller 3

    C1->>C1: Start as Follower
    C2->>C2: Start as Follower
    C3->>C3: Start as Follower

    C1->>C2: RequestVote (Term 1)
    C1->>C3: RequestVote (Term 1)
    C2-->>C1: Vote granted
    C3-->>C1: Vote granted

    C1->>C1: Becomes Leader
    C1->>C2: Heartbeat
    C1->>C3: Heartbeat

3.3 Log Replication

graph TB
    subgraph "Leader"
        L1[Log entry 1]
        L2[Log entry 2]
        L3[Log entry 3]
    end

    subgraph "Follower 1"
        F1[Log entry 1]
        F2[Log entry 2]
    end

    subgraph "Follower 2"
        FF1[Log entry 1]
        FF2[Log entry 2]
        FF3[Log entry 3]
    end

    L1 --> F1
    L1 --> FF1
    L2 --> F2
    L2 --> FF2
    L3 --> FF3
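
In the diagram, entry 3 already exists on the leader and on Follower 2, i.e. on 2 of 3 replicas, so Raft may commit it even though Follower 1 lags behind. A small sketch of that majority rule (the helper is mine, not a Kafka API):

```python
def commit_index(leader_len: int, follower_lens: list) -> int:
    """Highest log index replicated on a majority of the cluster
    (leader included). Indices are 1-based entry counts."""
    lengths = sorted([leader_len] + follower_lens, reverse=True)
    majority = len(lengths) // 2 + 1
    # The majority-th longest log bounds what a majority has stored.
    return lengths[majority - 1]

# Leader has 3 entries, followers have 2 and 3: a majority (2 of 3) has 3.
print(commit_index(3, [2, 3]))  # 3
```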

4. Metadata Management

4.1 Metadata Structure

Metadata Log
├── Topic Record
│   ├── Topic Name
│   ├── Partitions
│   └── Replication Factor
├── Partition Record
│   ├── Partition ID
│   ├── Leader
│   └── ISR
├── Broker Record
│   ├── Broker ID
│   ├── Endpoints
│   └── Rack
└── Config Record
    ├── Config Key
    └── Config Value
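
The record tree above can be modeled roughly with the following Python dataclasses (field names follow the tree, not Kafka's actual internal record schema):

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class TopicRecord:
    name: str
    partitions: int
    replication_factor: int

@dataclass
class PartitionRecord:
    partition_id: int
    leader: int                                    # broker ID of the leader
    isr: List[int] = field(default_factory=list)   # in-sync replica IDs

@dataclass
class BrokerRecord:
    broker_id: int
    endpoints: List[str] = field(default_factory=list)
    rack: Optional[str] = None

@dataclass
class ConfigRecord:
    key: str
    value: str

topic = TopicRecord(name="orders", partitions=3, replication_factor=2)
print(topic.name, topic.partitions)
```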

4.2 Metadata Changes

// Metadata change flow (illustrative sketch; RaftClient is pseudocode)
public class MetadataManager {

    private final RaftClient raftClient;

    public MetadataManager(RaftClient raftClient) {
        this.raftClient = raftClient;
    }

    /**
     * Create a topic.
     */
    public void createTopic(CreateTopicRequest request) {
        // 1. Build the metadata record
        TopicRecord record = new TopicRecord();
        record.setName(request.name());
        record.setPartitions(request.partitions());
        record.setReplicationFactor(request.replicationFactor());

        // 2. Append the record to the Raft log
        raftClient.append(record);

        // 3. Wait for the quorum to commit it
        raftClient.commit();

        // 4. Notify the brokers
        notifyBrokers(record);
    }
}

5. Deployment in Practice

5.1 Docker Deployment

# docker-compose.yml
version: '3'
services:
  kafka-1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_METADATA_LOG_DIR: /var/kafka/metadata
      KAFKA_LOG_DIRS: /var/kafka/data
      CLUSTER_ID: XwFp2...
    ports:
      - "9092:9092"
    volumes:
      # Mount /var/kafka so the metadata log persists alongside the data dirs
      - kafka-1-data:/var/kafka

  kafka-2:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-2
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_METADATA_LOG_DIR: /var/kafka/metadata
      KAFKA_LOG_DIRS: /var/kafka/data
      CLUSTER_ID: XwFp2...
    ports:
      - "9093:9092"
    volumes:
      - kafka-2-data:/var/kafka

  kafka-3:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-3
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_METADATA_LOG_DIR: /var/kafka/metadata
      KAFKA_LOG_DIRS: /var/kafka/data
      CLUSTER_ID: XwFp2...
    ports:
      - "9094:9092"
    volumes:
      - kafka-3-data:/var/kafka

volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:

5.2 Kubernetes Deployment

# statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
      - name: format-storage
        image: confluentinc/cp-kafka:7.4.0
        command:
          - sh
          - -c
          - |
            if [ ! -f /var/kafka/meta.properties ]; then
              kafka-storage.sh format -t $CLUSTER_ID -c /etc/kafka/kraft/server.properties
            fi
        env:
        - name: CLUSTER_ID
          value: "XwFp2..."
        volumeMounts:
        - name: data
          mountPath: /var/kafka
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:7.4.0
        # Derive a numeric node ID from the pod ordinal (kafka-0 -> 0);
        # a fieldRef to metadata.name would yield the non-numeric pod name.
        command:
          - sh
          - -c
          - export KAFKA_NODE_ID=${HOSTNAME##*-} && exec /etc/confluent/docker/run
        ports:
        - containerPort: 9092
        - containerPort: 9093
        env:
        - name: KAFKA_PROCESS_ROLES
          value: "broker,controller"
        - name: KAFKA_CONTROLLER_QUORUM_VOTERS
          value: "0@kafka-0.kafka:9093,1@kafka-1.kafka:9093,2@kafka-2.kafka:9093"
        volumeMounts:
        - name: data
          mountPath: /var/kafka
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi

6. Migration

6.1 ZooKeeper → KRaft

Migration steps. Note that the script below re-initializes metadata storage from scratch, which discards existing cluster metadata; for an in-place migration that preserves it, Kafka 3.4+ offers a dedicated ZooKeeper-to-KRaft migration procedure (KIP-866).

#!/bin/bash
# Migrate from ZooKeeper to KRaft

# 1. Back up the existing configuration
cp /etc/kafka/server.properties /etc/kafka/server.properties.bak

# 2. Generate a cluster ID
KAFKA_CLUSTER_ID=$(kafka-storage.sh random-uuid)

# 3. Stop Kafka
systemctl stop kafka

# 4. Format the storage
kafka-storage.sh format \
  -t $KAFKA_CLUSTER_ID \
  -c /etc/kafka/kraft/server.properties

# 5. Update the configuration
# Switch server.properties to KRaft mode

# 6. Start Kafka
systemctl start kafka

# 7. Verify
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

6.2 Migration Verification

#!/bin/bash
# Migration verification script

echo "=== Verifying KRaft mode ==="

# 1. Inspect the metadata quorum
echo "Quorum status:"
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

# 2. List topics
echo -e "\nTopics:"
kafka-topics.sh --bootstrap-server localhost:9092 --list

# 3. List consumer groups
echo -e "\nConsumer groups:"
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 4. Produce a test message
echo -e "\nProducing a test message:"
echo "test" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic

# 5. Consume the test message
echo -e "\nConsuming the test message:"
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --max-messages 1 --timeout-ms 5000

echo -e "\n=== Verification complete ==="

7. Best Practices

7.1 Configuration Recommendations

Controller configuration:
- Number of controllers: 3 or 5 (always an odd number)
- Dedicated controller nodes for large clusters
- Deploy controllers across availability zones
- Monitor the Raft quorum state
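
The advice to use an odd number of controllers follows from Raft's majority rule: n voters tolerate n minus majority(n) failures, so a fourth voter buys no extra fault tolerance over three. A quick check:

```python
def tolerated_failures(voters: int) -> int:
    """A Raft quorum stays available while a strict majority survives."""
    majority = voters // 2 + 1
    return voters - majority

for n in (3, 4, 5):
    print(f"{n} voters tolerate {tolerated_failures(n)} failure(s)")
# 3 -> 1, 4 -> 1, 5 -> 2
```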

7.2 Performance Tuning

# Raft tuning
controller.quorum.election.timeout.ms=1000
controller.quorum.fetch.timeout.ms=2000
controller.quorum.request.timeout.ms=3000

# Metadata snapshot settings
metadata.log.max.record.bytes.between.snapshots=10000
metadata.log.max.snapshot.interval.ms=3600000

7.3 Monitoring Metrics

| Metric | Description | Alert threshold |
|--------|-------------|-----------------|
| kafka_raft_RaftMetrics_current-leader | Current leader | - |
| kafka_raft_RaftMetrics_vote-count | Vote count | - |
| kafka_metadata_MetadataLogLog_end-offset | Metadata log end offset | - |

Summary

Key points of Kafka KRaft:

  1. Architecture evolution: removing ZooKeeper, built-in Raft
  2. Configuration: cluster ID, storage formatting, listeners
  3. Raft protocol: leader election, log replication
  4. Metadata management: record structure, change flow
  5. Deployment: Docker, Kubernetes
  6. Migration: ZooKeeper → KRaft

