
Kafka Architecture and Core Concepts

Kafka is a high-throughput, distributed, publish/subscribe message queue system, widely used for log collection, stream processing, event sourcing, and similar scenarios. This article takes a close look at Kafka's architecture and core concepts.

1. Introduction to Kafka

1.1 Core Features

| Feature | Description |
|---|---|
| High throughput | Handles millions of messages per second |
| Distributed | Scales horizontally; highly available |
| Durable | Messages are persisted to disk and can be replayed |
| Multi-subscriber | Multiple consumer groups can subscribe to the same topic |
| Real-time | Low latency, millisecond-level delivery |

1.2 Use Cases

graph TB
    subgraph Use Cases
        A[Log Collection] --> B[ELK Stack]
        C[Stream Processing] --> D[Flink/Spark]
        E[Event-Driven] --> F[Microservice Decoupling]
        G[Data Sync] --> H[CDC/ETL]
    end

2. Architecture Overview

2.1 Core Components

graph TB
    subgraph Producer
        P1[Producer 1]
        P2[Producer 2]
    end
    
    subgraph Kafka Cluster
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
    end
    
    subgraph Consumer
        C1[Consumer 1]
        C2[Consumer 2]
    end
    
    P1 --> B1
    P2 --> B2
    B1 --> C1
    B2 --> C2
    B3 --> C1

Component descriptions

| Component | Description |
|---|---|
| Producer | Publishes messages to Kafka |
| Consumer | Reads messages from Kafka |
| Broker | Kafka server that stores the message data |
| Topic | Logical category of messages |
| Partition | Physical shard of a topic |
| Replica | Backup copy of a partition |
| Zookeeper | Coordination service (optional since Kafka 2.8 with KRaft) |

2.2 Topics and Partitions

Topic structure

Topic: orders
├── Partition 0
│   ├── Offset 0: {order_id: 1}
│   ├── Offset 1: {order_id: 2}
│   └── Offset 2: {order_id: 3}
├── Partition 1
│   ├── Offset 0: {order_id: 4}
│   └── Offset 1: {order_id: 5}
└── Partition 2
    ├── Offset 0: {order_id: 6}
    └── Offset 1: {order_id: 7}

Partition characteristics

- Messages within a single partition are strictly ordered; ordering is not guaranteed across partitions
- Each partition is an append-only log, and every message gets a monotonically increasing offset
- Partitions are the unit of parallelism: within a consumer group, a partition is consumed by at most one consumer
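How a keyed message (such as the `order_id` records above) maps to one of a topic's partitions can be sketched in a few lines. This is a simplified illustration, not Kafka's real default partitioner (which hashes the serialized key bytes with murmur2), but it shows the guarantee that matters: the same key always lands on the same partition.

```java
// Simplified sketch of key-based partitioning.
// NOTE: illustrative only — the real default partitioner uses murmur2
// over the key bytes, not String.hashCode().
public class KeyToPartition {
    public static int partitionFor(String key, int numPartitions) {
        // mask off the sign bit to avoid a negative index
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-1001", 3);
        int p2 = partitionFor("order-1001", 3);
        System.out.println(p1 == p2); // true: same key -> same partition
    }
}
```

Because all messages for one key go to one partition, per-key ordering is preserved even though the topic as a whole is not ordered.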

3. The Producer

3.1 Send Flow

sequenceDiagram
    participant App as Application
    participant Producer as Producer
    participant Broker as Broker
    
    App->>Producer: send message
    Producer->>Producer: serialize
    Producer->>Producer: select partition
    Producer->>Broker: send request
    Broker-->>Producer: ACK
    Producer-->>App: return result

3.2 Partitioning Strategies

// 1. Keyed record: the default partitioner hashes the key to choose a partition
ProducerRecord<String, String> record = 
    new ProducerRecord<>("topic", "key", "value");

// 2. Explicit partition
ProducerRecord<String, String> record = 
    new ProducerRecord<>("topic", 0, "key", "value");

// 3. Custom partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
    CustomPartitioner.class.getName());

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, 
                        byte[] keyBytes, Object value, 
                        byte[] valueBytes, Cluster cluster) {
        // custom logic: hash the key over the topic's partition count
        int numPartitions = cluster.partitionCountForTopic(topic);
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

3.3 The acks Setting

| acks | Description | Reliability | Performance |
|---|---|---|---|
| 0 | Fire-and-forget, no acknowledgment | Lowest | Highest |
| 1 | Leader acknowledges after its local write | Medium | Medium |
| all | All in-sync replicas must acknowledge | Highest | Lowest |

Configuration examples

# Maximum reliability
acks=all
retries=3
min.insync.replicas=2
enable.idempotence=true

# Maximum throughput
acks=0
batch.size=65536
linger.ms=10

# Balanced (note: enable.idempotence=true would require acks=all)
acks=1
retries=3

3.4 Idempotence and Transactions

Idempotence configuration

# idempotence requires acks=all, retries > 0,
# and at most 5 in-flight requests per connection
enable.idempotence=true
max.in.flight.requests.per.connection=5

Transaction example

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-1");
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // fenced by a newer producer with the same transactional.id:
    // the transaction cannot be aborted, the producer must be closed
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}

4. The Consumer

4.1 Consumer Groups

graph TB
    subgraph Consumer Group
        C1[Consumer 1]
        C2[Consumer 2]
        C3[Consumer 3]
    end
    
    subgraph Topic
        P0[Partition 0]
        P1[Partition 1]
        P2[Partition 2]
    end
    
    C1 --> P0
    C2 --> P1
    C3 --> P2

Consumer group characteristics

- Within a group, each partition is assigned to exactly one consumer
- Different groups consume the same topic independently, each tracking its own offsets
- Consumers beyond the number of partitions sit idle
- Adding or removing a consumer triggers a partition rebalance

4.2 Partition Assignment Strategies

// Range strategy (the classic default)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    "org.apache.kafka.clients.consumer.RangeAssignor");

// RoundRobin strategy
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");

// Sticky strategy
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    "org.apache.kafka.clients.consumer.StickyAssignor");

4.3 Offset Management

// Auto-commit (every 5 seconds by default)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

// Manual commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// Synchronous commit
consumer.commitSync();

// Asynchronous commit
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // handle the failed commit
    }
});

4.4 Consumption Loop

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
    StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
    StringDeserializer.class);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));

while (true) {
    ConsumerRecords<String, String> records = 
        consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", 
            record.offset(), record.key(), record.value());
    }
    
    // commit manually after the batch is processed
    consumer.commitSync();
}

5. The Broker

5.1 Replication

Replica types

| Type | Description |
|---|---|
| Leader | Primary replica; serves read and write requests |
| Follower | Secondary replica; fetches and replicates the leader's data |
| ISR | In-Sync Replicas: the set of replicas currently caught up with the leader |

Replication flow

sequenceDiagram
    participant Producer
    participant Leader
    participant Follower1
    participant Follower2
    
    Producer->>Leader: write message
    Leader->>Follower1: replicate
    Leader->>Follower2: replicate
    Follower1-->>Leader: ACK
    Follower2-->>Leader: ACK
    Leader-->>Producer: acknowledge write

5.2 High-Availability Configuration

# Default replication factor
default.replication.factor=3

# Minimum number of in-sync replicas
min.insync.replicas=2

# Automatic preferred-leader rebalancing
auto.leader.rebalance.enable=true

# Unclean leader election (keep disabled to avoid data loss)
unclean.leader.election.enable=false
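The interaction between acks=all and min.insync.replicas comes down to a single check on the leader: if the current ISR has shrunk below the minimum, the write is rejected (NotEnoughReplicas) rather than accepted with weakened durability. A minimal sketch of that decision:

```java
// Sketch: with acks=all, the leader accepts a write only while the
// in-sync replica set is at least min.insync.replicas.
public class IsrCheck {
    public static boolean acceptWrite(int isrSize, int minInsyncReplicas) {
        return isrSize >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        // replication.factor=3, min.insync.replicas=2:
        System.out.println(acceptWrite(3, 2)); // true: all replicas in sync
        System.out.println(acceptWrite(2, 2)); // true: one follower lagging, still enough
        System.out.println(acceptWrite(1, 2)); // false: only the leader left -> rejected
    }
}
```

This is why the combination replication.factor=3 with min.insync.replicas=2 is common: it tolerates one replica failure while still refusing writes that only the leader would hold.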

5.3 Log Management

Log segment configuration

# Segment size (1 GiB)
log.segment.bytes=1073741824

# Retention time (168 h = 7 days)
log.retention.hours=168

# Retention size (-1 = unlimited)
log.retention.bytes=-1

# Cleanup policy (delete/compact)
log.cleanup.policy=delete
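Time-based retention under the delete policy operates per segment, not per message: a closed segment becomes eligible for deletion once its newest record is older than the retention window, and the active segment is never deleted. A simplified sketch of that rule:

```java
// Sketch: segment-level time retention under log.cleanup.policy=delete.
public class RetentionCheck {
    public static boolean deletable(long segmentLastTimestampMs, long nowMs,
                                    long retentionHours, boolean isActiveSegment) {
        if (isActiveSegment) return false; // the active segment is always kept
        return nowMs - segmentLastTimestampMs > retentionHours * 3600_000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long eightDaysAgo = now - 8L * 24 * 3600_000L;
        System.out.println(deletable(eightDaysAgo, now, 168, false)); // true: past 7-day retention
        System.out.println(deletable(eightDaysAgo, now, 168, true));  // false: still active
    }
}
```

A consequence worth remembering: with a large log.segment.bytes on a low-traffic topic, old messages can outlive the nominal retention window because their segment has not rolled yet.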

6. Practical Application

6.1 Spring Boot Integration

Maven dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuration (application.yml)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
    consumer:
      group-id: test-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      enable-auto-commit: false

Producer implementation

@Service
public class KafkaProducer {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String key, String message) {
        // spring-kafka 3.x: send() returns a CompletableFuture
        // (older versions return a ListenableFuture with addCallback)
        kafkaTemplate.send(topic, key, message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Sent, offset = " + result.getRecordMetadata().offset());
                } else {
                    System.out.println("Send failed: " + ex.getMessage());
                }
            });
    }
}

Consumer implementation

@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void consume(String message) {
        System.out.println("Received: " + message);
    }
    
    // Note the different group id: two listeners in the SAME group would
    // split the partitions between them and each see only part of the messages
    @KafkaListener(topics = "test-topic", groupId = "test-group-2")
    public void consumeWithKey(ConsumerRecord<String, String> record) {
        System.out.printf("Key: %s, Value: %s, Offset: %d%n", 
            record.key(), record.value(), record.offset());
    }
}

6.2 Order-Processing Example

// Order event
public class OrderEvent {
    private String orderId;
    private String userId;
    private Double amount;
    private OrderStatus status;
    private LocalDateTime createTime;
    // getters/setters omitted
}

// Producer
@Service
public class OrderProducer {
    
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    public void sendOrderCreated(OrderEvent event) {
        kafkaTemplate.send("order-events", event.getOrderId(), event);
    }
}

// Consumer
@Component
public class OrderConsumer {
    
    @KafkaListener(topics = "order-events", groupId = "order-group")
    public void consume(OrderEvent event) {
        switch (event.getStatus()) {
            case CREATED:
                handleOrderCreated(event);
                break;
            case PAID:
                handleOrderPaid(event);
                break;
            case SHIPPED:
                handleOrderShipped(event);
                break;
        }
    }
}

7. Performance Tuning

7.1 Producer Tuning

| Parameter | Recommended | Description |
|---|---|---|
| batch.size | 65536 | Batch size in bytes |
| linger.ms | 10 | Wait time before sending a batch |
| compression.type | lz4 | Compression codec |
| buffer.memory | 33554432 | Producer buffer size (32 MiB) |

7.2 Consumer Tuning

| Parameter | Recommended | Description |
|---|---|---|
| fetch.min.bytes | 1 | Minimum bytes per fetch |
| fetch.max.wait.ms | 500 | Maximum wait time for a fetch |
| max.poll.records | 500 | Records returned per poll |
| session.timeout.ms | 30000 | Session timeout |
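max.poll.records should be chosen together with max.poll.interval.ms (300000 ms by default): if processing one polled batch takes longer than that interval, the broker considers the consumer dead and rebalances its partitions away. A quick budget calculation shows the per-record processing time a given setting allows:

```java
// Sketch: per-record processing budget implied by the poll settings.
// If average processing time per record exceeds this budget, either
// lower max.poll.records or raise max.poll.interval.ms.
public class PollBudget {
    public static double perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return (double) maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // defaults: max.poll.interval.ms=300000, max.poll.records=500
        System.out.println(perRecordBudgetMs(300_000, 500)); // 600.0 ms per record
    }
}
```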

7.3 Broker Tuning

# Network threads
num.network.threads=8

# I/O threads
num.io.threads=16

# Socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log flushing
log.flush.interval.messages=10000
log.flush.interval.ms=1000

8. Monitoring and Operations

8.1 Key Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| Under Replicated Partitions | Number of under-replicated partitions | > 0 |
| Offline Partitions Count | Number of offline partitions | > 0 |
| Active Controller Count | Number of active controllers | != 1 |
| Request Handler Avg Idle Percent | Request handler idle ratio | < 30% |
| Network Handler Avg Idle Percent | Network handler idle ratio | < 30% |

8.2 Common Commands

# List topics
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Create a topic
kafka-topics.sh --create --topic test \
    --partitions 3 --replication-factor 2 \
    --bootstrap-server localhost:9092

# List consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Describe a consumer group (members, offsets, lag)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group test-group

# Reset offsets (a topic scope such as --all-topics is required)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group test-group --all-topics --reset-offsets --to-latest --execute

9. Summary

Kafka key takeaways

| Concept | Key Points |
|---|---|
| Topic | Logical message category, divided into partitions |
| Partition | Physical shard; ordering guaranteed within a partition |
| Replica | Data copy for high availability |
| Producer | Sends messages; supports idempotence and transactions |
| Consumer | Reads messages; organized into consumer groups |
| Broker | Storage server; replicas act as leader or follower |
