Kafka Streams is the stream processing library that ships with Kafka, used to build real-time data processing applications. This article walks through Kafka Streams' core concepts, its APIs, and hands-on applications.
1. Kafka Streams Basics
1.1 What Is Kafka Streams?
Kafka Streams is a client library for building stream processing applications:
graph LR
subgraph Input
K1[Kafka Topic 1]
K2[Kafka Topic 2]
end
subgraph Kafka Streams
A[Stream Processing]
end
subgraph Output
K3[Kafka Topic 3]
DB[(Database)]
end
K1 --> A
K2 --> A
A --> K3
A --> DB
1.2 Core Features
| Feature | Description |
|---|---|
| Embedded library | No separate cluster; runs inside your application |
| Exactly-once | Supports exactly-once processing semantics |
| State stores | Built-in RocksDB-backed state stores |
| Fault tolerance | Automatic failover and recovery |
| Elastic scaling | Scales horizontally |
1.3 Comparison with Other Stream Processing Frameworks
| Framework | Deployment | Latency | Throughput | Learning curve |
|---|---|---|---|---|
| Kafka Streams | Embedded | Milliseconds | Medium | Low |
| Flink | Standalone cluster | Milliseconds | High | Medium |
| Spark Streaming | Standalone cluster | Seconds | High | Medium |
| Storm | Standalone cluster | Milliseconds | Medium | High |
2. Core Concepts
2.1 KStream vs KTable
graph TB
subgraph KStream
S1[Record 1<br/>t=1, k=A, v=1]
S2[Record 2<br/>t=2, k=B, v=2]
S3[Record 3<br/>t=3, k=A, v=3]
end
subgraph KTable
T1[A=1]
T2[B=2]
T3[A=3]
end
S1 --> T1
S2 --> T2
S3 --> T3
After all three records, the KTable holds only the latest value per key: A=3, B=2.
| Concept | Description | Typical use |
|---|---|---|
| KStream | A record stream; every record is an event | Event processing, log analysis |
| KTable | A changelog table holding the latest state per key | State storage, aggregation results |
| GlobalKTable | A global table fully replicated on every instance | Dimension-table joins, broadcast data |
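The stream/table duality above can be illustrated in plain Java, with no Kafka dependency (the class and `toTable` helper below are purely illustrative): replaying a record stream into a map that keeps only the latest value per key is exactly what materializing a KTable does.

```java
import java.util.*;

// Illustration: a KStream is the full sequence of records, while a KTable
// is an upsert view that keeps only the latest value per key.
class KStreamVsKTable {
    // Replay a stream of (key, value) records into a table: later records
    // with the same key overwrite earlier ones, like changelog compaction.
    static Map<String, Integer> toTable(List<Map.Entry<String, Integer>> stream) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : stream) {
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> stream = List.of(
            Map.entry("A", 1), Map.entry("B", 2), Map.entry("A", 3));
        System.out.println(toTable(stream)); // {A=3, B=2}
    }
}
```

This mirrors the diagram above: three stream records collapse into the two-row table A=3, B=2.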
2.2 Time Semantics
graph LR
subgraph EventTime
E1[When the event occurred]
end
subgraph ProcessingTime
P1[When the event is processed]
end
subgraph IngestionTime
I1[When the event reached Kafka]
end
E1 --> I1 --> P1
| Time type | Description | Typical use |
|---|---|---|
| Event time | When the event occurred | Accurate time windows |
| Processing time | When the event is processed | Real-time monitoring |
| Ingestion time | When the event reached Kafka | A compromise between the two |
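The practical difference between these clocks can be sketched with simple bucket arithmetic (plain Java, illustrative names, not Kafka's timestamp extractor): a record that occurred at minute 10 but was processed at minute 12 lands in different one-minute buckets depending on which clock you window by.

```java
// Illustration: the same late-arriving record lands in different one-minute
// buckets depending on whether we bucket by event time or processing time.
class TimeSemantics {
    // Start of the fixed-size bucket containing the given timestamp.
    static long bucketStart(long timestampMs, long bucketSizeMs) {
        return timestampMs - (timestampMs % bucketSizeMs);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long eventTime = 10 * minute + 5_000;      // event happened at minute 10
        long processingTime = 12 * minute + 1_000; // but it was processed at minute 12
        System.out.println(bucketStart(eventTime, minute) / minute);      // 10
        System.out.println(bucketStart(processingTime, minute) / minute); // 12
    }
}
```

This is why event time gives accurate windows for out-of-order data, while processing time only reflects when your application happened to see the record.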
2.3 Windows
| Window type | Description | Example |
|---|---|---|
| Tumbling window | Fixed size, non-overlapping | A count every 5 minutes |
| Hopping window | Fixed size, overlapping | The last 5 minutes, advanced every minute |
| Sliding window | Variable, driven by record timestamps | Events within 5 minutes of each other |
| Session window | Bounded by session activity | User session analysis |
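Window assignment for the first two types comes down to simple arithmetic. The sketch below (plain Java with illustrative helpers, not the Kafka Streams internals) shows that a timestamp falls into exactly one tumbling window but into size/advance overlapping hopping windows.

```java
import java.util.*;

// Illustration: which windows a record belongs to. A tumbling window assigns
// each timestamp to exactly one bucket; a hopping window (size 5 min,
// advance 1 min) assigns it to every overlapping bucket.
class WindowAssignment {
    // Start of the single tumbling window containing ts.
    static long tumbling(long ts, long size) {
        return ts - (ts % size);
    }

    // Start times of all hopping windows of the given size/advance containing ts.
    static List<Long> hopping(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, ts - size + advance) / advance * advance;
        for (long start = first; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        System.out.println(tumbling(7 * min, 5 * min) / min);      // 5
        System.out.println(hopping(7 * min, 5 * min, min).size()); // 5 overlapping windows
    }
}
```

A record at minute 7 belongs to the single tumbling window starting at minute 5, but to five hopping windows (starting at minutes 3 through 7) — which is why hopping-window counts produce one result per overlapping window.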
3. The DSL API
3.1 A Basic Example
Properties props = new Properties();
props.put("application.id", "order-stats-app");
props.put("bootstrap.servers", "localhost:9092");
props.put("default.key.serde", Serdes.String().getClass());
props.put("default.value.serde", Serdes.Long().getClass());
StreamsBuilder builder = new StreamsBuilder();
// 1. Read the input stream
KStream<String, Order> orders = builder.stream("order-topic",
Consumed.with(Serdes.String(), orderSerde));
// 2. Filter
KStream<String, Order> paidOrders = orders
.filter((key, order) -> order.getStatus() == OrderStatus.PAID);
// 3. Group by user
KGroupedStream<String, Order> userOrders = paidOrders
.groupBy((key, order) -> order.getUserId());
// 4. Aggregate
KTable<String, Long> userOrderCount = userOrders
.count(Materialized.as("user-order-count-store"));
// 5. Write out the result
userOrderCount.toStream()
.to("user-order-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
3.2 Transformations
// Map - transform the key and value
KStream<String, OrderAmount> amounts = orders
.map((key, order) -> KeyValue.pair(order.getUserId(), order.getAmount()));
// FlatMap - one-to-many transformation
KStream<String, String> words = lines
.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
// Peek - side effects without changing the stream
orders.peek((key, order) -> log.info("Received order: {}", order.getId()))
.filter(...)
.map(...)
.to(...);
// Branch - split one stream into several
KStream<String, Order>[] branches = orders
.branch(
(key, order) -> order.getAmount() > 1000, // large orders
(key, order) -> order.getAmount() <= 1000 // regular orders
);
branches[0].to("vip-order-topic");
branches[1].to("normal-order-topic");
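`branch()` routes each record to the first predicate that matches and silently drops records that match none. A plain-Java sketch of that first-match-wins contract (illustrative code, not the Kafka Streams implementation):

```java
import java.util.*;
import java.util.function.Predicate;

// Illustration: branch() sends each record to the FIRST matching predicate;
// records that match no predicate are dropped.
class BranchSketch {
    static <T> List<List<T>> branch(List<T> records, List<Predicate<T>> predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) branches.add(new ArrayList<>());
        for (T record : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    branches.get(i).add(record);
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<Integer> amounts = List.of(2500, 300, 1200, 80);
        List<List<Integer>> out = branch(amounts,
            List.<Predicate<Integer>>of(a -> a > 1000, a -> a <= 1000));
        System.out.println(out.get(0)); // [2500, 1200]
        System.out.println(out.get(1)); // [300, 80]
    }
}
```

Because of first-match-wins, the two predicates above may safely overlap at the boundary; only the first one would ever claim a borderline record.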
3.3 Aggregations
// Count - count records per key
KTable<String, Long> countByUser = orders
.groupBy((key, order) -> order.getUserId())
.count();
// Reduce - merge values; reduce() keeps the value type, so map each
// Order to an OrderSummary first (OrderSummary.from is an assumed helper
// that wraps a single order)
KTable<String, OrderSummary> summaryByUser = orders
.mapValues(order -> OrderSummary.from(order))
.groupBy((key, summary) -> summary.getUserId())
.reduce(
(summary1, summary2) -> {
summary1.setTotalAmount(summary1.getTotalAmount() + summary2.getTotalAmount());
summary1.setOrderCount(summary1.getOrderCount() + summary2.getOrderCount());
return summary1;
},
Materialized.as("user-summary-store")
);
// Aggregate - custom aggregation with a different result type
KTable<String, UserStats> statsByUser = orders
.groupBy((key, order) -> order.getUserId())
.aggregate(
UserStats::new, // initializer
(userId, order, stats) -> {
stats.addOrder(order);
return stats;
},
Materialized.as("user-stats-store")
);
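Conceptually, `aggregate()` is a per-key fold: the initializer creates empty state and the adder merges each record into the state for its key. A stdlib-only sketch (the `Order` record and the `long[]{count, total}` state are illustrative stand-ins for `UserStats`):

```java
import java.util.*;

// Illustration: aggregate() as a per-key fold — an initializer produces
// empty state, an adder merges each record into the state for its key.
class AggregateSketch {
    record Order(String userId, long amount) {}

    // Per-user stats built by folding orders: state is {orderCount, totalAmount}.
    static Map<String, long[]> aggregate(List<Order> orders) {
        Map<String, long[]> stats = new HashMap<>();
        for (Order o : orders) {
            long[] s = stats.computeIfAbsent(o.userId(), k -> new long[2]); // initializer
            s[0] += 1;          // adder: bump the order count
            s[1] += o.amount(); // adder: add to the total amount
        }
        return stats;
    }

    public static void main(String[] args) {
        Map<String, long[]> stats = aggregate(List.of(
            new Order("u1", 100), new Order("u2", 50), new Order("u1", 30)));
        System.out.println(Arrays.toString(stats.get("u1"))); // [2, 130]
    }
}
```

In Kafka Streams the per-key state lives in a state store rather than a HashMap, which is what makes the fold fault-tolerant and queryable.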
3.4 Windowed Aggregations
// Tumbling window: fixed size, non-overlapping
KTable<Windowed<String>, Long> hourlySales = orders
.filter((key, order) -> order.getStatus() == OrderStatus.PAID)
.groupBy((key, order) -> order.getProductId())
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count();
// Hopping window: advanceBy() makes the 5-minute windows hop every minute
KTable<Windowed<String>, Long> recentSales = orders
.groupBy((key, order) -> order.getProductId())
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1)))
.count();
// Session window: closes after 30 minutes of inactivity
KTable<Windowed<String>, Long> sessionActivity = userEvents
.groupBy((key, event) -> event.getUserId())
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count();
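Unlike time windows, session windows have no fixed size: a session ends whenever the gap to the next event exceeds the inactivity gap. A plain-Java sketch of that rule (illustrative, and it assumes timestamps arrive sorted; Kafka Streams also merges sessions that out-of-order events bridge):

```java
import java.util.*;

// Illustration: split a sorted list of event timestamps into sessions
// separated by inactivity gaps larger than gapMs.
class SessionSketch {
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> out = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            // A gap larger than the inactivity gap closes the current session.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                out.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) out.add(current);
        return out;
    }

    public static void main(String[] args) {
        // Gaps of 10 and 5 stay in-session; the gap of 40 (> 30) starts a new one.
        System.out.println(sessions(List.of(0L, 10L, 50L, 55L), 30).size()); // 2
    }
}
```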
4. Joins
4.1 KStream-KStream Join
KStream<String, Order> orders = builder.stream("order-topic");
KStream<String, Payment> payments = builder.stream("payment-topic");
// Inner join
KStream<String, OrderPayment> joined = orders.join(
payments,
(order, payment) -> new OrderPayment(order, payment),
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1)),
StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
joined.to("order-payment-topic");
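The join window above means two records pair up only when they share a key and their timestamps are within the configured time difference. A stdlib-only sketch of that windowed inner-join semantics (illustrative `Event` type, naive O(n·m) matching rather than Kafka's buffered implementation):

```java
import java.util.*;

// Illustration: a windowed stream-stream join matches records with the same
// key whose timestamps differ by at most maxDiffMs.
class WindowedJoinSketch {
    record Event(String key, long ts, String value) {}

    static List<String> join(List<Event> left, List<Event> right, long maxDiffMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key().equals(r.key()) && Math.abs(l.ts() - r.ts()) <= maxDiffMs) {
                    out.add(l.value() + "+" + r.value()); // the ValueJoiner
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orders = List.of(new Event("o1", 1_000, "order"));
        List<Event> payments = List.of(
            new Event("o1", 5_000, "payment"),         // within 10s -> joins
            new Event("o1", 600_000, "late-payment")); // outside the window -> dropped
        System.out.println(join(orders, payments, 10_000)); // [order+payment]
    }
}
```

This is why stream-stream joins require a `JoinWindows`: without a time bound, both sides would have to be buffered forever.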
4.2 KStream-KTable Join
KStream<String, Order> orders = builder.stream("order-topic");
KTable<String, User> users = builder.table("user-topic");
// Stream-table join (enrich events against a table in real time)
KStream<String, OrderWithUser> enriched = orders.join(
users,
(order, user) -> new OrderWithUser(order, user),
Joined.with(Serdes.String(), orderSerde, userSerde)
);
enriched.to("enriched-order-topic");
4.3 KTable-KTable Join
KTable<String, OrderCount> orderCounts = builder.table("order-count-topic");
KTable<String, UserLevel> userLevels = builder.table("user-level-topic");
// Table-table join: no Joined config here — the serdes come from the tables
KTable<String, VipUser> vipUsers = orderCounts.join(
userLevels,
(count, level) -> new VipUser(count, level)
);
5. State Stores
5.1 State Store Types
// In-memory store (good for tests): pass an explicit in-memory supplier
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("count-store"))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
.withCachingDisabled();
// Persistent RocksDB store (the default; good for production)
Materialized.<String, UserStats>as("user-stats-store")
.withKeySerde(Serdes.String())
.withValueSerde(userStatsSerde)
.withLoggingEnabled(changelogConfig);
5.2 Interactive Queries
// Query a state store
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Get a read-only handle on the store
ReadOnlyKeyValueStore<String, UserStats> store =
streams.store(StoreQueryParameters.fromNameAndType(
"user-stats-store", QueryableStoreTypes.keyValueStore()));
// Point lookup by key
UserStats stats = store.get("user-123");
System.out.println("User stats: " + stats);
// Range query (close the iterator when done)
try (KeyValueIterator<String, UserStats> rangeIterator =
store.range("user-001", "user-999")) {
while (rangeIterator.hasNext()) {
KeyValue<String, UserStats> entry = rangeIterator.next();
System.out.println(entry.key + ": " + entry.value);
}
}
5.3 Exposing State Stores over REST
@RestController
public class StreamsQueryResource {
@Autowired
private KafkaStreams streams;
@GetMapping("/stats/{userId}")
public ResponseEntity<UserStats> getUserStats(@PathVariable String userId) {
ReadOnlyKeyValueStore<String, UserStats> store =
streams.store(StoreQueryParameters.fromNameAndType(
"user-stats-store", QueryableStoreTypes.keyValueStore()));
UserStats stats = store.get(userId);
if (stats == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(stats);
}
@GetMapping("/stats")
public ResponseEntity<List<UserStats>> getAllStats() {
ReadOnlyKeyValueStore<String, UserStats> store =
streams.store(StoreQueryParameters.fromNameAndType(
"user-stats-store", QueryableStoreTypes.keyValueStore()));
List<UserStats> statsList = new ArrayList<>();
try (KeyValueIterator<String, UserStats> iterator = store.all()) {
while (iterator.hasNext()) {
statsList.add(iterator.next().value);
}
}
return ResponseEntity.ok(statsList);
}
}
6. Fault Tolerance and Scaling
6.1 Fault Tolerance
graph TB
subgraph I1[Instance 1]
T1[Task 1]
T2[Task 2]
S1[State Store 1]
end
subgraph I2[Instance 2]
T3[Task 3]
T4[Task 4]
S2[State Store 2]
end
subgraph CL[Changelog]
C1[Changelog Topic 1]
C2[Changelog Topic 2]
end
S1 -.->|backup| C1
S2 -.->|backup| C2
I1 -.->|on failure| I2
T2 -->|migrates to| I2
6.2 Configuration Tuning
# Basics
application.id=order-stats-app
bootstrap.servers=localhost:9092
# Parallelism
num.stream.threads=4
# Commit interval and record caching
commit.interval.ms=10000
cache.max.bytes.buffering=10485760
# Task idling and timeouts
max.task.idle.ms=0
task.timeout.ms=300000
# Exactly-once processing
processing.guarantee=exactly_once_v2
6.3 Elastic Scaling
// Scale out by starting more instances with the same application.id
KafkaStreams streams1 = new KafkaStreams(builder.build(), props);
KafkaStreams streams2 = new KafkaStreams(builder.build(), props);
KafkaStreams streams3 = new KafkaStreams(builder.build(), props);
// Kafka Streams rebalances automatically, spreading tasks across the instances
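The scaling limit behind this: Kafka Streams creates one task per input partition, so instances beyond the partition count sit idle. A simplified round-robin sketch of the assignment idea (illustrative only; the real assignor is sticky and state-aware):

```java
import java.util.*;

// Illustration: parallelism is bounded by the number of input partitions —
// one task per partition, tasks spread over the live instances.
class TaskAssignmentSketch {
    // Map instance id -> list of task ids, assigned round-robin.
    static Map<Integer, List<Integer>> assign(int partitions, int instances) {
        Map<Integer, List<Integer>> byInstance = new TreeMap<>();
        for (int task = 0; task < partitions; task++) {
            byInstance.computeIfAbsent(task % instances, k -> new ArrayList<>()).add(task);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        // 6 partitions across 3 instances -> 2 tasks each; a 7th instance
        // would get nothing, since there are only 6 tasks to hand out.
        System.out.println(assign(6, 3)); // {0=[0, 3], 1=[1, 4], 2=[2, 5]}
    }
}
```

Practical consequence: size the input topic's partition count for the maximum parallelism you may ever need.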
7. Hands-On Examples
7.1 Real-Time Order Statistics
public class OrderStatsApplication {
public static void main(String[] args) {
Properties props = new Properties();
props.put("application.id", "order-stats-app");
props.put("bootstrap.servers", "localhost:9092");
props.put("processing.guarantee", "exactly_once_v2");
StreamsBuilder builder = new StreamsBuilder();
// 1. Read the order stream
KStream<String, Order> orders = builder.stream("order-topic",
Consumed.with(Serdes.String(), orderSerde));
// 2. Keep only paid orders
KStream<String, Order> paidOrders = orders
.filter((key, order) -> order.getStatus() == OrderStatus.PAID);
// 3. Hourly sales (explicit Grouped serdes, since no defaults are configured)
KTable<Windowed<String>, Long> hourlySales = paidOrders
.groupBy((key, order) -> "all", Grouped.with(Serdes.String(), orderSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.aggregate(
() -> 0L,
(key, order, total) -> total + order.getAmount(),
Materialized.with(Serdes.String(), Serdes.Long())
);
// 4. Order count per user
KTable<String, Long> userOrderCount = paidOrders
.groupBy((key, order) -> order.getUserId(), Grouped.with(Serdes.String(), orderSerde))
.count(Materialized.as("user-order-count"));
// 5. Write out the results
hourlySales.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value))
.to("hourly-sales-topic", Produced.with(Serdes.String(), Serdes.Long()));
userOrderCount.toStream()
.to("user-order-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Close the application cleanly on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
}
}
7.2 Real-Time Fraud Detection
public class FraudDetectionApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put("application.id", "fraud-detection-app");
props.put("bootstrap.servers", "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
// 1. Read the transaction stream
KStream<String, Transaction> transactions = builder.stream("transaction-topic");
// 2. Group by user
KGroupedStream<String, Transaction> userTransactions = transactions
.groupBy((key, tx) -> tx.getUserId());
// 3. Count per user over a hopping window covering the last 5 minutes
KTable<Windowed<String>, Long> txCountByUser = userTransactions
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1)))
.count();
// 4. Flag anomalies
KStream<String, FraudAlert> alerts = txCountByUser.toStream()
.filter((windowedKey, count) -> count > 10) // more than 10 transactions in 5 minutes
.map((windowedKey, count) -> {
FraudAlert alert = new FraudAlert();
alert.setUserId(windowedKey.key());
alert.setTxCount(count);
alert.setWindowStart(windowedKey.window().startTime());
return KeyValue.pair(alert.getUserId(), alert);
});
// 5. Publish alerts
alerts.to("fraud-alert-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
8. Monitoring and Debugging
8.1 Key Metrics
| Metric | Description |
|---|---|
| process-latency-avg | Average processing latency |
| records-consumed-rate | Consumption rate |
| records-produced-rate | Production rate |
| task-created-rate | Task creation rate |
| state-restoration-rate | State restoration rate |
8.2 Monitoring Setup
// Log client state transitions
streams.setStateListener((newState, oldState) -> {
log.info("State change: {} -> {}", oldState, newState);
});
// There is no metrics-listener hook; read the registry via streams.metrics()
for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
log.info("Metric: {} = {}", entry.getKey().name(), entry.getValue().metricValue());
}
Summary
The essentials of Kafka Streams:
- Core concepts: KStream, KTable, time semantics, windows
- DSL API: transformations, aggregations, joins
- State stores: RocksDB, interactive queries
- Fault tolerance and scaling: changelog topics, elastic scaling
- Hands-on: real-time statistics, fraud detection
Key takeaways:
- Understand the difference between KStream and KTable
- Choose window types and sizes deliberately
- Use state stores for stateful computations
- Configure exactly-once semantics when correctness matters