Skip to content
清晨的一缕阳光
返回

Kafka 日志存储结构详解

Kafka 的日志存储机制是其高吞吐、低延迟的核心基础。本文将深入探讨 Kafka 的日志存储结构、Segment 设计、索引机制以及日志清理策略。

一、存储架构概览

1.1 整体架构

Kafka 将消息持久化到磁盘,采用顺序写盘页缓存技术实现高性能:

graph TB
    subgraph Broker
        subgraph Topic
            P1[Partition 1]
            P2[Partition 2]
            P3[Partition 3]
        end
        
        P1 --> S1[Segment 1]
        P1 --> S2[Segment 2]
        P1 --> S3[Segment 3]
    end
    
    S1 --> F1[.log 文件]
    S1 --> F2[.index 文件]
    S1 --> F3[.timeindex 文件]

1.2 目录结构

kafka-logs/
├── topic-1/
│   ├── 00000000000000000000.log
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.timeindex
│   ├── 00000000000000000010.log
│   ├── 00000000000000000010.index
│   ├── 00000000000000000010.timeindex
│   ├── 00000000000000000025.log
│   ├── 00000000000000000025.index
│   ├── 00000000000000000025.timeindex
│   └── leader-epoch-checkpoint
├── topic-2/
│   └── ...
└── __consumer_offsets/
    └── ...

1.3 核心文件

文件类型说明作用
.log消息数据文件存储实际消息内容
.index偏移量索引稀疏索引,加速消息查找
.timeindex时间戳索引基于时间的消息查找
.checkpoint检查点文件记录日志清理位置

二、Segment 设计

2.1 Segment 结构

每个 Partition 由多个 Segment 组成:

graph LR
    subgraph Partition
        S1[Segment 1<br/>0-9]
        S2[Segment 2<br/>10-24]
        S3[Segment 3<br/>25-39]
    end
    
    S1 -->|滚动 | S2
    S2 -->|滚动 | S3

2.2 分段策略

滚动条件(满足任一即滚动):

配置参数默认值说明
log.segment.bytes1073741824 (1GB)Segment 文件大小
log.roll.hours168 (7 天)Segment 时间间隔
log.roll.jitter.hours0随机抖动时间
# Server 配置示例
log.segment.bytes=1073741824
log.roll.hours=168
log.retention.hours=168

2.3 消息格式

graph TB
    subgraph Message
        OFFSET[Offset: 8 bytes]
        SIZE[Size: 4 bytes]
        CRC[CRC: 4 bytes]
        MAGIC[Magic: 1 byte]
        ATTR[Attributes: 1 byte]
        TS[Timestamp: 8 bytes]
        KEY[Key: 4 bytes + data]
        VALUE[Value: 4 bytes + data]
    end

2.4 消息写入流程

// 1. 追加写入
FileChannel channel = new RandomAccessFile(logFile, "rw").getChannel();
channel.write(byteBuffer, position);

// 2. 刷新到磁盘
channel.force(false); // false = 不强制刷新元数据

// 3. 更新索引
updateIndex(offset, position);

三、索引机制

3.1 稀疏索引

Kafka 使用稀疏索引减少索引文件大小:

graph TB
    subgraph 索引文件
        I1[相对偏移:0, 位置:0]
        I2[相对偏移:4, 位置:256]
        I3[相对偏移:8, 位置:512]
        I4[相对偏移:12, 位置:768]
    end
    
    subgraph 数据文件
        M1[消息 0-3]
        M2[消息 4-7]
        M3[消息 8-11]
        M4[消息 12-15]
    end
    
    I1 --> M1
    I2 --> M2
    I3 --> M3
    I4 --> M4

3.2 索引条目

字段大小说明
相对偏移量4 bytesSegment 内相对偏移
位置4 bytes在.log 文件中的物理位置
# 索引配置
log.index.interval.bytes=4096  # 每 4KB 创建一条索引
log.index.size.max.bytes=10485760  # 索引文件最大 10MB

3.3 消息查找流程

sequenceDiagram
    participant App as 应用程序
    participant IDX as 索引文件
    participant LOG as 数据文件
    
    App->>IDX: 查找 offset=100
    IDX-->>App: 定位到 position=5000
    App->>LOG: 读取 position=5000
    LOG-->>App: 返回消息

3.4 时间戳索引

// 基于时间查找消息
public OffsetLookupResult findOffsetByTimestamp(long timestamp) {
    // 1. 在.timeindex 中查找
    TimestampIndexEntry entry = timeIndex.lookup(timestamp);
    
    // 2. 在.index 中查找对应位置
    IndexEntry posEntry = index.lookup(entry.relativeOffset);
    
    // 3. 在.log 中读取消息
    Message msg = log.read(posEntry.position);
    
    return new OffsetLookupResult(msg.offset, msg.timestamp);
}

四、日志清理策略

4.1 Delete 策略(默认)

基于时间或大小删除旧日志:

# 保留时间
log.retention.hours=168  # 7 天
log.retention.minutes=10080
log.retention.ms=604800000

# 保留大小
log.retention.bytes=-1  # 不限制
log.segment.retention.bytes=-1

# 检查间隔
log.retention.check.interval.ms=300000  # 5 分钟

4.2 Compaction 策略

保留每个 Key 的最新值:

graph TB
    subgraph 清理前
        K1A[key=A, value=1]
        K1B[key=A, value=2]
        K1C[key=A, value=3]
        K2A[key=B, value=x]
        K2B[key=B, value=y]
    end
    
    subgraph 清理后
        K1C_LATEST[key=A, value=3]
        K2B_LATEST[key=B, value=y]
    end
    
    K1A --> K1C_LATEST
    K1B --> K1C_LATEST
    K1C --> K1C_LATEST
    K2A --> K2B_LATEST
    K2B --> K2B_LATEST
# 启用 Compaction
log.cleanup.policy=compact
min.compaction.lag.ms=0
max.compaction.lag.ms=9223372036854775807

4.3 混合策略

# 同时使用 delete 和 compact
log.cleanup.policy=delete,compact

4.4 清理流程

graph TD
    A[清理线程启动] --> B{检查 Segment}
    B --> C{是否可删除?}
    C -->|是 | D[标记为可删除]
    C -->|否 | E[保留]
    D --> F[删除文件]
    F --> G[更新检查点]
    E --> H[等待下次检查]

五、高性能原理

5.1 顺序写盘

// Kafka 采用顺序写入,避免随机 IO
FileChannel channel = new RandomAccessFile(logFile, "rw").getChannel();

// 顺序追加
channel.write(byteBuffer, currentPosition);
currentPosition += byteBuffer.remaining();

性能对比

5.2 页缓存利用

graph TB
    subgraph 应用程序
        App[Kafka Broker]
    end
    
    subgraph 操作系统
        PC[Page Cache]
    end
    
    subgraph 磁盘
        DISK[磁盘文件]
    end
    
    App -->|读/写 | PC
    PC -->|异步刷盘 | DISK

优势

5.3 零拷贝技术

// 传统方式(4 次拷贝)
// disk -> kernel buffer -> user buffer -> kernel buffer -> socket

// 零拷贝(2 次拷贝)
FileChannel channel = new FileInputStream(file).getChannel();
channel.transferTo(position, count, socketChannel);

性能提升

5.4 批量读写

// 批量读取
List<Message> messages = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
    Message msg = readNextMessage();
    messages.add(msg);
}

// 批量写入
ByteBuffer buffer = ByteBuffer.allocate(batchSize * avgMessageSize);
for (Message msg : messages) {
    buffer.put(msg.toBytes());
}
channel.write(buffer);

六、配置优化

6.1 Broker 配置

# 日志目录
log.dirs=/data/kafka-logs

# Segment 配置
log.segment.bytes=1073741824
log.segment.delete.delay.ms=60000

# 保留策略
log.retention.hours=168
log.retention.bytes=-1

# 索引配置
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760

# 清理配置
log.cleanup.policy=delete
log.cleaner.enable=true
log.cleaner.threads=2

6.2 Topic 配置

# 创建 Topic 时指定
kafka-topics.sh --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2 \
  --config retention.ms=604800000 \
  --config segment.bytes=1073741824 \
  --config cleanup.policy=delete

6.3 监控指标

指标说明告警阈值
LogSize日志大小> 80% 磁盘
LogEndOffset日志末尾偏移量-
LogStartOffset日志起始偏移量-
NumLogSegmentsSegment 数量> 1000

七、常见问题排查

7.1 磁盘空间不足

原因

解决

# 缩短保留时间
log.retention.hours=72

# 限制保留大小
log.retention.bytes=107374182400  # 100GB

# 启用压缩
compression.type=lz4

7.2 索引文件损坏

症状

解决

# 1. 停止 Broker
# 2. 删除索引文件
rm -f *.index *.timeindex

# 3. 重启 Broker(自动重建索引)

7.3 Segment 过多

原因

解决

# 增大 Segment 大小
log.segment.bytes=2147483648  # 2GB

八、最佳实践

8.1 存储规划

场景保留时间Segment 大小清理策略
日志收集7 天1GBdelete
事件溯源永久512MBcompact
审计日志1 年1GBdelete
状态存储永久256MBcompact

8.2 性能优化

# 1. 使用 SSD 磁盘
log.dirs=/mnt/ssd/kafka-logs

# 2. 多磁盘挂载
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs

# 3. 调整刷盘策略
log.flush.interval.messages=10000
log.flush.interval.ms=1000

8.3 监控告警

# 监控磁盘使用率
df -h /data/kafka-logs

# 监控 Segment 数量
ls -l /data/kafka-logs/topic-1/ | wc -l

# 监控日志大小
du -sh /data/kafka-logs/*

总结

Kafka 日志存储的核心机制:

  1. Segment 设计:分段存储,便于管理和清理
  2. 稀疏索引:减少索引大小,加速查找
  3. 清理策略:Delete、Compaction 两种策略
  4. 高性能原理:顺序写盘、页缓存、零拷贝
  5. 配置优化:根据场景调整保留策略

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka Streams 入门与实战
下一篇文章
Kafka 高可用架构设计与实战