马 彬,李玉涛,许 琪
(1.江苏省气象信息中心,江苏 南京 210005;2.江苏省气侯中心,江苏 南京 210005)
随着气象观测设备信息化程度的大幅提高,地面自动气象站的时空密度不断增加,气象自动站已实现了分钟加密观测,产生的观测数据量也呈指数级增长[1-3],具有数据种类多、数据规模大、数据实时性强以及价值密度低等特点[4]。同时,自动站也已成为监视天气变化、决策服务辅助支持的重要手段,为气象预报、气象防灾减灾、气候预测与生态环境评估等提供十分重要的基础数据支撑[5-6]。为进一步推动气象事业高质量发展,中国气象局提出以气象信息化推动气象现代化的发展战略,而作为气象服务最为核心业务应用之一的自动站数据,也将面临重大的挑战,对数据的实时采集处理、数据质量、数据存储及大规模查询等要求也越来越高,需要在秒级甚至更短时间内完成数据的全流程处理,从而提高响应效率,发挥更重要的应用价值。
Spark作为主流的开源分布式计算框架,具有可扩展、高吞吐量和可容错等特点[7]。Spark Streaming则是Spark框架的实时流处理组件[8],采用了一种新的离散流处理模型,进行计算处理时,将数据流以时间片为单位进行切割形成弹性分布式数据集RDD(Resilient Distributed Dataset),而RDD提供了共享内存式的并行运算,因此Spark在批处理、迭代计算、交互式查询和流处理等多种计算模式方面具备高时效的处理能力[9]。HBase是Hadoop Database的简称,属于NoSQL[10-11],是一个KeyValue类型的分布式存储数据库,具有海量存储、高并发、高可靠以及可伸缩等特点[12],适应于气象数据的存储管理应用,能够有效地解决气象大数据的存储和检索响应不足等问题[13]。
该文重点研究了基于Spark Streaming的气象自动站数据实时流处理与分布式存储应用技术,通过对实时气压、温度、降水、风速、湿度等气象要素的基本质控算法设计,实现了自动站数据流式采集、解码、基本质控和入库的分布式实时数据全流程处理功能。
目前,现有的气象自动站数据经基层台站设备采集后,通过宽带网统一传输至省级落地入库,通过气象资料业务系统对数据进行质控后再共享或分发给各类业务应用,其传输流程环节多,易导致数据处理不及时、交互响应慢、统计时效差等问题。此外,由于省级部署的一些决策、服务等业务平台开发早,大都依托于传统的关系型数据库进行海量数据的存储,在大批量多并发查询情况下,通常存在着数据检索能力不足、检索性能下降等问题。并且省级开发的业务系统大都应用比较深入,覆盖业务范围广,升级改造存在一定困难,极大地影响了气象数据的应用服务体验。因此,为满足大规模自动站数据能够在秒级完成与用户的交互响应,就对数据处理流程节点的集约化设计和高质量、高可靠的数据存储及检索功能提出了更高的要求。
Spark Streaming技术的应用研究,为实现高效实时数据处理提供了技术支撑[14]。开展基于数据流式采集、传输、质控、存储为一体的气象自动站数据全流程设计及应用研究,能够从根本上解决气象自动站数据落地环节多、任务处理耦合紧、处理系统部署分散等问题;进一步提升气象大数据的传输效率与数据质量;改善业务平台在多并发,长序列数据检索使用时的问题。在当前气象信息化任务高效推动的背景下,对气象数据以流式实现全流程的处理具有迫切的应用研究需求。
系统总体架构主要分为4层:数据层、处理层、逻辑层和应用层,如图1所示。
图1 系统总体架构
数据层主要分为源数据和处理后的分布式数据存储,源数据通过处理层相应的Spark Streaming组件实现数据的实时流处理功能,Flume为源数据文件的解码采集模块,获取源数据流后将数据暂存到Kafka消息中间件,由Spark Streaming调用Kafka中的数据做实时处理,再将所需处理的数据根据逻辑层相应的算法及表结构设计通过接口存储到分布式数据库中。逻辑层设计的功能主要包括温度、压强、降水等气象自动站数据基本要素的查询以及检索。应用层分为用户应用和业务应用两类,用户应用指为科研人员提供数据服务,主要为客户端及科研用的虚拟化服务器等,业务应用则指根据各业务场景的应用需求开发的平台和系统等。
2.2.1 数据实时采集技术分析
Flume[15]是一个分布式、高可靠、具备可定制化能力的日志采集传输系统[16],其数据流由事件(Event)贯穿始终,Event代表一个完整数据的最小单元,是事务的基本单位。这些Event携带日志数据并且带有头信息,由Agent外部的Source通过特定的格式化后生成,然后再发送到指定的目的地(Sink)进行下一步操作。为确保数据能够成功传输,通常Source会把Event推送至一个缓冲区(Channel)中,待确保前一个Event已由Sink处理完后,Channel再清空自己的缓存数据。Sink则负责持久化日志或者把事件推送到外部其他的Source。
Flume以Agent为最小的独立运行单位,每个Agent由Source、Channel和Sink组件构成,Flume结构如图2所示。
图2 Flume Agent结构
Source:负责接收数据或通过特定机制生成数据,然后以Flume的Event格式传递给一个或者多个Channel,Flume提供多种数据接收的方式,但Source必须至少和一个Channel相关联。
Channel:位于Source和Sink间的一种存储容器,用于缓存Source推送进来的数据,起着链接桥梁的作用。Channel将从Source接收到的Event缓存起来,直到它们被Sink消费完成。同时,它支持一个完整的事务,可提供顺序保证,这样就确保了数据在收发时的一致性,并且可以和任意数量的Source和Sink工作。
Sink:负责将数据传输到下一跳或最终目的地,任务成功结束后将数据从Channel移除。典型的Sink类型为:存储数据到目的的终端Sink,如HDFS、HBase;自动消耗的Sinks,如Null Sink;以及用于Agent间通信的IPC Sink,如Avro。
Event:Flume数据传输的基本单元。一行文本内容会被反序列化成一个Event。
2.2.2 数据实时处理技术分析
数据的实时处理主要是对采集的气象自动站脏数据进行清洗。数据格式错误、数值错误等多种原因导致的脏数据,若不经过清洗就直接解析传入到架构组件中,会产生极大的成本和时间代价[17]。数据实时处理依据数据实时传输运行框架环境,采用Kafka和Spark Streaming实现。Kafka是一个分布式、高吞吐、基于发布订阅的消息系统[18],具有持久化、高吞吐、分布式、多客户端支持以及实时等特点,适用于离线和在线的消息消费。利用Kafka技术可在廉价的PC Server上搭建起大规模消息系统,从而大幅提升数据实时处理能力,其结构如图3所示。
图3 Kafka结构
Broker:在Kafka集群上一个服务器称为一个Broker。
Topic:每条发布到Kafka集群的消息都有一个类别,称为Topic。
Consumer:向Topic订阅,并且接受发布到这些Topic的消息。
Producer:负责发布消息到Kafka Broker。
数据与处理:主要对数据的合法性进行检测,包括界限值、奇异值,以及数据内部一致性等,该文主要检测各字段是否与气象行业数据质控体系相符,如要素数值、日期等格式。
2.2.3 数据分布式存储技术分析
经过Spark Streaming 流式处理后的数据将存储到HBase[19]。HBase的实现包括三个主要功能组件,即库函数,一个Master主服务器和许多个RegionServer。主服务器Master负责管理和维护HBase表的分区信息,维护RegionServer列表、分配Region和负载均衡。RegionServer存储和维护分配给自己的Region,用来处理来自客户端的读写请求。客户端则不依赖于Master,它是通过请求ZooKeeper获取存储了Region和RegionServer映射关系的元数据表信息,然后直接从RegionServer获取数据。ZooKeeper是一个分布式应用程序协调服务[20-21],提供统一命名服务、配置管理和分布式锁等基础服务[22],具有集群管理、Master选举、分布式协调通知和分布式队列等功能[23]。基于以上所述的工作模式,实现了HBase快速响应的特点,其结构如图4所示。
图4 HBase结构
在HBase的使用中,RowKey的设计极为重要。HBase按照RowKey的顺序去遍历所有可能的数据,然后再依次匹配相应列的值,直到获取所需的数据。若RowKey设计不合理,会造成单个Region访问压力过大,难以有效发挥其处理性能。因此,设计的RowKey要确保其具有唯一性,然后充分利用其有序性,均匀地分布在各个HBase节点上。
2.3.1 数据实时采集设计
将自动站文件目录设置为Flume监控目录,Flume将收到的源文件实时解析为数据流发送到Spark Streaming的监控输入目录。Flume是分布式的,可以同步处理到达的多个文件,同时它也提供了许多可调的故障恢复和容错机制,当某个节点出现故障时,数据能够被传送到其他节点上而不会丢失,从而保证数据的完整性。以下是源文件以气压要素进行解析数据流的代码样例。
static List
String[] tmp = null;
FileReader fr = new FileReader(FileName);
BufferedReader br = new BufferedReader(fr);
while ((line = br.readLine()) != null) {
tmp = line.split(" ");
for (int i = 0; i < tmp.length; i++) {
tmp_list.add(tmp[i]);
}
}
p_list.add(tmp_list.get(1));//获取气压标识段所有元素数据
String[] p Tmp = null;
p Tmp = p_list.get(0).split(" ");
String station_P = p Tmp[1];//台站气压数据
String sea_P = p Tmp[2];//海平面气压数据
PressureClass2 pc2 = new PressureClass2();
pc2.setStation_P(station_P);//封装台站气压数据
pc2.setSea_P(sea_P); //封装海平面气压数据
2.3.2 数据实时处理设计
数据在前端由Flume收集起来,通过Kafka来做缓存和容灾,最后由Spark Streaming来做实时处理。为减小代码间的耦合性,设计中将自动站各要素的质控算法代码单独放入一个特定的Streaming来对数据流进行实时清洗,再将Kafka的Topic中清洗完成后的数据放入另一个Topic中供后续的业务来处理。相关代码示意如下:
String topics = "weatherTopic";
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("name");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
sparkContext.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sparkContext, Durations.seconds(1));//初始化StringContext
Collection
Map
kafkaParams.put("metadata.broker.list", "ip:port");//消费端设置
kafkaParams.put("bootstrap.servers", " ip:port ");
2.3.3 数据质控算法设计
气象自动站源文件数据要素多达100余种,设计中针对常用的基本气象数据要素,如气压、温度、降水、风速、湿度等,依据气象行业标准质控技术规程中对质控对象的分类,进行流数据的基本质控算法设计。本质控算法设计分为分钟数据质控和小时数据质控两类,质控方法则选用基本的格式检查法、界限值检查法和内部一致性检查法,相关规则如下所述。
规则1 在QC方法质控码(QCcode)中,为每个QC方法设置7级质控码,取值范围为-3~3,其中0表示正确,±1表示可疑,±2表示警告,±3表示数据错误。QC码的符号表示疑误数据偏离真值的方向,负号表示疑误数据偏小,正号表示疑误数据偏大,即数据质量随着控制码数值绝对值的增加而降低。为方便使用,提出各质控码的符号表示形式,用f(e)表示数据格式检查法的QC码。f1(e)表示界限值检查法的QC码,f2(e)表示范围值检查法的QC码,f3(e)表示内部一致性检查法的QC码,其中e表示质控的要素,如P表示气压,T表示温度,U表示相对湿度,F表示风速,R表示降水。
规则2 在数据格式检查法中,按照地面自动站气象要素资料(国家站)格式说明。本站气压、海平面气压、最高本站气压、最低本站气压字段长度均为5 Byte;台站气温、最高气温、最低气温、分钟相对湿度、最小相对湿度、2分钟平均风速、10分钟平均风速、最大风速、小时降水量、每1小时极大风速、过去6小时极大风速、过去12小时极大风速均为4 Byte。
规则3 在界限值检查法中,各气象要素的界限值阈值范围参考气象观测规范进行设定,如 气压数据定义范围为[500,1 200],温度为[-55,55],湿度为[0,100],小时降水为[0,600],风速为[0,150]。
规则4 在内部一致性检查法中,定义气压、温度和相对湿度的第60分钟的数据同小时正点数据不一致即为警告数据,即E59≠En。此外,对于同一文件内的数据,当前时刻的气压、温度要素值应介于最小值与最大值之间,即Emin≤En≤Emax,但需提出的是,当前的温度值(用Tn表示)应不小于当前的露点温度值(用Td表示),即Td≤Tn。对于特殊的要素,如相对湿度,其当前时刻的值不应小于最小相对湿度值,即Emin≤En,本定义中的E均表示质控的要素。具体算法说明如表1和表2所示。
表1 分钟数据质控算法设计说明
表2 小时数据质控算法设计说明
2.3.4 数据分布式存储表结构设计
在该系统数据库的表结构设计中,需充分考虑源文件数据中所含基本信息要素的唯一性,利用MD5方法将行主键站号散列化,方便将所有数据散列到不同的Region上,从而有助于提高数据的查询响应效率。因此将行主键设计采用MD5(站号)+源文件内观测时间组合的方法存储Spark Streaming实时质控后的要素数据,将质控后的数据存储在一个定义为aws_qcalldata的列族中,具体设计如表3所示。
表3 自动站表结构
MD5算法是在MD4算法基础上由美国密码学家罗纳德·李维斯特(Ronald Linn Rivest)设计,通过该算法能够将任意长度的文本转换为一个固定长度(128位)的散列值,成为一个不可逆的字符串,从而有效保证了数据的安全性和信息传输的完整性[24-26]。功能实现代码样例如下:
public static String getSaltMD5(String password) {
Random random = new Random();
StringBuilder sBuilder = new StringBuilder(16);
sBuilder.append(random.nextInt(99999999)).append(random.nextInt(99999999));
int len = sBuilder.length();
if (len < 16) {
for (int i = 0; i < 16 - len; i++) {
sBuilder.append("0");
}
}
String salt = sBuilder.toString();
password = md5Hex(password + salt);
char[] cs = new char[48];
for (int i = 0; i < 48; i += 3) {
cs[i] = password.charAt(i / 3 * 2);
char c = salt.charAt(i / 3);
cs[i + 1] = c;
cs[i + 2] = password.charAt(i / 3 * 2 + 1);
}
return String.valueOf(cs);
}
bc.setId(MD5Utils.getSaltMD5(stationNum)+ObservTime);//basic info 字段引入MD5算法
系统测试运行环境采用4台虚拟化服务器做集群,其具体部署情况如表4所示。
表4 测试环境部署架构
系统运行环境基于Spark 2.1.3版本,在程序中引入,数据库集群由MPPDB 6.5.1.5构建,Flume版本基于1.7.0,kafka为1.0.0,以及jdk 1.8。
江苏共有国家基本气象观测自动站70余个,每个文件表示该台站在某时刻所采集的气象数据信息,文件内容包含了百余类气象要素数据。测试中则采用实时业务应用的国家基本气象观测站共享目录中文本文件作为源数据,将数据滚动复制到应用服务器Flume实时监听目录data/listen_data中,使用Spark Streaming将Kakfa中的流数据写入MPPDB,针对单个源文件及多个源文件从目录监听、解析、实时质控至数据入库的全流程耗时分别进行5次统计,取平均耗时作为测试结果,具体数据如表5和表6所示。
表5 单文件数据流处理性能测试结果 ms
分析表5和表6可知,单站文件从源文件解析至入库全流程平均耗时在4 s以内;多个文件同时传输时,每个文件入库的全流程平均耗时约1.46 s,能够达到秒级数据处理能力,与现有的气象自动站数据在1 min内到达预报员桌面的要求相比,完全满足实时业务的应用需求。
表6 多文件数据流处理性能测试结果
在多场景数据查询性能测试中,根据常用的业务查询需求,将多个场景的SQL写入至一个查询文件,同时对MPPDB数据库的数据翻倍到TB量级后进行批量查询,查三次取平均值。
测试结果如表7所示。
表7 多场景数据查询性能测试结果
测试结果表明,在不同场景的查询条件下,该系统的点查询响应为毫秒级,加权查询为秒级,能够有效地支撑实时业务中对气象自动站数据的查询应用。
从业务实际应用需求出发,基于Spark Streaming的流式计算框架,开展以数据流式采集、传输、质控、存储为一体的气象自动站数据全流程设计及应用研究,通过模拟业务中使用的情景进行性能测试,验证该系统的可行性和适用性。通过对测试结果的分析,表明该系统能够有效地提升气象自动站数据的实时处理和查询能力,与现有的数据处理系统相比具有以下优点:
(1) 将自动站数据常用的气象要素质控算法设计并融入流处理组件中,实现基于数据流式采集、传输、质控、存储为一体的气象自动站数据全流程处理功能,减少了数据落地处理节点,进一步提高了数据处理的时效性和可用性。
(2)实现数据加密算法设计并应用在分布式数据库中,在提高数据存储和检索效率的同时,极大地提升了数据的安全性。
(3)系统的设计开发从实际业务应用需求出发,且部署灵活,可作为省级自动站质控数据存储的实时备份,为气象业务提供更加可靠的数据保障。
但此系统在应用研究中也存在一些不足,如搭建系统环境的虚拟计算资源比较缺乏,数据库和应用服务器配置低于部署要求的最低配置标准。同时,在集群应用的规划上,为节约资源,将主应用和中间件部署在同一台虚拟服务器上,一定程度上都影响了数据实时处理的性能。在后续的应用研究中,将把文件所有气象要素数据的实时质控处理纳入设计工作,进一步完善数据质控算法,搭建资源充足的系统环境,优化Spark Streaming的作业动态调度配置以及调优系统集群各组件的一致性、容错性和高可用性等,开展更为深入的业务应用研究。