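Security is a core requirement for enterprise Kafka deployments. This article walks through a complete hardening approach for Kafka, covering encryption, authentication, authorization, and auditing.
1. Security Architecture
1.1 Security Layers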
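graph TB
  subgraph network [Network security]
    SSL[SSL/TLS encryption]
    FIREWALL[Firewall]
  end
  subgraph authn [Authentication layer]
    SASL[SASL authentication]
    KERBEROS[Kerberos]
    OAUTH[OAuth2]
  end
  subgraph authz [Authorization layer]
    ACL[ACL authorization]
    RBAC[Role-based access control]
  end
  subgraph audit [Audit layer]
    AUDIT[Audit logs]
    MONITOR[Monitoring and alerting]
  end
  SSL --> SASL
  SASL --> ACL
  ACL --> AUDIT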
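1.2 Security Protocol Comparison
| Protocol | Encryption | Authentication | Complexity | Use case |
|---|---|---|---|---|
| PLAINTEXT | ❌ | ❌ | Low | Testing on an internal network |
| SSL | ✅ | Optional (mutual TLS) | Medium | Encrypting data in transit |
| SASL/PLAIN | ✅ (over SASL_SSL) | ✅ | Low | Basic authentication |
| SASL/SCRAM | ✅ (over SASL_SSL) | ✅ | Medium | Recommended default |
| SASL/GSSAPI | ✅ (over SASL_SSL) | ✅ | High | Enterprise Kerberos |
| SASL/OAUTHBEARER | ✅ (over SASL_SSL) | ✅ | High | Cloud-native environments |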
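A SASL mechanism only authenticates; whether traffic is also encrypted depends on the listener's `security.protocol`, which takes one of four values. The helper below is a minimal sketch of that mapping. The function name `pick_security_protocol` and its yes/no arguments are made up for illustration.

```shell
# Hypothetical helper: choose a security.protocol value from two yes/no answers.
# $1 = encrypt traffic with TLS? (yes/no)
# $2 = authenticate clients with SASL? (yes/no)
pick_security_protocol() {
  case "$1:$2" in
    yes:yes) echo "SASL_SSL" ;;
    yes:no)  echo "SSL" ;;
    no:yes)  echo "SASL_PLAINTEXT" ;;
    no:no)   echo "PLAINTEXT" ;;
    *)       echo "usage: pick_security_protocol yes|no yes|no" >&2; return 1 ;;
  esac
}
```

For example, `pick_security_protocol yes yes` prints `SASL_SSL`, the combination used for SCRAM later in this article.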
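2. SSL/TLS Encryption
2.1 Certificate Generation
Generate the CA certificate:
#!/bin/bash
# Generate the CA private key and self-signed CA certificate.
# They are written to /tmp only because the signing step below reads them from there;
# a real CA key belongs somewhere more restricted.
openssl req -new -x509 -keyout /tmp/ca-key -out /tmp/ca-cert -days 365 \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=MyCompany/CN=my-ca"
# Restrict access to the CA private key
chmod 400 /tmp/ca-key
Generate the broker keystore: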
#!/bin/bash
# Generate the broker key pair and keystore
keytool -keystore /var/ssl/private/kafka.server.keystore.jks \
  -alias localhost -validity 365 -genkey -keyalg RSA \
  -dname "CN=kafka-broker-1, OU=IT, O=MyCompany, L=Beijing, ST=Beijing, C=CN"
# Create a certificate signing request (CSR)
keytool -keystore /var/ssl/private/kafka.server.keystore.jks \
  -alias localhost -certreq \
  -file /tmp/kafka-server-signing-request.crt
# Sign the CSR with the CA
openssl x509 -req -CA /tmp/ca-cert -CAkey /tmp/ca-key \
  -in /tmp/kafka-server-signing-request.crt \
  -out /tmp/kafka-server-signed.crt -days 365 -CAcreateserial
# Import the CA certificate first, then the signed broker certificate
keytool -keystore /var/ssl/private/kafka.server.keystore.jks \
  -alias CARoot -import -file /tmp/ca-cert
keytool -keystore /var/ssl/private/kafka.server.keystore.jks \
  -alias localhost -import -file /tmp/kafka-server-signed.crt
# Create the truststore containing the CA certificate
keytool -keystore /var/ssl/private/kafka.server.truststore.jks \
  -alias CARoot -import -file /tmp/ca-cert
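Before importing a signed certificate, it can be worth confirming that it really chains to the CA. The sketch below wraps `openssl verify -CAfile` for that purpose. The function name `verify_signed_cert` is hypothetical, and both files are assumed to be PEM-encoded, as the commands above produce.

```shell
# Sketch: confirm that a signed certificate chains to the CA.
# $1 = CA certificate (PEM), $2 = signed certificate (PEM)
verify_signed_cert() {
  if openssl verify -CAfile "$1" "$2" >/dev/null 2>&1; then
    echo "OK: $2 is signed by $1"
  else
    echo "FAIL: $2 does not chain to $1" >&2
    return 1
  fi
}
```

With the paths used in this section, the call would be `verify_signed_cert /tmp/ca-cert /tmp/kafka-server-signed.crt`.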
2.2 Broker SSL Configuration
# server.properties
# SSL listener
listeners=SSL://:9093
advertised.listeners=SSL://kafka-broker-1:9093
listener.security.protocol.map=SSL:SSL
# Inter-broker traffic must use a configured listener (the default is PLAINTEXT)
security.inter.broker.protocol=SSL
# Keystore and truststore
ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
# Require client certificates (mutual TLS)
ssl.client.auth=required
# TLS versions and cipher suites (the TLS_AES_* entries are the TLS 1.3 suites)
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
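2.3 Client SSL Configuration
Producer configuration:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9093");
props.put("security.protocol", "SSL");
// Serializers are required; without them the producer constructor throws
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// SSL settings
props.put("ssl.keystore.location", "/var/ssl/private/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
props.put("ssl.truststore.location", "/var/ssl/private/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
// Disables hostname verification; for testing only, remove this line in production
props.put("ssl.endpoint.identification.algorithm", "");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);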
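Consumer configuration:
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9093");
props.put("group.id", "ssl-consumer-group");
props.put("security.protocol", "SSL");
// Deserializers are required; without them the consumer constructor throws
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// SSL settings (same as the producer)
props.put("ssl.keystore.location", "/var/ssl/private/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
props.put("ssl.truststore.location", "/var/ssl/private/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);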
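The command-line tools used in later sections read the same client settings from a properties file passed with `--command-config` (the article uses `/etc/kafka/client-ssl.properties`). The sketch below shows one way such a file might be written. The function name and argument order are assumptions, and the keystore entries are included because the broker above sets `ssl.client.auth=required`.

```shell
# Hypothetical helper: write a client properties file for the Kafka CLI tools.
# Args: output file, truststore path, truststore password,
#       keystore path, keystore password, key password
write_client_ssl_properties() {
  (
    umask 077  # the file holds passwords: create it readable by the owner only
    cat > "$1" <<EOF
security.protocol=SSL
ssl.truststore.location=$2
ssl.truststore.password=$3
ssl.keystore.location=$4
ssl.keystore.password=$5
ssl.key.password=$6
EOF
  )
}
```

The restrictive umask only takes effect when the file is newly created; an existing file keeps its current permissions.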
3. SASL Authentication
3.1 SCRAM Configuration
Broker configuration:
# server.properties
# Listener configuration (the ssl.* keystore and truststore settings from section 2.2 still apply)
listeners=SASL_SSL://:9094
advertised.listeners=SASL_SSL://kafka-broker-1:9094
listener.security.protocol.map=SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_SSL
# SASL settings
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# JAAS configuration (credentials used for inter-broker authentication)
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin-password";
Create users:
#!/bin/bash
# Create SCRAM users with kafka-configs.sh.
# These commands connect through the SSL listener on port 9093 (section 2.2), so that
# listener must remain enabled. The admin credentials also have to exist before
# brokers can authenticate to each other with SCRAM.
# Create a regular user
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --alter --add-config 'SCRAM-SHA-256=[password=user-password]' \
  --entity-type users --entity-name alice
# Create the administrator user
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --alter --add-config 'SCRAM-SHA-256=[password=admin-password]' \
  --entity-type users --entity-name admin
# Inspect a user
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --describe --entity-type users --entity-name alice
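The `--add-config` value also accepts an optional `iterations` setting next to the password. The helper below is a small sketch that builds that string so it does not have to be hand-quoted each time. The function name `scram_add_config` is hypothetical.

```shell
# Hypothetical helper: build the value passed to kafka-configs.sh --add-config.
# $1 = mechanism (SCRAM-SHA-256 or SCRAM-SHA-512)
# $2 = password
# $3 = optional iteration count (Kafka applies its own default when omitted)
scram_add_config() {
  case "$1" in
    SCRAM-SHA-256|SCRAM-SHA-512) ;;
    *) echo "unsupported mechanism: $1" >&2; return 1 ;;
  esac
  if [ -n "${3:-}" ]; then
    printf '%s=[iterations=%s,password=%s]\n' "$1" "$3" "$2"
  else
    printf '%s=[password=%s]\n' "$1" "$2"
  fi
}
```

It could then be used as `--add-config "$(scram_add_config SCRAM-SHA-256 "$PASSWORD" 8192)"` in the commands above, where `PASSWORD` is a variable of your own.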
Client configuration:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9094");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
// SASL settings
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"alice\" " +
    "password=\"user-password\";");
// SSL settings (same truststore path as in section 2.3)
props.put("ssl.truststore.location", "/var/ssl/private/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
3.2 Kerberos Configuration
KDC setup:
#!/bin/bash
# Install the KDC
apt-get install krb5-kdc krb5-admin-server
# Create the Kafka principals
kadmin.local -q "addprinc -randkey kafka/kafka-broker-1@MYCOMPANY.COM"
kadmin.local -q "addprinc -randkey kafka/kafka-broker-2@MYCOMPANY.COM"
kadmin.local -q "addprinc -randkey kafka/kafka-client@MYCOMPANY.COM"
# Export the keytab for broker 1 (repeat for each broker)
kadmin.local -q "ktadd -k /etc/security/kafka.keytab kafka/kafka-broker-1@MYCOMPANY.COM"
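Each broker needs its own principal and keytab, so the commands above are usually repeated per host. The sketch below prints (it does not run) the `kadmin.local` commands for a list of broker hostnames. The function name and the one-keytab-per-host file naming are assumptions made for illustration.

```shell
# Sketch: print the kadmin commands for a list of broker hosts (dry run only).
# $1 = realm, $2 = keytab directory, remaining args = broker hostnames
print_broker_principal_cmds() {
  realm=$1
  keytab_dir=$2
  shift 2
  for host in "$@"; do
    principal="kafka/${host}@${realm}"
    echo "kadmin.local -q \"addprinc -randkey ${principal}\""
    # Assumed naming: one keytab file per broker host
    echo "kadmin.local -q \"ktadd -k ${keytab_dir}/${host}.keytab ${principal}\""
  done
}
```

Reviewing the printed commands before piping them to a shell keeps the KDC changes explicit.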
Broker Kerberos configuration:
# server.properties
# SASL_PLAINTEXT authenticates but does not encrypt; use SASL_SSL where traffic must be encrypted
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://kafka-broker-1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
# Must match the service part of the broker principal (kafka/...)
sasl.kerberos.service.name=kafka
# JAAS configuration
listener.name.sasl_plaintext.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true \
  storeKey=true \
  keyTab="/etc/security/kafka.keytab" \
  principal="kafka/kafka-broker-1@MYCOMPANY.COM";
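4. ACL Authorization
4.1 Enabling ACLs
# server.properties
# Authorizer implementation (for ZooKeeper-based clusters; KRaft clusters use
# org.apache.kafka.metadata.authorizer.StandardAuthorizer)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# Super users
super.users=User:admin;User:broker-admin
# Deny access to resources that have no ACLs
allow.everyone.if.no.acl.found=false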
4.2 ACL Configuration
Topic permissions:
#!/bin/bash
# Grant read and write access to a topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:alice \
  --operation Read --operation Write \
  --topic order-topic
# Grant read-only access to a topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:bob \
  --operation Read --operation Describe \
  --topic order-topic
# Grant access to all topics (wildcard)
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:producer-app \
  --operation Create --operation Write --operation Describe \
  --topic "*"
Consumer group permissions:
# Grant read access to a consumer group
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:alice \
  --operation Read \
  --group order-consumer-group
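Cluster permissions:
# Grant Describe on the cluster. The --cluster flag takes no resource name, and
# ClusterAction is left out because it authorizes inter-broker operations that a
# monitoring client does not need.
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --add \
  --allow-principal User:monitor-app \
  --operation Describe \
  --cluster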
4.3 ACL Template Script
#!/bin/bash
# Batch ACL configuration script (requires bash 4+ for associative arrays)
BOOTSTRAP_SERVER="localhost:9093"
CLIENT_CONFIG="/etc/kafka/client-ssl.properties"
# Roles: role name -> comma-separated list of operations
declare -A roles=(
  ["producer"]="Create,Write,Describe"
  ["consumer"]="Read,Describe"
  ["admin"]="All"
)
# Resources
declare -a topics=("order-topic" "pay-topic" "user-topic")
declare -a groups=("order-consumer-group" "pay-consumer-group")
# Apply the ACLs for one principal and role
configure_acl() {
  local principal=$1
  local role=$2
  echo "Configuring permissions for $principal ($role)..."
  # Topic permissions. The --operation expansion is left unquoted on purpose so that
  # "Create,Write" splits into "--operation Create --operation Write".
  for topic in "${topics[@]}"; do
    kafka-acls.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
      --command-config "$CLIENT_CONFIG" \
      --add \
      --allow-principal "User:$principal" \
      --operation ${roles[$role]//,/ --operation } \
      --topic "$topic"
  done
  # Consumer group permissions
  if [[ $role == "consumer" || $role == "admin" ]]; then
    for group in "${groups[@]}"; do
      kafka-acls.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
        --command-config "$CLIENT_CONFIG" \
        --add \
        --allow-principal "User:$principal" \
        --operation Read \
        --group "$group"
    done
  fi
}
# Usage examples
configure_acl "order-service" "producer"
configure_acl "order-consumer" "consumer"
configure_acl "admin-user" "admin"
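The template relies on turning a comma-separated role definition into repeated `--operation` flags. The sketch below isolates that step and adds a dry-run printer, which can help when reviewing what a role would grant before applying it. Both function names are hypothetical.

```shell
# Sketch: expand "Create,Write,Describe" into repeated --operation flags.
ops_to_flags() {
  flags=""
  old_ifs=$IFS
  IFS=,
  for op in $1; do            # unquoted on purpose: split on commas
    flags="${flags:+$flags }--operation $op"
  done
  IFS=$old_ifs
  printf '%s\n' "$flags"
}

# Print (not run) the kafka-acls.sh command for one principal and topic.
# $1 = bootstrap server, $2 = principal name, $3 = operations, $4 = topic
print_topic_acl_cmd() {
  echo "kafka-acls.sh --bootstrap-server $1 --add --allow-principal User:$2 $(ops_to_flags "$3") --topic $4"
}
```

For instance, `ops_to_flags "Read,Describe"` prints `--operation Read --operation Describe`.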
5. Audit Logging
5.1 Audit Configuration
# log4j.properties
# Audit log appender
log4j.appender.auditAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.auditAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.auditAppender.File=/var/log/kafka/audit.log
log4j.appender.auditAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.auditAppender.layout.ConversionPattern=%d{ISO8601} %p %m%n
# The authorizer writes to the logger named kafka.authorizer.logger:
# denied requests at INFO, allowed requests at DEBUG (set DEBUG to record both)
log4j.logger.kafka.authorizer.logger=INFO, auditAppender
log4j.additivity.kafka.authorizer.logger=false
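5.2 Audit Events
Audit log format (a simplified key=value layout used here for illustration; the stock authorizer logger writes free-text lines of roughly the form "Principal = User:alice is Allowed Operation = Read from host = ... on resource = ...", so the field names need adapting to the actual output):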
2026-08-15 10:00:00,123 INFO Principal=User:alice;Operation=Read;Resource=Topic:order-topic;Result=Allowed;IP=192.168.1.100
2026-08-15 10:00:01,456 INFO Principal=User:bob;Operation=Write;Resource=Topic:order-topic;Result=Denied;IP=192.168.1.101
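With a key=value layout like the two sample lines above, individual fields can be pulled out with standard text tools. The sketch below extracts one field from one line. The function name `audit_field` is hypothetical, and it assumes exactly this semicolon-separated layout.

```shell
# Sketch: print the value of one key from a "Key=Value;Key=Value" audit line.
# $1 = key name, $2 = log line
audit_field() {
  printf '%s\n' "$2" |
    tr ';' '\n' |
    # The key must start a segment or follow a space (the timestamp/level prefix)
    sed -n "s/^\(.* \)\{0,1\}$1=//p" |
    head -n 1
}
```

Called with `Principal` and the first sample line, it prints `User:alice`; a key that is not present yields empty output.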
5.3 Audit Analysis Script
#!/bin/bash
# Audit log analysis script (assumes the key=value format shown in section 5.2)
AUDIT_LOG="/var/log/kafka/audit.log"
echo "=== Denials per principal ==="
grep "Result=Denied" "$AUDIT_LOG" | \
  awk -F'Principal=' '{print $2}' | \
  awk -F';' '{print $1}' | \
  sort | uniq -c | sort -rn
echo -e "\n=== Allowed operations per principal ==="
grep "Result=Allowed" "$AUDIT_LOG" | \
  awk -F'Principal=' '{print $2}' | \
  awk -F';' '{print $1}' | \
  sort | uniq -c | sort -rn
echo -e "\n=== Most recent denials ==="
grep "Result=Denied" "$AUDIT_LOG" | tail -20
echo -e "\n=== Denials per resource ==="
grep "Result=Denied" "$AUDIT_LOG" | \
  awk -F'Resource=' '{print $2}' | \
  awk -F';' '{print $1}' | \
  sort | uniq -c | sort -rn
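The per-principal denial counts can also drive a simple threshold check. The sketch below lists principals whose denial count exceeds a limit. The function name `principals_over_limit` is hypothetical, and the same key=value log format is assumed.

```shell
# Sketch: list principals with more than $2 denied requests in log file $1.
principals_over_limit() {
  grep 'Result=Denied' "$1" |
    sed -n 's/.*Principal=\([^;]*\);.*/\1/p' |
    sort | uniq -c |
    awk -v limit="$2" '$1 > limit { print $2 }'
}
```

A cron job might call `principals_over_limit /var/log/kafka/audit.log 100` and send a notification when the output is non-empty.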
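6. Security Best Practices
6.1 Configuration Recommendations
| Environment | Encryption | Authentication | Authorization | Auditing |
|---|---|---|---|---|
| Development | ❌ | ❌ | ❌ | ❌ |
| Testing | ✅ | SASL/PLAIN | ✅ | ❌ |
| Production | ✅ | SASL/SCRAM | ✅ | ✅ |
| Financial | ✅ | SASL/GSSAPI | ✅ | ✅ |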
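6.2 Password Management
#!/bin/bash
# Password rotation script
set -euo pipefail
# Read the new password from the environment instead of hardcoding it
NEW_PASSWORD="${NEW_PASSWORD:?set NEW_PASSWORD before running}"
# Named KAFKA_USER to avoid overwriting the shell's own $USER variable
KAFKA_USER="alice"
# Update the user's password
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config /etc/kafka/client-ssl.properties \
  --alter --add-config "SCRAM-SHA-256=[password=${NEW_PASSWORD}]" \
  --entity-type users --entity-name "$KAFKA_USER"
# Notify clients to switch to the new password
# ...
echo "Password updated"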
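Rotation works best with generated passwords rather than hand-picked ones. The sketch below draws an alphanumeric password from `/dev/urandom`. The function name and the default length of 24 are assumptions; restricting to letters and digits sidesteps quoting problems inside the `--add-config` value.

```shell
# Sketch: generate a random alphanumeric password of $1 characters (default 24).
generate_password() {
  n=${1:-24}
  # 1024 random bytes leave roughly 250 alphanumeric characters after filtering,
  # so this sketch is only meant for lengths well below that.
  pool=$(head -c 1024 /dev/urandom | LC_ALL=C tr -dc 'A-Za-z0-9')
  printf "%.${n}s\n" "$pool"
}
```

The result can be exported as the new password for the rotation script, for example `NEW_PASSWORD=$(generate_password 32)`.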
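6.3 Certificate Management
#!/bin/bash
# Certificate expiry check (requires GNU date and English keytool output)
CERT_DIR="/var/ssl/private"
DAYS_BEFORE_EXPIRY=30
# Pass the keystore password explicitly so keytool does not prompt
STOREPASS="${STOREPASS:?set STOREPASS before running}"
now_epoch=$(date +%s)
# Check the expiry date of every certificate in every keystore
for cert in "$CERT_DIR"/*.jks; do
  # keytool prints "Valid from: <date> until: <date>" for each certificate
  keytool -list -v -keystore "$cert" -storepass "$STOREPASS" |
    sed -n 's/.*until: //p' |
    while read -r expiry; do
      expiry_epoch=$(date -d "$expiry" +%s)
      days_left=$(( (expiry_epoch - now_epoch) / 86400 ))
      if [ "$days_left" -lt "$DAYS_BEFORE_EXPIRY" ]; then
        echo "WARNING: a certificate in $cert expires in $days_left days"
        # Run the renewal step here
        # renew_certificate.sh "$cert"
      fi
    done
done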
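The core of the expiry check is the date arithmetic, which is easier to test in isolation. The sketch below computes whole days between two timestamps. The function name `days_until` is hypothetical, and it relies on GNU `date -d`, as the script above does.

```shell
# Sketch: whole days from $2 (default: now) until $1. Requires GNU date (-d).
days_until() {
  target=$(date -u -d "$1" +%s) || return 1
  start=$(date -u -d "${2:-now}" +%s) || return 1
  echo $(( (target - start) / 86400 ))
}
```

A negative result means the target date has already passed, which for a certificate means it has expired.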
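6.4 Security Checklist
Security checks:
- [ ] Enable SSL/TLS encryption
- [ ] Configure SASL authentication
- [ ] Configure ACL authorization
- [ ] Enable audit logging
- [ ] Rotate passwords regularly
- [ ] Renew certificates regularly
- [ ] Monitor for abnormal access
- [ ] Run periodic security audits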
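Some checklist items can be checked mechanically against a broker's `server.properties`. The sketch below flags a few obvious gaps. The function name is hypothetical, it assumes listener names equal their protocol names (as in this article), and it is far from a complete audit.

```shell
# Sketch: flag a few hardening gaps in a broker properties file ($1).
# Prints one line per finding and returns 1 if anything was found.
audit_broker_config() {
  findings=0
  report() { echo "FINDING: $1"; findings=$((findings + 1)); }
  # Matches both PLAINTEXT:// and SASL_PLAINTEXT:// listeners
  grep -Eq '^listeners=.*PLAINTEXT://' "$1" && report "listener without TLS"
  grep -q '^sasl\.enabled\.mechanisms=' "$1" || report "SASL is not enabled"
  grep -q '^authorizer\.class\.name=' "$1" || report "no authorizer configured"
  grep -q '^allow\.everyone\.if\.no\.acl\.found=true' "$1" &&
    report "resources without ACLs are open to everyone"
  [ "$findings" -eq 0 ]
}
```

Running it as part of a deployment pipeline, for example `audit_broker_config /etc/kafka/server.properties || exit 1` with your own path, stops a rollout when a basic control is missing.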
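7. Monitoring and Alerting
7.1 Security Metrics
| Metric | Description | Alert threshold |
|---|---|---|
| kafka_security_authentication_failures | Authentication failures | > 10/minute |
| kafka_security_authorization_failures | Authorization failures | > 10/minute |
| kafka_security_ssl_handshake_failures | SSL handshake failures | > 5/minute |
| kafka_security_acl_denied | ACL denials | > 20/minute |
These metric names are illustrative: the names that actually reach Prometheus depend on the JMX exporter rules in use.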
7.2 Prometheus Alerts
# prometheus.yml
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-1:9090', 'kafka-2:9090']
    metrics_path: '/metrics'
# alerting_rules.yml
groups:
  - name: kafka-security
    rules:
      - alert: KafkaAuthenticationFailures
        # rate() is per second; multiply by 60 to compare with a per-minute threshold
        expr: rate(kafka_security_authentication_failures[5m]) * 60 > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Authentication failure rate is too high"
      - alert: KafkaAuthorizationFailures
        expr: rate(kafka_security_authorization_failures[5m]) * 60 > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Authorization failure rate is too high"
      - alert: KafkaSSLCertificateExpiring
        expr: kafka_ssl_certificate_expiry_days < 30
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "SSL certificate is about to expire"
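The thresholds in section 7.1 are per minute, while PromQL's `rate()` returns a per-second average, so the expression needs a factor of 60. The sketch below generates one rule with that conversion applied. The function name is hypothetical and the metric names remain the illustrative ones from the table.

```shell
# Sketch: print one Prometheus alerting rule for a per-minute threshold on a counter.
# $1 = alert name, $2 = counter metric name, $3 = threshold per minute
print_rate_alert() {
  cat <<EOF
- alert: $1
  # rate() is per second; multiply by 60 to get events per minute
  expr: rate($2[5m]) * 60 > $3
  for: 5m
  labels:
    severity: warning
EOF
}
```

For example, `print_rate_alert KafkaSSLHandshakeFailures kafka_security_ssl_handshake_failures 5` produces a rule for the handshake row of the table, which can be pasted under `rules:` with matching indentation.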
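Summary
The main components of Kafka security hardening:
- SSL/TLS: certificate management, encryption settings, mutual authentication
- SASL authentication: SCRAM, Kerberos, OAuth2
- ACL authorization: topic, group, and cluster permissions
- Audit logging: event records, analysis scripts
- Best practices: configuration recommendations, password management, certificate management
Key recommendations:
- Production environments must enable SSL, SASL, and ACLs together
- Use SCRAM instead of PLAIN for authentication
- Rotate passwords and certificates regularly
- Turn on audit logging
- Set up security monitoring and alerting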