Kafka 安全机制是保障消息系统数据安全的关键。本文将深入探讨 Kafka 的 SSL/TLS 加密、SASL 认证、ACL 授权等安全特性。
一、安全架构
1.1 安全层次
graph TB
subgraph 网络安全
SSL[SSL/TLS 加密]
FIREWALL[防火墙]
end
subgraph 认证层
SASL[SASL 认证]
KERBEROS[Kerberos]
OAUTH[OAuth2]
end
subgraph 授权层
ACL[ACL 授权]
RBAC[基于角色访问控制]
end
subgraph 审计层
AUDIT[审计日志]
MONITOR[监控告警]
end
SSL --> SASL
SASL --> ACL
ACL --> AUDIT
1.2 安全协议
| 协议 | 说明 | 适用场景 |
|---|---|---|
| PLAINTEXT | 明文传输 | 内网测试 |
| SSL | SSL/TLS 加密 | 数据传输加密 |
| SASL/PLAIN | 简单用户名密码 | 基础认证 |
| SASL/GSSAPI | Kerberos 认证 | 企业级认证 |
| SASL/SCRAM | 安全密码认证 | 推荐默认 |
二、SSL/TLS 加密
2.1 证书生成
#!/bin/bash
# 生成 CA 证书
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
-subj "/C=CN/ST=Beijing/L=Beijing/O=MyCompany/CN=my-ca"
# 生成 Broker 密钥库
keytool -keystore kafka.server.keystore.jks -alias localhost \
-validity 365 -genkey -keyalg RSA \
-dname "CN=kafka-broker-1, OU=IT, O=MyCompany, L=Beijing, ST=Beijing, C=CN"
# 生成 CSR
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq \
-file kafka-server-signing-request.crt
# CA 签名
openssl x509 -req -CA ca-cert -CAkey ca-key -in kafka-server-signing-request.crt \
-out kafka-server-signed.crt -days 365 -CAcreateserial
# 导入证书
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file kafka-server-signed.crt
# 生成信任库
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
2.2 Broker 配置
# server.properties
# SSL 配置
ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
# 监听器配置
listeners=SSL://:9093
advertised.listeners=SSL://kafka-broker-1:9093
listener.security.protocol.map=SSL:SSL
ssl.client.auth=required # 双向认证
2.3 Client 配置
// Producer SSL 配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9093");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "/var/ssl/private/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
props.put("ssl.truststore.location", "/var/ssl/private/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
props.put("ssl.endpoint.identification.algorithm", ""); // 禁用主机名验证(测试用)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
三、SASL 认证
3.1 SCRAM 配置
Broker 配置:
# server.properties
# 启用 SASL
listeners=SASL_SSL://:9094
advertised.listeners=SASL_SSL://kafka-broker-1:9094
listener.security.protocol.map=SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_SSL
# SASL 配置
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# JAAS 配置(Broker 间认证)
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-password";
创建用户:
# 使用 kafka-configs.sh 创建用户
kafka-configs.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--alter --add-config 'SCRAM-SHA-256=[password=user-password]' \
--entity-type users --entity-name alice
kafka-configs.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--alter --add-config 'SCRAM-SHA-512=[password=user-password]' \
--entity-type users --entity-name bob
Client 配置:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9094");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"alice\" " +
"password=\"user-password\";");
// SSL 配置
props.put("ssl.truststore.location", "/var/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
3.2 Kerberos 配置
KDC 配置:
# 安装 KDC
apt-get install krb5-kdc krb5-admin-server
# 创建 principal
kadmin.local -q "addprinc kafka/kafka-broker-1@MYCOMPANY.COM"
kadmin.local -q "addprinc kafka/kafka-broker-2@MYCOMPANY.COM"
kadmin.local -q "addprinc kafka/kafka-client@MYCOMPANY.COM"
# 导出 keytab
kadmin.local -q "ktadd -k /etc/security/kafka.keytab kafka/kafka-broker-1@MYCOMPANY.COM"
Broker 配置:
# server.properties
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://kafka-broker-1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
# JAAS 配置
listener.name.sasl_plaintext.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/security/kafka.keytab" \
principal="kafka/kafka-broker-1@MYCOMPANY.COM";
四、ACL 授权
4.1 ACL 基础
graph TB
subgraph 资源类型
T1[Topic]
T2[Group]
T3[Cluster]
end
subgraph 操作权限
O1[Read]
O2[Write]
O3[Create]
O4[Delete]
O5[Describe]
O6[Alter]
end
subgraph 授权模式
M1[Allow]
M2[Deny]
end
T1 --> O1
T1 --> O2
T2 --> O1
T3 --> O3
4.2 ACL 配置
启用 ACL:
# server.properties
# 授权提供者
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 超级用户
super.users=User:admin;User:broker-admin
# 允许匿名(初始配置用)
allow.everyone.if.no.acl.found=false
添加 ACL:
#!/bin/bash
# Topic 读写权限
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--add \
--allow-principal User:alice \
--operation Read --operation Write \
--topic order-topic
# Consumer Group 读取权限
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--add \
--allow-principal User:alice \
--operation Read \
--group order-consumer-group
# Topic 创建权限
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--add \
--allow-principal User:producer-app \
--operation Create --operation Write --operation Describe \
--topic "*"
# 查看 ACL
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--list
4.3 ACL 模板
#!/bin/bash
# ACL 批量配置脚本
TOPICS=("order-topic" "pay-topic" "user-topic")
GROUPS=("order-consumer-group" "pay-consumer-group")
USERS=("order-service" "pay-service")
# 为每个服务配置权限
for i in "${!TOPICS[@]}"; do
topic=${TOPICS[$i]}
group=${GROUPS[$i]}
user=${USERS[$i]}
# Topic 读写权限
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--add \
--allow-principal User:$user \
--operation Read --operation Write --operation Describe \
--topic $topic
# Consumer Group 权限
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--add \
--allow-principal User:$user \
--operation Read \
--group $group
echo "ACL configured for $user"
done
五、权限管理
5.1 角色定义
public enum KafkaRole {
// 管理员
ADMIN("admin", Arrays.asList(
Permission.ALL_TOPICS,
Permission.ALL_GROUPS,
Permission.CLUSTER_CONFIG
)),
// 生产者
PRODUCER("producer", Arrays.asList(
Permission.WRITE_TOPIC,
Permission.CREATE_TOPIC,
Permission.DESCRIBE_TOPIC
)),
// 消费者
CONSUMER("consumer", Arrays.asList(
Permission.READ_TOPIC,
Permission.READ_GROUP,
Permission.DESCRIBE_TOPIC
)),
// 运维
OPERATOR("operator", Arrays.asList(
Permission.DESCRIBE_TOPIC,
Permission.DESCRIBE_GROUP,
Permission.CLUSTER_DESCRIBE
));
private final String name;
private final List<Permission> permissions;
}
5.2 权限检查
public class AclValidator {
/**
* 验证用户权限
*/
public boolean hasPermission(String user, String resource, Operation operation) {
// 1. 获取用户 ACL
Set<Acl> acls = getAclsForUser(user);
// 2. 检查权限
for (Acl acl : acls) {
if (acl.getResource().equals(resource) &&
acl.getOperation().equals(operation) &&
acl.getPermissionType() == PermissionType.ALLOW) {
return true;
}
}
return false;
}
/**
* 检查 Topic 写入权限
*/
public boolean canWriteTopic(String user, String topic) {
return hasPermission(user, topic, Operation.WRITE);
}
/**
* 检查 Group 读取权限
*/
public boolean canReadGroup(String user, String group) {
return hasPermission(user, group, Operation.READ);
}
}
六、审计日志
6.1 审计配置
# log4j.properties
# 审计日志
log4j.appender.auditAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.auditAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.auditAppender.File=/var/log/kafka/audit.log
log4j.appender.auditAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.auditAppender.layout.ConversionPattern=%d{ISO8601} %p %m%n
log4j.logger.kafka.security.authorizer.AclAuthorizer=INFO, auditAppender
log4j.additivity.kafka.security.authorizer.AclAuthorizer=false
6.2 审计事件
public class AuditEvent {
private String timestamp;
private String user;
private String operation; // CREATE, READ, WRITE, DELETE
private String resourceType; // TOPIC, GROUP, CLUSTER
private String resourceName;
private String result; // SUCCESS, FAILURE
private String ipAddress;
// Getters and Setters
}
6.3 审计分析
#!/bin/bash
# 审计日志分析脚本
AUDIT_LOG="/var/log/kafka/audit.log"
echo "=== 权限拒绝统计 ==="
grep "DENIED" $AUDIT_LOG | awk '{print $5}' | sort | uniq -c | sort -rn
echo -e "\n=== 用户操作统计 ==="
grep "SUCCESS" $AUDIT_LOG | awk '{print $5}' | sort | uniq -c | sort -rn
echo -e "\n=== 最近拒绝记录 ==="
grep "DENIED" $AUDIT_LOG | tail -20
七、安全最佳实践
7.1 配置建议
| 场景 | 安全级别 | 配置建议 |
|---|---|---|
| 开发环境 | 低 | PLAINTEXT |
| 测试环境 | 中 | SASL/PLAIN + ACL |
| 生产环境 | 高 | SASL/SCRAM + SSL + ACL |
| 金融场景 | 最高 | SASL/GSSAPI + SSL + ACL + 审计 |
7.2 密码管理
#!/bin/bash
# 密码轮换脚本
OLD_PASSWORD="old-password"
NEW_PASSWORD="new-password"
# 更新用户密码
kafka-configs.sh --bootstrap-server localhost:9093 \
--command-config client.properties \
--alter --add-config 'SCRAM-SHA-256=[password='$NEW_PASSWORD']' \
--entity-type users --entity-name alice
# 更新 Broker 密码
# 修改 server.properties 中的 JAAS 配置
# 重启 Broker
# 更新 Client 配置
# 更新应用中的密码配置
# 重启应用
7.3 证书管理
#!/bin/bash
# 证书续期脚本
CERT_DIR="/var/ssl/private"
DAYS_BEFORE_EXPIRY=30
# 检查证书过期时间
for cert in $CERT_DIR/*.jks; do
expiry=$(keytool -list -v -keystore $cert | grep "Valid until" | awk '{print $4}')
expiry_epoch=$(date -d "$expiry" +%s)
now_epoch=$(date +%s)
days_left=$(( (expiry_epoch - now_epoch) / 86400 ))
if [ $days_left -lt $DAYS_BEFORE_EXPIRY ]; then
echo "WARNING: $cert 将在 $days_left 天后过期"
# 执行续期操作
# renew_certificate.sh $cert
fi
done
八、常见问题排查
8.1 认证失败
症状:SASL authentication failed
排查:
# 1. 检查用户是否存在
kafka-configs.sh --bootstrap-server localhost:9093 \
--describe --entity-type users --entity-name alice
# 2. 检查密码
# 重新设置密码
# 3. 检查 JAAS 配置
cat /etc/kafka/jaas.conf
8.2 授权失败
症状:TopicAuthorizationException
排查:
# 1. 查看 ACL
kafka-acls.sh --bootstrap-server localhost:9093 \
--list --topic my-topic
# 2. 添加权限
kafka-acls.sh --bootstrap-server localhost:9093 \
--add --allow-principal User:alice \
--operation Read --operation Write \
--topic my-topic
8.3 SSL 连接失败
症状:SSLException: Received fatal alert
排查:
# 1. 检查证书
keytool -list -v -keystore kafka.client.keystore.jks
# 2. 检查信任链
keytool -list -v -keystore kafka.client.truststore.jks
# 3. 测试连接
openssl s_client -connect kafka-broker-1:9093
总结
Kafka 安全机制的核心要点:
- SSL/TLS:数据传输加密
- SASL:用户认证(SCRAM/Kerberos)
- ACL:资源授权
- 审计:操作日志记录
- 最佳实践:密码管理、证书续期
核心要点:
- 生产环境必须启用 SSL+SASL+ACL
- 使用 SCRAM 替代 PLAIN 认证
- 定期轮换密码和证书
- 开启审计日志记录
参考资料
- Kafka Security 官方文档
- KIP-51: ACLs
- 《Kafka 权威指南》第 10 章