黄志 苏传程 苏晓红
(广西壮族自治区气象信息中心,南宁 530022)
CIMISS(China Integrated Meteorological Information Sharing System) 作为国省统一数据环境,是目前开展气象数据服务的核心基础数据支撑平台。CIMISS的存储架构采用ORACLE关系型数据库,目前数据量已达到亿数量级,由于关系型数据库存储压缩比不高,使得存储空间日趋紧张,数据库管理维护任务繁琐而艰巨[1-3]。MUSIC(Meteorological Unified Service Interface Community:气象数据统一服务接口)接口对于短时间序列的数据查询与统计时效较快,随着用户对地面气象观测数据的小时、日值需求不断增加,对于超过10年以上的长时间序列、多站点和多气象要素的查询处理响应支撑能力明显不足,严重影响了此类数据服务业务的开展。
中国气象局和北京市气象信息中心使用HBase数据存储架构和相关大数据技术,将实时计算框架和分布式数据库系统相结合,实现了海量自动站分钟数据快速入库处理,各类要素查询和统计处理均能达到毫秒级响应,满足了大规模自动站分钟数据在业务应用中对存储和查询的性能需求。HBase 是一个高可靠性、高性能、面向列的分布式数据库,适合处理实时随机读写超大规模的数据,但是HBase表的设计模式和复杂的散列化使用方法,对于使用频率集中在少数列、数据很少更新、格式固定的海量历史数据,其存储和管理的模式过于复杂,增加了设计和维护的难度,具有很大的改进空间[4-7]。研究表明Hadoop大数据相关技术通过自定义格式构建TXT大文本数据集,完成HDFS转换存储,在大数据环境下对于海量数据的查询处理,Spark并行处理TXT大文本的方式明显优于CIMISS关系型数据库处理的方式[8-9]。
本文在前期Hadoop相关大数据研究应用的基础上[8],以历史地面气象记录月报表为数据源,重新设计数据ETL流程,重构基于Parquet列式的存储特性的HDFS数据集,嵌入Spark的Broadcast广播变量,优化Spark并行度执行参数,实现Hadoop集群各工作节点的物理资源利用率最大化,并提升SparkSql的关联查询效率。
目前CIMISS系统采用行式存储的关系型数据库架构,以使用频率最高的地面观测小时值、日值数据库表为例,其表字段有150~300个左右,存储的站点数2900多个,累计数据量已达到亿数量级。用户在实际使用中查询最多的是降水、温度、风、相对湿度等几个常用要素列,数据库每次运行需要把一整行中所有列读取后,再从中提取出目标列,这种处理模式直接导致大数据量查询效率低下,很多数据产品的统计处理只能通过二次处理去获取。此外,数据缺失、数据异常的问题较严重,直接影响了月、年值的数据统计,由于关系型数据库对于海量数据修改需要耗费大量的人力和时间,导致此类数据的完整性、准确性问题一直得不到很好的解决[10-11]。
如何提高Spark的并行处理效率是本文首要研究的内容。以本文的Hadoop集群为例,因为本集群需要配置主服务器接收 Http 请求,需要固定的IP地址在应用服务过程进行交互、调试以及app的相关的输出信息,所以Spark集群采用yarn-client运行模式;如果采用 Yarn-Cluster 模式,Yarn 会随机选择一个NodeManager 启动Driver 端程序,服务器的IP地址将是随机的,此外集群设计3个driver作为driver 端的负载均衡。图1为yarn-client模式下Spark集群的执行流程[12]。
Spark作业中各个stage的task数量代表了Spark作业在各个阶段stage的并行度,合理设置并行度可以充分利用集群物理资源,使得每个task处理的数据量处于最优值, 进而提升运行处理速度。
基于现有Hadoop集群的物理资源,如果Spark集群的处理并行度未能与之相匹配,不仅会造成资源的浪费,也使得Spark集群整体处理效率未能达到最优。
本文所描述的Hadoop的大数据集群包含两种不同型号的8台虚拟服务器,包括了1台NameNode和7台DataNode,表1已列出7台DataNode的CPU的core总数和内存总量。实际部署中每个节点需要预留资源给非Yarn部分的管理服务使用,比如NameNode在Spark应用中需要做数据归约操作,需要把各个节点上的数据聚合到客户端,需要的JVM应用的内存较大,而Driver的内存大小与jvm的内存大小相关,在Yarn-client模式下Driver的内存就是非Yarn所管理支配的内存。在不影响系统以及其他应用程序的正常运行情况下,系统需要预留物理资源给系统以及其他应用程序,所以本文约定给每个节点预留约20%~25%的内存、2个CPU给非yarn部分的管理服务使用,例如对于16核CPU和32 GB内存的服务器,集群只分配14个CPU和24 GB内存给该节点供yarn管理调度使用。
表1 大数据集群资源配置清单
从表1可知,整个集群可利用的CPU资源为56+42=98个,内存为96 GB+39 GB=135 GB,其中CPU 总个数影响Spark的Job 中的 Task 的并行度,CPU 越多可同时执行的 Task 就越多,计算处理就越快;在内存分配方面,内存分配过小,会引起 Task 计算任务频繁GC从而影响执行效率甚至导致宕机,内存分配过大,则会形成冗余浪费。
整个Spark集群中设定的Executor进程个数是与CPU和内存的组合分配方案匹配的。因为DataNote 1~4的内存和CPU比例为2∶1而DataNote 5~7的内存和CPU比例是1∶1,整个集群物理机上的资源比例不一致,假设1个Executor进程以1个CPU为基准配置,如果Task 作业需要的内存≤1 GB,那么每台机器上的CPU可以完全利用,但会造成内存浪费,DataNode 1~4尤为严重;如果 Task 作业需要的内存在1~2 GB之间,DataNode 1~4的CPU基本完全利用,DataNode 5~7的CPU和内存会出现冗余;如果Task 需要的内存>2 GB,那么全部节点的CPU和内存都存在冗余,资源浪费情况最严重。所以本集群从CPU或内存资源利用率最大化的角度出发,得出的资源最优配置方案如表2所示,此方案的Spark集群的executor配置为:2核心数+2900 M+384 M(堆外内存),方案可以将两种型号服务器即DataNode 1~4的CPU、DataNode 5~7的内存都分别利用完,使得executor个数与整体资源利用率达到最优匹配。
表2 Spark计算资源分配方案
对于Spark处理过程中需要采集的文件分区数(partition),本文分为加载数据和处理数据的两个分区进行描述,加载数据由HDFS进行分区,每个block对应一个分区,不同文件之间不能合并,例如对于日值数据而言,每次全站统计需要加载2700多个站的parquert文件,分区数为2700+,每个文件都是独立加载提取数据并分别获取结果;本文涉及的处理任务为数据基础查询,处理过程没有涉及数据聚合操作,所以不会产生数据倾斜。本文设置Spark.default.Parallelism的task的参数值为300(之前使用Spark默认设置的参数一般为几十个task,task过少会导致资源的浪费,不能最大限度发挥Spark集群的处理效率),是CPU核心总数的2~3倍左右,一个job的task数等于分区数,所以本文的Spark的task总任务数最大为2700+,实际任务数据会根据每个节点实际的Executor资源情况进行重新分配,所以总任务数会远小于2700。根据当前Spark集群的配置方案,提交集群初始化命令行如下[12]:
MYMSpark_HOME/bin/Spark-submit
--master yarn
--executor-memory 2900M
--num-executors 40
--executor-cores 2
--driver-memory 4G
--conf Spark.default.parallelism=300
--conf Spark.storage.memoryFraction=0.6
--conf Spark.shuffle.memoryFraction=0.2
/home/applications/MeteoDataArchives/MeteoDataArchives.jar &
Apache Parquet是Hadoop生态系统中常用的面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,Spark处理框架默认数据存储格式为Parquet,所以两者具有很好的兼容性。
根据用户对CIMISS系统数据查询的使用场景进行分析,比如查询多站点长时间序列日值,95%以上的查询需求集中在日值表200多个字段中的降水、温度、相对湿度等几个字段,所以采用列式存储是很好的存储方式。为了提升数据的完整性、可用性,本文使用A文件(历史地面气象记录月报表)作为数据源,该文件是经过人工审核的数据文件(每个站每月一个A文件,包含了当月70多个气象要素的日统计值、20多个小时观测值等数据),具有较高的准确性和完整性,数据稳定后续很少再做修改和删除,规避了Parquet格式数据文件在更新、删除操作上的劣势。
以日值为例,本文对A文件所辖的77个日气象统计要素,增加站号、时间2个字段共79个字段,以此作为Parquet文件的schema设计格式(表3),此格式的79个字段虽然比CIMISS的日值表所包含的200多个字段少了一半,但其所涵盖的主要气象统计要素已能满足日常95%以上的数据需求(小时值等应用情况类似)。
表3 日值schema表字段格式说明
数据ETL(Extract-Load-Transform)处理是本文数据环境集成的第一步,目的是将A文件中的分散、零乱、标准不统一的数据按自定义格式整合到一起,也是构建Parquet格式数据集的关键环节。同样以日值数据为例,此过程是归集本省全部站点建站至今所有A文件并按站点进行归类,按站点逐个文件对其中包含的77个气象要素日值按日抽取后进行数据初级质控,剔除历史错误极值,对特殊字符进行数字化转换等,以保证后续列式数据的同质性,完成77个要素的数据抽取清洗之后,将对应站号、日期和77个要素日值按日逐行拼接,合并为长时间序列的行格式的大文本TXT文件(每个站点对应一个日值合并文件),以充分提高HDFS存储块的利用率(HDFS的存储块默认大小为128 M),最终转换生成Parquet格式的列式数据文件并完成HDFS存储转换(存储路径通过文件夹和文件名区分)。上述的数据抽取、清洗、格式转换和存储处理流程就是本文的数据的ETL处理流程,因为Spark框架已对Parquet格式的读写转换进行了封装,一般情况下采用默认的参数设置就能完成Parquet格式文件的读写处理。图2为日值数据的ETL处理流程[13-15]。
图2 日值数据的ETL处理流程
经过ETL处理后生成的Parquet文件是由一个header和一个或多个block块组成,以一个footer结尾。header中只包含一个4个字节的魔术字“PAR1”用来识别整个Parquet文件格式,文件中所有的metadata(元数据)都存在于footer中,footer中的metadata包含了格式的版本信息、schema信息、block中的metadata信息,footer中倒数第2个字段是一个以4个字节长度的footer的metadata,最后一个字段则是与header中包含一样的“PAR1”。
Parquet文件中的每个block以Row Group的形式存储,因此文件中的数据被划分为一个或多个Row Group,这些Row Group是由一个或多个Column Chunks组成的列数据,每个Column Chunks的数据以Page的作为最小单元进行组织,每个Page只包含特定列的值,因为前文在数据ETL过程中的数据清洗环节已将特殊字符经过数字转换,使得每个page保持了同质性从而具有很好的压缩特性。图3为本文日值Parquet文件的Header、Data Block和Footer格式描述。
图3 日值Parquet文件的Header、Data Block和Footer格式
表4是包含相同的数据内容的TXT格式与Parquet格式的文件数据容量对比情况,通过对表4进行分析发现,小时值的单个文件数据量最大,压缩率也最大(超过95%),说明单个文件数据量越大压缩率就越大,证明Parquet格式的数据压缩性能非常出色,可以为Hadoop集群节省大量的存储空间,使用Parquet格式数据可以弥补Spark在数据压缩处理上不足,对后续提升Spark的处理性能有很好的促进作用。
表4 格式转换前后存储占用情况
利用Parquet格式转换提高了数据文件压缩和存储效率,Spark从1.6版本开始对操作Parquet进行了优化,提升了其扫描吞吐量,数据文件的查找速度提高了1倍以上,采用Parquet有利于Spark的调度和执行。
由于在生成Parquet数据文件的ETL过程中并没有将站名、经度、纬度等台站参数数据列写入Parquet文件中,一是为了不增加要素列,二是因为这些参数信息会随着台站的迁移或台站撤销而发生变更,假如Parquet文件中写入上述参数数据,在以经纬度为例,如果某站点的经纬度参数发生变更,则需要对Parquet文件中相应列的数据进行修改和数据集重构,由于Parquet格式不支持实时大批量的数据删除修改操作,所以在实际的数据查询处理中,可以将所需的台站参数数据和经过Spark转换后的DataFrame主表,通过站号进行关联查询获取最终结果。
之前的Spark集群在做关联查询时,task在处理过程中需要使用临时变量(例如参与关联查询的台站参数信息表),因此每个task在处理过程中会拷贝一份台站参数信息表作为副本,当遇到大数据量查询时会同时产生几百个task,就需要同时拷贝几百份副本,使得集群网络IO开销短时间内剧增,导致集群的处理效率急剧下降。
因此,本文嵌入Spark的Broadcast广播变量解决台站参数的关联查询问题。Spark的Broadcast广播变量是提前将一个轻量化的只读变量(数据表或文本)缓存在集群每个节点的executor进程中,避免在执行查询时才向所有task传递变量。本集群在Spark集群初始化时,通过Spark的Broadcast广播变量预先将台站参数数据拷贝到每个executor中,这部分数据只占用executor中很小一部分的内存,却能有效减少后续关联查询处理时节点之间的数据交互和归集操作,能将网络IO开销缩减7~8倍,从而提高查询的处理效率。图4为Spark-Broadcast关联查询处理流程[16]。
图4 基于Parquet的Spark-Broadcast关联查询处理流程
Broadcast初始化部分代码如下所示:
SparkSession SparkSession = SparkUtils.getSparkSession(); //获取Spark会话
SparkConTXT SparkConTXT = SparkSession.SparkConTXT();
Dataset
.option("url", dataUrl) //jdbc url
.option("user", username) //数据库用户名
.option("password", password) //数据库密码
.option("dbtable", "station_info").load();//表名
Broadcast
Dataset
Dataset
Dataset
基于优化后的Spark集群和嵌入Broadcast广播变量后的SparkSql处理流程,本文将以客户端查询大数据量的响应时效快慢为指标,对Parquet与TXT两种格式的处理响应时效进行比较,通过从客户端触发查询事务,到全部查询结果输出转换为可下载文本的全过程所耗费的时间,可以直观地对比出这两种方式的处理效率。
从表5的测试结果可以看出,当查询站点数、总数据量以及查询的目标列数较少时,Parquet与TXT两种格式查询耗时区别很小;当需要查询的数据量呈倍增趋势时,以日值查询为例,Parquet格式的查询耗时比TXT格式缩减了1倍以上;对于小时值查询,Parquet格式的查询耗时比TXT格式缩减4~5倍,说明对于单个数据容量越大的文件,Parquet格式比TXT格式查询效率提高更明显。从每种格式的查询耗时可以看出,Parquet格式的查询耗时总体波动很小,而TXT格式查询耗时总体则波动很大。
表5 Parquet与TXT格式查询时效比对(建站至今)
从表6的并发查询结果可以得知,Parquet格式在处理100用户的并发情况下效率提升更加显著,非常适用于海量数据的查询处理,这是因为Parquet格式文件的同列数据类型的同质性使其拥有高效的压缩编码,所以在Spark解码时可以快速跳过不符合条件的数据,有效减少冗余数据列,只读取目标列,提高了数据扫描和并行处理的效率。
表6 100用户并发Parquet与TXT格式查询时效比对
本文基于原有Hadoop大数据架构,对Parquet列式存储技术、Spark分布式集群的并行处理配置参数以及关联查询流程进行深入研究与应用,实现了对海量气象数据的高效存储、查询和高并发访问,弥补了 CIMISS支撑能力的不足。利用 Parquet格式构建基于A文件的历史地面观测数据集,其高可用性可作为 CIMISS和天擎数据环境下此类数据的备份,为气象业务提供更可靠的数据保障,对提升气候预测、评价和可行性分析等传统气象业务的综合应用能力具有重要意义。Parquet格式实现代码简捷高效,特别适用于数据很少更新、处理时效要求不高的海量历史数据如海量网站日志的分析处理等应用场景,结合数据挖掘和人工智能技术,可衍生出更多的大数据处理模型。
在后续的工作中,将开展Spark即席查询的研究与应用,拓展更多的统计处理流程和发布更多数据产品的众创接口,以满足用户更多的数据服务需求,取得更好的数据应用效益。