Kafka 容量规划是保障集群稳定运行的关键。本文将深入探讨容量评估、性能基准、集群扩容、性能优化等实战技巧。
一、容量评估
1.1 容量指标
| 指标 | 说明 | 计算公式 |
|---|---|---|
| TPS | 每秒消息数 | 消息总量 / 时间 |
| 带宽 | 网络带宽占用 | TPS × 消息大小 |
| 存储 | 磁盘存储需求 | TPS × 消息大小 × 保留时间 |
| 内存 | JVM 堆内存 | 连接数 × 缓存大小 |
1.2 容量评估模型
示例场景:
业务需求:
- 日均消息量:1 亿条
- 平均消息大小:1KB
- 峰值 TPS:5000
- 消息保留:7 天
容量计算:
- 日均带宽:1 亿 × 1KB / 86400 = 1.16MB/s = 9.3Mbps
- 峰值带宽:5000 × 1KB = 5MB/s = 40Mbps
- 存储需求:1 亿 × 1KB × 7 天 = 700GB
- 考虑副本(3 副本):700GB × 3 = 2.1TB
1.3 容量评估脚本
#!/bin/bash
# Kafka 容量评估脚本
AVG_MSG_SIZE=${1:-1024} # 平均消息大小(字节)
DAILY_MSG_COUNT=${2:-100000000} # 日均消息量
PEAK_TPS=${3:-5000} # 峰值 TPS
RETENTION_DAYS=${4:-7} # 保留天数
REPLICATION_FACTOR=${5:-3} # 副本数
echo "=== Kafka 容量评估 ==="
echo "平均消息大小:$AVG_MSG_SIZE 字节"
echo "日均消息量:$DAILY_MSG_COUNT 条"
echo "峰值 TPS: $PEAK_TPS"
echo "保留时间:$RETENTION_DAYS 天"
echo "副本数:$REPLICATION_FACTOR"
# 计算带宽
DAILY_BANDWIDTH=$((DAILY_MSG_COUNT * AVG_MSG_SIZE / 86400))
PEAK_BANDWIDTH=$((PEAK_TPS * AVG_MSG_SIZE))
echo -e "\n=== 带宽需求 ==="
echo "日均带宽:$DAILY_BANDWIDTH B/s = $(echo "scale=2; $DAILY_BANDWIDTH * 8 / 1000000" | bc) Mbps"
echo "峰值带宽:$PEAK_BANDWIDTH B/s = $(echo "scale=2; $PEAK_BANDWIDTH * 8 / 1000000" | bc) Mbps"
# 计算存储
STORAGE=$((DAILY_MSG_COUNT * AVG_MSG_SIZE * RETENTION_DAYS / 1024 / 1024 / 1024))
TOTAL_STORAGE=$((STORAGE * REPLICATION_FACTOR))
echo -e "\n=== 存储需求 ==="
echo "单副本存储:$STORAGE GB"
echo "总存储($REPLICATION_FACTOR 副本):$TOTAL_STORAGE GB"
# 推荐配置
echo -e "\n=== 推荐配置 ==="
if [ $PEAK_TPS -lt 1000 ]; then
echo "Broker 数量:3"
echo "每 Broker CPU: 4 核"
echo "每 Broker 内存:8GB"
elif [ $PEAK_TPS -lt 5000 ]; then
echo "Broker 数量:5"
echo "每 Broker CPU: 8 核"
echo "每 Broker 内存:16GB"
else
echo "Broker 数量:7+"
echo "每 Broker CPU: 16 核"
echo "每 Broker 内存:32GB"
fi
二、性能基准
2.1 基准测试
测试脚本:
#!/bin/bash
# Kafka 性能基准测试脚本
BOOTSTRAP="localhost:9092"
TOPIC="benchmark-topic"
DURATION=60 # 测试时长(秒)
echo "=== Kafka 性能基准测试 ==="
# 1. 创建测试 Topic
kafka-topics.sh --bootstrap-server $BOOTSTRAP \
--create --topic $TOPIC \
--partitions 8 --replication-factor 3
# 2. Producer 测试
echo -e "\n=== Producer 测试 ==="
kafka-producer-perf-test.sh \
--topic $TOPIC \
--num-records 1000000 \
--record-size 1024 \
--throughput 10000 \
--producer-props bootstrap.servers=$BOOTSTRAP
# 3. Consumer 测试
echo -e "\n=== Consumer 测试 ==="
kafka-consumer-perf-test.sh \
--topic $TOPIC \
--messages 1000000 \
--bootstrap-server $BOOTSTRAP \
--group benchmark-consumer
# 4. 清理
kafka-topics.sh --bootstrap-server $BOOTSTRAP --delete --topic $TOPIC
echo -e "\n=== 测试完成 ==="
2.2 性能指标
典型性能数据:
单 Broker 性能(8 核 16GB):
- 生产 TPS:5 万 -8 万
- 消费 TPS:8 万 -12 万
- 平均延迟:< 5ms
- P99 延迟:< 30ms
集群性能(5 Broker):
- 生产 TPS:25 万 -40 万
- 消费 TPS:40 万 -60 万
2.3 性能监控
#!/bin/bash
# Kafka 性能监控脚本
echo "=== Kafka 性能监控 ==="
# 1. Broker TPS
echo -e "\n=== Broker TPS ==="
for broker in kafka-1 kafka-2 kafka-3 kafka-4 kafka-5; do
# 通过 JMX 获取 TPS
echo "$broker: 需要通过 JMX 获取"
done
# 2. 延迟统计
echo -e "\n=== 延迟统计 ==="
# 通过 JMX 获取延迟指标
# 3. 堆积情况
echo -e "\n=== 消费堆积 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe | awk 'NR>3 {sum+=$6} END {print "总堆积量:" sum}'
三、集群扩容
3.1 添加 Broker
#!/bin/bash
# 添加 Broker 脚本
NEW_BROKER="kafka-6"
CLUSTER="DefaultCluster"
echo "=== 添加 Broker ==="
# 1. 准备新服务器
echo "准备新服务器..."
ssh $NEW_BROKER "yum install -y java-11-openjdk"
ssh $NEW_BROKER "wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz"
ssh $NEW_BROKER "tar -xzf kafka_2.13-3.4.0.tgz -C /opt/"
ssh $NEW_BROKER "mv /opt/kafka_2.13-3.4.0 /opt/kafka"
# 2. 配置 Broker
echo "配置 Broker..."
cat > /tmp/server.properties << EOF
broker.id=6
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
num.partitions=8
default.replication.factor=3
EOF
scp /tmp/server.properties $NEW_BROKER:/opt/kafka/config/
# 3. 启动 Broker
echo "启动 Broker..."
ssh $NEW_BROKER "systemctl start kafka"
# 4. 验证
echo "验证 Broker..."
ssh $NEW_BROKER "kafka-broker-api-versions.sh --bootstrap-server localhost:9092"
echo "=== Broker 添加完成 ==="
3.2 分区重分配
生成重分配计划:
# 生成重分配计划
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--topics-to-move-json-file topics-to-move.json \
--broker-list "0,1,2,3,4,5" \
--generate > reassign-plan.json
# topics-to-move.json
{
"topics": [
{"topic": "order-topic"},
{"topic": "pay-topic"}
]
}
执行重分配:
#!/bin/bash
# 执行分区重分配
echo "=== 执行分区重分配 ==="
# 1. 执行重分配
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassign-plan.json \
--execute
# 2. 监控进度
while true; do
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassign-plan.json \
--verify
if [ $? -eq 0 ]; then
echo "重分配完成"
break
fi
echo "等待重分配..."
sleep 60
done
echo "=== 重分配完成 ==="
3.3 扩容验证
#!/bin/bash
# 扩容验证脚本
echo "=== 扩容验证 ==="
# 1. 检查 Broker 数量
broker_count=$(kafka-broker-api-versions.sh --bootstrap-server localhost:9092 2>/dev/null | wc -l)
echo "Broker 数量:$broker_count"
# 2. 检查分区分布
echo -e "\n分区分布:"
kafka-topics.sh --bootstrap-server localhost:9092 --describe | \
grep -o "Replicas: [0-9,]*" | sort | uniq -c
# 3. 检查数据平衡
echo -e "\n数据平衡:"
for broker in kafka-1 kafka-2 kafka-3 kafka-4 kafka-5 kafka-6; do
size=$(ssh $broker "du -sh /data/kafka-logs 2>/dev/null | awk '{print \$1}'")
echo "$broker: $size"
done
echo "=== 验证完成 ==="
四、性能优化
4.1 Broker 优化
JVM 优化:
# kafka-server-start.sh
# 堆内存
export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"
# G1 GC
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
# GC 日志
export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -Xlog:gc*:file=/var/log/kafka/gc.log:time,level,tags"
存储优化:
# server.properties
# 异步刷盘
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# 日志配置
log.segment.bytes=1073741824 # 1GB
log.retention.hours=168 # 7 天
log.retention.bytes=-1
# 线程配置
num.network.threads=8
num.io.threads=16
num.replica.fetchers=2
4.2 网络优化
# server.properties
# Socket 配置
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 监听器
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-1:9092
4.3 客户端优化
Producer 优化:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
// 批量配置
props.put("batch.size", 65536); // 64KB
props.put("linger.ms", 10); // 10ms
// 压缩
props.put("compression.type", "lz4");
// 可靠性
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Consumer 优化:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put("group.id", "consumer-group");
// 拉取配置
props.put("fetch.min.bytes", 1024);
props.put("fetch.max.wait.ms", 100);
props.put("max.poll.records", 500);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
五、最佳实践
5.1 容量规划建议
容量规划:
1. 预留 30% 容量余量
2. 考虑业务增长因素
3. 定期评估容量使用
4. 建立容量预警机制
5.2 性能优化建议
性能优化:
1. 使用 SSD 磁盘
2. 配置异步刷盘
3. 优化 JVM 参数
4. 合理设置分区数
5. 启用压缩
6. 批量发送/消费
5.3 检查清单
容量检查:
- [ ] 评估业务需求
- [ ] 计算存储需求
- [ ] 计算带宽需求
- [ ] 规划 Broker 数量
- [ ] 预留容量余量
性能检查:
- [ ] 基准测试
- [ ] JVM 优化
- [ ] 存储优化
- [ ] 网络优化
- [ ] 客户端优化
总结
Kafka 容量规划与性能优化的核心要点:
- 容量评估:容量指标、评估模型、评估脚本
- 性能基准:基准测试、性能指标、性能监控
- 集群扩容:添加 Broker、分区重分配、扩容验证
- 性能优化:Broker 优化、网络优化、客户端优化
- 最佳实践:容量规划、性能优化、检查清单
核心要点:
- 建立容量评估模型
- 定期性能基准测试
- 预留充足容量余量
- 持续性能优化
- 建立监控预警
参考资料
- Kafka Operations 官方文档
- Kafka Performance Tuning
- 《Kafka 权威指南》第 11 章