流数据实时接收方案的研究

2022-04-29 05:15张笑燕刘志浩杜晓峰陆天波
通信学报 2022年4期
关键词:元组缓冲区磁盘

张笑燕,刘志浩,杜晓峰,陆天波

(北京邮电大学计算机学院(国家示范性软件学院),北京 100876)

0 引言

随着信息社会飞速发展,数据产生的速度越来越快。现实中存在一类常见的业务,即将大量源源不断到达的流数据S先与存储在磁盘上的关系表R进行连接,对S进行半流连接关联更新操作(如去重、修正等)后再将其写入数据仓库[1]。该过程被定义为S∞CR[2]。其中,C代表某种半流连接操作。

由于源数据系统的不同,逻辑上相同的元组可能具有不同的值,在数据进入数据仓库时,需要通过关联表进行统一,保证数据的一致性。

为了更具体地理解这种半流连接关联更新操作,图1 展示了数据仓库中涉及该操作的一个例子。由于数据来源的不同或数据的延迟到达,系统中同一个id 可能具有不同的名称,在数据进入数据仓库之前,需要使用一张关系表将id 转换为系统内部的id,对不同的名称进行统一。

图1 数据仓库关联更新示意

在真实的数据仓库系统中,R保存在磁盘上,一般占用空间较大,无法全部放入内存;S包含不断到达的流数据,其中的每个元组都需要和R进行连接操作[3-5]。这便产生了一个问题:S中的元组以流数据的形式不断快速到达,而R中的元组需要通过磁盘I/O 以相对较低的速率读取,造成更新时需要消耗大量时间等待磁盘I/O,导致数据处理的延迟。

事实上,为了解决流数据连接操作中的各种问题,与此有关的研究从未中断。Polyzotis 等[6]提出了MESHJOIN 算法,通过对关系表的分组读取和对流数据的分页读取,把对磁盘的I/O 时间分摊到了若干个流数据元组的读取中,平衡了关系表的读取速率和流数据的到达速率的差异。但是MESHJOIN 算法并没有考虑流数据S的特征和关系表R的组织,因此在处理倾斜数据时表现较差[7]。Vaidehi 等[8]提出了分布式嵌套循环连接处理(DNLJP,distributed nested loop join processing)算法,对流数据的来源进行分组,根据其所定义的不同查询成本、查询时间等对连接查询操作进行优化,降低了分布式集群中各节点之间通信的成本,提高了对海量数据连接查询的效率。DNLJP 是可以用于分布式集群的算法,而 MESHJOIN 和CACHEJOIN 算法只适于单机环境,无法直接应用于分布式环境[9-10]。Naeem 等[11]提出 的CACHEJOIN 算法考虑了数据倾斜的问题,通过引入一个缓存,将关系表中较常用的部分存储在缓存中,其中的元组有较高的概率和流数据匹配成功,减少了磁盘的I/O 次数。Jeon 等[12]提出了DS-join方案,通过使用几种流处理引擎的微批处理模式,控制数据分区,减少了各节点之间的网络通信频率,同时引入缓存模块对连接操作并行处理,减少磁盘I/O 次数,并且拥有自动调整缓存区大小的优化设置。为了保证分布式流连接操作中处理结果具有完整性和一致性,Yuan 等[13]测试了数据仓库接收到的流数据出现错误时连接操作结果的质量优劣,提出了基于有序传播模型的、应用于分布式流连接系统的Eunomia 方法,保证了所有连接器的元组到达顺序的一致性,能够消除数据传播过程中的某些错误,提高了分布式流连接处理的扩展性和吞吐率。狄程等[14]设计并实现了对多源异构流数据的处理系统,在一定程度上消除了数据的结构不同对数据引入、连接处理造成的复杂问题,使流处理服务化、模块化,减少了流数据处理程序的开发成本。

因此,本文在已有研究的基础上进行改进和优化,提出了应用于分布式环境的D-CACHEJOIN 算法。D-CACHEJOIN 算法的关键在于采用一致性哈希函数[15]的策略,将R分区存储在不同节点,并在后续的匹配过程中使用这一计算结果,达到快速将R与S进行匹配的目的。同时,本文使用服从Zipfian 分布的模拟数据[16-17],定量计算算法执行的成本开销,以优化算法的执行效率[18]。实验表明,在拥有一定的可用内存时,D-CACHEJOIN 算法在处理流数据时具有良好的实时处理性能,同时具有易于扩展的特性。

1 基于缓存的分布式D-CACHEJOIN 算法

1.1 D-CACHEJOIN 算法的原理描述

与CACHEJOIN 算法类似,D-CACHEJOIN 算法拥有2 个阶段——流检测阶段和磁盘检测阶段。流检测阶段使用流数据S作为输入,磁盘检测阶段使用关系表R作为输入,由于内存大小的限制,两阶段每次都分别只处理S和R的一部分。

D-CACHEJOIN 算法执行架构如图2 所示[11],关系表R和流数据S是外部输入,HS用于分次存储S中的元组,实际中占用较大的空间;HR是缓存模块,用于存储R中匹配频繁的元组[19]。

图2 D-CACHEJOIN 算法执行架构

下面对本文算法的执行架构进行说明。算法在流检测阶段,读取流缓冲区的数据元组并与HR中存储的高频访问元组进行连接匹配,如果在HR中找到所需的元组,则算法生成该元组作为输出,只有在无法匹配时,才会将该元组存储到HS中并将其指针加入队列,以便进行后续的匹配操作。当HS满或S中已无数据元组时,流检测阶段结束,磁盘检测阶段开始。在磁盘检测阶段,算法将R中的部分元组读取到内存,得以分摊消耗资源较多的磁盘I/O 操作。读取完成后,算法会将其与HS中的元组进行匹配,匹配成功后,算法生成连接后的元组作为输出。与此同时,匹配次数超过阈值的元组会被缓存到HR中用于流检测。经历数次迭代后,HS中最先进入的元组已经完成了和R中所有元组的匹配,算法会将其删除,此时HS中有了新的空闲位置,可以继续存储S中待匹配的元组,因此算法接下来再次切换到流检测阶段。

D-CACHEJOIN 算法的伪代码实现如算法1所示[11],其中w为流检测阶段读取S的元组数,b为磁盘检测阶段读取R的元组数。

算法1D-CACHEJOIN

20) 删除HS中匹配完成的元组及其队列中对应的指针

21) end while

为了完整地处理流数据,算法的外层是无限循环,主要包括2 个部分,流检测阶段和磁盘检测阶段,二者交替进行。

在步骤2)~步骤9)的流检测阶段,算法在流缓冲区读取w个元组,对w中的每个元组t,算法首先检测检测t是否在HR中,如果是则将其连接并输出结果;否则将t加入HS中,同时在队列中加入t的指针。

在步骤10)~步骤20)的流检测阶段,算法在磁盘缓冲区读取b个元组,对b中的每个元组r,算法首先检测r是否在HS中,如果是则将其连接并输出结果;否则将忽略该元组。由于HS需要存储流中的元组,因此HS是一个多映射,r可能会成功匹配到HS中的多个元组,记该成功匹配的次数为f;如果某个r对应的f大于阈值threshold Value,就将r加入HR中,如果HR已满,则先随机删除HR中的一个元组,然后再将r加入HR。在每一次的迭代中,如果缓存HR未满,说明阈值设置过高,此时可以自动降低阈值;如果过于频繁地从HR中删除元组,说明阈值设置过低,此时可以自动提高阈值。

1.2 D-CACHEJOIN 算法的系统实现

1.1 节说明了本文算法在单节点环境中的一般过程,本节将算法应用到分布式环境中。为了叙述简洁,本节以拥有2 个节点的数据仓库为例。

数据仓库接收从数据源经网络传输的流数据时,数据会随机地分配到各节点上。数据向各节点的随机分配是为了实现集群整体上各节点抽取-转换-加载(ETL,extract-transform-load)的负载均衡,在实际处理中,可以在随机选取第一个节点后,按照某个固定的顺序依次选取各个节点作为数据的接收方,消除因选取节点而产生的额外的CPU 计算开销。节点在接收到数据元组后,直接通过一致性哈希函数计算,如果元组属于该节点,就将其保留,否则将元组转发到对应的节点,同时将计算得到的哈希值一并发送,避免后续产生重复的计算。D-CACHEJOIN 算法的整体架构流程如图 3 所示。对于 MESHJOIN 算法和CACHEJOIN 算法,由于其相当于只在图3 中的“连接”环节发挥作用,而数据元组在分布式环境中的接收、转发等都在此之前就已经完成,因此可以相对简单地一并扩展到分布式环境之中。

在基于本文算法的处理流程中,流数据S、关系表R和数据仓库集群中各节点均采用同一个哈希函数进行映射,有利于集群的扩展。同时对于节点接收到的不属于自身、需要转发的元组,通过计算得到的哈希值将会随元组一同转发,避免了重复的计算。

从图3 可以看出,数据仓库将流数据随机地依次分配给集群中的任一节点,从宏观上看,集群中的所有节点在某一时间段内同时读取数据并进行计算,保留属于自身的数据、转发属于其他节点的数据。这样的设计没有属于中心地位的节点,发挥了分布式集群并行处理数据的优势。数据不需要首先经过中心节点的处理,而是直接在各节点之间进行流转,避免了由于存在中心节点导致系统出现性能瓶颈的问题。

图3 D-CACHEJOIN 算法的整体架构流程

2 D-CACHEJOIN 算法实验分析

2.1 性能评估模型

对于数据仓库的ETL 过程而言,主要关注的性能指标包括系统吞吐率、内存消耗等。本文研究中使用的分布式系统中,各个节点的作用主要是参与计算和数据存储等,大量运算如备份、保证数据一致性等只在内部进行,对内提供支持,对外只提供将多节点集群环境虚拟为单节点环境的接口,集群可以视为一个单节点环境。引入分布式系统直接导致算法增加的开销是一致性哈希函数计算的开销,为了实现负载均衡,令各节点随机接收流数据,然后相互转发数据。文献[6]中已经给出MESHJOIN 算法的成本模型,本文参考该模型,建立D-CACHEJOIN 算法的性能评估模型,模型参数如表1 所示。

表1 算法性能评估模型参数

根据表1 中的参数,下面逐个计算每个过程的开销,最终得到D-CACHEJOIN 算法的整体吞吐率μ(单位为元组/秒)。

1) 数据在网络中传输的开销。假设分布式系统由n个节点构成,单位时间内需转发的节点数为,因此网络中数据总的传输开销为。

3) 读取关系表R中k个分页的开销为CI/O(k)。

4) 移除w个匹配完成的元组及其指针的开销为wCE。

5) 从流缓冲区中读取w个元组的开销为wCS。

6) 将w个元组加入HS的开销为wCA。

7) 将HS中元组与R中元组匹配的开销为。

8) 输出结果的开销。结果输出时的开销和连接的成功率有关,成功率越高,需要输出的结果元组就越多,因此,最终输出结果的开销为。

由此可得最终D-CACHEJOIN 算法的整体吞吐率为

用nSP表示流探测阶段中每次迭代处理的元组数量,nDP表示磁盘探测阶段中每次迭代处理的元组数量,则算法的吞吐率还可以表示为

式(1)和式(2)能否精确表示流数据和关系表进行连接操作的开销的前提是算法必须只需要有限的内存就可以持续运行,同时流数据S中新到达的元组进入系统时,需要有剩余的缓冲区内存对其进行保存。更加详细的关于关系表R的分页数和流缓冲区中元组个数的理论论证可参考文献[6]。

2.2 算法成本模型

由式(2)可知,算法的吞吐率与R的设置情况、S的设置情况、流和磁盘缓冲区等因素有关,本节通过定量计算来描述算法的成本。

1)nSP的模型计算

在流检测阶段中,S加载到流缓冲区后立即与HR中的元组进行匹配,且优先与HR中较多次与R中元组成功匹配的元组进行,如果匹配成功,则直接生成结果输出;如果匹配失败,则该元组在后续的磁盘检测阶段尝试进行匹配。由此,nSP与HR、R中元组大小、R中元组个数、流缓冲区大小有关,其中流缓冲区大小在后续实验中得知极小(始终在0.1 MB 以下),因此忽略不计。

根据表1,nR为R中元组个数,为HR中元组个数。现实中在构建R和S时,由于其数据分布并不均匀,需要考虑其扭曲分布系数Zipf,如果Zipf 值为0,那么连接属性值完全均匀分布,随着Zipf 值的增大,数据的不均匀分布程度就越大。假设流数据S有S~Zipfian(1,n),其概率密度函数为,累积分布函数为,其中,Hx,1同理[20]。通过归一化处理,流检测阶段的匹配概率p1可表示为

其中,x和y是未知数。

当nhR变为原来的2 倍时,式(4)变为

将式(5)代入式(4),得φ1=2x,即x=lbφ1。

当nR变为原来的2 倍时,式(4)变为

将式(6)代入式(4),得ψ1=2y,即y=lbψ1。

因此,式(4)变为

若在n次迭代中算法处理的流元组总量为Sn,则有

2)nDP的模型计算

在流检测阶段,算法已经通过HR连接匹配了S中的部分元组,在磁盘检测阶段中,将使用常规方式进行关系表R和流数据S中元组的连接。由于R较大,算法分段对R进行读取,每次读取磁盘缓冲区大小nRB的数据段,由于S~Zipfian(1,n),因此可以逐段计算匹配概率后相加,表示为

记R中元组的分段数为N,由匹配概率之和经归一化后可得平均匹配概率p2为

类似于nSP的计算方式,p2可以用nRB、nR和nhR表示。现考察nRB、nhR和nR分别变为2 倍时p2的变化情况。假设此时p2分别增大一个常数因子θ2、增大一个常数因子φ2和减小一个常数因子ψ2,且有

和nSP中处理方法相同,此时可得x=lbφ2,y=lbψ2,z=lbθ2。

因此,式(11)可以变为

若哈希表中存储的流元组总量为,则有

确定nSP和nDP后,即可通过式(2)对算法进行调整。

2.3 实验设计

实验环境为4 台IBM POWER x-236 8841 服务器,Intel Xeon Gold 6248R CPU,64 GB 内存,Linux 64 位操作系统,具体配置情况如表2 所示。

表2 实验环境

环境部署的规模主要从数据访问峰值、吞吐量要求和响应时间等综合考量。基本思路是读写分离。通过解析查询语句,对于仅包括读操作的配置一个连接字符串到读服务器,对于写操作则配置另一个连接字符串到写服务器,将大规模数据的访问分流到多台服务器上,使应用中读取数据的速度和并发量显著提高,增加了系统响应速度,减少了服务器的压力,能够有效增加系统的稳定性和扩展性。在读写分离的基础上再进行负载均衡,具体过程是各节点通过专用网络进行连接,对每个服务器进行监测,获取资源占用情况,整个集群可以视为一台具有超高性能的独立服务器。

实验数据来自某运营商内部的原始话单数据,以此构建流数据S和关系表R,各包含约2 000 万条数据。

由于本文算法具有缓存模块和自动控制缓存中元组换入换出的阈值的功能,因此需要一个热身阶段。由于在实际场景中,程序开始后会持续地运行,因此热身阶段是可以接受的,后续实验中如果没有特殊说明,该系统热身阶段都会忽略。

本节设计实验测试了本文算法的性能,实验的具体软硬件条件已在表2 中列出,在此环境上搭建了具有8 个虚拟节点的分布式集群。通过将本文算法与传统的MESHJOIN 算法、CACHEJOIN 算法和较先进的Eunomia 分布式连接方法进行比较,对算法的执行效率进行实验考察,各实验运行5 次后取平均值作为实验结果。

2.4 实验结果及分析

实验1不同数据量对算法效率的影响

首先令集群开启4 个节点,处理不同数量级的数据,吞吐率对比如图4 所示,D-CACHEJOIN都取得了比其他3 种算法更好的吞吐率,这一方面是因为实验数据并不是均匀分布,而是服从Zipfian 分布的,其中一部分数据具有比其他数据更多的出现次数,没有对此进行额外考虑的算法无法适应该环境;另一方面是因为D-CACHEJOIN具有缓存模块,能够较好地处理数据倾斜的情况。同时,D-CACHEJOIN 的吞吐率在10 万、100 万和1 000 万条数据的情况下分别是Eunomia的1.30 倍、1.48 倍和1.65 倍,算法的优势随着数据量的增多而逐渐增大,这是因为数据量增多时,可能对磁盘的 I/O 次数逐渐增大,D-CACHEJOIN 越来越多地减少了实际上对于磁盘的I/O 次数。

图4 不同数据量下算法的吞吐率

现实中大多数的大型数据集是服从Zipfian 分布的[21],包括本文实验的数据集。相对于均匀分布的数据集,对服从Zipfian 分布的数据集进行实验有更大的实际意义,但为了实验的完整性,给出其他实验条件相同时对均匀数据的实验结果,如图5 所示。

图5 不同数据量下算法的吞吐率(均匀分布)

实验2可用内存总量对算法效率的影响

本文实验中可用内存的总量为磁盘缓冲区、流缓冲区、队列、HR和HS的大小之和,但本文实验中有两点不变:流缓冲区基本不变而且可以忽略,因为在所有情况下流缓冲区使用的内存不超过0.1 MB;H R保持不变,因为HR的大小会在很大程度上影响本文算法的效率,所以在后续实验中专门对HR的大小对算法效率的影响进行研究,故在本文实验中HR保持为R的1%,一旦HR满,如果再次有元组的匹配频率超过threshold Value,将可能会对HR中已有元组进行替换并可能引起threshold Value 的动态调整。可用内存总量对算法效率的影响如图6 所示。

图6 可用内存总量对算法效率的影响

从图6 可以看出,几种流连接算法的吞吐率都会随着可用内存总量的增加而提高,由于引入了缓存模块,CACHEJOIN 和D-CACHEJOIN 的吞吐率都要比MESHJOIN 高2 倍以上,而Eunomia 由于需要基于时间戳对流数据进行检查,因此需要较多的额外内存才能表现出较高的效率,内存较少时执行效率不足。另有其他实验显示,当提供大量内存时,Eunomia 的吞吐率已经超过了CACHEJOIN,但仍低于D-CACHEJOIN。考虑到实验的完整性,补充了内存较少时的情况,由于D-CACHEJOIN 需要存储哈希函数的计算结果以便后续使用,因此当可用内存较少时,D-CACHEJOIN 的吞吐率要略低于CACHEJOIN。由于本文研究的支撑系统具有充足的内存可供使用,同时鉴于现代分布式系统中如果有计算需要,通常都会配备充足的内存,因此一般不会出现内存严重不足的情况。当可用内存增大后,D-CACHEJOIN 的吞吐率逐渐超过CACHEJOIN,且随着内存的增加,前者吞吐率的增加速度要高于后者;当内存达到100 MB 时,D-CACHEJOIN 的吞吐率相比于CACHEJOIN 增加了10%以上。

实验3R的缓存比例对算法效率的影响

由于算法引入了缓存模块,因此可对R中频繁匹配的元组进行缓存,能够大大提升算法的效率,本文实验固定可用内存大小,研究R的缓存比例对算法效率的影响,实验结果如图7 所示。

图7 R 的缓存比例对算法效率的影响

根据图7 可以看出,当R的缓存比例达到10%时,D-CACHEJOIN 算法的吞吐率是MESHJOIN 算法的10 倍以上。当缓存比例较低时,D-CACHEJOIN算法的吞吐率仍然是MESHJOIN 算法的2 倍以上,由于没有引入缓存策略,MESHJOIN 和Eunomia的吞吐率保持不变。这说明引入缓存模块可以大大提升算法的性能。

实验4D-CACHEJOIN 算法在分布式集群中的扩展性

为了测试算法在分布式集群中的扩展情况,将集群中的节点数n由4 增加到8,并且改变算法处理的数据量,考察此时的执行效率,实验结果如图8 所示。

图8 算法吞吐率随节点扩展的变化情况

总体而言,各算法的吞吐率随着节点数的增多而增大,且D-CACHEJOIN 保持着最佳的执行效率。图8 中从左至右,D-CACHEJOIN 的吞吐率依次是Eunomia 的1.29 倍、1.15 倍、1.48 倍、1.36 倍、1.65 倍、1.50 倍,这表示随着集群中节点数的增加,D-CACHEJOIN 相对于Eunomia 在执行效率上的优势变小了(对其他2 种算法也有这一结论)。这是因为随着节点数的增多,每个节点上关系表R的片段就越小,用于磁盘 I/O 的时间就越短,而D-CACHEJOIN 恰恰减少了磁盘I/O 的次数,因此这一优势在节点数增加时变小了。但与对内存的访问相比,磁盘I/O 是非常消耗时间的操作,因此即使集群进一步扩容,只要磁盘上的关系表无法全部放入内存中,D-CACHEJOIN 算法仍然会有更优的执行效率。

综上,D-CACHEJOIN 算法总体上保持了最优的执行效率。当ETL 系统中数据的数量级发生比较大的变化时,由于对数据库的访问时间,即对磁盘的I/O 所需时间会随之增大,算法的吞吐率会随之下降,但本文算法的优势也会逐渐增大,因为本文算法减少了对磁盘的I/O 次数。为了拥有更高的效率,算法往往会进行更多消耗内存的额外工作,在现代处理海量数据的分布式系统中,通常都会配备足够的内存,同时本文算法拥有一定的自由度,可以根据实际情况设置缓存的大小,更好地贴合具体场景,本文算法一般能够很好地适应环境并良好运行。当分布式集群中新增或下线节点时,本文算法保持了更优的执行效率;当处理海量数据时,本文算法在分布式集群中具有良好的拓展性。

3 结束语

本文介绍了流数据接收过程中面临的主要问题,并简单介绍了已有的流连接算法的处理策略,在此基础上对算法进行改进,提出了一种把使用缓存的流连接策略应用于分布式环境的D-CACHEJOIN算法。此外,本文详细描述了该算法的执行架构并计算其性能开销,通过实验展示了该算法的执行效率,说明了该算法具有较好的适应性,在一般的大数据处理系统中能够良好运行,并且在分布式集群中具有良好的拓展性。

猜你喜欢
元组缓冲区磁盘
叶腊石聚合成型及其旋转磁盘的制作方法
Python核心语法
它的好 它的坏 详解动态磁盘
针对隐藏Web数据库的Skyline查询方法研究*
一种基于时间戳的简单表缩减算法∗
解决Windows磁盘签名冲突
海量数据上有效的top-kSkyline查询算法*
串行连续生产线的可用度与缓冲库存控制研究*
基于ARC的闪存数据库缓冲区算法①
Windows系统下动态磁盘卷的分析与研究