基于FILTER算子匹配的增量式DAG计算复用方法

2017-09-05 04:10阚京陈彩梁毅
软件导刊 2017年7期
关键词:分布式计算

阚京+陈彩+梁毅

摘 要:伴随着数据的爆炸式增长,越来越多的大数据业务分析与处理选择分布式计算平台。目前针对大数据的分布式计算框架都支持DAG式的任务编排。由于大数据采集来源以及分布式存储系统特点,很多使用DAG框架進行计算的应用都是增量式的大数据集,但现有的DAG框架对这样的数据集进行计算时有许多冗余,造成计算资源浪费。提出了在DAG框架上进行增量式复用的方法,并针对FILTER算子特点提出了基于FILTER算子匹配的间接复用机制。

关键词:增量计算;分布式计算;计算复用;查询优化;DAG计算

DOIDOI:10.11907/rjdk.171282

中图分类号:TP301

文献标识码:A 文章编号:1672-7800(2017)007-0026-03

0 引言

利用大数据进行业务分析与处理越来越多,Google的Map-Reduce、Apache Hadoop MR以及Apache Spark是应用甚广的分布式计算框架[1-2],即便缺乏分布式系统经验的大数据工作者,也能充分利用分布式计算所带来的性能提升。

早期这些分布式计算框架主要用于批处理作业,随着数据量的增加以及业务需求的演进,更复杂高效的数据操作手段成为大数据分析与处理的重要需求。Pig、Hive以及Spark SQL都提供了相较于MPI及Map-Reduce等传统分布式计算手段更高层次的分布式数据集接口,通过这些接口可以更轻松地对大数据集进行分析与处理[3–5]。

大数据计算平台的分布式存储系统,如GFS、HDFS都具备一些共同特征:数据的更新主要通过附加新数据方式完成,而这类增量式数据进行处理时,往往会产生较多的冗余计算[6-7]。

本文提出了在增量计算中进行间接复用的方法,并通过解构、分析FILTER算子方式,达成比算子级更低粒度的计算复用,从而提升算子复用机会,使整体计算效率得到提升。

1 相关工作

增量计算中的复用方法有以下两种方式:

(1)对结果集进行局部更新,即通过针对特定的分布式作业管线设计一系列动态算法,并依据该算法对此前的结果集进行小规模的局部更新,从而使剩余大部分结果集得以复用。Google设计的可用来更新Page Rank的Percolator就是采用这种方法的代表[8]。该种方式主要针对特定的分布式计算任务。

(2)对作业管线中的子任务进行缓存及复用,从而透明地重用此前的计算结果,由系统自动地将可复用的子任务计算结果进行缓存,在缓存命中的情况下,就可避免相关子任务计算。

ReStore通过分析并改写作业的物理计划达到复用之前计算的目的[9]。ReStore不仅可复用完整的计算作业,还可复用作业中的子作业,有效提升了复用发生几率。

与Restore不同,Nectar系统所对应的输入是DAG作业,而非传统的MR作业[10]。Nectar在DryadInc的两个DAG作业复用算法IDE及MER基础上,阐述了整个复用体系的工作机制,并重点阐述了对数据中心的数据管理方法[7],在复用粒度上与Restore相似。Nectar在查询任务的逻辑计划上进行分析与复用优化,将计算逻辑与计算数据进行复合并作为缓存键,将计算结果存储于分布式文件系统中,并通过缓存服务器对其进行索引。

2 基于FILTER算子匹配的间接复用

2.1 基本概念

大数据规模庞大,在以事务为中心的数据库中难以进行批量计算和处理,非关系式的数据模型索引建立和维护较为困难,在检索上往往面临性能上的挑战。目前分布式计算框架是通过多节点的并行运算来提升计算效能的。

增量数据往往是对事物的客观记录和描述,通常都是历史信息,这类数据具有不会更新、删除的特点。增量型数据可定义为:

其中,D(t)表示t时刻数据集。当需要多次对这样的数据集进行非连接型操作(例如过滤、投影以及聚合操作等)时,往往要付出巨大代价。为了针对增量式数据设计增量式DAG计算复用策略,首先对DAG中的算子进行划分。在这些算子中,可合并计算算子可以有效地进行增量计算复用。可合并计算算子定义为这样一类函数:

则称f为可合并计算算子, merge为其合并函数。在DAG运算中,FILTER、SORT、PROJECT,AGGREGATE等算子都具备该性质。

2.2 间接复用

在对FILTER算子进行复用时,除了在算子完全相同时直接复用计算结果外,还存在间接复用计算结果的可能性。下面介绍基于FILTER算子匹配的间接复用方法。

例:在所有人中找年龄大于30的人,若在此前已经找过年龄大于20的人,且该结果已经缓存,那么这部分被缓存的数据仍然可以利用,即便算子Filter(Person.Age > 30)与Filter(Person.Age > 20)并不匹配,如图1所示。

虚线表示计算上的逻辑需求:为寻找年龄大于30的人,就必须遍历整个Person集合,而目前已经存在一个筛选好的年龄大于20的集合,那么在计算过程中就无需去遍历全集,只需从大于20的人的集合进行筛选即可,这就减少了I/O占用及实际计算规模,这种复用方式称为间接复用。间接复用充分利用了缓存中的小规模数据集,从而使这部分被缓存的数据集可以直接复用。在相同的缓存空间下,使得缓存数据利用率得以提高。因此,从DAG的复用分析角度来讲,间接复用是一种比算子级复用粒度更小的复用手段。若采用间接复用手段,除了算子要具备可合并计算性质,还要求被复用的算子及其依赖的计算流程具备更特殊的性质——包含。这一包含关系可定义为:

对于两个函数G1:A→B 和G2:A→B ,若对于任意的数据D∈A,若总是有:

则称函数G1包含G2。这一包含关系实际上表示了G2的结果集总是可以由G1的结果集导出。

对于具备这种形式的可复用计算算子,可采用如下方式进行增量式复用,首先若给定t1时刻的数据集I,其输出为R1,记为:

当G2试图间接复用该结果时,即将R1置入缓存,在t2时刻(t2≥ t1),若数据集I的增量为Δ,则新的计算结果可根据缓存中的R1和Δ导出:

虽然间接复用过程中对缓存数据应用了G2算子,但是若被复用的算子具备压缩数据规模能力,则对于占大比例的复用数据而言,G2所需处理的数据规模就大大缩小了,而这实质上就减少了每个计算单元的任务量和I/O消耗。

基于此,为实现基于FILTER算子匹配的间接复用,只需找到一种算法来判断两个FILTER算子是否具有包含关系即可。对于给定的两个谓词表达式P1(x)和P2(x),若满足:

即二者具备包含关系。对于这样的一对FILTER算子,若算子FilterP2是新到达的DAG算子,而FilterP1是缓存了的算子,则可根据复用算法,以间接方式利用缓存中的数据。

将两个FILTER中的谓词表达式都转换为等价的CNF(Conjunctive Normal Form),设一个FILTER的谓词表达式F为CNF,则F可表示为多个简单析取式的合取,即该FILTER的谓词表达式可表示为如下形式:

对于已经按合取运算符AND拆分的简单析取式,可表示为:

对于F而言,每个简单析取式所对应的真值集的交集就是F所对应的真值集,由此可得判定F能否包含另一个谓词表达式F'的充分条件就是:

即对任意的Fi,都存在至少一个Fj'蕴含Fi,那么F′蕴含F,本文根据该条件对FILTER算子的包含关系进行判定,证明如下:

对于给定的F和F',若式(9)为真,则有:

基于该判定条件,欲解决该问题,只需对两个简单析取式的蕴含关系进行判定即可。

基于上述描述,判断两个简单析取式Fi和Fj之间是否存在蕴含关系的算法如下:

(1)Fi和Fj的所有基本项中的键名存在不同,判定为不一定蕴含,退出。

(2)按照表 1计算Fi∧Fj,为真时保留f',为假时用False表示,若结果不为Fj,判定为不一定蕴含,退出。

(3)判定为Fj'蕴含Fi,退出。

由于算法中的(1)和(2)对计算提出了一定的条件,因此其结果均为不一定蕴含,这意味着该算法仍然为一个充分性的判定。

通过上文给出的判断两个FILTER算子是否相互包含的判定算法,就可以在新的DAG计算到达且不存在直接复用条件时,通过FILTER的包含关系来进行间接复用,从而增加数据的复用机会。

3 实验

3.1 环境

增量条件下的DAG计算要求原始数据集是增量的,为此本文设计了一个增量数据集,该数据集模拟日志类的数据,每轮实验结束后,为该数据增长约1.7 GB的数据。实验使用3个节点,各节点配置如表 2所示。

实验平台为Spark SQL,缓存使用Spark SQL默认的缓存管理器。

3.2 结果與分析

实验在开始阶段,采样密度为每次增量都采集一次实验数据,在后半段维持增量与计算不变,仅减少采样频率。

DAG中的FILTER在本试验中以随机方式生成,对全部的实验负载,覆盖100%数据集计算,66%的实验负载,覆盖10%的数据集计算,这样的负载模拟了实际负载中热点数据被更多关注的特点。实验对直接复用方法和加入FILTER算子匹配的间接复用方法的增量计算进行对比测评,结果如图 2所示。

通过使用基于FILTER算子识别的间接复用,相较于直接复用方法,时间开销平均降低84.91%,具体复用情况见表3。

由此可见,引入基于FILTER算子识别的间接复用可以大幅度提升缓存匹配的成功率,提升缓存利用率及系统整体运行效率。

4 结语

本文提出了基于FILTER算子匹配的增量式DAG计算复用方法,给出了通过识别FILTER算子的包含关系来达成对FILTER算子更细粒度的增量计算复用手段,并通过实验验证了该复用方法可提升增量计算复用中缓存被命中的机会,进而提高计算平台整体运行性能。

参考文献:

[1] DEAN J,GHEMAWAT S.MapReduce:simplified data processing on large clusters[J].In Proceedings of Operating Systems Design and Implementation,2004,51(1):107-113.

[2] ZAHARIA M,CHOWDHURY M,DAS T,et al.Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[J].In-Memory Cluster Computing.USENIX Symposium on Networked Systems Design and Implementation,2012,70(2):141-146.

[3] ARMBRUST M,XIN R S,LIAN C,et al.Spark SQL:relational data processing in Spark[M].ACM,2015:1383-1394.

[4] OLSTON C,REED B,SRIVASTAVA U,et al.PigLatin:a not-so-foreign language for data processing[J].Science China Information Sciences,2008(1):1099-1110.

[5] THUSOO A,SARMA J S,JAIN N,et al.Hive:a warehousing solution over a map-reduce framework[J].Proceedings of the Vldb Endowment,2009,2(2):1626-1629.

[6] GHEMAWAT S,GOBIOFF H,LEUNG S T.The Google file system[J].ACM Press,2003(5):29-43.

[7] POPA L,BUDIU M,YU Y,et al.DryadInc:reusing work in large-scale computations[EB/OL].https://link.springer.com/article/10.1007%2Fs13222-012-0109-3.

[8] PENG D,DABEK F.Large-scale incremental processing using distributed transactions and notifications[EB/OL].https://www.hanspub.org/reference/ReferencePapers.aspx?PaperID=9501&ReferenceID=23068.

[9] ELGHANDOUR I,ABOULNAGA A.Restore:reusing results of MapReduce jobs[J].Proceedings of the VLDB Endowment,2012,5(6):586-597.

[10] GUNDA P K,RAVINDRANATH L,THEKKATH C A,et al.Nectar:automatic management of data and computation in datacenters[EB/OL].http://www.doc88.com/p-3149057701380.html.

猜你喜欢
分布式计算
分布式系统中基于非合作博弈的调度算法
云计算中MapReduce分布式并行处理框架的研究与搭建
面向异构分布式计算环境的并行任务调度优化方法
同态加密的发展及应用