卞昊穹, 陈跃国, 杜小勇, 高彦杰
(1.数据工程与知识工程教育部重点实验室(中国人民大学);2.中国人民大学 信息学院,北京 100872)
Spark是继Hadoop之后出现的通用高性能并行计算平台,应用于大规模的分布式数据处理.在存储方面,Spark采用了基于分布式共享内存的弹性分布式数据集(Resilient Distributed Datasets,RDD)[1]作为数据结构,同时兼容HDFS,可将HDFS中已经存在的大量现有数据作为数据源加载到RDD中进行处理,并将必要的数据在内存中进行容错的缓存以支持高性能的迭代计算.在计算模型方面,Spark与MapReduce相似,但更加灵活,在RDD上提供了更加丰富的操作符及操作符间的组合方式.
由于Spark的高可扩展性和较高的数据处理性能[2],在大规模SQL查询分析正在被研究和应用.目前基于Spark的开源SQL查询分析系统有Shark[3]和Spark SQL[4].在存储方面,无论HDFS还是Spark RDD,为了实现良好的可扩展性,都采用了较简单的存储模型,将数据以大数据块的方式分布式存储在集群的各个节点上,不支持传统并行数据仓库中的数据预划分和数据索引;同时为了简化应用程序开发,Spark与Hadoop等平台的存储分布性对上层应用透明,这使得基于这类平台的数据分析系统亦无法干预数据的分布.
等值连接是结构化数据分析中最为常见、代价最高的操作之一,在传统并行分析型数据库中对等值连接操作的优化大多基于数据预划分,无法在Spark中实现.在Shark和Spark SQL中使用最多是Broadcast Join和Repartition Join.Broadcast Join局限性大,网络和内存开销大,计算复杂度较高;Repartition Join则要求在查询执行时对参与连接的两表作重新划分,网络和内存开销也很大.在结构化大数据分析中,事实表和维表的数据量可能都很大,为了提高数据分析的实时性,需要对现有的Spark上的等值连接算法进行优化.
本文针对大数据分析中事实表与维表,尤其是大维表的等值连接提出了基于Spark优化的连接算法.该算法首先将事实表在连接属性上进行以数据块为单位的去重,然后将按块去重后的事实表与维表进行预连接,预连接通过将去重后的事实表和维表基于一致性哈希的原理进行划分,使之在集群中并行完成,避免了可能的数据倾斜问题,连接结果与事实表再进行一次组装得到最终连接结果.经过理论分析和实验验证,本文算法较目前Spark上性能最好的SQL数据分析系统Spark SQL[3]的连接性能提高了1-2倍,且具有很好的可扩展性.
基于Spark/MapReduce的大数据分析中,常用的连接算法有Simi-Join及其变种(如Per-split Simi-Join)、Broadcast Join、Repartition Join 等[5-6].其 中 Simi-Join 和 Broadcast Join局限性较大,通常性能较差,Repartition Join适用性最好,在绝大多数情况下具有最好的性能.但Repartition Join在Spark上具有如下的缺点.
(1)需要在查询时对数据进行动态的重划分,通信量较大,尤其在宽表的情况下;
(2)通过Hash函数划分到同一节点的很多事实表元组在外键上具有相同值,内存和计算资源消耗较大.
针对以上问题,本文提出了一种对现有的Spark上的连接的优化算法.本文中所用到的符号描述如表1.
定义1 连接并行度:连接时连接属性哈希分区的个数.
算法的执行流程包括如下两个阶段.
阶段一:动态优化与预连接
表1 符号列表Tab.1 Frequently used symbols
该阶段根据Fact和Dim的元组数量计算出连接的并行度Pd,并计算出Fact的连接属性值Key的无重集FactUK,FactUK中的每一个元素除了包含Key外,还记录了Key对应于Fact元组的存储位置.将FactUK与Dim根据并行度Pd进行哈希连接,得到结果JoinedUK,这一过程为预连接.由于FactUK与Dim 的数据量都远小于Fact,因此预连接的网络I/O、内存开销和CPU开销远小于Fact和Dim的Hash连接.本文中预连接所用的哈希函数借鉴了Dynamo、Cassandra等NoSQL数据库中用于数据分布的一致性哈希算法的思想[7-8],避免了现有系统中所采用的简单哈希函数受到数据倾斜影响较大、造成连接时负载不均衡的问题,且查询时动态计算出的并行度更加合理,使得预连接的效率得到了进一步的提高.JoinedUK中同样包含Key对应于Fact元组的存储位置.
阶段二:组装连接结果
该阶段根据JoinedUK中记录的Fact中元组的存储位置将JoinedUK与Fact进行组装,得到最终的连接结果.这一阶段无须再进行网络I/O和内存、CPU开销巨大的Hash连接.JoinedUK的数据量不大于FactUK,在部分实际的查询中,JoinedUK的数据量远小于FactUK,这一阶段仅需将JoinedUK根据Fact中元组的存储位置与Fact进行组装,代价很小.
本文的等值连接算法在Spark中可以高效地实现.下面对算法各阶段中的一些过程作细节描述.
Fact中的元组需要在连接属性Key上进行去重(投影).但是由于Fact是分布式存储在集群中的,需要对Fact进行shuffle和reduce才能完成去重,Fact数据量巨大,这样去重的代价很高.实验分析发现,由于实际数据分析中采用的数据块通常较大(64MB整数倍),块内包含的元组通常很多,加之数据的分布通常具有一定的局部性,因此重复数据以数据块为单位进行去重也可以达到较好的效果.所以本算法的去重分为两个阶段,第一阶段先以数据块为单位进行去重,其结果参与预连接,在预连接的过程中,重复的连接数据会划分到相同的分区中,从而进一步去重.Spark的mapPartitions操作[2]用于实现以数据块为粒度的计算,可用于实现数据块级别的去重.
连接并行度是Repartition Join的一项重要的参数,该参数设置过大或过小都会一定程度地影响连接的性能.研究者在Shark的研究实现中提出了动态的参数优化[9],在Hive等SQL-on-Hadoop系统中,查询优化是查询执行之前基于数据的静态统计信息进行的[10],Shark中在查询运行时的动态参数调整给Shark带来了性能的提升.本文中借鉴了动态优化的思想,将连接并行度的设定在连接过程中完成.因为在连接执行之前难以估计参与连接的实际数据量,在查询执行前的静态查询优化阶段难以准确地设定这一参数.本文中的连接并行度通过式(1)所示的函数getPd计算得出.
Pd=getPd(FactUK,Dim)= min(FactUK.partitionNum+Dim.partitionNum,ω)(1)其中,partitionNum表示数据块的个数,ω用于控制并行度的最大值,在实际的数据分析系统中,往往有多种数据分析和处理负载共享系统中的资源,控制并行度可以防止当连接操作对资源的过度侵占.实验发现,当集群中没有其他负载时,ω取分配给Spark集群的CPU核数的2倍左右可以得到最好的连接性能;在Spark中数据集(RDD)的partitionNum可以快速获取.
经过连接属性上的去重之后,FactUK中的元素个数和数据量都远小于Fact,但FactUK和Dim的数据量仍然可能超过单机处理能力,且仍可能存在数据倾斜.在预连接中,本文借鉴了一致性哈希[7]的思想对FactUK和Dim进行划分,具体流程为如下.
(1)对FactUK和Dim中的Key取并集(不去重),并根据采样率φ进行采样,得到样本sampleUK.
(2)计算sampleUK中Key的Hash值(如果Key为整型,Hash值可以是其本身),并将Hash空间划分为Pd个区间,保证每个区间中的样本数基本相等.
(3)将FactUK和Dim按照(2)中计算出的区间Hash划分到各个区间中,这些区间的物理位置分布在集群的多个节点上.
Spark中支持RDD上的union和sample[2].其中union是两个元素类型相同的RDD上的并操作,该操作并不会移动两个RDD的数据,也不会进行去重,只是将RDD的元数据合并;sample是RDD上的采样操作,在各个partition上并行进行.这两个操作代价都很低,可快速完成(1);而利用Spark中的 RangePartitioner[2]可以快速完成(2)和(3).
FactUK和Dim一致性哈希划分之后,相关的集群节点并行地对各个分片进行连接,得到连接结果JoinedUK,其中每个元素包含了一个Key和Dim中对应于该Key的元组,以及该Key对应的FactRdd分区号的列表.由于划分后的数据缓存在各个集群节点的内存中,且充分利用了多节点和节点上多核的并行计算能力,预连接的速度很快.预连接的目的是进一步去重并排除没有连接结果的Key.在实际的大规模数据分析中,相当一部分表连接查询中,都存在大量的Key上没有连接结果,浪费了大量网络I/O和计算资源.
Spark中的cogroup操作可以实现将两个数据集按照Key快速分组,对分组结果集作简单的过滤操作即可保留有连接的Key,完成预连接.
预连接结果JoinedUK和Fact再做一次连接即可得到最终的连接结果.但这样需要广播JoinedUK,代价较高.本文根据JoinedUK中包含的Key对应的Fact的分区号,将JoinedUK按照Fact的分区号做重新的划分,由于JoinedUK的数据量较小,划分代价较低.划分后的结果和Fact具有相同的分区数且各分区和Fact一一对应,因此组装的过程以分区为粒度在集群中并行完成,效率很高.组装完成后即得到最终的连接结果,可继续在连接结果上完成查询计划中的其他操作.
在Spark中通过自定义的Partitioner可以快速将JoinedUK按照Fact的分区号进行划分,划分得到的结果集具有和Fact相同的分区(数据块)数,之后通过Spark中的zipPartitions操作将Fact和JoinedUK做快速的组装,这一过程无需在网络上传输Fact中的数据.
Spark是基于分布式共享内存的分布式计算平台,在数据的处理过程中,数据可以始终保持在内存中.本文假设参与连接的事实表和维表上的中间结果数据可以被缓存在内存中,因此,连接的代价主要来自网络I/O、内存空间占用和CPU计算,这也是内存计算中性能考量的三个主要方面.
本文的代价模型从网络I/O代价、计算复杂度、内存空间代价三个方面建立的.
1.网络I/O代价
本文的连接算法中,事实表去重和计算连接并行度过程中几乎没有网络I/O的开销,网络I/O开销来自预连接过程的数据划分和连接结果的组装,如式(2)所示.
其中NetCostjp是预连接数据划分(RangePartition)的网络I/O代价,用数据量表示为式(3).
由于去重后的FactUK中元组个数少于Fact,且每个元组仅包含一个Key值及其对应的分区号,因此FactUK的数据量远小于Fact,即Size(FactUK)=α·Size(Fact),0<α≪1.Dim的数据量也远小于Fact,即Size(Dim)=β·Size(Fact),0<β≪1.
NetCosta是结果组装的网络I/O代价,其中包括将预连接进行一次划分和在组装过程中跨节点读取对应分区的通信代价,而这两个子过程的通信代价不会超过NetCostjp,通常由于预连接之后相当于对Fact在连接属性上做了全局的去重且排除了不能连接的元组,NetCosta会远低于NetCostjp.假设参与连接的节点数为N,则总的通信量将由这些节点分摊,因此本文算法的网络I/O代价估算结果如式(4)所示.
2.计算复杂度
算法中计算分区、Hash探索、判断等值等基本运算的执行次数都是和数据规模呈线性关系的.假设Fact中的元组数为n,Dim中的元组数为m,由于n≫m,则算法的计算复杂度为O(n).
3.内存空间代价
算法中,除了事先缓存在内存中的参与连接的数据外,还需要缓存一些中间结果以加快计算,且连接过程中,需要将参与连接的数据在内存中建立Hash表、进行哈希探索,故需要一定的内存空间消耗.
由于FactUK需要用于采样以确定预连接时的Hash空间划分并且要参与预连接,在Spark中对多次使用的数据进行缓存可以提高计算的效率,因此在算法实现中对FactUK进行了缓存.此外在预连接中需要将划分后的FactUK和Dim在内存中建立Hash表,完成连接.虽然这两个过程是依次进行的,但由于Spark的机制问题,在预连接开始之前,被缓存的FactUK数据难以被及时释放,故算法的内存代价应为两者之和.FactUK通常比Fact小很多,所以算法的内存代价如式(5)所示.
基于Spark/MapReduce的大数据分析常用的连接算法中,Simi-Join及其变种性能较差,适用性较差,仅在特殊的条件下才使用,目前在Spark上没有其系统实现,因此本文不做与Simi-Join的对比分析.而Broadcast Join也仅适用于维表很小的情况下,在Spark SQL等基于Spark的大数据分析系统中并不常用.假设事实表Fact中的元组数为n,每个数据块中存储的元组数为bf,维表Dim中的元组数为m,每个数据块中存储的元组数为bd,其中bf和bd是常数.则有:
(1)Broadcast Join:的每个节点的平均网络通信量为Size(Dim),远大于本文算法;计所需内存空间等于维表的数据量乘以事实表的数据块数,即其中bs为事实表数据块大小,通常为64 MB整数倍.
(2)Repartition Join:在查询执行时需要将维表和事实表的数据进行划分,所以每个节计算复杂度同样为O(n);所需内存空间为Size(Fact)+Size(Dim).
本文算法与Broadcast Join、Repartition Join的对比如表2所示.
表2 等值连接算法代价对比Tab.2 Cost comparison of equi-join algorithms
可见,网络I/O代价方面,由于N为参与连接的节点数,通常与集群规模有关,是一个较大的正整数,而0<α+β≪1,所以本文算法网络I/O代价远低于Broadcast Join和Repartition Join;计算复杂度方面,本文算法和Repartition Join为同一数量级,低于Broadcast Join;内存空间代价方面,由于为事实表数据块数,通常是一个较大的正整数,所以本文算法的内存空间代价远低于Broadcast Join和Repartition Join.
本文在Spark上实现了所述的连接算法,并以TPC-DS[11-12]作为测试基准来测试连接算法的性能,与Spark SQL和Shark进行了对比.
实验的软件环境如表3所示.
表3 软件环境Tab.3 Software environment
本文的实验在实验室的云平台上完成,所用虚拟机集群的配置如表4所示.
表4 集群配置Tab.4 Cluster settings
所用的测试数据为TPC-DS 100GB和300GB数据集中最大的一张事实表Store_Sales和最大的一张维表Customer,存储在HDFS上.测试所用的两张表的数据量占数据集总数据量的40%左右,如表5所示.Store_Sales中的外键ss_customer_sk与Customer中的主键c_customer_sk可以连接,连接时每个表除联接属性外,各使用了6个32位整型列.
表5 测试表的数据量Tab.5 Data volume of testing table
定义2 Fact与Dim连接选择率:Dim中能与Fact产生连接结果的元组数占Dim总元组数的比率.
在实验中,取连接选择率为70%,采用TPC-DS 300GB数据集,集群节点数为16,每个节点给Spark分配3个核,测试连接并行度Pd对连接执行时间的影响,结果如图1所示.
图1 连接并行度对连接执行时间的影响Fig.1 Impact of parallelism degree on join execution time
可见当Pd取分配给Spark集群的CPU核数的2倍,即96时可以得到最好的连接性能,并且Pd在该值周围时,查询执行时间变化不大.
在实验中,取Pd为分配给Spark集群的CPU核数的2倍.通过人为加入随机因数改变两表的连接选择率来验证在不同的连接选择率下连接算法的性能.实验结果如图2所示.
图2 连接执行时间对比Fig.2 Join execution time comparison
图2中的执行时间是5次执行取得的平均值,单位为s,保留小数点后两位.根据查询计划分析,Spark SQL中使用的是Repartition Join算法,而Shark中使用的是Broadcast Join算法.实验中测试了不同连接选择率下的连接操作执行时间,Shark的执行时间全部超过1 800s或者报错,因此执行时间没有标注在图中.Spark SQL使用的Repartition Join对连接选择率不敏感.本文中的连接算法随着连接选择率的下降,执行时间有所下降,并趋向一个稳定值,这个稳定值是算法执行过程中预连接的执行时间.在最坏情况下,即连接选择率为100%时,本文连接算法的执行速度仍然比目前性能最好的Hash Join高出1倍.
通过图2(a)和(b)的对比,当集群中节点数量增加1倍(8个增加到16个),数据量增加2倍(288M行到864M行),扩展性最理想情况下执行时间增加50%,本文连接算法的平均执行时间(5种连接选择率下执行时间的平均值)增加63.7%,而Spark SQL的Repartition Join平均执行时间增加63.1%,可见本文的算法和Repartition Join都具有较好的可扩展性.
随着Spark等大规模集群式的内存计算框架在大数据分析中的普及,交互式的大数据分析成为必然的趋势.连接性能是交互式大数据分析的主要瓶颈.本文对Spark/Hadoop上现有的等值连接算法进行了分析研究,提出了一种改进的等值连接算法.该算法具有很好的适用性,在事实表和大维表的连接中表现出良好的性能,比现有系统中的连接算法性能高出1~2倍.并且随着连接选择率的降低,算法的性能会进一步提高.
本文的算法可以作为等值连接的操作符加入到现有的Spark SQL等数据分析系统中,提高现有数据分析系统的性能.
[1] ZAHARIA M,CHOWDHURY M,FRANKLIN M J,et al.Spark:cluster computing with working sets[C]//HotCloud2010.USENIX Association Berkeley,CA:[s.n.],2010:10-10.
[2] Spark[OL].http://spark.apache.org/.
[3] Shark[OL].http://shark.cs.berkeley.edu/.
[4] Spark SQL[OL].http://spark.apache.org/sql/.
[5] BLANAS S,PATEL J M,ERCEGOVAC V,et al.A comparison of join algorithms for log processing in MaPreduce[C]//SIGMOD2010.New York:ACM,2010:975-986.
[6] SAKR S,ANNALIU,FAYOUMI A G.The Family of MapReduce and Large-Scale Data Processing Systems[J].ACM Computing Surveys(CSUR),2013,46(1).
[7] KARGER D,LEHMAN E,LEIGHTON T,et al.Consistent hashing and random trees:distributed caching protocols for relieving hot spots on the world wide Web[C]//STOC97.New York:ACM,1997:654-663.
[8] DECANDIA G,HASTORUN D,JAMPANI M,et al.Dynamo:Amazon’s highly available key-value Store[C]//SOSP2007.New York:ACM,2007:205-220.
[9] XIN R S,ROSEN J,ZAHARIA M,et al.Shark:SQL and rich analytics at scale[C]//SIGMOD2013.New York:ACM,2013:13-24.
[10] THUSOO A,SARMA J S,JAIN N,et al.Hive:a warehousing solution over a map-reduce framework[J].PVLDB,2009,2(2):1626-1629.
[11] OTHAYOTH R,POESS M.The making of TPC-DS[C]//VLDB2006.New York:ACM ,2006:1049-1058.
[12] POESS M,NAMBIAR R O,WALRATH D.Why you should run TPC-DS:a workload analysis[C]//VLDB2007.New York:ACM ,2007:1138-1149.