
Kafka Operations Automation and DevOps Practices

Automating Kafka operations is key to running clusters efficiently and reliably. This article works through practical automation end to end: deployment with Ansible, orchestration on Kubernetes, CI/CD pipelines, monitoring and alerting, log management, and backup/restore.

1. Ansible Automation

1.1 Ansible Configuration

Inventory configuration

# inventory.yml
all:
  children:
    zookeeper:
      hosts:
        zk1:
          ansible_host: 192.168.1.10
        zk2:
          ansible_host: 192.168.1.11
        zk3:
          ansible_host: 192.168.1.12
    
    kafka:
      hosts:
        kafka1:
          ansible_host: 192.168.1.20
          broker_id: 1
        kafka2:
          ansible_host: 192.168.1.21
          broker_id: 2
        kafka3:
          ansible_host: 192.168.1.22
          broker_id: 3
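
Before running any playbook, a quick ad-hoc check confirms every host in the inventory is reachable over SSH (a minimal sketch using the inventory file above):

# Ping all ZooKeeper and Kafka hosts
ansible -i inventory.yml all -m ping

# Show which hosts the kafka group resolves to
ansible -i inventory.yml kafka --list-hosts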

1.2 Deployment Playbook

# deploy-kafka.yml
- name: Deploy Kafka Cluster
  hosts: kafka
  become: yes
  vars:
    kafka_version: "3.4.0"
    kafka_install_dir: /opt/kafka
    kafka_data_dir: /data/kafka-logs
    zookeeper_connect: "zk1:2181,zk2:2181,zk3:2181"

  tasks:
    - name: Install Java
      yum:
        name: java-11-openjdk
        state: present

    - name: Create kafka user
      user:
        name: kafka
        system: yes
        shell: /sbin/nologin

    - name: Download Kafka
      get_url:
        url: "https://archive.apache.org/dist/kafka/{{ kafka_version }}/kafka_2.13-{{ kafka_version }}.tgz"
        dest: /tmp/kafka.tgz

    - name: Extract Kafka
      unarchive:
        src: /tmp/kafka.tgz
        dest: /opt/
        remote_src: yes
        owner: kafka
        group: kafka
        creates: "/opt/kafka_2.13-{{ kafka_version }}"

    # The tarball unpacks to /opt/kafka_2.13-<version>; point the install
    # dir at it so configuration paths stay version-independent
    - name: Symlink install directory to extracted release
      file:
        src: "/opt/kafka_2.13-{{ kafka_version }}"
        dest: "{{ kafka_install_dir }}"
        state: link

    - name: Create data directory
      file:
        path: "{{ kafka_data_dir }}"
        state: directory
        owner: kafka
        group: kafka

    # broker_id (host var) and zookeeper_connect (play var) are already
    # visible to the template; only log_dirs needs an explicit mapping
    - name: Configure Kafka
      template:
        src: server.properties.j2
        dest: "{{ kafka_install_dir }}/config/server.properties"
      vars:
        log_dirs: "{{ kafka_data_dir }}"

    - name: Create systemd service
      template:
        src: kafka.service.j2
        dest: /etc/systemd/system/kafka.service

    - name: Start Kafka
      systemd:
        name: kafka
        enabled: yes
        state: started
        daemon_reload: yes
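
With the inventory and templates in place (server.properties.j2 and kafka.service.j2 under templates/), a dry run catches problems before anything is changed:

# Preview changes first, then apply
ansible-playbook -i inventory.yml deploy-kafka.yml --check --diff
ansible-playbook -i inventory.yml deploy-kafka.yml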

1.3 Configuration Template

# server.properties.j2
broker.id={{ broker_id }}
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://{{ ansible_host }}:9092
log.dirs={{ log_dirs }}
zookeeper.connect={{ zookeeper_connect }}
num.partitions=8
default.replication.factor=3
min.insync.replicas=2
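
Once the brokers are up, each advertised listener can be probed from the control machine (a quick sanity check; assumes the Kafka CLI tools are installed locally):

# Expect every broker to answer with its supported API versions
for host in 192.168.1.20 192.168.1.21 192.168.1.22; do
    kafka-broker-api-versions.sh --bootstrap-server "$host:9092" > /dev/null \
        && echo "$host: OK" || echo "$host: FAILED"
done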

2. Kubernetes Deployment

2.1 Helm Chart

# Chart.yaml
apiVersion: v2
name: kafka
description: A Helm chart for Kafka
type: application
version: 1.0.0
appVersion: "3.4.0"
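
Installing and upgrading the chart works like any other Helm release (hypothetical release name kafka in its own namespace):

helm install kafka ./kafka --namespace kafka --create-namespace
helm upgrade kafka ./kafka --namespace kafka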

2.2 StatefulSet

# templates/statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless   # must match the headless Service in the advertised listener
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:7.4.0
        ports:
        - containerPort: 9092
        # KAFKA_BROKER_ID must be an integer, so derive it from the pod
        # ordinal (kafka-0 -> 0) before handing off to the image entrypoint
        command:
        - sh
        - -c
        - |
          export KAFKA_BROKER_ID=${HOSTNAME##*-}
          exec /etc/confluent/docker/run
        env:
        - name: POD_NAME              # referenced by $(POD_NAME) below
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper:2181"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://$(POD_NAME).kafka-headless.default.svc.cluster.local:9092"
        volumeMounts:
        - name: data
          mountPath: /var/lib/kafka/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi
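
The advertised listener above resolves through a headless Service named kafka-headless, which the chart must also define; a minimal sketch of what it needs to look like (applied ad hoc here for illustration):

# Headless Service giving each broker a stable per-pod DNS name
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
  - name: kafka
    port: 9092
EOF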

2.3 Autoscaling

Note that brokers are stateful: a newly added broker owns no partitions until a reassignment is run, so an HPA like the one below is a starting point, not a turnkey solution.

# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: kafka-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: kafka
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
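
Applying the autoscaler and watching it react as metrics come in:

kubectl apply -f hpa.yaml
kubectl get hpa kafka-hpa --watch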

3. CI/CD Pipeline

3.1 GitLab CI

# .gitlab-ci.yml
stages:
  - test
  - build
  - deploy

test:
  stage: test
  script:
    - docker-compose up -d zookeeper kafka
    - sleep 30   # crude wait for the brokers to come up
    - ./run-tests.sh
  after_script:
    # tear the compose stack down even when tests fail
    - docker-compose down
  artifacts:
    reports:
      junit: test-results.xml

build:
  stage: build
  script:
    - docker build -t myapp:kafka-$CI_COMMIT_SHA .
    - docker push myapp:kafka-$CI_COMMIT_SHA

deploy:
  stage: deploy
  script:
    - kubectl set image deployment/myapp myapp=myapp:kafka-$CI_COMMIT_SHA
  only:
    - main
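
The run-tests.sh referenced above is project-specific; a minimal smoke-test sketch against the compose services (hypothetical: assumes the service is named kafka and uses a Confluent image whose CLI tools have no .sh suffix; a real script would also emit the test-results.xml JUnit report the pipeline collects):

#!/bin/bash
set -euo pipefail

TOPIC="ci-smoke-$RANDOM"

# Create a throwaway topic inside the kafka container
docker-compose exec -T kafka kafka-topics --bootstrap-server localhost:9092 \
    --create --topic "$TOPIC" --partitions 1 --replication-factor 1

# Produce one message and read it back
echo "ping" | docker-compose exec -T kafka kafka-console-producer \
    --bootstrap-server localhost:9092 --topic "$TOPIC"
docker-compose exec -T kafka kafka-console-consumer \
    --bootstrap-server localhost:9092 --topic "$TOPIC" \
    --from-beginning --max-messages 1 --timeout-ms 10000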

3.2 Jenkins Pipeline

// Jenkinsfile
pipeline {
    agent any
    
    stages {
        stage('Test') {
            steps {
                sh 'docker-compose up -d zookeeper kafka'
                sh 'sleep 30'
                sh './run-tests.sh'
            }
        }
        
        stage('Build') {
            steps {
                sh 'docker build -t myapp:kafka-${BUILD_NUMBER} .'
                sh 'docker push myapp:kafka-${BUILD_NUMBER}'
            }
        }
        
        stage('Deploy') {
            steps {
                sh 'kubectl set image deployment/myapp myapp=myapp:kafka-${BUILD_NUMBER}'
            }
        }
    }
    
    post {
        always {
            sh 'docker-compose down'
        }
    }
}

4. Monitoring and Alerting

4.1 Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  # Assumes each broker exposes JMX metrics via a jmx_exporter agent on 9090
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-1:9090', 'kafka-2:9090', 'kafka-3:9090']
    metrics_path: '/metrics'

  # 2181 is the ZooKeeper client port and serves no metrics; ZooKeeper 3.6+
  # can expose Prometheus metrics natively (metricsProvider, default port 7000)
  - job_name: 'zookeeper'
    static_configs:
      - targets: ['zk1:7000', 'zk2:7000', 'zk3:7000']
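
The configuration can be validated offline before reloading Prometheus:

promtool check config prometheus.yml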

4.2 Alerting Rules

# alerting_rules.yml
# Metric names below follow common jmx_exporter / kafka_exporter conventions;
# adjust them to match your own exporter's relabeling rules.
groups:
  - name: kafka
    rules:
      - alert: KafkaBrokerDown
        expr: up{job="kafka"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Broker down: {{ $labels.instance }}"

      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Under-replicated partitions present"

      - alert: KafkaConsumerLag
        expr: kafka_consumer_group_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag too high: {{ $labels.group }}"

4.3 Grafana Dashboard

{
  "dashboard": {
    "title": "Kafka Monitoring",
    "panels": [
      {
        "title": "Message Throughput",
        "targets": [{
          "expr": "sum(rate(kafka_server_BrokerTopicMetrics_MessagesInPerSec[1m]))"
        }]
      },
      {
        "title": "Consumer Lag",
        "targets": [{
          "expr": "sum(kafka_consumer_group_lag) by (group)"
        }]
      },
      {
        "title": "Disk Usage",
        "targets": [{
          "expr": "kafka_log_Log_Size"
        }]
      }
    ]
  }
}
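
A dashboard JSON in this shape can be imported through Grafana's HTTP API (a sketch; assumes the JSON is saved as dashboard.json and $GRAFANA_TOKEN holds an API token with dashboard write access):

curl -X POST http://grafana:3000/api/dashboards/db \
    -H "Authorization: Bearer $GRAFANA_TOKEN" \
    -H "Content-Type: application/json" \
    -d @dashboard.json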

5. Log Management

5.1 ELK Configuration

# filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/kafka/*.log
  fields:
    service: kafka

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "kafka-%{+yyyy.MM.dd}"

# Filebeat refuses a custom index without a matching template name/pattern
setup.template.name: "kafka"
setup.template.pattern: "kafka-*"

5.2 Log Analysis

#!/bin/bash
# Log analysis script

LOG_FILE="/var/log/kafka/server.log"

echo "=== Error counts ==="
# Kafka log lines end with the logger name in parentheses, so $NF
# groups errors by the component that emitted them
grep -i "error" "$LOG_FILE" | awk '{print $NF}' | sort | uniq -c | sort -rn

echo -e "\n=== Warning counts ==="
grep -i "warn" "$LOG_FILE" | awk '{print $NF}' | sort | uniq -c | sort -rn

echo -e "\n=== Recent errors ==="
grep -i "error" "$LOG_FILE" | tail -20

6. Backup and Restore

6.1 Configuration Backup

#!/bin/bash
# Configuration backup script

BACKUP_DIR="/backup/kafka"
DATE=$(date +%Y%m%d)

mkdir -p "$BACKUP_DIR"

# Back up broker configuration
scp kafka-1:/opt/kafka/config/server.properties "$BACKUP_DIR/kafka-1_$DATE.properties"
scp kafka-2:/opt/kafka/config/server.properties "$BACKUP_DIR/kafka-2_$DATE.properties"
scp kafka-3:/opt/kafka/config/server.properties "$BACKUP_DIR/kafka-3_$DATE.properties"

# Back up topic configuration
kafka-topics.sh --bootstrap-server localhost:9092 --describe > "$BACKUP_DIR/topics_$DATE.txt"

# Back up consumer groups (--describe requires a group selector)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups > "$BACKUP_DIR/consumers_$DATE.txt"

# Keep 30 days of backups
find "$BACKUP_DIR" -name "*.txt" -mtime +30 -delete
find "$BACKUP_DIR" -name "*.properties" -mtime +30 -delete
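
The script lends itself to a nightly cron entry (the script path is illustrative):

# crontab -e: run the backup every day at 02:00
0 2 * * * /opt/scripts/kafka-backup.sh >> /var/log/kafka-backup.log 2>&1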

6.2 Configuration Restore

#!/bin/bash
# Configuration restore script

BACKUP_DIR="/backup/kafka"   # must match the backup script
BACKUP_DATE=$1

if [ -z "$BACKUP_DATE" ]; then
    echo "Usage: $0 <backup_date>"
    exit 1
fi

# Restore broker configuration
scp "$BACKUP_DIR/kafka-1_$BACKUP_DATE.properties" kafka-1:/opt/kafka/config/server.properties
scp "$BACKUP_DIR/kafka-2_$BACKUP_DATE.properties" kafka-2:/opt/kafka/config/server.properties
scp "$BACKUP_DIR/kafka-3_$BACKUP_DATE.properties" kafka-3:/opt/kafka/config/server.properties

# Rolling restart: one broker at a time so the cluster stays available
for broker in kafka-1 kafka-2 kafka-3; do
    ssh "$broker" "systemctl restart kafka"
    sleep 60   # give the broker time to rejoin and catch up
done

echo "Restore complete"

7. Best Practices

7.1 Automation Recommendations

1. Deploy clusters with Ansible
2. Orchestrate containers with Kubernetes
3. Set up CI/CD pipelines
4. Build a monitoring and alerting stack
5. Back up configuration regularly

7.2 Operations Checklist

Daily checks:
- [ ] Broker status
- [ ] Consumer lag
- [ ] Disk usage
- [ ] Error logs
- [ ] Monitoring metrics

Periodic checks:
- [ ] Configuration backups
- [ ] Performance benchmarking
- [ ] Failure drills
- [ ] Capacity review

Summary

Key takeaways for Kafka operations automation:

  1. Ansible: automated deployment and configuration management
  2. Kubernetes: container orchestration and autoscaling
  3. CI/CD: automated testing and continuous deployment
  4. Monitoring and alerting: Prometheus, Grafana, and alert rules
  5. Log management: ELK and log analysis
  6. Backup and restore: configuration backup and restore


