徐 达,曾 乐,王英杰
(国家气象信息中心,北京 100081)
气象综合业务实时监控系统—“天镜”[1]是国家气象信息中心为建设统一数据环境、整合分散独立的监视业务建立的通用、综合、高效的集约化监视平台。“天镜”能够为全国气象部门在收集、分发、入库、数据同步各个环节提供实时观测数据和产品的数据全流程监视服务。目前“天镜”每小时接收处理气象业务监视全流程[2]数据记录达到3千万条,累计接入的数据资料超过400种,为了使目前的数据全流程监视业务可以更高效地在大数据计算和分布式存储架构上运行,需要对目前海量监视数据的处理中加大对计算策略和存储策略的研究力度。
Spark[3]是对海量数据计算处理的重要工具和手段,是基于弹性分布式数据集(RDD)的数据结构,具有数据流模型特点。RDD将数据保留在内存中,且允许用户程序多次查询,降低了对磁盘和网络的开销,适用于在线计算和迭代计算。“天镜”系统使用Spark计算全流程数据,并按全国、省、市县维度的统计指标进行汇聚。气象资料的接入和监视环节的扩展使得需要计算和处理的监视信息激增,使得Spark运行作业时间变长,这对于满足时效性要求而言需要缩短计算任务的运行时间,一种方式是从Spark集群框架和配置参数进行修改和优化,另一种方式则是通过对程序代码进行改动,采用最优的计算策略来提升计算效率。
2017年10月,中国气象局批复了由国家气象信息中心牵头,国家级各业务单位共同参与建设的气象综合业务实时监控系统(一期)项目。该项目旨在建立技术先进的监控系统技术框架,实现综合监视和告警运维核心功能,建立规范的监控信息采集接口,监视范围横向覆盖气象资料现有数据流程各环节,纵向覆盖信息系统从网络及安全、服务器、存储、中间件、应用软件运行状态。气象综合业务实时监控系统(一期)计划2018年底建成后,将完成气象综合业务实时监控的基础框架,建立系统的硬件平台和技术平台,从技术上解决了原MCP系统面临的性能瓶颈问题,建立规范化的监视信息采集接口,实现监视告警的核心功能,实现国家级基于CIMISS数据环境的资料数据流程的收集、分发、解码入库、接口服务等环节的监视,以及CMACast卫星广播系统、部际系统等系统的监视。但是随着监视信息不断增长,现有的运行环境在处理计算上会有延迟,尤其是在中国地面分钟级资料的实时监视上会出现页面为0的情况[4]。
国外气象行业的监视系统也是主要围绕着数据传输网络、数据收集生成、数据质量、观测设备状态进行监控,如美国国家海洋和大气管理局(NOAA)建设了观测系统监控中心(OSMC)实时监测全球海洋观测系统的性能[5],欧洲中期天气预报中心(ECMWF)通过常规观测告警系统检测数据可用性和质量问题[6],美国国家环境预报中心(NCEP)的实时数据监测系统(RTDMS)主要监测数据的数量和时效性[7]。国外的数据监视系统是基于传统的数据资料文件入库,并对该文件资料进行质量评估后,绘制该类观测资料的打点时序图,对资料进行分类监视。ECMWF和NOAA更加侧重资料到报后的质量情况,通过设计测试的数值预报模式来校验到报的观测资料是否合格,通过地图打点的方式提供数据服务,并用颜色来区分该类资料的数据质量情况。
围绕《全国气象发展“十三五”规划》提出的“智慧气象”发展目标,气象业务在实施现代化、信息化、集约化、标准化的进程中,都需要监控系统来保障业务的高效稳定运行。但是,各气象业务的现有监控系统都是独立开发和运维,监控系统分散且数量庞大,运行维护人力成本高;各监控系统仅监控业务流程中的独立环节,上下游监控信息无法共享,缺乏对业务全流程的总体监控,出现故障时准确定位故障位置困难、分析故障原因不及时,导致业务监控运维效率低。因此,急需实现对观测、信息、预报预测、公共服务及政务的全流程、全要素、全过程的一体化监控和运维,以提升气象业务运行管理的质量和效率。2016年底,按照中国气象局统一部署,由预报司牵头组织与协调,观测司配合,信息中心作为实施技术组组长单位,协同各成员单位上下一心,通力合作,共同推动气象综合业务实时监控系统建设,树立和打造气象综合业务监控品牌——“天镜”[8]。
气象全流程监控实现对数据从收集、分发、入库、数据同步到应用的全流程、全生命周期监控。在收集环节由国内气象传输系统(CTS)收到气象资料后,经过文件打包处理后,把文件分发给业务系统和用户。在入库环节中解码入库程序按照气象要素、时次等条件进行拆解,按照存储规则录入不同的数据库中。为了提供气象资料查询服务,需要将解码后的数据在不同类型库中进行同步。在气象资料全流程监视设计中需要对收集、分发、入库、同步环节进行监视。全流程实时指标见表1,计算依赖于节目表信息和总控配置信息,节目表信息用来指定该类气象资料资料是否为考核资料,总控配置信息主要包含:资料业务时次配置信息、单站的单环节的单时次及时配置信息、统计规则(时次、时次截日、时次截小时、小时、日)、各个环节之间的关联关系、文件级资料的应收数、检测告警开始时间、需要告警指标、告警持续时间等相关配置。
表1 全流程实时计算收集环节核心指标
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目。目前Spark生态系统已经发展成为一个包含多个子项目的集合,包含SparkSQL、Spark Streaming、GraphX、MLlib、等子项目。
Spark是基于内存计算的大数据并行计算框架,与Hadoop的MapReduce相比,Spark基于内存的运算速度更快,同时保证了高容错性和高可伸缩性,Spark实现了高效的DAG执行引擎,从而可以通过内存来高效处理数据流[9-10]。
在“天镜”中,Spark的体系架构如图1所示。“天镜”采用Standlone模式部署Spark集群,通过Zookeeper,一个开源的分布式应用程序协调服务软件进行集群管理,在Spark集群上创建常驻的SparkSession即常驻的Driver进程用于交互Spark程序,SparkSession中包含开源的ActorSystem,一套开源的用于设计跨处理器和网络的可扩展弹性系统。服务端的ActorSystem向Zookeeper注册自身的地址。在外部调度任务模块的驱动下,将获取服务端的Actor-System地址,随机选择其中一个地址,提交SprakSQL任务,SparkSQL任务提交成功后,会把任务和接收提交的ActorSystem信息注册到Zookeeper,用于后续查看SparkSQL任务状态和取消任务。
图1 “天镜”中Spark的体系架构
全流程各环节监视信息通过接口网关进入后至高速缓冲通道,一路数据直接入库进行持久化,一路数据进行标准化构建和数据清洗形成中间结果表(见表2)。
表2 台站级资料预处理后中间结果表
根据总控配置表的业务频次(cron表达式[0 0 0/1 * * ? ]、统计规则[时次、时次截小时、时次截日、小时、日])信息计算出业务时次,并生成一个sparkSQL文件存入到HDFS中,提交给spark计算,计算考核指标的SparkSQL语句如下:
1.--考核应收
2.sum(coalesce(CO_CHECK_TD,0))AS CO_CHECK_TD,
3.--考核及时收
4.sum( casewhen CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_INTIME_ACTUAL,
5.--考核逾期收
6.sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_LATETIME_ACTUAL,
7.--考核实收数
8.(sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)) AS CO_CHECK_ACTUAL,
9.--考核缺收数
10.(sum(coalesce(CO_CHECK_TD,0)) - (sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))) AS CO_CHECK_LOC,
11.--考核及时率
12.(sum( casewhen CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)/sum(coalesce(CO_CHECK_TD,2147483646))) AS CO_CHECK_INTIME_RATE,
13.--考核到报率
14.((sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))/sum(coalesce(CO_CHECK_TD,2147483646))) AS CO_CHECK_RATE.
“天镜”系统部署在36台IntelX86物理服务器上(见图2),其中5台服务器用于部署网关模块(gateway),数据预处理模块(standardizer)主要负责接收监视信息的收集和全流程中间结果(指标详情)的处理,3台服务器用于部署消息中间件(kafka)集群,用于数据的高速缓存,避免因数据量过大导致后端数据库写入压力过大。18台服务器部署分布式日志数据库用于对监视信息的原始指标,中间结果,最终计算指标进行存储。用于计算的Spark集群(版本2.3.1)[11]部署在5台CPU 24核,内存256G,3.2TSAS磁盘,操作系统为Centos7.3服务器上。
图2 “天镜”-气象数据全流程系统架构
基于Spark计算引擎对气象全流程监视信息进行实时处理,作业调度任务每分钟执行一次,按照台站级气象资料(StationDiStaticJob)和文件级气象资料(FileDiStaticJob)分为两个计算任务。随着接入的气象资料种类越来越多,每分钟处理的监视信息也呈几何级增长,执行的Spark任务的耗时在20分钟以上,导致气象全流程监视界面中气象区域站资料无法及时显示。与此同时,运维人员发现Spark集群中有个别节点的负载特别高,这种情况是因为数据源单个spark input read数据量过大,或者单个task相对于其他task spark input read较大的情况,导致的读取数据源明显不均匀[12]。因此尽量使用可切割的文本存储,生成尽量多的task进行并行计算,可以从数据源避免倾斜,并从源头增大并行度[13]。通过观察Spark任务管理页面可以看到已完成的计算任务资源使用和耗时情况,如表3所示,正常计算任务需要分配计算资源10核,内存5 GB。
表3 优化前Spark任务运行监视结果
进行Spark计算任务的优化的目的,是为了充分利用硬件本身的性能,最大限度地提升Spark中Executor的执行效率[14-17]。依据气象全流程监视界面资料展示情况,拆分为地面资料、海洋资料、高空资料、辐射资料、农业与生态资料、大气成分、雷达数据、卫星数据、气象服务产品、数值预报产品共10类资料,每类资料又分为考核资料和非考核资料。相较于优化前,虽然增加了SparkSQL模板的复杂度,但是提升了气象考核资料的计算效率,该文以传输环节考核资料为例,新增的SparkSQL模板如下:
1.base.sql.co.checks=sum( casewhen CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END) AS CO_CHECK_TD, sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_INTIME_ACTUAL, sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_LATETIME_ACTUAL, (sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)) AS CO_CHECK_ACTUAL, (sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END) - (sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))) AS CO_CHECK_LOC,(sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)/coalesce(sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END), 2147483646)) AS CO_CHECK_INTIME_RATE,((sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))/coalesce(sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END), 2147483646)) AS CO_CHECK_RATE
在向Spark进行任务提交时,客户端处理程序需要将气象资料按照上述分类进行拆解,核心代码如下:
1.…
2.for (TabMcmConfig config : tabOminCmCcSubsystem Allconfigs) {
3.// 1、资料编码
4.String ctsCode = config.getcCtsCode();
5.String sodCode = config.getcSodCode();
6.//资料大类
7.String dataClass = config.getDataClass();
8.String ctsSodCode = ctsCode.concat(":").concat(sodCode);
9.//文件级还是站点级
10.String dataSourceType = config.getcDataSource();
11.if ("1".equals(dataSourceType)) {
12.if(!fileDiComputeEnabled) {
13.continue;
14.}
15.dataSourceType = "file";
16.}else {
17.dataSourceType = "station";
18.}
19.…
此段代码通过获取总控配置后对每类气象资料进行分类,分类后生成的计算任务与生成的SparkSQL模板匹配,从而完成计算任务拆解,单个SparkSQL只计算一类考核资料或者一类非考核资料。
该文采用自动化测试的方法,由于对程序代码结构进行了修改和微调,因此需要对优化后的全流程指标计算结果正确性进行验证。正确性可以根据监视页面中资料的统计指标和系统告警进行判断,如图3所示,可以通过查看Spark作业任务日志进行验证,如表4所示。该文展示的全流程监视界面与优化前资料监视统计指标计算结果一致,并且中国地面分钟降水数据在一级界面中可以显示正常。优化后单个计算任务的计算时间控制2分钟以内。
图3 气象综合业务实时监控系统—“天镜”全流程监视界面
表4 优化后Spark任务运行监视结果
通过拆分计算任务,生成尽可能多的task增加Spark计算并行度,成功将气象全流程计算框架优化并业务运行,如表5所示,获得了10倍的加速效果,提高了程序的运行效率。但是“天镜”系统在处理大数据计算时还是有瓶颈,原因是地面区域站气象资料会产生大量重复数据,要能够高效处理海量的监视数据,除了对计算任务拆分,还需要对计算任务设置优先级,针对核心资料优先分配计算资源计算,这就需要业务人员对资料的监视等级进行配置,同时要熟悉Spark资源分配机制,在此基础上来做系统优化,能够较好地提升优化效果。
表5 “天镜”全流程Spark计算任务优化前后运行时间