Kafka KRaft (Kafka Raft) is the major architectural change that removes Kafka's dependency on ZooKeeper. It shipped as an early-access feature in Kafka 2.8 and was declared production-ready in 3.3. This article takes a deep look at KRaft's principles, configuration, and hands-on deployment.
## 1. KRaft Architecture

### 1.1 Architecture Evolution

ZooKeeper mode:

```mermaid
graph TB
    subgraph Kafka cluster
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
    end
    subgraph ZooKeeper cluster
        ZK1[ZK 1]
        ZK2[ZK 2]
        ZK3[ZK 3]
    end
    B1 --> ZK1
    B2 --> ZK2
    B3 --> ZK3
```
KRaft mode:

```mermaid
graph TB
    subgraph Kafka cluster
        C1[Controller 1]
        C2[Controller 2]
        C3[Controller 3]
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
    end
    C1 -.->|Raft| C2
    C1 -.->|Raft| C3
    C1 -->|manages| B1
    C1 -->|manages| B2
    C1 -->|manages| B3
```
### 1.2 Core Components

| Component | Description |
|---|---|
| Controller | Cluster manager; owns and serves the metadata |
| Broker | Stores data and handles messages |
| Raft Quorum | The controller group; guarantees metadata consistency |
### 1.3 Mode Comparison

| Aspect | ZooKeeper mode | KRaft mode |
|---|---|---|
| Dependency | External ZooKeeper | Built-in Raft |
| Metadata | Stored in ZK | Stored in Kafka |
| Consistency | Guaranteed by ZK | Guaranteed by Raft |
| Scalability | Limited | Excellent |
| Operations | Complex | Simple |
## 2. KRaft Configuration

### 2.1 Generating a Cluster ID

```bash
# Generate a cluster ID
KAFKA_CLUSTER_ID=$(kafka-storage.sh random-uuid)
echo $KAFKA_CLUSTER_ID
# Output: XwFp2... (a random UUID)
```
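Under the hood, `kafka-storage.sh random-uuid` prints a 16-byte UUID encoded as 22 characters of URL-safe Base64 without padding, which is why the ID looks like `XwFp2...` rather than a dashed UUID. A minimal Java sketch of the same encoding (the class name `ClusterIdSketch` is ours, not Kafka's):

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class ClusterIdSketch {

    // Encode a random UUID as 22 chars of unpadded URL-safe Base64,
    // the same shape kafka-storage.sh random-uuid emits.
    public static String randomClusterId() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    public static void main(String[] args) {
        System.out.println(randomClusterId());
    }
}
```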
### 2.2 Formatting the Storage

```bash
# Format the storage directories
kafka-storage.sh format \
  -t $KAFKA_CLUSTER_ID \
  -c /etc/kafka/kraft/server.properties
```
### 2.3 Controller Configuration

```properties
# server.properties

# Process roles (combined Controller + Broker)
process.roles=broker,controller

# Node ID
node.id=1

# Controller quorum
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093

# Listeners
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka-1:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

# Controller listener
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT

# Metadata and data storage
metadata.log.dir=/var/kafka/metadata
log.dirs=/var/kafka/data
```
### 2.4 Dedicated-Role Configuration

For larger clusters, run the controller and broker roles on separate nodes:

```properties
# Controller node (controller role only)
process.roles=controller
node.id=1
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
listeners=CONTROLLER://:9093
# Note: controller listeners must not appear in advertised.listeners
controller.listener.names=CONTROLLER
metadata.log.dir=/var/kafka/metadata
```

```properties
# Broker node (broker role only)
process.roles=broker
node.id=4
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-1:9092
inter.broker.listener.name=PLAINTEXT
log.dirs=/var/kafka/data
```
## 3. The Raft Protocol

### 3.1 Raft Basics

The core of Raft consists of three parts:

1. Leader election
2. Log replication
3. Safety guarantees
### 3.2 Leader Election

```mermaid
sequenceDiagram
    participant C1 as Controller 1
    participant C2 as Controller 2
    participant C3 as Controller 3
    C1->>C1: Starts as Follower
    C2->>C2: Starts as Follower
    C3->>C3: Starts as Follower
    C1->>C2: RequestVote (Term 1)
    C1->>C3: RequestVote (Term 1)
    C2-->>C1: Vote granted
    C3-->>C1: Vote granted
    C1->>C1: Becomes Leader
    C1->>C2: Heartbeat
    C1->>C3: Heartbeat
```
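The vote-counting step in the sequence above can be sketched as follows. `CandidateState` and its methods are illustrative names for the bookkeeping any Raft candidate performs, not Kafka's internal API:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of a Raft candidate tallying votes for one term.
public class CandidateState {

    private final int voterCount;                       // size of the voter set
    private final Set<Integer> granted = new HashSet<>();

    public CandidateState(int localId, int voterCount) {
        this.voterCount = voterCount;
        granted.add(localId);                           // a candidate votes for itself
    }

    public void recordVote(int voterId, boolean voteGranted) {
        if (voteGranted) {
            granted.add(voterId);
        }
    }

    public boolean isElected() {
        // Leadership requires votes from a strict majority of the voter set
        return granted.size() > voterCount / 2;
    }
}
```

With three voters, a candidate needs just one remote vote in addition to its own, which is why a single granted `RequestVote` in the diagram is already enough to elect Controller 1.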
### 3.3 Log Replication

```mermaid
graph TB
    subgraph Leader
        L1[Log entry 1]
        L2[Log entry 2]
        L3[Log entry 3]
    end
    subgraph Follower 1
        F1[Log entry 1]
        F2[Log entry 2]
    end
    subgraph Follower 2
        FF1[Log entry 1]
        FF2[Log entry 2]
        FF3[Log entry 3]
    end
    L1 --> F1
    L1 --> FF1
    L2 --> F2
    L2 --> FF2
    L3 --> FF3
```
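Note that entry 3 exists on the leader and Follower 2 but not yet on Follower 1; it still counts as committed, because a majority (2 of 3 voters) holds it. A leader can derive the committed offset by sorting each voter's replicated end offset and taking the position a majority has reached. A minimal sketch, not Kafka's internal code:

```java
import java.util.Arrays;

// Sketch of how a Raft leader advances the committed (high-watermark) offset:
// an entry is committed once it is replicated on a majority of voters.
public class CommitIndexSketch {

    // endOffsets[i] = highest log offset replicated on voter i (leader included)
    public static long commitOffset(long[] endOffsets) {
        long[] sorted = endOffsets.clone();
        Arrays.sort(sorted);
        // The offset at the majority boundary is held by a strict majority
        return sorted[(sorted.length - 1) / 2];
    }
}
```

For the diagram above, the end offsets are {3, 2, 3}, so the committed offset is 3 even though Follower 1 lags.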
## 4. Metadata Management

### 4.1 Metadata Structure

```
Metadata Log
├── Topic Record
│   ├── Topic Name
│   ├── Partitions
│   └── Replication Factor
├── Partition Record
│   ├── Partition ID
│   ├── Leader
│   └── ISR
├── Broker Record
│   ├── Broker ID
│   ├── Endpoints
│   └── Rack
└── Config Record
    ├── Config Key
    └── Config Value
```
4.2 元数据变更
// 元数据变更流程
public class MetadataManager {
/**
* 创建 Topic
*/
public void createTopic(CreateTopicRequest request) {
// 1. 构建元数据记录
TopicRecord record = new TopicRecord();
record.setName(request.name());
record.setPartitions(request.partitions());
record.setReplicationFactor(request.replicationFactor());
// 2. 写入 Raft 日志
raftClient.append(record);
// 3. 等待提交
raftClient.commit();
// 4. 通知 Broker
notifyBrokers(record);
}
}
## 5. Deployment in Practice

### 5.1 Docker Deployment

```yaml
# docker-compose.yml
version: '3'
services:
  kafka-1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_METADATA_LOG_DIR: /var/kafka/metadata
      KAFKA_LOG_DIRS: /var/kafka/data
      CLUSTER_ID: XwFp2...
    ports:
      - "9092:9092"
    volumes:
      - kafka-1-data:/var/kafka/data
  kafka-2:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-2
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_METADATA_LOG_DIR: /var/kafka/metadata
      KAFKA_LOG_DIRS: /var/kafka/data
      CLUSTER_ID: XwFp2...
    ports:
      - "9093:9092"
    volumes:
      - kafka-2-data:/var/kafka/data
  kafka-3:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-3
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_METADATA_LOG_DIR: /var/kafka/metadata
      KAFKA_LOG_DIRS: /var/kafka/data
      CLUSTER_ID: XwFp2...
    ports:
      - "9094:9092"
    volumes:
      - kafka-3-data:/var/kafka/data
volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
```
### 5.2 Kubernetes Deployment

```yaml
# statefulset.yaml (abbreviated)
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
        - name: format-storage
          image: confluentinc/cp-kafka:7.4.0
          command:
            - sh
            - -c
            - |
              if [ ! -f /var/kafka/meta.properties ]; then
                kafka-storage.sh format -t $CLUSTER_ID -c /etc/kafka/kraft/server.properties
              fi
          env:
            - name: CLUSTER_ID
              value: "XwFp2..."
          volumeMounts:
            - name: data
              mountPath: /var/kafka
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.4.0
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            # KAFKA_NODE_ID must be an integer; metadata.name yields "kafka-0",
            # so a startup script must parse the numeric ordinal from POD_NAME
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            # Voter IDs must match the node IDs derived from the pod ordinals
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "0@kafka-0.kafka:9093,1@kafka-1.kafka:9093,2@kafka-2.kafka:9093"
          volumeMounts:
            - name: data
              mountPath: /var/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi
```
## 6. Migration

### 6.1 ZooKeeper → KRaft

Note: the script below reformats storage, which effectively provisions a fresh KRaft cluster rather than carrying existing data over. For an in-place migration that preserves topics and offsets, follow the official KIP-866 ZooKeeper-to-KRaft migration process. The simplified fresh-start steps:

```bash
#!/bin/bash
# ZooKeeper to KRaft (fresh-start approach)

# 1. Back up the existing configuration
cp /etc/kafka/server.properties /etc/kafka/server.properties.bak

# 2. Generate a cluster ID
KAFKA_CLUSTER_ID=$(kafka-storage.sh random-uuid)

# 3. Stop Kafka
systemctl stop kafka

# 4. Format storage
kafka-storage.sh format \
  -t $KAFKA_CLUSTER_ID \
  -c /etc/kafka/kraft/server.properties

# 5. Update the configuration
# Switch server.properties to KRaft mode

# 6. Start Kafka
systemctl start kafka

# 7. Verify
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```
### 6.2 Migration Verification

```bash
#!/bin/bash
# Migration verification script

echo "=== Verifying KRaft mode ==="

# 1. Check quorum status
echo "Quorum status:"
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

# 2. Check topics
echo -e "\nTopics:"
kafka-topics.sh --bootstrap-server localhost:9092 --list

# 3. Check consumer groups
echo -e "\nConsumer groups:"
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 4. Produce a test message
echo -e "\nProducing a test message:"
echo "test" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic

# 5. Consume the test message
echo -e "\nConsuming the test message:"
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --max-messages 1 --timeout-ms 5000

echo -e "\n=== Verification complete ==="
```
## 7. Best Practices

### 7.1 Configuration Recommendations

Controller recommendations:

- Run 3 or 5 controllers (an odd number)
- Use dedicated controller nodes for large clusters
- Spread controllers across availability zones
- Monitor Raft quorum state
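The odd-number recommendation follows directly from Raft's majority rule: a quorum of n voters tolerates f = (n - 1) / 2 failures, so a 4-node quorum survives no more failures than a 3-node one while adding load. A one-line sketch:

```java
// A Raft quorum of n voters tolerates (n - 1) / 2 failures,
// since the survivors must still form a strict majority.
public class QuorumSizing {
    public static int toleratedFailures(int voters) {
        return (voters - 1) / 2;
    }
}
```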
### 7.2 Performance Tuning

```properties
# Raft tuning
controller.quorum.election.timeout.ms=1000
controller.quorum.fetch.timeout.ms=2000
controller.quorum.request.timeout.ms=3000

# Metadata snapshotting (20 MB between snapshots is the default)
metadata.log.max.record.bytes.between.snapshots=20971520
metadata.log.max.snapshot.interval.ms=3600000
```
### 7.3 Monitoring Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| kafka_raft_RaftMetrics_current-leader | Current quorum leader | - |
| kafka_raft_RaftMetrics_vote-count | Vote count | - |
| kafka_metadata_MetadataLogLog_end-offset | Metadata log end offset | - |
## Summary

Key points of Kafka KRaft:

- Architecture evolution: ZooKeeper removed, Raft built in
- Configuration: cluster ID, storage formatting, listeners
- Raft protocol: leader election, log replication
- Metadata management: metadata structure, change flow
- Deployment: Docker and Kubernetes
- Migration: ZooKeeper → KRaft

Takeaways:

- Understand the advantages of the KRaft architecture
- Master KRaft configuration
- Understand how the Raft protocol works
- Build a thorough monitoring setup
- Plan migrations carefully
## References

- Kafka KRaft official documentation
- KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum
- *Kafka: The Definitive Guide*, Chapter 4