Kafka Connector plugin development extends Kafka Connect's data-integration reach to systems it does not support out of the box. This article works through the practical development of Source Connectors, Sink Connectors, and Transforms (SMTs).
1. Connector Architecture
1.1 Core Components
graph TB
subgraph Kafka Connect
CC[Connect Cluster]
WS[Worker]
end
subgraph Connector Plugin
SC[Source Connector]
ST[Source Task]
SKC[Sink Connector]
SKT[Sink Task]
end
subgraph External Systems
DB[(Database)]
FS[File System]
end
CC --> WS
SC -. creates .-> ST
SKC -. creates .-> SKT
DB --> ST
ST --> WS
WS --> SKT
SKT --> FS
Note that Connectors only describe and split the work; the Tasks move the data, with the Worker handling Kafka production and consumption in between.
1.2 Development Workflow
The development workflow:
1. Define the configuration class
2. Implement the Connector
3. Implement the Task
4. Package and deploy
5. Test and verify
2. Developing a Source Connector
2.1 Configuration Class
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class MySourceConnectorConfig extends AbstractConfig {

    public static final String DATABASE_CONFIG = "database.url";
    public static final String TABLE_CONFIG = "table.name";
    public static final String POLL_INTERVAL_CONFIG = "poll.interval.ms";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(DATABASE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Database URL")
            .define(TABLE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Table name")
            .define(POLL_INTERVAL_CONFIG, ConfigDef.Type.LONG, 5000L, ConfigDef.Importance.MEDIUM, "Poll interval in milliseconds");

    public MySourceConnectorConfig(Map<String, String> props) {
        super(CONFIG_DEF, props);
    }

    public static ConfigDef config() {
        return CONFIG_DEF;
    }

    public String getDatabaseUrl() {
        return getString(DATABASE_CONFIG);
    }

    public String getTableName() {
        return getString(TABLE_CONFIG);
    }

    public long getPollInterval() {
        return getLong(POLL_INTERVAL_CONFIG);
    }
}
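ConfigDef can also enforce constraints up front, so malformed settings are rejected when the connector is submitted instead of failing later inside a task. A sketch of the poll-interval definition with a range validator (the validator choice is illustrative, not from the original):

// Sketch: same definition, now rejecting non-positive intervals at creation time
private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(POLL_INTERVAL_CONFIG, ConfigDef.Type.LONG, 5000L,
                ConfigDef.Range.atLeast(1),        // validator: value must be >= 1
                ConfigDef.Importance.MEDIUM,
                "Poll interval in milliseconds");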
2.2 Connector Implementation
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MySourceConnector extends SourceConnector {

    private MySourceConnectorConfig config;

    @Override
    public void start(Map<String, String> props) {
        config = new MySourceConnectorConfig(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // This connector reads a single table, so extra tasks would duplicate
        // each other's work; return one task config regardless of maxTasks.
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        taskConfigs.add(config.originalsStrings());
        return taskConfigs;
    }

    @Override
    public void stop() {
        // Release resources acquired in start()
    }

    @Override
    public ConfigDef config() {
        return MySourceConnectorConfig.config();
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
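When a source covers several units of work, taskConfigs() is where the split across tasks happens. A sketch assuming a hypothetical table.names LIST setting (not part of the config class above), using ConnectorUtils from connect-api:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // Hypothetical multi-table setting, split across tasks with
    // org.apache.kafka.connect.util.ConnectorUtils (ships with connect-api)
    List<String> tables = config.getList("table.names");
    int numGroups = Math.min(tables.size(), maxTasks);
    List<Map<String, String>> taskConfigs = new ArrayList<>();
    for (List<String> group : ConnectorUtils.groupPartitions(tables, numGroups)) {
        Map<String, String> taskProps = new HashMap<>(config.originalsStrings());
        taskProps.put("table.names", String.join(",", group)); // this task's share
        taskConfigs.add(taskProps);
    }
    return taskConfigs;
}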
2.3 Task Implementation
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MySourceTask extends SourceTask {

    private MySourceConnectorConfig config;
    private Connection connection;
    private long lastOffset = 0;

    @Override
    public void start(Map<String, String> props) {
        config = new MySourceConnectorConfig(props);
        // Restore the last committed offset, if any
        Map<String, Object> partition = Collections.singletonMap("table", config.getTableName());
        Map<String, Object> offset = context.offsetStorageReader().offset(partition);
        if (offset != null) {
            lastOffset = (Long) offset.get("offset");
        }
        // Open the database connection
        try {
            connection = DriverManager.getConnection(config.getDatabaseUrl());
        } catch (SQLException e) {
            throw new RuntimeException("Database connection failed", e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        // Fetch rows beyond the last offset; ORDER BY keeps the offset monotonic
        String sql = "SELECT * FROM " + config.getTableName()
                + " WHERE id > ? ORDER BY id LIMIT 100";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            stmt.setLong(1, lastOffset);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    long id = rs.getLong("id");
                    String data = rs.getString("data");
                    Map<String, Object> sourceOffset = Collections.singletonMap("offset", id);
                    records.add(new SourceRecord(
                            Collections.singletonMap("table", config.getTableName()), // source partition
                            sourceOffset,              // source offset
                            config.getTableName(),     // topic
                            Schema.STRING_SCHEMA,      // key schema
                            String.valueOf(id),        // key
                            Schema.STRING_SCHEMA,      // value schema
                            data));                    // value
                    lastOffset = id;
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException("Database query failed", e);
        }
        if (records.isEmpty()) {
            // Back off before the framework calls poll() again
            Thread.sleep(config.getPollInterval());
        }
        return records;
    }

    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors on shutdown
        }
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
3. Developing a Sink Connector
3.1 Connector Implementation
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MySinkConnector extends SinkConnector {

    private MySinkConnectorConfig config;

    @Override
    public void start(Map<String, String> props) {
        config = new MySinkConnectorConfig(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Identical configs are fine for a sink: the framework balances
        // topic partitions across the tasks via the shared consumer group.
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(config.originalsStrings()));
        }
        return taskConfigs;
    }

    @Override
    public void stop() {
        // Release resources acquired in start()
    }

    @Override
    public ConfigDef config() {
        return MySinkConnectorConfig.config();
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
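The original listing does not show MySinkConnectorConfig; a minimal sketch, assuming it mirrors the source config with the same database.url and table.name settings:

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

// Minimal sketch: mirrors MySourceConnectorConfig for the sink side
public class MySinkConnectorConfig extends AbstractConfig {

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("database.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Database URL")
            .define("table.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Target table name");

    public MySinkConnectorConfig(Map<String, String> props) {
        super(CONFIG_DEF, props);
    }

    public static ConfigDef config() {
        return CONFIG_DEF;
    }

    public String getDatabaseUrl() {
        return getString("database.url");
    }

    public String getTableName() {
        return getString("table.name");
    }
}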
3.2 Task Implementation
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;

public class MySinkTask extends SinkTask {

    private MySinkConnectorConfig config;
    private Connection connection;

    @Override
    public void start(Map<String, String> props) {
        config = new MySinkConnectorConfig(props);
        // Open the database connection
        try {
            connection = DriverManager.getConnection(config.getDatabaseUrl());
        } catch (SQLException e) {
            throw new RuntimeException("Database connection failed", e);
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        // Batch insert
        String sql = "INSERT INTO " + config.getTableName() + " (id, data) VALUES (?, ?)";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            for (SinkRecord record : records) {
                stmt.setLong(1, Long.parseLong((String) record.key()));
                stmt.setString(2, (String) record.value());
                stmt.addBatch();
            }
            stmt.executeBatch();
        } catch (SQLException e) {
            throw new RuntimeException("Database insert failed", e);
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Data is written synchronously in put(), so there is nothing to flush;
        // the framework commits the consumer offsets after this returns.
    }

    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors on shutdown
        }
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
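Connect delivers records at least once by default, so put() can see the same record again after a restart or rebalance, and a plain INSERT will then fail or duplicate rows. One fix is an upsert; a sketch of the statement above in MySQL dialect (an assumption, since the original does not name the target database):

// MySQL-style upsert: redelivered records overwrite instead of duplicating
String sql = "INSERT INTO " + config.getTableName() + " (id, data) VALUES (?, ?)"
        + " ON DUPLICATE KEY UPDATE data = VALUES(data)";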
4. Developing Transforms
A Transformation (Single Message Transform, SMT) rewrites each record as it flows through Connect. Note that Transformation is parameterized over the record type, so one implementation serves both source and sink pipelines.
4.1 Value Transform
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Map;

public class MaskFieldTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private String fieldName;
    private int maskLength;

    @Override
    public void configure(Map<String, ?> configs) {
        SimpleConfig config = new SimpleConfig(config(), configs);
        fieldName = config.getString("field.name");
        maskLength = config.getInt("mask.length");
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record;   // pass tombstones through untouched
        }
        Struct value = (Struct) record.value();
        Struct newValue = new Struct(value.schema());
        for (Field field : value.schema().fields()) {
            Object fieldValue = value.get(field);
            if (field.name().equals(fieldName) && fieldValue instanceof String) {
                // Mask the sensitive field
                fieldValue = mask((String) fieldValue);
            }
            newValue.put(field, fieldValue);
        }
        // Copy the record, replacing only the value
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                value.schema(), newValue, record.timestamp());
    }

    private String mask(String value) {
        // Mask everything if the string is too short to keep both ends
        if (value == null || value.length() <= maskLength * 2) {
            return "****";
        }
        return value.substring(0, maskLength) + "****"
                + value.substring(value.length() - maskLength);
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("field.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Field to mask")
                .define("mask.length", ConfigDef.Type.INT, 4, ConfigDef.Importance.MEDIUM,
                        "Characters to keep at each end of the masked value");
    }
}
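To attach the transform to a connector, reference it in the connector configuration. The alias, package, and field name below are illustrative:

transforms=mask
transforms.mask.type=com.example.MaskFieldTransform
transforms.mask.field.name=phone
transforms.mask.mask.length=4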
4.2 Key Transform
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Map;

public class ExtractKeyTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private String keyField;

    @Override
    public void configure(Map<String, ?> configs) {
        keyField = new SimpleConfig(config(), configs).getString("key.field");
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record;   // pass tombstones through untouched
        }
        Struct value = (Struct) record.value();
        // The extracted field becomes the new record key
        Schema keySchema = value.schema().field(keyField).schema();
        Object keyValue = value.get(keyField);
        // Rebuild the value without the extracted field
        Schema newValueSchema = removeField(value.schema(), keyField);
        Struct newValue = new Struct(newValueSchema);
        for (Field field : newValueSchema.fields()) {
            newValue.put(field, value.get(field.name()));
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                keySchema, keyValue, newValueSchema, newValue, record.timestamp());
    }

    private Schema removeField(Schema schema, String fieldName) {
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field field : schema.fields()) {
            if (!field.name().equals(fieldName)) {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("key.field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                        "Value field to promote to the record key");
    }
}
5. Packaging and Deployment
5.1 Maven Configuration
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-connect-my-connector</artifactId>
    <version>1.0.0</version>
    <dependencies>
        <!-- provided: the Connect runtime supplies these jars at run time -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-transforms</artifactId>
            <version>3.4.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <!-- Bind the assembly to `mvn package` so the fat jar is built automatically -->
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
5.2 Deployment Steps
#!/bin/bash
# Deploy the Connector plugin
# 1. Build the fat jar
mvn clean package
# 2. Copy it to a directory on the worker's plugin.path
mkdir -p /usr/share/java/kafka-connect/my-connector/
cp target/kafka-connect-my-connector-1.0.0-jar-with-dependencies.jar \
  /usr/share/java/kafka-connect/my-connector/
# 3. Restart Connect so the worker rescans its plugins
systemctl restart kafka-connect
# 4. Verify the plugin is visible
curl http://localhost:8083/connector-plugins | grep MySourceConnector
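The worker only discovers the jar if the parent of the copy target is on its plugin.path. A one-line reminder for the worker configuration (the path is an assumption matching the script above):

# connect-distributed.properties (worker side)
plugin.path=/usr/share/java/kafka-connect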
6. Testing and Verification
6.1 Unit Tests
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class MySourceTaskTest {

    private MySourceTask task;
    private MockSourceTaskContext context;   // hand-written stub, sketched below

    @Before
    public void setUp() {
        context = new MockSourceTaskContext();
        task = new MySourceTask();
        task.initialize(context);
    }

    @Test
    public void testPoll() throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("database.url", "jdbc:h2:mem:test");
        props.put("table.name", "test_table");
        // Assumes test_table has been created and seeded in the in-memory H2 database
        task.start(props);
        List<SourceRecord> records = task.poll();
        assertNotNull(records);
        assertTrue(records.size() > 0);
        task.stop();
    }
}
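Kafka does not ship a public MockSourceTaskContext, so the test assumes a hand-written stub like the sketch below, which reports no stored offsets; depending on your connect-api version you may need to implement additional context methods:

import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

// Minimal stub: the task sees an empty config map and no previously stored offsets
public class MockSourceTaskContext implements SourceTaskContext {

    @Override
    public Map<String, String> configs() {
        return Collections.emptyMap();
    }

    @Override
    public OffsetStorageReader offsetStorageReader() {
        return new OffsetStorageReader() {
            @Override
            public <T> Map<String, Object> offset(Map<String, T> partition) {
                return null;   // no stored offset: the task starts from the beginning
            }

            @Override
            public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
                return Collections.emptyMap();
            }
        };
    }
}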
6.2 Integration Tests
#!/bin/bash
# Integration test script
# 1. Start Kafka Connect
docker-compose up -d kafka-connect
# 2. Register the connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json
# 3. Produce test data
# ...
# 4. Verify the output
# ...
# 5. Tear down
docker-compose down
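For reference, connector-config.json could look like this; the connector name, class package, and connection settings are illustrative:

{
  "name": "my-source-connector",
  "config": {
    "connector.class": "com.example.MySourceConnector",
    "tasks.max": "1",
    "database.url": "jdbc:h2:mem:test",
    "table.name": "test_table",
    "poll.interval.ms": "5000"
  }
}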
7. Best Practices
7.1 Development Tips
Development tips:
1. Build in fault tolerance, distinguishing retriable from fatal errors (see the sketch after this list)
2. Configure sensible retries and backoff
3. Log enough detail to trace each record batch
4. Expose monitoring metrics
5. Write thorough documentation
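For point 1, the idiomatic fault-tolerance signal in a sink is org.apache.kafka.connect.errors.RetriableException: the framework backs off and redelivers the same batch instead of killing the task. A sketch inside MySinkTask.put(), where the exception classification and the writeBatch helper are assumptions to adapt to your driver:

@Override
public void put(Collection<SinkRecord> records) {
    try {
        writeBatch(records);   // hypothetical helper performing the JDBC writes
    } catch (SQLTransientException e) {
        // Transient failure (timeout, deadlock): ask Connect to redeliver the batch
        throw new RetriableException(e);
    } catch (SQLException e) {
        // Anything else is fatal: fail the task so the problem surfaces
        throw new ConnectException("Unrecoverable database error", e);
    }
}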
7.2 Performance Tuning
Performance tuning:
1. Process data in batches (see the buffering sketch after this list)
2. Use a connection pool
3. Process asynchronously where ordering allows
4. Tune batch sizes to the target system
5. Monitor processing latency
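A common way to batch in a sink is to buffer in put() and write in flush(), which runs right before Connect commits consumer offsets. A minimal sketch of the pattern for MySinkTask; the 1000-record threshold and the writeBatch helper are assumptions:

private final List<SinkRecord> buffer = new ArrayList<>();

@Override
public void put(Collection<SinkRecord> records) {
    buffer.addAll(records);          // collect cheaply on the hot path
    if (buffer.size() >= 1000) {     // bound memory with an early write
        writeBatch(buffer);          // hypothetical batched JDBC write
        buffer.clear();
    }
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    // Everything buffered must be durable before offsets are committed
    writeBatch(buffer);
    buffer.clear();
}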
7.3 Checklist
Development checklist:
- [ ] Configuration class complete
- [ ] Connector implemented correctly
- [ ] Task implemented correctly
- [ ] Error handling in place
- [ ] Logging in place
- [ ] Unit tests passing
- [ ] Integration tests passing
- [ ] Documentation complete
Summary
The core of Kafka Connector plugin development:
- Connector architecture: Source, Sink, and Task
- Source Connector: configuration class, Connector, and Task implementations
- Sink Connector: Connector and Task implementations
- Transforms: value transform and key transform
- Packaging and deployment: Maven configuration and deployment steps
- Testing and verification: unit tests and integration tests
Key takeaways:
- Understand the Connector architecture
- Master the development workflow
- Build in fault tolerance
- Write thorough tests
- Set up monitoring