Kafka Connect provides powerful data-integration capabilities, including support for custom connectors and transforms. This article takes a deeper look at advanced Kafka Connect usage and custom development.
1. Custom Source Connector
1.1 Connector Configuration
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class MySourceConnectorConfig extends AbstractConfig {

    public static final String DATABASE_CONFIG = "database.url";
    public static final String TABLE_CONFIG = "table.name";
    public static final String POLL_INTERVAL_CONFIG = "poll.interval.ms";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(DATABASE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Database URL")
            .define(TABLE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Table name")
            .define(POLL_INTERVAL_CONFIG, ConfigDef.Type.LONG, 5000L, ConfigDef.Importance.MEDIUM, "Poll interval in milliseconds");

    public MySourceConnectorConfig(Map<String, String> props) {
        super(CONFIG_DEF, props);
    }

    public static ConfigDef config() {
        return CONFIG_DEF;
    }

    public String getDatabaseUrl() {
        return getString(DATABASE_CONFIG);
    }

    public String getTableName() {
        return getString(TABLE_CONFIG);
    }

    public long getPollInterval() {
        return getLong(POLL_INTERVAL_CONFIG);
    }
}
1.2 SourceConnector Implementation
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MySourceConnector extends SourceConnector {

    private MySourceConnectorConfig config;

    @Override
    public void start(Map<String, String> props) {
        // Parse eagerly so an invalid config fails at connector startup
        config = new MySourceConnectorConfig(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets an identical config here; a connector that reads
        // several tables would split the work instead (see the sketch below)
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(config.originalsStrings()));
        }
        return taskConfigs;
    }

    @Override
    public void stop() {
        // Release any resources acquired in start()
    }

    @Override
    public ConfigDef config() {
        return MySourceConnectorConfig.config();
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
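The loop above hands every task the same configuration. A connector that captures several tables would instead partition the work across tasks. A minimal sketch using ConnectorUtils.groupPartitions; the comma-separated "tables" config key is hypothetical, not defined by the config class above:

import org.apache.kafka.connect.util.ConnectorUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskPartitioning {
    // Sketch: distribute a comma-separated "tables" list (hypothetical key)
    // over the available tasks, so each task polls a disjoint set of tables
    public static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
        List<String> tables = Arrays.asList(connectorProps.get("tables").split(","));
        int groupCount = Math.min(maxTasks, tables.size());
        List<List<String>> groups = ConnectorUtils.groupPartitions(tables, groupCount);

        List<Map<String, String>> taskConfigs = new ArrayList<>(groups.size());
        for (List<String> group : groups) {
            Map<String, String> taskProps = new HashMap<>(connectorProps);
            taskProps.put("table.name", String.join(",", group));
            taskConfigs.add(taskProps);
        }
        return taskConfigs;
    }
}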
1.3 SourceTask Implementation
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MySourceTask extends SourceTask {

    private MySourceConnectorConfig config;
    private Connection connection;
    private Map<String, String> sourcePartition;
    private long lastOffset = 0;

    @Override
    public void start(Map<String, String> props) {
        config = new MySourceConnectorConfig(props);
        sourcePartition = Collections.singletonMap("table", config.getTableName());

        // Restore the last committed offset, if any
        Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
        if (offset != null) {
            lastOffset = ((Number) offset.get("offset")).longValue();
        }

        // Open the database connection
        try {
            connection = DriverManager.getConnection(config.getDatabaseUrl());
        } catch (SQLException e) {
            throw new ConnectException("Database connection failed", e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        // Fetch rows created since the last committed offset; the table name
        // comes from validated config, only the offset is a bind parameter
        String sql = "SELECT * FROM " + config.getTableName() + " WHERE id > ? LIMIT 100";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            stmt.setLong(1, lastOffset);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    long id = rs.getLong("id");
                    String data = rs.getString("data");
                    Map<String, Object> sourceOffset = Collections.singletonMap("offset", id);
                    records.add(new SourceRecord(
                            sourcePartition,          // source partition
                            sourceOffset,             // source offset
                            config.getTableName(),    // topic
                            Schema.STRING_SCHEMA,     // key schema
                            String.valueOf(id),       // key
                            Schema.STRING_SCHEMA,     // value schema
                            data));                   // value
                    lastOffset = id;
                }
            }
        } catch (SQLException e) {
            throw new ConnectException("Database query failed", e);
        }
        // Back off before the next poll when no new rows were found
        if (records.isEmpty()) {
            Thread.sleep(config.getPollInterval());
        }
        return records;
    }

    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors during shutdown
        }
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
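Once these classes are packaged into a JAR under the worker's plugin.path, a connector instance is created through the REST API. An illustrative request; the com.example package and connection URL are placeholders:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-source-connector",
    "config": {
      "connector.class": "com.example.MySourceConnector",
      "tasks.max": "1",
      "database.url": "jdbc:postgresql://localhost:5432/mydb",
      "table.name": "events",
      "poll.interval.ms": "5000"
    }
  }'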
2. Custom Sink Connector
2.1 SinkConnector Implementation
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// MySinkConnectorConfig mirrors MySourceConnectorConfig (database.url, table.name)
public class MySinkConnector extends SinkConnector {

    private MySinkConnectorConfig config;

    @Override
    public void start(Map<String, String> props) {
        config = new MySinkConnectorConfig(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Sink tasks share the same config; the framework balances
        // topic partitions across tasks automatically
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(config.originalsStrings()));
        }
        return taskConfigs;
    }

    @Override
    public void stop() {
        // Release any resources acquired in start()
    }

    @Override
    public ConfigDef config() {
        return MySinkConnectorConfig.config();
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
2.2 SinkTask Implementation
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;

public class MySinkTask extends SinkTask {

    private MySinkConnectorConfig config;
    private Connection connection;

    @Override
    public void start(Map<String, String> props) {
        config = new MySinkConnectorConfig(props);
        // Open the database connection
        try {
            connection = DriverManager.getConnection(config.getDatabaseUrl());
        } catch (SQLException e) {
            throw new ConnectException("Database connection failed", e);
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        // Batch insert
        String sql = "INSERT INTO " + config.getTableName() + " (id, data) VALUES (?, ?)";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            for (SinkRecord record : records) {
                stmt.setLong(1, Long.parseLong((String) record.key()));
                stmt.setString(2, (String) record.value());
                stmt.addBatch();
            }
            stmt.executeBatch();
        } catch (SQLException e) {
            // RetriableException tells the framework to redeliver this batch
            throw new RetriableException("Database insert failed", e);
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Nothing to do: put() writes synchronously, so every delivered
        // record is already persisted when offsets are committed
    }

    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors during shutdown
        }
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
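On Kafka 2.6+ workers, a sink task can also report individual bad records through the framework's dead letter queue instead of failing the whole batch, via SinkTaskContext.errantRecordReporter(). A sketch; the per-record write helper is hypothetical:

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.Collection;

public abstract class ReportingSinkTask extends SinkTask {

    @Override
    public void put(Collection<SinkRecord> records) {
        // May be null if no DLQ is configured (or on pre-2.6 workers)
        ErrantRecordReporter reporter = context.errantRecordReporter();
        for (SinkRecord record : records) {
            try {
                write(record);
            } catch (Exception e) {
                if (reporter != null) {
                    reporter.report(record, e);  // routed to the configured DLQ topic
                } else {
                    throw new ConnectException("Failed record and no reporter available", e);
                }
            }
        }
    }

    // Hypothetical per-record write, implemented by the concrete task
    protected abstract void write(SinkRecord record) throws Exception;
}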
3. Custom Transforms
3.1 Value Transform
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Map;

public class MaskFieldTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("field.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Field to mask")
            .define("mask.length", ConfigDef.Type.INT, 4, ConfigDef.Importance.MEDIUM, "Characters to keep on each side");

    private String fieldName;
    private int maskLength;

    @Override
    public void configure(Map<String, ?> configs) {
        SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
        fieldName = config.getString("field.name");
        maskLength = config.getInt("mask.length");
    }

    @Override
    public R apply(R record) {
        Struct value = (Struct) record.value();
        Struct newValue = new Struct(value.schema());
        for (Field field : value.schema().fields()) {
            Object fieldValue = value.get(field);
            if (field.name().equals(fieldName) && fieldValue instanceof String) {
                // Mask the sensitive field
                fieldValue = mask((String) fieldValue);
            }
            newValue.put(field, fieldValue);
        }
        // Keep everything else; replace only the value
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                value.schema(), newValue, record.timestamp());
    }

    private String mask(String value) {
        // Fully mask anything short enough that prefix and suffix would overlap
        if (value == null || value.length() <= maskLength * 2) {
            return "****";
        }
        return value.substring(0, maskLength) + "****"
                + value.substring(value.length() - maskLength);
    }

    @Override
    public void close() {}

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}
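Once the transform JAR is on plugin.path, it is attached to a connector purely through configuration. An illustrative fragment of a connector's "config" section; the com.example package and the phone field are placeholders:

"transforms": "mask",
"transforms.mask.type": "com.example.MaskFieldTransform",
"transforms.mask.field.name": "phone",
"transforms.mask.mask.length": "4"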
3.2 Key Transform
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Map;

public class ExtractKeyTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("key.field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Key field to extract");

    private String keyField;

    @Override
    public void configure(Map<String, ?> configs) {
        keyField = new SimpleConfig(CONFIG_DEF, configs).getString("key.field");
    }

    @Override
    public R apply(R record) {
        Struct value = (Struct) record.value();
        // The extracted field becomes the record key ...
        Object keyValue = value.get(keyField);
        Schema keySchema = value.schema().field(keyField).schema();
        // ... and the new value no longer contains it
        Schema newSchema = removeField(value.schema(), keyField);
        Struct newValue = new Struct(newSchema);
        for (Field field : newSchema.fields()) {
            // Look up by name: field indices differ between the two schemas
            newValue.put(field, value.get(field.name()));
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                keySchema, keyValue, newSchema, newValue, record.timestamp());
    }

    private Schema removeField(Schema schema, String fieldName) {
        SchemaBuilder builder = SchemaBuilder.struct();
        for (Field field : schema.fields()) {
            if (!field.name().equals(fieldName)) {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }

    @Override
    public void close() {}

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}
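Transforms are easy to exercise outside a worker by applying them to a hand-built record. A minimal smoke test using a plain main(); a real project would use JUnit:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;

public class ExtractKeyTransformDemo {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.struct()
                .field("id", Schema.INT64_SCHEMA)
                .field("data", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(schema).put("id", 42L).put("data", "hello");

        SourceRecord record = new SourceRecord(
                Collections.singletonMap("table", "demo"),
                Collections.singletonMap("offset", 1L),
                "demo-topic", null, null, schema, value);

        ExtractKeyTransform<SourceRecord> transform = new ExtractKeyTransform<>();
        transform.configure(Collections.singletonMap("key.field", "id"));

        SourceRecord transformed = transform.apply(record);
        System.out.println(transformed.key());                                // prints: 42
        System.out.println(((Struct) transformed.value()).schema().fields()); // only "data" remains
        transform.close();
    }
}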
4. Error Handling
4.1 Dead Letter Queue Configuration
With errors.tolerance set to all, records that fail during conversion, transformation, or delivery are routed to a dead letter queue topic instead of failing the task:
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "topics": "my-topic",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "connect-dlq",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true",

    "errors.retry.timeout": "60000",
    "errors.retry.delay.max.ms": "60000"
  }
}
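With errors.deadletterqueue.context.headers.enable set, the runtime attaches error context to each DLQ record as headers prefixed with __connect.errors. (source topic, connector name, exception class, and so on). A minimal consumer sketch that prints them; topic and group names are placeholders:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DlqInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "dlq-inspector");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("connect-dlq"));
            while (true) {  // inspect indefinitely; Ctrl-C to stop
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Failed record: " + record.value());
                    // Error context headers are prefixed with __connect.errors.
                    for (Header header : record.headers()) {
                        if (header.key().startsWith("__connect.errors.")) {
                            System.out.println("  " + header.key() + " = "
                                    + new String(header.value(), StandardCharsets.UTF_8));
                        }
                    }
                }
            }
        }
    }
}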
4.2 Custom Error Handler
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

// ErrorHandler and ProducerFactory are application-defined abstractions,
// not part of the Connect API
public class CustomErrorHandler implements ErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(CustomErrorHandler.class);

    private final String dlqTopic;
    private final Producer<String, Struct> dlqProducer;

    public CustomErrorHandler(String dlqTopic, ProducerFactory factory) {
        this.dlqTopic = dlqTopic;
        this.dlqProducer = factory.createProducer();
    }

    @Override
    public void handle(Exception error, SourceRecord record) {
        // Log the failure, then forward the record to the dead letter queue
        log.error("Failed to process record: {}", record, error);
        sendToDlq(record, error);
    }

    private void sendToDlq(SourceRecord record, Exception error) {
        ProducerRecord<String, Struct> dlqRecord = new ProducerRecord<>(
                dlqTopic,
                record.key() != null ? record.key().toString() : null,
                (Struct) record.value());
        // Attach error context as headers
        String message = error.getMessage() != null ? error.getMessage() : error.getClass().getName();
        dlqRecord.headers().add("error.message", message.getBytes(StandardCharsets.UTF_8));
        dlqRecord.headers().add("error.timestamp",
                String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
        dlqProducer.send(dlqRecord);
    }
}
5. Monitoring Metrics
5.1 JMX Metrics
Connect workers expose per-connector and per-task metrics over JMX. The helper below reads a source task's key metrics from the platform MBean server:
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

// MBean and attribute names follow the Kafka Connect metric naming scheme
// (kafka.connect:type=source-task-metrics / task-error-metrics)
public class ConnectorMetrics {

    private final MBeanServer mBeanServer;
    private final ObjectName taskMetrics;
    private final ObjectName errorMetrics;

    public ConnectorMetrics(String connectorName, int taskId) throws MalformedObjectNameException {
        this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
        this.taskMetrics = new ObjectName(
                "kafka.connect:type=source-task-metrics,connector=" + connectorName + ",task=" + taskId);
        this.errorMetrics = new ObjectName(
                "kafka.connect:type=task-error-metrics,connector=" + connectorName + ",task=" + taskId);
    }

    public long getRecordsPolled() {
        return (long) read(taskMetrics, "source-record-poll-total");
    }

    public double getPollRate() {
        return read(taskMetrics, "source-record-poll-rate");
    }

    public long getRecordErrors() {
        return (long) read(errorMetrics, "total-record-errors");
    }

    private double read(ObjectName name, String attribute) {
        try {
            // Kafka exposes metric values as numbers; -1 signals "unavailable"
            return ((Number) mBeanServer.getAttribute(name, attribute)).doubleValue();
        } catch (Exception e) {
            return -1;
        }
    }
}
5.2 REST API Monitoring
# Check connector status
curl http://localhost:8083/connectors/my-connector/status
# Check connector config
curl http://localhost:8083/connectors/my-connector/config
# Check the status of an individual task
curl http://localhost:8083/connectors/my-connector/tasks/0/status
# Restart the connector
curl -X POST http://localhost:8083/connectors/my-connector/restart
# Pause the connector
curl -X PUT http://localhost:8083/connectors/my-connector/pause
# Resume the connector
curl -X PUT http://localhost:8083/connectors/my-connector/resume
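The status endpoint reports the connector state plus one entry per task; an illustrative response:

{
  "name": "my-connector",
  "connector": { "state": "RUNNING", "worker_id": "connect-1:8083" },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "connect-1:8083" },
    { "id": 1, "state": "FAILED", "worker_id": "connect-2:8083", "trace": "..." }
  ],
  "type": "sink"
}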
6. Deployment and Operations
6.1 Distributed Deployment
# connect-distributed.properties
# Bootstrap servers
bootstrap.servers=localhost:9092
# Cluster settings: workers with the same group.id form one Connect cluster
group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
# Default converters (required worker settings)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# REST API
rest.port=8083
rest.advertised.host.name=connect-1
rest.advertised.port=8083
# Plugin path
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
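Each node runs a worker started from the same properties file; workers sharing a group.id discover each other and rebalance connectors and tasks automatically:

# Start one worker per node
bin/connect-distributed.sh config/connect-distributed.properties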
6.2 Plugin Management
# Install a plugin with the Confluent Hub client
confluent-hub install confluentinc/kafka-connect-jdbc:10.0.0
# List the plugins a worker has actually loaded
curl http://localhost:8083/connector-plugins
# To remove a plugin, delete its directory from plugin.path and restart the worker
7. Best Practices
7.1 Configuration Recommendations
1. Deploy in distributed mode for fault tolerance and scalability
2. Use a replication factor of at least 3 for the internal topics
3. Enable error handling and a dead letter queue
4. Configure a sensible retry policy (errors.retry.timeout, errors.retry.delay.max.ms)
5. Monitor the key task and error metrics
7.2 性能优化
性能优化:
1. 调整 batch.size
2. 配置适当的 linger.ms
3. 启用压缩
4. 优化数据库连接池
5. 监控处理延迟
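For source connectors, the first three knobs map onto the embedded producer. Worker-level defaults use the producer. prefix; per-connector overrides use producer.override. and require the worker to allow client overrides. A sketch (values are illustrative):

# Worker config: producer defaults for all source connectors on this worker
producer.batch.size=65536
producer.linger.ms=50
producer.compression.type=lz4

# Worker config: allow per-connector client overrides
connector.client.config.override.policy=All

# Connector config (JSON): per-connector override
#   "producer.override.linger.ms": "100"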
7.3 Deployment Checklist
- [ ] Distributed mode configured
- [ ] Replication factors set for the internal topics
- [ ] Error handling and DLQ configured
- [ ] Monitoring and alerting in place
- [ ] Failure recovery tested
Summary
The core elements of advanced Kafka Connect usage:
- Custom connectors: SourceConnector, SinkConnector, and their Task implementations
- Custom transforms: value transforms and key transforms
- Error handling: dead letter queues, retry policies, custom error handlers
- Monitoring: JMX metrics and the REST API
- Operations: distributed deployment and plugin management
Key takeaways:
- Understand the connector architecture and its contracts before implementing one
- Master custom transform (SMT) development
- Configure thorough error handling up front
- Build a monitoring and alerting pipeline around the key metrics