Skip to content
清晨的一缕阳光
返回

Kafka 消费者组与重平衡机制详解

Kafka 消费者组(Consumer Group)是 Kafka 实现水平扩展和容错的核心机制。本文将深入探讨消费者组的工作原理、重平衡(Rebalance)机制以及分区分配策略。

一、消费者组基础

1.1 核心概念

消费者组是一组消费者的集合,它们共同消费一个或多个 Topic 的消息:

graph TB
    subgraph Consumer Group A
        C1[Consumer 1]
        C2[Consumer 2]
    end
    
    subgraph Topic
        P1[Partition 1]
        P2[Partition 2]
        P3[Partition 3]
        P4[Partition 4]
    end
    
    C1 --> P1
    C1 --> P2
    C2 --> P3
    C2 --> P4

1.2 核心特性

特性说明
一组一消费一条消息只会被组内一个消费者消费
多组订阅多个消费者组可以订阅同一 Topic
分区独占组内一个分区只能被一个消费者消费
水平扩展增加消费者数量提高消费能力

1.3 基本使用

// 1. 创建消费者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");

// 2. 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 3. 订阅 Topic
consumer.subscribe(Arrays.asList("topic-1", "topic-2"));

// 4. 消费消息
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    }
}

二、重平衡机制

2.1 什么是重平衡?

**重平衡(Rebalance)**是消费者组内分区所有权重新分配的过程:

sequenceDiagram
    participant C1 as Consumer 1
    participant C2 as Consumer 2
    participant GC as Group Coordinator
    participant K as Kafka
    
    C1->>GC: 加入组
    GC->>GC: 触发 Rebalance
    GC->>C1: 分配分区 P1, P2
    GC->>C2: 分配分区 P3, P4
    C1->>K: 消费 P1, P2
    C2->>K: 消费 P3, P4
    C1->>GC: 消费者离开
    GC->>GC: 触发 Rebalance
    GC->>C2: 重新分配 P1, P2, P3, P4

2.2 触发条件

触发条件说明示例
组成员变化新消费者加入或旧消费者离开扩容、缩容、故障
Topic 变化Topic 分区数增加分区扩容
订阅变化订阅的 Topic 列表变化动态订阅

2.3 重平衡流程

graph TD
    A[触发 Rebalance] --> B[停止消费]
    B --> C[撤销分区所有权]
    C --> D[选举 Leader Consumer]
    D --> E[Leader 分配分区]
    E --> F[同步分区分配方案]
    F --> G[恢复消费]

2.4 重平衡问题

Stop-the-World 问题

优化方案

// 1. 静态成员资格(减少不必要的重平衡)
props.put("group.instance.id", "consumer-1");
props.put("session.timeout.ms", "45000");

// 2. 优雅关闭
consumer.wakeup();
consumer.close();

// 3. 实现 RebalanceListener
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 提交偏移量,清理资源
        consumer.commitSync();
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 准备消费新分区
    }
});

三、分区分配策略

3.1 Range 策略(默认)

按 Topic 分区范围分配:

graph LR
    subgraph Consumer 1
        T1P1[T1-P1]
        T1P2[T1-P2]
    end
    
    subgraph Consumer 2
        T1P3[T1-P3]
        T1P4[T1-P4]
    end
    
    subgraph Topic 1
        P1[P1]
        P2[P2]
        P3[P3]
        P4[P4]
    end

特点

3.2 RoundRobin 策略

轮询分配所有分区:

props.put("partition.assignment.strategy", 
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");

特点

3.3 Sticky 策略

粘性分配,最小化重平衡影响:

props.put("partition.assignment.strategy", 
    "org.apache.kafka.clients.consumer.StickyAssignor");

特点

3.4 自定义分配策略

public class CustomAssignor implements ConsumerPartitionAssignor {
    @Override
    public Map<String, List<TopicPartition>> assign(
            Cluster cluster,
            Map<String, ConsumerPartitionAssignor.Subscription> subscriptions) {
        
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        
        // 自定义分配逻辑
        // 例如:根据消费者负载、网络位置等
        
        return assignment;
    }
    
    @Override
    public String name() {
        return "custom";
    }
}

四、位移管理

4.1 位移存储

Kafka 消费者位移存储在内部 Topic __consumer_offsets 中:

graph TB
    C[Consumer] -->|提交位移 | CO[__consumer_offsets]
    CO -->|读取位移 | C
    CO --> P1[Partition 0]
    CO --> P2[Partition 1]

4.2 自动提交

// 自动提交配置
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // 5 秒

// 消费循环
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    // 自动提交由 Kafka 后台完成
}

4.3 手动提交

同步提交

props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    consumer.commitSync(); // 同步提交
}

异步提交

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("提交失败", exception);
        // 可以重试或记录日志
    }
});

4.4 混合提交

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
        consumer.commitAsync(); // 异步提交提高性能
    }
} catch (Exception e) {
    log.error("消费异常", e);
} finally {
    try {
        consumer.commitSync(); // 关闭前同步提交
    } finally {
        consumer.close();
    }
}

五、消费模式

5.1 推模式 vs 拉模式

Kafka 采用拉模式(Pull)

5.2 批量消费

// 批量处理配置
props.put("fetch.min.bytes", "1024"); // 最小 1KB
props.put("fetch.max.wait.ms", "500"); // 最多等待 500ms
props.put("max.poll.records", "500"); // 每次最多 500 条

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    // 批量处理
    List<String> batch = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
        batch.add(record.value());
    }
    processBatch(batch);
    
    consumer.commitSync();
}

5.3 并发消费

// 单消费者多线程处理
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    ExecutorService executor = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<>();
    
    for (ConsumerRecord<String, String> record : records) {
        futures.add(executor.submit(() -> process(record)));
    }
    
    // 等待所有线程完成
    for (Future<?> future : futures) {
        future.get();
    }
    
    consumer.commitSync();
    executor.shutdown();
}

六、性能优化

6.1 配置优化

参数默认值优化建议说明
fetch.min.bytes11024-65536减少网络请求
fetch.max.wait.ms500100-500控制延迟
max.poll.records500100-1000控制单次处理量
session.timeout.ms1000030000-45000减少误判

6.2 处理优化

// 1. 异步处理
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    // 提交后立即处理,不等待
    consumer.commitAsync();
    
    executor.submit(() -> {
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
    });
}

// 2. 批处理
List<ConsumerRecord<String, String>> batch = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(batch::add);
    
    if (batch.size() >= 100) {
        processBatch(batch);
        batch.clear();
    }
}

七、常见问题排查

7.1 重复消费

原因

解决

// 1. 实现 RebalanceListener
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // 提交位移
    }
});

// 2. 业务层去重
Set<String> processedIds = new HashSet<>();
if (!processedIds.contains(record.key())) {
    process(record);
    processedIds.add(record.key());
}

7.2 消费滞后

监控指标

解决

// 1. 增加消费者数量
// 2. 提高并发度
// 3. 优化处理逻辑
// 4. 增加分区数

7.3 Rebalance 频繁

原因

解决

props.put("session.timeout.ms", "30000");
props.put("max.poll.interval.ms", "300000");
props.put("heartbeat.interval.ms", "10000");

总结

Kafka 消费者组的核心机制:

  1. 消费者组:实现水平扩展和容错
  2. 重平衡:分区重新分配,注意 Stop-the-World 问题
  3. 分区分配:Range、RoundRobin、Sticky 策略
  4. 位移管理:自动/手动提交,保证消费进度
  5. 性能优化:批量消费、并发处理、配置调优

核心要点

参考资料


分享这篇文章到:

上一篇文章
多 Agent 协作框架实战
下一篇文章
Redis 分布式锁实战