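Kafka Security: Authentication and Authorization in Detail

Kafka's security mechanisms are what keep the data in a messaging system protected. This article walks through SSL/TLS encryption, SASL authentication, ACL authorization, and the related audit and operations practices.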

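1. Security Architecture

1.1 Security Layers

graph TB
    subgraph NET[Network security]
        SSL[SSL/TLS encryption]
        FIREWALL[Firewall]
    end
    
    subgraph AUTHN[Authentication layer]
        SASL[SASL authentication]
        KERBEROS[Kerberos]
        OAUTH[OAuth2]
    end
    
    subgraph AUTHZ[Authorization layer]
        ACL[ACL authorization]
        RBAC[Role-based access control]
    end
    
    subgraph AUDITING[Audit layer]
        AUDIT[Audit logs]
        MONITOR[Monitoring and alerting]
    end
    
    SSL --> SASL
    SASL --> ACL
    ACL --> AUDIT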

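1.2 Security Protocols

| Protocol / mechanism | Description | Typical use |
|---|---|---|
| PLAINTEXT | Unencrypted transport | Testing on an internal network |
| SSL | SSL/TLS encryption | Encrypting data in transit |
| SASL/PLAIN | Simple username and password | Basic authentication |
| SASL/GSSAPI | Kerberos authentication | Enterprise authentication |
| SASL/SCRAM | Salted challenge-response password authentication | Recommended default |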
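
The rows above mix two settings that Kafka configures separately: the listener's `security.protocol` (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) and the SASL mechanism (PLAIN, GSSAPI, SCRAM-SHA-256/512) that runs on a SASL listener. As a rough sketch of how the four protocol values differ, the enum below records whether each one encrypts traffic and whether it uses SASL. The names `ListenerProtocol`, `encrypted`, and `usesSasl` are illustrative and not part of the Kafka API.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative model of Kafka's four security.protocol values (not a Kafka API). */
enum ListenerProtocol {
    PLAINTEXT(false, false),
    SSL(true, false),
    SASL_PLAINTEXT(false, true),
    SASL_SSL(true, true);

    final boolean encrypted; // traffic is protected by TLS
    final boolean usesSasl;  // clients authenticate through a SASL mechanism

    ListenerProtocol(boolean encrypted, boolean usesSasl) {
        this.encrypted = encrypted;
        this.usesSasl = usesSasl;
    }

    /** Protocols that both encrypt traffic and authenticate through SASL. */
    static Set<ListenerProtocol> encryptedWithSasl() {
        Set<ListenerProtocol> result = EnumSet.noneOf(ListenerProtocol.class);
        for (ListenerProtocol p : values()) {
            if (p.encrypted && p.usesSasl) {
                result.add(p);
            }
        }
        return result;
    }
}

class ListenerProtocolDemo {
    public static void main(String[] args) {
        for (ListenerProtocol p : ListenerProtocol.values()) {
            System.out.printf("%-15s encrypted=%-5b sasl=%b%n", p, p.encrypted, p.usesSasl);
        }
    }
}
```

Note that an SSL listener can still authenticate clients through their certificates (mutual TLS), which is why the flag is named `usesSasl` rather than "authenticated".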

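2. SSL/TLS Encryption

2.1 Generating Certificates

#!/bin/bash
# Create the CA certificate (openssl prompts for a passphrase that protects ca-key)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=MyCompany/CN=my-ca"

# Create the broker keystore; the CN should match the broker's hostname
keytool -keystore kafka.server.keystore.jks -alias localhost \
  -validity 365 -genkey -keyalg RSA \
  -dname "CN=kafka-broker-1, OU=IT, O=MyCompany, L=Beijing, ST=Beijing, C=CN"

# Create the certificate signing request (CSR)
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq \
  -file kafka-server-signing-request.crt

# Sign the CSR with the CA
openssl x509 -req -CA ca-cert -CAkey ca-key -in kafka-server-signing-request.crt \
  -out kafka-server-signed.crt -days 365 -CAcreateserial

# Import the CA certificate first, then the signed broker certificate
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file kafka-server-signed.crt

# Create the truststores (the broker configuration in section 2.2 expects kafka.server.truststore.jks)
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert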

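2.2 Broker Configuration

# server.properties

# SSL settings
ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=truststore-password

# Listener settings
listeners=SSL://:9093
advertised.listeners=SSL://kafka-broker-1:9093
listener.security.protocol.map=SSL:SSL
# Brokers must also talk to each other over SSL, because no PLAINTEXT listener is defined
security.inter.broker.protocol=SSL
# Mutual TLS: clients must present a certificate.
# Keep comments on their own line; a properties file treats a trailing "# ..." as part of the value.
ssl.client.auth=required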

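2.3 Client Configuration

// Producer SSL settings
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9093");
props.put("security.protocol", "SSL");
// The client keystore is needed because the broker sets ssl.client.auth=required;
// create it the same way as the broker keystore in section 2.1
props.put("ssl.keystore.location", "/var/ssl/private/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
props.put("ssl.truststore.location", "/var/ssl/private/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
// An empty value disables hostname verification. Use this for testing only.
props.put("ssl.endpoint.identification.algorithm", "");
// KafkaProducer requires serializers; without them the constructor throws a ConfigException
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);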

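3. SASL Authentication

3.1 SCRAM Configuration

Broker configuration

# server.properties

# Enable SASL over TLS (the ssl.keystore.* and ssl.truststore.* settings from section 2.2 are still required)
listeners=SASL_SSL://:9094
advertised.listeners=SASL_SSL://kafka-broker-1:9094
listener.security.protocol.map=SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_SSL

# SASL settings
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# JAAS configuration (the credentials this broker uses for inter-broker authentication).
# The "admin" SCRAM credential must already exist when the brokers start.
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin-password";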

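Creating users

# Create users with kafka-configs.sh against the SASL_SSL listener defined above;
# client.properties holds the admin's connection and credential settings
kafka-configs.sh --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --alter --add-config 'SCRAM-SHA-256=[password=user-password]' \
  --entity-type users --entity-name alice

# bob gets a SCRAM-SHA-512 credential; he can only log in if the broker also lists
# SCRAM-SHA-512 in sasl.enabled.mechanisms
kafka-configs.sh --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --alter --add-config 'SCRAM-SHA-512=[password=user-password]' \
  --entity-type users --entity-name bob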

Client configuration

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9094");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config", 
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"alice\" " +
    "password=\"user-password\";");

// TLS settings: the truststore lets the client verify the broker's certificate
props.put("ssl.truststore.location", "/var/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
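
It helps to see why SCRAM is preferred over SASL/PLAIN: the server does not need to keep the password itself, only values derived from it with a salt and an iteration count. The sketch below follows the derivation in RFC 5802 for the SHA-256 variant (SaltedPassword, then ClientKey, then StoredKey). It is a simplified illustration, not Kafka's implementation: the class name `ScramSketch` is made up, password normalization (SASLprep) is skipped, and the salt and the iteration count of 4096 are example values.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

final class ScramSketch {

    /** SaltedPassword = Hi(password, salt, iterations), which is PBKDF2 with HMAC-SHA-256. */
    static byte[] saltedPassword(String password, byte[] salt, int iterations) {
        try {
            PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** HMAC-SHA-256 of a text label under the given key. */
    static byte[] hmac(byte[] key, String label) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(label.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** StoredKey = SHA-256(HMAC(SaltedPassword, "Client Key")); this is what a server can keep. */
    static byte[] storedKey(String password, byte[] salt, int iterations) {
        try {
            byte[] clientKey = hmac(saltedPassword(password, salt, iterations), "Client Key");
            return MessageDigest.getInstance("SHA-256").digest(clientKey);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] salt = "example-salt".getBytes(StandardCharsets.UTF_8);
        byte[] key = storedKey("user-password", salt, 4096);
        System.out.println("StoredKey length in bytes: " + key.length);
    }
}
```

Because the stored value depends on the salt and iteration count, two users with the same password end up with different stored keys, and a leaked credential store does not directly reveal passwords.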

3.2 Kerberos Configuration

KDC setup

# Install the KDC (Debian/Ubuntu)
apt-get install krb5-kdc krb5-admin-server

# Create the principals (-randkey generates a random key instead of prompting for a password)
kadmin.local -q "addprinc -randkey kafka/kafka-broker-1@MYCOMPANY.COM"
kadmin.local -q "addprinc -randkey kafka/kafka-broker-2@MYCOMPANY.COM"
kadmin.local -q "addprinc -randkey kafka/kafka-client@MYCOMPANY.COM"

# Export the keytab
kadmin.local -q "ktadd -k /etc/security/kafka.keytab kafka/kafka-broker-1@MYCOMPANY.COM"

Broker configuration

# server.properties

# SASL_PLAINTEXT authenticates but does not encrypt; use SASL_SSL when traffic must be encrypted
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://kafka-broker-1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
# Must match the first component of the broker principals (kafka/...)
sasl.kerberos.service.name=kafka

# JAAS configuration
listener.name.sasl_plaintext.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true \
  storeKey=true \
  keyTab="/etc/security/kafka.keytab" \
  principal="kafka/kafka-broker-1@MYCOMPANY.COM";

4. ACL Authorization

4.1 ACL Basics

graph TB
    subgraph RES[Resource types]
        T1[Topic]
        T2[Group]
        T3[Cluster]
    end
    
    subgraph OPS[Operations]
        O1[Read]
        O2[Write]
        O3[Create]
        O4[Delete]
        O5[Describe]
        O6[Alter]
    end
    
    subgraph MODE[Permission types]
        M1[Allow]
        M2[Deny]
    end
    
    T1 --> O1
    T1 --> O2
    T2 --> O1
    T3 --> O3

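4.2 ACL Configuration

Enabling ACLs

# server.properties

# Authorizer implementation (AclAuthorizer is for ZooKeeper-based clusters;
# KRaft clusters use org.apache.kafka.metadata.authorizer.StandardAuthorizer)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

# Super users bypass ACL checks
super.users=User:admin;User:broker-admin

# false: deny access to resources that have no ACLs (set to true only during initial setup)
allow.everyone.if.no.acl.found=false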
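
The three settings above interact in a fixed order when a request is authorized. The following is a simplified sketch of that decision, assuming the behavior of the built-in authorizer: super users are always allowed, a resource without any ACLs falls back to `allow.everyone.if.no.acl.found`, a matching Deny entry beats any Allow entry, and otherwise at least one matching Allow entry is required. The class `AclDecisionSketch` and its types are illustrative; host matching, prefixed resource patterns, and implied operations (for example Describe implied by Read) are left out.

```java
import java.util.List;
import java.util.Set;

final class AclDecisionSketch {
    enum Permission { ALLOW, DENY }

    /** One ACL entry attached to the resource being accessed (simplified). */
    static final class AclEntry {
        final String principal;
        final String operation;
        final Permission permission;

        AclEntry(String principal, String operation, Permission permission) {
            this.principal = principal;
            this.operation = operation;
            this.permission = permission;
        }
    }

    static boolean authorize(String principal, String operation, List<AclEntry> resourceAcls,
                             Set<String> superUsers, boolean allowEveryoneIfNoAclFound) {
        if (superUsers.contains(principal)) {
            return true; // super.users bypass ACL evaluation
        }
        if (resourceAcls.isEmpty()) {
            return allowEveryoneIfNoAclFound; // no ACLs on the resource at all
        }
        boolean allowed = false;
        for (AclEntry acl : resourceAcls) {
            if (!matches(acl, principal, operation)) {
                continue;
            }
            if (acl.permission == Permission.DENY) {
                return false; // a matching Deny always wins
            }
            allowed = true;
        }
        return allowed;
    }

    private static boolean matches(AclEntry acl, String principal, String operation) {
        boolean principalMatches = acl.principal.equals(principal) || acl.principal.equals("User:*");
        return principalMatches && acl.operation.equals(operation);
    }
}
```

With the configuration shown in this section, a user who has no ACL on a topic is rejected, while `User:admin` and `User:broker-admin` are accepted regardless of ACLs.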

Adding ACLs

#!/bin/bash

# Read and write access to a topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config client.properties \
  --add \
  --allow-principal User:alice \
  --operation Read --operation Write \
  --topic order-topic

# Read access to a consumer group
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config client.properties \
  --add \
  --allow-principal User:alice \
  --operation Read \
  --group order-consumer-group

# Permission to create topics (the "*" wildcard covers every topic)
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config client.properties \
  --add \
  --allow-principal User:producer-app \
  --operation Create --operation Write --operation Describe \
  --topic "*"

# List the ACLs
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config client.properties \
  --list

4.3 ACL Templates

#!/bin/bash
# Bulk ACL configuration script

# The three arrays are matched by index. "user-topic" has no matching service,
# so the loop below runs over USERS and leaves that topic without ACLs.
TOPICS=("order-topic" "pay-topic" "user-topic")
# Named CONSUMER_GROUPS because GROUPS is a special bash variable that ignores assignments
CONSUMER_GROUPS=("order-consumer-group" "pay-consumer-group")
USERS=("order-service" "pay-service")

# Configure permissions for each service
for i in "${!USERS[@]}"; do
    topic=${TOPICS[$i]}
    group=${CONSUMER_GROUPS[$i]}
    user=${USERS[$i]}
    
    # Read and write access to the topic
    kafka-acls.sh --bootstrap-server localhost:9093 \
      --command-config client.properties \
      --add \
      --allow-principal "User:$user" \
      --operation Read --operation Write --operation Describe \
      --topic "$topic"
    
    # Consumer group access
    kafka-acls.sh --bootstrap-server localhost:9093 \
      --command-config client.properties \
      --add \
      --allow-principal "User:$user" \
      --operation Read \
      --group "$group"
    
    echo "ACL configured for $user"
done

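5. Permission Management

5.1 Role Definitions

import java.util.Arrays;
import java.util.List;

// Apache Kafka's ACL authorizer has no built-in roles; this enum is an application-level convention.
// Permission is an application-defined enum that is not shown here.
public enum KafkaRole {
    // Administrator
    ADMIN("admin", Arrays.asList(
        Permission.ALL_TOPICS,
        Permission.ALL_GROUPS,
        Permission.CLUSTER_CONFIG
    )),
    
    // Producer
    PRODUCER("producer", Arrays.asList(
        Permission.WRITE_TOPIC,
        Permission.CREATE_TOPIC,
        Permission.DESCRIBE_TOPIC
    )),
    
    // Consumer
    CONSUMER("consumer", Arrays.asList(
        Permission.READ_TOPIC,
        Permission.READ_GROUP,
        Permission.DESCRIBE_TOPIC
    )),
    
    // Operations
    OPERATOR("operator", Arrays.asList(
        Permission.DESCRIBE_TOPIC,
        Permission.DESCRIBE_GROUP,
        Permission.CLUSTER_DESCRIBE
    ));
    
    private final String name;
    private final List<Permission> permissions;

    // The constants above need this constructor to compile
    KafkaRole(String name, List<Permission> permissions) {
        this.name = name;
        this.permissions = permissions;
    }

    public String getName() { return name; }
    public List<Permission> getPermissions() { return permissions; }
}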
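
Since the ACL authorizer only understands ACL entries, a role like the ones above has to be expanded into concrete ACLs before it has any effect on the cluster. The sketch below shows one way to do that expansion. `RoleAclPlanner`, its role names, and the operation lists are assumptions that mirror the enum above rather than any Kafka API, and the bootstrap address and `client.properties` are the same placeholders used in section 4.2. It only builds the argument list for `kafka-acls.sh`; running the command is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class RoleAclPlanner {
    // Hypothetical mapping from role name to the topic operations that role needs.
    static final Map<String, List<String>> TOPIC_OPERATIONS = Map.of(
        "producer", List.of("Write", "Create", "Describe"),
        "consumer", List.of("Read", "Describe"));

    /** Builds the kafka-acls.sh arguments that grant a role's topic operations to a principal. */
    static List<String> topicAclArgs(String role, String principal, String topic) {
        List<String> operations = TOPIC_OPERATIONS.get(role);
        if (operations == null) {
            throw new IllegalArgumentException("Unknown role: " + role);
        }
        List<String> args = new ArrayList<>(List.of(
            "kafka-acls.sh", "--bootstrap-server", "localhost:9093",
            "--command-config", "client.properties",
            "--add", "--allow-principal", principal));
        for (String operation : operations) {
            args.add("--operation");
            args.add(operation);
        }
        args.add("--topic");
        args.add(topic);
        return args;
    }

    public static void main(String[] args) {
        // Print the command for review before running it
        System.out.println(String.join(" ",
            topicAclArgs("consumer", "User:order-service", "order-topic")));
    }
}
```

Keeping the role-to-operation mapping in one place means a change to a role is applied by regenerating the ACL commands, rather than by editing individual grants by hand.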

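5.2 Permission Checks

import java.util.Set;

// Acl, Operation, PermissionType, and getAclsForUser are application-level helpers that are not shown here.
public class AclValidator {
    
    /**
     * Checks whether a user may perform an operation on a resource.
     * As in Kafka's authorizer, a matching DENY entry overrides any ALLOW entry.
     */
    public boolean hasPermission(String user, String resource, Operation operation) {
        // 1. Load the user's ACLs
        Set<Acl> acls = getAclsForUser(user);
        
        // 2. Evaluate them: any matching DENY wins, otherwise one matching ALLOW is required
        boolean allowed = false;
        for (Acl acl : acls) {
            if (!acl.getResource().equals(resource) || !acl.getOperation().equals(operation)) {
                continue;
            }
            if (acl.getPermissionType() == PermissionType.DENY) {
                return false;
            }
            if (acl.getPermissionType() == PermissionType.ALLOW) {
                allowed = true;
            }
        }
        
        return allowed;
    }
    
    /**
     * Checks write access to a topic.
     */
    public boolean canWriteTopic(String user, String topic) {
        return hasPermission(user, topic, Operation.WRITE);
    }
    
    /**
     * Checks read access to a consumer group.
     */
    public boolean canReadGroup(String user, String group) {
        return hasPermission(user, group, Operation.READ);
    }
}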

6. Audit Logging

6.1 Audit Configuration

# log4j.properties

# Audit log appender
log4j.appender.auditAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.auditAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.auditAppender.File=/var/log/kafka/audit.log
log4j.appender.auditAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.auditAppender.layout.ConversionPattern=%d{ISO8601} %p %m%n

# The authorizer writes its decisions to the logger named kafka.authorizer.logger.
# INFO records denied requests; allowed requests are logged at DEBUG.
log4j.logger.kafka.authorizer.logger=INFO, auditAppender
log4j.additivity.kafka.authorizer.logger=false

6.2 Audit Events

public class AuditEvent {
    private String timestamp;
    private String user;
    private String operation;  // CREATE, READ, WRITE, DELETE
    private String resourceType;  // TOPIC, GROUP, CLUSTER
    private String resourceName;
    private String result;  // SUCCESS, FAILURE
    private String ipAddress;
    
    // Getters and setters
}

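6.3 Audit Analysis

#!/bin/bash
# Audit log analysis script.
# Assumes authorizer messages of the form "Principal = User:alice is Denied Operation = ...";
# check a few lines of your own log and adjust the patterns if they differ.

AUDIT_LOG="/var/log/kafka/audit.log"

echo "=== Denials per principal ==="
grep "is Denied" "$AUDIT_LOG" | grep -o "Principal = [^ ]*" | sort | uniq -c | sort -rn

# Allowed requests only appear when the authorizer logger is set to DEBUG
echo -e "\n=== Allowed operations per principal ==="
grep "is Allowed" "$AUDIT_LOG" | grep -o "Principal = [^ ]*" | sort | uniq -c | sort -rn

echo -e "\n=== Most recent denials ==="
grep "is Denied" "$AUDIT_LOG" | tail -20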
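
The same per-principal count can be done inside a Java service, for example to feed an alerting job. The sketch below is a minimal version under one stated assumption: that authorizer log lines contain text of the form `Principal = User:alice is Denied Operation = Write ...`. Verify that shape against your own audit log before relying on it. `DeniedCounter` is an illustrative name.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DeniedCounter {
    // Assumed message shape; adjust the pattern to what your broker actually logs.
    private static final Pattern LINE =
        Pattern.compile("Principal = (\\S+) is (Allowed|Denied) Operation = (\\S+)");

    /** Counts denied requests per principal; lines that do not match the pattern are ignored. */
    static Map<String, Integer> deniedByPrincipal(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.find() && m.group(2).equals("Denied")) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample lines in the assumed format
        List<String> sample = List.of(
            "2024-01-01 10:00:00,000 INFO Principal = User:alice is Denied Operation = Write from host = 10.0.0.5",
            "2024-01-01 10:00:01,000 INFO Principal = User:alice is Denied Operation = Read from host = 10.0.0.5",
            "2024-01-01 10:00:02,000 DEBUG Principal = User:bob is Allowed Operation = Read from host = 10.0.0.6");
        System.out.println(deniedByPrincipal(sample));
    }
}
```

In a real job the list of lines would come from reading `/var/log/kafka/audit.log`, for instance with `Files.readAllLines`.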

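7. Security Best Practices

7.1 Configuration Recommendations

| Scenario | Security level | Recommended configuration |
|---|---|---|
| Development | | PLAINTEXT |
| Testing | | SASL/PLAIN + ACL |
| Production | | SASL/SCRAM + SSL + ACL |
| Financial workloads | Highest | SASL/GSSAPI + SSL + ACL + auditing |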

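7.2 Password Management

#!/bin/bash
# Password rotation script

OLD_PASSWORD="old-password"
NEW_PASSWORD="new-password"

# Update the user's SCRAM credential
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config client.properties \
  --alter --add-config "SCRAM-SHA-256=[password=${NEW_PASSWORD}]" \
  --entity-type users --entity-name alice

# Update the broker password:
# change the JAAS configuration in server.properties,
# then restart the broker

# Update the client configuration:
# change the password in the application's configuration,
# then restart the application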

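7.3 Certificate Management

#!/bin/bash
# Certificate renewal script

CERT_DIR="/var/ssl/private"
DAYS_BEFORE_EXPIRY=30
# Assumption: every keystore shares one password, supplied through the STORE_PASSWORD environment variable
STORE_PASSWORD="${STORE_PASSWORD:?set STORE_PASSWORD first}"

# Check when each keystore's certificate expires
for cert in "$CERT_DIR"/*.jks; do
    # keytool prints "Valid from: <date> until: <date>" per certificate; only the first one is checked here
    expiry=$(keytool -list -v -keystore "$cert" -storepass "$STORE_PASSWORD" \
        | grep "until:" | head -n 1 | sed 's/.*until: //')
    # date -d requires GNU date
    expiry_epoch=$(date -d "$expiry" +%s)
    now_epoch=$(date +%s)
    days_left=$(( (expiry_epoch - now_epoch) / 86400 ))
    
    if [ "$days_left" -lt "$DAYS_BEFORE_EXPIRY" ]; then
        echo "WARNING: $cert expires in $days_left days"
        # Run the renewal here
        # renew_certificate.sh "$cert"
    fi
done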
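
Parsing `keytool` output depends on its text format and locale, so a service that already runs on the JVM may prefer to read the keystore directly. The sketch below uses the standard `java.security.KeyStore` API to report the remaining validity of every certificate in a keystore. The class name `KeystoreExpiryCheck` and the command-line arguments are illustrative assumptions, and the 30-day threshold mirrors the script above.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class KeystoreExpiryCheck {

    /** Whole days from now until notAfter, truncated toward zero; negative values mean expired. */
    static long daysLeft(Instant notAfter, Instant now) {
        return Duration.between(now, notAfter).toDays();
    }

    /** Maps each alias that holds an X.509 certificate to its remaining validity in days. */
    static Map<String, Long> daysLeftByAlias(Path keystore, char[] password, Instant now)
            throws Exception {
        KeyStore ks = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(keystore)) {
            ks.load(in, password);
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (String alias : Collections.list(ks.aliases())) {
            Certificate cert = ks.getCertificate(alias);
            if (cert instanceof X509Certificate) {
                X509Certificate x509 = (X509Certificate) cert;
                result.put(alias, daysLeft(x509.getNotAfter().toInstant(), now));
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical usage: java KeystoreExpiryCheck <keystore path> <store password>
        Map<String, Long> days = daysLeftByAlias(Path.of(args[0]), args[1].toCharArray(), Instant.now());
        days.forEach((alias, left) -> {
            String flag = left < 30 ? "WARNING" : "OK";
            System.out.println(flag + ": " + alias + " expires in " + left + " days");
        });
    }
}
```

Unlike the shell loop, this checks every entry in the keystore, including the CA certificate stored under `CARoot`.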

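8. Troubleshooting Common Problems

8.1 Authentication Failures

Symptom: SASL authentication failed

Checks

# 1. Check that the user exists
kafka-configs.sh --bootstrap-server localhost:9093 \
  --command-config client.properties \
  --describe --entity-type users --entity-name alice

# 2. Check the password
# Reset it if in doubt

# 3. Check the JAAS configuration
cat /etc/kafka/jaas.conf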

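8.2 Authorization Failures

Symptom: TopicAuthorizationException

Checks

# 1. List the ACLs on the topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config client.properties \
  --list --topic my-topic

# 2. Add the missing permissions
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config client.properties \
  --add --allow-principal User:alice \
  --operation Read --operation Write \
  --topic my-topic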

8.3 SSL Connection Failures

Symptom: SSLException: Received fatal alert

Checks

# 1. Inspect the client certificate
keytool -list -v -keystore kafka.client.keystore.jks

# 2. Inspect the trust chain
keytool -list -v -keystore kafka.client.truststore.jks

# 3. Test the connection
openssl s_client -connect kafka-broker-1:9093

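Summary

The key points of Kafka security:

  1. SSL/TLS: encrypts data in transit
  2. SASL: authenticates users (SCRAM or Kerberos)
  3. ACLs: authorize access to resources
  4. Auditing: records operations in logs
  5. Best practices: password management and certificate renewal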
