基于Spark的大数据分析工具Hive的研究

2018-01-04 11:35杨宁黄婷婷
计算机时代 2018年11期

杨宁 黄婷婷

摘 要: 随着数据规模的不断增大,传统的关系型数据库方法已经无法满足大数据量的数据查询需求,而基于Hadoop 平台的 Hive 数据仓库为海量数据分析提供了方便的操作。随着实时查询需求的增加,基于Spark的Hive操作得到了很好的应用。文章主要介绍了Hive on Spark的整合步骤以及与Hadoop运行模式的比较。对 MovieLens 数据集的实验测试显示,新模式的执行速度提高了17.42-46.35倍,这对进一步了解Hive的运行机制及海量数据的实时分析具有重要的意义。

关键词: Hadoop; Hive; Spark; 海量数据; 实时分析

中图分类号:TP399 文献标志码:A 文章编号:1006-8228(2018)11-31-05

Abstract: With the increasing size of the acquired data, the traditional relational database method can no longer meet the data query requirements of such large data volume, but the Hive data warehouse based on Hadoop platform provides convenient operation for massive data analysis. And with the increase in real-time query requirements, Spark-based Hive operations have been well applied. In this paper, Hive and Spark are integrated, and the integration steps of Hive on Spark and the comparison with the traditional running structure are introduced. The experimental test on MovieLens dataset shows that the execution speed of the new mode has increased by 17.42-46.35 times. This is of great significance for further understanding of Hive's operating mechanism and real-time analysis of massive data.

Key words: Hadoop; Hive; Spark; massive data; real-time analysis

0 引言

随着数据分析需求的不断增加,实时性查询越来越重要,但是基于Hadoop的Hive查询,耗时过长,这在实时性要求比较严格的业务中是无法使用的。Spark的使用,给Hive的实时查询提供了可能,如何更好的将Spark与Hive进行整合,从而得到更高的查询速度,是以后需要进一步研究的方向。

1 Hive

1.1 Hive的背景

Hive起源于Facebook。原因是Facebook有着大量的用户数据[1]需要进行处理。而Hadoop[2]是一个由MapReduce[3]模块实现的大数据处理工具,主要的应用场景是在构建数据仓库时,对数据执行抽取、转换和装载操作[4]。但是,由于MapReduce程序对于其他语言开发者来说相对麻烦。所以,Facebook研发了Hive,这将sql语句在Hadoop上执行成为了可能,达到了提高查询效率的目的。

1.2 Hive的模型

Hive运行时,将SQL语句进行解释、编译、优化并生成执行任务,默认情况下会将查询语句转化为MapReduce任务进而执行。在基于Spark的架构中,将转化为抽象的RDD,然后对相应的RDD再进行相关的处理。Hive 中的主要数据模型如下:表(Table)、外部表(External Table)、分区(Partition)、桶(Bucket)[5]。Hive中包含的主要组件如下:

Driver组件:主要有Compiler、Optimizer、Executor,可以将Hive语句进行编译、解析、优化,进而转化为相应的任务并提交給计算引擎进行处理。

MetaStore组件:存储着Hive的元数据信息,主要为关系型数据库。

用户接口:用于访问Hive。

2 Hive on Hadoop

2.1 Hadoop的背景

Google在2004年提出了最原始的分布式计算架构模型[6]:MapReduce,该模型主要用于大规模数据的并行处理。MapReduce模型主要分为Map和Reduce过程,主要的原理是将大规模数据处理作业拆分成多个可独立运行的Map任务,然后传输到多个处理机上进行分布处理,最后通过Reduce任务混洗合并,从而产生最终的输出文件。尽管MapReduce模型比较好的考虑了数据存储、调度、容错管理、负载均衡[7]等问题。但是它也存在不足,如占用过多的网络资源、磁盘读写耗费时间、异步性差等问题。

2.2 Hadoop的四大组件

Hadoop为可靠、可扩展的分布式开源软件。

Hadoop的四个组件如下:

Hadoop Common:支持其他Hadoop模块的程序。

HDFS:分布式文件系统,提供访问应用程序的数据。

Hadoop YARN:作业调度和集群资源管理的框架[8]。

Hadoop MapReduce:基于YARN的大型数据集并行处理系统。

2.3 Hive on Hadoop运行机制

Hive的客户端书写hql语句发起任务请求,然后将hql语句转化为mapreduce任务,通过资源管理器yarn,分发到各个节点上进行数据处理。这种运行模式的目的是使客户端主要集中进行查询语句的书写,而不用过多的关注底层的开发。具体执行流程如图1所示。

2.4 Hive on Hadoop的应用

由于Hadoop具有较高的延迟,而且在作业提交和调度的时候,需要大量的额外开销。所以,这种模式无法满足大数据集的低延迟查询。因此,该模式最佳使用场合是大数据集的离线批处理作业,例如,网络日志的离线分析。

3 Hive on Spark

3.1 Spark的背景介绍

Apache Spark[9]是基于内存计算的用于大规模数据处理的分析引擎。Spark中的核心抽象概念就是弹性分布式数据集RDD(resilient distributed datasets)[10],该数据集为只读型可恢复数据集。用户可以利用 Spark中的转换(transformation)和动作(action)操作对其进行处理,这其中也包括RDD的持久化操作,我们可以利用缓存的方式将其保存在内存[11]中不被回收。

RDD通过血统(lineage)关系来完成容错:主要的原理是丢失的RDD有足够的信息知道自己的父RDD,从而可以通过再次计算的方式从父RDD得到丢失的RDD。

3.2 Spark的四大特性

Spark具有四大特性如下:

快速性:相比较于Hadoop,官网给出的运行速度是提高了100倍。因为Spark使用DAG[12]调度程序、查询优化程序和物理执行引擎,所以实现批量数据和流式数据处理的高性能。

易用性:支持使用Java,Scala,Python,R和SQL等语言进行快速编写应用程序。

高可用性:Spark提供了很多库,包括SQL、DataFrame、MLlib[13]、GraphX[14]和Spark Streaming[15]。我们可以在同一个应用程序中组合使用这些库。

跨平台性:Spark可以运行在Hadoop、Mesos、或者Kubernetes中;可以从HDFS、HBase、Hive和其他数百个数据源中访问数据。

3.3 Hive on Spark的运行机制

我们在Spark平台运行Hive时,有远程和本地两种方式。Hive on Spark主要的设计思路是,尽可能重用Hive逻辑计算层面的功能。在运行生成物理计划开始时,就提供一整套针对Spark的实现,目的是使Hive的查询可以作为Spark任务来执行。

设计原则如下:

⑴ 尽量保持Hive源码的完整性:主要为了不影响Hive目前对MapReduce和Tez的支持;

⑵ 利用Hive语句:主要指使用Hive的执行语句对数据进行操作,使主要的计算逻辑仍由Hive提供;

⑶ 对Spark具有良好的松耦合性:使用中可以直接利用命令进行计算引擎的切换。

图2是一个关于两表join的hive操作执行过程,具体的处理过程如下:

这个join查询在进行逻辑计划过程中生成了两个MapWork和一个ReduceWork。TS读取表记录,FIL进行过滤;RS对数据进行分发和排序,JOIN算子对RS分组排序后的数据进行join运算,最后通过FS算子输出结果。

在执行SparkTask时,将各个MapWork和ReduceWork包装成函数应用到RDD上,RDD主要由Hive表生成。对于存在依赖关系的Work之间,需要调用Shuffle操作并进行stage的相应划分。图2右为RDD的具体执行过程,首先通过Union操作,然后执行Shuffle操作,最后得到相应的RDD,foreachAsync的作用是将任务提交到Spark引擎上进行处理。

3.4 hive on spark的应用

如今,数据的来源和特性不断改变,传统的处理方式已不再适用,并且当使用过程中碰到迭代操作时,基于MapReduce的Hive查询根本无法满足快速处理的要求。但是,对于实时查询业务,基于Spark的大数据分析工具Hive有著突出的表现,特别是对于一些复杂的操作,如迭代操作。

4 相关工作

4.1 Hive on Spark的集群搭建准备

Spark的编译。要使用Hive on Spark,所用的Spark版本必须不包含Hive的相关jar包。需要下载Spark源码进行重新编译。

我们这里用的Spark源码是从官网下载的spark-1.6.2的源码包。编译前请确保已经安装JDK、Maven和Scala,Maven为3.3.3及以上版本,并配置环境变量。进入到源码根目录下,利用make-distribution.sh命令进行编译,注意Hive和Spark的版本号要匹配。

4.2 Hive on Spark的搭建

本次实验中,主要搭建了三台虚拟机,其中Hive只需安装在其中一台机上,启动Hive时,注意将MySQL驱动包上传到Hive的lib目录下;然后,在Hive的机器上,将Spark的lib目录下的assembly包拷贝到Hive的lib目录下,目的是执行Hive操作就不需要再手动启动Spark。初始化数据库,启动Hive。至此,安装结束,进行实验测试。

5 两种模式在具体查询分析中的比较

5.1 影评案例的测试

主要使用了三张表movies.dat,ratings.dat,users.dat,我们主要对两张表以及三张表的join操作进行了测试,具体操作如表1,表2所示。

5.2 实验环境

实验采用在虚拟机建立3台机器测试,配置如下,电脑硬件:(英特尔)Intel(R) Core(TM) i5-3210M CPU@2.50GHz(2500 Mh),内存8.0GB,操作系统是 Microsoft Windows 7旗舰版(64位/Service Pack 1)。

三台虚拟机的信息具体如表3。

三张表movies.dat,ratings.dat,users.dat的數据量分别为3883行数据,1000209行数据,6040行数据。测试结果如表4,表5所示。对于hadoop的具体执行过程如表6所示。

5.3 性能比较与总结

我们通过具体的案例分析,将结果用图表进行显示,每次运行的时间单位为秒,具体如图3、图4所示。

6 实验总结和期望

实验中,我们看到利用Spark作为计算引擎比MapReduce的执行速度快了17.421-46.347倍。基于Hadoop的执行过程具体如表6所示,总体运行时间都比较长。但是当基于Spark引擎运行时,每条语句的执行时间都明显降低了,特别是执行sql6语句的时候,时间减少的更加明显。

通过实验,基于Spark的Hive语句执行的效果明显好于Hadoop,特别是对于复杂的查询语句,如产生多个map和reduce过程的语句,Spark的表现更加突出。Spark的应用使Hive的实时查询成为了可能。这也对海量数据的实时分析具有重要的意义。

参考文献(References):

[1] 李学龙,龚海刚.大数据系统综述[J].中国科学:信息科学,2015.45(1):1-44

[2] 陆嘉恒.Hadoop实战[M].机械工业出版社,2011.

[3] 宋杰.MapReduce大数据处理平台与算法研究进展[J].软件学报,2017.28(3).

[4] El-Sappagh S H A, Hendawi A M A, Bastawissy A H E. A proposed modelfor data warehouse ETL processes[J]. Journal of King Saud University Computer & Information Sciences,2011.23(2):91-104

[5] Dean J,Ghemawat S. MapReduce:simplified data process-ing on largeclusters[J].Communications of the ACM,2008.51(1):107-113

[6] 董西成.Hadoop技术内幕深入解析MapReduce架构设计与实现原理[M].机械工业出版,2013.

[7] 陈林,Hadoop异构集群下的负载均衡算法研究[J].现代计算机,2018.5:60-62

[8] 方宸.基于YARN网络数据分析系统实现与应用研究[D].华中科技大学,2014.

[9] 高彦杰.Spark大数据处理[M].机械工业出版社,2014.

[10] Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: a faulttolerant abstraction for in-memory cluster computing [C]// Proc of Conference on Networked Systems Design and Implementation.[S. l. ]:USENIX Association, 2012:2

[11] Han Z, Zhang Y. Spark: A Big Data Processing Platform Based on MemoryComputing [C]// Proc of International Symposium on Parallel Architectures.[S. l.]:IEEE Press,2015:172-176

[12] 袁景凌,熊盛武,饶文碧.Spark案例与实验教程[M].武汉大学出版社,2017.

[13] B Yavuz, B Yavuz, B Yavuz, E Sparks, D Liu.MLlib: machine learning in apache spark[J]. Journal of Machine Learning Research,2016.17(1):1235-1241

[14] 孙海.Spark的图计算框架:GraphX[J],现代计算机(专业版),2017.9:120-122,127

[15] 陆世鹏,基于Spark Streaming的海量日志实时处理系统的设计[J].电子产品可靠性与环境试验,2017.35(5).