黄文辉,冯 瑞
(1.复旦大学计算机科学技术学院,上海201203;2.上海视频技术与系统工程研究中心,上海201203)
很多“大数据”是实时接收得到的,这些数据往往在刚得到时具有最大的价值[1],并且这些不断产生的“大数据”无论是数据存储还是数据快速处理问题都已经不是单台物理机器能够解决的。因此,许多分布式流式处理平台不断被开发出来,如Twitter公司开发的Storm[2]、Yahoo!公司开发的S4[3]、微软的Timestream[4]以及UC Berkeley AMPLab 开 发 的Spark Streaming[5]等。其 中,Spark Streaming是基于D-Steam[1]模型并构建在Spark计算引擎上[6]的分布式流式计算框架,其特点是结合了流式处理和批处理,相比于其他分布式流式处理的框架,Spark Streaming比较突出的优势就是能够高效并行恢复失败的节点和对执行任务比较慢的任务重新分配。
智能视频监控很多情况下是要对采集的图像进行实时处理。数量众多的监控摄像头,庞大的监控网络,很短时间之内就会产生海量的图像视频数据,如何从这些海量数据中高效地提取出有用的信息,就成为智能视频监控技术要解决的问题[7]。本文介绍了利用Spark Streaming 框架构建分布式视频/图像流处理的测试平台,实现了从数据的传输、处理和存储的整个流程,并为几个重要的参数对性能的影响进行深入研究,提出了CPU 时间占用率作为评估指标,对于评估与改进集群的性能有着重要的参考意义,并为其他类似的分布式计算的性能评估提供了借鉴。目前,单纯从总的处理时间来看,无法得到集群资源(CPU 等)的利用率情况,CPU 时间占用率结合总的处理时间,一方面可以从宏观角度来衡量集群处理能力,另一方面可以从微观角度判断集群的资源利用情况,可以用来调整集群的参数以达到更高的性能或者是否选择扩展集群等。
平台组成如图1所示,分为三大部分:数据传输、数据处理和数据存储。
数据传输由Socket服务器、客户端和Spark Streaming接收器(Receiver)三部分组成。数据源按照一定的通信协议发送数据包到Socket服务器,Socket服务器负责接收客户端发送的数据并转发到Spark Streaming 接收器。Spark Streaming接收器本质是一个Socket客户端,接收器角色是由spark 集群中一个Worker扮演的(由Spark框架决定,该Worker依然会参与数据处理)。客户端发送的信息包括数据源ID、图像属性、图像数据等信息。
服务器充当中转站的作用,Spark Streaming接收器、数据源和服务器三者可以得到分离,各自功能明确。若Server有着公网的IP地址,则数据源和Receiver可以运行在可以连接到公网的局域网中。
Figure 1 Composition of the testing platforms图1 测试平台的组成
接收器接收到数据后按照一定的解码方式对数据进行解码,得到的客户端ID、图像属性和图像数据等信息进行分布式处理,如果需要用到OpenCV 库,图像信息需要进一步解码和转换得到OpenCV 对应的Mat类型再进行处理。整个数据的序列化以及传输采用的是byte[]基本类型数组。
本文采用图像的HOG(Histogram of Oriented Gradient)[8]特征检测作为测试,HOG 特征已经被广泛运用在图像识别中,特别是在行人检测方面获得了很大的成功。如图2所示为HOG 特征行人检测的效果图,方框为检测的结果。
Figure 2 Result of the HOG detection图2 HOG 特征检测效果图
数据存储采用多种存储结合的方式,针对不同的应用采取不同的存储方式,主要有本地存储、Hadoop 分布式存储HDFS 和分布式数据库HBase等。如网页应用可以将结果返回集群的Driver的本地存储,这样可以快速获取结果。
数据包的组织形式如图3所示。
Figure 3 Format of data packets图3 数据包的格式
(1)Block(0):数据包头部,整型,四个字节,记录后面一个数据的总长度;
(2)Block(2k+1):整型,四个字节,记录Block(2k+2)的字节长度(k=0,1,2,…,n);
(3)Block(2k+2):任意长的字节数组,是真正的数据(k=0,1,2,…,n);
(4)Block(2k+3):CRC校验,一个字节。
数据 包 在RDD(Resilient Distributed Datasets)中的抽象如图4所示。在Receiver接收到数据包之后会保存到ArrayBuffer中,ArrayBuffer最终会被包装成ReceivedBlock 并保存在Block-Manager中。在提交Job 的时候这些Received-Block会抽象成RDD,每个ReceivedBlock对应一个partition 并由一个任务Task 处理,每个任务(Task)所要处理的就是ReceivedBlock 中的数据包。
Figure 4 RDD containing data packets图4 数据包的RDD 抽象
集群的硬件配置情况如表1所示。
Table 1 Hardware configuration of the cluster表1 集群的硬件配置
集群的软件配置情况如表2所示。
Table 2 Software configuration of the cluster表2 集群的软件配置
Spark Streaming集群采用Standalone模式、分辨率为1 272×767的PNG 格式的图片,为了更好地进行性能评估,测试样本采用1 000张相同的图片。图片处理方式为HOG 特征提取与检测,提取结果返回Driver端存到本地磁盘。并分别探讨几个重要因素对于平台性能的影响,分别是参数spark.streaming.blockInterval(下 面 用block-Interval代替)、spark.streaming.concurrentJobs(下面用concurrentJobs代替)、批处理时间batchDuration[9]和Receiver的数量。
性能评估采用处理时间和CPU 时间占有率作为指标。前者从宏观角度横向比较来分析平台性能,后者从微观角度出发,可以横向比较也可以纵向比较。
(1)处理时间。
以流式处理1 000张相同的图片所消耗的时间为依据。处理时间作为评估指标简单直观,能够横向对比出各个条件下集群处理数据的效率。
(2)CPU 时间占用率。
CPU 时间占有率表示某一段时间内CPU 各个核数用于数据处理的时间占总时间的比率。由于Spark Streaming每个任务Task都在单独的线程中运行,统计Task的运行时间不难得出线程处理数据的时间。
设N表示为Spark Streaming用户程序设定的最 大 使 用 的CPU 核 数(max-executor-core),n≤N表示第n个核t时刻(以ms为单位)的使用状态,则可表示为:
ρ越大,也就是处理数据的时间占任务的总时间(包括处理数据的时间、任务调度时间、网络传输时间等)的比重也大,说明平台越高效。
(1)spark.streaming.concurrentJobs。
该参数表示一次最多可以同时运行Job的数量,采用线程池的方式来控制和实现Job的数量。batchTime设置为4 000ms,blockInterval设置为1 000ms,其他参数采用默认值的情况下,处理时间、CPU 时间占用率与concurrentJobs的关系图如图5所示。从图5中可以看出,在一定的范围内concurrentJobs越大,程序运行越高效,CPU 时间占用率在本文所用的测试平台可以达到50%以上,CPU 时间占用率逐渐变得平缓,说明concurrentJobs对集群的影响逐渐达到一个最高水平。从Spark Streaming 原 理 来 讲,concurrentJobs过小的情况会使得当前运行的所有Job分别对应的RDD 的Partition数量/Block数量(每个Partition对应一个Task)之和小于Spark Streaming用户程序设定的最大使用的CPU 核数导致一些核处于空闲状态。另外,concurrentJobs的设置要根据Spark Streaming 用户程序的Driver所在机器的CPU 核数的情况来定。本文中Driver所在的机器CPU 核数为32。
Figure 5 Impact of conrrentJobs on performance图5 concurrentJobs对性能的影响
(2)spark.streaming.blockInterval。
该参数设置Receiver的生成Block的间隔,直接影响Block中元素的个数(但不一定每个Block中元素的个数都相等)。在batchTime设为2 000ms以及其他参数采用默认值的情况下,处理时间、CPU 时间占用率与blockInterval的关系如图6 所 示。从 图6 中 可 以 看 出,blockInterval越小,处理时间越少,CPU 时间占有率越高。实际上在concurrentJobs设置为默认值1时,blockInterval越小,Job对应的RDD 的Partition越多,Partition中的元素的数量越少,这样就可以增加Job中Task的数量而减少Task的计算量,进而Job执行的并行度更高,减少了空闲的CPU 核数,从而使得处理时间减少。
(3)批处理时间batchDuration。
Figure 6 Impact of blockInterval on performance图6 blockInterval对性能的影响
batchDuration控制着周期生成Job 的时间,由Spark Streaming 的任务生成器JobGenerator周期性生成Job并交由任务调度器JobScheduler调度运行。在blockInterval设为1 000 ms、其他参数采用默认值的情况下,处理时间、CPU 时间占用率与batchDuration的关系如图7 所示。从原理上来讲当数据流均匀到达时,batchDuration设置越大,一次性处理的数据量越多,可以节省一部分任务生成和调度的时间,从而减少总体完成时间,特别是在单位数据的计算量较小以及对实时性要求不高的情况下可以适当地将batchDuration设置大一点。
Figure 7 Impact of batchDurationon performance图7 batchDuration对性能的影响
(4)Receiver的数量。
Receiver充当接收数据的角色,用户在程序中通过StreamingContext每设定一次Receiver就会产生一个InputDStream,由Spark Streaming框架的DStreamingGraph管理,设定N次就会有N个Receiver,在默认的分配策略下这些Receiver会均匀分布在集群中。图8展示了在blockInterval设定为1 000ms、batchDuration设定为4 000ms,其他按照默认参数的情况下处理时间和CPU 时间占用率与Receiver数量之间的关系。在数据流传输速度一定的情况下,Receiver数量过少会导致数据集中,集群的处理能力跟不上数据接收的速度时会造成一定的滞后;Receiver数量过多会导致数据分散,每个Partition/Block中元素的数量过少,造成任务过多,任务的调度消耗时间增多。
Figure 8 Impact of receivers on performance图8 Receiver数量对性能的影响
Spark Streaming使用的是Spark 计算引擎,该计算引擎“压榨”计算机的硬件资源,包括CPU资源,CPU 时间占用率正是对CPU“压榨”程度的一种重要反映,CPU 时间占用率越高说明对CPU的利用程度越高。
在理想情况下,Spark Streaming用户程序使用的集群中的CPU 总核数等于concurrentJobs×Receiver数量×batchDuration÷blockInterval时,CPU 能得到最大程度的利用,实际上由于框架调度运行时间,数据接收速率不均匀等因素影响需要实际调整:这里面比较好控制的是concurrent-Jobs,但该参数受限于Driver所在机器的CPU 的核数;Receiver可根据数据的传输速度来调整;blockInterval根据数据的计算量来设定,block-Interval过大可能会造成单个Task 计算时间过长,当集群开启straggler监控机制时可能被当作straggler处理;batchDuration的设置根据实时性要求设定,当实时性要求不太高是可以适当设置大一点。
本文针对智能视频监控领域对大规模视频/图像流高效处理的需求,提出了利用Spark Streaming分布式框架构建对视频/图像进行流式处理的思想。重点深入研究了几个重要参数/因素对性能的影响,创新性地提出CPU 时间占用率来衡量性能,并结合总的处理时间,从微观和宏观、横向和纵向来评价,对于参数调整和框架改进以及对于是否扩展集群有着非常重要的参考意义,并对其他分布式架构的性能的评估有着重要的借鉴意义。接下来的研究重点在于针对视频/图像处理领域对Spark Streaming框架进行调整与改进,并针对大规模视频/图像数据流式处理问题进行实际部署和应用。
[1] Matei Z,Tathagata D,Haoyuan L,et al.Discretized streams:Fault-tolerant streaming computation at scala[C]∥Proc of the 24th ACM Symposium on Operating Systems Principles,2013:423-438.
[2] Apache Software Foundation.Storm,distributed and faulttolerant real time computation[EB/OL].[2015-06-25].http://storm.apache.org/.
[3] Neumeyer L,Robbins B,Nair A,et al.S4:Distributed stream computing platform[C]∥Proc of the 10th IEEE International Conference on Data Mining Workshops(ICDMW 2010),2010:170-177.
[4] Qian Z,He Y,Su C,et al.Timestream:Reliable stream computation in the cloud[C]∥Proc of the 8th ACM European Conference on Computer Systems,2013:1-14.
[5] Apache Software Foundation.Apache Spark,lightning-fast cluster computing[EB/OL].[2015-06-28].http://spark.apache.org/.
[6] Zaharia M,Chowdhury M,Das T,et al.Resilient distributed datasets:A fault tolerant abstraction for in-memory cluster computing[C]∥Proc of the 9th USENIX Conference on Networked Systems Design and Implementation,2012:2.
[7] Huang Kai-qi,Chen Xiao-tang,Kang Yun-feng,et al.Intellegent visual surveillance:A review[J].Chinese Journal of Computers,2015,38(6):1093-1118.(in Chinese)
[8] Dalal N,Bill Triggs.Histograms of oriented gradients for human detection[C]∥Proc of the 2005IEEE Computer Society Conference on Computer Vision and Pattern Recognition,2005:886-893.
[9] Apache Software Foundation.Performance tuning [EB/OL].[2015-06-25].http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning.
附中文参考文献:
[7] 黄凯奇,陈晓棠,康运锋,等.智能视频监控技术综述[J].计算机学报,2015,38(6):1093-1118.