詹杭龙,曹东刚+,谢 冰
1.北京大学 高可信软件技术教育部重点实验室,北京 100871 2.北京大学(天津滨海)新一代信息技术研究院,天津 300450
分布共享环境下支持弹性伸缩的图处理框架*
詹杭龙1,2,曹东刚1,2+,谢冰1,2
1.北京大学 高可信软件技术教育部重点实验室,北京 100871 2.北京大学(天津滨海)新一代信息技术研究院,天津 300450
ZHAN Hanglong,CAO Donggang,XIE Bing.Graph processing framework supporting elastic scalability in distributed shared environment.Journal of Frontiers of Computer Science and Technology,2016,10(7): 901-914.
作为大数据处理的一种重要模式,图处理被广泛地应用在机器学习、数据统计和数据挖掘等场景中。在企业级应用中,多种类型的大数据处理框架通常会部署在同一个分布式集群中,其运行环境是开放、共享的,这时图处理需要考虑运算资源动态变化的问题。为了能适应这种动态性,更加充分地利用开放共享环境的资源,图处理框架应该具备弹性伸缩能力。通过调研,发现现有的图处理框架尚未完全实现弹性伸缩。为此,介绍了一种支持弹性伸缩的分布式并行图处理框架SParTaG。首先基于任务并行模型定义了图处理任务集及任务模型;其次基于任务迁移机制设计并实现了可动态伸缩的图处理框架;最后设计了一个基于负载均衡的调度算法,实现了动态伸缩的图处理过程。实验结果说明,SParTaG的性能与当前流行的开源图处理框架Giraph相近,且具有较好的弹性伸缩能力。
图处理;分布式并行计算;弹性伸缩;任务迁移
作为大数据处理的一种重要模式,图处理被广泛地应用在机器学习、数据统计和数据挖掘等场景中。图处理是指利用图算法、机器学习算法等对图结构进行分析和统计的过程。为了高效地执行图处理程序,学术界研发了一系列分布式图处理系统。这些分布式图处理系统一方面把多处理器的运算资源整合起来,实现图中顶点数据的并行运算,并控制这种运算以迭代的方式向前演进;另一方面向上提供了有效的编程抽象,简化了在企业级应用中图处理程序的开发。在企业级应用中,对大数据的分析往往要经过流处理、批处理和图处理等多种过程[1],不同类型的处理框架通常会部署在同一个分布式集群中,因此这种运算平台是开放共享的。这里的开放性是指分布式集群可能因为扩容或者临时需要而添加更多的运算节点,这使得数据处理框架有可能获得更多的计算资源。所谓共享性是指平台中常常同时存在多个作业,这就导致单个作业在执行时所占用的计算资源可能发生变化[2]。对于这种开放共享平台上运行的图处理作业,如果在计算资源增加时无法即刻利用富余的资源,或者需要重启作业才能使用更大规模的运算资源,将造成不必要的资源或时间浪费。因此,分布式集群的开放性和共享性要求其上层的图处理作业应该具有弹性伸缩的能力,当分配给作业的资源规模发生变化时,作业能够及时动态地调整负载的分布,以更加充分地利用集群资源,如图1所示。
Fig.1 Elastic scalable graph processing图1 图处理弹性伸缩示意图
以Apache Giraph图处理框架为例,对顶点数为4×106,边数为68×106的网页图数据执行PageRank作业需要近40 min的时间(http://giraph.apache.org/)。在PageRank作业运行过程中,开放共享平台可能由于总作业量的减少而产生若干空闲的运算节点。如果这时PageRank作业能够弹性扩展到空闲的运算节点中,提高并行度,则有可能实现作业的加速,用更少的时间完成作业。
然而,图处理作业的弹性伸缩并不像MapReduce应用实现动态扩展那样直观。在MapReduce中,输入的数据块被分配到Mapper Slot中进行第一步处理,产生中间结果再被发射到Reduce Slot中进行合并处理。数据块与数据块之间不存在依赖关系,因此可以简单地通过增加Slot来提高并行度,进而实现动态扩展。而图处理是一个迭代执行且有中间状态的过程,在每一个迭代步中每个顶点都需要获取其邻接顶点的信息。这种顶点的依赖关系是比较复杂的。因此,为了实现图处理的弹性伸缩,需要在运行时调度层面提供更加复杂的支持。
自从Google在2010年提出Pregel[3]系统介绍分布式并行图处理技术以来,许多图处理系统都对伸缩性(scalability)提供了支持。这里所描述的伸缩性一般有两个层面的含义:第一,当使用更多计算资源的时候,重新执行的作业可以在更短的时间内完成;第二,作业所输入的图结构可以是不同规模的图,而不需要更改处理框架的相关配置。但是,这两个层面的伸缩性都不是即时的,需要作业重新运行一遍才能生效。因此在开放共享环境中,这些图处理系统的伸缩性无法及时响应计算资源的变动,难以实现在当前运算完成进度的基础上动态伸缩。
此外,还有一些工作在运行时调度层面提出了动态调整机制来解决图处理过程中负载均衡的问题。例如文献[4]介绍了一种基于顶点迁移的动态负载均衡机制,通过对每个顶点的消息通信量、响应时间进行监控,以实现顶点的迁移,从而达到动态的负载均衡。文献[5-6]针对图处理过程中图结构发生变化的场景进行分析,通过对运行过程中的图进行重新分区,以实现负载均衡。这些工作围绕图处理的动态调整机制进行了有益的尝试,但它们并未考虑开放分布式环境的变化,无法处理运算资源发生变化的情况。针对上述问题,本文介绍了一种面向开放分布环境的支持弹性伸缩的图处理框架SParTaG。这里的弹性伸缩,是指图处理作业的运行过程能适应分布式平台中资源的动态变化,适时地调整迁移各运算节点的负载,以实现运行时的动态伸缩及作业加速。
与大多数分布式图处理框架相同,SParTaG目前基于BSP(bulk synchronous parallel)[7]的计算模型实现。不同的是SParTaG在对图进行分区的基础上,利用图分区的迁移机制实现动态的负载均衡与作业扩展。通过对各个分区的运行时信息进行监控,SParTaG计算出分区间的迁移方案,从而实现图处理的动态调度。SParTaG使用Erlang语言开发,Erlang是一种函数式编程语言[8],具有轻量级进程,高效的消息通信,支持分布节点的进程监控等特点,非常适合构建支持动态伸缩的分布式系统。
本文主要有如下贡献:提出了一种面向并行图处理的任务模型;设计了一个支持动态任务调度的图处理框架;引入了一种动态调度迁移机制,初步实现了弹性伸缩。
本文组织结构如下:第2章简述分布式图处理的整体流程,分析实现图处理弹性伸缩所需要解决的问题;第3章介绍SParTaG的设计与实现,包括图处理问题的任务模型、负载监控与动态迁移机制以及基于负载均衡的调度算法;第4章给出实验数据,并对实验数据加以分析;第5章对相关工作进行介绍;最后对本文进行总结,并讨论进一步的工作方向。
下面对基于BSP计算模型的分布式图处理流程进行介绍。图处理流程包含两个阶段:图结构的分区与派发、迭代执行与调度,如图2所示。
2.1图结构的分区与派发
为了让图处理作业能在分布式环境中并行执行,处理框架首先需要将图数据分发到各个运算节点中。这个预处理过程被称为图的分区(partitioning)。该过程根据并行度的大小将完整的图结构切分成若干子图,每个子图包含了完整图中的部分顶点和部分边的数据,再利用一定的静态调度策略将这些子图分派到各个运算节点上。对图结构进行分区的目标有两个:一是负载均衡,即每个运算节点所承担的计算量及通信量要尽量相近,这样在运算过程中作业不会因为某一个节点的执行时间过长而陷入不必要的等待;二是尽量减少跨子图的边,这样能够降低在运算过程中处理器之间的通信量。
Fig.2 Distributed graph processing based on BSP图2 基于BSP的分布式图处理流程
图结构的分区问题在图论中得到了长期的研究,已有的算法包括局部改进图划分算法和全局图划分算法,其中局部改进算法比较经典的是KL(Kernighan-Lin)算法[9]和FM(Fiduccia-Mattheyses)算法[10];全局算法比较经典的是Laplace图特征值谱二分法[11]和多层图划分算法[12],多层图划分算法的典型代表是METIS[13]及其并行版本ParMETIS。然而,这些划分技术都是集中式的,在算法运行时需要保存对整张图的全局视图,且都具有较高的时间复杂度。因此,这些分片技术并不适用于现实生活中大规模的图处理。
近年来,针对大数据场景下图处理的需要,分布式图框架在效果最优化与算法执行成本之间进行权衡,提出了一系列相对简单可行的分布式图分片算法。例如基于随机散列的分区策略、基于区间划分的分区策略、基于标签传播的分区策略等。
2.2迭代执行
完成图结构的分区与派发后,作业开始执行。基于BSP计算模型的分布式图处理框架采取一种以顶点为中心的并行运算思路,要求应用程序通过定义顶点的状态更新和数据传递等操作来描述图处理的算法。在执行过程中,图处理常常由一串迭代步骤所构成,每一次迭代称为一个超级步。每个超级步之间存在一个全局的同步控制,即必须等到所有的顶点处理完当前超级步的运算后,系统才会触发下一个超级步的运算。这种方式使得顶点的并行执行过程更加清晰可控,易于保证图处理过程的正确性。
在每一个超级步中,所有顶点将在集群的所有处理器中并行执行,并根据图结构中有向边所记录的邻接信息来传递数据。这种数据的传递是跨越相邻的两个超级步的。即:如果在图结构中存在一条有向边从顶点va指向顶点vb,那么从va的角度来看,va需要向vb传递数据以供vb在下一个超级步中使用;而从vb的角度来看,在当前超级步,vb则需要先处理由va在上一个超级步所传递的数据。每个顶点在执行完当前超级步之后,需要根据应用程序定义的逻辑判定当前状态是否可以暂时停机,可以则顶点进入休眠状态。当图中所有的顶点都进入休眠状态时,整个运算过程结束。
为了更方便地描述以顶点为中心的程序逻辑,许多文献提出了不同方式的编程接口。Pregel系统[3]基于compute-send模型来描述每个超级步中每个顶点的行为;PowerGraph系统[14]提出了基于GAS(gatherapply-scatter)模型的接口以充分利用顶点内部的并行能力;Galois系统[15]提出了一套不定型的数据并行模型(amorphous data-parallelism,ADP),并设计了基于活动元素集(activity set)和算子(operator)的编程接口等。
2.3迭代执行运行时需要考虑的调度问题
在图处理应用的运算过程中,为了达到较好的性能,首先要考虑的调度问题是负载均衡,即参与运算的每个处理器在每个超级步中处理各自所包含的图分区的时间要尽量相近。在实际场景中,输入的图数据可能来自不同的场景,图结构可能呈现出不同的特点。对于一个由特定算法(如文献[16])随机生成的图,它的边分布比较均衡,对其进行分区的效果也比较理想。而对于一些从真实场景构建出的图结构,它们存在一些特殊的性质,如边分布的幂律性[17]或高密度局部图等。由于这类图的规模较大,在可接受的时间内较难通过复杂的图分区算法实现严格均衡的子图划分。从而图处理过程一般是在无法完全掌握图的结构特点的情况下进行运算的,这极可能导致负载不均。因此需要有动态的负载调整机制来改善这一情况,减少因负载不均导致的时间浪费。
现有图处理系统对动态负载均衡的支持一般是通过图中顶点的迁移来实现[6]。然而,由于输入图结构的规模较大,顶点数量较多,如果以顶点为粒度进行监控和运算,可能导致用于调度算法的执行时间过长,影响运算性能。此外,现有的基于顶点迁移的方案在调度决策中只考虑顶点的执行时间,并未考虑待迁移顶点与原分区中其他顶点的关系。这样的迁移可能导致旧分区中原本重要的顶点被迁移到其他分区,导致更严重的负载不均。因此,如果图处理系统能选择较好的分区算法(如METIS算法、基于标签传播的分区算法等),而不是简单的随机划分或区间划分策略,使得划分后的各个图分区内部相对紧密,那么负载的迁移就应该考虑由更大的粒度来完成。
另一方面,在当前的开放共享环境中,资源是动态聚合与弹性绑定的[18]。这意味着集群中的作业可能在刚开始运算时只拥有少量的计算资源,而随着运算的执行又获得了更多的资源。因此,图处理系统应该能够适应这一动态过程,充分利用可用资源,实现运算的加速。然而,图处理过程是迭代且有状态的,如何实现动态扩展,是图处理的调度过程中一个待解决的问题。
经过上述分析,本文概括了现有图处理框架中的运行时调度机制在弹性伸缩方面所存在的不足,如下:
(1)缺乏一个图处理问题的任务模型,导致对弹性伸缩性的研究不够清晰、直观;
(2)以顶点作为负载均衡的粒度过小,需要一个合适粒度的调度方案;
(3)现有工作很少考虑资源动态增加时作业弹性伸缩的解决方案。
针对上文描述的不足,提出解决方案。弹性伸缩的图处理首先要求负载能够在节点间进行转移,以适应资源变化时作业的伸缩。这一点与负载均衡的实现方式是一致的。然而,图处理比MapReduce应用的逻辑更加复杂,为了保证弹性伸缩时的正确性,需要对图处理问题中的任务进行建模,确保负载能够完全转移。
为此,首先对图处理应用构建任务模型,明确其任务的结构和依赖关系;其次,设计弹性伸缩的分布式图处理框架SParTaG,并介绍其任务迁移机制;最后,设计基于负载均衡的调度算法实现动态扩展。
3.1图处理应用的任务模型
本节介绍图处理的任务模型。对于一个输入图,定义为G=(V,E),其中V表示图中顶点的集合{v1,v2,…,vn},E表示图中的有向边组成的集合。对于每一条边,用一个二元组(vi,vj)表示,其中vi是有向边的起始点,vj是目标点。对于某个顶点vx,其入边集合为 InEdge(vx)={(vi,vx)∈E},其出边集合为OutEdge(vx)={(vx,vj)∈E}。为了对输入图进行并行处理,输入图将被划分成若干分区组成的集合,表示为P={P1,P2,…,Pm}。每一个Pi包含一组顶点,Pi中的每一个顶点 vx记录着特定属性,包括顶点的值State(vx)、入边集合InEdge(vx)、出边集合OutEdge(vx)。
定义操作符f,用于描述在每个超级步中对每一个顶点的运算过程。基于BSP计算模型,每一个顶点vx首先处理其入边所相邻的顶点在前一个超级步所发送来的消息,更新自身的状态,而后发送新消息给其出边所相邻的顶点。定义如下符号进行描述:
Msg((vx,vy))[s],在顶点vx的第s个超级步发送一条消息给顶点vy;
Msg(OutEdge(vx))[s],在顶点vx的第s个超级步所发送的所有消息;
Msg(InEdge(vx))[s],在顶点vx的第s个超级步所收到的所有消息。
于是,有:
Msg(InEdge(vx))[s]={Msg((vj,vx))[s-1]|(vj,vx)∈E}
Msg(OutEdge(vx))[s]={Msg((vx,vj))[s]|(vx,vj)∈E}
而对顶点vx的第s步运算过程 f可以表示为:
f(vx):Msg(InEdge(vx))[s],State(vx)[s]→
State(vx)[s+1],Msg(OutEdge(vx))[s]
操作符 f以 Msg(InEdge(vx))[s]与State(vx)[s]作为参数传入,更新vx的值成为State(vx)[s+1],并产生消息组Msg(OutEdge(vx))[s]用于传播数据。更进一步,对分区Pz的第s步运算过程可表示为:
f(Pz):{Msg(InEdge(vi))[s]|vi∈Pz},State(Pz)[s]→
State(Pz)[s+1],{Msg(OutEdge(vi))[s]|vi∈Pz}
从该式可以知道,当对一个分区进行操作时,f需要两部分参数:{Msg(InEdge(vi))[s]|vi∈Pz}代表第s个超级步中所有在Pz中的顶点接收到的所有消息;State(Pz)[s]代表第s个超级步中所有在Pz中的顶点的值。当分区为了动态扩展的需要而在处理器之间迁移时,上述两部分参数均需要进行迁移。因此,本文定义SParTaG中的任务模型如下:
Task(Pk)[s]=(Pk[s],Msg(Pk)[s]),
where Msg(Pk)[s]={Msg(InEdge(vi))[s]|vi∈Pk}
由此可知,SParTaG中的task首先有一个时间维度的属性,用于记录当前的超级步。此外包含两部分:其一是图分区的结构,这部分数据可以保存在处理器中以利用本地性,并在每一次迭代运算中自我更新;另一部分是在上一个超级步所接收的用于当前超级步处理的消息集合。每个顶点的运算所需要的消息只能等待其他顶点传播,因此消息部分正是图处理与其他简单并行模式(如MapReduce)所不同的地方。
3.2SParTaG框架与编程接口
SParTaG框架基于BSP计算模型。在预处理阶段,SParTaG提供了多种图划分策略,如基于随机散列的分区策略、基于标签传播的分区策略等。输入的图结构利用某种分区策略划分成若干子图。SParTaG将这些子图(或称为图分区)均分到包含多个运算节点的集群中,每个运算节点可能负责处理多个图分区数据。
为了维护处理单元中的图分区数据,便于动态迁移,SParTaG利用3.1节定义的任务模型将每一块图分区映射成一个任务,并设计了任务队列。如图3所示,每个运算节点包含4部分:
(1)任务双向队列负责记录该运算节点所分配的一系列图分区,以及维护该运算节点所执行的超级步。
Fig.3 Architecture of distributed SParTaG图3 分布式SParTaG架构图
(2)channel为消息接收信箱,负责记录该运算节点所接收到的用于下一超级步运算所需的消息集合。在运算初始时,因为尚无消息传递,所以每个运算节点的channel均为空。
(3)处理单元负责实际执行图处理运算,每当处理单元处于空闲状态时,便向本地的任务双向队列请求一个任务。根据前文所述,处理单元获取到的任务结构为一个二元组,由图分区与对应的消息集合所构成。当处理单元处理完当前任务时,把更新后的图分区提交回任务双向队列,以便下一超级步使用。
(4)为了减少实际运算过程中的通信量,SParTaG不是对图分区中的每一条有向边都对应发送一条消息,而是引入buffer,用于缓存任务双向队列在处理图分区过程中发送给其他顶点的消息,再打包发送。channel与buffer可利用应用程序实现的combine接口对消息进行合并,进一步减少消息的通信和处理数量。
在作业执行过程中,所有运算节点将以迭代的方式处理各自包含的任务。在相邻两个超级步间,所有运算节点通过全局同步的方式控制迭代过程。在执行每一个任务时,运算节点的处理逻辑如下:
从任务双向队列中获取图分区数据
foreach(顶点in图分区){
从channel获取发送给该顶点的所有消息;
调用应用程序接口处理这些消息,更新顶点状态;
根据顶点的出边集将待发消息缓存到buffer中;
}
将buffer中的消息根据目标顶点的不同发送到相应运算节点的channel中
应用程序需要描述如何处理顶点接收到的数据,更新顶点状态,以及发送消息给哪些顶点。SParTaG的编程接口如图4所示。其中compute用于实现每个顶点在每个迭代步的运算;NewState表示顶点在这次运算完成后是否进入休眠状态;combine用于实现消息的合并,减少跨机器的通信量。
Fig.4 Application programming interface of SParTaG图4 SParTaG的编程接口
3.3面向图分区的任务迁移机制
迁移是实现负载均衡的重要途径,也是实现作业伸缩的一种可行方案。针对2.3节所分析的顶点迁移所存在的问题,SParTaG提出了以图分区为监控和迁移粒度的弹性伸缩方案。
3.1节提到的图处理任务模型亦是根据此方案而定义的。以图分区为监控和迁移粒度,调度算法的执行复杂度便与图分区的数量相关,而不是与顶点数量相关,运算量大为减少;在进行负载迁移时,整个图分区转存到新的处理器,顶点之间的紧密关系依然保持,分区内的顶点依然可以进行本地通信。
在SParTaG框架中,任务迁移机制通过任务双向队列来实现。任务双向队列提供两种类型的获取任务接口,如图5所示。如果是本地的运算节点请求获取任务,则调用fetch_task接口从任务双向队列的队首取出图分区返回;如果是远程的运算节点需要迁移任务,则可根据调度决策的需要以任务ID为索引,调用migrate_task接口从任务双向队列中获取若干块对应的任务数据。
Fig.5 Two operations of task queue图5 任务队列的两种操作
为了支持动态扩展,SParTaG还需要考虑任务迁移的时机问题。在每一个超级步中,每个顶点均通过运算节点向其邻接顶点传递数据。如果在这个时候进行任务迁移,将会使得顶点的计算与通信产生混乱,不容易保证扩展后作业执行的正确性。而当所有的顶点完成某一超级步的执行,阻塞等待全局同步时,所有顶点上的运算已经完成,网络中亦无消息传递,在这个时候进行任务迁移,是简单可控的。因此,SParTaG将每一个超级步的全局同步点与下一步的执行触发点分离开,添加到任务调度与迁移阶段,用于完成作业的动态扩展。
在迁移过程中,图分区数据和对应的消息集合都要进行传输。
2.2节提到SParTaG使用任务双向队列记录图分区的结构,channel记录下一个超级步所需的所有消息。当发生任务迁移时,被迁移的运算节点以任务ID为索引,从任务双向队列中取出图分区添加到目标运算节点的任务双向队列中,同时从channel中取出与该分区相关的所有消息传递到目标运算节点的channel中,如图6所示。这样,便完成了此次的任务迁移。
Fig.6 Diagram of task migration图6 任务迁移过程
任务迁移机制既可以让图分区扩展到新的计算节点,也可以对现有计算节点进行收缩,把待收缩节点中的图分区迁移到剩余的工作节点,因此任务迁移是实现动态伸缩的重要途径。
3.4运行时监控与基于负载均衡的调度
在任务迁移机制的基础上,SParTaG引入监控与调度机制,使得图处理获得弹性伸缩的能力,更好地适应计算资源的动态变化。
SParTaG定义任务迁移只在相邻两次超级步之间发生,因此监控与调度机制也在每次超级步执行完成后进行。如图7所示,监控与调度机制分为如下几个步骤:
(1)获取所有worker的负载信息;
(2)判断当前的负载是否均衡;
(3)如果均衡,则触发下一个超级步开始执行;
(4)如果不均衡,则执行任务迁移操作,完成后再触发下一个超级步。
Fig.7 Monitoring and scheduling mechanism图7 监控与调度机制
,ε为运算节点获取图分区结构和消息集合所需时间,相对于T(pi)可以忽略不计。T(w)用于粗略判定负载是否均衡,T(pi)用于决策对哪些图分区进行任务迁移。当SParTaG在运行过程中获取到更多的资源,创建更多的运算节点时,这些新增运算节点的负载定义为T(wnew)=0。如图8中的负载监控表所示。这样便可以将图处理的动态扩展问题转化成新增资源后的负载均衡问题。
Fig.8 Load monitoring table图8 负载监控表
利用负载监控表,本文接着对均衡调度算法进行分析。一种直观的办法是将该问题转化成数集的划分问题:即存在一个由所有的图分区执行时间构成的集合{T(p1),T(p2),…,T(p)},现需要将该集合划分成与运算节点数量相等的若干子集,要求各子集中元素的加和尽量相近。然而,该方案却不适合用于图处理问题的均衡调度。因为任务迁移是有时间代价的,所以应该让大多数据任务尽量留在原有的运算节点上继续执行,通过少量任务的迁移以达到负载均衡的目标。因此,SParTaG初步设计了一种大小配对与贪心迁移的算法以进行调度决策。算法伪代码见算法1。
算法1伸缩调度算法
输入:所有worker的运算时间{T(w)},所有partition的执行时间{T(p)}。
输出:任务迁移列表。
1.对集合{T(w)}按时间从大到小进行排序,得到{Ts(w)}={T(ws),T(ws+1),…,T(wt-1),T(wt)};
2.将排序后的{Ts(w)}进行大小配对,生成由二元组构成的集合B={(T(ws),T(wt)),(T(ws+1),T(wt-1)),…};
3.设任务迁移列表为空;
4.对于集合B中的每一对二元组(T(wa),T(wb)),如果T(wa)与T(wb)相差超过阈值,则执行如下操作:
5.迁移量=(T(wa),T(wb))/2;
6.将wa所包含的{T(p)}进行排序,按从大到小的顺序取出图分区,直到取出分区的时间加和最接近迁移量,获得分区列表Lab;
7.将三元组(wa,wb,Lab)加入任务迁移表中;
8.遍历所有二元组后,返回任务迁移表。
3.5分布式索引表
在系统的实现过程中,还有一个问题需要考虑。顶点之间的数据传递是通过进程发送消息实现的,而进程发送消息需要指明消息传输的目的地。因此,在作业运行过程中,图的每一个顶点均需要有地址属性,说明该顶点位于哪一个运算节点中。当顶点a向顶点b传递数据时,它需要先知道顶点b属于哪一个运算节点,再向该运算节点发送消息。此外,由于SParTaG引入了任务迁移机制,在作业运行过程中,某些顶点可能迁移到新的运算节点中,改变了地址属性,这个时候发送给这些迁移顶点的消息就需要更新其目标运算节点。
为了解决这一问题,同时避免单一中心记录表的查询和修改操作代价过大[19],SParTaG使用分布式索引表[20]来记录每个顶点的地址信息。在作业初始阶段,每个运算节点加载自己拥有的图分区结构时,将图分区中所有顶点的地址信息以(key,value)键值对的格式登记到分布式索引表中,其中key为每个顶点的ID,value为该顶点所在的运算节点ID。在图处理执行过程中,分布式索引表允许运算节点在本地查询到图中所有顶点的信息(包括本地包含的顶点以及其他运算节点所包含的顶点);也允许运算节点在本地修改顶点的地址信息,并保证每个运算节点获取新的顶点信息的及时性和一致性。
以一个迁移场景为例,如图9所示。在图中存在一条顶点Va指向Vb的有向边。在第s个超级步时,顶点Va位于运算节点X中,顶点Vb位于运算节点Y中。运算节点X在处理顶点Va时,首先从分布式索引表中获取顶点Vb的地址位置运算节点Y;接着将消息发送给运算节点Y中的Vb。第s个超级步结束时,图处理框架进行任务迁移,将Vb所在的图分区迁移到运算节点Z中。运算节点Z获得迁移后的图分区后,更新该分区中所有顶点的地址信息,这个信息通过分布式索引表扩散到所有运算节点的索引表中。在第s+1个超级步运算节点X处理顶点Va时,从本地的索引表获取Vb新的位置。再将第s+1步的消息发送给位于运算节点Z的Vb。为了提升运算节点读分布式索引表的效率,SParTaG实现了cache用于所需顶点地址信息的缓存。
Fig.9 Workflow of distribute index table图9 分布式索引表的工作机制
本文通过实验对SParTaG的处理效率和弹性伸缩能力进行评估。实验的物理环境是由8个节点组成的分布式平台。机器配置为:4 Intel®Xeon®CPU E5-2670,内存8 GB,操作系统版本64 bit Debian 3.16.3-2。因为是图处理问题,所以需要图结构作为输入数据。本文首先从Stanford网络分析项目的网站(http:// snap.stanford.edu/data/index.html)中下载了若干真实的图结构,另外利用图的随机生成算法[16]构造两个尺寸与上述真实图相近的随机图。
表1展示了几个图数据的信息。图处理算法为PageRank应用和单源最短路径(single source shortest path,SSSP)应用。
Table 1 Size of input graph表1 图数据大小
4.1SParTaG与Giraph的性能测试
本节首先测试SParTaG在基准情况下的性能,即不使用弹性伸缩机制时,SParTaG静态执行图处理应用的能力。这里以当前比较流行的开源图处理框架Apache Giraph作为比较对象。第一组实验在单台机器上运行,均创建10个运算节点,运行PageRank和SSSP应用。实验结果如图10所示。
Fig.10 Comparison of running time between static SParTaG and Giraph图10 静态SParTaG与Giraph的运行时间比较
由该实验可知,对于不同尺寸的图数据,静态SParTaG具有与Giraph可比的性能。此外,通过该实验也可以发现,对于同等尺寸的实际图和随机图,处理时间是不一样的。以LiveJournal与R_Graph A为例,图的尺寸均为顶点4.8×106,边68×106,在SParTaG中用PageRank算法处理LiveJournal的时间是123.933 s,处理R_Graph A的时间是106.039 s。这是因为实际图LiveJournal中边的分布并不均衡,导致在处理过程中各个worker的运算时间有长有短,每个超级步所花时间更久,因而总时间更长。这就需要动态调度机制来提升性能。
4.2扩展性
本实验用于验证SParTaG框架的扩展性。对Arabic-05与LiveJournal两个实际图的数据执行PageRank算法。每次应用的执行使用不同数量的机器,每个机器创建10个运算节点。将1个机器执行应用的时间与多个机器的时间做比值,求加速比。实验结果如图11所示。
Fig.11 Scalability verification of static SParTaG图11 静态SParTaG的扩展性验证
从实验结果可以看出,SParTaG具有较好的扩展性。这里的扩展性是静态的。由于LiveJournal图的尺寸较小,分布在更多节点上时,每个运算节点所承担的图分区规模变小,影响了并行所带来的收益。因而加速效果相对差些。
4.3静态执行与弹性伸缩执行
本实验用于验证SParTaG的弹性伸缩能力。对实际图Arabic-05与随机图R_Graph B两个尺寸相近的图数据执行PageRank算法,比较静态执行和弹性伸缩执行的时间差异。对于每个输入图均运行3个实例:实例Static使用两台机器静态执行PagaRank;实例Load Balancing启用弹性伸缩策略,但保持机器的数量始终为两台,每台机器创建10个运算节点;实例Elastic Scaling同样启动弹性伸缩策略,但在作业初始时机器数量为两台,在作业执行到某一时刻额外添加两台机器,其中每台机器各创建10个运算节点。
在实验结果中,Load Balancing Arabic与Static Arabic相比,Load Balancing R_Graph B与Static R_ Graph B相比,均减少了作业执行时间。Load Balaing Arabic减少的幅度更大些,这是因为实际图Arabic由于图结构密度不均,利用简单的图分区算法难以保证负载均衡。经过动态调度后,在一定程序上改善了负载不均的情况。R_Graph B为随机图,图结构较为平均,因此动态调整对其优化程度不大。
此外,在两个图的执行过程中动态加入两台机器,图12中的Elastic Scaling R_Graph B与Elastic ScalingArabic为执行效果。弹性伸缩机制使得SParTaG能够将作业动态地扩展到新增机器上,而不用重启系统,并减少了运算时间,实现了即时(on the fly)加速。
Fig.12 Time comparison of static and elastic execution图12 静态执行与弹性执行效果对比
4.4弹性伸缩的运行剖面图
为了更直观地展示SParTaG动态调度的效果,本节对图处理过程进行时间剖面分析,并对每个超级步所花时间进行记录。实验用例为对Arabic图数据执行PageRank算法,初始时机器规模为两台。在作业运行过程的某一时刻加入额外的两台机器。两个对比实验分别为:静态执行与启用弹性伸缩机制的动态图处理。实验结果如图13、图14所示。
Fig.13 Progresscomparisonofstaticandelasticexecution图13 静态执行与弹性伸缩执行的进度对比
Fig.14 Execution profile of every superstep图14 每个超级步的运算用时对比
在图13中,横轴表示作业执行的时间,纵轴表示作业在某一时刻已完成的超级步。对于静态执行的实验,总共用时2 000多秒。对于启动弹性伸缩机制的实验,在600 s左右时添加新的机器,SParTaG将作业扩展到新增的运算节点中,使得剩下的每一个超级步都能够用更短的时间完成,最终在将近1 400 s时完成最后一个超级步,实现了作业的动态加速。
图14用更直观的方式说明SParTaG是如何实现动态加速的。在弹性执行实验中,出现两次比较明显的调度与迁移阶段。第一次是作业刚开始运行时,由于负载不均而导致的任务迁移。负载均衡后每个超级步的执行时间相比于静态实验有所减小。第二次是新节点加入时,调度机制将任务迁移到空闲的运算节点上,以实现新的规模下的负载均衡。这时,图处理的并行度增加,每个超级步的执行时间再次减小,图处理作业也因此获得了加速。
开放共享的集群环境对企业级的大数据处理提出了新的要求,弹性伸缩是这种环境下构建大数据处理平台所需要考虑的重要问题。为了让平台中的应用能够弹性地执行,不仅需要在集群中引入实现弹性资源分配的相关设施(如MESOS[21]、Yarn[22]等),更需要在处理框架层面根据作业的逻辑提供支持弹性伸缩的运行时调度机制。
为此,相关工作围绕大数据处理框架的弹性伸缩技术展开了研究。例如:Morpho[23]、EMRE[24]等实现了支持动态伸缩的MapReduce执行框架,ESC[25]实现了一种基于MAPE循环的弹性流处理框架,DETS[26]实现了一种基于任务池调度的面向计算密集型应用的弹性伸缩并行框架。
然而,对于图处理框架,目前虽然存在着若干关于负载均衡调整技术的研究,但对于弹性伸缩的研究仍处于探索阶段。根据系统架构的不同,图处理框架可以分为4类:图数据库、基于共享内存的图处理框架、基于消息通信的分布式图处理框架、基于内存计算的分布式处理框架。Neo4j[27]是一个当前十分流行的开源图数据库,通过提供一系列接口以支持图数据的读写、索引和遍历操作。基于共享内存的图处理系统包括GraphLab[28],PowerGraph、PowerLyra[29]、Seraph[30]等,它们把整个图和程序状态存储在内存中,在运算过程中,顶点之间的数据传递通过读写共享内存实现。由于是共享内存的架构,作业在访问共享数据时的加锁操作往往导致更大的开销,其横向扩展能力相对弱一些。上述两类图处理框架受到扩展能力的限制,尚未有工作对其弹性伸缩进行研究。
基于消息通信的分布式图处理框架包括Pregel[3]、Giraph、Mizan[4]、PAGE[31]等,它们基于BSP计算模型实现图处理,通过消息通信进行顶点间数据的传递,这种分布式的结构使得系统具有较好的扩展性。因此,现有的关于图处理负载均衡动态调整问题的研究[4-6]主要是在这一类框架上进行。本文所介绍的SParTaG也是基于消息通信的图处理框架。
此外,还有一类图处理框架是基于内存计算的,例如GraphX系统[32]在Spark RDD的基础上构建出基于BSP计算模型的分布式图处理框架。RDD对韧性(resilience)的支持使得GraphX具有动态容错的能力;RDD中对partitions的调度设施为GraphX提供了动态扩展的可能。然而,GraphX中缺乏弹性机制,无法在运行过程中自适应地调整各个partitions,要实现弹性伸缩仍需要更进一步的工作。利用Spark中RDD的容错能力与Tachyon内存文件系统高效的分布式读写,可以考虑重新构建基于Spark的支持弹性伸缩的图处理框架。
本文介绍了一种面向开放共享环境下支持弹性伸缩的并行图处理框架SParTaG。SParTaG首先定义了动态环境下图处理应用的任务模型,并利用图分区的迁移机制实现动态的负载均衡与扩展。通过对各个分区的运行时信息进行监控,SParTaG计算出分区间的迁移方案,从而实现图处理的动态调度。实验数据验证了SParTaG与当前流行的开源图处理框架Apache Giraph的性能相当,而且SParTaG还具有弹性伸缩的能力,能够充分利用分布式环境下动态变化的运算资源,实现作业的加速。
未来的工作重点主要包含以下两个方面:一方面考虑改进图的分区算法,以基于子图边密度的策略实现对图结构的均衡划分;另一方面考虑改进调度算法,以更细致的负载指标来指导任务调度。
[1]Cheng Xueqi,Jin Xiaolong,Wang Yuanzhuo,et al.Survey on big data system and analytic technology[J].Journal of Software,2014,25(9):1889-1908.
[2]Lu Xicheng,Wang Huaimin,Wang Ji.Internet virtual computing environment—iVCE:concept and architecture[J]. Science in China:Series E Information Sciences,2006,36 (10):1081-1099.
[3]Malewicz G,Austern M H,Bik A J,et al.Pregel:a system for large-scale graph processing[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data,Indianapolis,USA,Jun 6-11,2010.New York,USA: ACM,2010:135-146.
[4]Khayyat Z,Awara K,Alonazi A,et al.Mizan:a system fordynamic load balancing in large-scale graph processing[C]// Proceedings of the 8th ACM European Conference on Computer Systems,Prague,Czech Republic,Apr 15-17,2013. New York,USA:ACM,2013:169-182.
[5]Vaquero L,Cuadrado F,Logothetis D,et al.xDGP:a dynamic graph processing system with adaptive partitioning[C]//Proceedings of the 4th Annual Symposium on Cloud Computing, 2013.
[6]Nicoara D,Kamali S,Daudjee K,et al.Managing social network data through dynamic distributed partitioning[Z]. 2014.
[7]Valiant L G.A bridging model for parallel computation[J]. Communications of theACM,1990,33(8):103-111.
[8]Armstrong J.Programming Erlang:software for a concurrent world[M].[S.l.]:Pragmatic Bookshelf,2007.
[9]Dutt S.New faster Kernighan-Lin-type graph partitioning algorithms[C]//Proceedings of the 1993 IEEE/ACM International Conference on Computer-Aided Design,Santa Clara, USA,Nov 7-11,1993.Piscataway,USA:IEEE,1993:370-377.
[10]Fiduccia C M,Mattheyses R M.A linear-time heuristic for improving network partitions[C]//Proceedings of the 19th Conference on Design Automation,Las Vegas,USA,Jun 14-16,1982.Piscataway,USA:IEEE,1982:175-181.
[11]Pothen A,Simon H D,Liou K P.Partitioning sparse matrices with eigenvectors of graphs[J].SIAM Journal on Matrix Analysis andApplications,1990,11(3):430-452.
[12]Karypis G,Kumar V.Multilevel graph partitioning schemes [C]//Proceedings of the 1995 International Conference on Parallel Processing,Urbana-Champain,USA,Aug 14-18, 1995.Boca Raton,USA:CRC Press,1995:113-122.
[13]Karypis G,Kumar V.METIS:unstructured graph partitioning and sparse matrix ordering system[R].University of Minnesota,1995.
[14]Gonzalez J E,Low Y Gu Haijie,et al.PowerGraph:distributed graph-parallel computation on natural graphs[C]//Proceedings of the 10th USENIX Conference on Operating Systems Design and Implementation,Hollywood,USA,Oct 8-10,2012.Berkeley,USA:USENIXAssociation,2012:17-30.
[15]Nguyen D,Lenharth A,Pingali K.A lightweight infrastructure for graph analytics[C]//Proceedings of the 24th Symposium on Operating Systems Principles,Farmington,USA, Nov 3-6,2013.New York,USA:ACM,2013:456-471.
[16]Gilbert E.N Random graphs[J].The Annals of Mathematical Statistics,1959,30(4):1141-1144.
[17]Mitzenmacher M.A brief history of generative models for power law and lognormal distributions[J].Internet Mathematics,2002,1(2):226-251.
[18]Lu Xicheng,Wang Huaimin,Wang Ji,et al.Internet-based virtual computing environment:beyond the data center as a computer[J].Future Generation Computer Systems,2013, 29(1):309-322.
[19]Balakrishnan H,Kaashoek M F,Karger D,et al.Looking up data in P2P systems[J].Communications of the ACM, 2003,46(2):43-48.
[20]Mattsson H,Nilsson H,Wikstrom C.Mnesia:a distributed robust DBMS for telecommunications applications[C]// LNCS 1551:Proceedings of the 1st International Workshop on Practical Aspects of Declarative Languages,San Antonio, USA,Jan 18-19,1999.Berlin,Heidelberg:Springer,1999: 152-163.
[21]Hindman B,Konwinski A,Zaharia M,et al.Mesos:a platform for fine-grained resource sharing in the data center [C]//Proceedings of the 8th USENIX Conference on Networked Systems Design and Implementation,Boston,USA, Mar 30-Apr 1,2011.Berkeley,USA:USENIX Association, 2011:295-308.
[22]Yarn.Apache Hadoop next generation MapReduce(Yarn)[R]. http://hadoop.apache.org/docs/r2.3.0/hadoop-yarn/hadoopyarn-site/YARN.html.
[23]Lu Lu,Shi Xuanhua,Jin Hai,et al.Morpho:a decoupled MapReduce framework for elastic cloud computing[J].Future Generation Computer Systems,2014,36:80-90.
[24]Goh W X,Tan K L.Elastic MapReduce execution[C]//Proceedings of the 2014 14th IEEE/ACM International Symposium on Cluster,Cloud and Grid Computing,Chicago,USA, May 26-29,2014.Piscataway,USA:IEEE,2014:216-225.
[25]Satzger B,Hummer W,Leitner P,et al.ESC:towards an elastic stream computing platform for the cloud[C]//Proceedings of the 2011 International Conference on Cloud Computing,Washington,USA,Jul 4-9,2011.Piscataway, USA:IEEE,2011:348-355.
[26]Zhan Hanglong,Kang Lianghuan,Cao Donggang.DETS:a dynamic and elastic task scheduler supporting multiple parallel schemes[C]//Proceedings of the 8th International Symposium on Service Oriented System Engineering,Oxford, UK,Apr 7-10,2014.Piscataway,USA:IEEE,2014:278-283.
[27]Webber J.A programmatic introduction to Neo4j[C]//Pro-ceedings of the 3rd Annual Conference on Systems,Programming,and Applications:Software for Humanity,Tucson,USA,Oct 21-25,2012:217-218.
[28]Low Y,Gonzalez J,KyrolaA,et al.GraphLab:a new framework for parallel machine learning[J].arXiv:1006.4990, 2010.
[29]Chen Rong,Shi Jiaxin,Chen Yanzhe,et al.PowerLyra:differentiated graph computation and partitioning on skewed graphs[C]//Proceedings of the 10th European Conference on Computer Systems,Bordeaux,France,Apr 21-24,2015. New York,USA:ACM,2015.
[30]Xue Jilong,Yang Zhi,Qu Zhi,et al.Seraph:an efficient, low-cost system for concurrent graph processing[C]//Proceedings of the 23rd International Symposium on High Performance Parallel and Distributed Computing,Vancouver, Canada,Jun 23-27,2014.New York,USA:ACM,2014: 227-238.
[31]Shao Yingxia,Yao Junjie,Cui Bin,et al.PAGE:a partition aware graph computation engine[C]//Proceedings of the 22nd ACM International Conference on Conference on Information&Knowledge Management,San Francisco,USA,Oct 27-Nov 1,2013.New York,USA:ACM,2013:823-828.
[32]Gonzalez J E,Xin R S,Dave A,et al.GraphX:graph processing in a distributed dataflow framework[C]//Proceedings of the 11th USENIX Symposium on Operating System Design and Implementation,Broomfield,USA,Oct 6-8,2014. Berkeley,USA:USENIXAssociation,2014:599-613.
附中文参考文献:
[1]程学旗,靳小龙,王元卓,等.大数据系统和分析技术综述[J].软件学报,2014,25(9):1889-1908.
[2]卢锡城,王怀民,王戟.虚拟计算环境iVCE:概念与体系结构[J].中国科学:E辑信息科学,2006,36(10):1081-1099.
ZHAN Hanglong was born in 1989.He is a Ph.D.candidate at School of Electronics Engineering and Computer Science,Peking University.His research interests include big data,system software,parallel and distributed computing,etc.
詹杭龙(1989—),男,福建漳州人,北京大学信息科学技术学院博士研究生,主要研究领域为大数据,系统软件,分布式并行计算等。
CAO Donggang was born in 1975.He received the Ph.D.degree from School of Electronics Engineering and Computer Science,PekingUniversity in 2004.Now he is an associate professor at Software Institute,School of Electronics Engineering and Computer Science,Peking University.His research interests include system software,parallel and distributed computing,etc.
曹东刚(1975—),男,山东威海人,2004年于北京大学信息科学技术学院获得博士学位,现为北京大学信息科学技术学院软件所副教授,主要研究领域为系统软件,分布并行处理等。发表学术论文30余篇,承担过国家973计划、863计划、自然科学基金等多个项目,获国家技术发明二等奖,电子学会电子信息科学技术一等奖。
XIE Bing was born in 1970.He received the Ph.D.degree from School of Computer,National University of Defense Technology in 1998.Now he is a professor and Ph.D.supervisor at Peking University.His research interests include software engineering,formal methods and software reuse,etc.
谢冰(1970—),男,湖南湘潭人,1998年于国防科技大学计算机学院获得博士学位,现为北京大学信息科学技术学院软件所教授、博士生导师,主要研究领域为软件工程,形式化方法,软件复用等。发表学术论文80余篇,主持多项国家863计划重点项目,获国家科技进步二等奖、技术发明二等奖等。
Graph Processing Framework Supporting Elastic Scalability in Distributed Shared Environmentƽ
ZHAN Hanglong1,2,CAO Donggang1,2+,XIE Bing1,2
1.Key Lab of High Confidence Software Technologies(Peking University),Ministry of Education,Beijing 100871, China 2.Beida(Binhai)Information Research,Tianjing 300450,China +Corresponding author:E-mail:caodg@pku.edu.cn
As an important pattern in big data processing,graph processing has been widely used in many kinds of scenarios,such as machine learning,data statistics and data mining,etc.when running enterprise-level applications,various kinds of big-data processing frameworks are usually deployed in the same distributed cluster,so the runtime environmentisopenandshared.Asaresult,graphprocessingshouldconsiderthedynamicchangesofcomputingresources. In order to adapt to this dynamics and make good use of computing resources,graph processing framework should have the ability of elastic scaling.However,current graph processing frameworks have not fully realized elastic scaling yet as far as this paper knows.This paper introduces the design and implementation of an elastic scalable parallelgraph processing framework,SParTaG.SParTaG firstly defines the task set and task model in graph processing problem;then designs an elastic scalable framework based on task migration mechanism;and proposes a load-balancing based scheduling algorithm at last.Experiments show that SParTaG achieves performance parity with the currently popular open-source Giraph system,and it can run graph job well in an elastic scalable manner.
graph processing;distributed parallel computing;elastic scaling;task migration
2015-07,Accepted 2015-09.
10.3778/j.issn.1673-9418.1509009
A
TP391
*The National Natural Science Foundation of China under Grant Nos.61272154,61121063(国家自然科学基金);the National Basic Research Program of China under Grant No.2011CB302604(国家重点基础研究发展计划(973计划));the Baidu Cloud Service Platform Demonstration Project(百度云服务开放平台示范项目).
CNKI网络优先出版:2015-10-09,http://www.cnki.net/kcms/detail/11.5602.TP.20151009.1639.010.html