■ 河南 许红军
编者按:Kafka是高吞吐量的分布式发布/订阅消息系统,能够在生产者与消费者之间建立通信的桥梁,其实质上是解决了在不同系统中如何传递消息的问题。Kafka集群包一些Producer、Broker、Consumer Group,以及一个Zookeeper集群。使用SASL安全认证策略会让Kafka群集变得更加稳固,让用户可以更加安全的对其进行访问。
老版本的kafka实际上没有考虑到安全认证方面的要求,只能依靠防火墙、代理服务等技术,来实现对其进行安全访问。
即Kafka Client应用可以通过连接Zookeeper地址,获取存储在Zookeeper中的Kafka元数据信息。在得到Kafka Broker地址后,就可以连接到Kafka集群,并能够操作集群上的所有Topic主题。
从V.09.0.0版本开始,Kafka增加了很多新特性,其中就包含了安全认证技术。
新版本的Kafka集群可以支持Kerberos、SASL/PLAIN、SASL/SCRAM和SASL/OAUTHBEARER等安全认证协议。Kerberos协议是一种网络认证协议,拥有认证中心服务,使用对称DES密码体制,在客户机与服务器间起到安全桥梁作用。
该协议使用和配置起来比较复杂,对于用户的访问来说,必须经由认证中心服务器的认证,通过后才可以访问目标资源,这在很大程度上消除潜在安全隐患。
SASL/PLAIN协议是一种基于账号和密码的认证方式,Kafka使用的JAAS(即Java认证和授权服务)就需要使用到该协议。
SASL/SCRAM协议同样是kafka安全机制SASL家族中的一员,通过执行PLAIN和DIGEST-MD5等用户名/密码认证机制来解决安全问题。
注意,Kafka中的默认SCRAM实现将SCRAM凭证存储在Zookeeper中。
SASL/OAUTHBEARER协议需要OAuth2服务器来提供客户端或代理的令牌才可以进行验证操作。该协议允许第三方应用通过编排资源所有者和HTTP服务之间的审批交互,来获得对HTTP服务的有限访问权,或者允许第三方应用以自身的名义获得访问权。对于Kafka中的默认Oauthholder来说,实现创建并验证不安全的JSON Web令牌,只能在非生产的Kafka环境中安装使用。
Kafka的用户会使用到Java身份认证和授权服务,即不管处于何种运行环境,都必须能够安全地确定什么用户在执行Java代码,并确保操作者拥有合适的访问权限。
在Java KDK1.4+组件中已经集成了JAAS服务。JAAS中的LoginModule模块主要描述了认证技术所要实现的接口,该模块会嵌入到应用程序中,来提供特定类型的身份验证。该模块利用javax.security.auth.spi.LoginModule 接口的 login、commit、abort、logout等方法,来执行用户的登录和登出等操作。
其中的LoginModule接口会使用CallbackHandler方法与用户进行通信,要求输入用户名和密码。身份验证过程分两个阶段实现,首先LoginModule接口会调用login方法执行实际的身份验证,将身份验证状态作为私有状态信息保存。当完成身份验证操作后,会返回成功或者失败信息。之后如果LoginContext的整个身份验证操作成功,则调用LoginModule 接口的commit方法,检查其私有状态信息的保存状态,以查看身份验证是否成功。
如果整个LoginContext身份验证成功,并且LoginModule自己的身份验证也获得成功,那么就会调用commit方法,将相关的身份和密钥等信息和处于LoginModule模块中的Subject相连接。如果LoginContext的整个身份验证失败,就会调用每个LoginModule的abort方法。
这样,LoginModule就会移除/销毁原先保存的任何状态。JAAS实际认证机制是通过配置文件来实现的,其中包含很多配置块,不同的配置块对应了一个或多个特定的LoginModule对象。
对于Kafka群集来说,需要使用到Zookeeper群集。因为kafka是一个分布式的系统,所以它依赖Zookeeper来完成诸如协调众多任务,状态管理的和配置存储等操作。
ZooKeeper是一种分布式协调技术,针对分布式应用实现高可用、高性能的开源协调服务,ZooKeeper主要提供了分布式锁服务服务,还提供了数据的维护和管理机制,包括集群管理、分布式消息队列等。
本例中部署了zksvr1、zksvr2和zksvr3三台服务器来组成ZooKeeper群集,IP分别为“172.16.1.100”、“172.16.1.101”以 及“172.16.1.102”。
在所有群集主机上都执行以下命令来安装ZooKeeper:
在每台主机上打开“/usr/local/zookeeper/conf”目录,将其中的“zoo_sample.cfg”更名为“zoo.cfg”。打开该配置文件,其中的“tickTime”栏表示基本时间度量单位,以毫秒为单位,它用来控制心跳和超 时。在“initLimit”栏中配置Zookeeper集群中Follower服务器初始化连接到Leader时,最长能忍受多少个心跳时间间隔数。在“dataDir”栏中配置存储快照的目录,在“clientPort”栏中设置Zookeeper服务进程监听的TCP端口,默认为TCP 2181。
在“server.x”栏中具体的Zookeeper主机的信息,包括其IP,与集群中的Leader服务器通信的端口,以及用来执行选举时服务器相互通信的端口。其中的“x”表示本主机的主机编号,例如“server.1=172.16.1.100:2888:3888”。因为使用的是Zookeeper群集,所以需要在每台主机的保存快照的目录中创建名为“myid”的文件,其中包含本主机的编号,例如第一台主机就输入“1”等。在该文件尾部输入以下命令,让Zookeeper支持SASL认证功能:
在上述“conf”目录下创建名为“zk_server_jaas.conf”的文件,在其中输入:
其中的“username”和“password”为ZK集群之间通信使用到的账号密码,“user_kafka”是Kafka用户,密码为“pass@hello”。
因为SASL认证需要使用到Kafka Common包,所以需要在上述“conf”路径下创建名为“sasl_lib”的目录,将“kafkaclients-0.11.0.0.jar”“lz4-1.3.0.jar”“slf4japi-1.7.25.jar”“slf4japi-1.7.25.jar”“snappyjava-1.1.2.6.jar”等kafka相关依赖复制到该目录中。
进入Zookeeper安装路径下的“bin”目录,打开其中的“zkEnv.sh”文件,在其尾部添加:
其中的“/usr/local/zookeeper/conf”表示Zookeeper的配置目录。在ZK群集所有的主机上执行“cd/usr/local/zookeeper/bin”“./zkServer.sh start”命令,来启动ZK群集。执行“jps”命令,可以显示ZK进程的ID。
Kafka群集可以用来管理消息队列,在本例中使用了kasrv1、kasrv2和kasrv3主机来组成Kafka群集,其IP分别为“172.16.1.200”“172.16.1.201”以 及“172.16.1.202”。
在这些主机上分别执行以下命令完成Kafka安装:
进入“/usr/local/kafka/config”目录,新建名为“kafka_server_jaas.conf”的文件,用来配置Kafka的SASL认证信息,输入:
其中的账户和密码信息与上述ZK的配置相同。在上述“config”目录中打开“server.properties”文件,可对Kafka配置进行调整。
其中的“broker.id”栏表示每一个broker在集群中的唯一表示值,例如Kazsvr1值为1。“listeners”栏表示kafka的监听地址与端口,在“log.dirs”栏中设置Kafka保存数据的目录。在“num.partitions”栏中设置新创建的topic拥有的分区数量。
在“log.retention.hours”栏中设置消息保存的时间,在“log.segment.bytes”栏中设置Partition中每个Segment数据文件的大小。在“zookeeper.connect”栏中设置Zookeeper主机所在的地址和端口,在“auto.create.topics.enable”栏中设置是否自动创建Topic,在“delete.topic.enable”栏中设置是否允许物理删除Topic的功能。在文件尾部输入以下命令设置所需的SASL认证参数:
在Kafka安装路径的“bin”目录中打开“kafkaserver-start.sh”文件,这是其启动脚本,在尾部添加:
要想顺利启动Kafka群集,必须保证ZK群集已经正常启动。在每台Kafka主机上执行以下命令来启动Kafka群集:
这里以kafkatools工具为例,来说明如何安全连接Kafka集群。
在其安装目录“f:kafkatool”中创建名为“kafka_client_jaas.conf”的文件,输入以下命令用于kafkatools连接认证:
然后执行“kafkatool.exe -J-Djava.security.auth.login.config=f:kafkatoolkafka_client_jaas.conf”命令,即可顺利连接到Kafka集群。
如果客户端使用Java来连接SASL认证的Kafka集群,需在Kafka配置中添加:
也可使用“kafka_client_jaas.conf”进行连接,例如执行“java --spring.profiles.active=dev-Djava.security.auth.login.config=/data/tmp/kafka_client_jaas.conf-jar App.jar”命令进行连接。为保证Kafka自带的生产者和消费者正常工作,可对上述“kafka_server_jaas.conf”进行修改,输入以下命令添加“producer”和“consumer”:
对“kafka_client_jaas.conf”文件进行修改,使其内容变成:
之后打开“server.properties”文件,添加:
在Kafka安装路径的“bin”目录中打开“kafkaconsole-producer.sh”,添加:
之后执行“bin/kafkaacls.sh --authorizerproperties zookeeper.connect=172.16.1.100:2181,172.16.1.101:2181,172.16.1.102:2183 --add--allow-principal User:producer --operation Write --topic newtop1”命令,为“producer”进行授权。执行“bin/kafka-consoleproducer.sh --brokerlist 172.16.1.200:9092,172.16.1.201:9092,172.16.1.202:9092 --producer.config ./config/producer.properties--topic newtopic1”命令,即可正常生产消息,其中“newtopic1”为具体的Topic名称。
Kafka群集默认提供了ACL,使用Zookeeper来存储数据。在上述Kafka主机上打开配置文件“server.properties”,如果输入“uthorizer.class.name=kafka.security.auth.SimpleAclAuthorizer”行,表示如果没有匹配权限,那么除超级用户,其他人无法访问Kafka群集。如果输入“allow.everyone.if.no.acl.found=true”行,表 示未设置访问权限,任何用户都可以进行访问。如果输入“super.users=User:xxx”行,表示添加超级用户,“xxx”表示超级用户名称。
Kafka授权管理脚本名称为“kafka-acs.sh”,可根据需要灵活使用。例如执行“bin/kafka-acls.sh--authorizer-properties zookeeper.connect=172.16.1.100:2181 --list--topic tp1”命令,显示当前Topic权限,“tp1”为具体的Topic名称。执行“kafkaacls.sh --authorizerproperties zookeeper.connect=172.16.1.100:2181-add --allowprincipal User:newuser1--operation Read --topic tp1”命令,可为指定用户添加读取指定Topic的权限。
执行“bin/kafkaacls.sh --authorizerproperties zookeeper.connect=172.16.1.100:2181-add --allow-principal User:newuser1 --operation Write --topic tp1”命令,可为指定用户添加写入指定Topic的权限。执行“kafkaacls.sh --authorizerproperties zookeeper.connect=172.16.1.100:2181--remove --topic tp1”命令,可删除和指定Topic相关的权限。执行“kafkaacls.sh --authorizer-properties zookeeper.connect=172.16.1.100:2181--add --allow-principal User:newuser1”命 令,添加消费者/生产者的ACL。Kafka的认证范围包含Client与Broker,Broker与Broker,以 及Broker与Zookeeper之间的安全控制。
Zookeeper和Broker间的认证分为两个环节,一是创建JAAS配置文件,并在“zkEnv.sh”中配置此文件位置,二是在每个broker配置文件中配置“zookeeper.set”属性,将每个broker中的ACL设为True。Zookeeper中存储的kafka元数据可通过多种方式读取,包括一些客户端工具或命令行,但只能通过broker修改元数据。如果修改错误,很可能导致集群中断,所以在连接Zookeeper时,最好增加一些网络隔离手段来避免上述情况的发生。