胡英楣,王甫棣,谭小华,邢丽平,乔 淼
1(国家气象信息中心,北京 100081)
2(湖北省气象信息与技术保障中心,武汉 430074)
3(内蒙古自治区气象信息中心,呼和浩特 010051)
气象资料是气象业务和科学研究的基础[1],地面、高空、卫星、雷达等海量多源观测数据及产品的实时收集与分发对于气象资料的共享与使用具有重要意义.气象通信系统承担气象观测资料、预报预测和服务产品收集与分发,是连接气象综合观测系统、气象预报预测系统和公共气象服务系统的桥梁与纽带,是支撑气象业务和大气科学研究的基础业务平台[2].
1992 年10 月,气象卫星综合应用业务系统(“9210工程”)获批立项.1998 年,依托9210 工程建设的国内气象通信业务系统投入业务运行,通信传输方式也由卫星通信改为地面宽带通信.该系统适应当时的计算环境和气象资料需求进行设计,为保证数据处理的一致性和完整性,采用了单一进程方式依托数据节目表控制来进行气象资料的接收、处理和分发[3].随着地球环境观测、全球气候观测系统、全球大气观测系统等国际性的观测网络的日趋完善以及各类气象观测手段的不断发展,国内参加交换的数据种类、数量、频次也随之猛增,尤其是自动站地面观测、高空探测、多普勒雷达、卫星、闪电定位、GPS、沙尘暴、大气成分、生态、水文原始观测数据以及预报产品等新增的气象数据.9210 国内气象通信业务系统在扩展性方面的弊端逐渐显现,原有通信系统在资料的传输时效能力上不能满足业务的需求,国家级和各个省级信息部门自行开发了传输系统来满足新增探测数据的传输业务需求.这种单一进程方式的通信业务处理模式是制约系统扩展能力的原因之一.
消息队列一种线性表,它提供了有保障的消息传递、有效的路由、安全性、事务处理支持以及基于优先级的消息传递,是用于创建分布式、松散连接的消息通信应用的关键技术之一,在气象通信系统有着广泛的应用[4].
2006 年,依托中国气象局气象宽带网项目,新一代国内气象通信系统开展建设[5].该系统为了解决各类新增气象探测资料的数据传输,采用模块化的设计并内置消息队列(Message Queue,MQ)方式实现数据收发内部进程通信[6],实现系统内部功能模块的解耦以支撑多机部署,增强系统适应海量新增资料通信的可伸缩性.
在新一代国内气象通信系统中,利用消息队列技术实现通信系统收发任务的并发执行、异步处理以及任务分解/聚合处理等,满足各类气象资料不同的处理逻辑以及多用户多方式传输的需求.比如针对气象资料的预处理以及格式检查等不需要与主收发进程同步执行的任务启用一个新的任务消息队列进行异步处理;将不用用户接收数据不同的传输方式、传输路径切分到不同的任务消息队列中,也可以将不同的数据针对同一用户的分发合并到一个任务消息队列中.
依托新一代国内气象通信系统的建设成果,2009 年,在全国综合气象信息共享平台(China Integrated Meteorological Information Service System,CIMISS)中,建设了数据收集与分发子系统(China Telecommunication System,CTS),它是整个CIMISS 数据处理前端,负责收集来自国内气象综合探测系统、互联网、业务单位、行业部门的各类观测数据和加工产品,进行规范化的预处理,实现国家级与省级、省级与省级之间数据的互联互通,保障气象信息传输共享的时效性和可靠性,为预报预测、公共气象服务等提供有力的数据支撑[7],是国内气象通信系统的延续.CTS 于2015 年底开始投入业务试运行,2016 年3 月全国业务化.
在CTS 中应用了Active MQ 消息中间件服务器,将消息队列技术扩展到多机集群间进程间通信.利用可靠异步传输机制实现高性能、高可用的消息传输中心并对外提供统一标准的公共消息传输服务,降低系统的耦合度及管理的复杂性,屏蔽底层异构平台和跨网络域业务交互,实现系统不同功能逻辑单元的气象业务和控制指令消息的消息交互,并推动气象数据在系统间流转,实现应用的互连和互操作[8].
基于消息队列技术的提升,相较于9210 气象通信系统、新一代国内气象通信系统,CTS 的布式特性得到增强,是典型的多机多线程模式的分布式系统.在业务运行初期能够满足业务的扩展性和实时性等性能要求.但随着各类气象资料的统一接入以及服务用户的不断增加,系统出现了性能下降甚至出现消息队列堵塞等问题.
消息队列[9-11]按照其是否记录队列状态可划分为无状态化消息队列和有状态化消息队列:
(1)无状态化消息队列.任何应用进程的操作将发送消息到队列中,其他系统根据需要订阅该消息,然后按照需求进行业务逻辑处理.在面向对象(object-oriented)软件设计开发过程中,设计模式(design pattern)代表了最佳的实践,是软件开发人员在软件开发过程中面临的一般问题的解决方案.无状态化消息队列便是采用了典型的发布订阅设计模式,一个消息可以有多个消费者.
(2)有状态化消息队列.将各类请求按照不同的状态划分到不同的队列,从而使得不同的队列出现问题后相互不影响;还可以进行优先级区分,一些重要请求可以优先处理等.有状态化消息队列按照队列类型可以延伸包括等待队列、排重队列、本地执行队列、失败队列等;按照优先级可以划分为普通队列、优先队列等.有状态化消息队列采用了观察者模式,它属于行为型模式的一种,通过定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新.将观察者模式用于气象信息系统也有不少实践案例,比如在气象卫星数据接收与预处理调度机制的设计中应用该模式构建了相应的数据接收与预处理分发作业调度机制[12],在北京全球信息系统中将此模式应用于缓存数据处理[13].
新一代国内气象通信系统和CTS 第一版的设计采用的无状态化消息队列这种模式,在系统资源能力充足的条件下,采用这种方式是最为简化且保证传输效率的模式.
以CTS 为例,图1 所示为CTS 数据收发的流程:对每一个收发任务(task),数据收集进程(common collection)将准备好处理分发的数据送入处理目录,并将待处理消息推送到待处理消息队列中;在不考虑优先级控制条件下,数据处理进程(proc)遵循消息队列先进先出原则顺序处理队列中的消息;若处理成功则将待分发消息推送到待分发队列中,并按照待分发目的用户组织数据目录;最后数据分发进程(dist)申请分发资源并根据待分发队列中消息的通知处理用户的数据分发.
图1 CTS 优化前数据收发流程图
作为一个7×24 不间断运行的业务系统,CTS 也设计了数据收发任务监控并进行相应的容错处置功能.为了保障收发系统的数据的完整性,通过任务进程反馈的成功与否标志来判定是否重新处理.换句话说,系统并不记录消息队列状态,也不记录当前实际任务处理情况,通过从消息队列的订阅情况来进行错误处理.如图1 所示,当数据处理进程(proc)处理失败时,会将此消息重新排入队尾等候下一次处理;对于数据分发进程(dist)若分发失败时也采取同样的处理方式.
假设系统的计算资源足够大,当外部应用消费队列中消息时,出现异常后发送的回滚消息始终都无法占满消息队列,所有的消息最终都会被消费掉.在不计较正常数据的传输时效条件下,这种消息的顺序处理和异常回滚策略是保证所有数据传输可靠传输的可以接受的解决方案.但这种理想状态下的设计方案没有考虑复杂的业务情况,特别是在出现气象宽带网络或通信服务器导致的收发系统持续性的异常时.
CTS 每日处理着个数为千万级别、容量为TB 级别的气象实时资料,同时数据的时效性是气象通信系统最重要的评价指标.若按此功能设计,伴随着CTS接入资料量和分发用户的逐步增加,CTS 中异常状态信息会在被处理失败后不断追加回消息队列的末尾,不断的捕获异常消息并进行处理会占据大量正常资源,简单的循环处理造成CTS 处理正常任务能力下降,整体传输时效降低.更为关键的问题是:如果当这类收发任务出现若干这种持续性异常时,当不断被处理和回滚的异常消息占满队列时,正常的消息将无法再进入队列,导致队列陷入死循环状态且无法自动恢复,CTS一旦瘫痪必须进行人工处理.
要想使正常的数据收发不受影响,需要将消息队列中的异常任务消息及时移除,并且在异常状态恢复之前,不再向分发队列中发送异常用户的消息.考虑引入有状态化消息队列的模式改进CTS 数据收发功能:引入一个状态文件,通过记录消息队列中收发状态信息来进行容错处置,并且根据状态文件记录的不同状态,对收发系统进行不同的处理逻辑.
如图2 所示,本文方法在现有收发系统中增加状态文件,作为一个抽象的“观察者”,通过扩展抽象类形成收发任务状态、处理状态和分发状态等不同的“观察者”.每个“观察者”进一步包含具体的状态处理操作.比如对于分发状态,包括对于因为目的用户分发失败、申请系统分发资源失败、配置文件用户状态暂停标志、以及数据量超限等;对于处理状态,可能包括正常状态、异常状态以及因异常次数未达阈值的异常状态等.
图2 采用有状态化消息队列方案的建模
参照改进方案的建模设计,设计并实现了如图3所示的CTS 优化流程:对于数据处理未成功的任务,系统将判断状态文件的情况,如处于异常状态且异常次数超过阈值的,将待处理文件写入磁盘(落盘),更新状态文件并记录,以待后续处理;对于数据分发未成功的任务,系统将判断状态文件的情况,如处于异常状态且异常次数超过阈值的,将待分发文件写入磁盘(落盘),更新状态文件并记录,以待后续分发.系统独立创建数据再处理(reproc)和数据再分发(redist)进程会按照休眠时间策略以及优先级专门处理任务处理状态为异常的消息,并不断修改状态文件中的信息.当状态文件记录的任务处理状态正常时,再处理(reproc)和再分发(redist)进程会重新唤起落盘目录文件,送入正常的处理流程中.这样实现的好处在于所有异常的消息不会直接送入正常消息队列,保证了正常消息队列的快速处理.同时,对于各类异常状态的不同处理也使得系统的容错能力得到增强,提升系统的可靠性.
为提高用户状态监控的合理性与准确性,对异常次数进行实时累加和递减处理,通过阈值范围提供用户状态是否正常的判定条件.
通过这些状态的排列组合又形成收发任务(task)的多种状态,在系统实现中通过状态文件的标志进行区分.
(1)数据处理/再处理(proc/reproc)获得消息向数据分发(dist)写入消息文件时,生成.task,表明任务初始化状态.
(2)数据分发(dist)获得消息后,准备处理,将任务从.task 修改为.tasking,表明任务的正常状态.
(3)若用户状态文件中分发状态正常时,而数据分发(dist)申请计算资源失败时,生成.task.src,表明资源失败状态.
(4)用户状态文件中分发状态为异常时,数据处理/再处理(proc/reproc)会向数据分发(dist)目录下写入.task.block;数据分发(dist)直接将消息落盘为.task.block,表明任务阻塞状态.
(5)无论用户状态文件中用户状态是否正常,一旦数据分发/再分发(dist/redist)发送失败,都会生成.task.fail,表明任务失败状态.
(6)若用户分发状态暂停或数据量超限时,数据处理/再处理(proc/reproc)会向数据分发(dist)目录下写入.task.susp,表明任务挂起状态.
优化后的CTS 的系统稳定性明显提升,未出现过因异常队列占满消息队列造成CTS 瘫痪的重大故障.同时,本次优化更因为减少了大量不必要的异常消息处理,使得通信传输性能明显提高,数据收集与分发的小时平均时间延迟由优化前的20 s 以上减少至10 s 左右.图4 选取2016 年5 月(优化前)与2018 年10 月(优化后)CTS 优化前后多日平均各时次传输最大时间延迟、最小时间延迟、平均时间延迟进行对比,可以看出,优化后的各项时间延迟较优化前均有明显的减少.
图3 CTS 优化后数据收发流程图
图4 CTS 优化前后各时次平均传输时间延迟对比
通过引入有状态化消息队列模式的设计,将异常消息的状态准确记录,通过不同的系统进程区分正常消息和异常消息的处理,保证了异常消息不会占据宝贵的正常消息队列,减少异常消息被反复处理的资源消耗,既提升了通信系统的可靠性,也优化了传输时效性.经过CTS 的优化效果也证明了该技术思路的有效性.目前国内气象通信软件系统第二版(CTS2.0)正在建设中[14,15],在CTS 的基础上增加了消息和流两种通信传输模式,提升了实时气象数据高时效传输能力,该优化思路值得在新系统的建设借鉴并运用.