KSQL 是 Kafka 的流式 SQL 引擎,允许使用 SQL 语法对 Kafka 中的数据进行实时处理和分析。本文将深入探讨 KSQL 的核心概念、语法和实战应用。
一、KSQL 基础
1.1 什么是 KSQL?
KSQL 是 Kafka 的流处理 SQL 引擎:
graph LR
subgraph 输入
K1[Kafka Topic 1]
K2[Kafka Topic 2]
end
subgraph KSQL
Q1[SQL Query 1]
Q2[SQL Query 2]
end
subgraph 输出
K3[Kafka Topic 3]
K4[Kafka Topic 4]
end
K1 --> Q1
K2 --> Q1
Q1 --> K3
Q1 --> K4
1.2 核心概念
| 概念 | 说明 | 示例 |
|---|---|---|
| STREAM | 不可变的记录流 | 订单流、日志流 |
| TABLE | 可变的变更日志表 | 用户表、商品表 |
| QUERY | 持续执行的 SQL 查询 | SELECT * FROM orders |
| WINDOW | 时间窗口聚合 | 1 分钟统计 |
1.3 架构
graph TB
subgraph KSQL Server
CLI[KSQL CLI]
ENG[KSQL Engine]
end
subgraph Kafka
T1[Topic 1]
T2[Topic 2]
T3[Topic 3]
end
CLI --> ENG
ENG --> T1
ENG --> T2
T2 --> ENG
ENG --> T3
二、快速入门
2.1 安装部署
# 1. 下载 KSQL
wget https://packages.confluent.io/archive/7.4/confluent-7.4.0.tar.gz
tar -xzf confluent-7.4.0.tar.gz
# 2. 启动 KSQL Server
$CONFLUENT_HOME/bin/ksql-server-start $CONFLUENT_HOME/etc/ksql/ksql-server.properties
# 3. 启动 KSQL CLI
$CONFLUENT_HOME/bin/ksql http://localhost:8088
2.2 创建 Stream
-- 创建订单流
CREATE STREAM orders (
order_id VARCHAR,
user_id VARCHAR,
product_id VARCHAR,
amount DOUBLE,
order_time BIGINT
) WITH (
KAFKA_TOPIC = 'orders-topic',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'order_time'
-- 注意:order_time 为 BIGINT(epoch 毫秒),无需指定 TIMESTAMP_FORMAT;
-- TIMESTAMP_FORMAT 仅在时间戳列为 VARCHAR 字符串时使用
);
-- 查看 Stream
SHOW STREAMS;
DESCRIBE orders;
2.3 创建 Table
-- 创建用户表
CREATE TABLE users (
user_id VARCHAR PRIMARY KEY,
name VARCHAR,
email VARCHAR,
level VARCHAR,
register_time BIGINT
) WITH (
KAFKA_TOPIC = 'users-topic',
VALUE_FORMAT = 'JSON'
-- 说明:主键已通过 PRIMARY KEY 声明,新版 ksqlDB 已移除 KEY 属性
);
-- 查看 Table
SHOW TABLES;
DESCRIBE users;
三、SQL 操作
3.1 基础查询
-- 查询所有订单
SELECT * FROM orders
EMIT CHANGES;
-- 条件过滤
SELECT order_id, user_id, amount
FROM orders
WHERE amount > 100
EMIT CHANGES;
-- 聚合统计
SELECT user_id, COUNT(*) AS order_count, SUM(amount) AS total_amount
FROM orders
GROUP BY user_id
EMIT CHANGES;
3.2 窗口聚合
滚动窗口(Tumbling Window):
-- 每 5 分钟统计订单数
-- 注意:WINDOWSTART 是窗口伪列,只能出现在 SELECT 中,不能用于 GROUP BY;
-- 全局窗口聚合可按常量 1 分组
SELECT
WINDOWSTART AS window_start,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM orders
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY 1
EMIT CHANGES;
跳跃窗口(Hopping Window):
-- 每 1 分钟统计过去 5 分钟的订单
-- WINDOWSTART 为窗口伪列,不参与 GROUP BY
SELECT
WINDOWSTART AS window_start,
COUNT(*) AS order_count
FROM orders
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY 1
EMIT CHANGES;
会话窗口(Session Window):
-- 用户会话分析
SELECT
user_id,
COUNT(*) AS session_events
FROM orders
WINDOW SESSION (60 SECONDS)
GROUP BY user_id
EMIT CHANGES;
3.3 JOIN 操作
Stream-Stream JOIN:
-- 订单与支付关联
CREATE STREAM order_payments AS
SELECT
o.order_id,
o.user_id,
o.amount AS order_amount,
p.amount AS payment_amount,
p.payment_time
FROM orders o
INNER JOIN payments p
WITHIN 10 MINUTES
ON o.order_id = p.order_id
EMIT CHANGES;
Stream-Table JOIN:
-- 订单与用户关联
CREATE STREAM enriched_orders AS
SELECT
o.order_id,
o.user_id,
u.name AS user_name,
u.email,
u.level,
o.amount,
o.order_time
FROM orders o
INNER JOIN users u
ON o.user_id = u.user_id
EMIT CHANGES;
Table-Table JOIN:
-- 用户与会员等级关联
CREATE TABLE user_vip AS
SELECT
u.user_id,
u.name,
v.vip_level,
v.expire_date
FROM users u
INNER JOIN vip_members v
ON u.user_id = v.user_id
EMIT CHANGES;
3.4 数据转换
-- 字段映射
CREATE STREAM orders_formatted AS
SELECT
order_id AS id,
user_id AS uid,
amount AS price,
order_time AS ts
FROM orders
EMIT CHANGES;
-- 条件判断
CREATE STREAM order_level AS
SELECT
order_id,
amount,
CASE
WHEN amount > 1000 THEN 'HIGH'
WHEN amount > 100 THEN 'MEDIUM'
ELSE 'LOW'
END AS level
FROM orders
EMIT CHANGES;
-- 字符串操作
CREATE STREAM orders_parsed AS
SELECT
order_id,
UCASE(user_id) AS user_id_upper,
LCASE(product_id) AS product_id_lower,
SUBSTRING(order_id, 1, 8) AS order_prefix
FROM orders
EMIT CHANGES;
四、高级功能
4.1 自定义函数(UDF)
// 1. 编写 UDF
package com.example.ksql.udf;
import io.confluent.ksql.function.Udf;
import io.confluent.ksql.function.UdfDescription;
@UdfDescription(name = "mask", description = "Mask sensitive data")
public class MaskUdf {
@Udf(description = "Mask phone number")
public String maskPhone(String phone) {
if (phone == null || phone.length() < 7) {
return phone;
}
return phone.substring(0, 3) + "****" + phone.substring(phone.length() - 4);
}
@Udf(description = "Mask email")
public String maskEmail(String email) {
if (email == null || !email.contains("@")) {
return email;
}
String[] parts = email.split("@");
return parts[0].substring(0, 2) + "***@" + parts[1];
}
}
// 2. 部署 UDF(KSQL 不支持 CREATE FUNCTION 语句)
// 将 UDF 类打包为 JAR,放入 ksql.extension.dir 配置指定的目录,
// 重启 KSQL Server 后会按 @UdfDescription 中的 name 自动注册;
// 可用 SHOW FUNCTIONS; 和 DESCRIBE FUNCTION <name>; 验证
// 3. 使用 UDF
SELECT
order_id,
mask_phone(user_phone) AS masked_phone,
mask_email(user_email) AS masked_email
FROM orders
EMIT CHANGES;
4.2 自定义聚合函数(UDAF)
package com.example.ksql.udaf;
import io.confluent.ksql.function.Udaf;
import io.confluent.ksql.function.UdafDescription;
@UdafDescription(name = "percentile_95", description = "Calculate 95th percentile")
public class PercentileUdaf {
// UDAF 通过 @UdafFactory 工厂方法返回 Udaf<输入, 中间聚合状态, 输出> 实现,
// 需提供 initialize / aggregate / merge / map 回调,
// 而不是在方法上标注 @UdafDescription
@UdafFactory(description = "Calculate 95th percentile")
public static Udaf<Double, List<Double>, Double> percentile95() {
// 实现省略:aggregate 收集样本,map 计算第 95 百分位
return null;
}
}
4.3 模式注册
-- 使用 Schema Registry
CREATE STREAM orders WITH (
KAFKA_TOPIC = 'orders-topic',
VALUE_FORMAT = 'AVRO'
);
-- 查看 / 导出 Schema
-- 注意:KSQL 没有 SHOW SCHEMAS / EXPORT SCHEMA 语句,
-- Schema 通过 Schema Registry 的 REST API 管理:
-- curl http://localhost:8081/subjects
-- curl http://localhost:8081/subjects/orders-topic-value/versions/latest > /tmp/orders.avsc
五、实战案例
5.1 实时订单统计
-- 1. 创建订单流
CREATE STREAM orders (
order_id VARCHAR,
user_id VARCHAR,
product_id VARCHAR,
category VARCHAR,
amount DOUBLE,
order_time BIGINT
) WITH (
KAFKA_TOPIC = 'orders-topic',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'order_time'
);
-- 2. 每分钟订单统计
-- 注意:WINDOWSTART 是窗口伪列,不能出现在 GROUP BY 中;
-- 全局窗口聚合可按常量 1 分组
CREATE TABLE orders_per_minute AS
SELECT
WINDOWSTART AS minute,
COUNT(*) AS order_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount
FROM orders
WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY 1
EMIT CHANGES;
-- 3. 每小时品类统计(按 category 分组;WINDOWSTART 为窗口伪列,不参与 GROUP BY)
CREATE TABLE category_per_hour AS
SELECT
category,
WINDOWSTART AS hour,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM orders
WINDOW TUMBLING (SIZE 1 HOURS)
GROUP BY category
EMIT CHANGES;
-- 4. 用户订单统计(Top N 基础)
-- 注意:KSQL 的持久化查询不支持 ORDER BY / LIMIT,
-- 排行榜需将聚合结果写出后在下游(应用层或 OLAP 引擎)排序取前 N
CREATE TABLE user_order_stats AS
SELECT
user_id,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM orders
GROUP BY user_id
EMIT CHANGES;
5.2 实时风控
-- 1. 创建交易流
CREATE STREAM transactions (
tx_id VARCHAR,
user_id VARCHAR,
amount DOUBLE,
tx_time BIGINT,
location VARCHAR
) WITH (
KAFKA_TOPIC = 'transactions-topic',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'tx_time'
);
-- 2. 5 分钟内交易次数统计
CREATE TABLE tx_count_5min AS
SELECT
user_id,
COUNT(*) AS tx_count
FROM transactions
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY user_id
EMIT CHANGES;
-- 3. 异常交易检测
-- 注意:WITHIN 仅适用于 Stream-Stream JOIN,Stream-Table JOIN 不使用;
-- 另外对窗口聚合表做 JOIN 在部分版本不受支持,必要时可改用非窗口聚合表
CREATE STREAM suspicious_transactions AS
SELECT
t.tx_id,
t.user_id,
t.amount,
c.tx_count
FROM transactions t
INNER JOIN tx_count_5min c
ON t.user_id = c.user_id
WHERE c.tx_count > 10 OR t.amount > 10000
EMIT CHANGES;
-- 4. 异地登录检测
CREATE STREAM location_change AS
SELECT
user_id,
location,
tx_time
FROM transactions
PARTITION BY user_id
EMIT CHANGES;
5.3 实时日志分析
-- 1. 创建日志流
CREATE STREAM logs (
log_id VARCHAR,
service VARCHAR,
level VARCHAR,
message VARCHAR,
log_time BIGINT
) WITH (
KAFKA_TOPIC = 'logs-topic',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'log_time'
);
-- 2. 错误日志统计(WINDOW 子句紧跟 FROM 之后,WHERE 在其后;
-- WINDOWSTART 为窗口伪列,不参与 GROUP BY)
CREATE TABLE error_logs AS
SELECT
service,
WINDOWSTART AS minute,
COUNT(*) AS error_count
FROM logs
WINDOW TUMBLING (SIZE 1 MINUTES)
WHERE level = 'ERROR'
GROUP BY service
EMIT CHANGES;
-- 3. 日志关键词告警
CREATE STREAM alert_logs AS
SELECT
log_id,
service,
level,
message
FROM logs
WHERE message LIKE '%OutOfMemory%'
OR message LIKE '%ConnectionTimeout%'
OR message LIKE '%NullPointerException%'
EMIT CHANGES;
-- 4. 服务健康度
-- KSQL 不支持 FILTER 子句,改用 CASE 条件聚合;
-- 除法前需 CAST 为 DOUBLE,避免整数相除结果恒为 0
CREATE TABLE service_health AS
SELECT
service,
SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) AS error_count,
SUM(CASE WHEN level = 'WARN' THEN 1 ELSE 0 END) AS warn_count,
COUNT(*) AS total_count,
(1.0 - CAST(SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) AS DOUBLE) / COUNT(*)) * 100 AS health_score
FROM logs
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY service
EMIT CHANGES;
六、运维管理
6.1 查询管理
-- 查看所有查询
SHOW QUERIES;
-- 查看查询详情
DESCRIBE EXTENDED orders_per_minute;
-- 终止查询
TERMINATE CSAS_ORDERS_PER_MINUTE_0;
-- 暂停查询(ksqlDB 0.24+,语法为 PAUSE <查询ID>,没有 QUERY 关键字)
PAUSE CSAS_ORDERS_PER_MINUTE_0;
-- 恢复查询
RESUME CSAS_ORDERS_PER_MINUTE_0;
6.2 性能调优
-- 设置处理线程数(注意:属性名为 num.stream.threads,不存在 num.stream.tasks)
SET 'ksql.streams.num.stream.threads' = '4';
-- 设置缓存
SET 'ksql.streams.cache.max.bytes.buffering' = '10485760';
-- 设置提交间隔
SET 'ksql.streams.commit.interval.ms' = '10000';
-- 设置复制因子
SET 'ksql.sink.replicas' = '3';
6.3 监控指标
| 指标 | 说明 |
|---|---|
ksql-engine-query-status | 查询状态 |
ksql-engine-query-stats | 查询统计 |
ksql-streams-messages-consumed | 消费消息数 |
ksql-streams-messages-produced | 生产消息数 |
七、最佳实践
7.1 设计建议
| 建议 | 说明 |
|---|---|
| 合理设置窗口 | 根据业务需求选择窗口大小 |
| 控制状态大小 | 避免状态无限增长 |
| 使用 Schema Registry | 保证数据格式一致 |
| 监控查询状态 | 及时发现异常查询 |
7.2 性能优化
-- 1. 使用合适的分区键
CREATE STREAM orders_partitioned AS
SELECT * FROM orders
PARTITION BY user_id
EMIT CHANGES;
-- 2. 限制结果数量(推送查询中 EMIT CHANGES 在前,LIMIT 在后)
SELECT * FROM orders
EMIT CHANGES
LIMIT 1000;
-- 3. 优化 JOIN(WITHIN 仅用于 Stream-Stream JOIN,限制状态保留的时间范围)
SELECT * FROM orders o
INNER JOIN payments p
WITHIN 10 MINUTES -- 限制时间范围,避免 JOIN 状态无限增长
ON o.order_id = p.order_id
EMIT CHANGES;
总结
KSQL 的核心要点:
- 核心概念:STREAM、TABLE、QUERY、WINDOW
- SQL 操作:查询、聚合、JOIN、转换
- 窗口聚合:滚动、跳跃、会话窗口
- 高级功能:UDF、UDAF、Schema Registry
- 实战应用:订单统计、风控、日志分析
核心要点:
- 理解 STREAM 和 TABLE 的区别
- 合理选择窗口类型和大小
- 使用 Schema Registry 管理数据格式
- 监控查询状态和性能
参考资料
- KSQL 官方文档
- Confluent KSQL
- 《Kafka 流处理实战》