摘 要:针对电信大数据在流动人口统计中的处理需求,采用Intel?Hadoop发行版,设计Hive数据仓库并进行优化,重点对性能影响较大的join连接和数据倾斜问题进行了优化。实验表明,对于TB级数据,简单统计如count、sum等可在10分钟以内完成,聚合统计如join、group by等可在30分钟左右完成,能有效支撑大数据环境下的流动人口统计和监测。
关键词:Hive;优化;join;数据倾斜
中图分类号:TP301 文献标识码:A
1 引言(Introduction)
电信运营商在移动通信业务运营过程中,获取了大量客观、真实的用户历史数据,这些历史数据可以客观反映用户的消费行为,也可以反映影响用户消费行为的内外部因素的变化情况[1]。根据移动通信客户的来话与去话等话务信息,结合客户身份资料,可以实现对特定区域人口的流入、流出情况及流动类型等进行分析。
然而,基于移动通信数据的流动人口统计面临诸多挑战:①数据源多样化:CDR(语音、SMS、GPRS、3G、4G等)、计费信息、客户信息、基站参数等;②数据量大:高达360TB原始数据(某省电信公司);③数据增长快速:2TB/天。通信数据呈现出大数据的特征,既有的技术架构和路线,已不能处理如此海量的电信数据。
近年来,涌现出了众多的大数据处理架构,其中Hadoop开源架构应用最广泛,在移动、电信等部门通过部署Hadoop架构开展电信大数据服务取得了一定的成效。本文针对电信大数据在流动人口统计中的处理需求,采用Intel?Hadoop发行版,设计Hive数据仓库并进行优化,重点对性能影响较大的join连接和数据倾斜问题进行了优化,实现海量数据的高效查询和统计,满足流动人口的快速统计和分析。
2 Hive数据仓库设计(Hive data warehouse design)
移动通信大数据的流动人口业务需求分析:移动通信数据的抽取、转换和导入;基于日、月、年的报表统计和数据规模;数据仓库30TB数据。现方案采用10台服务器,以实现数据的高速装载、查询和统计分析,如图1所示。
图1 Hive数据仓库设计
Fig.1 Hive data warehouse design
Hive是一个建立在Hadoop之上的数据仓库,用于查询和分析结构化海量数据。采用HDFS进行数据存储和Map/Reduce进行数据操作。基本特点包括:
(1)提供类似于SQL的查询语言。
(2)高扩展性(scale-out),动态扩容无须停机。
(3)针对海量数据的高性能查询和分析系统。
(4)提供灵活的扩展性。
(5)复杂数据类型,扩展函数和脚本等。
在运行count、sum等聚合函数进行统计计算时发现,将数据从普通数据库导入Hive中,分区的个数以及各分区数据量的均衡性会影响Hive的性能。解决办法就是给导入的表增加一个自增的int类型的字段,用这个字段来进行数据分割,最后得到的分区就是均衡的,如图2所示。
图2 数据分区
Fig.2 Data partition
3 Hive性能分析和优化(Hive performance analysis
and optimization)
Hadoop的分配优化主要包含以下三个层面:①底层Map和Reduce的参数调优;②Hive内部逻辑优化;③SQL代码逻辑优化。
3.1 Map/Reduce端的优化
Map/Reduce端的优化主要通过分析各个可调参数在Map/Reduce任务运行过程中起到的作用,通过改变参数大小优化底层分配策略。
表1 Map side调优参数表
Tab.1 Tuning parameter table of map side
选项 类型 默认值 描述
io.sort.mb Int 100 缓存Map中间结果的buffer大小(in MB)
io.sort.record.percent float 0.05 io.sort.mb中间来保存Map output记录边界的百分比
io.sort.spill.percent float 0.80 Map开始做spill操作的阀值
io.sort.factor Int 10 做merge操作时同时操作的stream数的上限
min.num.spill.for.combine Int 3 Combiner函数运行的最小spill数
Mapred.compress.map.output Boolean False Map中间结果是否采用压缩
Mapred.map.output.compression.codec Class name Org.apache.Haddoop.io.compress.defaultcodec Map中间结果的压缩格式
表2 Reduce side调优参数表
Tab.2 Tuning parameter table of Reduce side
选项 类型 默认值 描述
Mapred.reduce.parallel.copies Int 5 每个Reduce并行下载Map结果的最大线程数
Mapred.reduce.copy.backoff Int 300 Reduce下载线程最大等待时间(in sec)
io.sort.factor Int 10 同上
Mapred.job.shuffle.input.buffer.percent Float 0.7 用来缓存shuffle数据的Reduce task heap百分比
Mapred.job.reduce.input.buffer.percent Float 0.0 Sort完成后Reduce计算阶段缓存数据的百分比
Mapred.job.shuffle.merge.percent Float 0.66 缓存占内存多少百分比后做merge操作
3.2 Hive内部逻辑优化和代码逻辑优化
Hive使用HQL(Hibernate Query Language),HQL不仅提供了类似标准SQL语句的查询方式,而且提供更加丰富灵活、更为强大的查询能力,允许用户自定义Mapper和Reducer来处理更为复杂的查询分析任务。导致Hive性能不佳的原因有两个:①没有索引支持,查询需要暴力扫描全表;②在处理小量数据时Map/Reduce框架耗费资源比例过大,即Map/Reduce框架本身具有较高的延迟,导致基于此框架下的HQL查询也体现高延迟性。优化思路:
由于Hive的HQL语言是自动转化为Map/Reduce程序进行执行的。每个job对应一个Map/Reduce框架,所以尽可能减少job的个数可以减少执行时间。
Map/Reduce有其数据特性,Hive也有优化约定,所以编写Hive语言时需注意一些规则,才能提高查询效率。
本文对性能影响较大的join多表连接和数据倾斜等问题实施优化。
3.2.1 join优化
Hive只支持等值连接(equality joins)、外连接(outer joins)和左半连接(left semi joins)。
join时,每次Map/Reduce任务的执行过程如下:reducer会缓存join序列中前面所有表的记录,然后通过最后一个表将结果序列化到HDFS。这有助于减少在reduce端内存的使用量。无论是外关联outer join还是内关联inner join,如果join的key相同,无论jion多少个表都会合并成一个Map/Reduce任务。
查询时,应该尽量将小表放在join的左边,否则会因为缓存浪费大量内存。例如:
SELECT x.val,y.val,z.val FROM x JOIN y ON(x.key=y.key1)JOIN z ON (z.key=y.key1)
三个表使用同一个join key,生成一次Map/Reduce任务计算。Reduce端先缓存x表和y表的记录,然后每次取得z表中的一个记录就计算一次join结果。
两次Map/Reduce任务:
SELECT x.val,y.val,z.val FROM x JOIN y ON(x.key=y.key1)JOIN z ON (z.key=y.key2)
生成两次Map/Reduce任务:第一次缓存x表,用y表序列化;第二次缓存第一次Map/Reduce计算的结果,然后用z表序列化。
Hive不支持where子句中的子查询,SQL常用的IN/EXISTS子句需要改写。IN/EXISTS子查询在HIVE中一个更高效的实现是利用LEFT SEMI JOIN重写子查询语句。LEFT SEMI JOIN的限制是,JOIN右边的表不能在WHERE子句、SELECT子句或其他地方过滤,只能在ON子句中设置过滤条件。
SELECT x.key, x.value FROM x WHERE x.key in (SELECT y.key FROM y)
被重写为:
SELECT x.key, x.val FROM x LEFT SEMI JOIN y on (x.key=y.key)
对于多个子查询SQL无关且计算量过大的SQL,可以开启并行执行MR任务,减少计算压力。
hive.exec.parallel[=true]
hive.exec.parallel.thread.number[=8]
hive.exec.parallel可以控制一个SQL中多个可并行执行的job的运行方式。当hive.exec.parallel为true的时候,同一个SQL中可以并行执行的job会并发的执行。参数hive.exec.parallel.thread.number就是控制对于同一个SQL来说同时可以运行的job的最大值,该参数默认为8。此时最大可以同时运行8个job。通过修改参数hive.exec.parallel和hive.exec.parallel.thread.number测试不同情况的执行速度,实现性能优化和负载均衡。
3.2.2 数据倾斜问题的解决
数据倾斜表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量Reduce子任务未完成。因为其处理的数据量和其他Reduce差异过大。单一Reduce的记录数与平均记录数差异过大,通常可能达到三倍甚至更多。最长时长远大于平均时长。造成数据倾斜的主要原因:①key分布不均匀;②业务数据本身的特性;③建表时考虑不周;④某些SQL语句本身就有数据倾斜。
表3 数据倾斜
Tab.3 Data skew
关键词 情形 后果
Join 其中一个表较小,但是key集中 分发到某一个或几个Reduce上的数据远高于平均值
大表与大表,但是分桶的判断字段0值或空值过多 这些空值都由一个Reduce处理,非常慢
group by group by维度过小,
某特殊值过多 处理某值的Reduce非常耗时
Count Distinct 某值的数量过多 处理此特殊值的Reduce耗时
(1)参数调节
hive.map.aggr=true
Map 端部分聚合,相当于Combiner。
hive.groupby.skewindata=true
数据倾斜聚合优化,设置参数hive.groupby.skewindata=true,控制生成两个MR Job,第一个MR Job中,Map的输出结果会随机分配到reduce做一次预汇总,减少某些key值条数过多或某些key值条数过少而造成的数据倾斜问题。
(2)SQL语句调节
如何Join:关于驱动表的选取,选用join key分布最均匀的表作为驱动表,做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。
大小表Join:使用map join让小的维度表先进内存。在map端完成reduce。
大表Join大表:把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
count distinct大量相同特殊值:count distinct时,将特殊值单独处理。如果还有其他计算需要进行group by,可以先将特殊值的记录单独处理,再和其他计算结果进行union。
group by维度过小:采用sum() group by的方式来替换count(distinct)完成计算。
举例:空值产生的数据倾斜
日志中,常会有信息丢失的问题,比如log中的user_id和users表中的user_id关联,会碰到数据倾斜的问题。
解决方法1:user_id为空的不参与关联
select * from log x join users y on x.user_id is not null and x.user_id=y.user_id union all select * from log x where x.user_id is null;
解决方法2:空值的key变成一个字符串加上随机数形成新的key值
select * from log x left outer join users y on case when x.user_id is null then concat(‘hive,rand() ) else x.user_id end=y.user_id;
方法2比方法1效率更高,IO和作业数都少了。方法1中log读取两次,jobs数是2,方法2中job数是1。以上优化方法适合无效id,比如-99,“”,null等产生的倾斜问题。
4 结论(Conclusion)
针对移动通信数据中的流动人口统计业务需求,设计Hive数据仓库并进行优化,重点对性能影响较大的join连接和数据倾斜问题进行了优化,实现了数据的高速查询和统计。简单统计,如count,sum等10分钟以内完成;聚合统计,如join、group by等30分钟左右完成,高效完成了流动人口的统计。
参考文献(References)
[1] 智勇.基于移动通信信息资源的人口流动趋势研究[J].山东社 会科学,2013(5):102-105.
[2] 王大力.基于移动通信数据处理的公安流动人口管理系统设 计与研究[D].同济大学,2012.
[3] 朱珠.基于Hadoop的海量数据处理模型研究和应用[D].北京 邮电大学,2011.
作者简介:
周天绮(1976-),男,硕士,讲师.研究领域:大数据处理.