Skip to content
清晨的一缕阳光
返回

Kafka Streams 高级应用实战

Kafka Streams 提供了强大的流处理能力,支持复杂的数据处理和聚合。本文将深入探讨 Kafka Streams 的高级特性和实战应用。

一、状态存储

1.1 状态存储类型

RocksDB 状态存储

Properties props = new Properties();
props.put("application.id", "state-store-app");
props.put("bootstrap.servers", "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();

// Source stream of orders.
KStream<String, Order> orders = builder.stream("order-topic");

// Aggregate into a persistent (RocksDB-backed) state store.
// count() requires Materialized<K, V, KeyValueStore<Bytes, byte[]>>, so all
// three type arguments must be supplied explicitly.
KTable<String, Long> orderCountByUser = orders
    .groupBy((key, order) -> order.getUserId())
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-order-count-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        .withCachingEnabled()   // buffer updates before forwarding downstream
        // withLoggingEnabled takes the changelog topic config map (empty = broker defaults)
        .withLoggingEnabled(Collections.emptyMap())
    );

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

内存状态存储

// In-memory session store (matches the section title): contents live on the
// heap and are rebuilt from the changelog topic after a restart.
// Note: a session-windowed count yields KTable<Windowed<String>, Long>,
// not KTable<String, Long>.
KTable<Windowed<String>, Long> sessionCount = userEvents
    .groupBy((key, event) -> event.getUserId())
    // ofInactivityGapWithNoGrace replaces the deprecated withInactivityGap
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count(Materialized.<String, Long>as(
            Stores.inMemorySessionStore("session-count", Duration.ofHours(1)))
        .withCachingDisabled()  // forward every update immediately
    );

1.2 自定义状态存储

/**
 * Factory for a persistent, RocksDB-backed key-value store that custom
 * processors can reference by name.
 */
public class CustomStateStore {

    private final String storeName;

    public CustomStateStore(String storeName) {
        this.storeName = storeName;
    }

    /**
     * Builds a persistent key-value store with record caching enabled.
     *
     * @return a store builder to register via StreamsBuilder#addStateStore
     */
    public StoreBuilder<KeyValueStore<String, MyValue>> getStoreBuilder() {
        return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName),
            Serdes.String(),
            new MyValueSerde()
        ).withCachingEnabled();
    }
}

// Using the custom store: the store builder must be registered with the
// topology via addStateStore and then referenced by NAME in process() —
// the Streams API does not accept a built store instance directly.
StreamsBuilder builder = new StreamsBuilder();
CustomStateStore customStore = new CustomStateStore("my-store");
builder.addStateStore(customStore.getStoreBuilder());

KStream<String, String> stream = builder.stream("input-topic");
stream.process(
    new MyProcessorSupplier(),
    Named.as("my-processor"),
    "my-store"  // state store NAME, not a store instance
);

二、交互式查询

2.1 配置交互式查询

Properties props = new Properties();
props.put("application.id", "interactive-query-app");
props.put("bootstrap.servers", "localhost:9092");

// Endpoint advertised to other instances for remote interactive queries.
props.put("application.server", "localhost:8080");

StreamsBuilder builder = new StreamsBuilder();

KTable<String, UserStats> userStats = builder
    .table("user-stats-topic", Materialized.as("user-stats-store"));

KafkaStreams streams = new KafkaStreams(builder.build(), props);

streams.start();

// Obtain a read-only view of the store (StoreQueryParameters replaces the
// deprecated store(String, QueryableStoreType) overload since Kafka 2.5).
// Note: store() does NOT wait for readiness — it throws
// InvalidStateStoreException while the app is rebalancing/restoring,
// so callers should retry.
streams.store(StoreQueryParameters.fromNameAndType(
    "user-stats-store", QueryableStoreTypes.keyValueStore()));

2.2 REST API 实现

/**
 * REST endpoints backed by Kafka Streams interactive queries against the
 * local "user-stats-store". Note: range/all only see data hosted by THIS
 * instance; multi-instance deployments need metadata-based routing.
 */
@RestController
public class InteractiveQueryController {

    @Autowired
    private KafkaStreams kafkaStreams;

    /**
     * Looks up the queryable store. The queryable view is a
     * ReadOnlyKeyValueStore; throws InvalidStateStoreException while the
     * application is rebalancing or restoring state.
     */
    private ReadOnlyKeyValueStore<String, UserStats> statsStore() {
        return kafkaStreams.store(StoreQueryParameters.fromNameAndType(
            "user-stats-store", QueryableStoreTypes.keyValueStore()));
    }

    /**
     * Point lookup for a single user.
     */
    @GetMapping("/stats/{userId}")
    public ResponseEntity<UserStats> getUserStats(@PathVariable String userId) {
        try {
            UserStats stats = statsStore().get(userId);
            if (stats == null) {
                return ResponseEntity.notFound().build();
            }
            return ResponseEntity.ok(stats);
        } catch (InvalidStateStoreException e) {
            // Store not queryable yet (rebalance/restore in progress).
            return ResponseEntity.status(503).build();
        }
    }

    /**
     * Range query over keys in [start, end].
     */
    @GetMapping("/stats/range")
    public ResponseEntity<List<UserStats>> getStatsRange(
            @RequestParam String start,
            @RequestParam String end) {
        try {
            List<UserStats> statsList = new ArrayList<>();
            // The iterator must be closed to release underlying store resources.
            try (KeyValueIterator<String, UserStats> iterator =
                    statsStore().range(start, end)) {
                while (iterator.hasNext()) {
                    statsList.add(iterator.next().value);
                }
            }
            return ResponseEntity.ok(statsList);
        } catch (InvalidStateStoreException e) {
            return ResponseEntity.status(503).build();
        }
    }

    /**
     * Full scan of the locally hosted data.
     */
    @GetMapping("/stats/all")
    public ResponseEntity<List<UserStats>> getAllStats() {
        try {
            List<UserStats> statsList = new ArrayList<>();
            try (KeyValueIterator<String, UserStats> iterator = statsStore().all()) {
                while (iterator.hasNext()) {
                    statsList.add(iterator.next().value);
                }
            }
            return ResponseEntity.ok(statsList);
        } catch (InvalidStateStoreException e) {
            return ResponseEntity.status(503).build();
        }
    }
}

三、自定义处理器

3.1 Processor API

/**
 * Routes high-value orders (amount > 1000) to a named downstream child node
 * and keeps the latest order per key in the "order-store" state store.
 */
public class MyProcessor implements Processor<String, Order> {

    private ProcessorContext context;
    private KeyValueStore<String, Order> stateStore;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.stateStore = context.getStateStore("order-store");

        // Periodic punctuation on wall-clock time: fires every 10s even when
        // no records are flowing.
        context.schedule(Duration.ofSeconds(10),
            PunctuationType.WALL_CLOCK_TIME, timestamp -> flushState());
    }

    @Override
    public void process(String key, Order order) {
        if (order.getAmount() > 1000) {
            // Forward to a named downstream child. Note: the argument of
            // To.child() is a processor/sink NODE name in the topology, not a
            // topic name; the old forward(key, value, String) overload was
            // removed from the Processor API.
            context.forward(key, order, To.child("high-value-topic"));
        }

        // Keep the most recent order for this key.
        stateStore.put(key, order);
    }

    @Override
    public void close() {
        // No resources to release; state stores are closed by the runtime.
    }

    private void flushState() {
        // Flush buffered state (placeholder).
    }
}

// Using the Processor: process() does NOT accept Materialized — the state
// store must be registered on the builder and referenced by name.
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("order-store"),
    Serdes.String(),
    orderSerde));

KStream<String, Order> orders = builder.stream("order-topic");

orders.process(
    MyProcessor::new,
    Named.as("my-processor"),
    "order-store"  // state store NAME the processor accesses
);

3.2 Transformer API

/**
 * Flags orders above 1000 as VIP; the key passes through unchanged.
 * Note: the record is mutated in place before being re-emitted.
 */
public class MyTransformer implements Transformer<String, Order, KeyValue<String, Order>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, Order> transform(String key, Order order) {
        // Mark large orders as VIP.
        if (order.getAmount() > 1000) {
            order.setVip(true);
        }
        return KeyValue.pair(key, order);
    }

    // Transformer has no punctuate() method anymore (removed in Kafka 3.0);
    // schedule periodic work via context.schedule(...) inside init() instead.

    @Override
    public void close() {}
}

// Using the Transformer.
// NOTE(review): KStream#transform is deprecated since Kafka 3.4 in favor of
// process(ProcessorSupplier) with the new processor API — verify the target
// Kafka version before relying on this snippet.
KStream<String, Order> transformed = orders.transform(
    MyTransformer::new,
    Named.as("my-transformer")
);

四、Exactly-Once 语义

4.1 配置 Exactly-Once

Properties props = new Properties();
props.put("application.id", "exactly-once-app");
props.put("bootstrap.servers", "localhost:9092");

// Enable exactly-once processing (EOS v2, requires brokers >= 2.5).
props.put("processing.guarantee", "exactly_once_v2");

// NOTE: do NOT set transactional.id for Kafka Streams — the runtime derives
// per-task transactional ids from application.id automatically; a fixed id
// would collide across stream threads and instances.
props.put("num.standby.replicas", 1);  // hot standby replicas for faster failover

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Order> orders = builder.stream("order-topic");

KTable<String, Long> orderCount = orders
    .groupBy((key, order) -> order.getUserId())
    .count();

orderCount.toStream().to("order-count-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

4.2 Exactly-Once 实践

/**
 * Maintains a running balance per account with exactly-once semantics:
 * transactions-topic -> aggregate -> balances-topic.
 */
public class ExactlyOnceProcessor {

    public static void main(String[] args) {
        final Properties config = new Properties();
        config.put("application.id", "exactly-once-processor");
        config.put("bootstrap.servers", "localhost:9092");
        config.put("processing.guarantee", "exactly_once_v2");

        final StreamsBuilder topology = new StreamsBuilder();

        // Input: raw transactions keyed arbitrarily.
        KStream<String, Transaction> transactions =
            topology.stream("transactions-topic");

        // Re-key by account and fold each transaction amount into the balance.
        KTable<String, Balance> balances = transactions
            .groupBy((key, tx) -> tx.getAccountId())
            .aggregate(
                Balance::new,                      // initial empty balance
                (accountId, tx, balance) -> {      // adder
                    balance.add(tx.getAmount());
                    return balance;
                },
                Materialized.as("balance-store")
            );

        // Output: every balance update as a changelog stream.
        balances.toStream().to("balances-topic");

        final KafkaStreams streams = new KafkaStreams(topology.build(), config);

        // Close cleanly on JVM shutdown so the transaction commits/aborts.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.start();
    }
}

五、窗口操作

5.1 时间窗口

滚动窗口

// Tumbling 1-hour windows with no grace period: late-arriving records are
// dropped rather than updating a closed window.
KTable<Windowed<String>, Long> hourlySales = orders
    .filter((key, order) -> order.getStatus() == OrderStatus.PAID)  // paid orders only
    .groupBy((key, order) -> order.getProductId())
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count();

跳跃窗口

// Hopping window: 5-minute windows advancing every 1 minute (overlapping).
// Without advanceBy() this would be a plain tumbling window, contradicting
// the section title.
KTable<Windowed<String>, Long> recentSales = orders
    .groupBy((key, order) -> order.getProductId())
    .windowedBy(TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(5),    // window size
            Duration.ofMinutes(1))    // grace period for late records
        .advanceBy(Duration.ofMinutes(1)))  // hop interval
    .count();

5.2 会话窗口

// Sessions close after 30 minutes of inactivity per user.
// ofInactivityGapWithNoGrace replaces the deprecated withInactivityGap.
KTable<Windowed<String>, Long> sessionActivity = userEvents
    .groupBy((key, event) -> event.getUserId())
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();

// Session aggregation: the third argument (the "merger") combines two
// sessions when a late event bridges the inactivity gap between them.
KTable<Windowed<String>, SessionAgg> sessionAgg = userEvents
    .groupBy((key, event) -> event.getUserId())
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .aggregate(
        SessionAgg::new,                       // initializer
        (userId, event, session) -> {          // aggregator
            session.addEvent(event);
            return session;
        },
        (userId, session1, session2) -> {      // session merger
            session1.merge(session2);
            return session1;
        },
        Materialized.as("session-store")
    );

5.3 自定义窗口

/**
 * Fixed [start, end) time window.
 *
 * NOTE(review): org.apache.kafka.streams.kstream.Window is an abstract CLASS,
 * not an interface — subclasses pass start/end to the super constructor and
 * implement overlap(); start() and end() are inherited accessors and cannot
 * be overridden as in the original snippet.
 */
public class CustomWindow extends Window {

    public CustomWindow(long start, long end) {
        super(start, end);  // Window validates start <= end and stores both
    }

    @Override
    public boolean overlap(Window other) {
        // Two windows overlap when each starts before the other ends.
        return start() < other.end() && other.start() < end();
    }
}

// Using the custom window.
// NOTE(review): Windows.newInstance(...) is not part of the public Kafka
// Streams API — real custom windowing is done by subclassing
// org.apache.kafka.streams.kstream.Windows<W> and implementing
// windowsFor(timestamp). Verify against the target Kafka version before
// using this snippet as-is.
Windows<CustomWindow> customWindows = Windows.newInstance(
    (timestamp) -> {
        long windowStart = (timestamp / 3600000) * 3600000;  // align to the hour (ms)
        return new CustomWindow(windowStart, windowStart + 3600000);
    }
);

六、连接操作

6.1 KStream-KStream Join

KStream<String, Order> orders = builder.stream("order-topic");
KStream<String, Payment> payments = builder.stream("payment-topic");

// Inner join: emits only when both sides arrive within the time window.
KStream<String, OrderPayment> joined = orders.join(
    payments,
    (order, payment) -> new OrderPayment(order, payment),
    JoinWindows.ofTimeDifferenceAndGrace(
        Duration.ofMinutes(10),   // max timestamp difference between records
        Duration.ofMinutes(1)     // grace period for late records
    ),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

// Left join: emits (order, null) when no matching payment arrives in time.
// Note: JoinWindows.ofTimeDifference(...) does not exist — the no-grace
// variant is ofTimeDifferenceWithNoGrace(...).
KStream<String, OrderPayment> leftJoined = orders.leftJoin(
    payments,
    (order, payment) -> new OrderPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

// Outer join: emits even when only one side is present in the window.
KStream<String, OrderPayment> outerJoined = orders.outerJoin(
    payments,
    (order, payment) -> new OrderPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

6.2 KStream-KTable Join

KStream<String, Order> orders = builder.stream("order-topic");
KTable<String, User> users = builder.table("user-topic");

// Stream-table join: each order is enriched with the CURRENT user record;
// no window — the table side acts as a continuously updated lookup.
KStream<String, EnrichedOrder> enriched = orders.join(
    users,
    (order, user) -> new EnrichedOrder(order, user),
    Joined.with(Serdes.String(), orderSerde, userSerde)
);

6.3 KTable-KTable Join

KTable<String, OrderCount> orderCounts = builder.table("order-count-topic");
KTable<String, UserLevel> userLevels = builder.table("user-level-topic");

// Table-table join: KTable#join has no Joined parameter (that overload is
// for stream joins); result serdes, if needed, are supplied via Materialized.
KTable<String, VipUser> vipUsers = orderCounts.join(
    userLevels,
    (count, level) -> new VipUser(count, level)
);

七、性能优化

7.1 配置优化

Properties props = new Properties();
props.put("application.id", "optimized-app");
props.put("bootstrap.servers", "localhost:9092");

// Processing threads per instance (scale toward the local partition count).
props.put("num.stream.threads", 4);

// Record cache shared across threads; larger cache = fewer downstream updates.
// NOTE(review): cache.max.bytes.buffering is deprecated since Kafka 3.4 in
// favor of statestore.cache.max.bytes — confirm the target version.
props.put("cache.max.bytes.buffering", 10485760);  // 10MB
props.put("commit.interval.ms", 10000);  // commit offsets every 10 seconds

// Local state directory and RocksDB tuning hook.
props.put("state.dir", "/var/kafka-streams/state");
props.put("rocksdb.config.setter", CustomRocksDBConfig.class);

// Fault tolerance: one hot standby replica per stateful task.
props.put("num.standby.replicas", 1);
props.put("task.timeout.ms", 300000);

7.2 状态存储优化

/**
 * RocksDB tuning hook for Kafka Streams state stores.
 *
 * The real RocksDBConfigSetter contract is
 * setConfig(String storeName, Options options, Map&lt;String, Object&gt; configs):
 * tuning is applied through the RocksDB Options object, not by writing
 * string keys into the config map as the original snippet did.
 */
public class CustomRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        options.setWriteBufferSize(64 * 1024 * 1024L);  // 64MB memtable

        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(256 * 1024 * 1024L);  // 256MB block cache
        options.setTableFormatConfig(tableConfig);

        options.setCompactionStyle(CompactionStyle.LEVEL);
    }

    @Override
    public void close(String storeName, Options options) {
        // Release any RocksDB objects (e.g. a shared Cache) created in setConfig.
    }
}

7.3 监控指标

KafkaStreams streams = new KafkaStreams(builder.build(), props);

// Metrics: KafkaStreams has no setGlobalMetricsRecorder(...) method — read
// the metrics registry via streams.metrics() (e.g. from a scheduled reporter).
for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    log.info("指标:{} = {}", entry.getKey().name(), entry.getValue().metricValue());
}

// State transition listener (CREATED -> REBALANCING -> RUNNING, ...).
streams.setStateListener((newState, oldState) -> {
    log.info("状态变化:{} -> {}", oldState, newState);
});

八、最佳实践

8.1 应用设计

设计原则:
1. 保持处理逻辑简单
2. 合理使用状态存储
3. 配置适当的窗口大小
4. 实现错误处理和重试
5. 监控关键指标

8.2 配置检查清单

配置检查:
- [ ] application.id 唯一
- [ ] bootstrap.servers 正确
- [ ] processing.guarantee 设置
- [ ] state.dir 配置
- [ ] num.stream.threads 合理
- [ ] 缓存配置适当

8.3 运维建议

运维建议:
1. 定期备份状态存储
2. 监控处理延迟
3. 配置告警规则
4. 定期清理旧状态
5. 实现优雅关闭

总结

Kafka Streams 高级应用的核心要点:

  1. 状态存储:RocksDB、内存、自定义存储
  2. 交互式查询:REST API、范围查询
  3. 自定义处理器:Processor、Transformer API
  4. Exactly-Once:配置、实践
  5. 窗口操作:时间窗口、会话窗口、自定义窗口
  6. 连接操作:Stream-Stream、Stream-Table、Table-Table
  7. 性能优化:配置、状态存储、监控

核心要点

参考资料


分享这篇文章到:

上一篇文章
GoFrame 全栈框架实战指南
下一篇文章
Spec-Kit 工具链实战