王建荣,华连生,唐怀瓯,王 云,王 静
(安徽省气象信息中心,安徽 合肥 230031)
数值预报产品是14大类气象资料之一,是天气预报、分析和气候预测的重要资料来源,在科研和业务中发挥了重要作用。
中国气象局CIMISS[1-2](全国综合气象信息共享平台)数据库中存储了多种数值预报产品信息,包含起报时间、预报时效、层次、预报要素代码、区域代码、单要素GRIB文件路径等字段,而具体的GRIB文件存储在GPFS文件系统中。为确保Oracle数据库的稳定运行,数值预报产品记录保存3~6个月,并定时清除表空间。在科研和业务工作中,往往需要长时间序列的数值预报产品数据,并且要求实时检索性能,因此考虑利用分布式架构来解决海量气象数据存储检索所面临的问题。
在分布式存储和计算技术中,Hadoop框架具有高吞吐量、高并发、高容错性、高可靠性、低成本等优势。目前基于Hadoop生态系统的气象数据存储方案成为国内外的研究热点。李永生等[3]选用Hadoop与HBase相结合的方式设计数值预报产品服务平台;陈东辉等[4]详细介绍了基于HBase的气象地面分钟数据分布式存储系统。文中选取HBase数据库实现气象数据文件的分布式存储管理;使用Quartz定时采集数值预报产品文件;利用Kafka消息队列将文件采集、产品解码、存储入库功能解耦;进行前端GRIB解码入库性能优化和后端数据检索性能优化。实验测试验证了数值预报产品分布式处理与存储系统设计的可行性,为海量气象数据的处理、存储和检索服务提供一种解决方法。
系统功能模块如图1所示。
图1 系统功能模块
(1)文件采集模块。
通过Quartz scheduler定时从数值预报产品目录复制GRIB文件到解码程序入口目录。
(2)产品解码模块。
调用GRIB API[5-6]实现GRIB1、GRIB2文件的解码,并且生成解码日志文件和要素GRIB文件(GRIB2格式)。
(3)数据存储模块。
调用HDFS[7-8]API将产品文件、要素GRIB文件和解码日志文件上传至HDFS分布式文件系统。另一方面,使用MapReduce并行程序将解码日志文件存入HBase。
(4)数据检索模块。
利用Solr实现HBase的辅助索引,提高数值预报产品数据的检索效率。
系统一次完整的执行流程如图2所示。
执行步骤如下:
(1)Quartz周期性调度完成数值预报产品文件采集和消息入队;
(2)解码程序读消息,并根据包含的文件名解码产品;
(3)将产品文件、要素GRIB文件全部上传至HDFS;
(4)生成解码日志文件如消息队列;
(5)入库程序读消息,将日志文件入HBase;
(6)HBase协处理器同步记录至Solr索引库。
图2 总体流程
Quartz是OpenSymphony开源组织在任务调度领域的一个开源项目,基于Java实现。
主要代码如下:
Scheduler scheduler=StdSchedulerFactory.getDefaultScheduler();
scheduler.start();
JobDetail job=JobBuilder.newJob(GribProcessJob.class).withIdentity("job","group").build();
……
scheduler.scheduleJob(job, trigger);
GribProcessJob类实现Job接口,重载execute函数,完成GRIB文件采集转储和发送消息到产品文件队列的过程。
Apache Kafka是用Scala语言实现的分布式消息队列系统,使用Zookeeper进行集群的管理。Kafka有以下特性:可扩展性、数据分区、低延迟、持久存储、处理大量不同消费者的能力。
Kafka由Producer、Broker(消息服务器)和Consumer三部分组成,Producer和Consumer均属于客户端。应用程序通过Producer API发送消息到Broker集群Leader(主节点),再通过Consumer API从Broker服务器消费消息。Kafka消息的两个重要概念为Topic(主题)和Partition(分区)。
分布式处理与存储系统创建了两个消息队列:产品文件队列和日志文件队列。为产品文件队列创建名为“gribfilelist”的topic,每个topic包含3个partition;为日志文件队列创建名为“logfilelist”的topic,每个topic也包含3个partition。key相同的消息都被发送到同一个分区(partition),如所有的ecmf文件名被发送到相同的分区,而jma文件名被发送到另一个分区。
客户端解码程序完成GRIB文件解码后将解码日志文件发送至日志文件队列。
客户端入库程序循环请求消息队列,检查并获取最新的消息后按顺序完成:数值预报产品文件、要素GRIB文件和解码日志文件写入HDFS;解码日志MapReduce方式存入HBase数据库。
Kafka Producer的异步发送模式允许进行批量发送:客户端先将消息缓存在内存中,然后一次请求批量发送出去。
配置策略,比如可以指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去,可提高消息发送效率、减少服务端的I/O次数。
GRIB码即二进制格点加工数据,是WMO(世界气象组织)推荐使用的表格驱动代码之一,主要用来表示天气分析和预报的产品资料。现行的GRIB码有两个版本(Edition),即GRIB1和GRIB2。GRIB2对数据的描述基于模板和码表,而模板引用码表。
GRIB API是由ECMWF(欧洲中期天气预报中心)设计研发的,为用户提供了C/C++、Fortran等语言的编程接口。用户程序使用key/value(键/值)方法存取GRIB数据。GRIB文件中所有信息(Message)都通过key来检索。每个key都有固定的类型,如实型、整数型、字符串等。
系统采用GRIB API的C/C++接口实现数值预报产品解码。以ECMF产品为例,Quartz每5 min执行文件采集任务,从ECMF产品目录复制文件至解码程序临时目录temp下,例如产品文件名为:
W_NAFP_C_ECMF_20160511055659_P_C1D05110000051100011.bin
解码程序调用GRIB API对其进行解码后输出解码日志文件和要素GRIB文件:
W_NAFP_C_ECMF_20160511055659_P_C1D05110000051100011.bin.log
该文件由多条解码记录组成,单条记录的格式如下:
1|20160511|0|0|98|0|100|850|WIV|ANEA|250|250|NAFP_ECMF_0_FTM-98-ANEA-WIV-250X250-100-850-999998-999998-999998-2016051100-0.GRB
各字段用“|”分割,字段定义与表1相对应,而NAFP_ECMF_0_FTM-98-ANEA-WIV-250X250-100-850-999998-999998-999998-2016051100-0.GRB即是要素GRIB文件。文件名含义:加工中心代码为欧洲中期天气预报中心、预报分辨率为0.25o×0.25o、850 hPa等压面层格点经纬度范围(60o,-10o,60o,150o)的纬向风资料,其存储于HDFS分布式文件系统ECMWF相关存储路径下。
HBase(Hadoop database)[9-10]运行在HDFS分布式文件系统上,使用Zookeeper管理集群,提供高可靠性、高性能、列存储、可伸缩、实时读写特性,主要用来存储非结构化和半结构化的松散数据。
系统将数值预报产品通过GRIB API解码后存储在HBase中,不同的数值预报产品分开存储在不同的实体数据表中,目前存储了3大类数值预报产品,分别为ECMWF(欧洲中期数值预报中心)发布的细网格(0.25o×0.25o水平分辨率)和粗网格(2.5o×2.5o水平分辨率)的数值预报产品,JMA(日本气象厅)发布的0.5o×0.5o水平分辨率和1.25o×1.25o水平分辨率的数值预报产品,NCEP/FNL再分析资料。数据表以行键、列族、数据的方式存储数值产品的实体数据。数据表存储内容见表1。
表1 数据表存储内容说明
data:gribpath是解码所得要素GRIB文件在HDFS中的存储路径。
选取表1中data:date、data:validtime和data:centre三列做数据模型展示,见表2。
表2 数据模型示例表
Rowkey(行键):HBase中的Rowkey唯一标识一行记录。根据HBase的优化原则[7],Rowkey的长度易固定且不超过200 Bytes,设计如下:AAAAATTT:yyyyMMdd:nnnmmmm:IIIIXJJJJ
其中,AAAAA为5字母长度的英文缩写,不足5位则在其后补“9”,代表数值预报产品的预报要素名称;TTT为预报时效;nnn表示高度层类型,mmmm表示层次;IIII表示4位I方向增量,不足4位则前导置“0”;JJJJ表示4位J方向增量,不足4位则前导置0。
以ECMF数据表的行键为例:
TEMP9006:20160511:1000010:0250X0250
其含义是:对于温度要素(temp),在2016年5月11日00:00起报,预报时效为未来6 h的预报场,预报层次为10 hPa,I方向增量为0.25o,J方向增量为0.25o。
时间戳(timestamp):每条数据更新的历史记录,同一行键数据再次入库会记录不同的时间戳。
列族(column family):每种数值预报产品的表结构基本相同,每张表只设一个列族data,包含的列(column qualifier)有data:date、data:validtime、data:centre、data:gribpath等。HBase存储的都是Byte数组。
Apache Solr是一种开源的、基于 Lucene的全文检索引擎,支持XML、JSON和python等常用的输出格式。而SolrCloud[11-12]是基于Solr和Zookeeper的分布式搜索方案,使用Zookeeper作为集群的配置信息中心。
HBase在存储时,默认按照Rowkey进行排序(字典序)并通过Rowkey及其range来检索数据,在HBase查询时,有以下几种方式:
(1)通过get方式,指定Rowkey获取唯一一条记录;
(2)通过scan方式,设置startRow和stopRow参数进行范围匹配;
(3)全表扫描,即直接扫描整张表中所有行记录。
HBase对Rowkey的一级索引支持较好,按Rowkey查询的响应时间达到毫秒级。HBase内置Filter(过滤器)特性以支持多条件查询的二级索引。但HBase的Filter是直接扫描记录的,如果数据范围很大,会导致查询速度很慢。因此基于Solr来实现二级索引,满足Rowkey之外的多要素数据检索需求。
设计Solr索引的关键问题是合理地配置索引字段。Zookeeper统一管理XML格式的Solr索引字段描述文件:managed-schema,SolrCloud各实例共享同一个managed-schema。
主要配置如下:
……
设置HBase表的id字段为Solr索引的unique-Key,存储HBase记录的Rowkey值。
上文所介绍的Solr索引设计是入库性能优化的前提。
入库程序采用了MapReduce编程模型[13-14]。MapReduce作业读取解码日志文件插入到HBase数据库中。解码程序省略了reduce步骤,因mapper输出中间数据到reducer需要通过网络,受限于Hadoop集群带宽。
HBase的协处理器[15](Coprocessor)分为两类,Observer和EndPoint,其中Observer的代码部署在服务端,相当于对API调用的代理。系统选用RegionObserver接口。
HBase协处理器需要获取HBase的插入和更新操作:拦截put操作,获取其内容,同步写入Solr。HBase协处理器定义以及同步数据到Solr的主要代码如下:
public class SolrIndexCoprocessorObserver extends BaseRegionObserver {
@Override
public void postPut(ObserverContext
String rowKey = Bytes.toString(put.getRow());
try {
Cell cellEdition=put.get(Bytes.toBytes("data"), Bytes.toBytes("edition")).get(0);
String strEdition=new String(CellUtil.cloneValue(cellEdition));
……
SolrInputDocument doc=new SolrInputDocument();
doc.addField("id", rowKey);
doc.addField("edition", strEdition);
……
//写入缓冲
SolrWriter.addDocToCache(doc);
}
(1)软件及版本:Quartz-2.2.3;hadoop-2.6.0;zookeeper-3.4.6;solr 5.5.4;hbase-1.2.2;GRIB API 1.13.1。
(2)硬件配置。
测试环境由6台X86架构的服务器组成,操作系统均为64位Ubuntu 14.04。其中5台服务器构建Hadoop、Zookeeper、HBase、Solr集群,1台部署数值预报产品解码入库程序。
处理器:Intel Core i5-3470 3.20 GHz;
磁盘:1TB,7200 rpm,SATA III接口;
内存:16 GB;
网络环境为千兆局域网。
选取ECMWF高分辨率数值预报产品及其解码产生的要素GRIB文件为测试对象,其常见的文件大小分布为:约2 MB、约10 MB、约105 MB和约160 MB,而解码得到的要素GRIB文件数也随之不同。
(1)HDFS写入性能。
数值预报产品有846个文件,共96 GB,平均大小116 M。客户端程序调用HDFS API的文件复制操作将数值预报产品文件写入HDFS文件系统需要的时间为1 190.986 s,平均写文件速度为82.54 MB/s;要素GRIB文件上传至HDFS集群的速度近似。
(2)HBase入库性能。
采用统计学方法:总体有96 360个解码日志文件,共57 816 000条记录,耗时4 576.9 s,平均写入速度12 632 条/s;随机抽取1 000,2 000,…,10 000条记录入库,见图3。测试结果表明,随着入库记录数的增加,数据入库性能总体平稳,最快写入速度为13 677 条/s。
图3 入库时间和入库记录数的关系
(3)索引完整性验证。
测试用例如下:
用例编号UC1:按起报时间、预报层次、预报时效、单预报要素检索预报要素场;
用例编号UC2:按起报时间范围、预报层次、预报时效、单预报要素检索预报要素场;
用例编号UC3:按起报时间、预报层次、预报时效、多预报要素检索预报要素场。
基于HBase Filter[16]的条件过滤查询和辅助索引查询返回的记录数对比如表3所示。
表3 HBase Filter与SolrCloud查询记录数对比
表3中每个测试用例均做了3组对比,基于SolrCloud索引的查询记录数均和HBase Filter查询的记录数一致,说明索引完整可用。
(4)HBase检索性能。
表3中各测试用例最大查询记录数所需时间对比如表4所示。
表4 HBase Filter与Solr查询效率对比
由表4可知,基于SolrCloud的查询效率远远高于HBase Filter查询,按时间点的查询基本都在毫秒级返回结果;对于UC2中,按时间范围检索方面,HBase Filter效率较低,不适合时间序列的查询,在实际的气象业务应用中,需要结合Solr对HBase进行索引优化,来满足检索时效的要求。
针对关系型数据库对数值预报产品数据的存储及检索效率低等问题,设计了分布式处理与存储系统。利用Quartz任务调度采集数值预报产品文件,Kafka消息队列解耦数值产品解码与入库程序,研究HBase分布式数据库结合SolrCloud索引服务的数据存储与检索优化方案,设计了适合气象业务应用的数值预报产品数据存储模型,并建立Solr索引。关键技术是前端MapReduce并行程序入库、HBase协处理器同步记录至SolrCloud。实验测试表明,该方案提高了存储效率和检索速度,能够满足业务中的时效性要求。
[1] 熊安元,赵 芳,王 颖,等.全国综合气象信息共享系统的设计与实现[J].应用气象学报,2015,26(4):500-512.
[2] 杨润芝,马 强,李德泉,等.内存转发模型在CIMISS数据收发系统中的应用[J].应用气象学报,2012,23(3):377-384.
[3] 李永生,曾 沁,徐美红,等.基于Hadoop的数值预报产品服务平台设计与实现[J].应用气象学报,2015,26(1):122-128.
[4] 陈东辉,曾 乐,梁中军,等.基于HBase的气象地面分钟数据分布式存储系统[J].计算机应用,2014,34(9):2617-2621.
[5] 张 苈,周峥嵘,刘媛媛.ECMWF GRIB API及其应用[C]//中国气象学会气象通信与信息技术委员会暨国家气象信息中心科技年会.北京:国家气象信息中心,2011.
[6] 李 葳.NECP FNL资料解码及数据格式转换[J].气象与减灾研究,2011,34(1):63-68.
[7] WHITE T.Hadoop:the definitive guide,3E[M].[s.l.]:O’Reilly Media,2012.
[8] DUTTA H,KAMIL A,POOLERY M,et al.Distributed storage of large-scale multidimensional electroencephalogram data using Hadoop and HBase[M]//Grid and cloud database management.Berlin:Springer,2011.
[9] GEORGE L.HBase:the definitive guide[M].Sebastopol:O’Reilly Media,2011.
[10] STONEBRAKER M. SQL databases v. NoSQL databases[J].Communications of the ACM,2010,53(4):10-11.
[11] 郝 强,高占春.基于SolrCloud的网络百科检索服务的实现[J].软件,2015,36(12):103-107.
[12] 付剑生,徐林龙,林文斌.分布式全网职位搜索引擎的研究与实现[J].计算机技术与发展,2015,25(5):6-9.
[13] 杨润芝,沈文海,肖卫青,等.基于MapReduce计算模型的气象资料处理调优试验[J].应用气象学报,2014,25(5):618-628.
[14] 李永生,曾 沁,杨玉红,等.基于大数据技术的气象算法并行化研究[J].计算机技术与发展,2016,26(9):47-49.
[15] 邹敏昊.基于Lucene的HBase全文检索功能的设计与实现[D].南京:南京大学,2013.
[16] 张 叶,许国艳,花 青.基于HBase的矢量空间数据存储与访问优化[J].计算机应用,2015,35(11):3102-3105.