Skip to content
清晨的一缕阳光
返回

Kafka 容量规划与性能优化实战

Kafka 容量规划是保障集群稳定运行的关键。本文将深入探讨容量评估、性能基准、集群扩容、性能优化等实战技巧。

一、容量评估

1.1 容量指标

指标说明计算公式
TPS每秒消息数消息总量 / 时间
带宽网络带宽占用TPS × 消息大小
存储磁盘存储需求TPS × 消息大小 × 保留时间
内存JVM 堆内存连接数 × 缓存大小

1.2 容量评估模型

示例场景

业务需求:
- 日均消息量:1 亿条
- 平均消息大小:1KB
- 峰值 TPS:5000
- 消息保留:7 天

容量计算:
- 日均带宽:1 亿 × 1KB / 86400 = 1.16MB/s = 9.3Mbps
- 峰值带宽:5000 × 1KB = 5MB/s = 40Mbps
- 存储需求:1 亿 × 1KB × 7 天 = 700GB
- 考虑副本(3 副本):700GB × 3 = 2.1TB

1.3 容量评估脚本

#!/bin/bash
# Kafka 容量评估脚本

AVG_MSG_SIZE=${1:-1024}  # 平均消息大小(字节)
DAILY_MSG_COUNT=${2:-100000000}  # 日均消息量
PEAK_TPS=${3:-5000}  # 峰值 TPS
RETENTION_DAYS=${4:-7}  # 保留天数
REPLICATION_FACTOR=${5:-3}  # 副本数

echo "=== Kafka 容量评估 ==="
echo "平均消息大小:$AVG_MSG_SIZE 字节"
echo "日均消息量:$DAILY_MSG_COUNT"
echo "峰值 TPS: $PEAK_TPS"
echo "保留时间:$RETENTION_DAYS"
echo "副本数:$REPLICATION_FACTOR"

# 计算带宽
DAILY_BANDWIDTH=$((DAILY_MSG_COUNT * AVG_MSG_SIZE / 86400))
PEAK_BANDWIDTH=$((PEAK_TPS * AVG_MSG_SIZE))

echo -e "\n=== 带宽需求 ==="
echo "日均带宽:$DAILY_BANDWIDTH B/s = $(echo "scale=2; $DAILY_BANDWIDTH * 8 / 1000000" | bc) Mbps"
echo "峰值带宽:$PEAK_BANDWIDTH B/s = $(echo "scale=2; $PEAK_BANDWIDTH * 8 / 1000000" | bc) Mbps"

# 计算存储
STORAGE=$((DAILY_MSG_COUNT * AVG_MSG_SIZE * RETENTION_DAYS / 1024 / 1024 / 1024))
TOTAL_STORAGE=$((STORAGE * REPLICATION_FACTOR))

echo -e "\n=== 存储需求 ==="
echo "单副本存储:$STORAGE GB"
echo "总存储($REPLICATION_FACTOR 副本):$TOTAL_STORAGE GB"

# 推荐配置
echo -e "\n=== 推荐配置 ==="
if [ $PEAK_TPS -lt 1000 ]; then
    echo "Broker 数量:3"
    echo "每 Broker CPU: 4 核"
    echo "每 Broker 内存:8GB"
elif [ $PEAK_TPS -lt 5000 ]; then
    echo "Broker 数量:5"
    echo "每 Broker CPU: 8 核"
    echo "每 Broker 内存:16GB"
else
    echo "Broker 数量:7+"
    echo "每 Broker CPU: 16 核"
    echo "每 Broker 内存:32GB"
fi

二、性能基准

2.1 基准测试

测试脚本

#!/bin/bash
# Kafka 性能基准测试脚本

BOOTSTRAP="localhost:9092"
TOPIC="benchmark-topic"
DURATION=60  # 测试时长(秒)

echo "=== Kafka 性能基准测试 ==="

# 1. 创建测试 Topic
kafka-topics.sh --bootstrap-server $BOOTSTRAP \
  --create --topic $TOPIC \
  --partitions 8 --replication-factor 3

# 2. Producer 测试
echo -e "\n=== Producer 测试 ==="
kafka-producer-perf-test.sh \
  --topic $TOPIC \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput 10000 \
  --producer-props bootstrap.servers=$BOOTSTRAP

# 3. Consumer 测试
echo -e "\n=== Consumer 测试 ==="
kafka-consumer-perf-test.sh \
  --topic $TOPIC \
  --messages 1000000 \
  --bootstrap-server $BOOTSTRAP \
  --group benchmark-consumer

# 4. 清理
kafka-topics.sh --bootstrap-server $BOOTSTRAP --delete --topic $TOPIC

echo -e "\n=== 测试完成 ==="

2.2 性能指标

典型性能数据

单 Broker 性能(8 核 16GB):
- 生产 TPS:5 万 -8 万
- 消费 TPS:8 万 -12 万
- 平均延迟:< 5ms
- P99 延迟:< 30ms

集群性能(5 Broker):
- 生产 TPS:25 万 -40 万
- 消费 TPS:40 万 -60 万

2.3 性能监控

#!/bin/bash
# Kafka 性能监控脚本

echo "=== Kafka 性能监控 ==="

# 1. Broker TPS
echo -e "\n=== Broker TPS ==="
for broker in kafka-1 kafka-2 kafka-3 kafka-4 kafka-5; do
    # 通过 JMX 获取 TPS
    echo "$broker: 需要通过 JMX 获取"
done

# 2. 延迟统计
echo -e "\n=== 延迟统计 ==="
# 通过 JMX 获取延迟指标

# 3. 堆积情况
echo -e "\n=== 消费堆积 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe | awk 'NR>3 {sum+=$6} END {print "总堆积量:" sum}'

三、集群扩容

3.1 添加 Broker

#!/bin/bash
# 添加 Broker 脚本

NEW_BROKER="kafka-6"
CLUSTER="DefaultCluster"

echo "=== 添加 Broker ==="

# 1. 准备新服务器
echo "准备新服务器..."
ssh $NEW_BROKER "yum install -y java-11-openjdk"
ssh $NEW_BROKER "wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz"
ssh $NEW_BROKER "tar -xzf kafka_2.13-3.4.0.tgz -C /opt/"
ssh $NEW_BROKER "mv /opt/kafka_2.13-3.4.0 /opt/kafka"

# 2. 配置 Broker
echo "配置 Broker..."
cat > /tmp/server.properties << EOF
broker.id=6
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
num.partitions=8
default.replication.factor=3
EOF
scp /tmp/server.properties $NEW_BROKER:/opt/kafka/config/

# 3. 启动 Broker
echo "启动 Broker..."
ssh $NEW_BROKER "systemctl start kafka"

# 4. 验证
echo "验证 Broker..."
ssh $NEW_BROKER "kafka-broker-api-versions.sh --bootstrap-server localhost:9092"

echo "=== Broker 添加完成 ==="

3.2 分区重分配

生成重分配计划

# 生成重分配计划
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "0,1,2,3,4,5" \
  --generate > reassign-plan.json

# topics-to-move.json
{
  "topics": [
    {"topic": "order-topic"},
    {"topic": "pay-topic"}
  ]
}

执行重分配

#!/bin/bash
# 执行分区重分配

echo "=== 执行分区重分配 ==="

# 1. 执行重分配
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassign-plan.json \
  --execute

# 2. 监控进度
while true; do
    kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file reassign-plan.json \
      --verify
    
    if [ $? -eq 0 ]; then
        echo "重分配完成"
        break
    fi
    
    echo "等待重分配..."
    sleep 60
done

echo "=== 重分配完成 ==="

3.3 扩容验证

#!/bin/bash
# 扩容验证脚本

echo "=== 扩容验证 ==="

# 1. 检查 Broker 数量
broker_count=$(kafka-broker-api-versions.sh --bootstrap-server localhost:9092 2>/dev/null | wc -l)
echo "Broker 数量:$broker_count"

# 2. 检查分区分布
echo -e "\n分区分布:"
kafka-topics.sh --bootstrap-server localhost:9092 --describe | \
  grep -o "Replicas: [0-9,]*" | sort | uniq -c

# 3. 检查数据平衡
echo -e "\n数据平衡:"
for broker in kafka-1 kafka-2 kafka-3 kafka-4 kafka-5 kafka-6; do
    size=$(ssh $broker "du -sh /data/kafka-logs 2>/dev/null | awk '{print \$1}'")
    echo "$broker: $size"
done

echo "=== 验证完成 ==="

四、性能优化

4.1 Broker 优化

JVM 优化

# kafka-server-start.sh

# 堆内存
export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"

# G1 GC
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"

# GC 日志
export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -Xlog:gc*:file=/var/log/kafka/gc.log:time,level,tags"

存储优化

# server.properties

# 异步刷盘
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# 日志配置
log.segment.bytes=1073741824  # 1GB
log.retention.hours=168  # 7 天
log.retention.bytes=-1

# 线程配置
num.network.threads=8
num.io.threads=16
num.replica.fetchers=2

4.2 网络优化

# server.properties

# Socket 配置
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 监听器
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-1:9092

4.3 客户端优化

Producer 优化

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");

// 批量配置
props.put("batch.size", 65536);  // 64KB
props.put("linger.ms", 10);      // 10ms

// 压缩
props.put("compression.type", "lz4");

// 可靠性
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Consumer 优化

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put("group.id", "consumer-group");

// 拉取配置
props.put("fetch.min.bytes", 1024);
props.put("fetch.max.wait.ms", 100);
props.put("max.poll.records", 500);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

五、最佳实践

5.1 容量规划建议

容量规划:
1. 预留 30% 容量余量
2. 考虑业务增长因素
3. 定期评估容量使用
4. 建立容量预警机制

5.2 性能优化建议

性能优化:
1. 使用 SSD 磁盘
2. 配置异步刷盘
3. 优化 JVM 参数
4. 合理设置分区数
5. 启用压缩
6. 批量发送/消费

5.3 检查清单

容量检查:
- [ ] 评估业务需求
- [ ] 计算存储需求
- [ ] 计算带宽需求
- [ ] 规划 Broker 数量
- [ ] 预留容量余量

性能检查:
- [ ] 基准测试
- [ ] JVM 优化
- [ ] 存储优化
- [ ] 网络优化
- [ ] 客户端优化

总结

Kafka 容量规划与性能优化的核心要点:

  1. 容量评估:容量指标、评估模型、评估脚本
  2. 性能基准:基准测试、性能指标、性能监控
  3. 集群扩容:添加 Broker、分区重分配、扩容验证
  4. 性能优化:Broker 优化、网络优化、客户端优化
  5. 最佳实践:容量规划、性能优化、检查清单

核心要点

参考资料


分享这篇文章到:

上一篇文章
Redis 大 Key 与热 Key 问题分析与解决
下一篇文章
AI 工程化总结与展望