Skip to content
清晨的一缕阳光
返回

Kafka 集成

Kafka 集成

Kafka 简介

核心特性

高吞吐

持久化

流处理

架构设计

┌─────────────┐      ┌─────────────┐
│  Producer   │      │   Consumer  │
│  (生产者)    │      │   (消费者)   │
└──────┬──────┘      └──────▲──────┘
       │                    │
       │ 发布消息            │ 订阅消息
       ▼                    │
┌─────────────────────────────────┐
│         Kafka Cluster           │
│  ┌─────────┐  ┌─────────┐      │
│  │ Broker 1│  │ Broker 2│ ...  │
│  │ ┌─────┐ │  │ ┌─────┐ │      │
│  │ │Topic│ │  │ │Topic│ │      │
│  │ │Part │ │  │ │Part │ │      │
│  │ └─────┘ │  │ └─────┘ │      │
│  └─────────┘  └─────────┘      │
└─────────────────────────────────┘


┌─────────────┐
│ ZooKeeper   │
│ (元数据管理) │
└─────────────┘

核心概念

主题(Topic)

分区(Partition)

副本(Replica)

消费者组(Consumer Group)

快速开始

1. Docker 部署

# Single-node Kafka stack for local development: ZooKeeper, one broker, and Kafka UI.
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners are required: clients inside the compose network (kafka-ui)
      # must be told to reconnect to "kafka:29092", while clients on the host
      # machine must be told to reconnect to "localhost:9092". Advertising only
      # localhost makes in-network clients connect to themselves.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so the internal offsets topic can only have one replica.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - 8088:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      # Uses the in-network listener, not the host-facing one.
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

2. 添加依赖

<dependencies>
    <!-- Spring for Apache Kafka -->
    <!-- No <version> is given here; presumably it is managed by the Spring Boot parent/BOM (confirm). -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <!-- Kafka Streams (optional) -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
</dependencies>

3. 基础配置

# Basic Spring Boot configuration for a String-keyed, String-valued producer and consumer.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all  # wait for acknowledgement from all replicas
      retries: 3  # number of retries
      batch-size: 16384  # batch size
      buffer-memory: 33554432  # producer buffer memory
    consumer:
      group-id: order-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest  # start consuming from the latest offset
      enable-auto-commit: true  # commit offsets automatically
      auto-commit-interval: 1000  # commit interval
    listener:
      # NOTE(review): with enable-auto-commit set to true above, the container
      # ack-mode presumably has no effect; confirm which one is intended.
      ack-mode: batch  # batch acknowledgement
      concurrency: 3  # number of concurrent consumers

4. 发送消息

基础发送

/**
 * Publishes order events to Kafka as JSON strings, keyed by the order id.
 */
@Service
public class OrderProducer {

    /** Topic that receives one message per created order. */
    private static final String ORDER_CREATED_TOPIC = "order-created";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends the given order to the order-created topic without waiting for the result.
     *
     * @param order the order to publish; its id is used as the message key
     */
    public void sendOrderCreated(Order order) {
        kafkaTemplate.send(
            ORDER_CREATED_TOPIC,
            order.getId().toString(),
            JSON.toJSONString(order)
        );

        log.info("订单消息已发送:{}", order.getId());
    }
}

发送并获取结果

/**
 * Publishes order events to Kafka and waits for the broker acknowledgement.
 */
@Service
public class OrderProducer {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    /**
     * Sends the order synchronously, blocking for at most five seconds.
     *
     * @param order the order to publish; its id is used as the message key
     * @return the send result carrying the record metadata (partition, offset)
     * @throws IllegalStateException if the send fails, times out, or the calling
     *         thread is interrupted while waiting; the original exception is the cause
     */
    public SendResult<String, String> sendOrder(Order order) {
        String topic = "order-created";
        String key = order.getId().toString();
        String value = JSON.toJSONString(order);
        
        try {
            // Synchronous send: Future.get throws checked exceptions that must be handled here.
            SendResult<String, String> result = kafkaTemplate.send(
                topic, key, value
            ).get(5, TimeUnit.SECONDS);
            
            log.info("消息发送成功:offset={}, partition={}",
                result.getRecordMetadata().offset(),
                result.getRecordMetadata().partition()
            );
            
            return result;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                "Interrupted while sending order " + order.getId(), e);
        } catch (java.util.concurrent.ExecutionException
                 | java.util.concurrent.TimeoutException e) {
            throw new IllegalStateException(
                "Failed to send order " + order.getId(), e);
        }
    }
}

异步回调

/**
 * Publishes order events to Kafka asynchronously and logs the outcome in callbacks.
 */
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends the order without blocking; success and failure are reported via callbacks.
     *
     * NOTE(review): this relies on send() returning a ListenableFuture; newer
     * spring-kafka versions presumably return CompletableFuture instead. Confirm
     * against the spring-kafka version in use.
     *
     * @param order the order to publish; its id is used as the message key
     */
    public void sendOrderAsync(Order order) {
        ListenableFuture<SendResult<String, String>> pending = kafkaTemplate.send(
            "order-created",
            order.getId().toString(),
            JSON.toJSONString(order)
        );

        pending.addCallback(this::logSendSuccess, this::logSendFailure);
    }

    /** Logs the offset assigned to a successfully sent record. */
    private void logSendSuccess(SendResult<String, String> result) {
        log.info("消息发送成功:offset={}",
            result.getRecordMetadata().offset()
        );
    }

    /** Logs the cause of a failed send. */
    private void logSendFailure(Throwable ex) {
        log.error("消息发送失败", ex);
    }
}

5. 消费消息

基础消费者

/**
 * Consumes order-created messages and hands the parsed order to business processing.
 */
@Component
public class OrderConsumer {

    /**
     * Receives one JSON message, logs it, parses it into an Order and processes it.
     *
     * @param message the raw JSON payload of the record
     */
    @KafkaListener(topics = "order-created", groupId = "order-consumer-group")
    public void consume(String message) {
        log.info("收到消息:{}", message);
        processOrder(JSON.parseObject(message, Order.class));
    }

    /** Placeholder for the business logic applied to each order. */
    private void processOrder(Order order) {
        // business processing
    }
}

指定分区

@KafkaListener(
    topics = "order-created",
    groupId = "order-consumer-group",
    containerFactory = "kafkaListenerContainerFactory"
)
public void consume(
    @Payload String message,
    @TopicPartition(topic = "order-created", partitionOffsets = {
        @PartitionOffset(partition = "0", initialOffset = "0"),
        @PartitionOffset(partition = "1", initialOffset = "0")
    })
) {
    // 消费指定分区
}

批量消费

/**
 * Consumes order-created records in batches and processes each order in turn.
 *
 * batch = "true" switches this listener to batch mode so the whole poll result
 * is delivered as a list; without it a List payload is not populated from
 * multiple records unless the container factory itself is a batch factory.
 *
 * @param messages the raw JSON payloads of all records returned by one poll
 */
@KafkaListener(
    topics = "order-created",
    groupId = "order-consumer-group",
    batch = "true"
)
public void consumeBatch(List<String> messages) {
    log.info("批量收到 {} 条消息", messages.size());
    
    for (String message : messages) {
        Order order = JSON.parseObject(message, Order.class);
        processOrder(order);
    }
}

高级特性

1. 消息序列化

JSON 序列化

/**
 * Kafka producer/consumer wiring that (de)serializes Order values as JSON.
 */
@Configuration
public class KafkaConfig {
    
    /** Broker address shared by the producer and the consumer factory. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    
    /**
     * Producer factory with String keys and JSON-serialized Order values.
     *
     * @return the producer factory used by {@link #kafkaTemplate()}
     */
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    /**
     * Template for sending Order messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Order> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    /**
     * Consumer factory with String keys and JSON-deserialized Order values.
     *
     * The JsonDeserializer is configured only through its instance (target type
     * and trusted packages). The deserializer classes and trusted-packages entry
     * are deliberately not put into the config map as well: supplying both an
     * instance and map-based settings is redundant and can conflict.
     *
     * @return the consumer factory used by the listener container factory
     */
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        
        JsonDeserializer<Order> valueDeserializer = new JsonDeserializer<>(Order.class);
        valueDeserializer.addTrustedPackages("com.example.model");
        
        return new DefaultKafkaConsumerFactory<>(
            config,
            new StringDeserializer(),
            valueDeserializer
        );
    }
    
    /**
     * Listener container factory with three concurrent consumers in batch mode.
     *
     * @return the container factory referenced by @KafkaListener methods
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        return factory;
    }
}

2. 消息过滤

消息过滤器

/**
 * Listener container factory that filters records before they reach the listener.
 *
 * The filter parses each record value as an Order and returns true for orders
 * whose amount is below 100, i.e. those orders are filtered out.
 *
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());

    // Filter out orders with an amount below 100.
    containerFactory.setRecordFilterStrategy(
        record -> JSON.parseObject(record.value(), Order.class).getAmount() < 100
    );

    return containerFactory;
}

3. 错误处理

错误处理器

/**
 * Configures listener error handling: bounded retries, then hand-off to a dead-letter topic.
 */
@Configuration
public class KafkaErrorHandlerConfig {
    
    /** Template used to republish failed records to the dead-letter topic. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    /**
     * Container factory whose error handler retries a failed record three times,
     * one second apart, and then logs it and forwards it to the dead-letter topic.
     *
     * @param consumerFactory the consumer factory backing the listener containers
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        
        // Recoverer runs once retries are exhausted (or immediately for non-retryable exceptions).
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            (consumerRecord, exception) -> {
                // The trailing exception argument is logged with its stack trace.
                log.error("消费失败:topic={}, partition={}, offset={}",
                    consumerRecord.topic(),
                    consumerRecord.partition(),
                    consumerRecord.offset(),
                    exception
                );
                
                sendToDeadLetterQueue(consumerRecord, exception);
            },
            new FixedBackOff(1000L, 3L)  // 3 retries, 1 second apart
        );
        
        // These exceptions skip retries and go straight to the recoverer.
        errorHandler.addNotRetryableExceptions(
            IllegalArgumentException.class,
            ClassCastException.class
        );
        
        factory.setCommonErrorHandler(errorHandler);
        
        return factory;
    }
    
    /**
     * Republishes a failed record to the order-dlq topic.
     *
     * The recoverer callback hands over a wildcard-typed record, so key and
     * value are converted to strings here; nulls are passed through unchanged.
     *
     * @param record the record that could not be processed
     * @param e the exception that caused the failure (currently not forwarded)
     */
    private void sendToDeadLetterQueue(ConsumerRecord<?, ?> record, Exception e) {
        String key = record.key() == null ? null : record.key().toString();
        String value = record.value() == null ? null : record.value().toString();
        kafkaTemplate.send("order-dlq", key, value);
    }
}

4. 手动提交偏移量

手动 ACK

/**
 * Consumer that acknowledges each record manually after successful processing.
 */
@Component
public class ManualAckConsumer {

    /**
     * Processes one order message and acknowledges it only when processing succeeds.
     * On failure the error is logged and rethrown without acknowledging the record.
     *
     * @param message the raw JSON payload
     * @param acknowledgment handle used to acknowledge the record
     * @param record the underlying record, used for logging topic/partition/offset
     */
    @KafkaListener(
        topics = "order-created",
        groupId = "order-consumer-group",
        containerFactory = "manualAckFactory"
    )
    public void consume(
        String message,
        Acknowledgment acknowledgment,
        ConsumerRecord<?, ?> record
    ) {
        try {
            log.info("收到消息:topic={}, partition={}, offset={}",
                record.topic(), record.partition(), record.offset()
            );

            processOrder(JSON.parseObject(message, Order.class));

            // Acknowledge only after the order was processed.
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Not acknowledged; the exception is propagated to the container.
            log.error("消费失败", e);
            throw e;
        }
    }

    /**
     * Container factory with the ack mode set to MANUAL.
     *
     * @param consumerFactory the consumer factory backing the listener containers
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(
        ConsumerFactory<String, String> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> manualFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
        manualFactory.getContainerProperties().setAckMode(AckMode.MANUAL);
        manualFactory.setConsumerFactory(consumerFactory);
        return manualFactory;
    }
}

5. 消息拦截器

生产者拦截器

/**
 * Producer interceptor that stamps every outgoing record with a timestamp and a
 * message id header, and logs send and acknowledgement events.
 */
public class OrderProducerInterceptor implements ProducerInterceptor<String, String> {
    
    /**
     * Adds the "timestamp" and "message-id" headers before the record is sent.
     *
     * Header values are encoded explicitly as UTF-8 so that the bytes do not
     * depend on the platform default charset of the producing JVM.
     *
     * @param record the record about to be sent
     * @return the same record with the extra headers
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Send time in epoch milliseconds.
        record.headers().add("timestamp", 
            String.valueOf(System.currentTimeMillis())
                .getBytes(java.nio.charset.StandardCharsets.UTF_8)
        );
        
        // Random unique message id.
        record.headers().add("message-id",
            UUID.randomUUID().toString()
                .getBytes(java.nio.charset.StandardCharsets.UTF_8)
        );
        
        log.info("发送消息:topic={}, key={}", record.topic(), record.key());
        
        return record;
    }
    
    /**
     * Logs the outcome of a send: the exception on failure, the offset on success.
     *
     * @param metadata metadata of the acknowledged record
     * @param exception the failure cause, or null when the send succeeded
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            log.error("消息确认失败", exception);
        } else {
            log.info("消息确认成功:offset={}", metadata.offset());
        }
    }
    
    /** Nothing to release. */
    @Override
    public void close() {}
    
    /** No configuration is read. */
    @Override
    public void configure(Map<String, ?> configs) {}
}

Kafka Streams

1. 基础流处理

流处理应用

@Configuration
public class KafkaStreamsConfig {
    
    @Bean
    public KStream<String, Order> orderStream(
        StreamsBuilder streamsBuilder
    ) {
        KStream<String, Order> stream = streamsBuilder
            .stream("order-created",
                Consumed.with(Serdes.String(), jsonSerde(Order.class))
            );
        
        // 处理逻辑
        stream
            .filter((key, order) -> order.getAmount() > 100)  // 过滤
            .mapValues(order -> {
                order.setStatus("PROCESSED");
                return order;
            })  // 转换
            .to("order-processed");  // 输出
        
        return stream;
    }
    
    private <T> Serde<T> jsonSerde(Class<T> clazz) {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerde.JSON_VALUE_TYPE, clazz);
        return new JsonSerde<>(props);
    }
}

2. 聚合操作

统计订单

/**
 * Counts orders per user and publishes every count update to order-count-result.
 *
 * The table is built first and returned; the changelog is written to the
 * output topic as a side branch, because to() is terminal and returns nothing.
 *
 * NOTE(review): groupBy re-keys the stream by user id; presumably it needs
 * explicit Grouped serdes unless default serdes are configured. Confirm.
 *
 * @param orderStream the stream of orders
 * @param streamsBuilder unused here; kept so the bean signature stays unchanged
 * @return a table mapping user id to the number of orders seen
 */
@Bean
public KTable<String, Long> orderCountByUser(
    KStream<String, Order> orderStream,
    StreamsBuilder streamsBuilder
) {
    KTable<String, Long> countsByUser = orderStream
        .groupBy((key, order) -> order.getUserId())
        .count();
    
    countsByUser
        .toStream()
        .peek((userId, count) -> 
            log.info("用户 {} 订单数:{}", userId, count)
        )
        // Counts are Long values, so the sink needs a Long serde.
        .to("order-count-result",
            org.apache.kafka.streams.kstream.Produced.with(
                Serdes.String(), Serdes.Long())
        );
    
    return countsByUser;
}

窗口聚合

/**
 * Counts orders per user within five-minute time windows.
 *
 * NOTE(review): TimeWindows.of is presumably deprecated in newer Kafka Streams
 * releases in favour of variants with an explicit grace period. Confirm against
 * the kafka-streams version in use before changing it.
 *
 * @param orderStream the stream of orders
 * @return a table mapping (user id, window) to the number of orders in that window
 */
@Bean
public KTable<Windowed<String>, Long> orderCountByWindow(
    KStream<String, Order> orderStream
) {
    TimeWindows fiveMinuteWindows = TimeWindows.of(Duration.ofMinutes(5));

    return orderStream
        .groupBy((recordKey, value) -> value.getUserId())
        .windowedBy(fiveMinuteWindows)
        .count();
}

3. 流表连接

订单与用户关联

/**
 * Enriches each order with the user record that has the same key.
 *
 * A stream-table join looks up the table's current value for the record key,
 * so it takes no join window; JoinWindows applies only to stream-stream joins.
 *
 * NOTE(review): the join matches on record key, so orderStream presumably has
 * to be keyed by user id for this to find users. Confirm upstream keying.
 *
 * @param orderStream the stream of orders
 * @param userTable the table of users
 * @return a stream of order/user pairs for orders whose key has a user entry
 */
@Bean
public KStream<String, OrderUser> orderUserJoin(
    KStream<String, Order> orderStream,
    KTable<String, User> userTable
) {
    return orderStream
        .join(userTable,
            (order, user) -> new OrderUser(order, user)
        );
}

监控告警

1. 监控指标

生产者指标

消费者指标

Broker 指标

2. Prometheus 集成

# Kafka configuration
# NOTE(review): the block below and the alerting rules that follow are two
# separate files shown together; verify that the reporter class name exists in
# the JMX exporter actually deployed.
kafka:
  metrics:
    reporters:
      - io.prometheus.jmx.JmxReporter
# Prometheus alerting rules
# NOTE(review): metric names depend on the exporter in use; confirm that
# kafka_consumer_lag and kafka_server_underreplicatedpartitions are exported.
groups:
  - name: kafka
    rules:
      # Warn when consumer lag stays above 10000 for 5 minutes.
      - alert: KafkaConsumerLag
        expr: kafka_consumer_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 消费积压"
      
      # Critical when any partition stays under-replicated for 5 minutes.
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka 副本未同步"

最佳实践

1. Topic 设计

分区数规划

# Broker-level defaults applied to newly created topics (server.properties)
num.partitions=12
default.replication.factor=3
# Retention of 7 days. The comment sits on its own line because .properties
# files treat a trailing "#" as part of the value. The broker-level key is
# log.retention.ms; retention.ms is the per-topic override.
log.retention.ms=604800000

2. 消息设计

消息大小

消息格式

{
  "id": "uuid",
  "type": "ORDER_CREATED",
  "timestamp": 1680000000000,
  "version": "1.0",
  "data": {}
}

3. 消费者优化

批量消费

# Consumer fetch tuning for batch consumption.
spring:
  kafka:
    consumer:
      max-poll-records: 500  # number of records fetched per poll
      fetch-min-size: 1  # minimum fetch size
      fetch-max-wait: 500  # maximum wait time

并发消费

# Concurrent consumption: six listener threads, each polling up to 500 records.
spring:
  kafka:
    listener:
      concurrency: 6  # number of concurrent consumers
    consumer:
      # max-poll-records is a consumer property; it is not recognised under "listener".
      max-poll-records: 500

4. 可靠性保障

生产者

# Reliable producer settings.
spring:
  kafka:
    producer:
      acks: all  # wait for acknowledgement from all replicas
      retries: 3
      # Settings without a dedicated Spring Boot property are passed through to
      # the Kafka client via "properties", using the native Kafka key names.
      properties:
        max.in.flight.requests.per.connection: 5
        enable.idempotence: true  # idempotent producer

消费者

# Reliable consumer settings.
spring:
  kafka:
    consumer:
      enable-auto-commit: false  # disable auto-commit; offsets are committed manually
      isolation-level: read_committed  # read only committed records

5. 故障处理

死信队列

/**
 * Listens on the dead-letter topic, logs each failed message and raises an alert.
 */
@Component
public class DeadLetterQueueHandler {

    /**
     * Handles one dead-letter message: records it in the error log, then sends an
     * alert so it can be followed up manually.
     *
     * @param message the raw payload of the failed record
     */
    @KafkaListener(topics = "order-dlq", groupId = "dlq-consumer-group")
    public void handleDlq(String message) {
        // Record the failed message first so it is logged even if alerting fails.
        log.error("死信消息:{}", message);

        // Alert for manual follow-up.
        alertService.sendAlert("Kafka 死信消息", message);
    }
}

重平衡处理

/**
 * Rebalance listener that logs partition revocation and assignment.
 *
 * NOTE(review): nothing in this snippet registers the listener with a
 * listener container; presumably that is done elsewhere. Confirm.
 */
@Component
public class ConsumerRebalanceListenerImpl implements ConsumerRebalanceListener {

    /**
     * Called when partitions are taken away from this consumer.
     * Intended place for committing offsets and releasing resources; currently only logs.
     *
     * @param partitions the partitions being revoked
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // TODO: commit offsets and clean up resources here.
        log.info("分区被撤销:{}", partitions);
    }

    /**
     * Called when partitions are assigned to this consumer.
     * Intended place for seeking to a specific start position; currently only logs.
     *
     * @param partitions the partitions being assigned
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // TODO: seek to the desired start position here.
        log.info("分区被分配:{}", partitions);
    }
}

总结

Kafka 是高吞吐、分布式的消息系统,支持发布订阅、流处理、事件溯源等多种场景。

通过 Spring for Apache Kafka 与 Spring Boot 集成后,可以实现服务解耦、异步通信、数据同步等功能。

在生产环境中,需要做好 Topic 规划、监控告警和故障处理,确保 Kafka 集群的稳定运行。


分享这篇文章到:

上一篇文章
Spring Boot Helm Chart 打包部署
下一篇文章
Spring Boot 统一异常处理