
Kafka Streams: Introduction and Practice

Kafka Streams is the stream processing library that ships with Kafka, used to build real-time data processing applications. This article walks through its core concepts, its APIs, and hands-on use cases.

1. Kafka Streams Basics

1.1 What Is Kafka Streams?

Kafka Streams is a client library for building stream processing applications:

graph LR
    subgraph Input
        K1[Kafka Topic 1]
        K2[Kafka Topic 2]
    end

    subgraph KS[Kafka Streams]
        A[Stream Processing]
    end

    subgraph Output
        K3[Kafka Topic 3]
        DB[(Database)]
    end

    K1 --> A
    K2 --> A
    A --> K3
    A --> DB

1.2 Core Features

| Feature | Description |
| --- | --- |
| Embedded library | No separate cluster; embeds directly in your application |
| Exactly-once | Supports exactly-once processing semantics |
| State stores | Built-in RocksDB-backed state stores |
| Fault tolerance | Automatic failover and recovery |
| Elastic scaling | Supports horizontal scale-out |

1.3 Comparison with Other Stream Processing Frameworks

| Framework | Deployment | Latency | Throughput | Learning curve |
| --- | --- | --- | --- | --- |
| Kafka Streams | Embedded | Milliseconds | | |
| Flink | Standalone cluster | Milliseconds | | |
| Spark Streaming | Standalone cluster | Seconds | | |
| Storm | Standalone cluster | Milliseconds | | |

2. Core Concepts

2.1 KStream vs KTable

graph TB
    subgraph KStream
        S1[Record 1<br/>t=1, k=A, v=1]
        S2[Record 2<br/>t=2, k=B, v=2]
        S3[Record 3<br/>t=3, k=A, v=3]
    end

    subgraph KTable
        T1[A=1]
        T2[B=2]
        T3[A=3]
    end

    S1 --> T1
    S2 --> T2
    S3 --> T3

    N[Latest state: A=3, B=2]
    KTable -.-> N

| Concept | Description | Typical use |
| --- | --- | --- |
| KStream | Record stream; every record is an event | Event processing, log analysis |
| KTable | Changelog table; keeps the latest state per key | State storage, aggregation results |
| GlobalKTable | Global table; every instance holds a full replica | Dimension-table lookups, broadcast data |
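As a sketch, the three abstractions can be built side by side from one `StreamsBuilder`; the topic names below are placeholders, and the String value types stand in for real domain types:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

public class StreamTableExample {
    // Build a topology that reads data through all three abstractions.
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // KStream: every record is an independent event
        KStream<String, String> events =
            builder.stream("events-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // KTable: an update stream; later records for a key overwrite earlier ones
        KTable<String, String> latest =
            builder.table("state-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // GlobalKTable: fully replicated on every instance (good for small dimension tables)
        GlobalKTable<String, String> dimension =
            builder.globalTable("dim-topic", Consumed.with(Serdes.String(), Serdes.String()));

        events.to("events-out", Produced.with(Serdes.String(), Serdes.String()));
        latest.toStream().to("state-out", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```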

2.2 Time Semantics

graph LR
    subgraph EventTime
        E1[Time the event occurred]
    end

    subgraph ProcessingTime
        P1[Time the event is processed]
    end

    subgraph IngestionTime
        I1[Time the event arrives in Kafka]
    end

    E1 --> I1 --> P1

| Time type | Description | Typical use |
| --- | --- | --- |
| Event time | When the event occurred | Accurate time windows |
| Processing time | When the event is processed | Real-time monitoring |
| Ingestion time | When the event arrives in Kafka | A compromise between the two |
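To make the DSL use event time, you can plug in a custom `TimestampExtractor`. The `TimestampedEvent` interface below is a hypothetical stand-in for your own payload type:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class EventTimeExtractor implements TimestampExtractor {

    // Hypothetical payload interface that carries its own event time
    public interface TimestampedEvent {
        long eventTimeMs();
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof TimestampedEvent) {
            // Event time: when the event actually happened
            return ((TimestampedEvent) record.value()).eventTimeMs();
        }
        // Fall back to the timestamp stored in the Kafka record itself
        return record.timestamp();
    }
}
```

Register it via `props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class);`.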

2.3 Windows

| Window type | Description | Example |
| --- | --- | --- |
| Tumbling window | Fixed size, non-overlapping | Count every 5 minutes |
| Hopping window | Fixed size, overlapping | Every 1 minute, count the last 5 minutes |
| Sliding window | Variable size, driven by event spacing | Events within the last 5 minutes |
| Session window | Bounded by session activity | User session analysis |

3. The DSL API

3.1 A Basic Example

Properties props = new Properties();
props.put("application.id", "order-stats-app");
props.put("bootstrap.servers", "localhost:9092");
props.put("default.key.serde", Serdes.String().getClass());
props.put("default.value.serde", Serdes.Long().getClass());

StreamsBuilder builder = new StreamsBuilder();

// 1. Read the input stream
KStream<String, Order> orders = builder.stream("order-topic",
    Consumed.with(Serdes.String(), orderSerde));

// 2. Keep only paid orders
KStream<String, Order> paidOrders = orders
    .filter((key, order) -> order.getStatus() == OrderStatus.PAID);

// 3. Group by user (repartition with explicit serdes)
KGroupedStream<String, Order> userOrders = paidOrders
    .groupBy((key, order) -> order.getUserId(),
        Grouped.with(Serdes.String(), orderSerde));

// 4. Count orders per user
KTable<String, Long> userOrderCount = userOrders
    .count(Materialized.as("user-order-count-store"));

// 5. Write the result to an output topic
userOrderCount.toStream()
    .to("user-order-count-topic", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

3.2 Transformations

// Map - transform both key and value
KStream<String, OrderAmount> amounts = orders
    .map((key, order) -> KeyValue.pair(order.getUserId(), order.getAmount()));

// FlatMap - one-to-many transformation
KStream<String, String> words = lines
    .flatMapValues(value -> Arrays.asList(value.split("\\W+")));

// Peek - side effects (e.g. logging) without changing the stream
orders.peek((key, order) -> log.info("Received order: {}", order.getId()))
    .filter(...)
    .map(...)
    .to(...);

// Branch - split a stream (deprecated since Kafka 2.8 in favor of split())
KStream<String, Order>[] branches = orders
    .branch(
        (key, order) -> order.getAmount() > 1000,  // large orders
        (key, order) -> order.getAmount() <= 1000   // regular orders
    );

branches[0].to("vip-order-topic");
branches[1].to("normal-order-topic");
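On newer Kafka versions the deprecated `branch(...)` is replaced by `split()`, which returns named branches. A minimal sketch using plain Long amounts instead of the article's Order type; topic names are placeholders:

```java
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

public class SplitExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> amounts =
            builder.stream("order-amount-topic", Consumed.with(Serdes.String(), Serdes.Long()));

        // split() replaces branch(): each branch gets a name prefixed with "order-"
        Map<String, KStream<String, Long>> branches = amounts
            .split(Named.as("order-"))
            .branch((key, amount) -> amount > 1000, Branched.as("vip"))
            .defaultBranch(Branched.as("normal"));

        branches.get("order-vip").to("vip-order-topic", Produced.with(Serdes.String(), Serdes.Long()));
        branches.get("order-normal").to("normal-order-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```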

3.3 Aggregations

// Count - number of records per key
KTable<String, Long> countByUser = orders
    .groupBy((key, order) -> order.getUserId())
    .count();

// Reduce - merge values of the same type. Reduce cannot change the value type,
// so map each Order to an OrderSummary first (fromOrder is a hypothetical factory method).
KTable<String, OrderSummary> summaryByUser = orders
    .mapValues(OrderSummary::fromOrder)
    .groupBy((key, summary) -> summary.getUserId())
    .reduce(
        (summary1, summary2) -> {
            summary1.setTotalAmount(summary1.getTotalAmount() + summary2.getTotalAmount());
            summary1.setOrderCount(summary1.getOrderCount() + summary2.getOrderCount());
            return summary1;
        },
        Materialized.as("user-summary-store")
    );

// Aggregate - custom aggregation (initializer + aggregator)
KTable<String, UserStats> statsByUser = orders
    .groupBy((key, order) -> order.getUserId())
    .aggregate(
        UserStats::new,  // initializer
        (userId, order, stats) -> {
            stats.addOrder(order);
            return stats;
        },
        Materialized.as("user-stats-store")
    );

3.4 Windowed Aggregations

// Tumbling window: fixed 1-hour windows, non-overlapping
KTable<Windowed<String>, Long> hourlySales = orders
    .filter((key, order) -> order.getStatus() == OrderStatus.PAID)
    .groupBy((key, order) -> order.getProductId())
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count();

// Hopping window: 5-minute windows advancing every 1 minute, with 1 minute of grace
KTable<Windowed<String>, Long> recentSales = orders
    .groupBy((key, order) -> order.getProductId())
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
        .advanceBy(Duration.ofMinutes(1)))
    .count();

// Session window: a session closes after 30 minutes of inactivity
KTable<Windowed<String>, Long> sessionActivity = userEvents
    .groupBy((key, event) -> event.getUserId())
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();
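By default a windowed count emits an update for every incoming record. When downstream consumers only want one final result per window, `suppress` can hold updates until the window closes. A sketch assuming Long sale amounts and placeholder topic names:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

public class SuppressExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("sales-topic", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            // untilWindowCloses needs a grace period, so use ofSizeAndGrace
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(5)))
            .count()
            // Emit exactly one result per window, once event time passes window end + grace
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
            .to("hourly-final-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```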

4. Join Operations

4.1 KStream-KStream Join

KStream<String, Order> orders = builder.stream("order-topic");
KStream<String, Payment> payments = builder.stream("payment-topic");

// Inner join: pair each order with a payment within 10 minutes
KStream<String, OrderPayment> joined = orders.join(
    payments,
    (order, payment) -> new OrderPayment(order, payment),
    JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

joined.to("order-payment-topic");

4.2 KStream-KTable Join

KStream<String, Order> orders = builder.stream("order-topic");
KTable<String, User> users = builder.table("user-topic");

// Stream-table join: enrich each order with the latest user record
KStream<String, OrderWithUser> enriched = orders.join(
    users,
    (order, user) -> new OrderWithUser(order, user),
    Joined.with(Serdes.String(), orderSerde, userSerde)
);

enriched.to("enriched-order-topic");

4.3 KTable-KTable Join

KTable<String, OrderCount> orderCounts = builder.table("order-count-topic");
KTable<String, UserLevel> userLevels = builder.table("user-level-topic");

// Table-table join (serdes are inherited from the input tables, so no Joined is needed)
KTable<String, VipUser> vipUsers = orderCounts.join(
    userLevels,
    (count, level) -> new VipUser(count, level)
);
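The GlobalKTable mentioned in section 2.1 joins differently from a KTable: the stream side supplies a key-mapper, so it does not need to be co-partitioned with the table. A sketch with String values and placeholder topics:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

public class GlobalTableJoinExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Fully replicated dimension table: userId -> userName
        GlobalKTable<String, String> userNames =
            builder.globalTable("user-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Order stream keyed by orderId, value = userId (simplified)
        KStream<String, String> orders =
            builder.stream("order-topic", Consumed.with(Serdes.String(), Serdes.String()));

        orders.join(userNames,
                (orderId, userId) -> userId,                   // pick the lookup key per record
                (userId, userName) -> userId + ":" + userName) // combine the two sides
            .to("enriched-order-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```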

5. State Stores

5.1 State Store Types

// In-memory store (handy for tests; Materialized.as(name) alone defaults to RocksDB)
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("count-store"))
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long())
    .withCachingDisabled();

// RocksDB store (the default, suited to production)
Materialized.<String, UserStats>as("user-stats-store")
    .withKeySerde(Serdes.String())
    .withValueSerde(userStatsSerde)
    .withLoggingEnabled(changelogConfig);
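The `changelogConfig` passed to `withLoggingEnabled` is a plain map of topic configs applied to the store's changelog topic; the values below are illustrative:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;

public class ChangelogConfigExample {
    public static Map<String, String> changelogConfig() {
        Map<String, String> config = new HashMap<>();
        // Compact the changelog so the latest value per key survives
        config.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        // Require at least 2 in-sync replicas before an update is acknowledged
        config.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");
        return config;
    }
}
```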

5.2 Interactive Queries

// Query a state store
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Obtain a read-only view of the store
ReadOnlyKeyValueStore<String, UserStats> store = streams.store(
    StoreQueryParameters.fromNameAndType("user-stats-store",
        QueryableStoreTypes.keyValueStore()));

// Point lookup
UserStats stats = store.get("user-123");
System.out.println("User stats: " + stats);

// Range query (close the iterator when done)
try (KeyValueIterator<String, UserStats> rangeIterator =
         store.range("user-001", "user-999")) {
    while (rangeIterator.hasNext()) {
        KeyValue<String, UserStats> entry = rangeIterator.next();
        System.out.println(entry.key + ": " + entry.value);
    }
}

5.3 A REST API over State Stores

@RestController
public class StreamsQueryResource {
    
    @Autowired
    private KafkaStreams streams;
    
    @GetMapping("/stats/{userId}")
    public ResponseEntity<UserStats> getUserStats(@PathVariable String userId) {
        ReadOnlyKeyValueStore<String, UserStats> store = 
            streams.store(StoreQueryParameters.fromNameAndType(
                "user-stats-store", QueryableStoreTypes.keyValueStore()));
        
        UserStats stats = store.get(userId);
        if (stats == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(stats);
    }
    
    @GetMapping("/stats")
    public ResponseEntity<List<UserStats>> getAllStats() {
        ReadOnlyKeyValueStore<String, UserStats> store = 
            streams.store(StoreQueryParameters.fromNameAndType(
                "user-stats-store", QueryableStoreTypes.keyValueStore()));
        
        List<UserStats> statsList = new ArrayList<>();
        try (KeyValueIterator<String, UserStats> iterator = store.all()) {
            while (iterator.hasNext()) {
                statsList.add(iterator.next().value);
            }
        }
        return ResponseEntity.ok(statsList);
    }
}

6. Fault Tolerance and Scaling

6.1 Fault Tolerance Mechanism

graph TB
    subgraph I1[Instance 1]
        T1[Task 1]
        T2[Task 2]
        S1[State Store 1]
    end

    subgraph I2[Instance 2]
        T3[Task 3]
        T4[Task 4]
        S2[State Store 2]
    end

    subgraph CL[Changelog]
        C1[Changelog Topic 1]
        C2[Changelog Topic 2]
    end

    S1 -.->|backup| C1
    S2 -.->|backup| C2

    I1 -.->|failure| I2
    T2 -->|migrates to| I2

6.2 Configuration Tuning

# Basic settings
application.id=order-stats-app
bootstrap.servers=localhost:9092

# Parallelism
num.stream.threads=4

# Commit frequency and record caching
commit.interval.ms=10000
cache.max.bytes.buffering=10485760

# Task timing
max.task.idle.ms=0
task.timeout.ms=300000

# Exactly-once processing
processing.guarantee=exactly_once_v2

6.3 Elastic Scaling

// Scale out by starting more instances:
// run multiple instances with the same application.id
KafkaStreams streams1 = new KafkaStreams(builder.build(), props);
KafkaStreams streams2 = new KafkaStreams(builder.build(), props);
KafkaStreams streams3 = new KafkaStreams(builder.build(), props);

// Automatic load balancing:
// Kafka Streams reassigns tasks across instances as they join and leave

7. Hands-On Examples

7.1 Real-Time Order Statistics

public class OrderStatsApplication {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "order-stats-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once_v2");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // 1. Read the order stream
        KStream<String, Order> orders = builder.stream("order-topic",
            Consumed.with(Serdes.String(), orderSerde));
        
        // 2. Keep only paid orders
        KStream<String, Order> paidOrders = orders
            .filter((key, order) -> order.getStatus() == OrderStatus.PAID);
        
        // 3. Total sales per hour
        KTable<Windowed<String>, Long> hourlySales = paidOrders
            .groupBy((key, order) -> "all")
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .aggregate(
                () -> 0L,
                (key, order, total) -> total + order.getAmount(),
                Materialized.with(Serdes.String(), Serdes.Long())
            );
        
        // 4. Order count per user
        KTable<String, Long> userOrderCount = paidOrders
            .groupBy((key, order) -> order.getUserId())
            .count(Materialized.as("user-order-count"));
        
        // 5. Write the results to output topics
        hourlySales.toStream()
            .map((key, value) -> KeyValue.pair(key.key(), value))
            .to("hourly-sales-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        userOrderCount.toStream()
            .to("user-order-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Close the application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        
        streams.start();
    }
}

7.2 Real-Time Fraud Detection

public class FraudDetectionApp {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "fraud-detection-app");
        props.put("bootstrap.servers", "localhost:9092");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // 1. Read the transaction stream
        KStream<String, Transaction> transactions = builder.stream("transaction-topic");
        
        // 2. Group by user
        KGroupedStream<String, Transaction> userTransactions = transactions
            .groupBy((key, tx) -> tx.getUserId());
        
        // 3. Hopping window: count transactions over the past 5 minutes, advancing every minute
        KTable<Windowed<String>, Long> txCountByUser = userTransactions
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
                .advanceBy(Duration.ofMinutes(1)))
            .count();
        
        // 4. Flag anomalous activity
        KStream<String, FraudAlert> alerts = txCountByUser.toStream()
            .filter((windowedKey, count) -> count > 10)  // more than 10 transactions in 5 minutes
            .map((windowedKey, count) -> {
                FraudAlert alert = new FraudAlert();
                alert.setUserId(windowedKey.key());
                alert.setTxCount(count);
                alert.setWindowStart(windowedKey.window().startTime());
                return KeyValue.pair(alert.getUserId(), alert);
            });
        
        // 5. Publish alerts
        alerts.to("fraud-alert-topic");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
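Topologies like the two applications above can be unit-tested without a broker using `TopologyTestDriver` from the `kafka-streams-test-utils` artifact. A minimal sketch testing just a large-order filter with Long amounts; topic names are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class FilterTopologyTest {
    // Pipe two orders through the topology and return the first record emitted.
    public static KeyValue<String, Long> firstVipOrder() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()))
            .filter((key, amount) -> amount > 1000)  // keep only large orders
            .to("vip-orders", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Long> in = driver.createInputTopic(
                "orders", Serdes.String().serializer(), Serdes.Long().serializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "vip-orders", Serdes.String().deserializer(), Serdes.Long().deserializer());

            in.pipeInput("o1", 1500L);  // passes the filter
            in.pipeInput("o2", 500L);   // dropped
            return out.readKeyValue();
        }
    }
}
```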

8. Monitoring and Debugging

8.1 Monitoring Metrics

| Metric | Description |
| --- | --- |
| process-latency-avg | Average processing latency |
| records-consumed-rate | Rate of records consumed |
| records-produced-rate | Rate of records produced |
| task-created-rate | Rate of task creation |
| state-restoration-rate | Rate of state restoration |

8.2 Monitoring Hooks

// Log state transitions
streams.setStateListener((newState, oldState) -> {
    log.info("State changed: {} -> {}", oldState, newState);
});

// Read the built-in metrics, e.g. from a scheduled reporting task
for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    log.info("Metric: {} = {}", entry.getKey().name(), entry.getValue().metricValue());
}
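Since Kafka 2.8 you can also install a `StreamsUncaughtExceptionHandler` to decide whether a failed stream thread is replaced or the whole client shuts down; a minimal sketch:

```java
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class ReplaceThreadHandler implements StreamsUncaughtExceptionHandler {
    @Override
    public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handle(Throwable exception) {
        // Replace the failed thread and keep the application running;
        // SHUTDOWN_CLIENT and SHUTDOWN_APPLICATION are the stricter alternatives
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
```

Register it with `streams.setUncaughtExceptionHandler(new ReplaceThreadHandler());` before calling `streams.start()`.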

Summary

Key takeaways for Kafka Streams:

  1. Core concepts: KStream, KTable, time semantics, windows
  2. DSL API: transformations, aggregations, joins
  3. State stores: RocksDB, interactive queries
  4. Fault tolerance and scaling: changelog topics, elastic scale-out
  5. Hands-on examples: real-time statistics, fraud detection
