Skip to content
清晨的一缕阳光
返回

Kafka 安全加固与权限管理实战

Kafka 安全性是企业级应用的核心要求。本文将深入探讨 Kafka 安全加固的完整方案,包括加密、认证、授权、审计等各个方面。

一、安全架构

1.1 安全层次

graph TB
    subgraph 网络安全
        SSL[SSL/TLS 加密]
        FIREWALL[防火墙]
    end
    
    subgraph 认证层
        SASL[SASL 认证]
        KERBEROS[Kerberos]
        OAUTH[OAuth2]
    end
    
    subgraph 授权层
        ACL[ACL 授权]
        RBAC[基于角色访问控制]
    end
    
    subgraph 审计层
        AUDIT[审计日志]
        MONITOR[监控告警]
    end
    
    SSL --> SASL
    SASL --> ACL
    ACL --> AUDIT

1.2 安全协议对比

协议加密认证复杂度适用场景
PLAINTEXT内网测试
SSL数据传输加密
SASL/PLAIN基础认证
SASL/SCRAM推荐默认
SASL/GSSAPI企业级 Kerberos
SASL/OAUTH云原生场景

二、SSL/TLS 加密

2.1 证书生成

生成 CA 证书

#!/bin/bash
# 生成 CA 证书
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=MyCompany/CN=my-ca"

# 设置权限
chmod 400 ca-key

生成 Broker 密钥库

#!/bin/bash
# 生成 Broker 密钥库
keytool -keystore /var/ssl/private/kafka.server.keystore.jks \
  -alias localhost -validity 365 -genkey -keyalg RSA \
  -dname "CN=kafka-broker-1, OU=IT, O=MyCompany, L=Beijing, ST=Beijing, C=CN"

# 生成 CSR
keytool -keystore /var/ssl/private/kafka.server.keystore.jks \
  -alias localhost -certreq \
  -file /tmp/kafka-server-signing-request.crt

# CA 签名
openssl x509 -req -CA /tmp/ca-cert -CAkey /tmp/ca-key \
  -in /tmp/kafka-server-signing-request.crt \
  -out /tmp/kafka-server-signed.crt -days 365 -CAcreateserial

# 导入证书
keytool -keystore /var/ssl/private/kafka.server.keystore.jks \
  -alias CARoot -import -file /tmp/ca-cert

keytool -keystore /var/ssl/private/kafka.server.keystore.jks \
  -alias localhost -import -file /tmp/kafka-server-signed.crt

# 生成信任库
keytool -keystore /var/ssl/private/kafka.server.truststore.jks \
  -alias CARoot -import -file /tmp/ca-cert

2.2 Broker SSL 配置

# server.properties

# SSL 监听器
listeners=SSL://:9093
advertised.listeners=SSL://kafka-broker-1:9093
listener.security.protocol.map=SSL:SSL

# SSL 配置
ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=truststore-password

# 客户端认证(双向认证)
ssl.client.auth=required

# SSL 协议
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256

2.3 Client SSL 配置

Producer 配置

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9093");
props.put("security.protocol", "SSL");

// SSL 配置
props.put("ssl.keystore.location", "/var/ssl/private/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
props.put("ssl.truststore.location", "/var/ssl/private/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

// 禁用主机名验证(测试用)
props.put("ssl.endpoint.identification.algorithm", "");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Consumer 配置

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9093");
props.put("group.id", "ssl-consumer-group");
props.put("security.protocol", "SSL");

// SSL 配置(同 Producer)
props.put("ssl.keystore.location", "/var/ssl/private/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
props.put("ssl.truststore.location", "/var/ssl/private/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

三、SASL 认证

3.1 SCRAM 配置

Broker 配置

# server.properties

# 监听器配置
listeners=SASL_SSL://:9094
advertised.listeners=SASL_SSL://kafka-broker-1:9094
listener.security.protocol.map=SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_SSL

# SASL 配置
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# JAAS 配置(Broker 间认证)
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin-password";

创建用户

#!/bin/bash
# 使用 kafka-configs.sh 创建用户

# 创建普通用户
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --alter --add-config 'SCRAM-SHA-256=[password=user-password]' \
  --entity-type users --entity-name alice

# 创建管理员用户
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --alter --add-config 'SCRAM-SHA-256=[password=admin-password]' \
  --entity-type users --entity-name admin

# 查看用户
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --describe --entity-type users --entity-name alice

Client 配置

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9094");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");

// SASL 配置
props.put("sasl.jaas.config", 
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"alice\" " +
    "password=\"user-password\";");

// SSL 配置
props.put("ssl.truststore.location", "/var/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

3.2 Kerberos 配置

KDC 配置

#!/bin/bash
# 安装 KDC
apt-get install krb5-kdc krb5-admin-server

# 创建 Kafka principal
kadmin.local -q "addprinc -randkey kafka/kafka-broker-1@MYCOMPANY.COM"
kadmin.local -q "addprinc -randkey kafka/kafka-broker-2@MYCOMPANY.COM"
kadmin.local -q "addprinc -randkey kafka/kafka-client@MYCOMPANY.COM"

# 导出 keytab
kadmin.local -q "ktadd -k /etc/security/kafka.keytab kafka/kafka-broker-1@MYCOMPANY.COM"

Broker Kerberos 配置

# server.properties

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://kafka-broker-1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI

# JAAS 配置
listener.name.sasl_plaintext.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true \
  storeKey=true \
  keyTab="/etc/security/kafka.keytab" \
  principal="kafka/kafka-broker-1@MYCOMPANY.COM";

四、ACL 授权

4.1 启用 ACL

# server.properties

# 授权提供者
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

# 超级用户
super.users=User:admin;User:broker-admin

# 不允许匿名访问
allow.everyone.if.no.acl.found=false

4.2 ACL 配置

Topic 权限

#!/bin/bash
# 创建 Topic 读写权限
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:alice \
  --operation Read --operation Write \
  --topic order-topic

# 创建 Topic 只读权限
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:bob \
  --operation Read --operation Describe \
  --topic order-topic

# 创建通配符权限
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:producer-app \
  --operation Create --operation Write --operation Describe \
  --topic "*"

Consumer Group 权限

# Consumer Group 读取权限
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:alice \
  --operation Read \
  --group order-consumer-group

Cluster 权限

# Cluster 描述权限
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:monitor-app \
  --operation Describe --operation ClusterAction \
  --cluster kafka-cluster

4.3 ACL 模板脚本

#!/bin/bash
# ACL 批量配置脚本

NAMESRV="localhost:9093"
CLIENT_CONFIG="/etc/kafka/client-ssl.properties"

# 定义角色
declare -A roles=(
    ["producer"]="Create,Write,Describe"
    ["consumer"]="Read,Describe"
    ["admin"]="All"
)

# 定义资源
declare -a topics=("order-topic" "pay-topic" "user-topic")
declare -a groups=("order-consumer-group" "pay-consumer-group")

# 配置 ACL
configure_acl() {
    local principal=$1
    local role=$2
    
    echo "配置 $principal ($role) 权限..."
    
    # Topic 权限
    for topic in "${topics[@]}"; do
        kafka-acls.sh --bootstrap-server $NAMESRV \
          --command-config $CLIENT_CONFIG \
          --add \
          --allow-principal User:$principal \
          --operation ${roles[$role]//,/ --operation } \
          --topic $topic
    done
    
    # Consumer Group 权限
    if [[ $role == "consumer" || $role == "admin" ]]; then
        for group in "${groups[@]}"; do
            kafka-acls.sh --bootstrap-server $NAMESRV \
              --command-config $CLIENT_CONFIG \
              --add \
              --allow-principal User:$principal \
              --operation Read \
              --group $group
        done
    fi
}

# 使用示例
configure_acl "order-service" "producer"
configure_acl "order-consumer" "consumer"
configure_acl "admin-user" "admin"

五、审计日志

5.1 审计配置

# log4j.properties

# 审计日志
log4j.appender.auditAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.auditAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.auditAppender.File=/var/log/kafka/audit.log
log4j.appender.auditAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.auditAppender.layout.ConversionPattern=%d{ISO8601} %p %m%n

log4j.logger.kafka.security.authorizer.AclAuthorizer=INFO, auditAppender
log4j.additivity.kafka.security.authorizer.AclAuthorizer=false

5.2 审计事件

审计日志格式

2026-08-15 10:00:00,123 INFO Principal=User:alice;Operation=Read;Resource=Topic:order-topic;Result=Allowed;IP=192.168.1.100
2026-08-15 10:00:01,456 INFO Principal=User:bob;Operation=Write;Resource=Topic:order-topic;Result=Denied;IP=192.168.1.101

5.3 审计分析脚本

#!/bin/bash
# 审计日志分析脚本

AUDIT_LOG="/var/log/kafka/audit.log"

echo "=== 权限拒绝统计 ==="
grep "Result=Denied" $AUDIT_LOG | \
  awk -F'Principal=' '{print $2}' | \
  awk -F';' '{print $1}' | \
  sort | uniq -c | sort -rn

echo -e "\n=== 用户操作统计 ==="
grep "Result=Allowed" $AUDIT_LOG | \
  awk -F'Principal=' '{print $2}' | \
  awk -F';' '{print $1}' | \
  sort | uniq -c | sort -rn

echo -e "\n=== 最近拒绝记录 ==="
grep "Result=Denied" $AUDIT_LOG | tail -20

echo -e "\n=== 按资源统计 ==="
grep "Result=Denied" $AUDIT_LOG | \
  awk -F'Resource=' '{print $2}' | \
  awk -F';' '{print $1}' | \
  sort | uniq -c | sort -rn

六、安全最佳实践

6.1 配置建议

环境加密认证授权审计
开发
测试SASL/PLAIN
生产SASL/SCRAM
金融SASL/GSSAPI

6.2 密码管理

#!/bin/bash
# 密码轮换脚本

OLD_PASSWORD="old-password"
NEW_PASSWORD="new-password"
USER="alice"

# 更新用户密码
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --alter --add-config 'SCRAM-SHA-256=[password='$NEW_PASSWORD']' \
  --entity-type users --entity-name $USER

# 通知客户端更新密码
# ...

echo "密码已更新"

6.3 证书管理

#!/bin/bash
# 证书续期脚本

CERT_DIR="/var/ssl/private"
DAYS_BEFORE_EXPIRY=30

# 检查证书过期时间
for cert in $CERT_DIR/*.jks; do
    expiry=$(keytool -list -v -keystore $cert | grep "Valid until" | awk '{print $4}')
    expiry_epoch=$(date -d "$expiry" +%s)
    now_epoch=$(date +%s)
    days_left=$(( (expiry_epoch - now_epoch) / 86400 ))
    
    if [ $days_left -lt $DAYS_BEFORE_EXPIRY ]; then
        echo "WARNING: $cert 将在 $days_left 天后过期"
        # 执行续期操作
        # renew_certificate.sh $cert
    fi
done

6.4 安全检查清单

安全检查:
- [ ] 启用 SSL/TLS 加密
- [ ] 配置 SASL 认证
- [ ] 配置 ACL 授权
- [ ] 启用审计日志
- [ ] 定期轮换密码
- [ ] 定期续期证书
- [ ] 监控异常访问
- [ ] 定期安全审计

七、监控告警

7.1 安全指标

指标说明告警阈值
kafka_security_authentication_failures认证失败次数> 10/分钟
kafka_security_authorization_failures授权失败次数> 10/分钟
kafka_security_ssl_handshake_failuresSSL 握手失败> 5/分钟
kafka_security_acl_deniedACL 拒绝次数> 20/分钟

7.2 Prometheus 告警

# prometheus.yml
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-1:9090', 'kafka-2:9090']
    metrics_path: '/metrics'

# alerting_rules.yml
groups:
  - name: kafka-security
    rules:
      - alert: KafkaAuthenticationFailures
        expr: rate(kafka_security_authentication_failures[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "认证失败率过高"
      
      - alert: KafkaAuthorizationFailures
        expr: rate(kafka_security_authorization_failures[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "授权失败率过高"
      
      - alert: KafkaSSLCertificateExpiring
        expr: kafka_ssl_certificate_expiry_days < 30
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "SSL 证书即将过期"

总结

Kafka 安全加固的核心要点:

  1. SSL/TLS:证书管理、加密配置、双向认证
  2. SASL 认证:SCRAM、Kerberos、OAuth2
  3. ACL 授权:Topic、Group、Cluster 权限
  4. 审计日志:事件记录、分析脚本
  5. 最佳实践:配置建议、密码管理、证书管理

核心要点

参考资料


分享这篇文章到:

上一篇文章
Agent 行为验证实战
下一篇文章
Redis Cluster 实战部署与运维