Skip to content
清晨的一缕阳光
返回

Kafka Log Compaction 日志压缩详解

Kafka Log Compaction 是一种特殊的日志清理策略,保证每个 Key 至少保留最新的一条消息。本文将深入探讨 Log Compaction 的实现原理和实战应用。

一、Log Compaction 基础

1.1 什么是 Log Compaction?

定义

Log Compaction = 保留每个 Key 的最新值

目的:
- 只保留最新状态
- 允许历史数据清理
- 支持数据恢复

对比 Delete 策略

graph TB
    subgraph Delete 策略
        D1[消息 1<br/>Key=A, V=1]
        D2[消息 2<br/>Key=A, V=2]
        D3[消息 3<br/>Key=A, V=3]
        D4[消息 4<br/>Key=B, V=x]
    end
    
    subgraph Compaction 策略
        C1[消息 3<br/>Key=A, V=3]
        C2[消息 4<br/>Key=B, V=x]
    end
    
    D1 -.->|压缩 | C1
    D2 -.->|压缩 | C1
    D3 -.->|压缩 | C1
    D4 -.->|压缩 | C2

1.2 适用场景

场景说明示例
状态存储只关心最新状态用户信息、配置数据
数据恢复重启后恢复状态数据库变更日志
变更日志记录数据变更CDC(变更数据捕获)
事件溯源状态机事件流订单状态流转

1.3 不适用场景

场景原因
日志收集需要所有历史日志
审计日志需要完整审计轨迹
消息队列需要消费所有消息

二、实现原理

2.1 压缩流程

sequenceDiagram
    participant P as Producer
    participant L as Log
    participant C as Compactor
    
    P->>L: 写入消息 Key=A, V=1
    P->>L: 写入消息 Key=A, V=2
    P->>L: 写入消息 Key=A, V=3
    
    C->>C: 触发压缩
    C->>L: 扫描日志
    C->>C: 构建 Key 最新偏移量
    C->>L: 删除旧消息
    L->>L: 保留 Key=A, V=3

2.2 数据结构

压缩前

Offset | Key | Value | Timestamp
-------|-----|-------|----------
0      | A   | 1     | 1000
1      | B   | x     | 1001
2      | A   | 2     | 1002
3      | C   | y     | 1003
4      | A   | 3     | 1004
5      | B   | z     | 1005

压缩后

Offset | Key | Value | Timestamp
-------|-----|-------|----------
2      | A   | 3     | 1004
5      | B   | z     | 1005
3      | C   | y     | 1003

2.3 压缩保证

保证

不保证

三、配置方法

3.1 Topic 配置

# 创建带压缩的 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic user-state \
  --partitions 3 --replication-factor 2 \
  --config cleanup.policy=compact \
  --config min.compaction.lag.ms=60000 \
  --config max.compaction.lag.ms=3600000 \
  --config delete.retention.ms=86400000

# 修改现有 Topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config cleanup.policy=compact \
  --entity-type topics --entity-name user-state

3.2 关键参数

参数默认值说明
cleanup.policydelete清理策略(compact/delete)
min.compaction.lag.ms0消息最小压缩延迟
max.compaction.lag.ms9223372036854775807消息最大压缩延迟
delete.retention.ms86400000墓碑消息保留时间
min.cleanable.dirty.ratio0.5压缩触发阈值
segment.min.bytes104857600Segment 最小大小

3.3 混合策略

# 同时使用 delete 和 compact
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic order-events \
  --config cleanup.policy=compact,delete \
  --config retention.ms=604800000  # 7 天后删除

适用场景

四、实战应用

4.1 用户状态存储

Producer

public class UserStateProducer {
    
    private final KafkaProducer<String, UserState> producer;
    
    public UserStateProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", JsonSerializer.class);
        props.put("acks", "all");
        props.put("retries", Integer.MAX_VALUE);
        
        producer = new KafkaProducer<>(props);
    }
    
    public void updateUserState(String userId, UserState state) {
        ProducerRecord<String, UserState> record = 
            new ProducerRecord<>("user-state", userId, state);
        
        producer.send(record);
    }
}

Consumer

public class UserStateConsumer {
    
    private final KafkaConsumer<String, UserState> consumer;
    private final Map<String, UserState> userStates = new ConcurrentHashMap<>();
    
    public UserStateConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "user-state-consumer");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", JsonDeserializer.class);
        props.put("auto.offset.reset", "earliest");
        
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("user-state"));
    }
    
    public void consume() {
        while (true) {
            ConsumerRecords<String, UserState> records = 
                consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, UserState> record : records) {
                // 更新最新状态
                userStates.put(record.key(), record.value());
            }
        }
    }
    
    public UserState getUserState(String userId) {
        return userStates.get(userId);
    }
}

4.2 数据库 CDC

Debezium 配置

{
  "name": "mysql-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "database.include.list": "mydb",
    "table.include.list": "mydb.users,mydb.orders",
    "topic.prefix": "db",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.mydb"
  }
}

Topic 配置

# CDC Topic 使用压缩
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic db.mydb.users \
  --config cleanup.policy=compact \
  --config key.converter=org.apache.kafka.connect.json.JsonConverter \
  --config value.converter=org.apache.kafka.connect.json.JsonConverter

4.3 配置中心

配置更新

public class ConfigCenter {
    
    private final KafkaProducer<String, ConfigValue> producer;
    
    public void updateConfig(String configKey, ConfigValue value) {
        ProducerRecord<String, ConfigValue> record = 
            new ProducerRecord<>("config-center", configKey, value);
        
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("配置更新成功:{}", configKey);
            } else {
                log.error("配置更新失败:{}", configKey, exception);
            }
        });
    }
}

配置订阅

public class ConfigSubscriber {
    
    private final Map<String, ConfigValue> configs = new ConcurrentHashMap<>();
    
    @KafkaListener(topics = "config-center", groupId = "config-subscriber")
    public void listen(ConsumerRecord<String, ConfigValue> record) {
        // 更新本地配置缓存
        configs.put(record.key(), record.value());
        log.info("配置更新:{} = {}", record.key(), record.value());
    }
    
    public ConfigValue getConfig(String key) {
        return configs.get(key);
    }
}

五、墓碑消息

5.1 什么是墓碑消息?

定义

墓碑消息 = Key 存在,Value 为 null

作用:
- 标记 Key 已删除
- 压缩后保留墓碑
- 超过保留时间后删除

示例

// 发送墓碑消息
ProducerRecord<String, String> tombstone = 
    new ProducerRecord<>("user-state", "user_123", null);

producer.send(tombstone);

5.2 墓碑处理流程

graph TB
    subgraph 发送墓碑
        S1[发送 Key=A, Value=null]
    end
    
    subgraph 压缩
        C1[保留墓碑]
        C2[删除 Key=A 的旧消息]
    end
    
    subgraph 清理
        D1[超过保留时间]
        D2[删除墓碑]
    end
    
    S1 --> C1
    S1 --> C2
    C1 --> D1
    D1 --> D2

5.3 墓碑配置

# 墓碑保留时间(默认 24 小时)
delete.retention.ms=86400000

# 压缩后墓碑立即生效
# 但墓碑本身会保留到 delete.retention.ms

六、性能优化

6.1 压缩触发条件

# 压缩触发阈值(默认 0.5)
# 值越小,压缩越频繁
min.cleanable.dirty.ratio=0.5

# 后台压缩线程数
num.io.threads=8

# 压缩缓冲区大小
log.cleaner.buffers=1048576

6.2 Segment 配置

# Segment 滚动时间
log.roll.hours=168  # 7 天

# Segment 大小
log.segment.bytes=1073741824  # 1GB

# 压缩最小间隔
log.cleaner.min.compaction.lag.ms=0

6.3 监控指标

指标说明
kafka-log-cleaner:cleaner-manager:percent-dirty待压缩比例
kafka-log-cleaner:cleaner:bytes-written压缩写入字节数
kafka-log-cleaner:cleaner:bytes-dropped压缩删除字节数

七、常见问题

7.1 压缩不生效

原因

解决

# 降低压缩触发阈值
min.cleanable.dirty.ratio=0.1

# 减少压缩延迟
min.compaction.lag.ms=0
max.compaction.lag.ms=60000

# 增加压缩线程
num.io.threads=16

7.2 消息丢失

原因

解决

# 检查 Topic 配置
kafka-topics.sh --describe --topic my-topic

# 确保 cleanup.policy=compact
kafka-configs.sh --alter --add-config cleanup.policy=compact \
  --entity-type topics --entity-name my-topic

7.3 性能下降

原因

解决

# 增加压缩触发阈值
min.cleanable.dirty.ratio=0.9

# 增大 Segment
log.segment.bytes=2147483648  # 2GB

# 增加压缩间隔
log.cleaner.min.compaction.lag.ms=60000

八、最佳实践

8.1 Topic 设计

# 状态存储 Topic
kafka-topics.sh --create --topic user-state \
  --config cleanup.policy=compact \
  --config min.compaction.lag.ms=0 \
  --config max.compaction.lag.ms=3600000 \
  --config segment.bytes=1073741824

# CDC Topic
kafka-topics.sh --create --topic db-changes \
  --config cleanup.policy=compact \
  --config key.converter.schemas.enable=true \
  --config value.converter.schemas.enable=true

# 混合策略 Topic
kafka-topics.sh --create --topic order-events \
  --config cleanup.policy=compact,delete \
  --config retention.ms=604800000

8.2 Key 设计

// ✅ 好的 Key 设计
record.key = userId;           // 用户 ID
record.key = orderId;          // 订单 ID
record.key = configKey;        // 配置 Key

// ❌ 不好的 Key 设计
record.key = UUID.randomUUID();  // 随机 UUID(无压缩效果)
record.key = timestamp;          // 时间戳(无压缩效果)

8.3 监控告警

# Prometheus 告警规则
groups:
  - name: kafka-compaction
    rules:
      - alert: KafkaCompactionLag
        expr: kafka_log_cleaner_percent_dirty > 0.8
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "压缩延迟过高:{{ $value }}"
      
      - alert: KafkaCompactionStopped
        expr: kafka_log_cleaner_bytes_written == 0
        for: 1h
        labels:
          severity: critical
        annotations:
          summary: "压缩停止"

总结

Kafka Log Compaction 的核心要点:

  1. 实现原理:保留每个 Key 的最新值
  2. 配置方法:cleanup.policy、压缩参数
  3. 实战应用:状态存储、CDC、配置中心
  4. 墓碑消息:标记删除、保留时间
  5. 性能优化:压缩触发、Segment 配置

核心要点

参考资料


分享这篇文章到:

上一篇文章
AI 工程化系列完整学习指南
下一篇文章
焦虑的一代-成长的需求