Skip to content
清晨的一缕阳光
返回

Kafka 生态工具全景图

Kafka 生态系统拥有丰富的工具链,涵盖监控、管理、开发、运维等各个方面。本文将全面介绍 Kafka 生态中的常用工具。

一、监控工具

1.1 Prometheus + Grafana

架构

graph TB
    subgraph Kafka 集群
        B1[Broker 1]
        B2[Broker 2]
    end
    
    subgraph 监控采集
        JMX[JMX Exporter]
        PROM[Prometheus]
    end
    
    subgraph 展示
        GRAF[Grafana]
    end
    
    B1 --> JMX
    B2 --> JMX
    JMX --> PROM
    PROM --> GRAF

配置

# prometheus.yml
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-1:9090', 'kafka-2:9090']
    metrics_path: '/metrics'

# grafana-dashboard.json
{
  "dashboard": {
    "title": "Kafka 监控",
    "panels": [
      {
        "title": "消息吞吐量",
        "targets": [{
          "expr": "sum(rate(kafka_server_BrokerTopicMetrics_MessagesInPerSec[1m]))"
        }]
      }
    ]
  }
}

1.2 Kafka Manager

功能

部署

# Docker 部署
docker run -d \
  -p 9000:9000 \
  -e ZK_HOSTS=zk1:2181,zk2:2181 \
  -e KM_ARGS="-Dhttp.port=9000" \
  hlebalbauk/kafka-manager

界面

1.3 Kafka Eagle

功能

部署

# 下载安装
wget https://github.com/smartloli/kafka-eagle/releases/download/v3.0.1/kafka-eagle-web-3.0.1-bin.tar.gz
tar -xzf kafka-eagle-web-3.0.1-bin.tar.gz

# 配置
cat > $KE_HOME/conf/system-conf.properties << EOF
ke.zk.acl=false
ke.zk.limit=100
ke.zk.url=192.168.1.100:2181
EOF

# 启动
$KE_HOME/bin/ke.sh start

二、管理工具

2.1 官方 CLI 工具

# Topic 管理
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic my-topic --partitions 3 --replication-factor 2

# 查看 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic my-topic

# 删除 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete --topic my-topic

# 消费组管理
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group

# 重置偏移量
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic --reset-offsets --to-latest --execute

# 消息查询
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning --max-messages 10

# 分区重分配
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassign.json --execute

2.2 kafkactl

安装

# macOS
brew install deviceinsight/kafkactl/kafkactl

# Linux
wget https://github.com/deviceinsight/kafkactl/releases/download/v1.20.0/kafkactl_1.20.0_linux_amd64.tar.gz
tar -xzf kafkactl_1.20.0_linux_amd64.tar.gz
sudo mv kafkactl /usr/local/bin/

使用

# 配置
kafkactl config use-context my-cluster

# 查看 Topics
kafkactl get topics

# 查看 Topic 详情
kafkactl describe topic my-topic

# 消费消息
kafkactl consume my-topic --from-beginning --max-messages 10

# 生产消息
echo "message" | kafkactl produce my-topic

# 查看 Consumer Groups
kafkactl get consumer-groups

# 重置偏移量
kafkactl alter consumer-group my-group --topic my-topic --reset --to-latest

2.3 kcat (原 kafkacat)

安装

# macOS
brew install kcat

# Linux
apt-get install kcat

使用

# 消费模式
kcat -b localhost:9092 -t my-topic -C

# 生产模式
echo "message" | kcat -b localhost:9092 -t my-topic -P

# 查看分区
kcat -b localhost:9092 -t my-topic -L

# 指定偏移量
kcat -b localhost:9092 -t my-topic -C -o 100

# 使用 Key
echo "key:message" | kcat -b localhost:9092 -t my-topic -P -K :

# JSON 格式
kcat -b localhost:9092 -t my-topic -C -o -1 -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\n\Partition: %p\tOffset: %o\n--\n'

三、开发工具

3.1 Offset Explorer

功能

下载

https://www.kafkatool.com/download.html

界面

3.2 Conduktor

功能

部署

# Docker
docker run -d \
  -p 8080:8080 \
  -e CDK_SERVER_PORT=8080 \
  conduktor/conduktor

3.3 开发库

Java

<!-- 官方客户端 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.0</version>
</dependency>

Python

# confluent-kafka
pip install confluent-kafka

# kafka-python
pip install kafka-python

# aiokafka (异步)
pip install aiokafka

Go

// sarama
go get github.com/Shopify/sarama

// segmentio/kafka-go
go get github.com/segmentio/kafka-go

四、数据集成

4.1 Kafka Connect

部署

# Standalone 模式
$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  jdbc-source.properties

# Distributed 模式
$CONFLUENT_HOME/bin/connect-distributed \
  $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

常用 Connector

Connector说明
JDBC Source数据库→Kafka
JDBC SinkKafka→数据库
S3 SinkKafka→S3
Elasticsearch SinkKafka→ES
HDFS SinkKafka→HDFS

4.2 Debezium

功能

部署

# Docker
docker run -d \
  -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=connect_configs \
  -e OFFSET_STORAGE_TOPIC=connect_offsets \
  -e STATUS_STORAGE_TOPIC=connect_statuses \
  debezium/connect:2.3

# 注册 Connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "password",
      "database.server.id": "184054",
      "database.server.name": "mysql-server",
      "database.include.list": "mydb"
    }
  }'

功能

示例

// Flink Kafka Consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "flink-consumer");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    "my-topic",
    new SimpleStringSchema(),
    props);

DataStream<String> stream = env.addSource(consumer);

// 处理
stream.map(value -> process(value))
    .addSink(new FlinkKafkaProducer<>("output-topic", ...));

五、运维工具

5.1 Cruise Control

功能

部署

# Docker
docker run -d \
  -p 9090:9090 \
  -e KAFKA_BROKER_LIST=kafka-1:9092,kafka-2:9092 \
  linkedin/cruise-control

# REST API
# 查看集群状态
curl -X GET http://localhost:9090/kafkacruisecontrol/state

# 生成重分配方案
curl -X POST http://localhost:9090/kafkacruisecontrol/rebalance

# 执行重分配
curl -X POST http://localhost:9090/kafkacruisecontrol/rebalance?dryrun=false

5.2 Burrow

功能

配置

[general]
pidfile=burrow.pid
stderr-log=true
stdout-log=false
syslog-log=false

[cluster.local]
class-name=kafka
servers=kafka-1:9092,kafka-2:9092
topic-refresh=60
offset-refresh=10

[consumer.local-consumer]
class-name=kafka
cluster=local
group=local-consumer
servers=kafka-1:9092,kafka-2:9092

5.3 Kafka Lag Exporter

功能

部署

# Docker
docker run -d \
  -p 9999:9999 \
  -e KAFKA_BROKERS=kafka-1:9092,kafka-2:9092 \
  inguardians/kafka-lag-exporter

# Prometheus 配置
scrape_configs:
  - job_name: 'kafka-lag'
    static_configs:
      - targets: ['kafka-lag-exporter:9999']

六、性能测试

6.1 官方基准测试

# Producer 测试
kafka-producer-perf-test.sh \
  --topic test-topic \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput 10000 \
  --producer-props bootstrap.servers=localhost:9092

# Consumer 测试
kafka-consumer-perf-test.sh \
  --topic test-topic \
  --messages 1000000 \
  --bootstrap-server localhost:9092 \
  --group test-group

6.2 k6

功能

示例

// k6 脚本
import { check } from 'k6';
import kafka from 'k6/x/kafka';

export const options = {
  vus: 10,
  duration: '1m',
};

export default function () {
  const message = {
    topic: 'test-topic',
    message: 'test message',
  };
  
  const result = kafka.produce([message]);
  check(result, {
    'produce success': (r) => r.success,
  });
}

七、安全工具

7.1 ACL 管理

# 添加 ACL
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:alice \
  --operation Read --operation Write \
  --topic my-topic

# 查看 ACL
kafka-acls.sh --bootstrap-server localhost:9092 \
  --list

# 删除 ACL
kafka-acls.sh --bootstrap-server localhost:9092 \
  --remove \
  --allow-principal User:alice \
  --operation Read \
  --topic my-topic

7.2 证书管理

# 生成证书
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

# 生成密钥库
keytool -keystore kafka.server.keystore.jks -alias localhost -genkey

# 签名证书
openssl x509 -req -CA ca-cert -CAkey ca-key \
  -in kafka-server-signing-request.crt \
  -out kafka-server-signed.crt -days 365

八、工具对比

8.1 监控工具对比

工具优点缺点适用场景
Prometheus灵活、生态好配置复杂生产环境
Kafka Manager功能全、图形化较重管理需求
Kafka Eagle轻量、SQL 查询功能有限快速部署

8.2 CLI 工具对比

工具语言特点适用场景
官方 CLIShell功能最全运维操作
kafkactlGo现代化日常使用
kcatC轻量快速脚本集成

九、最佳实践

9.1 工具选择

graph TD
    A[需求分析] --> B{监控需求?}
    B -->|是 | C[Prometheus+Grafana]
    B -->|否 | D{管理需求?}
    
    D -->|是 | E[Kafka Manager/Eagle]
    D -->|否 | F{开发需求?}
    
    F -->|是 | G[Offset Explorer]
    F -->|否 | H{运维需求?}
    
    H -->|是 | I[Cruise Control]
    H -->|否 | J[官方 CLI]

9.2 工具栈推荐

小型集群

- 监控:Kafka Eagle
- 管理:官方 CLI
- 开发:Offset Explorer

中型集群

- 监控:Prometheus + Grafana
- 管理:Kafka Manager
- 开发:kafkactl
- 运维:Cruise Control

大型集群

- 监控:Prometheus + Grafana + Burrow
- 管理:Kafka Manager + 自研平台
- 开发:Conduktor + 自研工具
- 运维:Cruise Control + 自动化脚本
- 集成:Kafka Connect + Debezium + Flink

总结

Kafka 生态工具的核心要点:

  1. 监控工具:Prometheus、Kafka Manager、Kafka Eagle
  2. 管理工具:官方 CLI、kafkactl、kcat
  3. 开发工具:Offset Explorer、Conduktor、客户端库
  4. 数据集成:Kafka Connect、Debezium、Flink
  5. 运维工具:Cruise Control、Burrow、Lag Exporter

核心要点

参考资料


分享这篇文章到:

上一篇文章
2025年互联网行业 “寒潮” 持续:大厂裁员背后的6大核心原因?
下一篇文章
RocketMQ 生产实践案例精选