魏占辰,刘晓宇 ,黄秋兰,孙功星
1.中国科学院 高能物理研究所,北京 100049
2.中国科学院大学,北京 100049
随着信息技术的快速发展,数据量呈现爆炸式的增长,海量数据给存储和处理带来了极大的挑战,越来越多的应用朝着分布式系统的方向发展,因此以Hadoop[1]、Spark[2]为代表的大数据处理框架应运而生。Spark具有良好的数据处理效率,保证了系统稳定性、可扩展性和可用性,还提供交互式编程接口,非常适合迭代式、交互式或实时数据分析的场景,具有广泛的适用性,能够适应各类领域的大数据应用。
虽然Spark等分布式系统能够极大提升数据处理效率和系统吞吐量,但不可避免地带来额外的性能损耗,这些损耗包括分布式任务派发、调度、结果收集,以及算法自身在分布式场景的开销。如果不能有效地降低额外损耗,它们可能成为分布式计算中影响性能提升的瓶颈。在部分迭代密集型计算的场景中,例如高能物理数据分析中的分波分析(Partial Wave Analysis,PWA)计算,由于每轮迭代过程只有极少量参数发生变化,迭代次数多,迭代任务短,因此在现有的Spark 机制下,框架带来的额外损耗成为了计算过程中的主要性能瓶颈点。目前学术界对于Spark 优化方法的研究,主要集中在开发原则优化、内存优化、配置参数优化、调度优化和Shuffle 过程优化这5 个方面[3],具体研究内容有数据填充和拉取策略研究[4],内存数据缓存和替换策略的研究[5],优化数据存储机制的研究[6],以及Shuffle过程中的数据读写过程优化[7]、文件写入机制优化[8]、调度算法优化[9]、数据压缩算法决策[10]等研究。对于迭代计算的研究,则主要有为增量型数据设计的增量式迭代计算模型[11],以及利用数据的分散性和局部性进行分级、分区域的迭代计算方法[12]等。文献[13]提出了一个Spark 性能预测模型,文献[14]分析了CPU、磁盘和网络等计算资源对Spark 性能的影响,但是它们均未对执行效率及Spark框架自身进行深入分析。上述文献对于迭代密集型应用没有给出一个有效的优化策略和解决方案。
在系统性分析Spark 的核心机制后,本文归纳并总结了Spark 应用在执行过程中的额外消耗,据此提出一种分析Spark 执行效率的公式。通过该公式可以分析Spark 应用的性能瓶颈点,并针对迭代密集型应用提出相应优化策略。经过测试,该公式可以准确分析性能瓶颈,优化策略能够极大提高计算效率。
在Spark中,引入了一个可并行化处理的、带有容错性的新型分布式数据模型——弹性分布式数据集[15](Resilient Distributed Dataset,RDD),用户所提交的程序或启动的交互式Shell称为一个Application,包含若干RDD运算。RDD通过划分分区(Partition)进行数据分片和计算并行化,通过两类算子Transformation 和Action完成复杂运算。每个Action 算子会触发一个真实的运算过程,称为Job。每个Transformation算子会产生一个新的RDD,据此建立与父RDD的依赖关系。RDD的依赖关系有两种,分别是依赖固定父RDD 分区数量的窄依赖关系和依赖全部父RDD 分区并进行数据混洗(Shuffle)的宽依赖关系。由于Shuffle 的后续数据处理过程必须等待Shuffle完成后才能计算,因此Spark在宽依赖关系处进行切分,划分出不同的Stage,以Shuffle连接,形成有向无环图(Direct Acyclic Graph,DAG),图1为一个典型的Spark 单个Job 的计算流程图,即以RDD的依赖关系形成的有向无环图。
图1 Spark计算流程图
在Stage 内部全部为窄依赖关系,因此每个分区的数据可以独立计算,Spark将按照RDD的依赖顺序形成一条计算流水线,称为Task;在Shuffle 处,Spark 将前一个Stage 产生的数据排序并序列化写入磁盘,交由后续Stage进行处理,Shuffle过程是Spark内最容易影响性能的瓶颈点。
在一个Application 内,执行用户数据处理逻辑、分析RDD依赖关系、生成并派发Task的组件称为Driver,存储数据、接收并执行Task的组件称为Executor。用户在Driver 中提交的Job 均会由DAGScheduler 分析RDD依赖关系,并在各Stage 内生成一组TaskSet,交由Task-Scheduler 调度并序列化为二进制数据,由Scheduler-Backend分发到Executor执行,每组Task的数量与RDD的分区数量一致,代表了计算的并行度,图2是Spark任务调度运行的流程图。
广播变量(Broadcast)用于Application 内的数据共享,它在每个节点内只保存一份,因此广播变量产生的数据副本的数量与节点数一致。累加器(Accumulator)是一种分布式变量,它在Task中进行数值更改,最后在Driver中聚合这些修改,因此它的副本数量与Task数量一致。
根据2.1节的分析,可知Spark应用在运行时不可避免地会产生额外的消耗,包括分布式系统自身的消耗和实现分布式算法所引入的额外消耗。因此,需要建立一个用于分析分布式计算额外消耗和执行效率的模型,为有针对性地优化Spark分布式程序提供相关理论基础。
图2 Spark任务调度运行流程图
定义1(有效计算时间)一个算法或任务为得到相应结果而进行的必要的数据计算、处理和分析所用的时间。有效计算时间是衡量一个算法的性能和复杂度的重要指标。
定义2(分布式并行计算代价)分布式并行计算代价为Spark在执行某一任务、处理某一数据时,因框架自身和算法所需而产生的额外消耗。由于这些代价需要额外CPU 时间处理,因此本文以处理分布式并行计算代价的时间(即计算代价时间)作为其评估指标。
定义3(有效计算比)一个算法执行的总时间包含了有效计算时间和计算代价时间,有效计算时间与总时间的比值即为有效计算比。有效计算比越大,表明该数据处理过程的效率越高,越容易达到理想的并行加速比。
下面根据Spark 应用执行流程确定其有效计算时间、计算代价时间和有效计算比。根据定义1、定义2和定义3,在某个Job中,假设有m个Stage,则有(m-1)个Shuffle,则其组成可定义为Job={stage1,stage2,…,stagem}∪{shuffle1,shuffle2,…,shufflem-1} ,使用了v个Executor。由于每个Stage会产生与输入RDD的分区数量一致的Task,若stagej输入RDD的分区数为nj,则其任务集为Taskj={task1,j,task2,j,…,tasknj,j}。对于taski,j,设其序列化时间为di,j,传输到executor 的时间为ei,j,反序列化的时间为fi,j,由于Spark 的任务序列化和发送工作在Driver 节点串行执行,由此可以得到Taskj的任务准备时间Dj为:
在原有串行算法改为并行后,需要有额外的初始数据分配、额外的结果收集以及辅助算子才能完成数据处理过程。设初始数据分配时间为M,结果数据集为Result={result1,result2,…,resultnm} ,resulti的序列化时间、传输时间和反序列化时间分别为gi、hi、li,由于Driver接收数据及反序列化过程为串行,因此可以得到结果收集的时间R为:
使用辅助运算功能(如Accumulator、Broadcast)的计算代价与实际使用情况密切相关,并且具有一定的节点内共享特性,因此要根据实际情况测算算法带来的计算代价E。若一个 Job 中,Accumulator 和 Broadcast 均为1 个,且它们的平均传输时间分别为a、b,并行度为u,executor数量为v时,可以得到一个关于E的计算公式为:
除此之外,由于Spark 在Stage 之间要产生Shuffle,设shufflej的计算代价为sj,由此可以得到一个Job中总的计算代价C为:
若taski,j的有效计算时间为ti,j,不难得出对于任务集Taskj的有效计算时间Tj和整个Job 的有效计算时间V为:
由公式(4)、(6)可以得出一个Job 的有效计算比K为:
根据公式(1)~(7),可以根据实际情况推导出整个Spark 应用的有效计算时间、计算代价时间和有效计算比,因此本文不再赘述。
由此可以看出,计算代价受到框架自身、分布式并行算法以及原始数据分布等多方面的影响。在大多数Spark大数据应用中,由于原始数据难以预测,因此很容易造成数据倾斜,将会造成大量的Shuffle。由公式(3)、(7)可知,巨大的Shuffle 过程会极大提升计算代价,严重拖慢Spark应用的性能,降低整个应用的有效计算比。
在Spark 效率分析公式的基础上,可以根据实际情况进行有针对性的优化,从而提高数据分析效率。在高能物理中使用的分波分析方法是一类Spark迭代密集型应用,下面以此为例具体介绍优化过程。
分波分析是一种观察高能物理实验中产生的轻强子之间的共振态结构的数据分析方法,它能够精确测量共振态参数以及其产生衰变的性质[16]。分波分析需要在高统计量样本数据上进行数值拟合的计算,其核心过程为使用最大似然法估计待定参数,需要反复迭代以求得最优参数,因此分波分析是一类典型的大数据科学计算。
由于样本数据的计算过程是独立的,因此可以很容易地将该过程并行化,将数据划分成若干区块放入不同的RDD分区中,在各部分计算完成后由Spark框架汇总结果,并决定是否进行下一轮迭代计算。
由Spark 分波分析计算的执行流程可知,该类计算在Spark中每进行一次迭代就会产生一个Job,每次迭代的任务完全一致,只有若干参数进行更新,没有Shuffle过程,并且每一个Job仅包含一个Stage。由于输入数据不发生变化,因此除第一次计算外,其他迭代计算过程均没有数据的初始分配时间。若总迭代次数为p,忽略数据分配时间,则可得到第i次迭代的计算代价时间ci以及总计算代价C为:
有效计算时间和有效计算比的计算方法与公式(6)和公式(7)一致,由此可以得到,计算代价时间与迭代次数正相关,降低迭代过程中每一轮的计算代价以及消除计算代价与迭代次数的相关性是优化该类问题的关键。
由于每次迭代计算过程不变,因此可以采用将多个迭代计算的Job 化简为一个Job,即将Task 只分发一次的策略,消除冗余Task分发,从而达到将计算代价与迭代次数的相关性消除的目的,为计算过程带来极大的性能提升。
为实现上述策略,本文基于Spark 现有运行机制设计并实现了一个迭代控制服务模块(Iteration Control Service,ICS),将参数更新分发、迭代流程控制以及结果收集过程交由该模块负责,从而与Spark 原有任务和数据分发机制进行一定分离,达到降低计算代价的目的。ICS由Master和Worker两部分组成,其中Master为迭代主控制模块,负责控制Worker 计算、分发迭代参数、收集结果;Worker为迭代计算模块,执行具体计算任务,并缓存每轮迭代计算结果以确保数据同步和任务容错性。由此可以很容易得出基于ICS 机制优化过后的分波分析总计算代价C′为:
由公式(10)可以看出,使用ICS消除了任务分发的冗余,使计算代价仅与参数更新和结果收集相关,而ICS模块将允许使用者将这两部分自行控制,根据数据分析的实际情况,进行细粒度的优化与控制。
为评估本文提出的Spark迭代计算优化方法的实际效果,本文设计了一组实验来进行性能对比。
本文的实验环境是一个5 节点组成的集群,使用Spark Standalone模式进行任务调度。除Spark外,还部署了Hadoop 和Alluxio 以完成分波分析的计算工作。Standalone Master、NameNode 和 AlluxioMaster 部署于主节点中,Standalone Worker、DataNode和AlluxioWorker部署于4个从节点中,实验环境的详细情况如表1所示。
表1 测试集群软硬件环境
为分析RDD 迭代分波分析计算的计算代价情况,实验选择了约6 GB 样本数据作为目标分析数据,完成该样本的分析需要迭代237次,实验过程使用全部资源进行计算,即并行度为48。
由于分波分析过程的迭代次数与实际数据的拟合过程相关,无法对其进行精确控制,因此为测试不同迭代次数情况下的Spark 分布式并行分波分析的运行性能,在现有基础上将迭代次数修改为直接由用户指定,以此仿真实际情况中不同迭代次数的运算过程。
本文首先测试了选取的分波分析样例在优化前后的性能对比以及与串行程序的对比,评估指标选取了执行时间和有效计算比。执行时间越短、有效计算比越高表明计算任务的性能和效率越好。测试结果如表2所示。
表2 分波分析测试结果对比
从测试结果中可看出,优化前的Spark 分波分析程序相比于原有的串行程序,执行时间缩短了约80.2%;优化后的Spark 分波分析程序相比于串行程序,执行时间缩短了约93.7%;与优化前相比,执行时间缩短了约68.2%,单次迭代的计算代价减小了约80.5%,有效计算比提升了约0.373。由上述数据可以看出,Spark效率分析公式和依据该公式制定的优化策略能够有效提升高能物理分波分析程序的性能和执行效率。
为测试本文提出的优化策略在不同迭代次数和不同子任务长度下对于执行效率和有效计算比的提升情况,在仿真条件下分别测试了程序的有效计算比和执行性能,其中执行性能以执行时间评估。本文设计了三组仿真作业,所有仿真作业均人为指定迭代次数,其中仿真作业1的迭代子任务与真实计算过程一致,仿真作业2和仿真作业3的迭代子任务会将计算过程反复执行若干次,从而延长有效计算时间,经测算仿真作业2的单次有效计算时间约为400 ms,仿真作业3约为1 000 ms。图3和图4展示了仿真作业1在迭代次数为100、200、400、800、1 600、3 200 和 6 400 时的有效计算比和执行时间,为便于图形展示,执行时间取对数后作为图4的纵坐标。
图3 优化前后仿真作业1有效计算比对比图
图4 优化前后仿真作业1执行时间对比图
从图3 中可以看出,优化前的仿真作业1 有效计算比基本保持不变;而优化之后由于减少了任务的分发次数,提高了参数分发和结果收集过程的效率,使得有效计算比得到约0.066~0.520的提升。从图4中可以看出,优化前的仿真作业1与串行版本相比,执行时间缩短了约83.4%~90.6%,而优化后与串行相比缩短了约91.4%~97.2%,与优化前相比缩短了约48.3%~69.4%。
图5、图6 和图7 展示了仿真作业2 和仿真作业3 在迭代次数为100、200、400、800和1 600时有效计算比和执行时间的对比情况,为便于图形展示,执行时间取对数后作为图6、图7的纵坐标。
图5 优化前后仿真作业2、3有效计算比对比图
图6 优化前后仿真作业2执行时间对比图
图7 优化前后仿真作业3执行时间对比图
由上述测试可以看出在单次迭代的有效计算时间较长时,本文提出的优化策略也能够在一定程度上提高迭代计算的执行效率和有效计算比。在有效计算比方面,优化后的仿真作业2 提升了约0.038~0.073,优化后的仿真作业3提升了约0.027~0.067。在执行时间方面,优化前的仿真作业2相比于串行程序缩短了约96%,优化后与优化前相比缩短了约16.2%~20.6%,与串行程序相比缩短了约97%。优化前的仿真作业3 相比于串行程序,执行时间缩短了约96.5%,优化后相比于串行程序,执行时间缩短了约97%,相比于优化前,执行时间缩短了约6.6%~12.9%。
Spark是当前应用最广泛的并行计算框架和模型之一,本文主要致力于Spark 运行迭代密集型应用的性能优化研究。通过深入研究Spark 分布式任务执行流程,提出有效计算时间、计算代价和有效计算比等概念,以此构建Spark 效率分析公式,为精确分析Spark 应用的效率提供理论支持。在此基础上,本文还提出了一个针对迭代密集型应用(例如高能物理中的分波分析方法)的优化策略,将Spark多次任务分发过程简化为一次,并优化了参数更新和结果收集等过程,从而减少了计算代价,提升了有效计算比,使执行效率得到大幅提升,在实际应用中取得了较好的优化效果。本文在实现迭代密集型应用的优化策略时设计了迭代控制服务模块,该模块扩展了标准Spark 的功能,保证了良好的编程接口和可扩展性,能够为类似应用提供参考。
Spark效率分析公式能够为开发者合理安排并行算法并行度和有针对性优化分布式并行应用程序提供理论依据和参考。在未来的工作中,将进一步研究Spark的性能瓶颈点,继续完善和挖掘Spark的性能优化方法,提升整个系统的效率和吞吐率,扩大Spark在多个领域,特别是科学计算领域的应用。