于起超 韩旭 马丹璇 罗登昌
摘 要: 为了解决传统数据清洗工具面对海量数据时复杂度高、效率低的问题,设计实现了流式大数据数据清洗系统。利用分布式计算技术清洗数据,以解决性能低的问题。该系统由统一接入模块、计算集群和调度中心三部分组成,实现了多种数据源的统一接入,分布式处理,并通过Web界面进行清洗流程的交互式配置。实验结果表明,面对海量数据的时候,流式大数据数据清洗系统的性能强于传统的单机数据清洗,提高了清洗效率。
关键词: 数据清洗; 大数据; 流式; 分布式架构
中图分类号:TP311 文献标识码:A 文章编号:1006-8228(2021)09-01-04
Abstract: A streaming big data ETL system was designed and implemented so that the problem of high complexity and low efficiency of traditional ETL tools in the face of big data can be resolved. The system uses distributed computing technology to clean the data to solve the problem of low performance. The system consists of the unified access module, computing cluster and dispatching center. It realizes the unified access of multiple data sources and distributed processing, and the interactive configuration of cleaning process through web interface. The experiment results show that in the face of big data, the performance of the streaming big data ETL system is better than the traditional single machine ETL ones, which improves the cleaning efficiency.
Key words: ETL; big data; streaming; distributed architecture
0 引言
大数据的来临改变了很多传统工作方式,其中就包括ETL。ETL即数据抽取(Extract)、转换(Transform)、清洗(Cleaning)、装载(Loading)的过程,是数据仓库建设的重要环节,负责整个数据仓库的调度[1]。其效率的高低和清洗数据质量的高低,直接决定数据仓库建设和决策的正确性。传统的数据清洗框架有如下几个问题。
⑴ 数据清洗性能低
传统的数据清洗方式处理的数据量往往很小。在大数据时代数据量很可能每天成TB级别增加,加上有些清洗算法消耗很高的计算能力,导致传统的数据清洗方法异常缓慢,甚至很难正常运行[2]。
⑵ 數据源多样性
大数据的多样性导致数据源的复杂多样[3]。数据仓库的数据往往来源于许多不同的系统,每个系统又包含多个模块,每个模块又包含单独的数据源。从数据结构来看,不仅包括结构化数据还包括各种复杂的半结构化数据[4],这不仅提高了ETL程序的复杂性,也加大了维护的难度[5]。
针对现有数据清洗面临的问题文中提出了流式大数据数据清洗系统,系统采用Kafka做中间件将接入数据与处理数据进行解耦[6],在一定程度上解决了传统数据清洗框架普遍存在的问题。
1 系统设计
1.1 系统总体架构设计
流式数据清洗架构如图1所示。系统涉及多种数据源,包括Excel、监测日志、关系型数据库等。数据源通过统一数据接入模块进行统一封装后推送入分布式消息队列Kafka中。计算集群消费数据并执行清洗操作,最终将清洗后的结果输出到数据仓库中。
这种架构主要有以下优势:①将不同类型数据都转换成流的形式,使不同数据在形式上进行统一。清洗数据的计算节点只需关心具体数据不需要处理数据来源问题。②清洗数据采用并行分布式方式处理,提高了数据清洗的性能。计算节点可以根据实际负载情况进行扩展,具有很强的扩展性。③交互式的调度中心可以根据需求对清洗流程进行可视化配置,降低了数据清洗的复杂度。
1.2 统一数据接入模块设计
1.2.1 统一数据接入架构设计
如图2为统一数据接入的架构图,统一数据接入模块主要包括定时器,文件监控,SQL执行三大子模块。
⑴ 定时器
定时器模块为文件监控模块和SQL执行模块提供定时功能,通过定时来控制数据的采集速率。用户通过界面配置定时,可针对于每一种数据源定制执行周期。
⑵ 文件监控
文件监控模块是针对于日志文件的采集而设计的模块。当监控的文件夹内有文件增加时,文件监控模块读取新增文件,将文件按照约定的解析规则进行解析,生成规定的统一数据协议并推送kafka中。
⑶ SQL执行
SQL执行模块实现对Mysql,Oracle,SQL Server等关系型数据库的采集。SQL执行模块定时从数据库中读取一批次的数据,并转化为统一数据协议推送Kafka中。
1.2.2 统一数据协议设计
统一数据协议如表1所示。主要有以下几个字段。uuid为动态生成的每条数据的唯一id,nameId为数据源的唯一id,timestamp为生产这条数据的时间。fields为一个字符串数组,存放关系型数据库的字段名称或者Excel的列名。dates存放具体的数据值。
1.3 计算集群模块设计
计算集群由多个计算节点组成,计算节点架构图如图3所示。
⑴ 接口
接口模块用于和调度中心模块和统一数据接入模块进行通信,包括数据源配置接口、集群管理接口、流程调度接口等接口。接口模块采用RPC接口协议,RPC协议即Remote Produce Call远程过程调用协议,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。RPC基于高效的二进制传输,与HTTP协议相比,字节大小和序列化耗时更低。
⑵ 同步器
同步器模块用来与数据库中调度作业进行同步。该模块保存作业运行的实时状态,并在作业重启后读取作业最后运行状态,从而保证清洗作业的正确运行。
⑶ 元数据
元数据模块保存数据源数据结构的信息,缓存清洗数据的字典码表信息。
⑷ 流程解析器
流程解析器模块通过接口模块读取作业清洗流程的配置信息,并将配置信息解析成数据清洗对应的有向无环图。
⑸ 算子执行器
算子执行器读取配置的清洗参数,并调用算子中的清洗方法进行清洗。算子执行器无需关心具体的清洗流程,只需关注于算子中的清洗方法,使得数据清洗具有可扩展性。
1.4 调度中心模块设计
调度中心模块作为系统用户交互的窗口,为用户提供可视化的清洗流程配置界面,方便各种复杂清洗里的配置。调度中心模块有数据源管理、集群配置、算子管理、清洗字典管理、清洗流程管理等功能模块,如图4所示。
⑴ 数据源管理
数据源管理模块针对不同的数据源提供统一的配置管理功能。该模块对数据源提供接入规则,主要包括定时周期、监控文件夹、抽取SQL语句、统一数据协议等,并与统一数据接入模块进行交互,控制统一数据接入模块的启动与停止。
⑵ 集群管理
集群管理模块对统一接入模块集群和计算集群提供管理和监控功能,如监控集群的上线和下线、监控集群资源的利用情况、监控清洗作业执行情况并对错误作业提供预警功能。
⑶ 算子管理
算子管理模块提供对计算算子进行统一管理功能。计算算子分为计算算子和输出算子。计算算子用于数据清洗,输出算子用于清洗结果的输出。常用的输出算子有Elasticsearch输出算子、Hive输出算子、数据库输出算子、Kafka输出算子等。在添加算子的时候需要配置算子的执行函数、算子描述、参数名、参数类型等。
⑷ 清洗算子字典管理
清洗算子字典管理模块针对于字典替换算子而设计的。该模块提供字典配置功能,并将映射关系缓存Redis中。字典替换算子读取Redis缓存,并作字典映射处理。
⑸ 清洗流程管理
清洗流程管理模块为用户提供交互式的清洗流程配置功能。用户通过Web界面,在画布上拖拽清洗算子,配置每一个清洗算子对应的清洗参数,并按照清洗规则进行连线,形成一个由起点到输出的流程图。这种可视化的配置方式,给用户提供了直观的清理流程控制,降低了数据清洗的复杂度。
2 系统实现
2.1 计算节点数据清洗的实现
计算节点的清洗流程如图5所示。计算节点主动从Kafka拉取消息,并获取消息中的nameId,判断nameId是否存在清洗规则库,若清洗规则库不存在则跳过清洗流程,若存在则进行清洗环节。计算节点从调度中心临时库获取对应的清洗流程图,并遍历流程图中的每一个算子。算子利用JAVA的反射原理调用清洗算子中的清洗函数执行清洗工作。
2.2 计算算子的实现
计算算子将清洗规则进行封装形成一个完整的可控的工具类,主要包括数据验证,数据转换,数据去重,缺失插值等[7]。
本文计算算子的设计参考了Hive的UDF的设计理论,UDF(User Defined Function)提供了一种接口,可以对流中的每一条消息进行相应的处理[8]。算子类继承接口UDF,并实现接口evaluate的函数,evaluate函数有两个参数一个是从流中获取的一条消息protocol,args是从调度中心获取的自定义参数。
3 实验分析对比
3.1 实验环境
为了对比传统方式和基于流式计算分布式清洗框架数据清洗的性能,本实验采用三台服务器来部署计算集群,用一台服务器部署传统清洗方式Kettle。具体的配置如表2所示。
实验数据来自与地质的险情日志数据,现采用100万、300万、500萬与800万条日志数据进行实验测试。测试计算算子有字典替换算子,输出算子是Elasticsearch批量输出算子。
3.2 实验结果
如图6所示为传统Kettle单机方式和分布式数据清洗方式所用时间的折线图。通过折线图可以看出,随着数据量增加,分布式方式斜率变缓,且所用时间明显小于Kettle的传统方式。
4 结束语
随着大数据技术的不断发展,数据清洗在异构数据集成和数据仓库等领域的研究与应用越来越引起人们的重视[9]。但由于数据的复杂性,数据清洗具有很大的难度[10]。本文从数据清洗的概念出发,分析了大数据背景下数据清洗所面临的困难与问题。提出了一种基于流式计算的数据清洗框架,并对框架架构和各个模块进行剖析,并通过实验对比分析,证明了系统在运行效率有良好的表现。可以看出系统具有很强的扩展性,通用性,大大简化了数据清洗的操作流程,具有一定的实际价值[11]。此框架待完善的方面有:①数据流程图目前只能支持简单的流程配置,更为复杂的流程还未适配到调度中心。②算子的扩展性有待提高,目前支持的算子有限。③支持的数据源有限,目前只支持三种数据类型,未来可增加更多的数据种类。④计算集群计算能力有待提高,未来可将计算集群移植到Spark,Hadoop等并行计算框架上[12]。
参考文献(References):
[1] 刘佳俊,喻钢,胡珉.面向城市基础设施智慧管养的大数据智能融合方法[J].计算机应用,2017.37(10):2983-2990
[2] 何刚.基于Hadoop平台的分布式ETL研究与实现[D].东华大学,2014.
[3] 黃毅,钟碧良.基于XML的异构数据库间数据迁移的研究[J].科技管理研究,2008.28(8):173-174
[4] 刘华,胡燕,王涛.Web数据清洗研究[J].软件导刊,2007.3:75-77
[5] 叶舟,王东.基于规则引擎的数据清洗[J].计算机工程,2006.32(23):52-54
[6] Garg N.Learning Apache Kafka-Second Edition[J].2015.
[7] Kandel S, Heer J, Plaisant C, et al. Research directions in data wrangling:visualizations and transformations for usable and credible data[J]. Information Visualization,2016.10(4):271-288
[8] 范会丽,彭宁,任薇.基于Hadoop平台的数据清洗研究[J].电脑知识与技术,2020.16(5):33-34
[9] 李垚周,李光明.分布式数据清洗系统设计[J].网络安全技术与应用,2020.2.
[10] Rahm E, Do Honghai. Data Cleaning: Problems and Current Approaches[J].Data Engineering,2000.23(4).
[11] 王铭军,潘巧明,刘真,et al.可视数据清洗综述[J]. 中国图象图形学报,2015.20(4):468-482
[12] Li X, Mao Y. Real-Time data ETL framework for big real-time data analysis[C]//IEEE International Conference on Information & Automation.IEEE,2015.10.