徐剑等
摘 要: Hadoop系统在处理多表链接问题时,每轮都会将大量的中间结果写入本地磁盘,从而严重降低了系统的处理效率。为解决该问题,提出一种“替换?查询”方法,该方法通过对链接表建立索引,将预输出的元组集替换为索引信息输出到中间结果,以索引的形式参与多表链接,以此减少中间结果的I/O代价。运用缓冲池、二次排序和多线程技术对索引信息进行优化管理,加快索引查询速度。最后在TPC?H数据集上,设计了与原Hadoop的对比实验,结果表明该方法可减少35.5%的存储空间,提高12.9%的运行效率。
关键词: 多表链接; 替换?查询; 索引; 缓冲池; 二次排序
中图分类号: TN911?34; TP311 文献标识码: A 文章编号: 1004?373X(2014)06?0090?05
0 引 言
随着互联网应用的快速发展,海量数据的存储与处理成为研究人员面临的严峻挑战。近年来,谷歌提出的分布式文件系统GFS、并行编程框架Map?Reduce。基于此,开源社区Apache的Hadoop项目实现了分布式文件系统HDFS和并行编程框架Hadoop Map?Reduce。Hadoop因其良好的可扩展性、高可用性以及容错性,广泛地应用在IBM,百度,360,阿里巴巴等互联网公司的海量数据存储与分析应用中,是目前使用最为广泛的云计算平台之一。
针对分布式计算环境中多表链接运算的问题,文献[1?2]提出一种在单个Map?Reduce任务中同时链接多个关系表的优化算法,文献[3]提出一种基于预排序的多表链接算法。这些算法都存在中间结果快速增加的问题,一方面,为了防止链接属性缺失现象的出现,本次链接要尽可能的将下一步或者下几步链接相关的属性信息输出到中间结果;另一方面,每一步链接操作之后,中间结果的属性列增加,伴随着链接次数的增加,输出到中间结果的属性列越来越多;最后,子链接的中间结果要被多次复制,溢写磁盘或者网络传输到其他节点。这些因素使得中间结果快速膨胀,从而导致大量的内存占用,过多的磁盘溢写和不必要的网络传输代价,严重降低了系统性能。
为了解决多表链接中间结果快速增加的问题,本文提出“替换?查询”方法,该方法具体如下,在每一次子链接结果集输出到中间结果之前,系统对结果集中的记录元组建立非侵入式的Hadoop分布式文件索引(Non?invasive Index of Hadoop Distributed File System,NIHDFS),在保证结果集信息完整性的前提下,用索引替代记录元组信息并输出到中间结果,记录元组以索引的形式参与以后的链接过程;当需要记录元组信息时,该方法能够通过查询操作快速的实现元组信息恢复。在索引的平均长度比记录元组的平均长度短或者链接表的数量较多的情况下,该方法能够有效地减少中间结果的数据规模并降低其增长速度。
在Hadoop?1.1.2上,修改系统源码并实现“替换?查询”多步链接的处理方法,本文将改造后的系统命名为Hadoop?1.1.2?NIHDFS。在与原系统的对比实验中,“替换?查询”的多表链接处理方法,能够有效的提升Hadoop处理多表链接的性能。
1 Hadoop?1.1.2?NIHDFS系统整体设计
原Hadoop在处理多表链接问题时,Map阶段对输入的记录元组进行键?值解析,解析出真实的记录元组之后便直接进行Map端的链接操作。Hadoop?1.1.2?NIHDFS系统则不然,在执行Map端链接操作之前,系统要对输入的信息依照“替换?查询”方法进行处理。该处理包含3个部分,分别是对链接表建立NIHDFS索引,替换链接表元组信息以及元组信息的快速查询。他们分别由索引化模块,替换模块和查询模块来完成,如图1所示。
T3是原始链接表,当该链接表进入系统后,先后进行索引化和替换操作,并用记录元组的索引信息和本次链接的属性信息替换原始记录元组,然后再进行Map端链接操作。Output_1是子链接的链接结果,IndexT1_1表示链接表T1中某个记录元组的索引,IndexT1_1:IndexT2_1表示链接表T1和T2的一条链接结果,其中只存储着2条记录对应的索引信息。当Output_1作为该系统的输入时,该系统需要对其中的索引信息进行查询,恢复出原始记录元组并进行解析,然后再进行Map端的链接操作。Map端和Reduce端链接操作的操作内容由具体的多表链接算法来决定。
2 NIHDFS索引
HDFS(Hadoop Distributed File System)是被设计成适合运行在通用硬件上的分布式文件系统。本文以HDFS为基础,建立非侵入式的Hadoop分布式文件系统索引,即NIHDFS。它是根据链接表各记录元组在HDFS存储的物理位置信息建立起来的预链接索引。
NIHDFS主要包含文件的统一资源定位符(File URL),数据块间偏移量(Block Offset),块内记录偏移量(Record Offset),以及记录元组长度(Record Length)4个部分,结构如图2所示。统一资源定位符(File URL)是文件在HDFS中存储的逻辑路径,该路径可以选择系统默认值,也可以由用户定义。数据块偏移量(Block Offset)是索引对应记录所在的数据块在文件中的起始偏移量。针对特定数据块中的不同记录,它们的长度和存储的物理位置各不相同,本文用块内记录偏移量(Record Offset)来存储记录在数据块中的位置信息。记录元组长度(Record Length)指记录元组的大小。
如在HDFS中存储着一个维表,该表在HDFS中的逻辑路径是URL;它被HDFS切分成3个数据块,分别是Block_A,Block_B,Block_C,起始的偏移量分别是Offset_A,Offset_B,Offset_C;某条记录存储在Block_B中,块内偏移量是Offset_rc,该记录所对应索引是“URL+ Offset_B + Offset_rc + Record_tmp.Length”。
由于图2所示的索引中只包含一个数据块信息,当维表中的记录元组被切分到两个数据块上时,根据图2建立的索引只能够恢复记录元组的部分信息,这显然是不正确的。这种情况下,本文特别设计了如图3所示的索引,该索引中包含有2个数据块偏移量,块内记录元组偏移量和记录元组的长度信息,因此它能够完整的存储记录的位置信息。由于记录元组的后半部分数据存储在第2个数据块的起始位置上,所以记录在第2个数据块的块内记录偏移量的值是零。
3 基于NIHDFS的“替换?查询”方法
“替换?查询”的多表链接处理是通过索引化,替换和查询3个模块来实现的,如图1所示。
3.1 索引化
在处理多表链接问题时,Map?Reduce任务的输入有2种类型,一种是前面某个Map?Reduce任务产生的链接结果,一种是原始链接表。由于链接结果中存储的是记录元组的索引信息,所以无需进行索引化操作。当输入是原始链接表时,要根据NIHDFS索引结构对表中的记录元组建立索引。当被索引化的记录元组被切分到2个数据块时,建立图3中所示的索引。
3.2 替换
根据索引化过程中对记录元组建立的NIHDFS索引,用索引信息和本次链接的链接属性组成的键?值对替换掉原始的记录元组,并保证替换操作的正确性和索引信息的完整性。在之后的链接操作中,如其他经典的多表链接算法一样,链接属性主导着整个链接过程,索引信息作为附属信息或被输出到中间结果,或被发送到其他的节点。
3.3 查询
当Map?Reduce任务的输入是链接结果时,因为链接结果中存储的是记录元组的一系列索引信息,并不包含本次链接的链接属性信息。此时要对NIHDFS索引执行查询操作,到达获取链接属性信息的目的,系统将启动专门线程来完成该查询操作。
查询过程如图4所示,TaskTracker是多表链接操作的任务执行节点,该节点中运行着多个Map或Reduce任务,这些任务提出查询请求并将该请求和索引信息打包发送到系统的文件管理中心,即NameNode。
NameNode对索引信息进行解析,获取文件名;查询HDFS中隶属于该文件的所有数据块及副本信息,并从中随机选择一个数据块来响应查询请求;将该数据块的ID以及NIHDFS索引信息发送给数据块所在的机器节点,即查询服务节点。
查询服务节点收到查询请求信息之后,首先根据请求信息中的数据块ID和路径信息在本地定位到该数据块;其次,对NIHDFS索引信息进行解析,依据块内偏移量,准确定位记录元组的物理位置;最后,根据记录元组的起始位置和记录长度,获取完整的元组信息,并将其回送到请求节点。查询请求节点对接受到的记录元组进行解析,从中获取本次链接所需的链接属性信息,从而完成整个查询流程。
当遇到如图4所示的索引时,需进行两次查询操作,将两次查询结果按照先后顺序整合起来,得到完整的记录元祖信息。
4 整体系统优化
4.1 NIHDFS优化
在对NIHDFS索引执行查询操作是,File URL是不可或缺的。隶属于同一个文件的所有记录元组都对应着一个相同的File URL,这导致File URL被多次重复存储,由于File URL相对较长,多次重复存储所占用的磁盘空间是不可忽略的。为了解决该问题,本文用哈希方法把文件标示符集合S与File URL集合做成一一映射,用文件标示符S替代File URL,从而减少File URL长度,降低磁盘消耗。
采用预处理添加分隔符的方法对索引中的块内记录元组长度信息(Record Length)进行优化,具体方法如下,在上传HDFS之前,对链接表进行预处理。一方面,将一条完整的记录元组信息存储为一行,行与行之间的回车换行符作为元组与元组之间的分隔符,用分隔符替代块内记录长度。当元组被载入内存进行索引化操作或者查询服务节点响应查询请求获取记录元组时,都以分隔符作为元组记录结束的标志,而不必关心各记录元组的长度。将记录长度信息从索引中去掉,达到简化索引结构的目的。另一方面,在不同属性之间添加分隔符,降低Map?Reduce任务解析属性信息的难度。
4.2 查询过程优化
4.2.1 缓冲池法
链接过程中,部分记录元组在短时间内会被多次重复查询,每次都要执行完整的查询过程,这是不必要的。本文采用缓冲池法对其进行优化,经常被查询的记录元组存放于内存缓存池中,当查询请求到来时,先检查缓存池中是否存在该记录元组。若存在,则直接从缓存池中提取,否则才执行完整的查询操作,获取元组信息。采用LRU作为缓存池中记录元组信息的更新算法。
4.2.2 二次排序法
在数据块中定位记录元组时,文件指针的移动操作是很频繁的。由于文件指针的单向向下的移动特性,当访问的内容不是按照偏移量由小到大有序时,指针无效移动次数会相当高。本文采用二次排序的方法对系统进行优化。在原Hadoop系统上添加二次排序功能模块,用以对中间结果中索引信息进行排序。排序策略如下:依照数据块间偏移量的大小,对数据块从小到大进行排序,即一次排序;同一个数据块内的记录,依照块内记录元组偏移量的大小,按从小到大顺序进行排序,即二次排序。有序的中间结果能够大幅减少查询的过程中文件指针无效移动的次数,从而提高定位效率。由于Map?Reduce过程中会对记录元组进行排序,二次排序在此基础上进行,开销不大。
4.2.3 并行执行
链接过程中,查询记录所在的物理位置集中
于数据块的某些区域,并且在一定的时间范围内该区域被查询访问的频率很高,本文把这样的区域称为查询热区。采用并行执行的思想,把单个数据块查询热区的数量上限设置为M,对于含有N个查询热区的数据块来说,如果N≤M,那么就开启N个线程,每一个线程中设置一个指向对应热区的文件指针,该指针只响应本热区的记录元组查询请求。当N>M时,采用随机的方法选取两个相邻的热区并将其合并成一个热区,同时对N执行减一操作,直到N≤M,然后采取与N≤M时相同的操作。查询热区的并行执行方法,一方面使得数据块拥有同时响应多个查询请求的能力,提升查询操作的并行度,在查询请求集中爆发时依然能够保证查询效率;另一方面,它将指针的定位操作集中到数据块的查询热区上,从而避免了热区之间大量的无效指针移位,提升定位效率。
5 实验部分
5.1 实验环境
本实验运行在由五台服务器搭建的集群上,机器均为同等配置。服务器硬件环境是:1.6 GHz的八核CPU,16 GB内存,150 GB硬盘。软件环境:ubuntu10.10,JDK1.6,Hadoop?1.1.2,以及Hadoop?1.1.2?NDFSI。实验数据是TPC?H的基准数据集,选用nation, region,customer,orders四张表作为输入数据,数据规模为397.6M。
5.2 实验内容以及结果分析
两个系统分别执行了四张表的链接操作,链接的具体过程如图5所示。
链接算法是经典的Reduce Join算法,两个系统分别执行了五遍该算法,将各个阶段的统计信息整合求取平均值,并将其填入表1中。由于Hadoop?1.1.2?NIHDFS系统的最终结果是一系列NIHDFS索引信息,需要将其恢复为记录元组,该阶段消耗的时间是t4,产生的中间结果是size4,原系统在该阶段不需要做任何操作。
Hadoop?1.1.2?NIHDFS系统产生的中间结果size1,size2,size3远比原系统要小,链接操作所消耗的时间也比原系统要小很多,空间和时间消耗对比如图6和图7所示。与原系统相比,新系统空间性能提升35.5%,时间性能提升12.9%,时间和空间整体对比如图8所示。本实验表明,“替换?查询”的多表链接处理方法能够有效地减少中间结果的数据规模和增长速度,提升系统效率。
6 结 语
本文通过对记录元组建立NIHDFS索引,用索引替代冗长的记录元组信息参与链接,并对索引结构进行优化,从而大大减少中间结果的数据规模。
运用缓冲池,二次排序以及并行执行的方法对查询过程进行优化,该方法适用于链接表属性较多,记录元组长度较长的场合。
参考文献
[1] AFRATI F, ULLMAN J. Optimizing joins in a map?reduce environment [C]//Proceedings of 2010 EDBT. New York: ACM, 2010: 99?110.
[2] JIANG Da?wei, TUNG A, CHEN Gang. Map?join?reduce: towards scalable and efficient data analysis on large clusters [J]. IEEE Transactions on Knowledge and Data Engineering, 2010, 23(9): 1299?1311.
[3] LIN Yu?ting, AGRAWAL D, CHEN Chun, et a1. Llama: leveraging columnar storage for scalable join [C]// Proceedings of 2011 ACM SIGMOD International Conference on Management of Data. New York: ACM, 2011: 861?972.
[4] 赵保学,李战怀,陈群,等.基于共享的MapReduce多查询优化技术[J].计算机应用研究,2013(5):74?76.
[5] 赵保学,李战怀,陈群,等.可扩展Hadoop任务分配模块的研究与实现[C]//第29届中国数据库学术会议论文集(B辑)(NDBC2012).合肥:知识与数据工程实验室,2012:83?85.
[6] 林大云.基于Hadoop的微博信息挖掘[J].计算机光盘软件与应用,2012(1):7?8.
[7] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large cluster [J]. Communications of the ACM, 2008, 51(1): 107?113.
[8] 王珊,王会举,覃雄派,等.架构大数据:挑战、现状与展望[J].计算机学报,2011,34(10):1741?1751.
5 实验部分
5.1 实验环境
本实验运行在由五台服务器搭建的集群上,机器均为同等配置。服务器硬件环境是:1.6 GHz的八核CPU,16 GB内存,150 GB硬盘。软件环境:ubuntu10.10,JDK1.6,Hadoop?1.1.2,以及Hadoop?1.1.2?NDFSI。实验数据是TPC?H的基准数据集,选用nation, region,customer,orders四张表作为输入数据,数据规模为397.6M。
5.2 实验内容以及结果分析
两个系统分别执行了四张表的链接操作,链接的具体过程如图5所示。
链接算法是经典的Reduce Join算法,两个系统分别执行了五遍该算法,将各个阶段的统计信息整合求取平均值,并将其填入表1中。由于Hadoop?1.1.2?NIHDFS系统的最终结果是一系列NIHDFS索引信息,需要将其恢复为记录元组,该阶段消耗的时间是t4,产生的中间结果是size4,原系统在该阶段不需要做任何操作。
Hadoop?1.1.2?NIHDFS系统产生的中间结果size1,size2,size3远比原系统要小,链接操作所消耗的时间也比原系统要小很多,空间和时间消耗对比如图6和图7所示。与原系统相比,新系统空间性能提升35.5%,时间性能提升12.9%,时间和空间整体对比如图8所示。本实验表明,“替换?查询”的多表链接处理方法能够有效地减少中间结果的数据规模和增长速度,提升系统效率。
6 结 语
本文通过对记录元组建立NIHDFS索引,用索引替代冗长的记录元组信息参与链接,并对索引结构进行优化,从而大大减少中间结果的数据规模。
运用缓冲池,二次排序以及并行执行的方法对查询过程进行优化,该方法适用于链接表属性较多,记录元组长度较长的场合。
参考文献
[1] AFRATI F, ULLMAN J. Optimizing joins in a map?reduce environment [C]//Proceedings of 2010 EDBT. New York: ACM, 2010: 99?110.
[2] JIANG Da?wei, TUNG A, CHEN Gang. Map?join?reduce: towards scalable and efficient data analysis on large clusters [J]. IEEE Transactions on Knowledge and Data Engineering, 2010, 23(9): 1299?1311.
[3] LIN Yu?ting, AGRAWAL D, CHEN Chun, et a1. Llama: leveraging columnar storage for scalable join [C]// Proceedings of 2011 ACM SIGMOD International Conference on Management of Data. New York: ACM, 2011: 861?972.
[4] 赵保学,李战怀,陈群,等.基于共享的MapReduce多查询优化技术[J].计算机应用研究,2013(5):74?76.
[5] 赵保学,李战怀,陈群,等.可扩展Hadoop任务分配模块的研究与实现[C]//第29届中国数据库学术会议论文集(B辑)(NDBC2012).合肥:知识与数据工程实验室,2012:83?85.
[6] 林大云.基于Hadoop的微博信息挖掘[J].计算机光盘软件与应用,2012(1):7?8.
[7] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large cluster [J]. Communications of the ACM, 2008, 51(1): 107?113.
[8] 王珊,王会举,覃雄派,等.架构大数据:挑战、现状与展望[J].计算机学报,2011,34(10):1741?1751.
5 实验部分
5.1 实验环境
本实验运行在由五台服务器搭建的集群上,机器均为同等配置。服务器硬件环境是:1.6 GHz的八核CPU,16 GB内存,150 GB硬盘。软件环境:ubuntu10.10,JDK1.6,Hadoop?1.1.2,以及Hadoop?1.1.2?NDFSI。实验数据是TPC?H的基准数据集,选用nation, region,customer,orders四张表作为输入数据,数据规模为397.6M。
5.2 实验内容以及结果分析
两个系统分别执行了四张表的链接操作,链接的具体过程如图5所示。
链接算法是经典的Reduce Join算法,两个系统分别执行了五遍该算法,将各个阶段的统计信息整合求取平均值,并将其填入表1中。由于Hadoop?1.1.2?NIHDFS系统的最终结果是一系列NIHDFS索引信息,需要将其恢复为记录元组,该阶段消耗的时间是t4,产生的中间结果是size4,原系统在该阶段不需要做任何操作。
Hadoop?1.1.2?NIHDFS系统产生的中间结果size1,size2,size3远比原系统要小,链接操作所消耗的时间也比原系统要小很多,空间和时间消耗对比如图6和图7所示。与原系统相比,新系统空间性能提升35.5%,时间性能提升12.9%,时间和空间整体对比如图8所示。本实验表明,“替换?查询”的多表链接处理方法能够有效地减少中间结果的数据规模和增长速度,提升系统效率。
6 结 语
本文通过对记录元组建立NIHDFS索引,用索引替代冗长的记录元组信息参与链接,并对索引结构进行优化,从而大大减少中间结果的数据规模。
运用缓冲池,二次排序以及并行执行的方法对查询过程进行优化,该方法适用于链接表属性较多,记录元组长度较长的场合。
参考文献
[1] AFRATI F, ULLMAN J. Optimizing joins in a map?reduce environment [C]//Proceedings of 2010 EDBT. New York: ACM, 2010: 99?110.
[2] JIANG Da?wei, TUNG A, CHEN Gang. Map?join?reduce: towards scalable and efficient data analysis on large clusters [J]. IEEE Transactions on Knowledge and Data Engineering, 2010, 23(9): 1299?1311.
[3] LIN Yu?ting, AGRAWAL D, CHEN Chun, et a1. Llama: leveraging columnar storage for scalable join [C]// Proceedings of 2011 ACM SIGMOD International Conference on Management of Data. New York: ACM, 2011: 861?972.
[4] 赵保学,李战怀,陈群,等.基于共享的MapReduce多查询优化技术[J].计算机应用研究,2013(5):74?76.
[5] 赵保学,李战怀,陈群,等.可扩展Hadoop任务分配模块的研究与实现[C]//第29届中国数据库学术会议论文集(B辑)(NDBC2012).合肥:知识与数据工程实验室,2012:83?85.
[6] 林大云.基于Hadoop的微博信息挖掘[J].计算机光盘软件与应用,2012(1):7?8.
[7] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large cluster [J]. Communications of the ACM, 2008, 51(1): 107?113.
[8] 王珊,王会举,覃雄派,等.架构大数据:挑战、现状与展望[J].计算机学报,2011,34(10):1741?1751.