A Collection of Kafka Streams Case Studies. This article walks through typical production use cases for Kafka Streams, covering real-time statistics, risk control, and data pipelines.
1. Real-Time Statistics
1.1 Order Statistics
Business background:
An e-commerce platform with:
- 1M+ orders per day
- Real-time statistics needed per minute, per hour, and per day
- Dimensions: user, product, region
Implementation:
public class OrderStatsApplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "order-stats-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once_v2");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the order stream (orderSerde is a custom Serde<Order>, defined elsewhere)
        KStream<String, Order> orders = builder.stream("order-topic",
            Consumed.with(Serdes.String(), orderSerde));

        // Orders per minute (paid orders only)
        KTable<Windowed<String>, Long> ordersPerMinute = orders
            .filter((key, order) -> order.getStatus() == OrderStatus.PAID)
            .groupBy((key, order) -> "all", Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as("orders-per-minute-store"));

        // Items per hour (itemSerde is a custom Serde<OrderItem>)
        KTable<Windowed<String>, Long> itemsPerHour = orders
            .flatMapValues(order -> order.getItems())
            .groupBy((key, item) -> item.getItemId(), Grouped.with(Serdes.String(), itemSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.as("items-per-hour-store"));

        // Orders per user per day
        KTable<Windowed<String>, Long> usersPerDay = orders
            .groupBy((key, order) -> order.getUserId(), Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
            .count(Materialized.as("users-per-day-store"));

        // Emit results
        ordersPerMinute.toStream()
            .map((key, value) -> KeyValue.pair(key.key(), value))
            .to("orders-per-minute-topic", Produced.with(Serdes.String(), Serdes.Long()));
        itemsPerHour.toStream()
            .map((key, value) -> KeyValue.pair(key.key(), value))
            .to("items-per-hour-topic", Produced.with(Serdes.String(), Serdes.Long()));
        usersPerDay.toStream()
            .map((key, value) -> KeyValue.pair(key.key(), value))
            .to("users-per-day-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
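The tumbling windows above assign each record to exactly one fixed, gap-free window based on its timestamp. To make those semantics concrete, here is a minimal pure-Java sketch of windowed counting, independent of Kafka Streams (the class and method names are illustrative, not part of any library):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of tumbling-window counting, the idea behind
// windowedBy(TimeWindows.ofSizeWithNoGrace(...)).count():
// each record falls into exactly one window [start, start + size).
class WindowedCounter {
    private final long sizeMillis;
    private final Map<Long, Long> counts = new HashMap<>(); // windowStart -> count

    WindowedCounter(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Assigns the record to its window and returns the updated count. */
    long add(long timestampMillis) {
        long start = timestampMillis - timestampMillis % sizeMillis;
        return counts.merge(start, 1L, Long::sum);
    }
}
```

Two records 59.999 seconds apart can land in the same one-minute window, while two records one millisecond apart can land in different windows; that boundary behavior is what the store names like "orders-per-minute-store" capture per window.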
1.2 UV/PV Statistics
Implementation:
public class UVStatsApplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "uv-stats-app");
        props.put("bootstrap.servers", "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the page-view stream (pageViewSerde is a custom Serde<PageView>)
        KStream<String, PageView> pageViews = builder.stream("pageview-topic",
            Consumed.with(Serdes.String(), pageViewSerde));

        // PV per minute, keyed by page
        KTable<Windowed<String>, Long> pvPerMinute = pageViews
            .groupBy((key, view) -> view.getPageId(), Grouped.with(Serdes.String(), pageViewSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as("pv-per-minute-store"));

        // UV per minute, deduplicated with HyperLogLog
        // (hllSerde is a custom Serde<HyperLogLog>; the store needs it explicitly)
        KTable<Windowed<String>, HyperLogLog> uvPerMinute = pageViews
            .map((key, view) -> KeyValue.pair(view.getPageId(), view.getUserId()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                HyperLogLog::new,
                (pageId, userId, hll) -> {
                    hll.add(userId);
                    return hll;
                },
                Materialized.<String, HyperLogLog, WindowStore<Bytes, byte[]>>as("uv-per-minute-store")
                    .withValueSerde(hllSerde)
            );

        // Emit results
        pvPerMinute.toStream()
            .map((key, value) -> KeyValue.pair(key.key(), value))
            .to("pv-stats-topic", Produced.with(Serdes.String(), Serdes.Long()));
        uvPerMinute.toStream()
            .map((key, value) -> KeyValue.pair(key.key(), value.estimate()))
            .to("uv-stats-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
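The HyperLogLog type used above is assumed to be a custom class; it is not part of Kafka Streams. For readers unfamiliar with the structure, here is a minimal, illustrative sketch of the idea: each value is hashed, the top bits pick a register, and each register remembers the longest run of leading zeros seen, which gives an approximate distinct count in a few KB of fixed memory (a production system would use a vetted library instead):

```java
// Minimal HyperLogLog sketch (illustrative stand-in for the class above).
class HyperLogLog {
    private final int p = 10;            // 2^10 = 1024 registers
    private final int m = 1 << p;
    private final byte[] registers = new byte[m];

    void add(String value) {
        int h = mix(value.hashCode());
        int idx = h >>> (32 - p);        // top p bits select the register
        int w = h << p;                  // remaining bits
        int rank = Integer.numberOfLeadingZeros(w) + 1;
        if (rank > registers[idx]) registers[idx] = (byte) rank;
    }

    long estimate() {
        double alpha = 0.7213 / (1 + 1.079 / m);
        double sum = 0;
        int zeros = 0;
        for (byte r : registers) {
            sum += 1.0 / (1L << r);
            if (r == 0) zeros++;
        }
        double e = alpha * m * m / sum;
        if (e <= 2.5 * m && zeros > 0) { // small-range correction
            e = m * Math.log((double) m / zeros);
        }
        return Math.round(e);
    }

    // Finalizer-style bit mixer so String.hashCode spreads evenly.
    private static int mix(int h) {
        h ^= h >>> 16; h *= 0x85ebca6b;
        h ^= h >>> 13; h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }
}
```

With 1024 registers the standard error is roughly 1.04/√1024 ≈ 3%, which is why the topology emits `value.estimate()` rather than an exact count.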
2. Risk Control
2.1 Fraud Detection
Business background:
Risk-control requirements:
- Detect fraudulent transactions in real time
- Rules: many transactions in a short window, large amounts, unusual locations
- Response time: < 1 second
Implementation:
public class FraudDetectionApplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "fraud-detection-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once_v2");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the transaction stream (transactionSerde is a custom Serde<Transaction>)
        KStream<String, Transaction> transactions = builder.stream("transaction-topic",
            Consumed.with(Serdes.String(), transactionSerde));

        // Rule 1: many transactions in a short window (>5 within 5 minutes)
        KTable<Windowed<String>, Long> txCountPerUser = transactions
            .groupBy((key, tx) -> tx.getUserId(), Grouped.with(Serdes.String(), transactionSerde))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
            .count();
        KStream<String, FraudAlert> frequentTxAlert = txCountPerUser.toStream()
            .filter((windowedKey, count) -> count > 5)
            .map((key, count) -> {
                FraudAlert alert = new FraudAlert();
                alert.setUserId(key.key());
                alert.setType("FREQUENT_TRANSACTION");
                alert.setCount(count);
                return KeyValue.pair(alert.getUserId(), alert);
            });

        // Rule 2: large transaction (amount > 10000)
        KStream<String, FraudAlert> largeTxAlert = transactions
            .filter((key, tx) -> tx.getAmount() > 10000)
            .map((key, tx) -> {
                FraudAlert alert = new FraudAlert();
                alert.setUserId(tx.getUserId());
                alert.setType("LARGE_TRANSACTION");
                alert.setAmount(tx.getAmount());
                return KeyValue.pair(alert.getUserId(), alert);
            });

        // Rule 3: unusual location. The lookup table holds each user's last
        // known location; the stream must be re-keyed by user id so the
        // stream-table join can match records.
        KTable<String, UserLocation> userLocationTable = builder.table("user-location-topic",
            Consumed.with(Serdes.String(), userLocationSerde));
        KStream<String, FraudAlert> locationAlert = transactions
            .selectKey((key, tx) -> tx.getUserId())
            .join(userLocationTable, (tx, location) -> {
                if (isAbnormalLocation(tx.getLocation(), location.getLastLocation())) {
                    FraudAlert alert = new FraudAlert();
                    alert.setUserId(tx.getUserId());
                    alert.setType("ABNORMAL_LOCATION");
                    alert.setLocation(tx.getLocation());
                    return alert;
                }
                return null;
            })
            .filter((key, alert) -> alert != null)
            .map((key, alert) -> KeyValue.pair(alert.getUserId(), alert));

        // Merge the alerts
        frequentTxAlert.merge(largeTxAlert)
            .merge(locationAlert)
            .to("fraud-alert-topic", Produced.with(Serdes.String(), fraudAlertSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

    private static boolean isAbnormalLocation(Location current, Location last) {
        // Location anomaly check: flag if more than 1000 km from the last location
        return current.distanceTo(last) > 1000;
    }
}
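Location and distanceTo above are assumed domain types rather than library classes. One way to sketch the distance check, using the haversine great-circle formula over latitude/longitude in degrees (field and class names here are illustrative):

```java
// Illustrative Location type with great-circle distance (haversine formula).
class Location {
    final double lat, lon; // degrees

    Location(double lat, double lon) {
        this.lat = lat;
        this.lon = lon;
    }

    /** Great-circle distance to another point, in kilometers. */
    double distanceTo(Location other) {
        double R = 6371.0; // mean Earth radius, km
        double dLat = Math.toRadians(other.lat - lat);
        double dLon = Math.toRadians(other.lon - lon);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(Math.toRadians(lat)) * Math.cos(Math.toRadians(other.lat))
                 * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * R * Math.asin(Math.sqrt(a));
    }
}

class LocationRule {
    // Same rule as isAbnormalLocation above: flag anything over 1000 km away.
    static boolean isAbnormalLocation(Location current, Location last) {
        return current.distanceTo(last) > 1000;
    }
}
```

For example, Beijing to Shanghai is roughly 1,070 km by great circle, so a transaction from Shanghai minutes after one from Beijing would trip this rule.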
2.2 Rate Limiting
Implementation:
public class RateLimitApplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "rate-limit-app");
        props.put("bootstrap.servers", "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the request stream, re-keyed by user id for the join below
        // (requestSerde is a custom Serde<Request>)
        KStream<String, Request> requests = builder.stream("request-topic",
                Consumed.with(Serdes.String(), requestSerde))
            .selectKey((key, req) -> req.getUserId());

        // Requests per user per minute
        KTable<Windowed<String>, Long> requestCount = requests
            .groupByKey(Grouped.with(Serdes.String(), requestSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count();

        // A windowed KTable cannot be joined against a plain KStream directly,
        // so keep only the latest per-user count in a regular KTable.
        KTable<String, Long> latestCount = requestCount.toStream()
            .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
            .toTable(Materialized.with(Serdes.String(), Serdes.Long()));

        // Rate-limit decision: at most 100 requests per user per minute
        // (leftJoin, because the first request of a window has no count yet)
        KStream<String, Request> allowedRequests = requests
            .leftJoin(latestCount, (req, count) ->
                (count == null || count <= 100) ? req : null)
            .filter((key, req) -> req != null);
        KStream<String, Request> rejectedRequests = requests
            .leftJoin(latestCount, (req, count) ->
                (count != null && count > 100) ? req : null)
            .filter((key, req) -> req != null);

        // Emit results
        allowedRequests.to("allowed-request-topic", Produced.with(Serdes.String(), requestSerde));
        rejectedRequests.to("rejected-request-topic", Produced.with(Serdes.String(), requestSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
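Stripped of the Kafka plumbing, the rule above is a fixed-window counter: each user gets a fresh budget at every window boundary. A self-contained sketch of that logic for comparison (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Fixed-window rate limiter: at most `limit` requests per user per window.
class FixedWindowRateLimiter {
    private final int limit;
    private final long windowMillis;
    private final Map<String, long[]> state = new HashMap<>(); // userId -> {windowStart, count}

    FixedWindowRateLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    /** Returns true if the request at `nowMillis` is allowed for `userId`. */
    boolean allow(String userId, long nowMillis) {
        long windowStart = nowMillis - nowMillis % windowMillis;
        long[] s = state.get(userId);
        if (s == null || s[0] != windowStart) {
            s = new long[] { windowStart, 0 };  // new window: reset the budget
            state.put(userId, s);
        }
        if (s[1] >= limit) return false;
        s[1]++;
        return true;
    }
}
```

Note the known drawback of fixed windows: a user can burst up to 2× the limit across a window boundary. If that matters, a sliding-window or token-bucket variant is the usual fix.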
3. Data Pipelines
3.1 ETL Pipeline
Business background:
ETL requirements:
- Sources: MySQL, log files
- Targets: data warehouse, Elasticsearch
- Transformations: cleansing, aggregation, enrichment
Implementation:
public class ETLPipelineApplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "etl-pipeline-app");
        props.put("bootstrap.servers", "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the raw data (rawDataSerde is a custom Serde<RawData>)
        KStream<String, RawData> rawData = builder.stream("raw-data-topic",
            Consumed.with(Serdes.String(), rawDataSerde));

        // Cleansing
        KStream<String, CleanData> cleanData = rawData
            .filter((key, data) -> data.isValid())      // drop invalid records
            .mapValues(data -> {
                CleanData clean = new CleanData();
                clean.setId(data.getId());
                clean.setValue(data.getValue().trim()); // strip whitespace
                clean.setTimestamp(data.getTimestamp());
                return clean;
            });

        // Enrichment (assumes both topics are keyed by user id)
        KTable<String, UserInfo> userInfoTable = builder.table("user-info-topic",
            Materialized.with(Serdes.String(), userInfoSerde));
        KStream<String, EnrichedData> enrichedData = cleanData
            .join(userInfoTable, (clean, info) -> {
                EnrichedData enriched = new EnrichedData();
                enriched.setId(clean.getId());
                enriched.setValue(clean.getValue());
                enriched.setUserName(info.getName());
                enriched.setUserLevel(info.getLevel());
                return enriched;
            });

        // Aggregation by user level
        KTable<String, AggregatedData> aggregatedData = enrichedData
            .groupBy((key, data) -> data.getUserLevel(),
                Grouped.with(Serdes.String(), enrichedDataSerde))
            .aggregate(
                AggregatedData::new,
                (level, data, agg) -> {
                    agg.add(data);
                    return agg;
                },
                Materialized.with(Serdes.String(), aggregatedDataSerde)
            );

        // Emit to the different targets
        enrichedData.to("enriched-data-topic", Produced.with(Serdes.String(), enrichedDataSerde));
        aggregatedData.toStream().to("aggregated-data-topic",
            Produced.with(Serdes.String(), aggregatedDataSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
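The enrichment step is a stream-table join: the KTable is "the latest value per key", and each stream record looks up the current table row at processing time. A hash-join sketch of those semantics, with simplified stand-in types instead of CleanData/UserInfo/EnrichedData:

```java
import java.util.HashMap;
import java.util.Map;

// Hash-join sketch of the stream-table enrichment above.
class EnrichmentSketch {
    private final Map<String, String> userTable = new HashMap<>(); // userId -> userName

    /** Table side: each changelog record upserts the latest value for its key. */
    void upsert(String userId, String name) {
        userTable.put(userId, name);
    }

    /** Stream side: inner-join lookup; records with no table row are dropped. */
    String enrich(String userId, String value) {
        String name = userTable.get(userId);
        return name == null ? null : value + "|" + name;
    }
}
```

This is why an inner join silently drops stream records whose key has not appeared on the table topic yet; use a leftJoin if those records should survive with default enrichment values.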
3.2 CDC Pipeline
Implementation:
public class CDCPipelineApplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "cdc-pipeline-app");
        props.put("bootstrap.servers", "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the CDC stream (from Debezium; changeEventSerde is a custom Serde)
        KStream<String, ChangeEvent> changeEvents = builder.stream("cdc-topic",
            Consumed.with(Serdes.String(), changeEventSerde));

        // INSERT events: take the "after" image
        KStream<String, Entity> inserts = changeEvents
            .filter((key, event) -> event.getOp() == Op.INSERT)
            .mapValues(event -> event.getAfter());

        // UPDATE events: take the "after" image
        KStream<String, Entity> updates = changeEvents
            .filter((key, event) -> event.getOp() == Op.UPDATE)
            .mapValues(event -> event.getAfter());

        // DELETE events: emit a tombstone (null value) keyed by entity id
        KStream<String, String> deletes = changeEvents
            .filter((key, event) -> event.getOp() == Op.DELETE)
            .map((key, event) -> KeyValue.pair(event.getId(), (String) null));

        // Emit to the different targets
        inserts.to("entity-insert-topic", Produced.with(Serdes.String(), entitySerde));
        updates.to("entity-update-topic", Produced.with(Serdes.String(), entitySerde));
        deletes.to("entity-delete-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
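The routing decision above can be summarized in a few lines of plain Java: INSERT and UPDATE carry the row's "after" image, while DELETE becomes a tombstone (null value) keyed by the entity id so downstream compacted topics and tables can drop the row. A minimal sketch (the types here are simplified stand-ins, not Debezium classes):

```java
// Routing sketch for the CDC branches above.
class CdcRouter {
    enum Op { INSERT, UPDATE, DELETE }

    static class Output {
        final String topic, key, value;
        Output(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /** INSERT/UPDATE forward the "after" image; DELETE emits a tombstone. */
    static Output route(Op op, String id, String after) {
        switch (op) {
            case INSERT: return new Output("entity-insert-topic", id, after);
            case UPDATE: return new Output("entity-update-topic", id, after);
            default:     return new Output("entity-delete-topic", id, null);
        }
    }
}
```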
4. Best Practices
4.1 Performance Tuning
Performance tuning checklist:
1. Size state stores appropriately
2. Enable record caching
3. Choose a suitable commit interval
4. Use RocksDB for persistent state
5. Monitor processing latency
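Several of these points map directly onto Streams configuration. A sketch in the same string-keyed properties style as the examples above; the values are illustrative assumptions to be tuned per workload, not recommended defaults (and note that `cache.max.bytes.buffering` has been superseded by `statestore.cache.max.bytes` in newer Kafka versions):

```java
import java.util.Properties;

public class StreamsTuning {
    // Illustrative tuning values; adjust to your workload.
    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("application.id", "order-stats-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("num.stream.threads", "4");               // one task per partition, spread over threads
        props.put("cache.max.bytes.buffering", "10485760"); // 10 MB record cache batches store updates
        props.put("commit.interval.ms", "30000");           // less frequent commits -> higher throughput
        props.put("num.standby.replicas", "1");             // warm standby state speeds up failover
        props.put("state.dir", "/var/lib/kafka-streams");   // where RocksDB state lives
        return props;
    }
}
```

Larger caches and longer commit intervals raise throughput at the cost of latency and more reprocessing after a crash; under exactly_once_v2 the commit interval also bounds end-to-end latency, so the trade-off is sharper there.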
4.2 Error Handling
// Uncaught-exception handler (Kafka Streams 2.8+ API): log, alert, and
// replace the failed stream thread instead of letting it die
streams.setUncaughtExceptionHandler(exception -> {
    log.error("Stream exception", exception);
    sendAlert("Stream exception: " + exception.getMessage());
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
// State listener
streams.setStateListener((newState, oldState) -> {
    log.info("State change: {} -> {}", oldState, newState);
    if (newState == State.ERROR) {
        sendAlert("Stream entered ERROR state");
    }
});
4.3 Monitoring Metrics
Key metrics to monitor:
1. Processing latency
2. Throughput
3. State store size
4. Rebalance count
5. Error rate
Summary
Key takeaways from these Kafka Streams case studies:
- Real-time statistics: order statistics, UV/PV statistics
- Risk control: fraud detection, rate limiting
- Data pipelines: ETL, CDC
- Best practices: performance tuning, error handling, monitoring
Core points:
- Choose the right window type for each scenario
- Implement error handling and monitoring
- Tune state store configuration
- Build a comprehensive monitoring setup
References
- Kafka Streams official documentation
- Kafka Streams Examples
- "Kafka Streams in Action"