
Kafka Streams in Practice: Selected Case Studies

This article walks through typical production use cases for Kafka Streams: real-time statistics, risk control, and data pipelines.

1. Real-Time Statistics

1.1 Order Statistics

Business background

An e-commerce platform:
- Daily order volume: 1,000,000+
- Statistics needed in real time at per-minute, per-hour, and per-day granularity
- Dimensions: user, product, region

Implementation

public class OrderStatsApplication {

    public static void main(String[] args) {
        // Assumes the standard org.apache.kafka.streams imports plus application-specific
        // domain classes and Serdes (Order, OrderStatus, orderSerde, orderItemSerde, ...)
        Properties props = new Properties();
        props.put("application.id", "order-stats-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once_v2");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read the order stream
        KStream<String, Order> orders = builder.stream("order-topic",
            Consumed.with(Serdes.String(), orderSerde));
        
        // Orders per minute: count paid orders in 1-minute tumbling windows.
        // groupBy changes the key, so pass explicit serdes for the repartition topic.
        KTable<Windowed<String>, Long> ordersPerMinute = orders
            .filter((key, order) -> order.getStatus() == OrderStatus.PAID)
            .groupBy((key, order) -> "all", Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as("orders-per-minute-store"));
        
        // Items per hour, keyed by item id
        KTable<Windowed<String>, Long> itemsPerHour = orders
            .flatMapValues(Order::getItems)
            .groupBy((key, item) -> item.getItemId(), Grouped.with(Serdes.String(), orderItemSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.as("items-per-hour-store"));
        
        // Orders per user per day
        KTable<Windowed<String>, Long> usersPerDay = orders
            .groupBy((key, order) -> order.getUserId(), Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
            .count(Materialized.as("users-per-day-store"));
        
        // Emit results; the window start is appended to the key so downstream
        // consumers can tell windows apart once the Windowed<> wrapper is dropped
        ordersPerMinute.toStream()
            .map((key, value) -> KeyValue.pair(key.key() + "@" + key.window().start(), value))
            .to("orders-per-minute-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        itemsPerHour.toStream()
            .map((key, value) -> KeyValue.pair(key.key() + "@" + key.window().start(), value))
            .to("items-per-hour-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        usersPerDay.toStream()
            .map((key, value) -> KeyValue.pair(key.key() + "@" + key.window().start(), value))
            .to("users-per-day-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  // clean shutdown
    }
}
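
The windowed stores above can also be served directly from the running application via interactive queries. A minimal sketch, assuming `streams` is the running KafkaStreams instance from the code above and the local instance hosts the queried partitions:

// Fetch the last 10 minutes of per-minute order counts from the local store
ReadOnlyWindowStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("orders-per-minute-store",
        QueryableStoreTypes.windowStore()));

Instant now = Instant.now();
try (WindowStoreIterator<Long> it = store.fetch("all", now.minus(Duration.ofMinutes(10)), now)) {
    while (it.hasNext()) {
        KeyValue<Long, Long> entry = it.next();  // key = window start (epoch ms), value = count
        System.out.printf("window %d -> %d orders%n", entry.key, entry.value);
    }
}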

1.2 UV/PV Statistics

Implementation

public class UVStatsApplication {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "uv-stats-app");
        props.put("bootstrap.servers", "localhost:9092");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read the page-view stream
        KStream<String, PageView> pageViews = builder.stream("pageview-topic",
            Consumed.with(Serdes.String(), pageViewSerde));
        
        // PV: raw view count per page per minute
        KTable<Windowed<String>, Long> pvPerMinute = pageViews
            .groupBy((key, view) -> view.getPageId(), Grouped.with(Serdes.String(), pageViewSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as("pv-per-minute-store"));
        
        // UV: approximate distinct users per page per minute, deduplicated with a
        // HyperLogLog sketch. The HyperLogLog class and hllSerde stand in for
        // whatever sketch library is used; Kafka Streams does not ship one.
        KTable<Windowed<String>, HyperLogLog> uvPerMinute = pageViews
            .map((key, view) -> KeyValue.pair(view.getPageId(), view.getUserId()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))  // already keyed by pageId
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                HyperLogLog::new,
                (pageId, userId, hll) -> {
                    hll.add(userId);
                    return hll;
                },
                Materialized.<String, HyperLogLog, WindowStore<Bytes, byte[]>>as("uv-per-minute-store")
                    .withValueSerde(hllSerde)  // custom Serde for the sketch (see below)
            );
        
        // Emit results
        pvPerMinute.toStream()
            .map((key, value) -> KeyValue.pair(key.key() + "@" + key.window().start(), value))
            .to("pv-stats-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        uvPerMinute.toStream()
            .map((key, value) -> KeyValue.pair(key.key() + "@" + key.window().start(), value.estimate()))
            .to("uv-stats-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
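
The only non-obvious piece is the Serde for the sketch. A minimal sketch of one, assuming the HLL type can round-trip itself through bytes; toBytes()/fromBytes() are placeholders for whatever (de)serialization your HLL library actually provides:

// Hypothetical Serde for the HyperLogLog sketch
public class HyperLogLogSerde implements Serde<HyperLogLog> {
    @Override
    public Serializer<HyperLogLog> serializer() {
        return (topic, hll) -> hll == null ? null : hll.toBytes();
    }
    @Override
    public Deserializer<HyperLogLog> deserializer() {
        return (topic, bytes) -> bytes == null ? null : HyperLogLog.fromBytes(bytes);
    }
}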

2. Risk Control

2.1 Fraud Detection

Business background

Risk-control requirements:
- Detect fraudulent transactions in real time
- Rules: many transactions in a short period, large amounts, unusual locations
- Response time: < 1 second

Implementation

public class FraudDetectionApplication {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "fraud-detection-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once_v2");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read the transaction stream
        KStream<String, Transaction> transactions = builder.stream("transaction-topic",
            Consumed.with(Serdes.String(), transactionSerde));
        
        // Each user's last known location, keyed by userId (topic and serde assumed)
        KTable<String, UserLocation> userLocationTable = builder.table("user-location-topic",
            Consumed.with(Serdes.String(), userLocationSerde));
        
        // Rule 1: many transactions in a short period (> 5 within 5 minutes).
        // Note: every update past the threshold emits an alert; wrap the count
        // in suppress() if you want exactly one alert per window.
        KTable<Windowed<String>, Long> txCountPerUser = transactions
            .groupBy((key, tx) -> tx.getUserId(), Grouped.with(Serdes.String(), transactionSerde))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
            .count();
        
        KStream<String, FraudAlert> frequentTxAlert = txCountPerUser.toStream()
            .filter((windowedKey, count) -> count > 5)
            .map((key, count) -> {
                FraudAlert alert = new FraudAlert();
                alert.setUserId(key.key());
                alert.setType("FREQUENT_TRANSACTION");
                alert.setCount(count);
                return KeyValue.pair(alert.getUserId(), alert);
            });
        
        // Rule 2: large transaction (> 10000)
        KStream<String, FraudAlert> largeTxAlert = transactions
            .filter((key, tx) -> tx.getAmount() > 10000)
            .map((key, tx) -> {
                FraudAlert alert = new FraudAlert();
                alert.setUserId(tx.getUserId());
                alert.setType("LARGE_TRANSACTION");
                alert.setAmount(tx.getAmount());
                return KeyValue.pair(alert.getUserId(), alert);
            });
        
        // Rule 3: unusual location. The stream must share the table's key
        // (userId) for the stream-table join to work, hence the selectKey
        KStream<String, FraudAlert> locationAlert = transactions
            .selectKey((key, tx) -> tx.getUserId())
            .join(userLocationTable, (tx, location) -> {
                if (isAbnormalLocation(tx.getLocation(), location.getLastLocation())) {
                    FraudAlert alert = new FraudAlert();
                    alert.setUserId(tx.getUserId());
                    alert.setType("ABNORMAL_LOCATION");
                    alert.setLocation(tx.getLocation());
                    return alert;
                }
                return null;  // not suspicious
            }, Joined.with(Serdes.String(), transactionSerde, userLocationSerde))
            .filter((key, alert) -> alert != null);
        
        // Merge the three alert streams into one topic
        frequentTxAlert.merge(largeTxAlert)
            .merge(locationAlert)
            .to("fraud-alert-topic", Produced.with(Serdes.String(), fraudAlertSerde));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
    
    private static boolean isAbnormalLocation(Location current, Location last) {
        // Flag if the transaction is more than 1000 km from the last known location
        return current.distanceTo(last) > 1000;
    }
}
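
distanceTo is left abstract above. A minimal great-circle (haversine) sketch, assuming a hypothetical Location type with getLat()/getLon() returning degrees:

// Hypothetical Location.distanceTo using the haversine formula; returns km
public double distanceTo(Location other) {
    final double R = 6371.0;  // mean Earth radius in km
    double dLat = Math.toRadians(other.getLat() - this.getLat());
    double dLon = Math.toRadians(other.getLon() - this.getLon());
    double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
             + Math.cos(Math.toRadians(this.getLat()))
             * Math.cos(Math.toRadians(other.getLat()))
             * Math.sin(dLon / 2) * Math.sin(dLon / 2);
    return 2 * R * Math.asin(Math.sqrt(a));
}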

2.2 Rate Limiting

Implementation

public class RateLimitApplication {
    
    private static final long LIMIT_PER_MINUTE = 100;
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "rate-limit-app");
        props.put("bootstrap.servers", "localhost:9092");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Store holding "userId@minuteBucket" -> request count. A KStream cannot
        // be joined against a windowed KTable, so the per-request verdict is made
        // in a stateful transformer instead of a join.
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("rate-limit-store"),
            Serdes.String(), Serdes.Long()));
        
        // Read the request stream
        KStream<String, Request> requests = builder.stream("request-topic",
            Consumed.with(Serdes.String(), requestSerde));
        
        // Re-key by user and ensure co-partitioning before the stateful step,
        // then tag each request as allowed/rejected and split on the tag
        KStream<String, Request> tagged = requests
            .selectKey((key, req) -> req.getUserId())
            .repartition(Repartitioned.with(Serdes.String(), requestSerde))
            .transformValues(RateLimiter::new, "rate-limit-store");
        
        KStream<String, Request> allowedRequests = tagged.filter((key, req) -> req.isAllowed());
        KStream<String, Request> rejectedRequests = tagged.filterNot((key, req) -> req.isAllowed());
        
        // Emit results
        allowedRequests.to("allowed-request-topic", Produced.with(Serdes.String(), requestSerde));
        rejectedRequests.to("rejected-request-topic", Produced.with(Serdes.String(), requestSerde));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
    
    // Counts requests per user per minute and marks those over the limit.
    // Assumes Request carries an allowed flag for the verdict.
    static class RateLimiter implements ValueTransformerWithKey<String, Request, Request> {
        private ProcessorContext context;
        private KeyValueStore<String, Long> store;
        
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<String, Long>) context.getStateStore("rate-limit-store");
        }
        
        @Override
        public Request transform(String userId, Request req) {
            String bucket = userId + "@" + (context.timestamp() / 60_000);  // 1-minute bucket
            Long count = store.get(bucket);
            long updated = (count == null ? 0L : count) + 1;
            store.put(bucket, updated);
            req.setAllowed(updated <= LIMIT_PER_MINUTE);  // at most 100 requests per minute
            return req;
        }
        
        @Override
        public void close() {}
    }
}

Because the verdict comes from local state, no extra network hop is added per request. In production the store also needs cleanup of expired minute buckets, e.g. via a scheduled punctuator.

3. Data Pipelines

3.1 ETL Pipeline

Business background

ETL requirements:
- Sources: MySQL, log files
- Targets: data warehouse, Elasticsearch
- Transformations: cleansing, aggregation, enrichment

Implementation

public class ETLPipelineApplication {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "etl-pipeline-app");
        props.put("bootstrap.servers", "localhost:9092");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read the raw data
        KStream<String, RawData> rawData = builder.stream("raw-data-topic",
            Consumed.with(Serdes.String(), rawDataSerde));
        
        // Cleansing: drop invalid records and normalize fields
        KStream<String, CleanData> cleanData = rawData
            .filter((key, data) -> data.isValid())
            .mapValues(data -> {
                CleanData clean = new CleanData();
                clean.setId(data.getId());
                clean.setValue(data.getValue().trim());  // strip whitespace
                clean.setTimestamp(data.getTimestamp());
                return clean;
            });
        
        // Enrichment: join against user info. This assumes raw-data-topic and
        // user-info-topic are keyed the same way (by user id); otherwise re-key
        // the stream with selectKey() first.
        KTable<String, UserInfo> userInfoTable = builder.table("user-info-topic",
            Consumed.with(Serdes.String(), userInfoSerde));
        
        KStream<String, EnrichedData> enrichedData = cleanData
            .join(userInfoTable, (clean, info) -> {
                EnrichedData enriched = new EnrichedData();
                enriched.setId(clean.getId());
                enriched.setValue(clean.getValue());
                enriched.setUserName(info.getName());
                enriched.setUserLevel(info.getLevel());
                return enriched;
            });
        
        // Aggregation: roll up per user level
        KTable<String, AggregatedData> aggregatedData = enrichedData
            .groupBy((key, data) -> data.getUserLevel(),
                Grouped.with(Serdes.String(), enrichedDataSerde))
            .aggregate(
                AggregatedData::new,
                (level, data, agg) -> {
                    agg.add(data);
                    return agg;
                },
                Materialized.with(Serdes.String(), aggregatedDataSerde)
            );
        
        // Fan out to the different targets
        enrichedData.to("enriched-data-topic", Produced.with(Serdes.String(), enrichedDataSerde));
        aggregatedData.toStream().to("aggregated-data-topic",
            Produced.with(Serdes.String(), aggregatedDataSerde));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
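
Pipelines like this are easy to unit-test without a broker. A minimal sketch using TopologyTestDriver from kafka-streams-test-utils, assuming the topology above is extracted into a hypothetical buildTopology() helper and someRawRecord is a sample RawData:

// Drive the ETL topology with an in-memory test harness; no Kafka cluster needed
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "etl-pipeline-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
    TestInputTopic<String, RawData> input = driver.createInputTopic(
        "raw-data-topic", new StringSerializer(), rawDataSerde.serializer());
    TestOutputTopic<String, EnrichedData> output = driver.createOutputTopic(
        "enriched-data-topic", new StringDeserializer(), enrichedDataSerde.deserializer());

    input.pipeInput("user-1", someRawRecord);   // feed one record
    System.out.println(output.readKeyValue());  // inspect the enriched result
}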

3.2 CDC Pipeline

Implementation

public class CDCPipelineApplication {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "cdc-pipeline-app");
        props.put("bootstrap.servers", "localhost:9092");
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read the CDC stream (e.g. produced by Debezium)
        KStream<String, ChangeEvent> changeEvents = builder.stream("cdc-topic",
            Consumed.with(Serdes.String(), changeEventSerde));
        
        // INSERT events: emit the post-image
        KStream<String, Entity> inserts = changeEvents
            .filter((key, event) -> event.getOp() == Op.INSERT)
            .mapValues(ChangeEvent::getAfter);
        
        // UPDATE events: emit the post-image
        KStream<String, Entity> updates = changeEvents
            .filter((key, event) -> event.getOp() == Op.UPDATE)
            .mapValues(ChangeEvent::getAfter);
        
        // DELETE events: emit a tombstone (null value) keyed by entity id
        KStream<String, String> deletes = changeEvents
            .filter((key, event) -> event.getOp() == Op.DELETE)
            .map((key, event) -> KeyValue.pair(event.getId(), (String) null));
        
        // Route each change type to its own topic
        inserts.to("entity-insert-topic", Produced.with(Serdes.String(), entitySerde));
        updates.to("entity-update-topic", Produced.with(Serdes.String(), entitySerde));
        deletes.to("entity-delete-topic", Produced.with(Serdes.String(), Serdes.String()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
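
A consuming application can rebuild current entity state from the change stream. A minimal sketch, assuming the CDC record key is the entity id and all change types are merged into a single (ideally compacted) topic, hypothetically named entity-changelog-topic:

// Merge inserts and updates into one changelog topic
inserts.merge(updates).to("entity-changelog-topic",
    Produced.with(Serdes.String(), entitySerde));
// Deletes become tombstones: a null value removes the key from downstream tables
deletes.mapValues(v -> (Entity) null).to("entity-changelog-topic",
    Produced.with(Serdes.String(), entitySerde));

// In the consuming application, the topic materializes back into current state
KTable<String, Entity> currentState = builder.table("entity-changelog-topic",
    Consumed.with(Serdes.String(), entitySerde));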

4. Best Practices

4.1 Performance Tuning

Performance tuning checklist (a configuration sketch follows the list):
1. Size state stores appropriately
2. Enable record caching
3. Pick a commit interval that balances latency and throughput
4. Use RocksDB for persistent state
5. Monitor processing latency
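
Items 2 through 4 map directly to configuration. A minimal sketch; the values are illustrative starting points, not recommendations, and MyRocksDBConfig is a hypothetical RocksDBConfigSetter implementation:

Properties props = new Properties();
// 2. Record caching: a larger cache batches state-store updates downstream
//    (renamed statestore.cache.max.bytes in Kafka 3.4+)
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L);  // 64 MB
// 3. Commit interval: lower = fresher output, higher = better throughput
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// 4. RocksDB is the default persistent store; tune it with a config setter
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MyRocksDBConfig.class);
// Scale processing within one instance
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);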

4.2 Error Handling

// Uncaught exception handler (Kafka 2.8+ API): log, alert, and decide whether
// to replace the failed thread or shut the application down
streams.setUncaughtExceptionHandler(throwable -> {
    log.error("Streams exception", throwable);
    sendAlert("Streams exception: " + throwable.getMessage());
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

// State listener: alert on transitions into ERROR
streams.setStateListener((newState, oldState) -> {
    log.info("State change: {} -> {}", oldState, newState);
    if (newState == KafkaStreams.State.ERROR) {
        sendAlert("Streams entered ERROR state");
    }
});

4.3 Monitoring Metrics

Key metrics to watch:
1. Processing latency
2. Throughput
3. State store size
4. Rebalance count
5. Error rate
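
Most of these surface through the client's built-in metrics, which can be read off a running instance; the filter below is illustrative:

// Print per-thread processing latency and rate from a running KafkaStreams instance
streams.metrics().forEach((name, metric) -> {
    if (name.group().equals("stream-thread-metrics")
            && (name.name().contains("process-latency") || name.name().contains("process-rate"))) {
        System.out.printf("%s = %s%n", name.name(), metric.metricValue());
    }
});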

Summary

Key takeaways from these Kafka Streams case studies:

  1. Real-time statistics: order metrics, UV/PV counting
  2. Risk control: fraud detection, rate limiting
  3. Data pipelines: ETL, CDC
  4. Best practices: performance tuning, error handling, monitoring

