Skip to content
清晨的一缕阳光
返回

Kafka KSQL 流式 SQL 实战指南

KSQL 是 Kafka 的流式 SQL 引擎,允许使用 SQL 语法对 Kafka 中的数据进行实时处理和分析。本文将深入探讨 KSQL 的核心概念、语法和实战应用。

一、KSQL 基础

1.1 什么是 KSQL?

KSQL 是 Kafka 的流处理 SQL 引擎:

graph LR
    subgraph 输入
        K1[Kafka Topic 1]
        K2[Kafka Topic 2]
    end
    
    subgraph KSQL
        Q1[SQL Query 1]
        Q2[SQL Query 2]
    end
    
    subgraph 输出
        K3[Kafka Topic 3]
        K4[Kafka Topic 4]
    end
    
    K1 --> Q1
    K2 --> Q2
    Q1 --> K3
    Q2 --> K4

1.2 核心概念

| 概念 | 说明 | 示例 |
| --- | --- | --- |
| STREAM | 不可变的记录流 | 订单流、日志流 |
| TABLE | 可变的变更日志表 | 用户表、商品表 |
| QUERY | 持续执行的 SQL 查询 | SELECT * FROM orders |
| WINDOW | 时间窗口聚合 | 1 分钟统计 |

1.3 架构

graph TB
    subgraph KSQL Server
        CLI[KSQL CLI]
        ENG[KSQL Engine]
    end
    
    subgraph Kafka
        T1[Topic 1]
        T2[Topic 2]
        T3[Topic 3]
    end
    
    CLI --> ENG
    ENG --> T1
    ENG --> T2
    T2 --> ENG
    ENG --> T3

二、快速入门

2.1 安装部署

# 1. Download the Confluent platform (bundles the KSQL server and CLI)
wget https://packages.confluent.io/archive/7.4/confluent-7.4.0.tar.gz
tar -xzf confluent-7.4.0.tar.gz

# 2. Start the KSQL server (reads broker address etc. from the properties file)
$CONFLUENT_HOME/bin/ksql-server-start $CONFLUENT_HOME/etc/ksql/ksql-server.properties

# 3. Start the interactive KSQL CLI, pointed at the server's REST endpoint
$CONFLUENT_HOME/bin/ksql http://localhost:8088

2.2 创建 Stream

-- Create the order stream over an existing Kafka topic.
-- order_time (epoch milliseconds, BIGINT) is used as the event timestamp.
CREATE STREAM orders (
    order_id VARCHAR,
    user_id VARCHAR,
    product_id VARCHAR,
    amount DOUBLE,
    order_time BIGINT
) WITH (
    KAFKA_TOPIC = 'orders-topic',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'order_time'
    -- TIMESTAMP_FORMAT is only valid when the timestamp column is VARCHAR;
    -- to parse string timestamps, declare order_time VARCHAR and add
    -- TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'
);

-- Inspect registered streams
SHOW STREAMS;
DESCRIBE orders;

2.3 创建 Table

-- Create the user table; the key column is declared inline with PRIMARY KEY.
-- The legacy KEY='...' WITH-property was removed in ksqlDB 0.10 and must not
-- be combined with a declared PRIMARY KEY.
CREATE TABLE users (
    user_id VARCHAR PRIMARY KEY,
    name VARCHAR,
    email VARCHAR,
    level VARCHAR,
    register_time BIGINT
) WITH (
    KAFKA_TOPIC = 'users-topic',
    VALUE_FORMAT = 'JSON'
);

-- Inspect registered tables
SHOW TABLES;
DESCRIBE users;

三、SQL 操作

3.1 基础查询

-- Query all orders (push query: streams results continuously until cancelled)
SELECT * FROM orders
EMIT CHANGES;

-- Row-level filtering
SELECT order_id, user_id, amount
FROM orders
WHERE amount > 100
EMIT CHANGES;

-- Per-user aggregation; emits a changelog of running totals as orders arrive
SELECT user_id, COUNT(*) AS order_count, SUM(amount) AS total_amount
FROM orders
GROUP BY user_id
EMIT CHANGES;

3.2 窗口聚合

滚动窗口(Tumbling Window)

-- Order stats per user in 5-minute tumbling windows. WINDOWSTART is a
-- pseudo-column of the windowed aggregation: it may appear in SELECT but
-- cannot itself be a GROUP BY key — group on a real column instead.
SELECT 
    user_id,
    WINDOWSTART AS window_start,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM orders
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY user_id
EMIT CHANGES;

跳跃窗口(Hopping Window)

-- Per-user order count over the last 5 minutes, refreshed every minute.
-- As above, WINDOWSTART cannot be a GROUP BY key; group on user_id.
SELECT 
    user_id,
    WINDOWSTART AS window_start,
    COUNT(*) AS order_count
FROM orders
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY user_id
EMIT CHANGES;

会话窗口(Session Window)

-- User session analysis: a session for a user_id closes after
-- 60 seconds of inactivity; counts events per session
SELECT 
    user_id,
    COUNT(*) AS session_events
FROM orders
WINDOW SESSION (60 SECONDS)
GROUP BY user_id
EMIT CHANGES;

3.3 JOIN 操作

Stream-Stream JOIN

-- Join orders to payments. Stream-stream joins require a WITHIN window
-- bounding how far apart in event time matching records may be; state for
-- each side is retained only for that window.
CREATE STREAM order_payments AS
SELECT 
    o.order_id,
    o.user_id,
    o.amount AS order_amount,
    p.amount AS payment_amount,
    p.payment_time
FROM orders o
INNER JOIN payments p
WITHIN 10 MINUTES
ON o.order_id = p.order_id
EMIT CHANGES;

Stream-Table JOIN

-- Enrich each order with the latest row of the users table (stream-table
-- join; unlike stream-stream joins it takes no WITHIN clause)
CREATE STREAM enriched_orders AS
SELECT 
    o.order_id,
    o.user_id,
    u.name AS user_name,
    u.email,
    u.level,
    o.amount,
    o.order_time
FROM orders o
INNER JOIN users u
ON o.user_id = u.user_id
EMIT CHANGES;

Table-Table JOIN

-- Join users to VIP membership (table-table join on the tables' primary
-- keys; the result is itself a table that updates as either side changes)
CREATE TABLE user_vip AS
SELECT 
    u.user_id,
    u.name,
    v.vip_level,
    v.expire_date
FROM users u
INNER JOIN vip_members v
ON u.user_id = v.user_id
EMIT CHANGES;

3.4 数据转换

-- Field renaming / projection
CREATE STREAM orders_formatted AS
SELECT 
    order_id AS id,
    user_id AS uid,
    amount AS price,
    order_time AS ts
FROM orders
EMIT CHANGES;

-- Conditional bucketing with CASE (first matching WHEN wins)
CREATE STREAM order_level AS
SELECT 
    order_id,
    amount,
    CASE
        WHEN amount > 1000 THEN 'HIGH'
        WHEN amount > 100 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS level
FROM orders
EMIT CHANGES;

-- String functions: UCASE/LCASE/SUBSTRING are ksqlDB scalar functions;
-- SUBSTRING is 1-based, taking characters 1..8 here
CREATE STREAM orders_parsed AS
SELECT 
    order_id,
    UCASE(user_id) AS user_id_upper,
    LCASE(product_id) AS product_id_lower,
    SUBSTRING(order_id, 1, 8) AS order_prefix
FROM orders
EMIT CHANGES;

四、高级功能

4.1 自定义函数(UDF)

// 1. 编写 UDF
package com.example.ksql.udf;

import io.confluent.ksql.function.Udf;
import io.confluent.ksql.function.UdfDescription;

@UdfDescription(name = "mask", description = "Mask sensitive data")
public class MaskUdf {

    /**
     * Masks the middle of a phone number, keeping the first 3 and last 4
     * characters (e.g. 13812345678 -> 138****5678).
     * Null or too-short input is returned unchanged.
     */
    @Udf(description = "Mask phone number")
    public String maskPhone(String phone) {
        if (phone == null || phone.length() < 7) {
            return phone;
        }
        return phone.substring(0, 3) + "****" + phone.substring(phone.length() - 4);
    }

    /**
     * Masks the local part of an email, keeping at most its first two
     * characters (e.g. alice@x.com -> al***@x.com).
     * Null or '@'-less input is returned unchanged.
     */
    @Udf(description = "Mask email")
    public String maskEmail(String email) {
        if (email == null || !email.contains("@")) {
            return email;
        }
        // limit 2: keep any further '@' in the domain part and avoid an
        // empty array when the address starts with '@'
        String[] parts = email.split("@", 2);
        // bound the prefix length: the original substring(0, 2) threw
        // StringIndexOutOfBoundsException for local parts shorter than 2 chars
        String prefix = parts[0].substring(0, Math.min(2, parts[0].length()));
        return prefix + "***@" + parts[1];
    }
    // NOTE(review): ksqlDB exposes every @Udf method of a class under the
    // single name from @UdfDescription ("mask"); two variants with identical
    // String->String signatures cannot coexist — split into two classes
    // for real deployment.
}

-- 2. Deploy the UDF: ksqlDB has no CREATE FUNCTION statement. Package the
--    class into a jar, copy it into the directory configured by
--    'ksql.extension.dir', restart the ksqlDB server, then verify with:
--    SHOW FUNCTIONS;  DESCRIBE FUNCTION mask;

-- 3. Use the UDF — invoke it by the name declared in @UdfDescription ("mask")
SELECT 
    order_id,
    mask(user_phone) AS masked_phone,
    mask(user_email) AS masked_email
FROM orders
EMIT CHANGES;

4.2 自定义聚合函数(UDAF)

package com.example.ksql.udaf;

import io.confluent.ksql.function.Udaf;
import io.confluent.ksql.function.UdafDescription;

@UdafDescription(name = "percentile", description = "Calculate percentile")
public class PercentileUdaf {
    
    @UdafDescription(description = "Calculate 95th percentile")
    @Udaf(name = "percentile_95")
    public static KsqlArg<Double> percentile95(KsqlArg<Double> value) {
        // 实现百分位计算
        return new KsqlArg<>(0.95);
    }
}

4.3 模式注册

-- Use Schema Registry: with VALUE_FORMAT='AVRO' the column list can be
-- omitted; ksqlDB infers it from the subject 'orders-topic-value'
CREATE STREAM orders WITH (
    KAFKA_TOPIC = 'orders-topic',
    VALUE_FORMAT = 'AVRO'
);

-- Inspect the inferred schema (ksqlDB has no SHOW SCHEMAS statement)
DESCRIBE orders;

-- Export the schema via the Schema Registry REST API
-- (ksqlDB has no EXPORT SCHEMA statement):
-- curl http://localhost:8081/subjects/orders-topic-value/versions/latest > /tmp/orders.avsc

五、实战案例

5.1 实时订单统计

-- 1. Create the order stream; order_time (epoch millis, BIGINT) is the
--    event timestamp driving all windowed queries below
CREATE STREAM orders (
    order_id VARCHAR,
    user_id VARCHAR,
    product_id VARCHAR,
    category VARCHAR,
    amount DOUBLE,
    order_time BIGINT
) WITH (
    KAFKA_TOPIC = 'orders-topic',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'order_time'
);

-- 2. Global per-minute order stats. A windowed aggregate still needs a
--    GROUP BY key and WINDOWSTART may not be grouped on — use a constant
--    key to collapse everything into one global group.
CREATE TABLE orders_per_minute AS
SELECT 
    1 AS grp,
    WINDOWSTART AS minute,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount
FROM orders
WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY 1
EMIT CHANGES;
-- NOTE(review): some ksqlDB versions reject a constant GROUP BY key; if so,
-- derive a stream that adds a literal key column and group on that column.

-- 3. Per-category stats per hour. WINDOWSTART is available in the SELECT of
--    a windowed aggregate but must not appear in GROUP BY.
CREATE TABLE category_per_hour AS
SELECT 
    category,
    WINDOWSTART AS hour,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM orders
WINDOW TUMBLING (SIZE 1 HOURS)
GROUP BY category
EMIT CHANGES;

-- 4. Per-user order totals. ksqlDB persistent queries do not support
--    ORDER BY / LIMIT — materialize the totals and do the Top-N ranking on
--    the consumer side, or look up single users with a pull query:
--    SELECT * FROM user_order_rank WHERE user_id = '...';
CREATE TABLE user_order_rank AS
SELECT 
    user_id,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM orders
GROUP BY user_id
EMIT CHANGES;

5.2 实时风控

-- 1. Create the transaction stream; tx_time (epoch millis) is the event time
CREATE STREAM transactions (
    tx_id VARCHAR,
    user_id VARCHAR,
    amount DOUBLE,
    tx_time BIGINT,
    location VARCHAR
) WITH (
    KAFKA_TOPIC = 'transactions-topic',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'tx_time'
);

-- 2. 5 分钟内交易次数统计
CREATE TABLE tx_count_5min AS
SELECT 
    user_id,
    COUNT(*) AS tx_count
FROM transactions
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY user_id
EMIT CHANGES;

-- 3. Flag suspicious transactions: high-frequency users or large amounts.
--    tx_count_5min is a TABLE — stream-table joins take no WITHIN clause
--    (WITHIN applies to stream-stream joins only).
CREATE STREAM suspicious_transactions AS
SELECT 
    t.tx_id,
    t.user_id,
    t.amount,
    c.tx_count
FROM transactions t
INNER JOIN tx_count_5min c
ON t.user_id = c.user_id
WHERE c.tx_count > 10 OR t.amount > 10000
EMIT CHANGES;
-- NOTE(review): tx_count_5min is a *windowed* table; most ksqlDB versions do
-- not allow joining a stream against a windowed table — if rejected,
-- materialize the counts into a non-windowed table first.

-- 4. "Remote login" detection: repartition by user_id so all of a user's
--    events share a partition; consecutive locations are then compared by a
--    downstream consumer. NOTE(review): this statement alone only
--    repartitions — it performs no detection itself.
CREATE STREAM location_change AS
SELECT 
    user_id,
    location,
    tx_time
FROM transactions
PARTITION BY user_id
EMIT CHANGES;

5.3 实时日志分析

-- 1. Create the log stream; log_time (epoch millis) is the event time
CREATE STREAM logs (
    log_id VARCHAR,
    service VARCHAR,
    level VARCHAR,
    message VARCHAR,
    log_time BIGINT
) WITH (
    KAFKA_TOPIC = 'logs-topic',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'log_time'
);

-- 2. Error count per service per minute. ksqlDB clause order is
--    FROM ... WINDOW ... WHERE ... GROUP BY (WINDOW precedes WHERE), and
--    WINDOWSTART — selected below — must not appear in GROUP BY.
CREATE TABLE error_logs AS
SELECT 
    service,
    WINDOWSTART AS minute,
    COUNT(*) AS error_count
FROM logs
WINDOW TUMBLING (SIZE 1 MINUTES)
WHERE level = 'ERROR'
GROUP BY service
EMIT CHANGES;

-- 3. Keyword alerting: forward any log whose message contains one of the
--    known fatal patterns (LIKE matching is case-sensitive)
CREATE STREAM alert_logs AS
SELECT 
    log_id,
    service,
    level,
    message
FROM logs
WHERE message LIKE '%OutOfMemory%' 
   OR message LIKE '%ConnectionTimeout%'
   OR message LIKE '%NullPointerException%'
EMIT CHANGES;

-- 4. Per-service health score per 5-minute window. ksqlDB does not support
--    the ANSI FILTER clause — use CASE inside the aggregate. The error ratio
--    is CAST to DOUBLE: COUNT returns BIGINT, and BIGINT / BIGINT would be
--    integer division (always 0 for error_count < total_count), pinning
--    health_score at 100.
CREATE TABLE service_health AS
SELECT 
    service,
    SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) AS error_count,
    SUM(CASE WHEN level = 'WARN' THEN 1 ELSE 0 END) AS warn_count,
    COUNT(*) AS total_count,
    (1.0 - CAST(SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) AS DOUBLE)
         / COUNT(*)) * 100 AS health_score
FROM logs
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY service
EMIT CHANGES;

六、运维管理

6.1 查询管理

-- List running persistent queries (IDs look like CSAS_... / CTAS_...)
SHOW QUERIES;

-- Query details, runtime statistics and processing topology
DESCRIBE EXTENDED orders_per_minute;

-- Terminate a persistent query (its sink topic is left in place)
TERMINATE CSAS_ORDERS_PER_MINUTE_0;

-- Pause a query (ksqlDB 0.24+; syntax is PAUSE <query id>, no QUERY keyword)
PAUSE CSAS_ORDERS_PER_MINUTE_0;

-- Resume a paused query
RESUME CSAS_ORDERS_PER_MINUTE_0;

6.2 性能调优

-- Parallelism: number of Kafka Streams processing threads per server.
-- (There is no 'num.stream.tasks' property; task count is derived from the
-- source topic's partition count.)
SET 'ksql.streams.num.stream.threads' = '4';

-- Record cache used to buffer and coalesce aggregation updates (bytes)
SET 'ksql.streams.cache.max.bytes.buffering' = '10485760';

-- How often offsets and state are committed (milliseconds)
SET 'ksql.streams.commit.interval.ms' = '10000';

-- Replication factor for sink topics created by ksqlDB
SET 'ksql.sink.replicas' = '3';

6.3 监控指标

| 指标 | 说明 |
| --- | --- |
| ksql-engine-query-status | 查询状态 |
| ksql-engine-query-stats | 查询统计 |
| ksql-streams-messages-consumed | 消费消息数 |
| ksql-streams-messages-produced | 生产消息数 |

七、最佳实践

7.1 设计建议

| 建议 | 说明 |
| --- | --- |
| 合理设置窗口 | 根据业务需求选择窗口大小 |
| 控制状态大小 | 避免状态无限增长 |
| 使用 Schema Registry | 保证数据格式一致 |
| 监控查询状态 | 及时发现异常查询 |

7.2 性能优化

-- 1. Repartition on the join/grouping key up front so downstream joins and
--    aggregations avoid implicit repartition topics
CREATE STREAM orders_partitioned AS
SELECT * FROM orders
PARTITION BY user_id
EMIT CHANGES;

-- 2. Bound interactive result sets (in ksqlDB, LIMIT follows EMIT CHANGES)
SELECT * FROM orders
EMIT CHANGES
LIMIT 1000;

-- 3. Keep join state bounded: stream-stream joins require WITHIN, which caps
--    retained state. This orders-users join is stream-TABLE, so it takes no
--    WITHIN clause at all — table state is simply the latest row per key.
SELECT * FROM orders o
INNER JOIN users u
ON o.user_id = u.user_id
EMIT CHANGES;

总结

KSQL 的核心要点:

  1. 核心概念:STREAM、TABLE、QUERY、WINDOW
  2. SQL 操作:查询、聚合、JOIN、转换
  3. 窗口聚合:滚动、跳跃、会话窗口
  4. 高级功能:UDF、UDAF、Schema Registry
  5. 实战应用:订单统计、风控、日志分析

核心要点

参考资料


分享这篇文章到:

上一篇文章
Redis 混合持久化实战
下一篇文章
RocketMQ DLedger 实战指南