CSPRJ:基于数据倾斜的MapReduce连接查询算法

2018-03-27 01:23魏夏飞胡彩林
小型微型计算机系统 2018年2期
关键词:海量分区框架

周 娅,魏夏飞,熊 晗,胡彩林,李 玲

(桂林电子科技大学 计算机与信息安全学院,广西 桂林 541004)

1 引 言

数据指数级增长给海量数据(massive data)[1]分析处理带来了严峻挑战,而MapReduce[2,3]是一种线性可伸缩编程模型,其具有良好可扩展性、高可用性以及容错性,适用于海量数据密集型计算任务.海量数据分析处理常常和MapReduce联系到一起,因为实时海量数据分析处理需要像MapReduce这样并行计算框架来向数十、数百甚至数千台计算机来并行处理.但是,分析处理的海量数据信息常常存在数据倾斜场景,而采用传统MapReduce框架并行计算连接查询算法难以对数据倾斜做到有效负载均衡,不利于充分发挥MapReduce集群并行计算特性.

本文主要从以下几个方面论述:第1节介绍国内外关于MapReduce连接查询算法研究现状及不足,并引出本文研究内容;第2节简述问题定义、MapReduce连接查询算法.第3节详细描述本文提出的算法思想与实现;第4节通过实验验证本文提出的算法性能,并给出详细分析;第5节对全文进行总结与简述下一步工作目标.

2 相关工作

连接查询是海量数据分析处理核心操作算子之一,在日志分析、联机分析处理等数据密集型计算领域也频繁使用.海量数据连接查询思想很多都来源于传统数据库中的join算法,并借助MapReduce并行计算框架来提升性能.目前,国内外针对MapReduce并行计算框架连接查询主要可分为3种类型:传统MapReduce连接、基于改进MapReduce连接和基于索引MapReduce连接.

传统MapReduce连接通过自定义Map函数和Reduce函数实现连接功能,如文献[4]设计两表等值连接的标准重分区连接算法,在shuffle阶段使具有相同连接属性的数据分发到同一个Reducer节点,在Reducer节点进行笛卡尔连接查询操作;文献[5]设计两表相似度连接算法,过滤不符合连接条件的数据,减少I/O网络通信与CPU计算开销;文献[6]设计多表等值连接算法,利用一个MapReduce作业处理星型连接与链式连接,简化实现连接操作复杂性;文献[7]采用一种新重定向输出策略,满足连接条件的数据可以一次性发送到同一个Reducer节点进行连接查询处理.上述这些算法的共性是实现简单,主要是通过增加计算节点来线性扩展计算能力,而对于所处理数据本身的特点则鲜少考虑.

基于改进MapReduce连接,如文献[8]提出CHMJ算法,对连接属性进行多副本一致性哈希策略,有效提高连接查询效率;文献[9]设计Map-Reduce-Merge新型编程框架,在Reduce阶段后面附加Merge操作,可以方便地实现关系数据库中的笛卡尔连接查询操作;文献[10]设计Map-Join-Reduce编程模型,在Map和Reduce之间增加Join操作,利用一个MapReduce作业就可以完成多表连接查询操作.上述这些算法在一定程度上扩展了传统MapReduce框架,增强了分析处理海量数据的效率,但这些算法只能用于一些简单的连接查询处理,对于复杂查询其性能并不理想.

基于索引MapReduce连接,如文献[11]将索引结构引入MapReduce计算框架,借助索引实现数据剪枝预处理,缩减待处理数据空间;文献[12]在Hadoop[13]和Hive[14]的基础上,设计HadoopDB系统,充分利用传统关系型数据库中的索引机制提高连接效率;文献[15]实现了SpatialHadoop,在存储层采用传统的空间索引技术,如网格、R-树、R+树建立二级空间索引,在空间查询场景下比传统Hadoop大约快两个数量级;文献[16]基于垂直分组设计一个多表连接混合系统Llama,将多表连接查询分解为无数据耦合多个子查询,大大减少MapReduce作业数.上述这些算法将传统数据库的索引机制引入MapReduce框架,很大程度上提高了查询效率,然而随着处理数据的急剧增加维护索引的代价也随之增加,查询性能也急剧下降.

现有针对MapReduce框架连接查询优化算法大都从集群层面出发,对于更细粒度数据倾斜导致MapReduce集群连接操作性能下降的研究甚少.本文将聚焦于数据倾斜场景下海量数据连接查询算法研究,设计并实现统计倾斜轮询分区连接查询算法.通过实验,对比本文提出的算法与传统MapReduce框架并行连接查询算法在不同数据倾斜率下连接查询性能,并给出详细分析.

3 连接查询

根据连接操作进行位置不同,可以将基于MapReduce框架的连接查询大致分为MapSideJoin、ReduceSideJoin、SemiJoin三种类型.本文主要围绕ReduceSideJoin展开研究.

3.1 问题定义

定义参加连接查询的两张表分别为C和O,C约定为主表,O约定为从表,Ci和Oj分别为C和O的属性,nc和no分别为C和O属性数量,则C和O属性集C′和O′可表示为:

C′={Ci|1<=i<=nc}

O′={Oj|1<=j<=no}

其中,连接属性x∈C′∩O′、y∈C′、z∈O′,不失一般性,连接条件约定为C.x=O.x,查询条件约定为Sc和So,投影属性约定为P,连接操作可以描述为:

σC.x =O.x(C×O)

投影操作可以描述为:

πp(C×O)

则本文的连接查询可以定义为:

(πP(σC.x=O.x∧SC∧SO(C×O)))

其对应的SQL查询用例可表示如下所示:

(πC.y,O.z(σC.x=O.x∧SC∧SO(C×O)))

3.2 MapReduce连接算法

为了衡量与评价本文提出的算法性能,本文选择3种传统经典MapReduce连接查询算法作为参考标准.这3种算法分别为标准重分区连接算法[4](StandardRepartitionJoin,以下简称SRJ),改进重分区连接算法[4](ImprovedRepartitionJoin,以下简称IRJ),过滤型改进重分区连接算法[5](WithFilterImprovedRepartitionJoin,以下简称WFIRJ).上述3种算法的特性、优缺点及其应用场景请参考相应的文献,本文不再累述.

4 统计倾斜轮询分区连接算法

在2.2节中,对3种传统经典MapReduce连接查询算法进行了简单的介绍,这些算法都采用Hadoop平台默认哈希分区算法.哈希分区在数据均衡场景下能较有效的将数据均匀分发到各个Reducer节点,较好利用MapReduce集群并行计算性能.在数据倾斜场景下,哈希分区分割数据集时并没有考虑数据的具体情况,经过哈希分区之后可能导致某个或某几个Reducer节点处理大量数据,而个别Reducer节点处理完少量数据之后处于闲等待状态,因此无法充分利用MapReduce集群并行计算能力.

本文针对ReduceSideJoin在数据倾斜场景下的性能瓶颈问题,设计并实现统计倾斜轮询分区算法(CountSkewPollingRepartitionJoin,以下简称CSPRJ).在本文,我们主要描述CSPRJ算法的核心思想,而关于MapReduce框架的具体实现过程请查阅文献[13].CSPRJ算法核心思想包含两部分:统计倾斜与轮询分区.

4.1 统计倾斜

在Map阶段对分析处理的记录进行统计倾斜信息.为此,定义3个全局计数器recordcounter、joinkeycounter、joinkey(这3个计数器分别代表整个作业进行连接操作的记录数、连接键数量、每个连接键对应记录数量),负责对记录的信息进行统计.

首先,在Mapper节点执行setup函数,利用Hadoop的DistributedCache机制把连接属性加载到Mapper节点缓存中;然后,执行map函数对输入分片记录进行解析,并判断缓存中是否存在该连接属性,如果存在则计数器进行计数并进行相应处理,否则过滤掉该无效数据,以减少网络I/O传输与CPU计算开销.统计倾斜流程可表示如图1所示.

4.2 轮询分区

在Shuffle阶段对倾斜数据采用轮询分区算法进行分区.为此,在处理数据时通过计算Map阶段统计的计数器并判断所处理的记录是否属于倾斜数据.如果该条记录不属于倾斜数据,则采用hashpartition算法对该条记录进行分区,否则采用自定义轮询函数:

(key%reducenum+key/reducenum)%reducenum)

图1 统计倾斜Fig.1 Count skew

对倾斜记录进行分区.经过轮询函数处理将倾斜数据轮询分发到不同的Reducer节点,使各个Reducer节点处理的数

据量基本均衡,以充分利用MapReduce集群并行计算能力.轮询分区流程如图2所示.

图2 轮询分区Fig.2 Polling repartition

4.3 CSPRJ算法实现与执行流程

CSPRJ算法计算框架与执行流程可表示如图3所示.

图3 CSPRJ算法计算框架与执行流程Fig.3 CSPRJ Algorithm Computing Framework and Execution Flow

基于4.1,4.2节的统计倾斜与轮询分区思想,本节给出CSPRJ算法具体实现过程.

在Map阶段:

1)基于4.1节的思想对输入分片信息进行采集,统计倾斜数据及过滤无法进行连接操作的数据;

2)对连接属性添加标签组成combinekey作为键,并以,value>形式输出.

在Shuffle阶段:

1)基于4.2节的思想对倾斜数据采用轮询分区算法,否则采用哈希分区算法对数据进行分区;

2)采用combinekey.tag字段对同一分区中的数据进行排序,使分区中的数据以表为单位进行排序,即一张表的数据始终排在另一张表前面;

3)采用combinekey.key对同一分区中的相关数据进行分组,并以,list>形式发送给Reducer节点.

在Reduce阶段:

1)对combinekey相应的list数据集进行解析并缓存其中一张表的数据,然后使用缓存中的数据与list中剩余数据集进行笛卡尔连接查询操作.

CSPRJ算法核心伪代码描述如下所示:

Map阶段:

输入:Tablec,Tableo

输出:,value>

Setup函数执行①中语句块

① DistributedCache(joinkey)

Map函数执行②~⑦中语句块

② For each MapInputSplitRecord

③ Decode(MapInputSplitRecord)

④ If(!DistributedCache().contains(joinkey.key))

Return;

End If

⑤ If(MapInputSplitRecord∈Tablec)

joinkeycounter++

combinekey

value(value+tag11

End If

⑥ If(MapInputSplitRecord∈Tableo)

recordcounter++

joinkey.put(key,counter++)

combinekey

value(value+tag22

End If

⑦ Emit,value>

End For

Shuffle阶段:

Partition函数执行如下语句块

输入:Partition(,value>)

①If(joinkey.get(combinekey.key) <(recordcounter / joinkeycounter))

hashcode(hash_func(combinekey.key)

Return hashcode % reducenum

② Else

Return ((combinekey.key % reducenum + combinekey.key / reducenum) % reducenum)

5 实验与结果分析

5.1 查询用例描述

TPC-H[17]基准数据集是查询与事务处理常用性能测试数据集.本文以TPC-H中CUSTOMER和ORDERS两张表作为连接查询表,分别使用CUSTOMER表的custkey、custname字段和ORDERS表的custkey、clerk字段,两张表通过字段custkey进行连接.本文实验SQL查询用例描述如下所示:

SELECT CUSTOMER.name,ORDERS.clerk

FROM CUSTOMER,ORDERS

WHERE CUSTOMER.custkey = ORDERS.custkey;

5.2 实验环境与数据

实验集群环境包括5个节点,其中一个主控节点,4个计算节点.每个节点都配有双核Intel(R) Xeon(R) CPU,主频2.53GHz,4G内存,500G存储磁盘;节点间通过百兆内部局域网互联;操作系统为Red Hat Linux 6.5.Hadoop版本为Hadoop2.6.4;实验数据集使用TPC-H中的CUSTOMER和ORDERS两张表.TPC-H数据集是严格均匀的,本文根据实验设定对数据集进行处理,设置多种倾斜率,数据连接率设定为80%(其中连接率约定为连接数据占总数据的百分比).实验测试数据如表1所示.

5.3 实验与分析

实验采用MapReduce默认分片大小,即128MB;设定Reduce分区数量为8;本文主要从两个方面衡量算法性能:①不同倾斜率下连接查询性能;②连接查询中Map、Reduce的时间消耗.

5.3.1 不同倾斜率下连接查询性能

实验环境与测试数据详见5.2节.为了客观准确评价SRJ、IRJ、WFIRJ与本文提出的CSPRJ算法性能,本节对数据连接率设定为80%,CUSTOMER表数据量固定为2万条,ORDERS表数据量从2500万条渐增至17500万条,数据倾斜率分别设定为0、0.4和0.8三组,实验结果如图4所示.从图4可以得出如下分析.

表1 实验数据
Table 1 Experimental data

1)图4(a)显示数据均匀场景下,数据分发到各Reducer节点的数据量大致均衡,因此四种算法性能差异并不大;总体而言,WFIRJ与CSPRJ对无效数据进行过滤,减少部分网络I/O传输与CPU计算开销,性能稍好.

2)图4(b)、图4(c)当倾斜率为0.4与0.8时,数据发生倾斜.SRJ分别只能进行5组与3组实验,这是因为数据倾斜导致某个或某几个Reducer节点需要缓存大量数据从而导致OOM异常;IRJ与WFIRJ在一定程度上避免了OOM异常,但当数据发生倾斜时,还是无法避免某个或某几个Reducer节点处理大量数据的情况;WFIRJ与IRJ主要区别在于,WFIRJ在Map阶段对不能进行连接的无效数据进行了过滤,但WFIRJ并没有解决数据倾斜导致负载不均匀问题.

1)本文设计与实现的CSPRJ不仅避免了OOM异常、过滤无效数据,而且经过Map阶段统计倾斜、Shuffle阶段轮询分区,保证数据基本均匀分发到各个Reducer节点,实现MapReduce集群节点间负载均衡、较好利用其并行计算特性.

图4 不同倾斜率下连接查询性能Fig.4 Join query performance under different skew rates

5.3.2 连接查询中Map、Reduce时间消耗

在5.3.1节中对SRJ、IRJ、WFIRJ、CSPRJ这四种算法进行了实验并给出较为详细分析,本节将对上述四种算法在MapReduce中两个核心操作Map和Reduce的性能评价与分析.为了对上述四种算法进行客观评价与分析,本节采用实验条件与5.3.1节相同,实验结果如图5所示.从图5可以得出如下分析:

1)由于SRJ在Map阶段没有对数据进行额外预处理,所以不管数据是否倾斜SRJ在Map阶段所消耗时间最少;而IRJ、WFIRJ和CSPRJ在Map阶段对数据都做了相应预处理损失了一些性能,其中WFIRJ和CSPRJ在Map阶段对不能进行连接的无效数据进行了过滤,减少部分数据写磁盘操作,因此相对于IRJ,WFIRJ与CSPRJ在Map阶段性能比IRJ略好.

2)本文所述四种算法在Map阶段对数据进行不同处理,从而导致在Reduce阶段表现出不同性能.如图5(a)所示,在数据均匀场景下这四种算法各个Reducer节点负载均衡,但相对于其他三种算法,SRJ在Reduce阶段缓存所有进行连接查询的数据,故性能相对较差;如图5(b)、图5(c)所示,当数据发生倾斜时,SRJ算法性能下降最快,并且由于SRJ在Reduce阶段对所有数据进行缓存从而导致OOM异常,无法得到正确结果;IRJ和WFIRJ有效解决SRJ在Reduce阶段潜在的OOM问题,但当数据发生倾斜时,还是无法避免Reducer节点负载不均匀问题,所以当数据发生倾斜时,IRJ和WFIRJ性能都有不同程度下降.

3)针对数据倾斜导致各个Reducer节点负载不均,影响MapReduce集群并行性能问题.本文设计实现的CSPRJ有效解决了这个问题.从图5(a)、图5(b)、图5(c)可以发现,不管数据是否发生倾斜,CSPRJ均能保证各个Reducer节点达到基本负载均衡,充分利用MapReduce框架并行计算能力,提高连接查询性能.

6 结束语

数据倾斜在海量数据分析处理中普遍存在,本文主要针对数据倾斜场景下连接查询算法研究.通过分析SRJ、IRJ和WFIRJ等传统经典MapReduce并行计算框架连接查询算法及其相应适用场景与性能瓶颈;进而,基于统计倾斜与轮询分区两个核心思想,设计并实现基于数据倾斜的统计倾斜轮询分区连接算法.实验结果表明,本文提出的算法能有效处理数据倾斜场景下海量数据连接查询,且在现实应用场景中得到应用并取得较好性能提升.本文的后续工作将从索引机制的研究展开,借鉴基于B树、Hash等算法优化提高MapReduce连接查询性能.

[1] Liu Xian-cong,Song Bin.Hadoop-based mass data TCP packet reassembly technology[J].Computer Engineering,2016,42(10):113-117.

[2] Chen J,Chen H,Wan X,et al.MR-ELM:a MapReduce-based framework for large-scale ELM training in big data era[J].Neural Computing and Applications,2016,27(1):101-110.

[3] Li R,Hu H,Li H,et al.MapReduce parallel programming model:a state-of-the-art survey[J].International Journal of Parallel Programming,2016,44(4):832-866.

[4] Blanas S,Patel J M,Ercegovac V,et al.A comparison of join algorithms for log processing in MapReduce[C].Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data,ACM,2010:975-986.

[5] Vernica R,Carey M J,Li C.Efficient parallel set-similarity joins using MapReduce[C].ACM SIGMOD International Conference on Management of Data,SIGMOD 2010,Indianapolis,Indiana,USA,June,2010:495-506.

[6] Afrati F N,Ullman J D.Optimizing multiway joins in a Map-Reduce environment[J].IEEE Transactions on Knowledge & Data Engineering,2011,23(9):1282-1298.

[7] Hui Sun.Join processing and optimizing on large DataSets based on Hadoop framework[D].Nanjing:Nanjing University of Posts and Telecommunications,2013.

[8] Zhao Yan-rong,Wang Wei-ping,Meng Dan,et al.Efficient join query processing algorithm CHMJ based on Hadoop[J].Journal of Software,2012,23(8):2032-2041.

[9] Yang H C,Dasdan A,Hsiao R L,et al.Map-reduce-merge:simplified relational data processing on large clusters[C].ACM SIGMOD International Conference on Management of Data,ACM,2007:1029-1040.

[10] Jiang D,Tung A K H,Chen G.Map-join-reduce:toward scalable and efficient data analysis on large clusters[J].IEEE Transactions on Knowledge & Data Engineering,2011,23(9):1299-1311.

[11] Hungchih Yang,Parker D S.Traverse:simplified indexing on large map-reduce-merge clusters[C].Database Systems for Advanced Applications,International Conference,DASFAA 2009,Brisbane,Australia,April 21-23,Proceedings,2009:308-322.

[12] Abouzeid A,Bajda-Pawlikowski K,Abadi D,et al.HadoopDB:an architectural hybrid of MapReduce and DBMS technologies for analytical workloads[J].Proceedings of the Vldb Endowment,2009,2(1):922-933.

[13] Melorose J,Perroy R,Careas S.Hadoop definitive guide[M].Hadoop:The Definitive Guide,Yahoo! Press,2015:1-4.

[14] Vohra D.Apache hive[M].Practical Hadoop Ecosystem,Apress,2016.

[15] Eldawy A,Mokbel M F.SpatialHadoop:a MapReduce framework for spatial data[C].IEEE,International Conference on Data Engineering,IEEE,2016:1352-1363.

[16] Lin Y,Agrawal D,Chen C,et al.Llama:leveraging columnar storage for scalable join processing in the MapReduce framework[C].ACM SIGMOD International Conference on Management of Data,SIGMOD 2011,Athens,Greece,2011:961-972.

[17] Chiba T,Onodera T.Workload characterization and optimization of TPC-H queries on apache spark[C].IEEE International Symposium on Performance Analysis of Systems and Software,IEEE Computer Society,2016:112-121.

猜你喜欢
海量分区框架
一种傅里叶域海量数据高速谱聚类方法
贵州省地质灾害易发分区图
上海实施“分区封控”
有机框架材料的后合成交换
框架
海量快递垃圾正在“围城”——“绿色快递”势在必行
“海量+”:大学生品格提升的浸润方——以高职艺术设计专业为例
一个图形所蕴含的“海量”巧题
大型数据库分区表研究
大空间建筑防火分区设计的探讨