任桂禾 王 晶
1 北京邮电大学网络与交换技术国家重点实验室 北京 100876
2 东信北邮信息技术有限公司 北京 100191
Hadoop诞生于大数据时代,是Apache基金会受到Google开发的GFS(Google File System,谷歌文件系统)和MapReduce计算框架的启发引入的开源项目。Hadoop使用大量的廉价Linux PC机组成集群,可谓是大数据处理商用技术架构的开端。Hadoop作为经典的大数据离线处理技术架构,很好地满足了人们对于大数据的离线处理需求[1]。
然而,随着Web2.0的兴起,琳琅满目的各式应用和服务如雨后春笋般地涌现。这其中出现了以微博为代表的一批典型应用,海量的用户、碎片化的信息流、极快的传播速度,使得它们对业务实时性的要求大幅度提高[2]。当业务需求允许的时延降低到一定限度时,Hadoop架构会达到本身的瓶颈,已经不能满足大数据处理的需求。Twitter出于自身的业务需求开发了Storm实时处理框架,使用流式处理架构,对传统离线处理技术架构进行了变革。
Hadoop是优秀的大数据离线处理技术架构,主要采用的思想是“分而治之”,对大规模数据的计算进行分解,然后交由众多的计算节点分别完成,再统一汇总计算结果[3]。Hadoop架构通常的使用方式为批量收集输入数据,批量计算,然后批量吐出计算结果。然而,Hadoop结构在处理实时性要求较高的业务时,却显得力不从心。本章内容对Hadoop架构这种瓶颈的产生原因进行了探究。
Hadoop架构的核心组成部分是HDFS(Hadoop Distributed File System,Hadoop分布式文件系统)和MapReduce分布式计算框架。HDFS采用Master/Slave体系结构,在集群中由一个主节点充当NameNode,负责文件系统元数据的管理,其它多个子节点充当Datanode,负责存储实际的数据块[4]。如图1所示。
图1 HDFS架构
MapReduce分布式计算模型由JobTracker和TaskTracker两类服务进程实现,JobTracker负责任务的调度和管理,TaskTracker负责实际任务的执行。
在笔者实施的某运营监控系统项目中,业务需求为处理业务平台产生的海量用户数据,展现业务中PV(Page View,页面浏览量)、UV(Unique Visitor,独立访客)、营收和付费用户数等关键运营指标,供领导层实时了解运营状况,做出经营决策。在一期项目的需求描述中,允许的计算时延是15分钟。
根据需求,在一期项目的实施中,搭建了Hadoop平台与Hive数据仓库,通过编写Hive存储过程完成数据的处理,相当于是一个离线的批处理过程。不同的运营指标拥有不同的算法公式,各公式的复杂程度不同导致各运营指标算法复杂度不同,因此,所需要的计算时延也各不相同,如PV指标的计算公式相对简单,可以在5分钟内完成计算,而页面访问成功率指标的计算公式相对复杂,需要10分钟以上才能完成计算。项目到达二期阶段时,对实时性的要求有了进一步提高,允许的计算时延减少到5分钟。在这种应用场景下,Hadoop架构已经不能满足需要,无法在指定的时延内完成所有运营指标的计算。
在以上的应用场景中,Hadoop的瓶颈主要体现在以下两点。
1)MapReduce计算框架初始化较为耗时,并不适合小规模的批处理计算。因为MapReduce框架并非轻量级框架,在运行一个作业时,需要进行很多初始化的工作,主要包括检查作业的输入输出路径,将作业的输入数据分块,建立作业统计信息以及将作业代码的Jar文件和配置文件拷贝到HDFS上。当输入数据的规模很大时,框架初始化所耗费的时间远远小于计算所耗费的时间,所以初始化的时间可以忽略不计;而当输入数据的规模较小时,初始化所耗费的时间甚至超过了计算所耗费的时间,导致计算效率低下,产生了性能上的瓶颈。
2)Reduce任务的计算速度较慢。有的运营指标计算公式较为复杂,为之编写的Hive存储过程经Hive解释器解析后产生了Reduce任务,导致无法在指定的时延内完成计算。这是由于Reduce任务的计算过程分为三个阶段,分别是copy阶段、sort阶段和reduce阶段。其中,copy阶段要求每个计算节点从其它所有计算节点上抽取其所需的计算结果,如图2所示。copy操作需要占用大量的网络带宽,十分耗时,从而造成Reduce任务整体计算速度较慢。
图2 copy操作示意图
Storm的流式处理计算模式保证了任务只需进行一次初始化,就能够持续计算,同时使用了ZeroMQ作为底层消息队列,有效地提高了整体架构的数据处理效率,避免了Hadoop的瓶颈[5]。
与Hadoop主从架构一样,Storm也采用Master/Slave体系结构,分布式计算由Nimbus和Supervisor两类服务进程实现,Nimbus进程运行在集群的主节点,负责任务的指派和分发,Supervisor运行在集群的从节点,负责执行任务的具体部分。
Storm架构中使用Spout/Bolt编程模型来对消息进行流式处理。消息流是Storm中对数据的基本抽象,一个消息流是对一条输入数据的封装,源源不断输入的消息流以分布式的方式被处理。Spout组件是消息生产者,是Storm架构中的数据输入源头,它可以从多种异构数据源读取数据,并发射消息流。Bolt组件负责接收Spout组件发射的信息流,并完成具体的处理逻辑。在复杂的业务逻辑中可以串联多个Bolt组件,在每个Bolt组件中编写各自不同的功能,从而实现整体的处理逻辑[6]。
Storm架构和Hadoop架构的总体结构相似,各个组成部分的对比如表1所示。
表1 Storm架构与Hadoop架构对比
在Hadoop架构中,主从节点分别运行JobTracker和TaskTracker进程,在Storm架构中,主从节点分别运行Nimbus和Supervisor进程。在Hadoop架构中,应用程序的名称是Job,Hadoop将一个Job解析为若干Map和Reduce任务,每个Map或Reduce任务都由一个Child进程来运行,该Child进程是由TaskTracker在子节点上产生的子进程。在Storm架构中,应用程序的名称是Topology,Storm将一个Topology划分为若干个部分,每部分由一个Worker进程来运行,该Worker进程是Supervisor在子节点上产生的子进程,在每个Worker进程中存在着若干Spout和Bolt线程,分别负责Spout和Bolt组件的数据处理过程。
从应用程序的比较中可以明显地看到Hadoop和Storm架构的主要不同之处。在Hadoop架构中,应用程序Job代表着这样的作业:输入是确定的,作业可以在有限时间内完成,当作业完成时Job的生命周期走到终点,输出确定的计算结果。而在Storm架构中,Topology代表的并不是确定的作业,而是持续的计算过程。在确定的业务逻辑处理框架下,输入数据源源不断地进入系统,经过流式处理后以较低的延迟产生输出。如果不主动结束这个Topology或者关闭Storm集群,那么数据处理的过程就会持续地进行下去。
通过以上的分析,我们可以看到Storm架构是如何解决Hadoop架构瓶颈的。
1)Storm的Topology只需初始化一次。在将Topology提交到Storm集群的时候,集群会针对该Topology做一次初始化的工作。此后,在Topology运行过程中,对于输入数据而言,是没有计算框架初始化耗时的,有效地避免了计算框架初始化的时间损耗。
2)Storm使用ZeroMQ作为底层的消息队列来传递消息,保证消息能够得到快速的处理。同时,Storm采用内存计算模式,无需借助文件存储,直接通过网络直传中间计算结果,避免了组件之间传输数据的大量时间损耗。
根据业务实时性需求的变化,进行大数据处理技术架构由Hadoop向Storm变更时,需要进行业务逻辑开发变更和计算结果输出方式变更,在变更的同时要注意对开发成本和开发效率的考量。
当从Hadoop架构转向Storm架构后,业务逻辑需要进行重新开发。在Hadoop架构中,业务逻辑使用HiveQL语言开发。HiveQL是Hadoop平台提供的类SQL语言,供开发工程师编写存储过程以操作Hive数据仓库中的表和数据,从而完成所需的数据处理过程。在运行Hive存储过程时,Hive解释器会生成执行计划,将HiveQL语句解析成底层的MapReduce程序,提交给JobTracker去执行[7];因此,HiveQL的开发效率较高,开发工程师无需使用JAVA语言直接编写底层MapReduce程序,而且HiveQL的开发门槛也较低。传统的数据处理一般都是在关系型数据库如Oracle中进行,当需要将业务逻辑从Oracle平台迁移至Hive平台时,Oracle数据库开发工程师可以十分容易地进行Hive开发。
而从Hadoop架构转向Storm架构后,需要开发工程师使用JAVA语言来完成业务逻辑的二次开发,对开发效率和开发成本会产生一定的影响,这是项目规划中需要重点考量评估的一个关键点。
同样的业务逻辑,由Hadoop架构迁移至Storm架构中时,主要的工作量在于使用Storm编程组件实现HiveQL中可以直接使用的AVG、SUM、COUNT、DISTINCT以及GROUP BY等标准SQL操作。在实现这些功能模块时,可以巧妙地利用Storm架构的stream grouping特性。stream grouping定义了一系列分组方式,分组方式决定了消息流在各组件间如何传递,分组的类型主要包括shuffle grouping(随机分组)、fields grouping(字段分组)、all grouping(全部分组)和direct grouping(直接分组)等。
例如,可以使用fields grouping字段分组机制来实现GROUP BY操作的功能。在运营商业务逻辑中,经常需要统计分省指标,利用fields grouping机制实现的GROUP BY操作可以用来进行分省指标的计算。fields grouping是这样一种消息传递模式,在spout组件和bolt组件之间,按照消息中指定的某个字段来决定该消息分发至哪一个bolt。在统计分省指标时,可以将省份字段设置为分组的依据。这样,不同省份的消息可以进入不同省份对应的bolt中,然后在每个省份对应的bolt中对其进行处理,可以得到分省的计算指标。
在实际应用场景中,大数据分析处理的计算结果往往要写入到传统的关系型数据库中,以方便对计算结果进行展示和管理。在Hadoop架构中,可以使用Hadoop生态环境中的Sqoop工具来完成这一功能。Sqoop可以将计算结果从HDFS或Hive数据仓库传输至传统关系型数据库(如Oracle和Mysql),也可以将传统关系型数据库中的数据传输至HDFS或Hive数据仓库。出于程序简洁性的考虑,可以直接将Sqoop传输程序插入到Hive存储过程的结束处,在Hive计算过程完成后直接调用Sqoop传输程序来传送计算结果[8]。
在Storm架构中,输入数据源源不断地进入计算系统,每时每刻都在更新计算结果。Storm的设计出于计算速度的考量,采用了内存计算的模式,所以计算结果是存在于内存中的。因为是使用JAVA语言进行程序编写,所以可以直接使用JDBC的方式连接关系型数据库,来传输计算结果。可以在流式处理系统的最后增加一个bolt组件,来完成这一功能[9]。
如果在每条输入数据更新计算结果后,都写入关系型数据库的话,会对关系型数据库造成较大的压力。可以根据时延的要求,选择以固定的时间频率写入关系型数据库。
随着互联网的飞速发展,新的业务对数据处理的实时性要求不断提高。当传统的离线处理架构难以满足实时性要求的时候,可以适时考虑更换大数据处理技术架构来完成业务需求。信息社会瞬息万变,我们需要不断地变革和创新,才能为社会创造更好的互联网服务。
[1]崔杰,李陶深,兰红星.基于Hadoop的海量数据存储平台设计与开发[J].计算机研究与发展,2012(49):12-18
[2]李美敏.解读Web 2.0时代的微博文化[EB/OL].[2014-10-20].http://media.people.com.cn/GB/22114/206896/239176/17143067.html
[3]董新华,李瑞轩,周湾湾,等.Hadoop系统性能优化与功能增强综述[J].计算机研究与发展,2013(50):1-15
[4]林伟伟.一种改进的Hadoop数据放置策略[J].华南理工大学学报,2012(40):152-158
[5]赵建红.基于Twitter Storm的数据实时分析处理工具研究[J].商情,2013(8):274-275
[6]胡宇舟,范滨,顾学道,等.基于Storm的云计算在自动清分系统中的实时处理应用[J].2014(34):96-99
[7]沙恒,贴军.基于Hadoop子项目——Hive的云计算性能测试[J].软件导刊,2012(11):14-16
[8]NextMark.Sqoop在Hadoop和关系型数据库之间的数据转移[EB/OL].[2014-10-20].http://www.linuxidc.com/Linux/2014-02/97305.htm
[9]韦海清.浅谈Java通过JDBC连接Oracle数据库技术[J].计算机光盘软件与应用,2014(7):298-300