Kafka 的日志存储机制是其高吞吐、低延迟的核心基础。本文将深入探讨 Kafka 的日志存储结构、Segment 设计、索引机制以及日志清理策略。
一、存储架构概览
1.1 整体架构
Kafka 将消息持久化到磁盘,采用顺序写盘和页缓存技术实现高性能:
graph TB
subgraph Broker
subgraph Topic
P1[Partition 1]
P2[Partition 2]
P3[Partition 3]
end
P1 --> S1[Segment 1]
P1 --> S2[Segment 2]
P1 --> S3[Segment 3]
end
S1 --> F1[.log 文件]
S1 --> F2[.index 文件]
S1 --> F3[.timeindex 文件]
1.2 目录结构
kafka-logs/
├── topic-1/
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.timeindex
│ ├── 00000000000000000010.log
│ ├── 00000000000000000010.index
│ ├── 00000000000000000010.timeindex
│ ├── 00000000000000000025.log
│ ├── 00000000000000000025.index
│ ├── 00000000000000000025.timeindex
│ └── leader-epoch-checkpoint
├── topic-2/
│ └── ...
└── __consumer_offsets/
└── ...
1.3 核心文件
| 文件类型 | 说明 | 作用 |
|---|---|---|
.log | 消息数据文件 | 存储实际消息内容 |
.index | 偏移量索引 | 稀疏索引,加速消息查找 |
.timeindex | 时间戳索引 | 基于时间的消息查找 |
.checkpoint | 检查点文件 | 记录日志清理位置 |
二、Segment 设计
2.1 Segment 结构
每个 Partition 由多个 Segment 组成:
graph LR
subgraph Partition
S1[Segment 1<br/>0-9]
S2[Segment 2<br/>10-24]
S3[Segment 3<br/>25-39]
end
S1 -->|滚动 | S2
S2 -->|滚动 | S3
2.2 分段策略
滚动条件(满足任一即滚动):
| 配置参数 | 默认值 | 说明 |
|---|---|---|
log.segment.bytes | 1073741824 (1GB) | Segment 文件大小 |
log.roll.hours | 168 (7 天) | Segment 时间间隔 |
log.roll.jitter.hours | 0 | 随机抖动时间 |
# Server 配置示例
log.segment.bytes=1073741824
log.roll.hours=168
log.retention.hours=168
2.3 消息格式
graph TB
subgraph Message
OFFSET[Offset: 8 bytes]
SIZE[Size: 4 bytes]
CRC[CRC: 4 bytes]
MAGIC[Magic: 1 byte]
ATTR[Attributes: 1 byte]
TS[Timestamp: 8 bytes]
KEY[Key: 4 bytes + data]
VALUE[Value: 4 bytes + data]
end
2.4 消息写入流程
// 1. 追加写入
FileChannel channel = new RandomAccessFile(logFile, "rw").getChannel();
channel.write(byteBuffer, position);
// 2. 刷新到磁盘
channel.force(false); // false = 不强制刷新元数据
// 3. 更新索引
updateIndex(offset, position);
三、索引机制
3.1 稀疏索引
Kafka 使用稀疏索引减少索引文件大小:
graph TB
subgraph 索引文件
I1[相对偏移:0, 位置:0]
I2[相对偏移:4, 位置:256]
I3[相对偏移:8, 位置:512]
I4[相对偏移:12, 位置:768]
end
subgraph 数据文件
M1[消息 0-3]
M2[消息 4-7]
M3[消息 8-11]
M4[消息 12-15]
end
I1 --> M1
I2 --> M2
I3 --> M3
I4 --> M4
3.2 索引条目
| 字段 | 大小 | 说明 |
|---|---|---|
| 相对偏移量 | 4 bytes | Segment 内相对偏移 |
| 位置 | 4 bytes | 在.log 文件中的物理位置 |
# 索引配置
log.index.interval.bytes=4096 # 每 4KB 创建一条索引
log.index.size.max.bytes=10485760 # 索引文件最大 10MB
3.3 消息查找流程
sequenceDiagram
participant App as 应用程序
participant IDX as 索引文件
participant LOG as 数据文件
App->>IDX: 查找 offset=100
IDX-->>App: 定位到 position=5000
App->>LOG: 读取 position=5000
LOG-->>App: 返回消息
3.4 时间戳索引
// 基于时间查找消息
public OffsetLookupResult findOffsetByTimestamp(long timestamp) {
// 1. 在.timeindex 中查找
TimestampIndexEntry entry = timeIndex.lookup(timestamp);
// 2. 在.index 中查找对应位置
IndexEntry posEntry = index.lookup(entry.relativeOffset);
// 3. 在.log 中读取消息
Message msg = log.read(posEntry.position);
return new OffsetLookupResult(msg.offset, msg.timestamp);
}
四、日志清理策略
4.1 Delete 策略(默认)
基于时间或大小删除旧日志:
# 保留时间
log.retention.hours=168 # 7 天
log.retention.minutes=10080
log.retention.ms=604800000
# 保留大小
log.retention.bytes=-1 # 不限制
log.segment.retention.bytes=-1
# 检查间隔
log.retention.check.interval.ms=300000 # 5 分钟
4.2 Compaction 策略
保留每个 Key 的最新值:
graph TB
subgraph 清理前
K1A[key=A, value=1]
K1B[key=A, value=2]
K1C[key=A, value=3]
K2A[key=B, value=x]
K2B[key=B, value=y]
end
subgraph 清理后
K1C_LATEST[key=A, value=3]
K2B_LATEST[key=B, value=y]
end
K1A --> K1C_LATEST
K1B --> K1C_LATEST
K1C --> K1C_LATEST
K2A --> K2B_LATEST
K2B --> K2B_LATEST
# 启用 Compaction
log.cleanup.policy=compact
min.compaction.lag.ms=0
max.compaction.lag.ms=9223372036854775807
4.3 混合策略
# 同时使用 delete 和 compact
log.cleanup.policy=delete,compact
4.4 清理流程
graph TD
A[清理线程启动] --> B{检查 Segment}
B --> C{是否可删除?}
C -->|是 | D[标记为可删除]
C -->|否 | E[保留]
D --> F[删除文件]
F --> G[更新检查点]
E --> H[等待下次检查]
五、高性能原理
5.1 顺序写盘
// Kafka 采用顺序写入,避免随机 IO
FileChannel channel = new RandomAccessFile(logFile, "rw").getChannel();
// 顺序追加
channel.write(byteBuffer, currentPosition);
currentPosition += byteBuffer.remaining();
性能对比:
- 顺序写盘:600 MB/s
- 随机写盘:100 KB/s
- 性能差距:6000 倍
5.2 页缓存利用
graph TB
subgraph 应用程序
App[Kafka Broker]
end
subgraph 操作系统
PC[Page Cache]
end
subgraph 磁盘
DISK[磁盘文件]
end
App -->|读/写 | PC
PC -->|异步刷盘 | DISK
优势:
- 减少系统调用
- 利用操作系统预读
- 断电后数据不丢失(已刷盘)
5.3 零拷贝技术
// 传统方式(4 次拷贝)
// disk -> kernel buffer -> user buffer -> kernel buffer -> socket
// 零拷贝(2 次拷贝)
FileChannel channel = new FileInputStream(file).getChannel();
channel.transferTo(position, count, socketChannel);
性能提升:
- 减少 CPU 拷贝
- 减少上下文切换
- 吞吐量提升 3-5 倍
5.4 批量读写
// 批量读取
List<Message> messages = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
Message msg = readNextMessage();
messages.add(msg);
}
// 批量写入
ByteBuffer buffer = ByteBuffer.allocate(batchSize * avgMessageSize);
for (Message msg : messages) {
buffer.put(msg.toBytes());
}
channel.write(buffer);
六、配置优化
6.1 Broker 配置
# 日志目录
log.dirs=/data/kafka-logs
# Segment 配置
log.segment.bytes=1073741824
log.segment.delete.delay.ms=60000
# 保留策略
log.retention.hours=168
log.retention.bytes=-1
# 索引配置
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
# 清理配置
log.cleanup.policy=delete
log.cleaner.enable=true
log.cleaner.threads=2
6.2 Topic 配置
# 创建 Topic 时指定
kafka-topics.sh --create \
--topic my-topic \
--partitions 3 \
--replication-factor 2 \
--config retention.ms=604800000 \
--config segment.bytes=1073741824 \
--config cleanup.policy=delete
6.3 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
LogSize | 日志大小 | > 80% 磁盘 |
LogEndOffset | 日志末尾偏移量 | - |
LogStartOffset | 日志起始偏移量 | - |
NumLogSegments | Segment 数量 | > 1000 |
七、常见问题排查
7.1 磁盘空间不足
原因:
- 保留时间过长
- 消息量过大
- 清理策略不当
解决:
# 缩短保留时间
log.retention.hours=72
# 限制保留大小
log.retention.bytes=107374182400 # 100GB
# 启用压缩
compression.type=lz4
7.2 索引文件损坏
症状:
- 消息查找失败
- Consumer 无法消费
解决:
# 1. 停止 Broker
# 2. 删除索引文件
rm -f *.index *.timeindex
# 3. 重启 Broker(自动重建索引)
7.3 Segment 过多
原因:
log.segment.bytes过小- 消息量过大
解决:
# 增大 Segment 大小
log.segment.bytes=2147483648 # 2GB
八、最佳实践
8.1 存储规划
| 场景 | 保留时间 | Segment 大小 | 清理策略 |
|---|---|---|---|
| 日志收集 | 7 天 | 1GB | delete |
| 事件溯源 | 永久 | 512MB | compact |
| 审计日志 | 1 年 | 1GB | delete |
| 状态存储 | 永久 | 256MB | compact |
8.2 性能优化
# 1. 使用 SSD 磁盘
log.dirs=/mnt/ssd/kafka-logs
# 2. 多磁盘挂载
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs
# 3. 调整刷盘策略
log.flush.interval.messages=10000
log.flush.interval.ms=1000
8.3 监控告警
# 监控磁盘使用率
df -h /data/kafka-logs
# 监控 Segment 数量
ls -l /data/kafka-logs/topic-1/ | wc -l
# 监控日志大小
du -sh /data/kafka-logs/*
总结
Kafka 日志存储的核心机制:
- Segment 设计:分段存储,便于管理和清理
- 稀疏索引:减少索引大小,加速查找
- 清理策略:Delete、Compaction 两种策略
- 高性能原理:顺序写盘、页缓存、零拷贝
- 配置优化:根据场景调整保留策略
核心要点:
- 理解 Segment 滚动机制和文件结构
- 合理配置保留策略和清理间隔
- 利用顺序写盘和零拷贝提升性能
- 监控磁盘使用和 Segment 数量
参考资料
- Kafka Storage 官方文档
- KIP-31: Log compaction
- 《Kafka 权威指南》第 3 章