Kafka 消费者组(Consumer Group)是 Kafka 实现水平扩展和容错的核心机制。本文将深入探讨消费者组的工作原理、重平衡(Rebalance)机制以及分区分配策略。
一、消费者组基础
1.1 核心概念
消费者组是一组消费者的集合,它们共同消费一个或多个 Topic 的消息:
graph TB
subgraph Consumer Group A
C1[Consumer 1]
C2[Consumer 2]
end
subgraph Topic
P1[Partition 1]
P2[Partition 2]
P3[Partition 3]
P4[Partition 4]
end
C1 --> P1
C1 --> P2
C2 --> P3
C2 --> P4
1.2 核心特性
| 特性 | 说明 |
|---|---|
| 一组一消费 | 一条消息只会被组内一个消费者消费 |
| 多组订阅 | 多个消费者组可以订阅同一 Topic |
| 分区独占 | 组内一个分区只能被一个消费者消费 |
| 水平扩展 | 增加消费者数量提高消费能力 |
1.3 基本使用
// 1. 创建消费者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
// 2. 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3. 订阅 Topic
consumer.subscribe(Arrays.asList("topic-1", "topic-2"));
// 4. 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
二、重平衡机制
2.1 什么是重平衡?
**重平衡(Rebalance)**是消费者组内分区所有权重新分配的过程:
sequenceDiagram
participant C1 as Consumer 1
participant C2 as Consumer 2
participant GC as Group Coordinator
participant K as Kafka
C1->>GC: 加入组
GC->>GC: 触发 Rebalance
GC->>C1: 分配分区 P1, P2
GC->>C2: 分配分区 P3, P4
C1->>K: 消费 P1, P2
C2->>K: 消费 P3, P4
C1->>GC: 消费者离开
GC->>GC: 触发 Rebalance
GC->>C2: 重新分配 P1, P2, P3, P4
2.2 触发条件
| 触发条件 | 说明 | 示例 |
|---|---|---|
| 组成员变化 | 新消费者加入或旧消费者离开 | 扩容、缩容、故障 |
| Topic 变化 | Topic 分区数增加 | 分区扩容 |
| 订阅变化 | 订阅的 Topic 列表变化 | 动态订阅 |
2.3 重平衡流程
graph TD
A[触发 Rebalance] --> B[停止消费]
B --> C[撤销分区所有权]
C --> D[选举 Leader Consumer]
D --> E[Leader 分配分区]
E --> F[同步分区分配方案]
F --> G[恢复消费]
2.4 重平衡问题
Stop-the-World 问题:
- 重平衡期间所有消费者停止消费
- 影响消息实时性
- 可能导致重复消费
优化方案:
// 1. 静态成员资格(减少不必要的重平衡)
props.put("group.instance.id", "consumer-1");
props.put("session.timeout.ms", "45000");
// 2. 优雅关闭
consumer.wakeup();
consumer.close();
// 3. 实现 RebalanceListener
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 提交偏移量,清理资源
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 准备消费新分区
}
});
三、分区分配策略
3.1 Range 策略(默认)
按 Topic 分区范围分配:
graph LR
subgraph Consumer 1
T1P1[T1-P1]
T1P2[T1-P2]
end
subgraph Consumer 2
T1P3[T1-P3]
T1P4[T1-P4]
end
subgraph Topic 1
P1[P1]
P2[P2]
P3[P3]
P4[P4]
end
特点:
- 可能导致负载不均衡
- 适合单 Topic 场景
3.2 RoundRobin 策略
轮询分配所有分区:
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
特点:
- 负载均衡更好
- 适合多 Topic 场景
3.3 Sticky 策略
粘性分配,最小化重平衡影响:
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.StickyAssignor");
特点:
- 保留原有分配
- 减少重平衡开销
3.4 自定义分配策略
public class CustomAssignor implements ConsumerPartitionAssignor {
@Override
public Map<String, List<TopicPartition>> assign(
Cluster cluster,
Map<String, ConsumerPartitionAssignor.Subscription> subscriptions) {
Map<String, List<TopicPartition>> assignment = new HashMap<>();
// 自定义分配逻辑
// 例如:根据消费者负载、网络位置等
return assignment;
}
@Override
public String name() {
return "custom";
}
}
四、位移管理
4.1 位移存储
Kafka 消费者位移存储在内部 Topic __consumer_offsets 中:
graph TB
C[Consumer] -->|提交位移 | CO[__consumer_offsets]
CO -->|读取位移 | C
CO --> P1[Partition 0]
CO --> P2[Partition 1]
4.2 自动提交
// 自动提交配置
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // 5 秒
// 消费循环
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
// 自动提交由 Kafka 后台完成
}
4.3 手动提交
同步提交:
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
consumer.commitSync(); // 同步提交
}
异步提交:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("提交失败", exception);
// 可以重试或记录日志
}
});
4.4 混合提交
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
consumer.commitAsync(); // 异步提交提高性能
}
} catch (Exception e) {
log.error("消费异常", e);
} finally {
try {
consumer.commitSync(); // 关闭前同步提交
} finally {
consumer.close();
}
}
五、消费模式
5.1 推模式 vs 拉模式
Kafka 采用拉模式(Pull):
- 消费者主动拉取消息
- 可以控制消费速率
- 避免背压问题
5.2 批量消费
// 批量处理配置
props.put("fetch.min.bytes", "1024"); // 最小 1KB
props.put("fetch.max.wait.ms", "500"); // 最多等待 500ms
props.put("max.poll.records", "500"); // 每次最多 500 条
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 批量处理
List<String> batch = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
batch.add(record.value());
}
processBatch(batch);
consumer.commitSync();
}
5.3 并发消费
// 单消费者多线程处理
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
ExecutorService executor = Executors.newFixedThreadPool(4);
List<Future<?>> futures = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
futures.add(executor.submit(() -> process(record)));
}
// 等待所有线程完成
for (Future<?> future : futures) {
future.get();
}
consumer.commitSync();
executor.shutdown();
}
六、性能优化
6.1 配置优化
| 参数 | 默认值 | 优化建议 | 说明 |
|---|---|---|---|
fetch.min.bytes | 1 | 1024-65536 | 减少网络请求 |
fetch.max.wait.ms | 500 | 100-500 | 控制延迟 |
max.poll.records | 500 | 100-1000 | 控制单次处理量 |
session.timeout.ms | 10000 | 30000-45000 | 减少误判 |
6.2 处理优化
// 1. 异步处理
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 提交后立即处理,不等待
consumer.commitAsync();
executor.submit(() -> {
for (ConsumerRecord<String, String> record : records) {
process(record);
}
});
}
// 2. 批处理
List<ConsumerRecord<String, String>> batch = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(batch::add);
if (batch.size() >= 100) {
processBatch(batch);
batch.clear();
}
}
七、常见问题排查
7.1 重复消费
原因:
- 重平衡导致位移未及时提交
- 消费者故障转移
解决:
// 1. 实现 RebalanceListener
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(); // 提交位移
}
});
// 2. 业务层去重
Set<String> processedIds = new HashSet<>();
if (!processedIds.contains(record.key())) {
process(record);
processedIds.add(record.key());
}
7.2 消费滞后
监控指标:
records-lag-max:最大滞后消息数records-lag-avg:平均滞后消息数
解决:
// 1. 增加消费者数量
// 2. 提高并发度
// 3. 优化处理逻辑
// 4. 增加分区数
7.3 Rebalance 频繁
原因:
- 会话超时设置过短
- 消费者处理时间过长
- 网络不稳定
解决:
props.put("session.timeout.ms", "30000");
props.put("max.poll.interval.ms", "300000");
props.put("heartbeat.interval.ms", "10000");
总结
Kafka 消费者组的核心机制:
- 消费者组:实现水平扩展和容错
- 重平衡:分区重新分配,注意 Stop-the-World 问题
- 分区分配:Range、RoundRobin、Sticky 策略
- 位移管理:自动/手动提交,保证消费进度
- 性能优化:批量消费、并发处理、配置调优
核心要点:
- 理解重平衡触发条件和影响
- 根据场景选择合适的分区分配策略
- 合理配置位移提交策略
- 监控消费滞后和 Rebalance 频率
参考资料
- Kafka Consumer 官方文档
- KIP-62: Improve Consumer Rebalance
- 《Kafka 权威指南》第 5 章