Kafka operations automation is key to improving operational efficiency. This article walks through practical automation for Kafka: Ansible-based deployment, Kubernetes orchestration, CI/CD pipelines, and monitoring and alerting.
1. Ansible Automation
1.1 Ansible Configuration
Inventory configuration:
# inventory.yml
all:
  children:
    zookeeper:
      hosts:
        zk1:
          ansible_host: 192.168.1.10
        zk2:
          ansible_host: 192.168.1.11
        zk3:
          ansible_host: 192.168.1.12
    kafka:
      hosts:
        kafka1:
          ansible_host: 192.168.1.20
          broker_id: 1
        kafka2:
          ansible_host: 192.168.1.21
          broker_id: 2
        kafka3:
          ansible_host: 192.168.1.22
          broker_id: 3
1.2 Deployment Playbook
# deploy-kafka.yml
- name: Deploy Kafka Cluster
  hosts: kafka
  become: yes
  vars:
    kafka_version: "3.4.0"
    kafka_install_dir: /opt/kafka
    kafka_data_dir: /data/kafka-logs
    zookeeper_connect: "zk1:2181,zk2:2181,zk3:2181"
  tasks:
    - name: Install Java
      yum:
        name: java-11-openjdk
        state: present

    - name: Create kafka user
      user:
        name: kafka
        system: yes
        shell: /sbin/nologin

    - name: Create Kafka directories
      file:
        path: "{{ item }}"
        state: directory
        owner: kafka
        group: kafka
      loop:
        - "{{ kafka_install_dir }}"
        - "{{ kafka_data_dir }}"

    - name: Download Kafka
      get_url:
        url: "https://archive.apache.org/dist/kafka/{{ kafka_version }}/kafka_2.13-{{ kafka_version }}.tgz"
        dest: /tmp/kafka.tgz

    - name: Extract Kafka into the install directory
      unarchive:
        src: /tmp/kafka.tgz
        dest: "{{ kafka_install_dir }}"
        remote_src: yes
        extra_opts: [--strip-components=1]
        owner: kafka
        group: kafka

    - name: Configure Kafka
      template:
        src: server.properties.j2
        dest: "{{ kafka_install_dir }}/config/server.properties"
      vars:
        log_dirs: "{{ kafka_data_dir }}"

    - name: Create systemd service
      template:
        src: kafka.service.j2
        dest: /etc/systemd/system/kafka.service

    - name: Start Kafka
      systemd:
        name: kafka
        enabled: yes
        state: started
        daemon_reload: yes
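With the inventory from section 1.1 in place, the playbook is applied with a single command:

ansible-playbook -i inventory.yml deploy-kafka.yml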
1.3 Configuration Template
# server.properties.j2
broker.id={{ broker_id }}
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://{{ ansible_host }}:9092
log.dirs={{ log_dirs }}
zookeeper.connect={{ zookeeper_connect }}
num.partitions=8
default.replication.factor=3
min.insync.replicas=2
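The playbook also renders a kafka.service.j2 systemd unit that is not shown above; a minimal sketch consistent with the playbook variables (the service Type and Restart policy are assumptions):

# kafka.service.j2
[Unit]
Description=Apache Kafka broker
After=network.target

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart={{ kafka_install_dir }}/bin/kafka-server-start.sh {{ kafka_install_dir }}/config/server.properties
ExecStop={{ kafka_install_dir }}/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target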
2. Kubernetes Deployment
2.1 Helm Chart
# Chart.yaml
apiVersion: v2
name: kafka
description: A Helm chart for Kafka
type: application
version: 1.0.0
appVersion: "3.4.0"
2.2 StatefulSet
# templates/statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.4.0
          ports:
            - containerPort: 9092
          # broker.id must be an integer, so derive it from the pod ordinal (kafka-0 -> 0)
          command:
            - sh
            - -c
            - "export KAFKA_BROKER_ID=${HOSTNAME##*-} && exec /etc/confluent/docker/run"
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://$(POD_NAME).kafka-headless.default.svc.cluster.local:9092"
          volumeMounts:
            - name: data
              mountPath: /var/lib/kafka/data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi
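The advertised listener above relies on a headless Service named kafka-headless to give each pod a stable DNS name; a minimal companion manifest (the name and labels mirror the StatefulSet, the rest is a plain sketch):

# templates/service-headless.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None   # headless: each pod resolves as kafka-N.kafka-headless.<namespace>.svc
  selector:
    app: kafka
  ports:
    - name: kafka
      port: 9092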
2.3 Autoscaling
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: kafka-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: kafka
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
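Two caveats: an HPA only changes the number of broker pods, so partitions still have to be reassigned onto new brokers (for example with kafka-reassign-partitions.sh), and utilization-based targets only work when the kafka container declares resource requests. A snippet that could be added to the container spec (the request sizes are assumptions):

# added under the kafka container in the StatefulSet; values are illustrative
resources:
  requests:
    cpu: "2"
    memory: 8Gi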
3. CI/CD Pipeline
3.1 GitLab CI
# .gitlab-ci.yml
stages:
  - test
  - build
  - deploy

test:
  stage: test
  script:
    - docker-compose up -d zookeeper kafka
    - sleep 30
    - ./run-tests.sh
  artifacts:
    reports:
      junit: test-results.xml

build:
  stage: build
  script:
    - docker build -t myapp:kafka-$CI_COMMIT_SHA .
    - docker push myapp:kafka-$CI_COMMIT_SHA

deploy:
  stage: deploy
  script:
    - kubectl set image deployment/myapp myapp=myapp:kafka-$CI_COMMIT_SHA
  only:
    - main
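The test stage above, and the Jenkins pipeline in 3.2, assume a docker-compose.yml that brings up single-node zookeeper and kafka services for integration tests; a minimal sketch (image versions and listener settings are assumptions):

# docker-compose.yml (test environment; service names match the pipelines)
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1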
3.2 Jenkins Pipeline
// Jenkinsfile
pipeline {
    agent any
    stages {
        stage('Test') {
            steps {
                sh 'docker-compose up -d zookeeper kafka'
                sh 'sleep 30'
                sh './run-tests.sh'
            }
        }
        stage('Build') {
            steps {
                sh 'docker build -t myapp:kafka-${BUILD_NUMBER} .'
                sh 'docker push myapp:kafka-${BUILD_NUMBER}'
            }
        }
        stage('Deploy') {
            steps {
                sh 'kubectl set image deployment/myapp myapp=myapp:kafka-${BUILD_NUMBER}'
            }
        }
    }
    post {
        always {
            sh 'docker-compose down'
        }
    }
}
4. Monitoring and Alerting
4.1 Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'kafka'
    metrics_path: '/metrics'
    static_configs:
      - targets: ['kafka-1:9090', 'kafka-2:9090', 'kafka-3:9090']
  - job_name: 'zookeeper'
    static_configs:
      # 2181 is the ZooKeeper client port, not a metrics endpoint; scrape the
      # Prometheus metrics port instead (7000 by default when the
      # PrometheusMetricsProvider is enabled in ZooKeeper 3.6+)
      - targets: ['zk1:7000', 'zk2:7000', 'zk3:7000']
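The kafka job scrapes port 9090, which assumes each broker runs the Prometheus JMX exporter as a Java agent serving /metrics on that port; one way to wire that up (the jar and exporter config paths are hypothetical):

# attach the JMX exporter agent before starting the broker (paths are illustrative)
export KAFKA_OPTS="-javaagent:/opt/jmx_prometheus_javaagent.jar=9090:/opt/kafka/config/jmx-exporter.yml"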
4.2 Alert Rules
# alerting_rules.yml
groups:
  - name: kafka
    rules:
      - alert: KafkaBrokerDown
        expr: up{job="kafka"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Broker down: {{ $labels.instance }}"
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Under-replicated partitions detected"
      - alert: KafkaConsumerLag
        expr: kafka_consumer_group_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag too high: {{ $labels.group }}"
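The rule file can be validated before reloading Prometheus:

promtool check rules alerting_rules.yml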
4.3 Grafana Dashboard
{
  "dashboard": {
    "title": "Kafka Monitoring",
    "panels": [
      {
        "title": "Message Throughput",
        "targets": [{
          "expr": "sum(rate(kafka_server_BrokerTopicMetrics_MessagesInPerSec[1m]))"
        }]
      },
      {
        "title": "Consumer Lag",
        "targets": [{
          "expr": "sum(kafka_consumer_group_lag) by (group)"
        }]
      },
      {
        "title": "Log Size",
        "targets": [{
          "expr": "kafka_log_Log_Size"
        }]
      }
    ]
  }
}
5. Log Management
5.1 ELK Configuration
# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/kafka/*.log
    fields:
      service: kafka

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "kafka-%{+yyyy.MM.dd}"

# a custom index name requires an explicit template name/pattern
setup.template.name: "kafka"
setup.template.pattern: "kafka-*"
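The configuration and the Elasticsearch connection can be sanity-checked before rollout:

filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml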
5.2 Log Analysis
#!/bin/bash
# Simple analysis of the Kafka server log
LOG_FILE="/var/log/kafka/server.log"

echo "=== Error counts ==="
grep -i "error" "$LOG_FILE" | awk '{print $NF}' | sort | uniq -c | sort -rn

echo -e "\n=== Warning counts ==="
grep -i "warn" "$LOG_FILE" | awk '{print $NF}' | sort | uniq -c | sort -rn

echo -e "\n=== Recent errors ==="
grep -i "error" "$LOG_FILE" | tail -20
6. Backup and Recovery
6.1 Configuration Backup
#!/bin/bash
# Configuration backup script
BACKUP_DIR="/backup/kafka"
DATE=$(date +%Y%m%d)
mkdir -p "$BACKUP_DIR"

# Back up broker configuration
for broker in kafka-1 kafka-2 kafka-3; do
  scp "$broker:/opt/kafka/config/server.properties" "$BACKUP_DIR/${broker}_${DATE}.properties"
done

# Back up topic configuration
kafka-topics.sh --bootstrap-server localhost:9092 --describe > "$BACKUP_DIR/topics_$DATE.txt"

# Back up consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups > "$BACKUP_DIR/consumers_$DATE.txt"

# Keep 30 days of backups
find "$BACKUP_DIR" -name "*.txt" -mtime +30 -delete
find "$BACKUP_DIR" -name "*.properties" -mtime +30 -delete
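Scheduling the script via cron keeps backups current; an illustrative entry (the script path is an assumption):

# /etc/cron.d/kafka-backup -- run the backup daily at 02:00
0 2 * * * root /opt/scripts/kafka-backup.sh >> /var/log/kafka-backup.log 2>&1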
6.2 Configuration Restore
#!/bin/bash
# Configuration restore script
BACKUP_DIR="/backup/kafka"
BACKUP_DATE=$1

if [ -z "$BACKUP_DATE" ]; then
  echo "Usage: $0 <backup_date>"
  exit 1
fi

# Restore broker configuration
for broker in kafka-1 kafka-2 kafka-3; do
  scp "$BACKUP_DIR/${broker}_${BACKUP_DATE}.properties" "$broker:/opt/kafka/config/server.properties"
done

# Restart brokers one at a time (rolling restart)
for broker in kafka-1 kafka-2 kafka-3; do
  ssh "$broker" "systemctl restart kafka"
done

echo "Restore complete"
7. Best Practices
7.1 Automation Recommendations
1. Deploy the cluster with Ansible
2. Orchestrate with Kubernetes
3. Set up a CI/CD pipeline
4. Build a monitoring and alerting system
5. Back up configuration regularly
7.2 Operations Checklist
Daily checks:
- [ ] Broker status
- [ ] Consumer lag
- [ ] Disk usage
- [ ] Error logs
- [ ] Monitoring metrics
Periodic checks:
- [ ] Configuration backups
- [ ] Performance benchmarking
- [ ] Failure drills
- [ ] Capacity planning
Summary
Key building blocks of Kafka operations automation:
- Ansible: automated deployment and configuration management
- Kubernetes: container orchestration and autoscaling
- CI/CD: automated testing and continuous deployment
- Monitoring and alerting: Prometheus, Grafana, alert rules
- Log management: ELK and log analysis
- Backup and recovery: configuration backup and restore
Core takeaways:
- Use automation tooling to improve efficiency
- Build a complete monitoring system
- Back up configuration and data regularly
- Run changes through a CI/CD pipeline
- Maintain an operations checklist