Kafka Log Compaction 是一种特殊的日志清理策略,保证每个 Key 至少保留最新的一条消息。本文将深入探讨 Log Compaction 的实现原理和实战应用。
一、Log Compaction 基础
1.1 什么是 Log Compaction?
定义:
Log Compaction = 保留每个 Key 的最新值
目的:
- 只保留最新状态
- 允许历史数据清理
- 支持数据恢复
对比 Delete 策略:
graph TB
subgraph Delete 策略
D1[消息 1<br/>Key=A, V=1]
D2[消息 2<br/>Key=A, V=2]
D3[消息 3<br/>Key=A, V=3]
D4[消息 4<br/>Key=B, V=x]
end
subgraph Compaction 策略
C1[消息 3<br/>Key=A, V=3]
C2[消息 4<br/>Key=B, V=x]
end
D1 -.->|压缩 | C1
D2 -.->|压缩 | C1
D3 -.->|压缩 | C1
D4 -.->|压缩 | C2
1.2 适用场景
| 场景 | 说明 | 示例 |
|---|---|---|
| 状态存储 | 只关心最新状态 | 用户信息、配置数据 |
| 数据恢复 | 重启后恢复状态 | 数据库变更日志 |
| 变更日志 | 记录数据变更 | CDC(变更数据捕获) |
| 事件溯源 | 状态机事件流 | 订单状态流转 |
1.3 不适用场景
| 场景 | 原因 |
|---|---|
| 日志收集 | 需要所有历史日志 |
| 审计日志 | 需要完整审计轨迹 |
| 消息队列 | 需要消费所有消息 |
二、实现原理
2.1 压缩流程
sequenceDiagram
participant P as Producer
participant L as Log
participant C as Compactor
P->>L: 写入消息 Key=A, V=1
P->>L: 写入消息 Key=A, V=2
P->>L: 写入消息 Key=A, V=3
C->>C: 触发压缩
C->>L: 扫描日志
C->>C: 构建 Key 最新偏移量
C->>L: 删除旧消息
L->>L: 保留 Key=A, V=3
2.2 数据结构
压缩前:
Offset | Key | Value | Timestamp
-------|-----|-------|----------
0 | A | 1 | 1000
1 | B | x | 1001
2 | A | 2 | 1002
3 | C | y | 1003
4 | A | 3 | 1004
5 | B | z | 1005
压缩后:
Offset | Key | Value | Timestamp
-------|-----|-------|----------
2 | A | 3 | 1004
5 | B | z | 1005
3 | C | y | 1003
2.3 压缩保证
保证:
- 每个 Key 至少保留一条消息(最新)
- 已压缩的消息不会被删除
- 新消息会触发压缩检查
不保证:
- 立即压缩(有延迟)
- 压缩后 Offset 连续
- 压缩后时间顺序
三、配置方法
3.1 Topic 配置
# 创建带压缩的 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic user-state \
--partitions 3 --replication-factor 2 \
--config cleanup.policy=compact \
--config min.compaction.lag.ms=60000 \
--config max.compaction.lag.ms=3600000 \
--config delete.retention.ms=86400000
# 修改现有 Topic
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config cleanup.policy=compact \
--entity-type topics --entity-name user-state
3.2 关键参数
| 参数 | 默认值 | 说明 |
|---|---|---|
cleanup.policy | delete | 清理策略(compact/delete) |
min.compaction.lag.ms | 0 | 消息最小压缩延迟 |
max.compaction.lag.ms | 9223372036854775807 | 消息最大压缩延迟 |
delete.retention.ms | 86400000 | 墓碑消息保留时间 |
min.cleanable.dirty.ratio | 0.5 | 压缩触发阈值 |
segment.min.bytes | 104857600 | Segment 最小大小 |
3.3 混合策略
# 同时使用 delete 和 compact
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic order-events \
--config cleanup.policy=compact,delete \
--config retention.ms=604800000 # 7 天后删除
适用场景:
- 需要最新状态(compact)
- 也需要历史数据(delete)
- 例如:订单事件流
四、实战应用
4.1 用户状态存储
Producer:
public class UserStateProducer {
private final KafkaProducer<String, UserState> producer;
public UserStateProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", JsonSerializer.class);
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
producer = new KafkaProducer<>(props);
}
public void updateUserState(String userId, UserState state) {
ProducerRecord<String, UserState> record =
new ProducerRecord<>("user-state", userId, state);
producer.send(record);
}
}
Consumer:
public class UserStateConsumer {
private final KafkaConsumer<String, UserState> consumer;
private final Map<String, UserState> userStates = new ConcurrentHashMap<>();
public UserStateConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user-state-consumer");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", JsonDeserializer.class);
props.put("auto.offset.reset", "earliest");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-state"));
}
public void consume() {
while (true) {
ConsumerRecords<String, UserState> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, UserState> record : records) {
// 更新最新状态
userStates.put(record.key(), record.value());
}
}
}
public UserState getUserState(String userId) {
return userStates.get(userId);
}
}
4.2 数据库 CDC
Debezium 配置:
{
"name": "mysql-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"database.include.list": "mydb",
"table.include.list": "mydb.users,mydb.orders",
"topic.prefix": "db",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.mydb"
}
}
Topic 配置:
# CDC Topic 使用压缩
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic db.mydb.users \
--config cleanup.policy=compact \
--config key.converter=org.apache.kafka.connect.json.JsonConverter \
--config value.converter=org.apache.kafka.connect.json.JsonConverter
4.3 配置中心
配置更新:
public class ConfigCenter {
private final KafkaProducer<String, ConfigValue> producer;
public void updateConfig(String configKey, ConfigValue value) {
ProducerRecord<String, ConfigValue> record =
new ProducerRecord<>("config-center", configKey, value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
log.info("配置更新成功:{}", configKey);
} else {
log.error("配置更新失败:{}", configKey, exception);
}
});
}
}
配置订阅:
public class ConfigSubscriber {
private final Map<String, ConfigValue> configs = new ConcurrentHashMap<>();
@KafkaListener(topics = "config-center", groupId = "config-subscriber")
public void listen(ConsumerRecord<String, ConfigValue> record) {
// 更新本地配置缓存
configs.put(record.key(), record.value());
log.info("配置更新:{} = {}", record.key(), record.value());
}
public ConfigValue getConfig(String key) {
return configs.get(key);
}
}
五、墓碑消息
5.1 什么是墓碑消息?
定义:
墓碑消息 = Key 存在,Value 为 null
作用:
- 标记 Key 已删除
- 压缩后保留墓碑
- 超过保留时间后删除
示例:
// 发送墓碑消息
ProducerRecord<String, String> tombstone =
new ProducerRecord<>("user-state", "user_123", null);
producer.send(tombstone);
5.2 墓碑处理流程
graph TB
subgraph 发送墓碑
S1[发送 Key=A, Value=null]
end
subgraph 压缩
C1[保留墓碑]
C2[删除 Key=A 的旧消息]
end
subgraph 清理
D1[超过保留时间]
D2[删除墓碑]
end
S1 --> C1
S1 --> C2
C1 --> D1
D1 --> D2
5.3 墓碑配置
# 墓碑保留时间(默认 24 小时)
delete.retention.ms=86400000
# 压缩后墓碑立即生效
# 但墓碑本身会保留到 delete.retention.ms
六、性能优化
6.1 压缩触发条件
# 压缩触发阈值(默认 0.5)
# 值越小,压缩越频繁
min.cleanable.dirty.ratio=0.5
# 后台压缩线程数
num.io.threads=8
# 压缩缓冲区大小
log.cleaner.buffers=1048576
6.2 Segment 配置
# Segment 滚动时间
log.roll.hours=168 # 7 天
# Segment 大小
log.segment.bytes=1073741824 # 1GB
# 压缩最小间隔
log.cleaner.min.compaction.lag.ms=0
6.3 监控指标
| 指标 | 说明 |
|---|---|
kafka-log-cleaner:cleaner-manager:percent-dirty | 待压缩比例 |
kafka-log-cleaner:cleaner:bytes-written | 压缩写入字节数 |
kafka-log-cleaner:cleaner:bytes-dropped | 压缩删除字节数 |
七、常见问题
7.1 压缩不生效
原因:
- 未达到压缩触发条件
- 消息未达到最小压缩延迟
- 压缩线程不足
解决:
# 降低压缩触发阈值
min.cleanable.dirty.ratio=0.1
# 减少压缩延迟
min.compaction.lag.ms=0
max.compaction.lag.ms=60000
# 增加压缩线程
num.io.threads=16
7.2 消息丢失
原因:
- 误用 Delete 策略
- 墓碑消息被清理
解决:
# 检查 Topic 配置
kafka-topics.sh --describe --topic my-topic
# 确保 cleanup.policy=compact
kafka-configs.sh --alter --add-config cleanup.policy=compact \
--entity-type topics --entity-name my-topic
7.3 性能下降
原因:
- 压缩过于频繁
- Segment 过小
解决:
# 增加压缩触发阈值
min.cleanable.dirty.ratio=0.9
# 增大 Segment
log.segment.bytes=2147483648 # 2GB
# 增加压缩间隔
log.cleaner.min.compaction.lag.ms=60000
八、最佳实践
8.1 Topic 设计
# 状态存储 Topic
kafka-topics.sh --create --topic user-state \
--config cleanup.policy=compact \
--config min.compaction.lag.ms=0 \
--config max.compaction.lag.ms=3600000 \
--config segment.bytes=1073741824
# CDC Topic
kafka-topics.sh --create --topic db-changes \
--config cleanup.policy=compact \
--config key.converter.schemas.enable=true \
--config value.converter.schemas.enable=true
# 混合策略 Topic
kafka-topics.sh --create --topic order-events \
--config cleanup.policy=compact,delete \
--config retention.ms=604800000
8.2 Key 设计
// ✅ 好的 Key 设计
record.key = userId; // 用户 ID
record.key = orderId; // 订单 ID
record.key = configKey; // 配置 Key
// ❌ 不好的 Key 设计
record.key = UUID.randomUUID(); // 随机 UUID(无压缩效果)
record.key = timestamp; // 时间戳(无压缩效果)
8.3 监控告警
# Prometheus 告警规则
groups:
- name: kafka-compaction
rules:
- alert: KafkaCompactionLag
expr: kafka_log_cleaner_percent_dirty > 0.8
for: 30m
labels:
severity: warning
annotations:
summary: "压缩延迟过高:{{ $value }}"
- alert: KafkaCompactionStopped
expr: kafka_log_cleaner_bytes_written == 0
for: 1h
labels:
severity: critical
annotations:
summary: "压缩停止"
总结
Kafka Log Compaction 的核心要点:
- 实现原理:保留每个 Key 的最新值
- 配置方法:cleanup.policy、压缩参数
- 实战应用:状态存储、CDC、配置中心
- 墓碑消息:标记删除、保留时间
- 性能优化:压缩触发、Segment 配置
核心要点:
- 理解压缩适用场景
- 正确配置压缩参数
- 合理使用墓碑消息
- 监控压缩状态
参考资料
- Kafka Log Compaction 官方文档
- KIP-280: Log Compaction Improvements
- 《Kafka 权威指南》第 3 章