Spark平台中Kafka偏移量的读取管理与设计

2019-10-08 08:34高宗宝刘丽美张家铭宋国兴
软件 2019年7期
关键词:越界副本偏移量

高宗宝 刘丽美 张家铭 宋国兴

摘  要: 随着移动互联网技术的大规模发展,创新型互联网公司和迭代型各行各业应用产品层出不穷,门户访问、好友互动等操作产生的大规模日志记录,对大数据处理的实时性、准确性和高可用性发起了挑战。Kafka是一种高吞吐量分布式发布订阅消息系統,其在高并发数据读写方面优势明显,但其提供的数据消费方式存在数据丢失和重复的风险。本文首先介绍Kafka架构及其Offset管理,介绍了新型流式数据处理框架SparkStreaming与Kafka的结合,并说明了Kafka数据消费方面存在的缺陷,最后提出了一种基于SparkStreaming读取Kafka的近似Exactly Once方案实现。通过搭建实验环境进行对比测试,验证了该设计可以在保证数据读取效率的前提下确保数据的准确性。

关键词: Kafka;Offset;SparkStreaming;数据准确性

中图分类号: TP302    文献标识码: A    DOI:10.3969/j.issn.1003-6970.2019.07.022

【Abstract】: With the large-scale development of mobile Internet technology, the application products of various industries emerge in an endless stream. The large-scale log records generated by portal access, friend interaction and other operations challenge the real-time, accuracy and high availability of large data processing. Kafka is a high throughput distributed publish-subscribe messaging system, which has obvious advantages in high concurrent data reading and writing, but its data consumption mode has the risk of data loss and duplication. Firstly, this paper introduces Kafka architecture and its Offset management, introduces the combination of SparkStreaming and Kafka, a new streaming data processing framework, and illustrates the shortcomings of Kafka data consumption. Finally, an approximate Exactly One scheme based on SparkStreaming to read Kafka is proposed. By building an experimental environment for comparative testing, it is verified that the design can ensure the accuracy of data on the premise of ensuring the efficiency of data reading.

【Key words】: Kafka; Offset; SparkStreaming; Data accuracy

0  引言

随着IT和移动互联网技术的飞速发展,互联网[1]软件产品迭代开发、层出不穷,数据量激增,如何存储和及时处理这些海量数据,挖掘其中企业比较感兴趣的价值信息(如用户喜好等)进而提供更好的产品服务(如好友推荐、产品推广等)是数据导向型公司迫切需要解决的问题。门户网站访问、好友聊天、支付交易记录等用户操作产生的大规模日志记录,对大数据处理的实时性和高并发性发起了挑战。传统的数据存储介质,如关系型数据库、文件系统等无法满足数据实时读写传输和流处理,Apache Kafka应运而生。Kafka是由Apache软件基金会开发的一个开源流处理平台[2],是主要用Scala编写的一种高吞吐量分布式发布订阅消息系统,因其扩展性好、高吞吐量、快速持久化、高可用性等优点被各大消息系统、日志分析平台、流数据处理平台、门户网站等广泛使用。

1  Kafka简介

1.1  Kafka架构

Kafka消息系统的基本架构如图1所示。其架构主要包括以下几个组件:

(1)Message:消息,通信基本单位。

(2)Broker:Kafka节点实例,对应为Kafka集群的一台机器。

(3)Topic:主题,表示Kafka数据处理的消息源,数据的读写都要指定主题。

(4)Producer:数据生产者,向某个Topic发布消息的对象,即一种push操作,将消息推送给代理对象Broker进行存储。

(5)Consumer:数据消费者,订阅某个Topic并处理消息的对象,即一种pull操作,主动拉去数据,Consumer自己控制消息的读取速度和数量,如果Topic中没有数据,那么会周期性的pull操作直到有数据产生。

(6)Partition:分区,一个Topic可以有多个partition,一个消息实际存储在Topic的某个Partition中,每个Parition可以保证消息的有序性。

(7)Replications:分区副本,每个分区都可以设定副本数目分布到不同的Broker中以便于容错。

(8)ConsumerGroup:消费者组,一组consumer的集合,group订阅的某个topic下的每个分区只能被其中的一个consumer消费,不会出现一个分区的数据被同一个group下的多个consumer消费的情况,可以理解为ConsumerGroup是Kafka提供的可扩展且具有容错性的消费者机制,在开发过程中使用group.id来标识。

Kafka集群中的所有节点都是平等的,不采用Master-Slave结构,这样就不会出现类似HDFS的单点故障问题。Kafka利用zookeeper来解决分布式一致性问题,将broker节点、topic元数据信息等全部存储到zookeeper中。

为了保证较高的读写效率,对于每个partition,消息读写都有一个固定的副本完成,即Leader节点,其他的副本是Follower节点。Follower节点会定期同步Leader节点的数据。

当使用工具kafka-topics.sh创建topic后,kafka会根据选举策略對每个partition都选出一个Leader节点和相应数量的Follower节点(通过参数replication- factor控制副本数量)。图2描述的是创建主题t1,分区数量为4,副本数量为3的情况。

以partition=1为例,其读写节点是437(broker.id),副本节点分别是437、441、436,副本同步队列分别是436、440、441。ISR(in-sync replica,副本同步队列)是由Leader维护的与主节点数据同步的一个节点集合,当producer发送消息到leader后,follower会同步消息,如果某个follower没有同步

leader的消息太多或者失效,那么leader会将其从ISR中剔除。

当leader失效后,kafka会从ISR中的副本中选举出新的leader以保证服务的可用。

1.2  读写offset管理

Topic可以简单理解为一个queue,消息的生产与消费都要声明消息所在的queue。为了提高数据读写效率和数据吞吐量,在物理上Topic被分成了多个partition,每个partition表示一个文件夹,命名为“topic名-分区号”,每个文件夹中保存消息数据、消息索引等。

任何发布到partition的消息都会被append到文件尾部,每条消息在文件中的存储位置称之为偏移量(offset,long型整数),通过partition+offset可以唯一标识一条消息。因为是追加操作,所以在partition中消息是有序写入磁盘的,其写入和索引读取效率都很高。图3表明了一个分区数量为3的topic消息写入状态。

当消息写入时,kafka会按照默认规则规定消息会被写入到哪个partition中。如果自定义规则合理,那么可以保证消息被均匀地分布到broker中。

可以看出,消息的消费,核心是对partition和offset的管理。Kafka由ConsumerGroup控制消息的消费和偏移量,而不是交给Broker去存储,甚至可以加以控制回到一个之前的偏移位置再次消费消息。

Kafka提供了自动和手动2种偏移量管理方式[4,5]。

Kafka默认会定期自动提交偏移信息,即enable. auto.commit=true。在kafka0.10版本之前,offset信息提交到zk中保存,但由于zk不适合大批量数据的并行读写操作,自kafka0.10版本,offset信息自动提交到名为__consumer_offsets的topic存储。该topic默认有50个分区,保存了每个ConsumerGroup消费的Topic所有partition的offset信息,如图4所示。

当然也可以采用手动更新的方法提交offset。

在消息消费过程中,Kafka提供了如下3种可能的传输保障(consumer delivery guarantee)。

(1)At most once:这种模式下,消息可能会丢,但是绝对不会重复消费。如果consumer设定autocommit偏移量,consumer在读取到数据后立即更新offset后未来得及处理消息(如consumer系统崩溃),下次重新工作时无法读取之前未处理的消息,导致数据丢失。

(2)At least once:这种模式下,数据不会丢失,但是可能会存在重复消费。consumer在读取到数据后立即处理,处理完成后没来得及提交偏移量。下次重新工作时还会重新读取已处理但是没有提交偏移量的数据,导致数据重复。

(3)Exactly once:这种模式下,数据既不会丢失也不会重复消费,需要协调消费数据和offset进行精确事务管理,如将数据和offset信息写入到HDFS等外部介质中,这种模式对处理效率有一定影响。

2  SparkStreaming简介

SparkStreaming是基于spark的流式批处理引擎,可以实现高吞吐量的、具备容错机制的实时流数据的处理,能够与RDD算子、机器学习、SparkSQL以及图形图像处理框架无缝连接[3-6]。类似于Apache Storm,用于流式数据的处理。SparkStreaming支持多种数据源,如Flume、Kafka、HDFS、套接字等,经过一系列RDD算子或windows等高级函数进  行处理后,将结果写入到文件系统、数据库等输出源中。

SparkStreaming接收实时数据流,并以某一时间间隔(batchDuration)划分为一个个数据批次(batch)交给Spark Engine处理。SparkStreaming的数据处理流程如图5所示。

Dstream是SparkStreaming中特有的数据类型,表示一系列连续的RDD集合,即数据批次集合,存储方式是Map,对每个批次数据的处理实际上是RDD的操作,每个批次的处理逻辑是完全相同的。

SparkStreaming+Kafka进行流数据处理被广泛采用,本文后续讨论基于spark2.3+kafka0.10展开。

3  一种可靠的Kafka消费方案

3.1  方案设计

SparkStreaming通过KafkaUtils.createDirectStream创建数据流Dstream,默认情况下enable.auto.commit= true自动提交offset,即对应At most once模式。并且无论StreamingContext是否安全终止,都会出现在一段时间后已消费offset值等于最新offset值,尽管此时数据还远没有消费完数据。具体见方案测试。

设置enable.auto.commit=false可以手动提交offset更新,如Spark中可通过stream.asInstance-Of[CanCommitOffsets].commitAsync (offsetRanges)来进行数据处理完后手动提交更新。需要注意的是,此方法将offsetRanges保存在一个队列中,只有等consumer获取下一批次数据后才提交offsetRanges。方案测试中通过5次实验对比进行验证。具体见方案测试。

在很多设计方案中将offset更新到zk中存储。然而zk并不适合大规模数据并发读写,尤其是写效率不高。Kafka允许多个ConsumerGroup并行读写数据,如果offset全部在zk中管理会影响zk性能,进而影响kafka的leader选举、集群同步等功能。

因此,综合考虑kafka集群性能和数据读写效率,本文设计实现了一种At least Once方案SEO (Similar Exactly Once),每个ConsumerGroup在本地系统中维护offset信息,KafkaCluster提供维护信息,在不影响读取效率的情况下趋向于Exactly Once保障。

SEO方案实现的假设条件是zk不可靠或存在延迟,实现目的是数据不可丢失,极端情况下允许数据重复。方案的一些专有名词包括:

客户端:运行SparkStreaming程序所在的机器;

gtoffset文件:客户端存储的偏移量文件,文件存储路径类似于...groupid/topicname/gtoffset,文件包括groupid消費topicname所有分区的offset信息。

偏移量越界:包括低越界、高越界。低越界指的是gtoffset记录的偏移量信息小于Kafka目前可用的offset最小值,高越界指的是gtoffset记录的偏移量信息超过Kafka目前最新的offset值。

方案的实现思路如下。

(1)在客户端是否存在gtoffset文件,若不存在,说明groupid是第一次消费Topic,那么按照auto.offset.reset=earliest从当前可用的最小offset读取数据;如果存在,说明groupid已经消费过Topic,读取得到offset集合A。

(2)使用spark-streaming-kafka-0-8中的Kafka?Cluster构建Kafka集群连接,进行偏移量越界判断。使用getEarliestLeaderOffsets得到Topic的最小可用offset集合M,使用getLatestLeaderOffsets得到Topic的最大可用offset集合N。

(3)如果A中所有分区的offset都满足offset_ (M,par)≤offset_(A,par)≤offset_(N,par)那么说明A有效,A不需要更新;如果A中存在分区的offset满足offset_(M,par)≥offset_(A,par),即A中有的分区offset比最小值都小,低越界,那么更新这些offset为M中对应分区的offset;同样道理,如果A中存在分区的offset满足offset_(A,par)≥offset_(N,par),即A中有的分区offset比最大值都大,高越界,那么更新这些offset为N重对应分区的offset。

(4)解决偏移量越界后,使用更新后A集合拉取Kafka中的数据进行处理,处理成功后将最新offset信息写入到gtoffset文件中。因为offset更新到本地文件,无需与zk、kafka等建立外部连接,可以保证更新效率,程序异常也可控制,所以该方案可以类似实现Exactly once传输保障。

3.2  方案测试

本次测试共包括3次试验,3次实验环境完成相同,软硬件环境如下。

第一次实验为enable.auto.commit=true,此时存在数据丢失情况,且出现offset更新为最大值的bug。实验过程是:topic=test1共4个分区,写入100006条记录,设定程序时间间隔为2 s,每秒每分区最大读取50条记录,过10 s时间后停止spark程序,此时数据没有处理完,但是已消费offset(CURRENT-OFFSET)已达到最大值(LOG-END- OFFSET),具体结果见图6。再次启动程序后没有数据可读,数据丢失。经过10次修改时间间隔和处理数据条数,都复现同样的问题。

第二次实验为通过CanCommitOffsets手动提交偏移量,共包括5次验证。实验过程是:topic=test2共4个分区,写入100000条记录,测试5次,每次修改批次间隔和每分区每秒最大读取消息数,每次在消费过程中终止spark程序一次,然后重启程序直到消费完数据。得到的实验结果如表1。

通过实验结果发现每次均存在重复消费,重复消费的数量等于分区数、间隔时间、每分区每秒最大消息数三者的乘积(假设在消费过程中只有一次终止)。

第三次实验为通过SEO方案手动提交偏移量,共包括5次验证。实验过程是:topic=test3共4个分区,写入100000条记录,测试5次,每次修改批次间隔和每分区每秒最大读取消息数,每次在消费过程中通过ssc.stop(true, true)安全终止spark流程序一次,然后重启程序直到消费完数据。得到的实验结果如表2。

通过实验结果发现每次均不存在重复消费也不存在数据丢失,整个实现过程中没有频繁与第三方数据源进行交互,达到了数据不丢失的目的,近似实现了Exactly Once模式。当然,在极端情况下,如果某个批次数据已经处理结束(如导入到数据库中)后,即使安全终止spark任务也未能更新本地gtoffset文件,此时重启spark任务会出现数据重复消费的问题。

4  结束语

互联网飞速发展,数据质量和数据价值最大化是每个互联网企业和传统企业都需要考虑的问题,数据存储与计算的并发性、实时性导致的产品性能优劣直接影响了用户的体验。本文首先介绍新型流式数据处理框架SparkStreaming与Kafka的数据消费结合,提出了一种基于SparkStreaming讀取Kafka的近似Exactly Once方案实现并搭建集群环境继续数据准确性验证。

参考文献

[1] 赵旭剑, 邓思远, 李波, 等. 互联网新闻话题特征选择与构建[J]. 软件, 2015, 36(7): 17-20.

[2] Wang J, Wang W, Chen R. Distributed Data Streams Processing Based on Flume/Kafka/Spark[C]//International Conference on Mechatronics and Industrial Informatics. 2015.

[3] Ichinose A, Takefusa A, Nakada H, et al. A study of a video analysis framework using Kafka and spark streaming[C]// IEEE International Conference on Big Data. IEEE, 2017: 2396-2401.

[4] 王岩, 王纯. 一种基于Kafka的可靠的Consumer的设计方案[J]. 软件, 2016, 37(1): 61-66.

[5] 王郑合, 王锋, 邓辉, 等. 一种优化的Kafka消费者/客户端负载均衡算法[J]. 计算机应用研究, 2017, 34(8): 2306-2309.

[6] 郑健, 冯瑞. 基于Spark的实时视频分析系统[J]. 计算机系统应用, 2017, (12). doi:10.15888/j.cnki.csa.006112.

猜你喜欢
越界副本偏移量
基于格网坐标转换法的矢量数据脱密方法研究
越界·互换·融合——中国化爵士乐的生成路线与认同政治
面向流媒体基于蚁群的副本选择算法①
搅拌针不同偏移量对6082-T6铝合金接头劳性能的影响
基于最小二乘平差的全极化SAR配准偏移量估计方法
副本放置中的更新策略及算法*
阵列方向图综合中PSO算法粒子越界处理研究
没有炊烟的城市(选章)
越界婚姻的伦理窘境:评史密斯《南街》
基于Andriod多屏互动的遥控器设计