赛影辉 黄 浩
1(奇瑞汽车股份有限公司 安徽 芜湖 241006) 2(武汉大学计算机学院 湖北 武汉 430072)
在传感网络中,流数据的产生、存储和实时分析等操作愈见频繁。为了实时地从繁冗复杂的数据中提取有用的信息,有关流数据的查询操作非常普遍。对于不同的查询操作,我们用于应答的数据处理主要有三种,即选择(selection)、投影(projection)和连接(join)操作。相对于作为一元操作的选择和投影操作,join操作的处理就复杂得多。由于两个输入的流数据之间可能是相关联的,join操作的结果并不一目了然[6]。此外,对于同一对输入流数据的多个join操作同时进行的情况十分常见,join操作和查询处理也会同时出现。为能在可接受的时间内应答相应的查询请求,流数据join处理的框架要能够高效地完成join操作。
例如,图1中Query 1和Query 2是有关环境污染在线分析的两条典型的查询语句,所得到的查询结果蕴含有关当前空气质量指数AQI(Air Quality Index)的重要信息。其中,两个流数据C和H分别代表目前和历史(一个月前)的空气质量指数,两个查询语句对这些流数据进行join操作。由于每个查询对结果有时效性的要求,即30分钟之内的数据结果,所以两个join任务:C▷◁|C.PM2_5-H.PM2.5|>tH和C▷◁|C.PM10-H.PM10|>tH不仅需要并行地执行数据流C和H上的join操作,还要在30分钟内(保证数据的有效性)完成每一个join操作。在实际操作中,并行join操作任务的个数、有效时间(join window)的长度、流数据的输入速率在不同的应用中都截然不同,所以在设计可扩展的流数据join处理框架时,以上几个因素都需要考虑在内。
图1 AQI中的流数据查询
一个可扩展的流数据join处理框架需要具备:
(1) 通用性:一个好的join处理框架应该是应用无关的,即不是为某个特定的应用而设计的,要能够尽可能多的支持不同种类的join操作。
(2) 并发处理:对于一对输入的流数据,通常会有多个查询语句同时对其进行操作,这样就会出现许多并发的join操作任务。所以,join处理框架应能够高效地安排并处理好这些并发的join操作任务。
(3) 高负载量:流数据的高输入率和join操作冗长的有效时间增加了join操作的工作量,挑战着流数据join处理的效率和能力。因此,join处理框架应能容纳并处理很大的工作量并避免工作量失衡。此外,对于操作时效的控制和维持应避免成为效率瓶颈。
虽然在独立机器上的流数据join处理已经有较完善的方法[4,7,10,16,18],但大多数现有方法[1,9,13,17,20]无法扩展到分布式的环境中。现有的分布式join处理的研究成果在结果完整性或通信开销上有一定的限制,且通常是为特定的应用而设计或优化。为能处理同一输入流数据对上的并发join操作,可直接将输入的流数据复制成多份并分配给每一个join操作任务。但这样的复制操作会显著地增加通信开销,并不可行。
本文中我们提出一种可扩展的流数据join处理框架S2J(Scalable Stream Join),能够高效地完成单输入流数据对上大量并发的join操作。S2J自动地分配适当个数的串联的处理单元(join worker)来分担join处理的工作量,并用资源共享的方式将并行的多个join操作任务分配给不同的处理单元,不必复制流数据,节省了通信开销。为处理join处理单元间信息交换,S2J采用了基于元组块的信息传递协议MP-2PF,以保证对于每一个目标元组对的join操作只执行一次。元组块的大小适当,则S2J能够缩小带宽,并有效地发掘每一个join处理单元的处理能力。此外,S2J还应用了可以动态改变以适应多种不同输入源及其不同输入速率的输入适配器和减轻负载的单元。
本文的主要贡献如下:
提出了能够动态地适应变化的工作量的,包括巨大的工作量,可扩展的流数据join处理框架S2J;提供了一个基于元组块的信息传输协议MP-2PF和元组块大小的选择方法,使得S2J框架能够保证join处理结果的完整性,减少通信开销,避免工作量安排失衡,同时减小输入流速率的波动带来的影响。
目前流数据join处理研究大致可分为两种:(1) 单机上的流数据join处理;(2) 分布式的流数据join处理。
1.1.1 集中式流数据join
早期流数据join方法需要集中维护join状态(如中间结果),并采用基于哈希或基于排序的join方法。
基于哈希的join方法[18]:管道哈希join是最经典的流数据join方法之一,它利用并行内存加速join处理。然而,为了保持join状态,此方法要求内存容量充足。为解决此问题,出现了双管道哈希join[7],XJoin[16],以及将部分哈希表刷新到磁盘以便后续处理的哈希归并join[10]。为使输出率最大化,一些方法采用统计刷新策略[3,5,15],即只有更可能参与到join操作中的数据元组才能保存到内存中。
基于排序的join方法:哈希join方法适用于等值join,而基于排序的join方法适用于非等值join,但往往在join处理前需要完整的输入。为此,逐步合并join[4]将内存分为两部分,每部分各保存一个流数据,当内存空间已满时进行join处理。不过,以上操作在输出结果时可能带来严重的延迟。
1.1.2 基于多核的流数据join
多核技术支持了单机并行流数据join[5,8,12,14]。如Gedik等[5]提出利用多核单元处理器提高流数据join效率,但效率依赖于硬件并行性,而商用硬件通常不能很好地支持这种并行性。此外,多核和共享内存相结合的方法也可以进一步提高join处理能力和效率。例如,HandShake join[14]中,流数据中的每个元组与其他流数据中的元组分别进行join操作。但这些基于多核的方法难以扩展到分布式环境中从而实现并行化。
为得到高质量和可扩展的join处理,很多研究着重于分布式的流数据join处理,但是现有方法大多用于特定环境,有些方法为了高效率而牺牲结果完整性。
Photon[1]是Google公司提出的一种容错的分布式流数据join系统,被特定地用于处理网页搜索查询和用户对广告点击的数据join操作,缺少通用性。
D-stream[20]将连续的流数据分为离散的单元,在Spark中对其进行批处理[19]。然而,由于一些目标元组对在批处理时被分至不同批次,无法进行相应join操作,无法保证结果的完整性。很多基于MapReduce的流数据join处理方法[2,9]也面临同样的问题。
TimeStream[13]利用元组依赖关系执行流数据join操作。但依赖关系的维护会导致通信开销,可能成为其性能瓶颈。多join谓词可能使此解决方法更加复杂。
PSP[17]通过状态的时间切片将整体的join处理转换成一系列相互联系的较小的子运算过程,并分散为环形结构。但它需要同步分布式的join状态,通信开销可能会很高,一般达到子运算符数量的指数级。
storm是一个开源的分布式实时计算系统,同样,storm具有无法保证join结果完整性的缺点,应进行join操作的一组数据可能被包含在不同的数据元组中。
Flink是一个针对流数据的分布式处理引擎,将所有任务当作流来处理是其最大的特点。然而在执行Flink过程中,若对其计算资源的分配和定义进行干预,则有可能会出现资源不公平利用的现象。
为了使流数据join处理框架能动态适应变化的工作量,减少通信带宽同时又能保证join处理结果的完整性,本文提出S2J框架,且具有以下三个特点:(1) 对变化的工作负载有良好的适应性;(2) 保证join处理结果完整性同时减少带宽;(3) 信息处理的高效性。
为实现动态适应变化的工作负载这一特点,S2J框架采用基于处理单元平均利用率的分段式解决方法,并使用自适应分级卸载机制,动态管理处理单元。
为保证join处理结果的完整性,同时缩小通信带宽,减少信息过载现象,S2J框架采用MP-2PF传输协议和独特的元组块大小选择方法。
为了实现信息处理的高效性,S2J框架使用了特殊的优化策略以提高join处理效率。
图2即S2J框架的系统结构,由可扩展的join引擎、输入适配器、负载均衡单元、物化单元、查询代理和查询处理器组成。
2.1.1 可扩展的join引擎
为使计算可扩展性最大化,S2J框架的join引擎采用以worker为基本处理单元的流数据处理模型,部署在分布式平台的节点上,一个节点可以负载一个或多个处理单元。所有处理单元通过流数据通道以级联的方式连接,系统运行时可根据需要增加处理单元。
S2J框架采用数据流为导向的处理方式,各进入join引擎的元组有一个生命周期。如图2中,数据源R的一个元组r到达join引擎的左侧终端时,其生命周期开始,生命周期与join操作的有效时间(join window)长度一致。以1 min为例,元组r沿各处理单元向右移动,每遇到一个向左移动的数据源S的目标元组就与其进行join操作,1 min后,元组r从引擎的右侧终端流出,生命周期终止。元组r进入引擎的同时,上一周期数据源S的所有元组已在引擎中,下一周期的元组在元组r终止之前移至引擎中。与元组r有关的所有join操作能在限定时间内完成,保证引擎的时效性。
图2 S2J框架结构
为使全局负载平衡,join引擎中信息传递规则如下:当处理单元的负载超过其后继单元,且超过部分大于一定阈值,该处理单元将一定的负载传递给后继单元。这样,全局的负载能平均分配到每个处理单元。
同时,为能并发处理在同一对流数据上的多个join任务,join引擎将一组独立的worker实例链安排给每个join任务。当新的join任务到达流数据对时,新的worker实例自动初始化。同一个处理单元中的所有实例共享流数据通道和流数据,节省通信开销。
2.1.2 输入适配器和负载均衡
为使框架独立于不同的输入流数据,S2J采用输入适配器将输入的源数据转换为标准的流数据,并根据查询语句预先对数据进行选择和投影操作。此外,负载均衡单元用于解决流数据输入速率暴增的问题。
2.1.3 物化单元
Join引擎最新的输出存储于内存缓冲区,而之前的输出记录以快照的方式快速备份存储于数据库中。每个快照拥有共同标识符(如执行时间),并根据这个标识在记录上建索引。此外,S2J框架支持快照的定期执行或根据输入流中的标点符号执行。
2.1.4 查询代理和查询处理
S2J框架支持连续查询(要求输出最新的结果)和一次查询(只要求输出一段时间内的结果)。通过查询处理上Client/Server模式的应用,可同时处理多个查询。在Client端,查询代理将查询请求转化为流数据,将查询应答流数据转换为Client版本。在Server端,查询处理单元从流数据中解析出查询请求,作出应答,通过查询应答流数据将查询结果返回至相应的Client端。
join任务的工作负载大小取决于join有效时间的长度和输入流数据对的输入速率,以上两者随应用程序不断变化,使不同应用程序有不同的工作负载。在特定的应用程序中,当join有效时间固定,数据流输入速率的波动亦会引起join过程中的动态工作负载。因此,适应动态的工作负载至关重要。
图3阐明这种分段式方法,其中τ*是τ的初始值,所有的阈值满足以下关系:0<τ0<τ*<τ1≤τ2≤1。
图3 动态适应工作负载
基于以上阈值,我们将讨论不同应用中计算资源(即S2J的join处理单元)的初始配置,介绍执行过程中S2J动态适应不同工作负载量的方法,即自适应地将工作负载分级卸载以及在线调整join处理单元的个数。
2.2.1 初始化
在join任务初始化之前,需指定m个join处理单元用于S2J框架的初始部署。由于部署少量的处理单元能减少计算资源的浪费,而部署更多的处理单元能增加整体的处理能力,承担更大的工作负载的波动,因此m值的选取尤为重要。
由于τ*<τ1,我们设定τ*=β·τ1(0<β<1)。如果τ*→1·τ1,虽然所有的处理单元都将被有效的利用,但输入流速率短暂的增加也会触发假阳性分级卸载。而如果τ*→0·τ1,大量处理单元利用率降低,引起额外的通信开销。综上,我们在实验中测试τ*·0.5·τ1,这样,在保证每一个处理单元达到合理的利用率的同时,还能够承受相对较大的输入流波动。
2.2.2 自适应分级卸载
在join任务的执行过程中,短暂的工作量增加造成输入流波动的情况经常发生。为此,S2J框架采用自适应分级卸载,当处理单元状态(或即将)饱和时,卸载一定比例的元组。我们将这个比率定义为卸载率SR,具体如公式所示:
式中:B∈[0,1]是基本卸载率。
以上卸载率运用线性的卸载模型,τ超过阈值τ1时,卸载率与负载的超出部分成正比。实际操作时,可运用其他卸载模型确定卸载率,如二次卸载模型。图4阐示了这两种卸载模型中卸载比率的变化趋势。
图4 分级卸载模型示例
2.2.3 join处理单元管理
为处理执行过程中流输入速率的持续增加和减少,S2J框架对join处理单元运用动态的管理机制。
若τ>τ2且持续一定时间,S2J通过分配额外的处理单元以增加其处理能力。由于配置额外的处理单元,平均利用率τ下降,S2J将增加新的处理单元直到τ的值减少至接近τ*(见2.2.1)或不存在多余节点。
若τ>τ0持续相对长的时间,为解决欠载问题,S2J释放多余的处理单元以节省计算资源。根据τ值,S2J框架从一端释放处理单元,使剩余处理单元的平均利用率增加至τ*左右。被释放的处理单元被S2J框架回收并转换为可用状态。
在分布式处理中,处理单元之间的信息传递会导致大量的通信,可能会成为系统的性能瓶颈,特别是网络带宽有限的情况下。为了节约通信成本,S2J框架令并发join任务共享流通道和数据流,以减少流数据对的数目。对于给定的流数据对,主要的通信过载是由信息序列化造成的。为减少这种过载,S2J框架使用由多个元组组成的元组块进行信息传递。
对于基于元组块的信息传递,我们提出MP-2PF协议部署信息的传递过程,使其与join处理过程同步,并提出元组块大小的选择方法,动态调整元组块粒度以满足不同需求,如减少带宽和避免假阳性分级卸载。
2.3.1 MP-2PF协议
S2J框架在join过程中采用two-phase forwarding model[14]。基于此模型,我们提出MP-2PF信息传递协议,实现处理单元之间被动的信息交换,即当处理单元的状态发生改变的时候,它立即将这一转变信息送至依赖于此信息来完成下一操作的近邻单元。
MP-2PF协议中共有三种信息类型,即:
SIZE_CHG:将其工作负载大小发送至前趋。
TUPLE_BLK:在相邻处理单元之间传递元组块。
ACK:确认接收元组块。
当join处理单元的工作负载量(以元组数为计量单位)发生改变,SIZE_CHG信息会被发送至其前趋单元。若工作负载量小于其前趋单元且两者之差至少为一个元组块大小,那么这两个相邻的处理单元之间会有工作负载传递。此过程中,在收到相应的确认反馈之前,前趋单元将一直保存传递元组的副本,TUPLE_BLK信息包含传递的元组块,当收到后继单元发送的ACK信息时,删除副本信息。这样,S2J框架避免了错失join数据对的问题[14],同时保证一对元组之间的每一个join操作执行且只执行一次。
图5概述了MP-2PF协议内容。处理元组块的过程中,S2J的处理单元于三个状态中相互转换。每个单元的初始状态为Processing状态,并且在接收一个新的元组块之后回到此状态。当收到新的元组块时,处理单元利用新的元组进行join操作,并将确认信息发送至其前趋单元,随后检查forwarding条件以决定是否调用元组块的forwarding步骤。Forwarding条件是指两个近邻处理单元的工作负载量差值达到阈值(此阈值即元组块的大小),若forwarding条件满足,则单元转换到forwarding状态。随后处理单元将元组块发送至其后继单元,在其站点中留下转发信息的副本,并最终返回processing状态。若处于processing状态的处理单元接收到确认信息,其状态转变为deleting状态,删除之前转发元组的副本以及相应的确认信息,并将其工作负载量的改变信息发送至其前趋单元。最后,处理单元回到processing状态。
图5 MP-2PF协议
2.3.2 元组块大小的选择
对每次信息传递,两个处理单元之间传递的元组需序列化为带信息头的信息,这增加了通信量,传输固定数量的元组的通信开销与序列化的次数成正比。因此S2J不用每次只转发一个元组的传统策略[14],而利用包含ζ个元组的元组块作为信息单元进行转发,即仅当处理单元的工作负载量高于其后继单元ζ个元组时,它才将ζ个元组发送给其后继单元,如图6(a)所示;无元组传输的情况如图6(b)所示。
图6 不同Worker间的workload差异
为最小化序列化的开销,ζ值应尽量大。随着ζ增长,减少的通信过载会更小,若ζ足够大,这种过载可忽略不计。此现象可由以下引理和定理来解释。
引理1令ϖ为一对给定的输入数据流的输入速率,常量c为信息头的长度,在有m个处理单元的S2J框架中,元组块的大小ζ与带宽b的关系如下:
(1)
式中:bout为用于输出join结果的带宽,对于给定的join任务,输出带宽是固定的。
基于引理1,我们可以得到以下定理。
定理1提供了一个定量保证:当元组块大小适当时,S2J能达到一个相对低的带宽。图7描绘了式(1)的函数曲线,从中可发现当ζ超过一个相对小的数值如20,S2J能够得到前5%的最小带宽。
图7 S2J中变化量为ζ的带宽
基于以上分析,我们提出了元组块大小ζ值的选择方法如下:
(2)
式中:ζ*是用户定义的参数,决定系统能够达到的最小带宽(ζ≤ζ*)。在我们的实验中,ζ*值设置为20。
图8阐述了此选择方法,从中可以看出:(1) 在τ达到阈值τ1之前,此方法在多数情况下选择的元组块大小ζ保持在ζ*左右,减少了所需带宽;(2) 当τ接近τ1时,选择更小的ζ值,如前所述,较大的元组块大小会增加假阳性分级卸载的可能,为了解决这一问题,S2J以增加带宽为代价运用了更小的元组块大小;(3) 当τ超过τ1时,元组块大小缩小至1以帮助S2J平均地分配工作负载,以有效地开发每个处理单元。
图8 不同τ值对应的ζ
利用此元组块大小选择方法,S2J能够有效地避免假阳性分级卸载。接下来,我们将阐述在每个join处理单元的利用率达到τ1之前,S2J所有m个处理单元的平均利用率τ不会超过分级卸载的阈值τ1。
为了输出一定数量的join结果,所需计算量与找到目标元组对的比较次数成正比。因此,join处理效率P可以通过以下公式计算:
(3)
更大的P值意味着框架能够为流数据的join安排更高效的处理过程。
为提高join操作的效率,S2J使用内存中索引,减少不必要的join操作,加快处理进度。对输入流数据中的每个元组块生成join键的内存索引。当元组块到达join处理单元时,索引条目插入,当元组块移出处理单元时,索引条目删除。在join过程中,每个元组搜索满足索引中谓词的join键,并与关联于匹配的join键的元组进行join操作。
为进行不同谓词的join操作,S2J对于等值join采用哈希索引直接定位目标join键,得到键值相同的相应元组;对于非等值join,利用平衡二叉搜索树(BST)快捷访问由谓词如<,>,≤,≥指示的join键。
我们在分布式实时流数据处理平台Apache S4[11]基础上实现了S2J框架,并在以下实验环境中对S2J进行评估:Intel Xeon X3430 @ 2.4 GHz CPU以及CentOS 5.11,单元内存为2 GB。在本实验中,我们将分别在单机(单核运行一个join处理单元)和分布式环境(同一集群的2~8个结点)中评估S2J的处理能力,并分别在单机4核环境以及拥有8个结点的集群上评估处理并发任务的效率,同时验证不同参数对框架的影响。
由于S2J处理器在单机(单核运行一个join处理单元)和分布式环境中都能够部署展开,我们将研究其在两种不同模式下运行的性能。此外,我们也对其处理并发join任务的性能进行报告。
3.1.1 输入适配器和负载均衡
在此实验中,将S2J的join处理性能与传统多核的流数据处理框架HandShake Join(握手Join)[14]性能进行比较。为此,通过提高流数据对的输入速率找到系统处理能力的上限(执行过程中join的有效执行时间固定),记录使CPU利用率达到95%的数据输入率,此输入率表示系统可承受的最大输入速率,即系统的最大处理性能。为研究系统在不同工作负载量下的处理性能,将join有效操作时间从5分钟逐渐调整至20分钟,在同一单机上依次运行所测验的框架,每次执行运用不同数量的核(2~8个)。
图9阐述了在不同join有效操作时间及不同数量核的情况下,HandShake Join和S2J在单机上的系统处理性能对比。如图9(a),有效操作时间为5分钟,核的数量由2增至8,利用哈希索引的S2J能承受的最大数据输入率由每秒4 000元组线性增至8 500元组,利用二叉搜索树索引的S2J同样具有线性增长趋势且增长幅度较大,而HandShake Join能承受的最大数据输入率最小,且增加核的数量时,其增长幅度不明显。从中发现:(1) 与HandShake Join相比,S2J框架能承担更高的最大输入速率,具有更高的join操作处理性能;(2) 运行的核数量越多,相比于HandShake Join框架,S2J最大输入率的增长更快,即S2J在join处理性能上亦有更好的扩展性。这是由于S2J独特的优化策略和系统结构使其在处理join操作时具有更高的效率,也使其较HandShake Join具有更高的处理性能和更好的扩展性。
图9 单机环境下S2J和HandShake Join所能承受的最大输入率
3.1.2 分布式环境中的处理性能
此实验将S2J框架与D-Stream[20]在分布式环境下的性能进行比较。由于其底层实现部分是不支持非等值join的基于哈希的map和reduce,D-Stream只处理等值join操作。且1.2节指出,D-Stream将输入的流数据分解成离散的处理单元分批处理,可能会使本应执行join操作的两个元组因存在于不同的两个单元而相互错过,处理结果不完整。因此,这个部分仅报告D-Stream在处理等值join方面的系统处理性能,单机环境下的HandShake Join不参与这部分的对比。我们在同一组集群上部署S2J和D-Stream,每次执行的时候使用不同数目的结点(2~8个),并且通过测验不同长度的有效执行时间(5~20分钟)来研究每个框架在不同的工作负载量下所能承受的最大数据输入率。
图10展示了在不同join有效操作时间以及不同数量的核的情况下,D-Stream和S2J在分布式环境下进行等值join操作的系统处理性能的对比结果(其中基于二叉搜索树索引的S2J的实验数据仅作为参考)。如图10(a)中,有效的join操作时间为5分钟,运用的核的数量由2增加至8,D-Stream和S2J能够承受的最大数据输入率都有所增加且两者数值几乎相等,D-Stream能够承受的最大数据输入率略高。
图10 分布式环境下S2J和D-Stream所能承受的最大输入率
对于等值join任务,S2J与D-Stream性能相当。然而,D-Stream只能进行等值join操作,且仅能输出部分目标join结果,而S2J处理框架能保证得到结果的完整性。所以,S2J在支持高效的θ-join和保证join结果的完整性方面优于D-Stream。
3.1.3 执行并发join任务的处理性能
本实验从两方面研究S2J执行并发join任务的处理性能。即并发join任务如何影响:(1) S2J框架能够承受的最大数据输入率;(2) S2J框架所需带宽。我们在S2J中运行2~8个join处理单元,使用不同数量(2~8个)的并发join任务,有效执行时间为10分钟。
处理性能方面,图11描述结点数量不同时,S2J能承受的最大数据输入率随并发任务数量变化的趋势。如图11(a)描述了结点数量为2,当并发任务数量从2增至8时,基于哈希索引的S2J(等值join)能承受的最大数据输入率从接近2 700元组每秒减少至2 000。从图中发现随任务数量的增加,最大输入率减小,且趋势逐渐减缓。对于相同的并发任务数量,结点数量增加时,S2J能承受的最大数据输入率增加,这是由于结点数量的增加能有效提高S2J的处理性能。
图11 并发任务时S2J能承受的最大输入率(有效时间10分钟)
通信带宽方面,图11为S2J在执行不同数量的并发join任务时所需带宽。仅作为参考,这里有S2J的朴素(naive)版本的实验结果,即对于每个任务都采用复制其流数据的方法来进行join时的所需要的带宽,将此结果作为实验数据的对比基准。从图12我们可以发现:(1) 基于流数据复制的join处理框架所需带宽随着并发join任务数量的增加呈线性增长趋势,而S2J所需带宽趋于稳定,几乎不受并发join任务数量的影响;(2) 与基于流数据复制的join处理所需带宽相比,当存在大量并发join任务时,S2J框架能显著节约带宽。这是由于其独特的消息传递机制减少了通信开销。
图12 不同数量的并发join任务时所需带宽(有效时间5分钟)
总之,S2J处理框架能够以更少的性能代价处理更多并发的join任务,同时对于带宽几乎没有损耗。
在实际操作中,并不是处理框架部署的所有计算都能够得到join结果的输出,通常一些计算会由于低效的join处理而被浪费。有效计算的百分比可以通过join操作的效率进行估算(见2.4节中的式(3))。Join处理效率越高,框架可通过更少的计算量获得同样的join操作结果,这就为更多的进程节约了更多的CPU周期。我们在单机4核环境下对S2J和HandShake Join的处理效率进行对比,以及在有8个结点的集群上对S2J和D-Stream的处理效率进行对比。有效执行时间为5秒至1分钟,两组输入流对分别以1 000元组每秒和2 000元组每秒进行输入。
图13展示了对比结果。从中可以发现:(1) S2J的处理效率远高于HandShake Join和D-Stream,并随由有效时间增加或流数据输入率增加引起的工作负载量增加而提高;(2) HandShake Join和D-Stream的处理效率很低(在本实验中,其效率低于1%),由于HandShake Join对每一个元组对进行比较,降低处理效率;而D-Stream丢失了许多用于join操作的目标元组对,只输出部分join结果,降低处理效率。
在本实验中,首先确认元组块的大小和S2J的带宽之间的关系,接着核实元组块大小选择方法在承受输入流输入速率波动方面的有效性。
如2.3.2节中,S2J的带宽与元组块大小ζ成反比,基于这一点,当ζ增大超过一定阈值(如20)时,缩小的带宽(如5%的变化)几乎可被忽略。为证明此结论,我们部署有效执行时间5分钟,具有8个join处理单元,流数据输入率每秒2 000至4 000元组的join任务。执行过程中,将元组块大小从1个元组调整至200,并报告相应的带宽如图14。从图中可以发现,带宽的变化率呈现出与图7中S2J带宽与ζ之间的关系相同的趋势,由此证明上述结论。
图14 不同元组块大小的S2J中的带宽
运用元组块大小选择方法,S2J能容忍数据输入率的波动,避免假阳性分级卸载。为证实这一点,卸载的基本比率(见2.2.2节)设为B=30%,τ1=80%,有效执行时间为1分钟,在8个join处理单元上运行join任务。输入的流数据对具有每秒1 000元组的平均传入速率,同时其波动具有不同的振幅和频率。
图15展示S2J使用选定的ζ值和使用固定元组块大小(即20,50,500)时不同结果的对比。可发现,使用选定的元组块大小时,S2J成功承受数据输入率的波动。而使用其他的ζ值时发生假阳性分级卸载现象。
图15 不同输入数据率波动振幅及频率下元组块大小对分级卸载的影响
本实验中,为阐述S2J框架在执行过程中如何适应不同的工作负载量,我们安排了一个案例研究。为此,采用具有不同输入率的流数据对作为输入,如图16(a),S2J中安排4个结点作为其初始处理单元(一个处理单元运行在一个结点上),有效执行时间为60秒,设置阈值τ0=20%,τ1=80%,τ2=82%。添加和释放处理单元的规则如下:(1) 若所有处理单元的平均利用率τ满足τ>τ2且延续超过30秒,系统将添加额外的结点使得τ减少至τ*左右(即40%,因为τ*=0.5·τ1);(2) 若τ<τ0且延续5分钟,将卸载一定数量的结点以使τ上升至τ*附近。
图16 S2J适应不同工作负载量的案例研究
图16的(b)至(d)分别展示τ值,实时分级卸载,执行过程中的节点数量,从中得出以下几点发现(按照时间顺序):(1) S2J能有效容忍在第400至500秒时由于短暂的数据输入率增加而造成的工作负载量波动;(2) 一旦每个join处理单元的利用率将要达到饱和(即τ>τ1),如图16(c)所示,S2J卸载一部分负载使得其处理单元恢复处理性能;(3) 当工作负载量持续增加,τ>τ2的情况持续超过30秒时,如图16(d)所示,4个新的处理单元添加至S2J框架来帮助处理增加的工作负载,执行此操作之后,所有处理单元的平均工作负载在几秒内降至40%左右。这是因为根据MP-2PF协议,原始单元的元组被迅速传递给新添加的单元,直到工作负载量达到平衡;(4) 若平均利用率降至τ0以下,且此状态持续5分钟以上,S2J卸载多余的join处理单元。系统可自动选择合适数量的处理单元(此例中为4个)来卸载,以使τ恢复到接近于τ*的值。卸载操作在60秒内完成,即有效执行时间,其原因是每一个新添加的元组将被S2J中的处理单元传递,其生命周期终止时被移除。
我们提出了S2J——一个基于高度分布式环境的可扩展流数据join处理框架。S2J能在不同工作负载量下有效地使用计算资源,有能力处理由流数据输入率的增加带来的暂时性过载情况,高效处理θ-join且提供实时保证,并保证join结果的完整性。S2J还可减少通信开销,避免流数据输入速率波动时过大的元组块造成的假阳性分级卸载。我们分别于单机和分布式环境下进行实验,实验结果阐述了S2J框架处理join操作的高效性和有效性,同时证实其能够容忍波动的数据输入率并适应不同的工作负载量。
[1] Ananthanarayanan R,Basker V,Das S,et al.Photon:fault-tolerant and scalable joining of continuous data streams[C]//Proceedings of the 2013 ACM SIGMOD International Conference on Management of Data.ACM,2013:577-588.
[2] Blanas S,Patel J M,Ercegovac V,et al.A comparison of join algorithms for log processing in mapreduce[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data.ACM,2010:975-986.
[3] Chen S,Gibbons P B,Nath S.PR-join:a non-blocking join achieving higher early result rate with statistical guarantees[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data.ACM,2010:147-158.
[4] Dittrich J P,Seeger B,Taylor D S,et al.Progressive merge join:a generic and non-blocking sort-based join algorithm[C]//Proceedings of the 28th International Conference on Very Large Data Bases.ACM,2002:299-310.
[5] Gedik B,Yu P S,Bordawekar R R.Executing stream joins on the cell processor[C]//Proceedings of the 33th International Conference on Very Large Data Bases.ACM,2007:363-374.
[6] Golab L,Özsu M T.Issues in data stream management[J].SIGMOD Record,2003,32(2):5-14.
[7] Ives A G,Florescu D,Friedman M,et al.An adaptive query execution system for data integration[C]//Proceedings of the 1999 ACM SIGMOD International Conference on Management of Data.ACM,1999:299-310.
[8] Karnagel T,Habich D,Schlegel B,et al.The hells-join:a heterogeneous stream join for extremely large windows[C]//Proceedings of the 19th International Workshop on Data Management on New Hardware.ACM,2013:1-7.
[9] Logothetis D,Yocum K.Ad-hoc data processing in the cloud[J].VLDB Endowment,2008,1(1):1472-1475.
[10] Mokbel M,Lu M,Aref W G.Hash-merge join:a non-blocking join algorithm for producing fast and early join results[C]//Proceedings of the 20th IEEE International Conference on Data Engineering.IEEE,2004:251-262.
[11] Neumeyer L,Robbins B,Nair A,et al.S4:distributed stream computing platform[C]//Proceedings of the 12th IEEE International Conference on Data Mining.IEEE,2010:170-177.
[12] Qian J,Li Y,Wang Y,et al.An embedded co-processor for accelerating window joins over uncertain data streams[J].Microprocessors and Microsystems,2012,36(6):489-504.
[13] Qian Z,He Y,Su C,et al.Timestream:reliable stream computation in the cloud[C]//Proceedings of the 8th ACM European Conference on Computer Systems.ACM,2013:1-14.
[14] Teubner J,Mueller R.How soccer players would do stream joins[C]//Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data.ACM,2011:625-636.
[15] Tok W H,Bressan S,Lee M L.RRPJ:result-rate based progressive relational join[C]//Proceedings of the 12th International Conference on Database Systems for Advanced Applications.Springer,2007:43-54.
[16] Urhan T,Franklin M J.Dynamic pipeline scheduling for improving interactive query performance[C]//Proceedings of the 27th International Conference on Very Large DataBases.ACM,2001:501-510.
[17] Wang S,Rundensteiner E.Scalable stream join processing with expensive predicates:workload distribution and adaptation by time-slicing[C]//Proceedings of the 12th International Conference on Extending Database Technology.ACM,2009:299-310.
[18] Wilschut A N,Apers P M G.Dataflow query execution in a parallel main-memory environment[C]//Proceedings of the 1st International Conference on Parallel and Distributed Information Systems.IEEE,1991:68-77.
[19] Zaharia M,Chowdhury M,Das T,et al.Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation.ACM,2012:25-27.
[20] Zaharia M,Das T,Li H,et al.Discretized streams:fault-tolerant streaming computation at scale[C]//Proceedings of the 24th ACM Symposium on Operating Systems Principles.ACM,2013:423-438.