
Kafka Connect: A Practical Guide to Data Integration

Kafka Connect is Kafka's data integration framework for reliably moving data between Kafka and external systems. This article walks through Kafka Connect's architecture, configuration, and hands-on usage.

1. Architecture Overview

1.1 Overall Architecture

graph TB
    subgraph Data Sources
        DB[(Database)]
        FS[File System]
        MQ[Message Queue]
    end
    
    subgraph Kafka Connect
        SC[Source Connector]
        KC[Kafka Cluster]
        SK[Sink Connector]
    end
    
    subgraph Data Targets
        ES[Elasticsearch]
        S3[S3 Storage]
        DW[(Data Warehouse)]
    end
    
    DB --> SC
    FS --> SC
    MQ --> SC
    SC --> KC
    KC --> SK
    SK --> ES
    SK --> S3
    SK --> DW

1.2 Core Components

Component | Description | Example
Connector | Connector definition | JdbcSourceConnector
Task | Unit of parallel work | Performs the actual data synchronization
Converter | Data format conversion | AvroConverter
Transform | Single-message transformation | InsertField
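
Converters are configured at the worker level (and can be overridden per connector). As a minimal sketch, worker settings for schemaless JSON look like this; swap in AvroConverter plus a Schema Registry URL if you run Avro:

# connect-distributed.properties (excerpt)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false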

1.3 Deployment Modes

Standalone mode

# Suitable for development and testing
# Single process, no fault tolerance

./bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/jdbc-source.properties \
  config/elasticsearch-sink.properties

Distributed mode

# Suitable for production
# Multiple nodes, fault tolerant, automatic load balancing

./bin/connect-distributed.sh \
  config/connect-distributed.properties

# Managing connectors via the REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json

2. Source Connectors

2.1 JDBC Source

Configuration example

# jdbc-source.properties

# Basic settings
name=jdbc-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=3
connection.url=jdbc:mysql://localhost:3306/mydb?user=myuser&password=mypassword

# Sync mode
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-

# Polling
poll.interval.ms=5000
batch.max.rows=100

# Transforms
transforms=AddTimestamp
transforms.AddTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.AddTimestamp.timestamp.field=created_at

Creating via the REST API

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jdbc-source-connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": "3",
      "connection.url": "jdbc:mysql://localhost:3306/mydb",
      "connection.user": "myuser",
      "connection.password": "mypassword",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "mysql-",
      "poll.interval.ms": "5000"
    }
  }'

2.2 Debezium CDC

Configuration example

# debezium-mysql.properties

name=debezium-mysql-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1

# Database connection
database.hostname=localhost
database.port=3306
database.user=myuser
database.password=mypassword
database.server.id=184054
database.server.name=mysql-server-1
database.include.list=mydb

# Schema history
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.mydb

# Snapshot
snapshot.mode=when_needed

MySQL setup

# Enable the binlog in my.cnf
[mysqld]
server-id=1
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL

-- Create the Debezium user
CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
FLUSH PRIVILEGES;

2.3 File Source

# file-source.properties

name=file-source-connector
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/var/log/app.log
topic=file-log-topic

3. Sink Connectors

3.1 JDBC Sink

Configuration example

# jdbc-sink.properties

name=jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
connection.url=jdbc:postgresql://localhost:5432/mydb?user=myuser&password=mypassword

# Write mode
insert.mode=upsert
pk.mode=record_key
pk.fields=id

# Table naming
table.name.format=${topic}
auto.create=true
auto.evolve=true

# Transforms
transforms=ExtractField
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractField.field=value

Creating via the REST API

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jdbc-sink-connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "2",
      "connection.url": "jdbc:postgresql://localhost:5432/mydb",
      "connection.user": "myuser",
      "connection.password": "mypassword",
      "insert.mode": "upsert",
      "pk.mode": "record_key",
      "auto.create": "true",
      "topics": "mysql-orders"
    }
  }'

3.2 Elasticsearch Sink

Configuration example

# elasticsearch-sink.properties

name=elasticsearch-sink-connector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3

# Elasticsearch connection
connection.url=http://localhost:9200
connection.username=elastic
connection.password=changeme

# Index settings
topics=mysql-orders,mysql-users
type.name=_doc
key.ignore=false
schema.ignore=true

# Write behavior
behavior.on.null.values=delete
write.method=upsert

3.3 S3 Sink

# s3-sink.properties

name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=2

# S3 settings
s3.region=us-east-1
s3.bucket.name=my-kafka-bucket
s3.part.size=5242880

# Data format
format.class=io.confluent.connect.s3.format.json.JsonFormat

# Partitioning and flushing
flush.size=1000
rotate.interval.ms=3600000
topics=logs-topic

4. Data Transformation

4.1 Built-in Transforms

Inserting fields

{
  "transforms": "AddTimestamp,AddStaticField",
  "transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.AddTimestamp.timestamp.field": "created_at",
  "transforms.AddStaticField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.AddStaticField.static.field": "source",
  "transforms.AddStaticField.static.value": "mysql"
}

Extracting a field

{
  "transforms": "ExtractValue",
  "transforms.ExtractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.ExtractValue.field": "payload"
}

Renaming fields

{
  "transforms": "RenameFields",
  "transforms.RenameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameFields.renames": "old_name:new_name,user_id:uid"
}

4.2 Custom Transforms

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class MaskField<R extends ConnectRecord<R>> implements Transformation<R> {

    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
        fieldName = (String) configs.get("field.name");
    }

    @Override
    public R apply(R record) {
        // Only records with a Struct value can be masked field by field
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        Struct newValue = new Struct(value.schema());

        for (Field field : value.schema().fields()) {
            Object fieldValue = value.get(field);

            if (field.name().equals(fieldName) && fieldValue instanceof String) {
                // Mask the configured field
                fieldValue = mask((String) fieldValue);
            }

            newValue.put(field, fieldValue);
        }

        // Keep topic, partition, key, and timestamp unchanged; replace only the value
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                value.schema(), newValue, record.timestamp());
    }

    private String mask(String value) {
        if (value.length() <= 4) {
            return "****";
        }
        return value.substring(0, 2) + "****" + value.substring(value.length() - 2);
    }

    @Override
    public void close() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("field.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Field to mask");
    }
}
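
To use the SMT, package the class into a jar, place it on the worker's plugin.path, and reference it from any connector configuration. A minimal sketch (the field name phone is just an illustrative choice; use the fully qualified class name if you add a package):

transforms=MaskPhone
transforms.MaskPhone.type=MaskField
transforms.MaskPhone.field.name=phone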

5. Operations

5.1 REST API

# List connectors
curl http://localhost:8083/connectors

# Check a connector's status
curl http://localhost:8083/connectors/jdbc-source-connector/status

# View a connector's configuration
curl http://localhost:8083/connectors/jdbc-source-connector/config

# Restart a connector
curl -X POST http://localhost:8083/connectors/jdbc-source-connector/restart

# Pause a connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/pause

# Resume a connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/resume

# Delete a connector
curl -X DELETE http://localhost:8083/connectors/jdbc-source-connector
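
The API also supports idempotent create-or-update via PUT on a connector's config resource. Note that the request body is the bare config map, without the outer name/config wrapper used by POST; jdbc-source-config.json below is a hypothetical file in that format:

# Create the connector if it does not exist, otherwise update it in place
curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/config \
  -H "Content-Type: application/json" \
  -d @jdbc-source-config.json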

5.2 Monitoring Metrics

Metric | Description
connect-connect-metrics:connector-startup-time | Connector startup time
connect-task-metrics:task-running | Number of running tasks
connect-metrics:records-written | Records written
connect-metrics:records-failed | Failed records
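
These metrics are exposed by the worker over JMX. One common way to get them into Prometheus (for the alert rules in 7.2) is the JMX exporter Java agent; a sketch, assuming the agent jar and its mapping file have been placed under /opt:

# Assumed paths; adjust to where the jmx_prometheus_javaagent jar actually lives
export KAFKA_OPTS="-javaagent:/opt/jmx_prometheus_javaagent.jar=9404:/opt/kafka-connect.yml"
./bin/connect-distributed.sh config/connect-distributed.properties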

5.3 Error Handling

# Error-handling settings

# Dead letter queue
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.topic.replication.factor=3

# Retries
errors.retry.timeout=60000
errors.retry.delay.max.ms=60000
errors.retry.backoff.ms=1000
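
Records routed to the dead letter queue keep their original payload; setting errors.deadletterqueue.context.headers.enable=true additionally records the failure context in headers. A quick way to inspect them:

./bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic connect-dlq \
  --from-beginning \
  --property print.headers=true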

6. Case Studies

6.1 MySQL → Kafka → ES

{
  "name": "mysql-to-es-pipeline",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.orders,ecommerce.order_items",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.ecommerce",
    "transforms": "unwrap,extract",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extract.field": "after"
  }
}
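
This is only the source half of the pipeline. A minimal sketch of the matching Elasticsearch sink, assuming the Debezium default topic naming of <server.name>.<database>.<table>; adjust the topic list to what your source actually produces:

{
  "name": "es-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "connection.url": "http://elasticsearch:9200",
    "topics": "mysql-server.ecommerce.orders,mysql-server.ecommerce.order_items",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete"
  }
}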

6.2 Kafka → HDFS

# hdfs-sink.properties

name=hdfs-sink-connector
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=2

# HDFS settings
hdfs.url=hdfs://namenode:8020
hadoop.conf.dir=/etc/hadoop/conf

# Data format
format.class=io.confluent.connect.hdfs.format.avro.AvroFormat
flush.size=10000
rotate.interval.ms=3600000

# Partitioning
topics=logs,events
partitioner.class=io.confluent.connect.hdfs.partitioner.DailyPartitioner

6.3 Multi-Source Aggregation

{
  "name": "multi-source-aggregator",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "3",
    "connection.url": "jdbc:mysql://mysql:3306/db",
    "mode": "timestamp",
    "timestamp.column.name": "updated_at",
    "topic.prefix": "db-",
    "transforms": "addSource,route",
    "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addSource.static.field": "source",
    "transforms.addSource.static.value": "mysql",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "db-(.+)",
    "transforms.route.replacement": "aggregated-$1"
  }
}

7. Best Practices

7.1 Configuration Recommendations

Scenario | Tasks | Batch size | Poll interval
Low latency | 1-2 | 100 | 1000 ms
High throughput | 4-8 | 1000 | 5000 ms
CDC sync | 1 | 100 | 1000 ms
Bulk import | 4-8 | 5000 | 10000 ms
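
As a concrete example, the high-throughput row maps onto the JDBC source settings from section 2.1 roughly as follows (a starting point, not a definitive tuning):

tasks.max=8
batch.max.rows=1000
poll.interval.ms=5000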

7.2 Monitoring and Alerting

# Prometheus alerting rules
groups:
  - name: kafka-connect
    rules:
      - alert: ConnectorFailed
        expr: connect_connector_metrics_connector_failed > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Connector failed: {{ $labels.connector }}"

      - alert: TaskNotRunning
        expr: connect_task_metrics_task_running == 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Task not running: {{ $labels.task }}"

Summary

Key takeaways for Kafka Connect:

  1. Deployment modes: Standalone and Distributed
  2. Source connectors: JDBC, Debezium CDC, File
  3. Sink connectors: JDBC, Elasticsearch, S3
  4. Data transformation: built-in and custom Transforms
  5. Operations: REST API, monitoring metrics, error handling
