基于Storm的物联网海量实时数据流处理研究

2016-04-08 03:45张前进
常州工学院学报 2016年6期
关键词:数据源海量异构

张前进

(安徽国防科技职业学院信息工程系,安徽六安237011)

基于Storm的物联网海量实时数据流处理研究

张前进

(安徽国防科技职业学院信息工程系,安徽六安237011)

针对物联网中数据实时与异构的特点,设计了基于云计算的海量实时数据感知与处理模型。结合海量实时数据并行处理机制与数据处理流程,利用开源实时处理系统Storm,实现了海量实时数据流处理过程,并给出了核心实现方法。

物联网;实时数据;海量数据

物联网的概念是国际电信联盟(International Telecommunication Union,ITU)在2005年的《ITU Internet reports 2005-the Internet of things》中提出的。ITU认为“通过对事物内嵌RFID微型芯片或者传感器芯片,通过互联网就能实现物与物、人与物、人与人之间的信息交互,从而形成一个无所不在的物联网”[1]。随着物联网技术的快速发展,以及智慧城市、智能交通、智慧农业等项目的建设,传感器的数量呈指数级增长,IDC报告中指出,到2020年全球传感器数量将突破2 120亿个[2]。如此庞大数量的传感器每时每刻都在产生实时数据,其数据规模是海量的。在海量的物联网数据中,一方面,数据本身是异构的,既有存储在关系数据库中的结构化数据,又有web形式的半结构化数据,还有音视频等文档形式的非机构化数据;另一方面,很多应用场景的实时数据具有时效性,如智慧农业中的天气、环境等。针对物联网实时数据规模与特点,本文利用分布式实时处理Storm开源架构,设计了基于物联网的海量实时数据处理系统。

1 物联网海量数据实时感知与处理模型

物联网中除了数据是异构的,感知设备本身的数据采集协议、传输协议也是异构的。为了屏蔽设备的异构,在物联网数据实时感知与处理模型中引入云服务平台,如图1所示。

图1 物联网实时数据感知与处理模型

感知层:感知层由传感器、二维码标签、RFID标签、M2M终端等感知设备组成,是物联网体系中的重要组成部分[3],负责利用传感设备节点采集与感知实时的环境数据。由于感知层是物联网的基础,其可靠性决定了物联网系统的稳定性,因此,感知层的主要任务是构建一个低成本、高可靠性的感知网络。

传输层:传输层由接入网和物联网关两部分组成。接入网负责将采集到的传感数据通过网络进行传输,由无线传感网、局域网、卫星网等组成。因此,接入网是一种异构网络,不同的网络,其网络协议、传输协议均不相同,要达到不同网络间信息的互联互通,需要将异构网络进行融合。物联网关能够实现不同感知网络协议与接入网络协议间的转换以及将数据按照统一格式进行封装,从而解决异构网络间的互联互通问题。

处理层:处理层是物联网海量实时数据流处理系统的核心部分。数据采集与处理模块负责采集传输层上传的数据,并按照业务规则对数据进行转换处理,将无意义的离散数据与业务建立映射关系,变成有意义的业务数据。云资源服务平台负责为应用层提供与平台、网络无关的统一数据资源服务。处理层中引入云服务平台,为实时数据处理提供可扩展的、具有弹性的云计算服务[3],有效提高整个系统的数据处理能力。

应用层:应用层由智能交通、智慧农业、智慧城市等智能应用与服务组成。应用层直接面向用户,负责为用户提供不受时间、地点限制的智能服务。

2 海量实时数据处理流程

2.1 海量实时数据的并行处理机制

相对串行计算而言,并行计算是指将一个按照顺序执行的计算任务,分解成若干个可以同时执行的子任务加以并行执行,从而完成整个计算任务,并行计算的主要目的是快速解决大型且复杂的计算问题[5]。

基于物联网感知的数据具有流式数据的特点,主要表现为:数据是一组时间序列下针对感知对象相关状态属性的数据,数据间具有无相互依赖的特征[6]。同时,实时感知的物联网数据还具有量大流速快的特点。因此,对实时感知的海量物联网数据的处理,不仅要求系统具有高可靠性、高稳定性,还要求系统能够快速处理。为了满足海量数据处理的实时性要求,采用数据并行处理技术完成数据的实时处理。

并行处理机制如图2所示,首先根据感知数据源对象属性特征进行并行划分,然后根据数据分发机制将上述数据分发到不同的并行处理节点上,并行节点按照预定义的计算规则对收到的数据进行运算处理,最后将各个并行处理节点处理的中间数据汇总到合并节点上,通过运算形成为上层应用服务使用的最终数据。

图2 海量实时数据并行处理机制

2.2 海量实时数据处理流程

海量实时数据处理流程主要包含数据采集、数据预处理、实时计算分析与存储、UI展示等过程。具体实现步骤如下:

1)数据采集。通过传输层采集由感知层产生的各类实时数据,之后将数据发送到数据控制服务器。

2)数据预处理。数据预处理实现对数据的分类、清洗、格式转换等操作。

3)实时计算分析与存储。首先将预处理完成后的数据放入待处理消息队列,按顺序将消息放入并行计算集群完成实时计算分析。然后分两条线处理,一条线直接进入第5步,另一条线进入第4步。

4)计算结果存储与深度分析。这样可以更好地为用户提供个性化的服务。

5)UI展示。

3 基于Storm的分布式海量实时数据处理系统设计

大数据处理可以分为批处理和实时流处理两种模式[7]。对于大数据的批处理模式,目前较为流行的是基于MapReduce与HDFS的开源框架Hadoop分布式存储计算平台[8]。Hadoop开源框架适合对历史数据的集中处理,例如大规模网站访问日志的分析、大型购物网站的网页索引等,但它无法满足物联网中大规模实时感知数据的处理。大数据实时处理方面,较流行的有Storm、Spark、Samza等基于Apache的开源框架。本文基于Storm开源框架设计分布式海量实时数据处理系统。Storm框架最初由BackType开发,2011年被Tritter公司收购,同年由Tritter在GitHub上将其开源。

3.1 Storm基本组件

Storm开源框架主要分为Nimbus和Supervisor两种组件,这两种组件都是快速失败的,没有状态。任务状态和心跳信息都保存在Zookeeper上,Zookeeper是Storm重点依赖的外部资源。

Nimbus是控制节点的后台程序,负责为工作节点分配工作和发送代码,并且监控工作节点的工作状态[9],全局只有一个Nimbus。

Supervisor是工作节点的后台程序,每一个工作节点上运行一个,负责接受控制节点Nimbus分配的任务,会监听分配任务给它的那个控制节点,根据需要关闭或者启动工作进程worker。

一个工作节点会运行一个或多个工作进程,每一个工作节点都会执行一个Topology任务子集[10]。Topology是Storm框架中运行的一个封装计算任务逻辑的实时应用程序,由Spout和Bolt构成。工作节点与控制节点的通信与协调都是通过Zookeeper来实现。

3.2 基于Storm的分布式海量实时数据处理系统

3.2.1 系统架构

基于物联网的海量实时数据流处理系统架构如图3所示,由数据源接入模块、数据缓存模块、Storm集群、Hadoop集群、关系数据库、UI展示等部分组成。

图3 系统架构图

1)数据源接入模块

数据源接入模块负责为数据处理集群快速接入数据源。本文采用的是基于开源的Apache Flume日志系统实现各种实时感知数据源的快速接入,Flume是Cloudera提供的一个高可靠、高可用的分布式海量日志采集及聚合和传输系统,还可以用于历史数据的收集。同时,该模块还能够对数据进行简单处理。

2)数据缓存模块

对于实时数据处理,如果数据流量较大,数据处理模块的处理能力可能无法达到,甚至引起宕机。因此,系统引入Kafka系统数据缓存模块,Kafka系统是一个高吞吐量的分布式发布订阅消息系统,通过O(1)的磁盘数据结构保证消息的持久性和稳定性,其吞吐量可以支持每秒数百万的消息。

3)Storm与Hadoop集群

Storm集群负责实时数据流的处理,由一个主节点和若干从节点组成。使用Storm的一个Spout插件storm-kafka可以持续不断地从Kafka系统中读取数据,然后通过Storm集群进行实时运算,同时将运算结果通过关系数据库实现持久性保存。Hadoop集群由一个主管理节点和若干个计算节点组成,负责对历史数据的批处理,实现对物联网感知数据的深度分析与挖掘。

3.2.2 系统实现

1)数据源接入与数据缓存组件集成

Flume本身自带了诸多数据源的sink,对于已存在的sink只需针对环境修改配置文件即可。本文通过自定义Flume中的Kafka sink实现两者的集成,自定义Flume的sink需要继承AbstractSink并实现Configurable接口,该接口主要包含消息处理process()方法和sink配置configure(Context arg0)方法。

•process()方法实现示例:

public Status process() throws EventDeliveryException{

……

byte[] body=event.getBody();

final String msg=new String(body);

final KeyedMessage message=new KeyedMessage(topic,msg);

producer.send(message);

……

}

•configure(Context arg0)方法实现示例:

public void configure(Context arg0) {

Properties prop=new Properties();

prop.put(″zookeeper.connect″,zookeeperValue);

prop.put(″metadata.broker.list″,brokerValue);

prop.put(″serializer.class″,StringEncoder.class.getName());

producer=new Producer(new ProducerConfig(prop));

}

2)ISpout与IBolt接口实现

Storm中的计算主要分为两种类型,一个是数据源的处理Spout和中间数据的处理Bolt。Spout作为task运行在worker内容,主要负责数据的发送,其核心方法为nextTuple()。

•nextTuple()方法示例:

public void nextTuple() {

……

this.collector.emit(new Values(sendData));//发送数据

}

Bolt负责节点处理,既可以进行简单的数据处理,也可以实现数据流的合并等复杂计算。其核心方法为excute(Tuple tuple,BasicOutputCollector collector)。

•excute(Tuple tuple,BasicOutputCollector collector)方法示例:

public void execute(Tuple tuple,BasicOutputCollector collector) {

String sentence=(String) tuple.getValue(0);

//数据处理逻辑

……

collector.emit(new Values(out));

}

4 结语

物联网感知数据的实时处理在现实中有较大的应用价值,例如智能交通中实时交通情况大数据分析,更加有利于公众服务。本文设计的基于Storm的海量数据的实时处理系统具有高可靠、高效的特点,可以很好地应对物联网数据异构的问题,并且可以胜任大规模的实时处理任务。

[1]高哲,翁祖泉.基于物联网海量数据处理的实时数据库应用研究[J].中国集成电路.2013(11):57-58.

[2]DIGNAN Larry.Internet of things:$8.9 trillion market in 2020,212 billion connected things[EB/OL].(2013-10-03)[2016-10-20].http://www.zdnet.com/article/internet-of-things-8-9-trillion-market-in-2020-212-billion-connected-things/.

[3]马骏,郭渊博,马建峰,等.物联网感知层基于资源分层的多用户访问控制方案[J].电子学报,2014(1):28.

[4]罗剑明.制造物联网的实时数据感知与处理模型的研究[D].广州:广东工业大学,2015:12.

[5]王慧.基于Hadoop的并行挖掘算法的研究[D].北京:首都师范大学,2013:3-4.

[6]赵卓峰,魏文飞,马强.基于无共享架构的海量感知数据实时处理系统[J].微电子学与计算机,2012,29(9):10.

[7]王铭坤,袁少光,朱永利,等.基于Storm的海量数据实时聚类[J].计算机应用,2014,34(11):3078.

[8]覃雄派,王会举,杜小勇,等.大数据分析:RDBMS与MapReduce的竞争与共生[J].软件学报,2012,23(1):35-36.

[9]李川,鄂海红,宋美娜.基于Storm的实时计算框架的研究与应用[J].软件,2014,35(10):17.

[10]邓立龙,徐海水.Storm实现的应用模型研究[J].广东工业大学学报,2014,31(3):115.

责任编辑:杨子立

Real Time Massive Data Stream Processing in Internet of Things Based on Storm

ZHANG Qianjin

(Department of Information Engineering,Anhui Vocational College Of Defense Technology,Lu′an 237011)

According to the realtime and heterogeneous characteristics of data in the Internet of things,a realtime massive data sensing and processing model was designed on the basis of cloud computing.Based on the open source real time operating system Storm,a combination of real time massive data parallel processing mechanism and data processing flow realized real time massive data stream processing with core implementation method proposed.

Internet of things;real time data;massive data

10.3969/j.issn.1671⁃0436.2016.06.007

2016-11-21

安徽省高校自然科学研究重点项目(KJ2016A120)

张前进(1982— ),男,硕士,讲师。

TP391

A

1671- 0436(2016)06- 0030- 04

猜你喜欢
数据源海量异构
一种傅里叶域海量数据高速谱聚类方法
试论同课异构之“同”与“异”
海量快递垃圾正在“围城”——“绿色快递”势在必行
Web 大数据系统数据源选择*
基于不同网络数据源的期刊评价研究
异构醇醚在超浓缩洗衣液中的应用探索
overlay SDN实现异构兼容的关键技术
一个图形所蕴含的“海量”巧题
LTE异构网技术与组网研究
基于真值发现的冲突数据源质量评价算法