Kafka Connect 是 Kafka 提供的数据集成工具,用于在 Kafka 和其他系统之间可靠地传输数据。本文将深入探讨 Kafka Connect 的架构、配置和实战应用。
一、架构概览
1.1 整体架构
graph TB
subgraph 数据源
DB[(数据库)]
FS[文件系统]
MQ[消息队列]
end
subgraph Kafka Connect
SC[Source Connector]
KC[Kafka Cluster]
SK[Sink Connector]
end
subgraph 数据目标
ES[Elasticsearch]
S3[S3 存储]
DW[(数据仓库)]
end
DB --> SC
FS --> SC
SC --> KC
KC --> SK
SK --> ES
SK --> S3
SK --> DW
1.2 核心组件
| 组件 | 说明 | 示例 |
|---|---|---|
| Connector | 连接器定义 | JdbcSourceConnector |
| Task | 并行执行数据同步的工作单元 | JdbcSourceTask |
| Converter | 数据格式转换 | AvroConverter |
| Transform | 数据转换 | InsertField |
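Converter 负责 Kafka 中的字节与 Connect 内部数据格式之间的相互转换,通常在 worker 级别统一配置,也可以在单个连接器中覆盖。下面是一段常见的 JSON Converter 配置示意(节选自 worker 的 properties 文件):
# connect-distributed.properties(节选)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 为 false 时不在消息中内嵌 schema,消息体更小
key.converter.schemas.enable=false
value.converter.schemas.enable=false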
1.3 部署模式
独立模式(Standalone):
# 适合开发和测试
# 单进程,无容错
./bin/connect-standalone.sh \
config/connect-standalone.properties \
config/jdbc-source.properties \
config/elasticsearch-sink.properties
分布式模式(Distributed):
# 适合生产环境
# 多节点,容错,自动负载均衡
./bin/connect-distributed.sh \
config/connect-distributed.properties
# REST API 管理
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @connector-config.json
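上面 curl 引用的 connector-config.json 大致形如下例(这里以 Kafka 自带的 FileStreamSourceConnector 为例,文件路径与 topic 名均为示意):
{
  "name": "file-source-demo",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/app.log",
    "topic": "demo-topic"
  }
}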
二、Source Connector
2.1 JDBC Source
配置示例:
# jdbc-source.properties
# 基础配置
name=jdbc-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=3
connection.url=jdbc:mysql://localhost:3306/mydb?user=myuser&password=mypassword
# 同步模式
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-
# 轮询配置
poll.interval.ms=5000
batch.max.rows=100
# 数据转换
transforms=AddTimestamp
transforms.AddTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.AddTimestamp.timestamp.field=created_at
REST API 创建:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "3",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "myuser",
"connection.password": "mypassword",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mysql-",
"poll.interval.ms": "5000"
}
}'
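创建后可用控制台消费者验证数据是否进入 Kafka。按 topic.prefix 规则,topic 名为前缀加表名,这里假设源库中有一张 orders 表:
./bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic mysql-orders \
  --from-beginning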
2.2 Debezium CDC
配置示例:
# debezium-mysql.properties
name=debezium-mysql-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
# 数据库配置
database.hostname=localhost
database.port=3306
database.user=myuser
database.password=mypassword
database.server.id=184054
database.server.name=mysql-server-1
database.include.list=mydb
# Schema 历史
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.mydb
# 快照配置
snapshot.mode=when_needed
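# 注:以上为 Debezium 1.x 的属性名;2.x 起 database.server.name 更名为 topic.prefix,
# database.history.kafka.* 更名为 schema.history.internal.kafka.*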
MySQL 配置:
# my.cnf 中启用 Binlog
[mysqld]
server-id=1
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
-- 创建用户
CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
FLUSH PRIVILEGES;
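Debezium 产生的变更事件是带 before/after 的信封结构。下面是一条 INSERT 事件的简化示意(字段有删减):
{
  "before": null,
  "after": { "id": 1001, "status": "NEW" },
  "source": { "db": "mydb", "table": "orders" },
  "op": "c",
  "ts_ms": 1700000000000
}
其中 op 为操作类型:c(创建)、u(更新)、d(删除)、r(快照读取)。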
2.3 File Source
# file-source.properties
name=file-source-connector
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/var/log/app.log
topic=file-log-topic
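FileStream 连接器是 Kafka 自带的示例实现,官方不建议用于生产,但很适合本地验证链路(假设 worker 与该文件在同一台机器):
echo "hello connect" >> /var/log/app.log
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic file-log-topic --from-beginning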
三、Sink Connector
3.1 JDBC Sink
配置示例:
# jdbc-sink.properties
name=jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
connection.url=jdbc:postgresql://localhost:5432/mydb?user=myuser&password=mypassword
# 写入模式
insert.mode=upsert
pk.mode=record_key
pk.fields=id
# 表名配置
table.name.format=${topic}
auto.create=true
auto.evolve=true
# 数据转换
transforms=ExtractField
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractField.field=value
REST API 创建:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "jdbc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "2",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "myuser",
"connection.password": "mypassword",
"insert.mode": "upsert",
"pk.mode": "record_key",
"auto.create": "true",
"topics": "mysql-orders"
}
}'
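在 PostgreSQL 方言下,insert.mode=upsert 生成的写入大致等价于下面的 SQL(仅作示意,假设记录含 id、amount 两个字段;表名默认取 topic 名,可用 table.name.format 调整):
INSERT INTO "mysql-orders" (id, amount) VALUES (1, 99.5)
ON CONFLICT (id) DO UPDATE SET amount = EXCLUDED.amount;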
3.2 Elasticsearch Sink
配置示例:
# elasticsearch-sink.properties
name=elasticsearch-sink-connector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3
# ES 配置
connection.url=http://localhost:9200
connection.username=elastic
connection.password=changeme
# 索引配置
topics=mysql-orders,mysql-users
type.name=_doc
key.ignore=false
schema.ignore=true
# 写入配置
behavior.on.null.values=delete
write.method=upsert
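ES Sink 默认以 topic 名作为索引名,写入后可直接查询验证:
curl -s 'http://localhost:9200/mysql-orders/_search?size=1&pretty'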
3.3 S3 Sink
# s3-sink.properties
name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=2
# S3 配置
s3.region=us-east-1
s3.bucket.name=my-kafka-bucket
s3.part.size=5242880
storage.class=io.confluent.connect.s3.storage.S3Storage
# 数据格式
format.class=io.confluent.connect.s3.format.json.JsonFormat
# 滚动配置
flush.size=1000
rotate.interval.ms=3600000
topics=logs-topic
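默认分区器下,对象按「topics/<topic>/partition=<分区号>/」的前缀组织,文件名带起始 offset(以下 key 为示意,假设默认的 topics.dir):
# flush.size=1000,每 1000 条滚动一个对象
topics/logs-topic/partition=0/logs-topic+0+0000000000.json
topics/logs-topic/partition=0/logs-topic+0+0000001000.json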
四、数据转换
4.1 内置 Transform
字段插入(注意:连接器配置是扁平的键值对,transforms 为逗号分隔的别名列表):
{
  "transforms": "AddTimestamp,AddStaticField",
  "transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.AddTimestamp.timestamp.field": "created_at",
  "transforms.AddStaticField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.AddStaticField.static.field": "source",
  "transforms.AddStaticField.static.value": "mysql"
}
字段提取:
{
  "transforms": "ExtractValue",
  "transforms.ExtractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.ExtractValue.field": "payload"
}
字段重命名(Kafka 自带的 SMT 中没有 RenameField,重命名由 ReplaceField 的 renames 配置完成):
{
  "transforms": "RenameFields",
  "transforms.RenameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameFields.renames": "old_name:new_name,user_id:uid"
}
4.2 自定义 Transform
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * 对指定字段做脱敏处理的自定义 Transform。
 * 泛型上界必须是 ConnectRecord,这样 Source/Sink 两侧都能使用。
 */
public class MaskField<R extends ConnectRecord<R>> implements Transformation<R> {

    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
        fieldName = (String) configs.get("field.name");
    }

    @Override
    public R apply(R record) {
        // 仅处理带 Schema 的 Struct 类型 value,其余原样放行
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        Struct newValue = new Struct(value.schema());
        for (Field field : value.schema().fields()) {
            Object fieldValue = value.get(field);
            if (field.name().equals(fieldName) && fieldValue instanceof String) {
                // 脱敏处理
                fieldValue = mask((String) fieldValue);
            }
            newValue.put(field, fieldValue);
        }
        // 基于原记录构造新记录,仅替换 value
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                value.schema(), newValue, record.timestamp());
    }

    private String mask(String value) {
        if (value.length() <= 4) {
            return "****";
        }
        return value.substring(0, 2) + "****" + value.substring(value.length() - 2);
    }

    @Override
    public void close() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("field.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Field to mask");
    }
}
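使用时在连接器配置中按别名引用(假设打包后的全限定类名为 com.example.MaskField,jar 已放入 worker 的 plugin.path):
transforms=mask
transforms.mask.type=com.example.MaskField
transforms.mask.field.name=phone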
五、运维管理
5.1 REST API
# 查看连接器列表
curl http://localhost:8083/connectors
# 查看连接器状态
curl http://localhost:8083/connectors/jdbc-source-connector/status
# 查看连接器配置
curl http://localhost:8083/connectors/jdbc-source-connector/config
# 重启连接器
curl -X POST http://localhost:8083/connectors/jdbc-source-connector/restart
# 暂停连接器
curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/pause
# 恢复连接器
curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/resume
# 删除连接器
curl -X DELETE http://localhost:8083/connectors/jdbc-source-connector
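# 若单个 task 失败而连接器仍在运行,可只重启该 task(task id 从 0 开始)
curl http://localhost:8083/connectors/jdbc-source-connector/tasks
curl -X POST http://localhost:8083/connectors/jdbc-source-connector/tasks/0/restart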
5.2 监控指标
| 指标组(JMX type) | 指标 | 说明 |
|---|---|---|
| connect-worker-metrics | connector-startup-failure-total | 连接器启动失败次数 |
| connector-task-metrics | status | 任务状态(running/paused/failed 等) |
| source-task-metrics | source-record-write-total | Source 写入 Kafka 的记录数 |
| sink-task-metrics | sink-record-read-total | Sink 从 Kafka 读取的记录数 |
| task-error-metrics | total-record-failures | 失败记录数 |
5.3 错误处理
# 错误处理配置
# 容忍所有错误并写入死信队列
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
# 重试配置
errors.retry.timeout=60000
errors.retry.delay.max.ms=60000
# 错误日志
errors.log.enable=true
errors.log.include.messages=true
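开启 errors.deadletterqueue.context.headers.enable=true 后,失败原因等上下文会写入死信消息的 header,可以这样查看(print.headers 需要较新版本的控制台消费者支持):
./bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic connect-dlq \
  --from-beginning \
  --property print.headers=true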
六、实战案例
6.1 MySQL → Kafka → ES
Source 端(Debezium)配置如下。注意 ExtractNewRecordState(unwrap)本身就会把事件展开为 after 状态,展开后记录中已没有 after 字段,无需再串联 ExtractField:
{
  "name": "mysql-to-es-pipeline",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.orders,ecommerce.order_items",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.ecommerce",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
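Sink 端(Elasticsearch)大致如下。Debezium 默认按 <server.name>.<库>.<表> 命名 topic,其余参数为示意:
{
  "name": "es-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "connection.url": "http://elasticsearch:9200",
    "topics": "mysql-server.ecommerce.orders,mysql-server.ecommerce.order_items",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert"
  }
}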
6.2 Kafka → HDFS
# hdfs-sink.properties
name=hdfs-sink-connector
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=2
# HDFS 配置
hdfs.url=hdfs://namenode:8020
hadoop.conf.dir=/etc/hadoop/conf
# 数据格式
format.class=io.confluent.connect.hdfs.format.avro.AvroFormat
flush.size=10000
rotate.interval.ms=3600000
topics=logs,events
# 分区(DailyPartitioner 需要 locale 与 timezone)
partitioner.class=io.confluent.connect.hdfs.partitioner.DailyPartitioner
locale=zh_CN
timezone=Asia/Shanghai
6.3 多源汇聚
{
"name": "multi-source-aggregator",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "3",
"connection.url": "jdbc:mysql://mysql:3306/db",
"mode": "timestamp",
"timestamp.column.name": "updated_at",
"topic.prefix": "db-",
"transforms": "addSource,route",
"transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSource.static.field": "source",
"transforms.addSource.static.value": "mysql",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "db-(.+)",
"transforms.route.replacement": "aggregated-$1"
}
}
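RegexRouter 的效果是把各来源 topic 统一改名后汇聚(假设库中有 orders、users 两张表):
# 原始 topic   → 路由后 topic
# db-orders    → aggregated-orders
# db-users     → aggregated-users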
七、最佳实践
7.1 配置建议
| 场景 | Tasks 数 | 批量大小 | 轮询间隔 |
|---|---|---|---|
| 低延迟 | 1-2 | 100 | 1000ms |
| 高吞吐 | 4-8 | 1000 | 5000ms |
| CDC 同步 | 1 | 100 | 1000ms |
| 批量导入 | 4-8 | 5000 | 10000ms |
7.2 监控告警
# Prometheus 告警规则(指标名取决于 JMX Exporter 的映射配置,请按实际暴露的名称调整)
groups:
- name: kafka-connect
rules:
- alert: ConnectorFailed
expr: connect_connector_metrics_connector_failed > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Connector 失败:{{ $labels.connector }}"
- alert: TaskNotRunning
expr: connect_task_metrics_task_running == 0
for: 5m
labels:
severity: warning
annotations:
summary: "Task 未运行:{{ $labels.task }}"
总结
Kafka Connect 的核心要点:
- 架构模式:Standalone、Distributed
- Source Connector:JDBC、Debezium CDC、File
- Sink Connector:JDBC、Elasticsearch、S3
- 数据转换:内置 Transform、自定义 Transform
- 运维管理:REST API、监控指标、错误处理
实践建议:
- 生产环境使用分布式模式
- CDC 使用 Debezium 实现
- 配置死信队列处理错误
- 监控连接器状态和指标