Kafka Connect: Advanced Usage and Custom Development

Kafka Connect provides powerful data-integration capabilities, including support for custom Connectors and Transforms. This article takes a deeper look at advanced Kafka Connect usage and custom development.

1. Custom Source Connector

1.1 Connector Configuration

public class MySourceConnectorConfig extends AbstractConfig {
    
    public static final String DATABASE_CONFIG = "database.url";
    public static final String TABLE_CONFIG = "table.name";
    public static final String POLL_INTERVAL_CONFIG = "poll.interval.ms";
    
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(DATABASE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Database URL")
        .define(TABLE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Table name")
        .define(POLL_INTERVAL_CONFIG, ConfigDef.Type.LONG, 5000L, ConfigDef.Importance.MEDIUM, "Poll interval in milliseconds");
    
    public MySourceConnectorConfig(Map<String, String> props) {
        super(CONFIG_DEF, props);
    }
    
    public static ConfigDef config() {
        return CONFIG_DEF;
    }
    
    public String getDatabaseUrl() {
        return getString(DATABASE_CONFIG);
    }
    
    public String getTableName() {
        return getString(TABLE_CONFIG);
    }
    
    public long getPollInterval() {
        return getLong(POLL_INTERVAL_CONFIG);
    }
}
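
As a quick sanity check of how this ConfigDef behaves: omitted keys fall back to their declared defaults, while required keys (defined without a default) fail fast. A minimal sketch using the class above:

import java.util.HashMap;
import java.util.Map;

public class ConfigDemo {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("database.url", "jdbc:postgresql://localhost:5432/mydb");
        props.put("table.name", "orders");
        // poll.interval.ms is omitted on purpose: the declared default (5000) applies

        MySourceConnectorConfig config = new MySourceConnectorConfig(props);
        System.out.println(config.getPollInterval()); // 5000

        // Omitting a required key such as table.name would make the constructor
        // throw ConfigException, so bad configs fail before any task starts
    }
}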

1.2 SourceConnector Implementation

public class MySourceConnector extends SourceConnector {
    
    private MySourceConnectorConfig config;
    
    @Override
    public void start(Map<String, String> props) {
        config = new MySourceConnectorConfig(props);
    }
    
    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }
    
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task receives an identical copy of the connector config here;
        // see the partitioning sketch after this class for splitting real work
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            // originalsStrings() returns the Map<String, String> this API expects
            taskConfigs.add(new HashMap<>(config.originalsStrings()));
        }
        return taskConfigs;
    }
    
    @Override
    public void stop() {
        // Clean up resources
    }
    
    @Override
    public ConfigDef config() {
        return MySourceConnectorConfig.config();
    }
    
    @Override
    public String version() {
        return "1.0";
    }
}
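
When a source has naturally partitionable work, such as several tables, taskConfigs should divide the work rather than duplicate it. A sketch of that pattern, assuming a hypothetical multi-table setup where each task receives its share in a "tables" property; ConnectorUtils ships with connect-api:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.util.ConnectorUtils;

public class TaskPartitioningDemo {
    public static void main(String[] args) {
        List<String> tables = Arrays.asList("orders", "users", "payments");
        int maxTasks = 2;

        // Spread the tables over at most maxTasks groups
        List<List<String>> groups =
            ConnectorUtils.groupPartitions(tables, Math.min(maxTasks, tables.size()));

        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (List<String> group : groups) {
            Map<String, String> taskProps = new HashMap<>();
            taskProps.put("tables", String.join(",", group)); // illustrative task-level key
            taskConfigs.add(taskProps);
        }
        System.out.println(taskConfigs); // [{tables=orders,users}, {tables=payments}]
    }
}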

1.3 SourceTask Implementation

public class MySourceTask extends SourceTask {
    
    private MySourceConnectorConfig config;
    private Connection connection;
    private long lastOffset = 0;
    
    @Override
    public void start(Map<String, String> props) {
        config = new MySourceConnectorConfig(props);
        
        // Restore the last committed offset, if any
        Map<String, Object> partition = Collections.singletonMap("table", config.getTableName());
        Map<String, Object> offset = context.offsetStorageReader().offset(partition);
        if (offset != null) {
            lastOffset = ((Number) offset.get("offset")).longValue();
        }
        
        // Open the database connection (getConnection throws a checked SQLException)
        try {
            connection = DriverManager.getConnection(config.getDatabaseUrl());
        } catch (SQLException e) {
            throw new ConnectException("Database connection failed", e);
        }
    }
    
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        
        // Fetch rows past the last offset in bounded, ordered batches;
        // ORDER BY matters, otherwise lastOffset could skip unseen rows
        String sql = "SELECT * FROM " + config.getTableName() + " WHERE id > ? ORDER BY id LIMIT 100";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            stmt.setLong(1, lastOffset);
            
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    long id = rs.getLong("id");
                    String data = rs.getString("data");
                    
                    // The framework persists this offset map and hands it back after a restart
                    Map<String, Object> sourceOffset = Collections.singletonMap("offset", id);
                    
                    SourceRecord record = new SourceRecord(
                        Collections.singletonMap("table", config.getTableName()),  // partition
                        sourceOffset,                   // offset
                        config.getTableName(),          // topic
                        Schema.STRING_SCHEMA,           // key schema
                        String.valueOf(id),             // key
                        Schema.STRING_SCHEMA,           // value schema
                        data                            // value
                    );
                    
                    records.add(record);
                    lastOffset = id;
                }
            }
        } catch (SQLException e) {
            throw new ConnectException("Database query failed", e);
        }
        
        // Back off when there was nothing new, so poll() does not busy-loop
        if (records.isEmpty()) {
            Thread.sleep(config.getPollInterval());
        }
        
        return records;
    }
    
    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors during shutdown
        }
    }
    
    @Override
    public String version() {
        return "1.0";
    }
}
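
The partition map passed to offsetStorageReader().offset() must match the one embedded in each SourceRecord exactly, otherwise resumption silently restarts from zero. One way to verify what the worker actually persisted is to read the internal offsets topic directly; a sketch, assuming the worker settings from section 6.1 (offset.storage.topic=connect-offsets, default JsonConverter for internal topics, so keys and values are plain JSON):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetTopicInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "offset-inspector"); // throwaway group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("connect-offsets"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                // Key looks like ["my-connector",{"table":"orders"}], value like {"offset":123}
                System.out.println(record.key() + " => " + record.value());
            }
        }
    }
}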

2. Custom Sink Connector

2.1 SinkConnector Implementation

public class MySinkConnector extends SinkConnector {
    
    private MySinkConnectorConfig config;
    
    @Override
    public void start(Map<String, String> props) {
        config = new MySinkConnectorConfig(props);
    }
    
    @Override
    public Class<? extends Task> taskClass() {
        return MySinkTask.class;
    }
    
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            // Sink tasks divide work through consumer-group partition assignment, so
            // identical task configs are fine; originalsStrings() gives Map<String, String>
            taskConfigs.add(new HashMap<>(config.originalsStrings()));
        }
        return taskConfigs;
    }
    
    @Override
    public void stop() {
        // Clean up resources
    }
    
    @Override
    public ConfigDef config() {
        return MySinkConnectorConfig.config();
    }
    
    @Override
    public String version() {
        return "1.0";
    }
}

2.2 SinkTask Implementation

public class MySinkTask extends SinkTask {
    
    private MySinkConnectorConfig config;
    private Connection connection;
    
    @Override
    public void start(Map<String, String> props) {
        config = new MySinkConnectorConfig(props);
        
        // Open the database connection
        try {
            connection = DriverManager.getConnection(config.getDatabaseUrl());
        } catch (SQLException e) {
            throw new ConnectException("Database connection failed", e);
        }
    }
    
    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        
        // Batched insert; see the retry-aware variant after this section
        String sql = "INSERT INTO " + config.getTableName() + " (id, data) VALUES (?, ?)";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            for (SinkRecord record : records) {
                stmt.setLong(1, Long.parseLong((String) record.key()));
                stmt.setString(2, (String) record.value());
                stmt.addBatch();
            }
            
            stmt.executeBatch();
            
        } catch (SQLException e) {
            throw new ConnectException("Database insert failed", e);
        }
    }
    
    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Nothing extra to do: put() writes synchronously, so every delivered
        // record is durable before the framework commits these offsets
    }
    
    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors during shutdown
        }
    }
    
    @Override
    public String version() {
        return "1.0";
    }
}
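
One refinement worth calling out: throwing a plain RuntimeException (or ConnectException) from put() fails the task. For transient faults such as a dropped database connection, throwing RetriableException instead asks the framework to re-deliver the same batch. A sketch, with writeBatch standing in as a hypothetical helper for the JDBC logic above:

import java.sql.SQLException;
import java.util.Collection;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class RetryingSinkTask extends SinkTask {

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            writeBatch(records);
        } catch (SQLException e) {
            // Pause for 5s, then the framework re-delivers this exact batch;
            // offsets are never committed for a failed put()
            context.timeout(5000);
            throw new RetriableException("Transient database failure", e);
        }
    }

    // Hypothetical helper wrapping the batched JDBC insert shown earlier
    protected abstract void writeBatch(Collection<SinkRecord> records) throws SQLException;
}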

3. Custom Transforms

3.1 Value Transform

Transformation is parameterized by the record type (a subclass of ConnectRecord), not by the payload, so a value transform unwraps the value Struct, rewrites it, and re-wraps it with newRecord():

public class MaskFieldTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    
    private String fieldName;
    private int maskLength;
    
    @Override
    public void configure(Map<String, ?> configs) {
        fieldName = (String) configs.get("field.name");
        // SMT config values arrive as strings, so parse rather than cast
        Object length = configs.get("mask.length");
        maskLength = length == null ? 4 : Integer.parseInt(length.toString());
    }
    
    @Override
    public R apply(R record) {
        // Only structured values can be masked field-by-field
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        Struct newValue = new Struct(value.schema());
        
        for (Field field : value.schema().fields()) {
            Object fieldValue = value.get(field);
            
            if (field.name().equals(fieldName) && fieldValue instanceof String) {
                // Mask the sensitive field
                fieldValue = mask((String) fieldValue);
            }
            
            newValue.put(field, fieldValue);
        }
        
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                value.schema(), newValue, record.timestamp());
    }
    
    private String mask(String value) {
        // Fully mask short values, where prefix and suffix would overlap and leak
        if (value == null || value.length() <= maskLength * 2) {
            return "****";
        }
        return value.substring(0, maskLength) + "****" + 
               value.substring(value.length() - maskLength);
    }
    
    @Override
    public void close() {}
    
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("field.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Field to mask")
            .define("mask.length", ConfigDef.Type.INT, 4, ConfigDef.Importance.MEDIUM, "Mask length");
    }
}
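
A quick local check of the transform, applying it to a hand-built SinkRecord (schema, values, and offset are made up for the demo):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class MaskFieldDemo {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("phone", Schema.STRING_SCHEMA)
            .build();
        Struct value = new Struct(schema).put("id", 1L).put("phone", "13812345678");
        SinkRecord record = new SinkRecord("my-topic", 0, null, null, schema, value, 42);

        MaskFieldTransform<SinkRecord> transform = new MaskFieldTransform<>();
        Map<String, String> config = new HashMap<>();
        config.put("field.name", "phone");
        config.put("mask.length", "4");
        transform.configure(config);

        Struct masked = (Struct) transform.apply(record).value();
        System.out.println(masked.get("phone")); // 1381****5678
    }
}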

3.2 Key Transform

This one promotes a field of the value into the record key and drops it from the value, similar in spirit to chaining the built-in ValueToKey and ExtractField transforms:

public class ExtractKeyTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    
    private String keyField;
    
    @Override
    public void configure(Map<String, ?> configs) {
        keyField = (String) configs.get("key.field");
    }
    
    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        
        // Extract the key field's value from the record value
        Object keyValue = value.get(keyField);
        
        // Build a new value without the key field
        Schema newSchema = removeField(value.schema(), keyField);
        Struct newValue = new Struct(newSchema);
        for (Field field : newSchema.fields()) {
            newValue.put(field, value.get(field.name()));
        }
        
        // Promote the extracted value to the record key
        return record.newRecord(record.topic(), record.kafkaPartition(),
                Schema.STRING_SCHEMA, keyValue == null ? null : keyValue.toString(),
                newSchema, newValue, record.timestamp());
    }
    
    private Schema removeField(Schema schema, String fieldName) {
        SchemaBuilder builder = SchemaBuilder.struct();
        for (Field field : schema.fields()) {
            if (!field.name().equals(fieldName)) {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }
    
    @Override
    public void close() {}
    
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("key.field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Key field to extract");
    }
}

4. Error Handling

4.1 Dead Letter Queue Configuration

With errors.tolerance set to all, records that fail in conversion or transformation are skipped rather than killing the task, and the errors.deadletterqueue.* keys route them to a dead letter topic. The errors.retry.* keys bound how long transient failures are retried first:

{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "topics": "my-topic",
    
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "connect-dlq",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    
    "errors.retry.timeout": 60000,
    "errors.retry.delay.max.ms": 60000,
    "errors.retry.backoff.ms": 1000
  }
}
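
With context headers enabled, the framework stamps every DLQ record with headers prefixed __connect.errors. (source topic, connector name, exception class and message, and so on). A sketch of a consumer that surfaces them for triage, reading raw bytes to avoid deserialization failures on poison records:

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class DlqInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "dlq-inspector");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("connect-dlq"));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                for (Header header : record.headers()) {
                    if (header.key().startsWith("__connect.errors.")) {
                        System.out.println(header.key() + " = "
                            + new String(header.value(), StandardCharsets.UTF_8));
                    }
                }
            }
        }
    }
}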

4.2 Error Handler

Kafka Connect does not expose a public SPI for custom error handlers; the retry and DLQ logic shown above is internal to the framework. The class below is an illustrative, application-level component you could wire into your own task code when you need DLQ behavior beyond what the errors.* configs provide:

public class CustomErrorHandler {
    
    private static final Logger log = LoggerFactory.getLogger(CustomErrorHandler.class);
    
    private final String dlqTopic;
    private final Producer<String, String> dlqProducer;
    
    public CustomErrorHandler(String dlqTopic, Producer<String, String> dlqProducer) {
        this.dlqTopic = dlqTopic;
        this.dlqProducer = dlqProducer;
    }
    
    public void handle(Exception error, SourceRecord record) {
        // Log the failure
        log.error("Failed to process record: {}", record, error);
        
        // Forward to the dead letter queue
        sendToDlq(record, error);
    }
    
    private void sendToDlq(SourceRecord record, Exception error) {
        // String payloads keep the DLQ producer serializer-agnostic
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            dlqTopic,
            record.key() != null ? record.key().toString() : null,
            record.value() != null ? record.value().toString() : null
        );
        
        // Attach error context as headers (getMessage() can be null, so guard it)
        String message = error.getMessage() != null ? error.getMessage() : error.getClass().getName();
        dlqRecord.headers().add("error.message", message.getBytes(StandardCharsets.UTF_8));
        dlqRecord.headers().add("error.timestamp",
            String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
        
        dlqProducer.send(dlqRecord);
    }
}

5. Monitoring Metrics

5.1 JMX Metrics

Connect publishes per-connector and per-task MBeans under the kafka.connect JMX domain. The helper below reads a few of the documented source-task and error-handling attributes; all Kafka metric values are numeric, so they are read as Number:

public class ConnectorMetrics {
    
    private final MBeanServer mBeanServer;
    private final ObjectName sourceTaskMetrics;
    private final ObjectName taskErrorMetrics;
    
    public ConnectorMetrics(String connectorName, int taskId) throws MalformedObjectNameException {
        this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
        // MBean names per the Kafka documentation; connector names containing
        // special characters may additionally need ObjectName quoting
        this.sourceTaskMetrics = new ObjectName(
            "kafka.connect:type=source-task-metrics,connector=" + connectorName + ",task=" + taskId);
        this.taskErrorMetrics = new ObjectName(
            "kafka.connect:type=task-error-metrics,connector=" + connectorName + ",task=" + taskId);
    }
    
    // Total number of records this source task has polled
    public double getSourceRecordPollTotal() {
        return readAttribute(sourceTaskMetrics, "source-record-poll-total");
    }
    
    // Current poll rate, in records per second
    public double getSourceRecordPollRate() {
        return readAttribute(sourceTaskMetrics, "source-record-poll-rate");
    }
    
    // Total record errors seen by the error-handling machinery
    public double getTotalRecordErrors() {
        return readAttribute(taskErrorMetrics, "total-record-errors");
    }
    
    private double readAttribute(ObjectName name, String attribute) {
        try {
            // Kafka metric values are numeric; Number covers Double and Long alike
            return ((Number) mBeanServer.getAttribute(name, attribute)).doubleValue();
        } catch (Exception e) {
            return -1;
        }
    }
}
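
A minimal polling loop around the helper, assuming it runs inside (or is attached to) the worker JVM; connector name, task id, and reporting interval are illustrative:

public class MetricsLoop {
    public static void main(String[] args) throws Exception {
        ConnectorMetrics metrics = new ConnectorMetrics("my-connector", 0);
        while (true) {
            System.out.printf("polled=%.0f rate=%.1f/s errors=%.0f%n",
                metrics.getSourceRecordPollTotal(),
                metrics.getSourceRecordPollRate(),
                metrics.getTotalRecordErrors());
            Thread.sleep(10_000); // report every 10 seconds
        }
    }
}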

5.2 Monitoring via the REST API

# Check connector status
curl http://localhost:8083/connectors/my-connector/status

# Check connector configuration
curl http://localhost:8083/connectors/my-connector/config

# List the connector's tasks
curl http://localhost:8083/connectors/my-connector/tasks

# Restart the connector
curl -X POST http://localhost:8083/connectors/my-connector/restart

# Pause the connector
curl -X PUT http://localhost:8083/connectors/my-connector/pause

# Resume the connector
curl -X PUT http://localhost:8083/connectors/my-connector/resume
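
The status endpoint is also convenient to poll from code for alerting. A sketch using the JDK 11 HttpClient, with naive string matching instead of a JSON parser to stay dependency-free:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorStatusCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors/my-connector/status"))
            .GET()
            .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        String body = response.body();
        System.out.println(body);

        // The status JSON reports "state" as RUNNING, PAUSED, or FAILED for the
        // connector and each task; a real check would parse it properly
        if (body.contains("\"state\":\"FAILED\"")) {
            System.err.println("my-connector has a FAILED component; consider a restart");
        }
    }
}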

6. Deployment and Operations

6.1 Distributed Deployment

# connect-distributed.properties

# Bootstrap servers
bootstrap.servers=localhost:9092

# Cluster settings
group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

# REST API
rest.port=8083
rest.advertised.host.name=connect-1
rest.advertised.port=8083

# Plugin path
plugin.path=/usr/share/java,/usr/share/confluent-hub-components

6.2 Plugin Management

# Install a plugin
confluent-hub install confluentinc/kafka-connect-jdbc:10.0.0

# List installed plugins
confluent-hub list

# Uninstall a plugin
confluent-hub uninstall confluentinc/kafka-connect-jdbc

7. Best Practices

7.1 Configuration Recommendations

Configuration recommendations:
1. Deploy workers in distributed mode
2. Use a sufficient replication factor for the internal topics
3. Enable error handling and a dead letter queue
4. Configure a sensible retry policy
5. Monitor the key metrics

7.2 Performance Tuning

Performance tuning:
1. Tune batch.size
2. Set an appropriate linger.ms
3. Enable compression
4. Optimize the database connection pool
5. Monitor end-to-end processing latency

For source connectors, the first three are producer settings: set them globally as producer.* in the worker config, or per connector with producer.override.* when the worker's override policy allows it.

7.3 Checklist

Deployment checklist:
- [ ] Distributed mode configured
- [ ] Replication factors set
- [ ] Error handling configured
- [ ] Monitoring and alerting in place
- [ ] Failure recovery tested

Summary

Key takeaways for advanced Kafka Connect development:

  1. Custom Connectors: SourceConnector, SinkConnector, and Task implementations
  2. Custom Transforms: value transforms and key transforms
  3. Error handling: dead letter queues, retry policies, custom error handlers
  4. Monitoring: JMX metrics and the REST API
  5. Deployment and operations: distributed mode and plugin management
