陈天宇,张龙信,李肯立,周立前
1(湖南工业大学 计算机学院,湖南 株洲 412007)2(湖南大学 信息科学与工程学院,长沙 410082)
随着大数据与云计算的兴起,基于集群的大规模数据处理已成为各大IT公司的解决方案[1].许多高科技企业使用的大数据分析技术具有迭代特性,包括使用图计算来进行PageRank或社交网络分析,使用机器学习进行聚类或回归分析等.这些应用共同的特点是其数据需要多次迭代处理直到满足收敛条件或者结束条件,于此同时,大量的数据需要在迭代中重用[2].
目前比较有影响力的大数据处理框架有Hadoop、Spark、Storm等[3],它们均源于Google早期提出的MapReduce思想.Hadoop在进行迭代计算时速度比较慢,当待处理的数据规模越来越大时,Hadoop通过分布式文件系统读写的性能瓶颈愈发明显,而Spark正是在这种背景下应运而生.Spark针对迭代计算中需要被多次使用的工作数据集进行优化,引入了内存集群计算的概念,将数据集缓存在内存中,以减少访问延迟.用户可以调整Spark的存储策略,Spark是基于弹性分布数据集(RDD)的内存计算,数据集可从记录的信息来源重构.RDD被表示为一个对象,并且可以在文件中创建[4].
由于Spark的存储策略可由用户选择定制,Spark整个计算过程有不同的存储策略可供选择[5].用户可以根据经验选择缓存到内存或是物理硬盘中,由此产生的计算时间不尽相同.当缓存的是无用数据时,该数据会占用内存空间,系统性能将降低.如果缓存了错误的数据还会导致内存溢出等后果,甚至可能将随后重用的中间结果忽略,造成计算时间延长.通常情况下,计算机内存空间是有限的,无法保证每个计算节点有充裕的内存空间缓存数据.
目前的存储替换策略中,先入先出(FIFO)策略主要关注创建时间,最近最少使用(LRU)策略更侧重于命中历史访问次数,然而这些经典的算法没有考虑分片的计算成本问题[6].现有的研究大多数基于此类算法改进,其中部分改进算法诸如GreedyDual[7]、GD-Wheel[8]均考虑了访问历史和计算成本,但在一个作业中待处理任务的执行逻辑在Spark中是已知的,对访问RDD历史次数进行优化的缓存策略效果不明显.类似的研究还有不考虑RDD的访问历史次数的LCS[9]算法,以及考虑计算成本次数加权的WR[10]算法.本文针对内存占用率与RDD替换时的权重,考虑了分片的历史访问次数和计算成本等因素对权值的影响,并提出缓存权重替换算法(CWS).
本文第一部分介绍了Spark的应用背景;第二部分阐述了Spark的相关概念;第三部分介绍相关的预备知识;第四部分详细介绍本文提出的算法;第五部分通过实验验证了本文提出的算法的性能;最后对本文工作进行了总结.
弹性分布式数据集的提出,是对不适合非循环数据流模型应用的一种改进.非循环数据流模型是从物理存储中加载记录,将操作记录传入有向无环图(DAG)后写回物理存储介质中[11].计算机集群系统通过调用这个数据流图来完成调度工作和故障恢复.当作业在多个并行操作时需要重用工作数据集(例如迭代算法、交互式数据挖掘[12]),系统会将数据输出到物理存储介质,计算机集群每次重用时都需要重新加载,从而导致开销较大.
RDD是Spark框架的核心抽象,作为Spark的用户,可以把RDD看成独立数据分片的集合.RDD具有以下特征:
1) RDD能够记录其血统(Lineage)信息,RDD能够记录其本身是怎样通过其他数据集来产生或者转换的.在某个RDD分片丢失时,能够根据Lineage信息恢复该分片,而不必重新计算所有分片.
2) RDD中的数据分片存储在集群中的节点上,可并行操作每个分片中的数据.RDD的每个分片上都有函数,通过函数调用可操作RDD的分片转换.
3) Spark在任务调度时,尽可能将计算任务分配到数据块所存储位置,即数据本地化.
RDD自身是只读数据集,仅能通过对其它的RDD执行转换操作(例如map,join和groupBy)创建.传统的分布式共享内存寻找丢失的分片需要checkpoint和rollback操作,开销较大,而RDD的内存共享方式既能保证低开销,又能保证容错性.RDD通过转换操作转换成不同的RDD后,可通过lineage来计算出相应的RDD分片.RDD所有分片都分布在集群系统的各节点上,默认情况下的RDD在需要用到时都会被重新计算.
在Spark中,RDD被表示为对象,RDD的操作仅包含转换操作和行为操作.RDD在转换操作执行完后才可能执行行为操作,此时DAGScheduler为每个执行中的任务生成一个DAG(有向无环图),随后将任务上传至集群.图1是Spark中的作业调度模型,本文对于选择算法部分的优化基于DAG将选择RDD分片的缓存至内存.
图1 Spark中的任务调度Fig.1 Task scheduling in the Spark
图2展示了RDD在集群中的缓存,RDD2是由RDD1经过Transformation操作转换得到的,RDD2在接下来的运行过程中有可能被再次使用,所以Spark主动缓存RDD2.假设RDD2中包含3个分片21、P22和P23,集群中有3个节点,缓存时P21、P22和P23分别存储在节点1、节点2以及节点3的内存中.
图2 集群中的RDD缓存Fig.2 RDD cache in a cluster
在Spark默认的缓存机制中,当RDD在内存中完成计算后,可通过CacheManager来获取结果.RDD分片由CacheManager来缓存,而CacheManager中的操作取决于BlockManager提供的API.用户通过BlockManager来决定分片将从内存或者磁盘中获取,而MemoryStore则确定分片是否缓存至内存中.当分片占满内存之后,默认情况下系统将用LRU算法来选择要被替换的分片[13].传统的LRU算法忽略了RDD分片的大小和计算成本,无法确定所选择的分片是否将被重用.本文的工作正是针对RDD分片的选择和替换策略进行优化.
Spark在任务执行过程中需要用到某个RDD时,并非立即对该RDD进行计算,任务调度器会根据该RDD的lineage关系构建一个DAG图.生成DAG后,Spark会将任务划分为不同的stage,执行后得到目标的RDD.在本文中用表示分片,Pij表示第i个RDD的第j个分片.
当剩余的缓存不足以存放新的RDD时,Spark默认的缓存策略会替换最近最少使用的RDD.在一个作业中RDDn所包含的分片Pnm被推出缓存后,若在后续的作业中被重用,该分片会被再次计算,采用默认的算法会产生很多不必要的开销,我们需要解决默认情况下仅考虑最近最少使用的逐出带来的计算效率差异.
在计算RDD的过程中,任务的stage通过DAG来划分,本文提出的替换策略在执行过程中将stage中满足替换条件的分片进行替换.
图3 一个计算过程的DAG图Fig.3 DAG graph of a computational process
图3展示了Spark中一个作业执行阶段的DAG图,假设该作业的所有分片都在同一个节点之中,实验通过六台虚拟机所组成的节点计算数据.这个任务序列的执行阶段Stage如公式(1)和公式(2)所示.
Stage11{RDD3→RDD4→RDD5}
(1)
Stage21{RDD3→RDD5}
(2)
RDD是由分片组成的,假定一个作业中所有的RDD分片大小都相同,用Pnm表示第n个RDD的第m个分片,那么:
RDDn=Pn1+…+Pnm+…+Pnp
(3)
假设每个分片的执行时间为TPnm,则Stage2中的执行总时间为:
TP=TP32+2TP31+TP41+TP42
(4)
我们可以看到P31被重复计算了,而这仅仅是一个作业中的小部分,整个作业中可能存在大量重复的高开销计算.
Spark在数据处理的过程中,为每个作业生成一个DAG.在作业的计算过程中,有些变量会反复出现,若将这些变量缓存在内存中可以显著提升处理速度.Spark默认内置的LRU是通过LinkedHashMap实现的,本文所实现的工作在此基础上重构了内存管理策略.
默认的LRU算法选择最近使用的RDD分片,不考虑分片的计算成本和大小.当两个RDD分片的大小和重复出现的次数相同时,应该缓存计算成本更高的分片;而当两个分片的计算成本和重复出现的次数都相同时,则需要考虑缓存空间较小的分片.
通常情况下,当计算缓存资源不够用时,需要对缓存中的RDD分片进行替换,我们用MemoryC表示集群内存总资源,计算节点中实际空闲内存记为MemoryA,默认条件下,假设此时的任务中有q个RDD在等待,则等待计算的RDD所需占用的内存资源必须小于MemoryA.即:
(5)
Spark中的行为操作都是在转换操作之后才执行的,所以可从DAG图中统计分片重复使用的次数,在作业执行的过程中,本文把缓存中已有的RDD记为MemoryS.
默认情况下系统使用LRU算法淘汰最长时间未被使用的分片,如果该分片需要被再次使用,此分片重新计算的成本可能会非常高.LRU无法确定分片是否有保存的意义,本文提出的CWS算法能选择高权值的分片并保留在缓存中,该算法的伪代码如算法1所示.
算法1.选择算法
Selection algorithm
actual free cacheMemoryA
maximum cacheMemoryC
selected blocksMemoryS
Output:evicted RDDs
1 get a DAG from the driver of the spark
2 allocate memory according to DAG
3if(MemoryE 4fori=1toq 5if(MemoryE 6MemoryA 7elseif(MemoryE>MemoryA) 8 call replacement algorithm 9endif 10endfor 11endif 在选择算法中,MemoryE表示等待执行队列中的RDD,RDD的数量记为q个,剩余的缓存总量表示为MemoryC.算法首先在设定好缓存的替换策略后,遍历内存查找是否已有待选择的RDD,若已有则停止循环.第5-9步判断实际空闲内存的大小,当实际空闲内存大于待分配的内存大小时,将RDD放入缓存中,否则跳转至替换策略.该选择算法的时间复杂度为O(n). Spark中的各节点处理能力基本相同,那么分片完成作业花费的时间更多,则意味着其计算成本更高.通常情况下,我们可以使用分片大小近似作为其计算成本.在替换分片的过程中,当发现某个分片占用的存储空间较大(计算成本更高)而使用次数又较少时,本文提出的算法将用存储空间较小且使用次数相同的分片替换该分片,或者使用存储空间大小相近且使用次数更多的分片来替换该分片.在公式(6)中,当分片的使用次数Fnm不变时,分片的计算成本Spnm的增加将导致该分片的权值变小;当Spnm不变时,该分片的权值随着Fnm增加而变大.本文用Fnm表示RDDn的第m个分片的使用次数,分片大小记为Spnm,分片的权值Vpnm则可表示为: Vpnm=Fnm/Spnm (6) 若RDDn有m个分片,那么RDDn的权值VRn为: (7) 当内存中的分片缓存接近饱和时,替换算法将根据Vpnm和VRn替换权值较小的分片.替换算法的伪代码如算法2所示. 算法2.替换算法 Replacement algorithm Input:selected blocksMemoryS the valueVRnof candidate RDDs the size of partitionsSpnm execution sequencePnm Output:evicted RDDs 1fori=1toq 2if(Pnm==Pni) 3 break; 4elsequickly sortMemorySforVRn 5 expel RDD; 6endif 7endfor 在替换算法中,首先判断待替换分片是否存在于内存中,若不存在则对内存中待替换的缓存序列按权值进行快速排序,得到新的序列,最后按照新的序列逐出RDD.该算法的时间复杂度为O(n). 为避免因实验环境差异带来的实验结果对比的不便,本文尽量使用与目前主流研究算法一致的实验环境.在服务器上部署含有6个节点的集群,每个节点配备8核2.2GHz Intel Xeon E5-2620 CPU、物理硬盘空间80G、内存根据实验条件可调整为2G、4G、8G等多种情况.集群使用的操作系统为CentOS 7,Scala版本为2.11.8,而Java开发工具包版本为1.7,Hadoop的版本为2.2.0,Spark版本为2.0.0.实验过程中的计算时间通过对三次运行结果取平均值,内存占用率则通过Ganglia[14]监控获取. 测试数据集使用斯坦福大学提供的公开网络分析项目获取的17个真实的图形数据集.这些数据集的Nodes和Edges对执行时间和内存的使用情况影响较大.实验采取PageRank算法对这些数据集进行排序,数据集如表1所示. 表1 斯坦福大型网络数据集 NameNodesEdges Descriptionp2p-Gnutella041087639994Gnutella peer to peer network from August 4,2002p2p-Gnutella242651865369Gnutella peer to peer network from August 24,2002wiki-Vote7115103689Wikipedia who-votes-on-whom networkp2p-Gnutella3162586147892Gnutella peer to peer network from August 31,2002Cit-HepTh27770352807Arxiv High Energy Physics paper citation networksoc-sign-Slashdot08110677357516757Slashdot Zoo signed social network from November 6,2008Cit-HepPh34546421578Arxiv High Energy Physics paper citation networksoc-sign-Slashdot09022182144549202Slashdot Zoo signed social network from February 21,2009Soc-sign-epinions131828841372Epinions signed social networkSlashdot090282168948464Slashdot social network from November 2008Amazon03022621111234877Amazon product co-purchasing network from March 2,2003Web-Stanford2819032312497Web graph of Stanford.eduAmazon03124007273200440Amazon product co-purchasing network from March 12,2003Wiki-Talk23943855021410Wikipedia talk (communication) networkweb-Google8757135105039Web graph from Googlecit-Patents377476816518948Citation network among US patentssoc-Pokec163280330622564Pokec online social network 由于Spark是基于内存计算的框架,所以集群系统中各节点的内存空间越充足其计算优势越明显.当缓存重复的RDD分片时,充足的内存能保证最大限度地缩短执行时间.在本文实验中,根据目前国内外研究均使用的PageRank算法来测试性能,使用开源项目Ganglia监控内存使用率[15].每个数据集在Spark集群环境下运行PageRank算法三次,取其时间的平均值.本文使用表1中的数据集来测试充足内存条件下的执行时间和内存占用率,分别测试了默认情况的下LRU、权重替换WR和CWS的策略. 图4(a)是每个数据集在内存空间宽裕的条件下对应的执行时间.当每个数据集在充裕的内存中被处理时,WR的替换算法优势并不明显,而WR的选择算法由于执行选择算法时会频繁统计分片的使用次数,数据集较小时,其处理时间可能会稍微增加.但是,随着数据集的Nodes和Edges数量的增加,WR算法的计算时间会减少.CWS算法在计算过程中减少了统计分片的使用次数,降低内存占用率的同时允许计算时间适当地增加.计算较小的数据集时,由于减少了频繁的统计分片使用次数,从而缩短了计算时间.计算较大的数据集时,算法侧重于降低内存的占用率,执行时间不是算法重点考虑的指标.CWS算法在处理表1所示的图形数据集时,总的执行时间比WR算法平均降低了2.4%.在图4(b)中,当每个节点使用8G内存时,WR算法的内存占用率明显高于其它算法,CWS尽管在计算时间上没有明显的优势,但其内存的占用率较低. Spark在内存接近饱和时使用LRU算法来重新分配内存,在有限的内存情况下替换算法起着重要作用.我们将LRU、WR和CWS算法分别在单个节点内存大小分别为2G和4G条件下进行比较.我们使用表1的数据集运算PageRank算法测试,使用Ganglia监控内存使用率.每个数据集进行三次运算后取平均值,发现每个节点的内存使用情况大致相同.因为Spark使用负载均衡策略来管理资源,所以我们用其中一个节点的内存使用情况对比.图5(a)展示了单个节点在2G内存条件下完成较小数据集的处理的执行时间,图6(a)展示了单个节点在4G内存条件下完成较大数据集的执行时间. 图4 单节点使用8G内存时数据集运行不同算法后的执行时间和内存占用率Fig.4 Task execution time and memory usage comparison under different algorithms with RAM=8 GB in each node 图5 单节点使用2G内存时数据集运行不同算法后的执行时间和内存占用率Fig.5 Task execution time and memory usage comparison under different algorithms with RAM=2 GB in each node 从图5(a)和图6(a)的对比,我们可以看出,在内存有限的情况下,计算量较大的数据集会导致缓存空间不够,LRU算法仅根据分片最近是否使用来决定是否替换,当被替换的分片计算成本较高时,此时可能需要花费更多时间;WR算法会频繁计算分片的使用次数,当数据集较小时,也会消耗掉部分时间.从图5(b)不难看出,2G内存条件下各算法的内存占用率基本一致,与LRU相比,WR算法的平均计算时间有较大改善,CWS算法在计算时间节省方面没有较大提高.在图6(b)中,每个节点使用4G内存时,CWS算法相比WR算法降低了0.8%的内存占用率,而WR算法的内存占用率会随着数据集的增大高于其它算法. 图6 单节点使用4G内存时数据集运行不同算法后的执行时间和内存占用率Fig.6 Task execution time and memory usage comparison under different algorithms with RAM=4 GB in each node 通过对Spark的RDD进行深入研究,我们从选择RDD和替换RDD两个方面对Spark的缓存机制进行算法调优.Spark是基于内存的计算,但Spark默认的机制没有充分利用内存的性能.WR算法虽然考虑了多个因素,但由于频繁的选择操作,在处理较小数据集时,产生了较多的时间开销.本文在此基础上,对算法进行优化.实验结果表明CWS算法在内存空间充裕的条件下处理较小数据的平均执行时间相比WR算法缩短了2.4%,内存占用率降低36%;在内存空间有限的条件下,每个计算节点使用2G内存时,内存占用率与WR算法基本持平.使用4G内存时,与WR算法相比,CWS算法降低了0.8%的内存占用率.4.3 权值替换
5 实验部分
5.1 实验环境
5.2 内存充足条件下的算法对比
Table 1 Stanford large network dataset5.3 有限内存条件下的算法对比
6 总 结