
Kafka Connector Plugin Development in Practice

Developing Kafka Connector plugins extends the data-integration capabilities of Kafka Connect. This article takes a hands-on look at developing Source Connectors, Sink Connectors, and Transforms.

1. Connector Architecture

1.1 Core Components

graph TB
    subgraph Kafka Connect
        CC[Connect Cluster]
        WS[Worker]
    end
    
    subgraph Connector
        SC[Source Connector]
        SKC[Sink Connector]
        ST[Source Task]
        SKT[Sink Task]
    end
    
    subgraph External Systems
        DB[(Database)]
        FS[File System]
    end
    
    DB --> ST
    ST --> SC
    SC --> WS
    WS --> SKC
    SKC --> SKT
    SKT --> FS

1.2 Development Workflow

Development workflow:
1. Define the configuration class
2. Implement the Connector
3. Implement the Task
4. Package and deploy
5. Test and verify

2. Source Connector Development

2.1 Configuration Class

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class MySourceConnectorConfig extends AbstractConfig {
    
    public static final String DATABASE_CONFIG = "database.url";
    public static final String TABLE_CONFIG = "table.name";
    public static final String POLL_INTERVAL_CONFIG = "poll.interval.ms";
    
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(DATABASE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Database URL")
        .define(TABLE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Table name")
        .define(POLL_INTERVAL_CONFIG, ConfigDef.Type.LONG, 5000, ConfigDef.Importance.MEDIUM, "Poll interval");
    
    public MySourceConnectorConfig(Map<String, String> props) {
        super(CONFIG_DEF, props);
    }
    
    public static ConfigDef config() {
        return CONFIG_DEF;
    }
    
    public String getDatabaseUrl() {
        return getString(DATABASE_CONFIG);
    }
    
    public String getTableName() {
        return getString(TABLE_CONFIG);
    }
    
    public long getPollInterval() {
        return getLong(POLL_INTERVAL_CONFIG);
    }
}
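
ConfigDef can also validate values at connector startup. A possible hardening of the CONFIG_DEF above, using the stock Range and NonEmptyString validators so that bad values fail fast rather than mid-poll (a sketch; the exact constraints are illustrative):

private static final ConfigDef CONFIG_DEF = new ConfigDef()
    .define(DATABASE_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
            new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, "Database URL")
    .define(TABLE_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
            new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, "Table name")
    // Reject zero or negative poll intervals at startup
    .define(POLL_INTERVAL_CONFIG, ConfigDef.Type.LONG, 5000L,
            ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, "Poll interval in ms");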

2.2 Connector Implementation

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class MySourceConnector extends SourceConnector {
    
    private MySourceConnectorConfig config;
    
    @Override
    public void start(Map<String, String> props) {
        // Parse and validate the configuration up front so bad configs fail fast
        config = new MySourceConnectorConfig(props);
    }
    
    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }
    
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // A single table cannot be split across tasks, so always run exactly
        // one task; returning maxTasks identical configs would make every
        // task read and re-publish the same rows.
        return Collections.singletonList(new HashMap<>(config.originalsStrings()));
    }
    
    @Override
    public void stop() {
        // Nothing to clean up; tasks own their own resources
    }
    
    @Override
    public ConfigDef config() {
        return MySourceConnectorConfig.config();
    }
    
    @Override
    public String version() {
        return "1.0.0";
    }
}
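
taskConfigs() is where a source connector decides its parallelism. The single-table connector above can only ever use one task, but a hypothetical multi-table variant (assuming a comma-separated "tables" config key) could split the work with the ConnectorUtils helper from connect-api:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.util.ConnectorUtils;

// Sketch: divide the tables across at most maxTasks tasks so that
// no two tasks ever read the same table.
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<String> tables = Arrays.asList(config.getString("tables").split(","));
    int groups = Math.min(tables.size(), maxTasks);
    
    List<Map<String, String>> taskConfigs = new ArrayList<>(groups);
    for (List<String> group : ConnectorUtils.groupPartitions(tables, groups)) {
        Map<String, String> taskProps = new HashMap<>(config.originalsStrings());
        taskProps.put("task.tables", String.join(",", group));  // hypothetical task-level key
        taskConfigs.add(taskProps);
    }
    return taskConfigs;
}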

2.3 Task Implementation

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class MySourceTask extends SourceTask {
    
    private MySourceConnectorConfig config;
    private Connection connection;
    private long lastOffset = 0;
    
    @Override
    public void start(Map<String, String> props) {
        config = new MySourceConnectorConfig(props);
        
        // Restore the last committed offset, if any
        Map<String, Object> partition = Collections.singletonMap("table", config.getTableName());
        Map<String, Object> offset = context.offsetStorageReader().offset(partition);
        if (offset != null) {
            lastOffset = (Long) offset.get("offset");
        }
        
        // Open the database connection
        try {
            connection = DriverManager.getConnection(config.getDatabaseUrl());
        } catch (SQLException e) {
            throw new ConnectException("Database connection failed", e);
        }
    }
    
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        
        // Query rows newer than the last offset. ORDER BY id is required:
        // without it, LIMIT returns arbitrary rows and the incrementing-id
        // offset tracking would silently skip data.
        String sql = "SELECT * FROM " + config.getTableName()
                + " WHERE id > ? ORDER BY id LIMIT 100";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            stmt.setLong(1, lastOffset);
            
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    long id = rs.getLong("id");
                    String data = rs.getString("data");
                    
                    // Build the SourceRecord with its source partition and offset
                    Map<String, Object> sourceOffset = Collections.singletonMap("offset", id);
                    
                    SourceRecord record = new SourceRecord(
                        Collections.singletonMap("table", config.getTableName()),  // source partition
                        sourceOffset,               // source offset
                        config.getTableName(),      // topic
                        Schema.STRING_SCHEMA,       // key schema
                        String.valueOf(id),         // key
                        Schema.STRING_SCHEMA,       // value schema
                        data                        // value
                    );
                    
                    records.add(record);
                    lastOffset = id;
                }
            }
        } catch (SQLException e) {
            throw new ConnectException("Database query failed", e);
        }
        
        // Honor poll.interval.ms: back off when there is no new data so
        // poll() does not spin against the database.
        if (records.isEmpty()) {
            Thread.sleep(config.getPollInterval());
        }
        
        return records;
    }
    
    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors on close
        }
    }
}
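
SourceTask also exposes optional hooks beyond poll(). One that is easy to overlook: the two-argument commitRecord() (available since Kafka 2.4) fires after each record has been acknowledged by the Kafka producer. A minimal sketch that uses it for progress logging inside MySourceTask, assuming an SLF4J logger field is added to the class:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger log = LoggerFactory.getLogger(MySourceTask.class);

// Called once Kafka has acknowledged the record; useful for
// per-record progress logging or delivery metrics.
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
    log.debug("Record {} written to {}-{} at offset {}",
            record.key(), metadata.topic(), metadata.partition(), metadata.offset());
}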

3. Sink Connector Development

3.1 Connector Implementation

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class MySinkConnector extends SinkConnector {
    
    private MySinkConnectorConfig config;
    
    @Override
    public void start(Map<String, String> props) {
        config = new MySinkConnectorConfig(props);
    }
    
    @Override
    public Class<? extends Task> taskClass() {
        return MySinkTask.class;
    }
    
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(config.originals()));
        }
        return taskConfigs;
    }
    
    @Override
    public void stop() {
        // Nothing to clean up; tasks own their own resources
    }
    
    @Override
    public ConfigDef config() {
        return MySinkConnectorConfig.config();
    }
    
    @Override
    public String version() {
        return "1.0.0";
    }
}
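
MySinkConnectorConfig is referenced above but never listed. A minimal sketch mirroring the source config, with only the two keys the sink task actually reads:

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class MySinkConnectorConfig extends AbstractConfig {
    
    public static final String DATABASE_CONFIG = "database.url";
    public static final String TABLE_CONFIG = "table.name";
    
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(DATABASE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Database URL")
        .define(TABLE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Target table name");
    
    public MySinkConnectorConfig(Map<String, String> props) {
        super(CONFIG_DEF, props);
    }
    
    public static ConfigDef config() {
        return CONFIG_DEF;
    }
    
    public String getDatabaseUrl() {
        return getString(DATABASE_CONFIG);
    }
    
    public String getTableName() {
        return getString(TABLE_CONFIG);
    }
}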

3.2 Task Implementation

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class MySinkTask extends SinkTask {
    
    private MySinkConnectorConfig config;
    private Connection connection;
    
    @Override
    public void start(Map<String, String> props) {
        config = new MySinkConnectorConfig(props);
        
        // Open the database connection
        try {
            connection = DriverManager.getConnection(config.getDatabaseUrl());
        } catch (SQLException e) {
            throw new ConnectException("Database connection failed", e);
        }
    }
    
    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        
        // Write the whole collection as one JDBC batch. Note that Connect
        // delivers records at-least-once, so a production sink should use an
        // idempotent upsert rather than a plain INSERT to tolerate re-delivery.
        String sql = "INSERT INTO " + config.getTableName() + " (id, data) VALUES (?, ?)";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            for (SinkRecord record : records) {
                stmt.setLong(1, Long.parseLong((String) record.key()));
                stmt.setString(2, (String) record.value());
                stmt.addBatch();
            }
            
            stmt.executeBatch();
            
        } catch (SQLException e) {
            throw new ConnectException("Database insert failed", e);
        }
    }
    
    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // All writes happen synchronously in put(), so there is nothing left
        // to flush here before Connect commits the consumer offsets.
    }
    
    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors on close
        }
    }
    
    @Override
    public String version() {
        return "1.0.0";
    }
}
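
One refinement worth calling out: throwing a plain RuntimeException from put() fails the task permanently. For transient errors such as a dropped database connection, Connect provides a retry path via RetriableException, optionally combined with a backoff hint on the task context. A sketch of put() rewritten this way (writeBatch is a hypothetical helper wrapping the JDBC batching shown above; the 5-second backoff is an arbitrary choice):

import java.sql.SQLException;
import java.util.Collection;

import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;

@Override
public void put(Collection<SinkRecord> records) {
    try {
        writeBatch(records);  // hypothetical helper: the JDBC batch insert from above
    } catch (SQLException e) {
        // Transient failure: ask Connect to back off, then re-deliver the same batch
        context.timeout(5000);
        throw new RetriableException("Transient database failure, will retry", e);
    }
}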

4. Transform Development

4.1 Value Transform

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

// The Transformation interface operates on whole records, not values:
// its type parameter must be a ConnectRecord subtype.
public class MaskFieldTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    
    private String fieldName;
    private int maskLength;
    
    @Override
    public void configure(Map<String, ?> configs) {
        SimpleConfig config = new SimpleConfig(config(), configs);
        fieldName = config.getString("field.name");
        maskLength = config.getInt("mask.length");
    }
    
    @Override
    public R apply(R record) {
        // Only structured values can be masked field by field
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        Struct newValue = new Struct(value.schema());
        
        for (Field field : value.schema().fields()) {
            Object fieldValue = value.get(field);
            
            if (field.name().equals(fieldName) && fieldValue instanceof String) {
                // Mask the sensitive field
                fieldValue = mask((String) fieldValue);
            }
            
            newValue.put(field, fieldValue);
        }
        
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                value.schema(), newValue, record.timestamp());
    }
    
    private String mask(String value) {
        // Values shorter than 2 * maskLength would leak every character
        // through the two visible ends, so mask them entirely.
        if (value == null || value.length() <= maskLength * 2) {
            return "****";
        }
        return value.substring(0, maskLength) + "****" + 
               value.substring(value.length() - maskLength);
    }
    
    @Override
    public void close() {}
    
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("field.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Field to mask")
            .define("mask.length", ConfigDef.Type.INT, 4, ConfigDef.Importance.MEDIUM, "Visible prefix/suffix length");
    }
}
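
A quick way to sanity-check the transform without a running cluster is to invoke apply() directly. A small demo (the schema, topic name, and phone value are made up for illustration; Map.of assumes Java 9+):

import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class MaskFieldTransformDemo {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("phone", Schema.STRING_SCHEMA)
            .build();
        Struct value = new Struct(schema).put("id", 1L).put("phone", "13800138000");
        
        MaskFieldTransform<SinkRecord> smt = new MaskFieldTransform<>();
        smt.configure(Map.of("field.name", "phone", "mask.length", 4));
        
        SinkRecord record = new SinkRecord("demo-topic", 0, null, null, schema, value, 0L);
        Struct masked = (Struct) smt.apply(record).value();
        
        System.out.println(masked.get("phone"));  // prints 1380****8000
    }
}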

4.2 Key Transform

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class ExtractKeyTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    
    private String keyField;
    
    @Override
    public void configure(Map<String, ?> configs) {
        keyField = new SimpleConfig(config(), configs).getString("key.field");
    }
    
    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        
        // Extract the key field and its schema from the value
        Object keyValue = value.get(keyField);
        Schema keySchema = value.schema().field(keyField).schema();
        
        // Rebuild the value without the key field
        Schema newValueSchema = removeField(value.schema(), keyField);
        Struct newValue = new Struct(newValueSchema);
        for (Field field : newValueSchema.fields()) {
            newValue.put(field, value.get(field.name()));
        }
        
        // Promote the extracted field to the record key
        return record.newRecord(record.topic(), record.kafkaPartition(),
                keySchema, keyValue, newValueSchema, newValue, record.timestamp());
    }
    
    private Schema removeField(Schema schema, String fieldName) {
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field field : schema.fields()) {
            if (!field.name().equals(fieldName)) {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }
    
    @Override
    public void close() {}
    
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("key.field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Value field to promote to the record key");
    }
}
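
Worth noting: stock Kafka Connect already ships SMTs for this pattern. org.apache.kafka.connect.transforms.ValueToKey copies value fields into the record key, and ExtractField can flatten the result. A custom transform like the one above is only worth maintaining when the built-ins don't match the required semantics, such as also removing the promoted field from the value, as done here.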

5. Packaging and Deployment

5.1 Maven Configuration

<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-connect-my-connector</artifactId>
    <version>1.0.0</version>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.4.0</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-transforms</artifactId>
            <version>3.4.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <!-- Bind assembly:single to package so "mvn clean package"
                         produces the jar-with-dependencies the deploy script copies -->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

5.2 Deployment Steps

#!/bin/bash
# Deploy the Connector plugin

# 1. Build the fat jar
mvn clean package

# 2. Copy it into a directory on the worker's plugin.path
cp target/kafka-connect-my-connector-1.0.0-jar-with-dependencies.jar \
   /usr/share/java/kafka-connect/my-connector/

# 3. Restart Connect so it rescans the plugin path
systemctl restart kafka-connect

# 4. Verify the plugin is loaded
curl http://localhost:8083/connector-plugins | grep MySourceConnector

6. Testing and Verification

6.1 Unit Tests

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.junit.Before;
import org.junit.Test;

public class MySourceTaskTest {
    
    // DB_CLOSE_DELAY keeps the in-memory H2 database alive across connections
    private static final String DB_URL = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1";
    
    private MySourceTask task;
    
    @Before
    public void setUp() throws Exception {
        // Use Mockito to stand in for the Connect-provided task context;
        // a null stored offset means the task starts from the beginning
        OffsetStorageReader reader = mock(OffsetStorageReader.class);
        when(reader.offset(anyMap())).thenReturn(null);
        SourceTaskContext context = mock(SourceTaskContext.class);
        when(context.offsetStorageReader()).thenReturn(reader);
        
        // Seed test data so poll() has something to return
        try (Connection conn = DriverManager.getConnection(DB_URL);
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, data VARCHAR(255))");
            stmt.execute("MERGE INTO test_table (id, data) KEY(id) VALUES (1, 'hello')");
        }
        
        task = new MySourceTask();
        task.initialize(context);
    }
    
    @Test
    public void testPoll() throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("database.url", DB_URL);
        props.put("table.name", "test_table");
        
        task.start(props);
        
        List<SourceRecord> records = task.poll();
        
        assertNotNull(records);
        assertTrue(records.size() > 0);
        
        task.stop();
    }
}

6.2 Integration Tests

#!/bin/bash
# Integration test script

# 1. Start Kafka Connect
docker-compose up -d kafka-connect

# 2. Register the connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json

# 3. Produce test data
# ...

# 4. Verify the sink output
# ...

# 5. Tear down
docker-compose down
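
The connector-config.json payload referenced above is not shown elsewhere; a minimal example wired to the configuration keys defined in this article (the com.example package comes from the pom's groupId, and the values are illustrative):

{
  "name": "my-source-connector",
  "config": {
    "connector.class": "com.example.MySourceConnector",
    "tasks.max": "1",
    "database.url": "jdbc:h2:mem:test",
    "table.name": "test_table",
    "poll.interval.ms": "5000"
  }
}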

7. Best Practices

7.1 Development Recommendations

Development recommendations:
1. Build in fault tolerance
2. Configure sensible retry behavior
3. Log key events in detail
4. Expose monitoring metrics
5. Write thorough documentation

7.2 Performance Optimization

Performance optimization:
1. Process data in batches
2. Use a connection pool (a sketch follows this list)
3. Process asynchronously where it helps
4. Tune batch sizes to the workload
5. Monitor end-to-end processing latency
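
As a sketch of point 2, assuming the HikariCP library (com.zaxxer:HikariCP) were added as a dependency, the sink could draw connections from a pool instead of holding a single Connection for the task's lifetime:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class PooledJdbcWriter implements AutoCloseable {
    
    private final HikariDataSource dataSource;
    
    public PooledJdbcWriter(String jdbcUrl, int poolSize) {
        HikariConfig poolConfig = new HikariConfig();
        poolConfig.setJdbcUrl(jdbcUrl);
        poolConfig.setMaximumPoolSize(poolSize);
        dataSource = new HikariDataSource(poolConfig);
    }
    
    public void write(String table, long id, String data) throws SQLException {
        // Each write borrows a pooled connection and returns it on close
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO " + table + " (id, data) VALUES (?, ?)")) {
            stmt.setLong(1, id);
            stmt.setString(2, data);
            stmt.executeUpdate();
        }
    }
    
    @Override
    public void close() {
        dataSource.close();
    }
}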

7.3 Checklist

Development checklist:
- [ ] Configuration class complete
- [ ] Connector implemented correctly
- [ ] Task implemented correctly
- [ ] Error handling in place
- [ ] Logging in place
- [ ] Unit tests passing
- [ ] Integration tests passing
- [ ] Documentation complete

Summary

Key points of Kafka Connector plugin development:

  1. Connector architecture: Source, Sink, and Task
  2. Source Connector: configuration class, Connector, and Task implementations
  3. Sink Connector: Connector and Task implementations
  4. Transforms: value transforms and key transforms
  5. Packaging and deployment: Maven configuration and deployment steps
  6. Testing and verification: unit tests and integration tests

