基于Spring Batch+Gemfire+CXF的金融大数据集成和整合

2020-09-02 01:22朱铮雄黄宇青
计算机应用与软件 2020年8期
关键词:调用数据处理处理器

朱铮雄 黄宇青

1(海际金控有限公司 上海 200120)2(上海市计算技术研究所 上海 200040)

0 引 言

巴塞尔协议Ⅲ[1]根据银行交易数据与风险资产计算一级资本的充足率。银行的各种交易数据的集成和整合是整个巴塞尔系统的核心。传统方式采用脚本和数据库存储过程[2]进行数据整合,串联起上游数据源系统和下游商业报告系统。随着金融全球化的步伐加快,数据量呈几何级增长,从原先面临亿级数据处理到面对每天万亿级以上的处理数据,系统面临极大的瓶颈。原先采用的全部是存储过程,导致数据库瓶颈大,单个节点扩展性能难,数据处理速度慢,存储过程难以维护。同时下游报表系统也严重依赖上游数据库[2],数据库经常卡顿,严重制约整个巴塞尔系统运行的准确性、时效性。提升数据处理性能,提高自动化程度,降低系统之间耦合度是摆在整个银行运维和开发面前的重要问题。

本文采用Spring Batch的系统构架方式,重新架构整个数据整合系统,并结合目前业界最为流行的Web Service对数据采集、数据处理、数据发布、数据持久化等各个应用模块按照面向服务模式进行解耦合。本文将这些应用模块按照面向服务的设计理念重新定义接口,使整个模块有机地联系起来,每个服务相互独立并且可以以一种统一和通用的方式进行交互。当某个服务无法适应数据压力时,可以进行水平扩展,在保持各个模块独立的扩展性的同时进行创新,确保了数据整合的可靠、稳定、高效,且满足巴塞尔系统的运行要求,成为海量数据整合的核心枢纽。采用这种整合方式,数据处理效率高,系统易于扩展以及被下游系统接入。

1 相关技术

1.1 Spring Batch

Spring Batch[3]是一款优秀的、开源的大数据并行处理框架。通过Spring Batch可以构建出轻量级的健壮的并行处理应用,支持事务、并发、流程、监控、纵向和横向扩展,提供统一的接口管理和任务管理。它是一款基于 Spring 的企业批处理框架,通过它可以完成大数据并发批量处理。使用Spring Batch可以实现下列目标[4]:

(1) Batch Data:能够处理大批量数据的导入、导出和业务逻辑计算;

(2) Automation:无须人工干预,能够自动化执行批量任务;

(3) Robustness:不会因为无效数据或错误数据导致程序崩溃;

(4) Reliability:通过跟踪、监控、日志及相关的处理策略(retry,skip,restart);

(5) Scaling:通过并发和并行技术实现应用的纵向和横向扩展,满足数据处理的性能需求。

Spring Batch良好的大数据批处理的性能和高可扩展性,使其被广泛应用于各类自动化的数据迁徙系统中,包括超大数据的气象系统[3]。

1.2 Gemfire

VMware vFabric Gemfire[5]是一个弹性可扩展分布式内存数据管理平台,可用于构建需要超高速数据交互的、具有高度可扩展能力的应用系统。它能够跨越多台虚拟机、多个 JVM 和多个Gemfire服务器来管理应用对象。使用动态备份和分区,使它能提供多种平台特性,例如:数据持久性、可靠的事件通报、连续查询、通用的并行处理、高吞吐、低延迟、高扩展性、持续有效性和WAN分布。

基于以上特性,可以看出VMware vFabric Gemfire非常适合于巴塞尔风险业务管理系统的数据缓存层,它可以满足本文对整合交易数据进行实时访问的需求,其性能可以随着需要弹性扩展,并且可以在多台服务器上实现部署,实现海量内存缓存池的要求。

1.3 Apache CXF

Apache CXF[6]是一个开源的Services框架,CXF支持使用Frontend编程API来构建和开发Services,如JAX-WS。这些Services可以支持多种协议,例如:SOAP、XML/HTTP、RESTful HTTP、CORBA,并且可以在多种传输协议上运行,例如:HTTP、JMS、JBI。CXF大大简化了Services的创建,同时继承了XFire传统,可以天然地与Spring进行无缝集成。

2 数据整合整体设计

2.1 功能需求分析

系统按照功能大致可以分成三个主要阶段:大数据读取,数据整合与映射以及数据计算与存储。

(1) 大数据读取:批量高速读取交易流程上所有的数据,包括交易对手的合同、实际交易、账户信息、证券产品信息以及价格,存入中间高速缓存中。

(2) 数据整合与映射:将高速缓存中的交易对手信息、交易系统、合约信息做一一映射,存入数据库系统中,同时将证券产品中的固收产品单独提取并写入数据库,根据缓存中的合约信息计算净合约,将净合约信息更新进系统中。

(3) 数据计算与存储:先根据已经映射好的数据做风险敞口计算,再根据抵押品现状抵减风险敞口值,最后计算VaR、预期信用风险、客户的风险评级和产品的敏感度值。

图1为系统主要功能模块。

图1 系统功能模块图

各个模块的功能如下:

协议/交易/账户(Agreement/Trade/Account)的BCP Loading:使用Sybase的BCP导入数据。BCP[7]基于DB-Library以并行的方式导入批量的数据,目的是快速导入Agreement/Trades/Account等基础信息进入数据库中待用。

协议数据(Agreement):与交易对手签订的结算合约。

交易数据(Trades):实际交易数据。

账户数据(Account):交易对手等账户信息。

证券数据(Security):证券化的产品信息。

价格数据(Pricing):标价信息。

交易对手/机构映射(Counterparty/Legal Mapping):使用存储过程处理,将交易对手信息(Legal指公司交易机构)与交易信息对应,存入系统表中。

主要数据映射(Master Data Mapping):使用存储过程处理,将交易对手、交易信息以及合约信息数据一一匹配映射,存入系统表中。

固收数据处理(Fix Income Process):将固定收益证券(包括中长期国债、公司债券、市政债券和抵押债券等债务类证券)提取出来,存入数据库中。

价格和数据处理(Pricing & Mapping Process):将标价信息与固定收益证券映射,对固定收益债券进行标价,用于计算市场风险,结果存入数据库中。

净合约映射(Netting Data Mapping):使用存储过程对双边净合约(Netting Agreement)的信用风险抵减计算,并且将得出的数据全部存储更新在系统表中。

风险敞口计算(Analytical Calculation Processor):使用存储过程对数据进行风险敞口的初步计算,并且将得出的数据全部存储更新在系统表中。

抵押品计算(Collateral Mapping):使用存储过程对抵押品进行的计算,并且抵减对应的风险敞口值,将得出的数据全部存储更新在系统表中。

VaR计算(VaR Calculation):使用存储过程计算在一定概率水平(置信度)下,交易数据价值在未来特定时期内的最大可能损失,并将结果存入数据库中。

CEF计算(CEF Calculate):使用存储过程计算信用风险敞口预期值。

机构PDLC计算(Facility PDLC Calculation):根据交易数据更新调整机构客户信用值,同时可以授予信用值。

敏感度映射(Sensitivity):使用存储过程针对固定收益证券按照利率、利差和在投资收益率等数据敏感度对标的资产做数据映射,完成数据的敏感性值预设。

2.2 系统设计

按照上述的系统主要功能需求,数据整合核心服务层主要分为Spring Batch大数据加载和通用服务数据处理两大部分。Spring Batch+Gemfire+CXF 轻量级架构系统结构如图2所示。

图2 核心服务层系统结构图

本文利用Spring Batch批量读入大文件,使用Mapper将文件数据映射成对象数据,根据数据类型适配不同的数据处理器(Processor),并调用通用服务中与之对应的服务进行数据处理,将处理完成的数据存储进数据库,并发布在Gemfire cache中。

根据系统功能划分,有如下处理器:

(1) 客户处理器(Customer Processor):使用Spring Batch导入客户及交易对手信息,并调用客户服务获取匹配交易对手信息。

(2) 协议处理器(Agreement Processor):使用Spring Batch导入合同信息,并调用协议计算服务获取匹配合同以及净值优惠计算的信息。

(3) 机构处理器(Facility Processor):使用Spring Batch导入客户信用信息,并且调用机构数据服务获取客户信用的信息。

(4) 证券处理器(Securities Processor):使用Spring Batch导入证券信息,并且调用证券化服务获取证券的信息。

(5) 计算处理器(Calculate Processor):使用Spring Batch导入计算信息,并且调用计算服务和投资组合服务来计算整个信息。

(6) 敏感度处理器(Sensitivity Processor):使用Spring Batch导入敏感度信息,并且调用敏感度服务来比较和计算敏感度。

2.2.1 Spring Batch大数据加载

数据处理流程如图3所示。

图3 数据处理流程图

本文将处理的数据分解为Job,并且为其定义属性和基础设施,通过Reader、Processor和Writer来实现数据业务处理、基于Pojo的开发以及领域对象描述。本文分别使用了如下组件:

(1) 批量任务加载器(Batch Loader):使用Autosys定时启动Spring Batch开始处理文件。

(2) 任务库(Job repository):用来持久化Job的元数据,是所有Job的中心仓库。

(3) 任务启动器(Job launcher):从Job的中心仓库取出一个Job,并且启动。

(4) 任务(Job):Batch操作的基础执行单元。

(5) 步骤(Step):Job的一个阶段,一个Job由一组Step构成,其中Tasklet Step包含一个事务过程,包含重复执行、同步、异步等策略。

(6) 单元数据(Item):从数据源读出或写入的一条数据记录。

(7) 单元数据读取(Item Reader):从给定的数据源读取Item集合。

(8) 单元数据业务处理(Item Processor):调用通用服务中对应的服务组件,对Item进行逻辑业务处理(包括数据映射和数值计算等。

(9) 单元数据写入(Item Writer):把Item写入数据源。

针对超级大数据,我们还可以对Spring Batch进行横向和纵向的扩展,确保整个系统能够应付超大的数据集合压力。对任务进行扩展的几种方式如表1所示。

表1 扩展方式描述

每个Step都可以并行处理,Step并行处理模式使用了在一个节点上横向处理,但随着作业处理量的增加,如果一台节点服务器无法满足Job的处理,可以采用Partitioning Step的方式将多个机器节点组合起来完成一个Job的处理。如图4所示,主服务器对Item读、写的处理逻辑进行分离,通常情况下将读操作放在一个节点进行,将写操作分发到另外的节点执行。这样做到了负载均衡和主从复制,理论上只要增加处理服务器,就几乎可以无限提升Step的处理业务能力。

图4 集群处理结构图

定义任务举例:

……

processor=″compositeItemProcessor″

writer=″cacheWriter″

commit-interval=″1000″ />

……

上述任务中,定义了一个交易数据的加载,定义使用fileReader批量文件读取组件和使用Gemfire的缓存写入cacheWrite,同时也定义了数据处理器Composite Item Processor映射。

核心处理器Spring配置举例:

在此配置中,使用了数据组合类Composite Item Processor处理Transaction Item对象,把Mapping Processor、Customer Processor、Securities Processor、Agreement Processor和Facility Processor、Trade Level Calculation Processor和Remove Obj Reference Processor等单个处理器“串联”在一起生成 Transaction 对象,这样就可以通过调整配置文件,达到各个组件复用和灵活配置的目的。

以配置中的客户处理Customer Processor为例子,在Spring中定义bean:

实现customerProcessor类:

public class CustomerProcessor implements ItemProcessor {

public BaseTransaction process(BaseTransaction txn) throws Exception {

setAccountDetail(txn);

return txn;

}

protected void setAccountDetail(BaseTransaction txn) {

this.accountDataService.matchAccounts(txn);

this.transactionService.processHouseAccounts(txn);

}

}

处理客户数据需要进行两个步骤:(1) 将客户账户信息进行映射;(2) 将集团客户内部挂消帐内部处理掉。

2.2.2 Gemfire做数据通用服务的DAO

通用服务主要提供核心业务逻辑处理,并以Web Service方式将数据发布在网络上。其主要体系结构如图5所示。

图5 通用服务系统结构图

本文主要采用面向接口的方式对通用服务进行设计。通过Java Interface。接口中对服务实现方法进行抽象定义,并对方法进行具体实现。通过面向接口编程,可以完成统一调用,应用在不同数据源上,比如对于同一个DAO接口,分别有Gemfire、jdbc、hibernate以及jms的实现,当Service调用DAO的save功能时,可以同步完成对Gemfire、数据库以及jms的数据存储(发布)。

本文以交易服务为例,定义了Transaction Data Service的创建交易数据(create)接口:

public interface TransactionDataService {

public void create(BaseTransaction tx);

}

GemfireDAO通过对缓存节点Region块的存储传入交易数据,实现了创建交易数据(create)接口,在实现过程中,注意使用同步来确保数据的原子性:

public class TransactionGemfireDAOImpl implements TransactionDAO {

public BaseTransaction create(BaseTransaction tx) {

if(tx != null){ this.getTransactionRegion().put(tx.getId(),tx);

}

return tx;

}

public Region getTransactionRegion() {

String regionName = ″/transactionRegion″;

if(this.transactionRegion == null){

synchronized(this){

this.transactionRegion = RegionUtils.getRegionByName(regionName);

}

}

return transactionRegion;

}

}

2.2.3 CXF完成通用服务数据接口发布

通用服务通过CXF发布标准的Web Service,不仅可以做到水平横向扩展,同时也方便与下游系统主要系统对接,按照其要求定义对应的接口,本文以敏感性分析为例,在保持其他条件不变的前提下,研究单个市场风险要素(利率、汇率、股票价格和商品价格)的变化可能会对金融工具或资产组合的收益或经济价值产生的影响。

在Web.xml中除了传统加载Spring配置文件外,还需要加载CXF的Servlet,完成Web Service的映射:

CXF

CXFServelt

org.apache.cxf.transport.servelt.CSFServlet

CXFServelt

/*

在Spring中定义bean:

本文还定义了Sensitivity接口:

@WebService

public class SensitivityWebSerivce implements SensitivityService {

@WebMethod

public String getTransactionss(@Webparam(name=″asOfDate″)String asOfDate, @WebParam(name=″tradeType″)){

Criteria criteria = new Criteria(asOfDate, tradeType);

return transactionDataService.getTransactions(criteria);

}

}

本文通过定义一个敏感度的查询数据接口,直接调用内部的Transaction Service查询接口,间接地把内部交易服务标准的Web Service的方式提供给下游系统使用。同时因为采用的是分布式部署,系统扩展很方便,下游运行报表期间,直接增加CXF的节点就轻松应对了高访问需求,同时隔离了对数据库的压力。

3 应用效果

本文在使用Spring Batch+Gemfire+CXF框架重构了整个整合系统后,在银行内部管理系统中的3台8核CPU的虚拟机(SpringBatch 80 GB内存,Gemfire 120 GB内存,CXF80 GB内存)上进行了UAT环境测试实验。计算处理1.4 TB数据所需时间从2小时变为20分钟。此外,通过提升部分风险计算服务器CPU性能,进一步提升了空间。下游系统读取系统稳定性大大增强,没有再发生因为下游系统查询导致核心数据库宕机的情况,极大地节省了人力维护成本和机器运行成本。

4 结 语

传统巴塞尔数据整合系统依赖存储过程和ETL工具,系统复杂,性能较差,而Spring Batch 和Gemfire的整合应用在业界整合使用案例较少。本文提出一套全新的基于Spring Batch+Gemfire+CXF轻量级应用架构来处理海量的巴塞尔整合数据,用于替代之前大量的存储过程的设计。整个系统通过实际测试、运行,表现良好,具有各层间低耦合、高扩展性、高可靠性的特点,性能远超传统依赖存储过程的数据整合系统。基于Spring Batch+Gemfire+CXF架构是一个行之有效的轻量级、大数据、低成本应用整合解决方案,可以推广到大数据、高可用、企业级、可伸缩的企业、银行等应用开发中。

猜你喜欢
调用数据处理处理器
认知诊断缺失数据处理方法的比较:零替换、多重插补与极大似然估计法*
基于低频功率数据处理的负荷分解方法
无人机测绘数据处理关键技术及运用
高层建筑沉降监测数据处理中多元回归分析方法的应用研究
高层建筑沉降监测数据处理中多元回归分析方法的应用研究
系统虚拟化环境下客户机系统调用信息捕获与分析①
基于属性数据的系统调用过滤方法
ADI推出新一代SigmaDSP处理器
利用RFC技术实现SAP系统接口通信
火线热讯