Kafka Streams provides powerful stream processing capabilities, supporting complex data processing and aggregation. This article takes a deep look at Kafka Streams' advanced features and their use in practice.
1. State Stores
1.1 State Store Types
RocksDB state store:
Properties props = new Properties();
props.put("application.id", "state-store-app");
props.put("bootstrap.servers", "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();

// Create a KStream
KStream<String, Order> orders = builder.stream("order-topic");

// Aggregate into a state store (persistent RocksDB by default)
KTable<String, Long> orderCountByUser = orders
    .groupBy((key, order) -> order.getUserId())
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-order-count-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        .withCachingEnabled()                        // enable caching
        .withLoggingEnabled(Collections.emptyMap())  // enable the changelog topic
    );

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
In-memory state store:
KTable<Windowed<String>, Long> sessionCount = userEvents
    .groupBy((key, event) -> event.getUserId())
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count(Materialized.<String, Long>as(
            Stores.inMemorySessionStore("session-count", Duration.ofMinutes(30)))
        .withCachingDisabled() // disable caching
    );
1.2 Custom State Stores
public class CustomStateStore {
    private final String storeName;

    public CustomStateStore(String storeName) {
        this.storeName = storeName;
    }

    public StoreBuilder<KeyValueStore<String, MyValue>> getStoreBuilder() {
        return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName),
            Serdes.String(),
            new MyValueSerde()
        ).withCachingEnabled();
    }
}
// Register and use the custom store
StreamsBuilder builder = new StreamsBuilder();
CustomStateStore customStore = new CustomStateStore("my-store");
builder.addStateStore(customStore.getStoreBuilder());

KStream<String, String> stream = builder.stream("input-topic");
stream.process(
    new MyProcessorSupplier(),
    Named.as("my-processor"),
    "my-store" // process() takes store names; the store itself is registered on the builder
);
2. Interactive Queries
2.1 Configuring Interactive Queries
Properties props = new Properties();
props.put("application.id", "interactive-query-app");
props.put("bootstrap.servers", "localhost:9092");
// Advertise this instance's endpoint for multi-instance queries
props.put("application.server", "localhost:8080");

StreamsBuilder builder = new StreamsBuilder();
KTable<String, UserStats> userStats = builder
    .table("user-stats-topic", Materialized.as("user-stats-store"));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// The store is only queryable once the instance is RUNNING;
// before that, store() throws InvalidStateStoreException
ReadOnlyKeyValueStore<String, UserStats> store = streams.store(
    StoreQueryParameters.fromNameAndType("user-stats-store",
        QueryableStoreTypes.keyValueStore()));
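With `application.server` set, a multi-instance deployment can route each query to the instance that actually hosts the key. A minimal sketch of the host lookup (the key, port, and forwarding mechanism are assumptions; the remote HTTP call itself is left out):

```java
// Find which instance hosts the key "user-42" in the store
KeyQueryMetadata metadata = streams.queryMetadataForKey(
    "user-stats-store", "user-42", Serdes.String().serializer());

HostInfo activeHost = metadata.activeHost();
if (activeHost.equals(new HostInfo("localhost", 8080))) {
    // The key is local: query the local store directly
} else {
    // The key lives on another instance: forward the request, e.g.
    // GET http://<activeHost.host()>:<activeHost.port()>/stats/user-42
}
```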
2.2 REST API Implementation
@RestController
public class InteractiveQueryController {

    @Autowired
    private KafkaStreams kafkaStreams;

    private ReadOnlyKeyValueStore<String, UserStats> store() {
        return kafkaStreams.store(StoreQueryParameters.fromNameAndType(
            "user-stats-store", QueryableStoreTypes.keyValueStore()));
    }

    /**
     * Look up a single key
     */
    @GetMapping("/stats/{userId}")
    public ResponseEntity<UserStats> getUserStats(@PathVariable String userId) {
        try {
            UserStats stats = store().get(userId);
            if (stats == null) {
                return ResponseEntity.notFound().build();
            }
            return ResponseEntity.ok(stats);
        } catch (InvalidStateStoreException e) {
            // Store not ready (rebalancing or restoring) -- client should retry
            return ResponseEntity.status(503).build();
        }
    }

    /**
     * Range query
     */
    @GetMapping("/stats/range")
    public ResponseEntity<List<UserStats>> getStatsRange(
            @RequestParam String start,
            @RequestParam String end) {
        List<UserStats> statsList = new ArrayList<>();
        try (KeyValueIterator<String, UserStats> iterator = store().range(start, end)) {
            while (iterator.hasNext()) {
                statsList.add(iterator.next().value);
            }
        }
        return ResponseEntity.ok(statsList);
    }

    /**
     * All entries
     */
    @GetMapping("/stats/all")
    public ResponseEntity<List<UserStats>> getAllStats() {
        List<UserStats> statsList = new ArrayList<>();
        try (KeyValueIterator<String, UserStats> iterator = store().all()) {
            while (iterator.hasNext()) {
                statsList.add(iterator.next().value);
            }
        }
        return ResponseEntity.ok(statsList);
    }
}
3. Custom Processors
3.1 Processor API
public class MyProcessor implements Processor<String, Order, String, Order> {
    private ProcessorContext<String, Order> context;
    private KeyValueStore<String, Order> stateStore;

    @Override
    public void init(ProcessorContext<String, Order> context) {
        this.context = context;
        this.stateStore = context.getStateStore("order-store");
        // Schedule a periodic task
        context.schedule(Duration.ofSeconds(10),
            PunctuationType.WALL_CLOCK_TIME, timestamp -> flushState());
    }

    @Override
    public void process(Record<String, Order> record) {
        Order order = record.value();
        if (order.getAmount() > 1000) {
            // forward() targets a downstream child node, not a topic directly;
            // the child (e.g. a sink writing to high-value-topic) must be wired in the topology
            context.forward(record, "high-value-sink");
        }
        // Update state
        stateStore.put(record.key(), order);
    }

    @Override
    public void close() {
        // Release resources
    }

    private void flushState() {
        // Flush buffered state
    }
}
// Use the Processor (the state store must be registered with the builder)
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("order-store"),
    Serdes.String(), orderSerde));

KStream<String, Order> orders = builder.stream("order-topic");
orders.process(
    MyProcessor::new,
    Named.as("my-processor"),
    "order-store" // process() takes store names, not Materialized
);
3.2 Transformer API
public class MyTransformer implements Transformer<String, Order, KeyValue<String, Order>> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, Order> transform(String key, Order order) {
        // Transformation logic
        if (order.getAmount() > 1000) {
            order.setVip(true);
        }
        return KeyValue.pair(key, order);
    }

    // Note: the Transformer interface has no punctuate() method;
    // schedule periodic work via context.schedule() in init() instead

    @Override
    public void close() {}
}

// Use the Transformer (deprecated since Kafka 3.3 in favor of process()/processValues())
KStream<String, Order> transformed = orders.transform(
    MyTransformer::new,
    Named.as("my-transformer")
);
4. Exactly-Once Semantics
4.1 Configuring Exactly-Once
Properties props = new Properties();
props.put("application.id", "exactly-once-app");
props.put("bootstrap.servers", "localhost:9092");
// Enable exactly-once (v2 requires brokers 2.5+)
props.put("processing.guarantee", "exactly_once_v2");
// Note: do not set transactional.id -- Kafka Streams manages transactional ids internally
props.put("num.standby.replicas", 1); // standby replicas for faster failover
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("order-topic");
KTable<String, Long> orderCount = orders
.groupBy((key, order) -> order.getUserId())
.count();
orderCount.toStream().to("order-count-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
4.2 Exactly-Once in Practice
public class ExactlyOnceProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "exactly-once-processor");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once_v2");

        StreamsBuilder builder = new StreamsBuilder();

        // Input stream
        KStream<String, Transaction> transactions =
            builder.stream("transactions-topic");

        // Processing
        KTable<String, Balance> balances = transactions
            .groupBy((key, tx) -> tx.getAccountId())
            .aggregate(
                Balance::new,
                (accountId, tx, balance) -> {
                    balance.add(tx.getAmount());
                    return balance;
                },
                Materialized.as("balance-store")
            );

        // Output
        balances.toStream().to("balances-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
5. Windowing
5.1 Time Windows
Tumbling window:
KTable<Windowed<String>, Long> hourlySales = orders
    .filter((key, order) -> order.getStatus() == OrderStatus.PAID)
    .groupBy((key, order) -> order.getProductId())
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count();
Hopping window:
KTable<Windowed<String>, Long> recentSales = orders
    .groupBy((key, order) -> order.getProductId())
    .windowedBy(TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(5),   // window size
            Duration.ofMinutes(1))   // grace period
        .advanceBy(Duration.ofMinutes(1))) // the hop; without advanceBy this would be a tumbling window
    .count();
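Kafka 2.7+ also provides sliding windows, which are evaluated relative to each record's timestamp rather than on a fixed grid; a sketch against the same `orders` stream:

```java
// Per-record count of orders per product over the preceding 10 minutes
KTable<Windowed<String>, Long> slidingSales = orders
    .groupBy((key, order) -> order.getProductId())
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)))
    .count();
```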
5.2 Session Windows
KTable<Windowed<String>, Long> sessionActivity = userEvents
    .groupBy((key, event) -> event.getUserId())
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();

// Session aggregation
KTable<Windowed<String>, SessionAgg> sessionAgg = userEvents
    .groupBy((key, event) -> event.getUserId())
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .aggregate(
        SessionAgg::new,
        (userId, event, session) -> {
            session.addEvent(event);
            return session;
        },
        // Merger: combines two sessions when a late record bridges them
        (userId, session1, session2) -> {
            session1.merge(session2);
            return session1;
        },
        Materialized.as("session-store")
    );
5.3 Custom Windows
Window is an abstract class, not an interface, and there is no Windows.newInstance factory: a custom window type extends Window, and the assignment logic lives in a Windows<W> subclass:
public class CustomWindow extends Window {
    public CustomWindow(long start, long end) {
        super(start, end);
    }

    @Override
    public boolean overlap(Window other) {
        return start() < other.end() && other.start() < end();
    }
}

// Assigns every record timestamp to a single fixed one-hour window
public class HourlyWindows extends Windows<CustomWindow> {
    private static final long SIZE_MS = 3600_000L; // one hour

    @Override
    public Map<Long, CustomWindow> windowsFor(long timestamp) {
        long windowStart = (timestamp / SIZE_MS) * SIZE_MS;
        return Collections.singletonMap(windowStart,
            new CustomWindow(windowStart, windowStart + SIZE_MS));
    }

    @Override
    public long size() { return SIZE_MS; }

    @Override
    public long gracePeriodMs() { return 0L; }
}

// Use the custom windows
KTable<Windowed<String>, Long> customCounts = orders
    .groupBy((key, order) -> order.getProductId())
    .windowedBy(new HourlyWindows())
    .count();
6. Joins
6.1 KStream-KStream Join
KStream<String, Order> orders = builder.stream("order-topic");
KStream<String, Payment> payments = builder.stream("payment-topic");

// Inner join
KStream<String, OrderPayment> joined = orders.join(
    payments,
    (order, payment) -> new OrderPayment(order, payment),
    JoinWindows.ofTimeDifferenceAndGrace(
        Duration.ofMinutes(10), // max time difference between the two sides
        Duration.ofMinutes(1)   // grace period
    ),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

// Left join
KStream<String, OrderPayment> leftJoined = orders.leftJoin(
    payments,
    (order, payment) -> new OrderPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

// Outer join
KStream<String, OrderPayment> outerJoined = orders.outerJoin(
    payments,
    (order, payment) -> new OrderPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
6.2 KStream-KTable Join
KStream<String, Order> orders = builder.stream("order-topic");
KTable<String, User> users = builder.table("user-topic");

// Stream-Table join
KStream<String, EnrichedOrder> enriched = orders.join(
    users,
    (order, user) -> new EnrichedOrder(order, user),
    Joined.with(Serdes.String(), orderSerde, userSerde)
);
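Stream-table joins require both topics to be co-partitioned on the join key. When they are not, or when the table holds small reference data, a GlobalKTable join replicates the table to every instance and lets each record choose its own join key; a sketch reusing the names above:

```java
// GlobalKTable: fully replicated, so no co-partitioning is required
GlobalKTable<String, User> globalUsers = builder.globalTable("user-topic");

KStream<String, EnrichedOrder> globallyEnriched = orders.join(
    globalUsers,
    (orderKey, order) -> order.getUserId(),          // derive the join key per record
    (order, user) -> new EnrichedOrder(order, user)
);
```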
6.3 KTable-KTable Join
KTable<String, OrderCount> orderCounts = builder.table("order-count-topic");
KTable<String, UserLevel> userLevels = builder.table("user-level-topic");

// Table-Table join: KTable.join takes no Joined parameter; configure serdes
// via Materialized instead (vipUserSerde, a serde for the result, is assumed)
KTable<String, VipUser> vipUsers = orderCounts.join(
    userLevels,
    (count, level) -> new VipUser(count, level),
    Materialized.with(Serdes.String(), vipUserSerde)
);
7. Performance Tuning
7.1 Configuration Tuning
Properties props = new Properties();
props.put("application.id", "optimized-app");
props.put("bootstrap.servers", "localhost:9092");
// Threads
props.put("num.stream.threads", 4);
// Caching (renamed statestore.cache.max.bytes in Kafka 3.4+)
props.put("cache.max.bytes.buffering", 10485760); // 10 MB
props.put("commit.interval.ms", 10000); // commit every 10 s
// State stores
props.put("state.dir", "/var/kafka-streams/state");
props.put("rocksdb.config.setter", CustomRocksDBConfig.class);
// Fault tolerance
props.put("num.standby.replicas", 1);
props.put("task.timeout.ms", 300000);
7.2 State Store Tuning
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // RocksDB is tuned through the Options object, not string key-value pairs
        options.setWriteBufferSize(64 * 1024 * 1024L); // 64 MB memtable
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(new LRUCache(256 * 1024 * 1024L)); // 256 MB block cache
        options.setTableFormatConfig(tableConfig);
        options.setCompactionStyle(CompactionStyle.LEVEL);
    }

    @Override
    public void close(String storeName, Options options) {}
}
7.3 Monitoring Metrics
KafkaStreams streams = new KafkaStreams(builder.build(), props);

// KafkaStreams has no metrics callback hook; poll streams.metrics()
// periodically (or scrape via JMX) instead
for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    log.info("Metric: {} = {}", entry.getKey().name(), entry.getValue().metricValue());
}

// Add a state listener
streams.setStateListener((newState, oldState) ->
    log.info("State change: {} -> {}", oldState, newState));
8. Best Practices
8.1 Application Design
Design principles:
1. Keep processing logic simple
2. Use state stores judiciously
3. Configure appropriate window sizes
4. Implement error handling and retries
5. Monitor key metrics
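Principle 1 also pays off in testing: business rules kept as plain functions can be unit-tested without a broker or a TopologyTestDriver. A hypothetical sketch extracting the high-value-order rule from the Processor example (class name and threshold are illustrative):

```java
// Pure business rule, independent of any Kafka API
public final class OrderRules {
    static final double HIGH_VALUE_THRESHOLD = 1000.0;

    private OrderRules() {}

    // True if the order should be routed to the high-value path
    public static boolean isHighValue(double amount) {
        return amount > HIGH_VALUE_THRESHOLD;
    }
}
```

The Processor then just calls `OrderRules.isHighValue(order.getAmount())` and stays trivial.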
8.2 Configuration Checklist
Configuration checks:
- [ ] application.id is unique
- [ ] bootstrap.servers is correct
- [ ] processing.guarantee is set
- [ ] state.dir is configured
- [ ] num.stream.threads is reasonable
- [ ] caching is configured appropriately
8.3 Operations
Recommendations:
1. Back up state stores regularly
2. Monitor processing latency
3. Configure alerting rules
4. Clean up old state regularly
5. Implement graceful shutdown
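Recommendation 5 can be implemented with a shutdown hook plus a latch, so the main thread waits for a bounded close; a minimal sketch (the 30-second timeout is an assumption, and `builder`/`props` come from the earlier examples):

```java
public static void main(String[] args) throws Exception {
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    CountDownLatch latch = new CountDownLatch(1);

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        // Bounded close: flush state and commit offsets, but never hang forever
        streams.close(Duration.ofSeconds(30));
        latch.countDown();
    }));

    streams.start();
    latch.await(); // keep the JVM alive until shutdown completes
}
```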
Summary
Core takeaways for advanced Kafka Streams usage:
- State stores: RocksDB, in-memory, custom stores
- Interactive queries: REST API, range queries
- Custom processors: Processor and Transformer APIs
- Exactly-once: configuration and practice
- Windowing: time windows, session windows, custom windows
- Joins: Stream-Stream, Stream-Table, Table-Table
- Performance tuning: configuration, state stores, monitoring
Key points:
- Understand the state store mechanism
- Master interactive queries
- Use windowing appropriately
- Configure exactly-once semantics
- Build thorough monitoring
References
- Kafka Streams official documentation
- Kafka Streams Developer Guide
- "Kafka Streams in Action"