杨 帆, 何正伟, 刘力荣
(1.武汉理工大学 航运学院, 湖北 武汉 430063;2.内河航运技术湖北省重点实验室, 湖北 武汉 430063;3.国家水运安全工程技术研究中心, 湖北 武汉 430063)
随着“一带一路”倡议的稳步推进,水上交通已经成为中国同各国之间贸易往来的重要方式。水路运输由于其成本低、可运载货物量大等优势而成为重要的运输方式,水上贸易的推进能够极大促进国家之间经济贸易的往来与发展。然而,随着水上交通越发频繁,运输量逐年增加,水路运输所带来的交通事故也在不断增长。船舶自动识别系统(Automatic Identification System,AIS)是目前监测与分析船舶状况、维持水上交通稳定进行的重要工具。通过分析AIS设备中的AIS数据,能够对水上交通的交通流量、交通拥堵状况、交通稳定性等方面进行判别与预测。然而,随着水路运输的增长,AIS数据量也在不断增加,传统的数据处理方式很难满足如此大规模的数据处理要求。因此,需要通过大数据处理平台对AIS数据进行实时、有效地分析和处理。其中,AIS数据中存在大量的异常交通信息,这些异常信息往往包含着错误的船舶状态与交通信息,因此,能够实时、高效地对异常信息进行检测十分重要。
为了检测异常的交通数据,不少研究者做了充分的调研与研究。Turochy[1]通过交通流参数的阈值控制对交通异常数据进行检测;陈德旺等[2]提出3种判断快速路交通流异常数据的算法,并讨论了这3种算法之间的集成;李成兵等[3]提出一种基于最小二乘支持向量机的交通异常数据检测方法;Chen Shu-yan等[4]通过对离线数据进行数据挖掘从而检测出交通异常数据。通过对之前的研究方法进行分析,可以看出目前的研究主要是通过对离线数据进行分析和检测,很少涉及到对数据的在线实时分析和检测。本设计通过Spark大数据处理平台对AIS数据进行分析和处理,借助小波分析的方法对船舶交通流量、船舶速度与船舶状态进行分析,检测出异常的交通数据,从而减小异常数据对水上交通研究的干扰。
目前船舶AIS数据中包含了船舶运行的大量数据,是分析水上交通数据的重要工具。水上交通的异常数据主要包含以下几类:a)典型错误数据,这些数据主要是一些明显的交通异常数据,比如纬度91°,船舶速度100等错误数据;b)异常交通情况数据,这些数据包含了异常的交通状况,相比前后时间点的交通情况数据来说,这些数据有着明显的交通信息突变,因此被判断为异常数据;c)缺失数据,这部分交通数据为空,没有包含任何的船舶运行数据。本设计主要是对异常交通情况数据进行分析与检测,提高交通数据的可靠性。
由于AIS数据量庞大,包含船舶交通信息量多,传统的单机处理模式已经无法满足大规模的AIS数据处理的要求。本设计通过Spark数据处理引擎来对AIS数据进行分析与处理。Spark是继Hadoop大数据处理平台之后的又一大数据处理工具,它在继承Hadoop中MapReduce计算框架的同时,又有着出色的并行计算能力,能够快捷、高效地对庞大的数据集进行处理[5-6]。
由于AIS数据通过基站向本地进行传输,因此,数据传输基本能够保证实时性。在数据接收阶段,采用了异步回调机制对数据进行认证和接收,首先向服务器发送带有密码的套接字,服务器进行接收并且校验,如果密码正确,则进行AIS数据的传输,从而保证系统的安全性。Spark将AIS数据转换成弹性分布式数据集(Resilient Distributed Datasets,RDD),通过自身的算子对数据进行一系列转换,最终的处理结果将通过分布式文件系统进行存储。图1为AIS数据接收部分的流程图。
图1 AIS数据接收
本设计通过小波分析对水上交通的异常数据进行分析和检测。相比傅里叶分析,小波分析通过对信号进行一系列的伸缩、平移等转换,对信号进行多角度、多分辨的处理,从而解决傅里叶变换无法解决的问题,使得小波分析方法有着更高的应用价值。
在对水上交通数据进行处理时,假设处理的信号为f(t)∈L2(R),其中,L2(R)为所有平方可积函数所组成的空间。因此,通过小波函数进行小波变换后得到:
(1)
(2)
其中,a为膨胀参数,b为平移参数,此时式(1)称为离散小波变换。当对AIS交通数据进行多层小波变换时,假设此时输入信号为s,通过m层变换后得到子空间为(d1,d2,…,dm,am),并且满足以下关系:
s=d1⊕a1=d1⊕d2⊕a2=d1⊕d2⊕…⊕dm⊕am,
(3)
其中,⊕表示异或逻辑运算,di为信号s的高频部分,ai为信号s的低频部分。因此,通过小波变换将AIS中的数据进行多层分解,能够对高频和低频部分进行分别处理[7-9]。
在AIS数据中,由于信号传输过程受到干扰,或者信号中夹杂着噪声等因素导致交通数据出现异常,这些异常数据一般存在于信号分解后的高频部分。因此,异常检测过程就是通过小波分析的方法,对高频数据进行处理,处理后的高频数据和低频数据进行重构,从而得到重构后数据s′,通过对s′和s进行分析,最终判断异常数据出现的位置以及异常数据的个数,并且对异常数据根据不同情况采取数据清除或数据矫正,从而完成异常数据的检测功能。
本设计借助大数据处理平台对AIS数据进行处理,通过小波分析的方法将数据进行多层分解,通过对高频数据部分进行处理后,将数据进行重构。通过对原始数据与重构数据的分析与检测得到残差序列,从而判断出异常数据的位置和个数,最终的处理结果写入分布式文件系统HDFS中[10-11]。
图2 系统总体框架
首先,AIS数据经过服务器校验并发送后,客户端开始对数据进行接收和读取,需要对AIS数据进行预处理,包含去掉数据为空的数据并且对AIS数据进行解析;解析后的AIS数据会通过Spark大数据处理系统进行处理,具体的处理操作包括map、reduceBykey、groupBykey等算子,统计出水上交通选定区域的交通流量、平均交通速度以及船舶状态;之后,AIS数据会通过小波分析进行处理,经过对AIS数据多层分解、高频信号处理等过程,对数据进行重构;分别将数据和Spark大数据处理平台统计分析后所得结果进行比对和计算,将异常数据检测出来,并且通过对数据进行分析,判断是否完成对数据的修复或提出,最终对整个异常数据检测系统性能进行分析和评价。整个设计的总体框架如图2所示。
AIS数据要通过Spark大数据处理平台对数据进行初步处理和计算。本设计中,选择交通情况中的3个指标进行异常数据分析:交通流量情况、船舶平均速度和船舶状态。其中,交通流量情况通过对AIS数据中的动态数据船舶MMSI(特定船舶的编号)和Spark的reduceBykey算子对特定区域船舶数量进行统计,从而得到不同时间段的水上交通流量情况。船舶平均速度通过对AIS动态数据中船舶对地速度进行统计,通过Spark的aggregate函数对速度进行求解平均值,从而计算出不同时间段的船舶平均速度。船舶状态通过AIS动态数据中的船舶状态进行统计分析,通过统计不同时间段船舶的状态得到船舶状态的总体情况。通过3个不同的算法来实现数据的处理,其中算法1为交通流量情况统计,算法2为船舶平均速度计算,算法3为船舶状态统计,具体的算法如下:
算法1 交通流量算法
Input:AIS动态数据
Output:船舶交通流量
1.ais←HDFS
2.aisf←ais.filter(._=0)
3.aisf,s←aisf.split(",").split(" ")
4.aisf,s,m←aisf.map(_.toInt)
5.f←aisf,s,m.reduceBykey
其中,ais表示从HDFS中读取的原始AIS数据,aisf表示去掉缺失数据后的的AIS数据,aisf,s表示去掉原始数据中的“,”和空格的AIS数据,aisf,s,m为转换为整数类型后的AIS数据。
算法2 平均速度算法
Input:AIS动态数据
Output:船舶平均速度
1.ais←HDFS
2.aisf←ais.filter(._<20)
3.aisf,s←aisf.split(",").split(" ")
4.aisf,s,m←aisf.map(_.toInt)
算法3 船舶状态统计算法
Input:AIS动态数据
Output:各状态船舶数量
1.ais←HDFS
2.aisf←ais.filter(._=0)
3.aisf,s←aisf.split(",").split(" ")
4.aisf,s,m←aisf.map(_.toInt)
5.mumstate←aisf,s,m.reduceBykey
其中,mumstate表示通过reduceBykey计算后的各状态下的船舶数量值。
通过小波分析对船舶异常数据进行检测。首先需要读取HDFS分布式文件系统中的AIS数据,并且选取小波函数和小波分析的层数,本设计中选择的小波函数为db3小波函数,分解层数为3层,通过对数据进行多层次分解后,需要将分解后的数据的高频部分进行阈值选择。通过对船舶交通流量、平均速度和船舶状态进行处理后,可以确定3种交通情况的合理阈值大小,对于不满足阈值大小的异常数据进行选取并且输出,并且通过异常数据分析部分对异常数据进行判断。小波分析部分的算法为
算法4 小波分析算法
Input:AIS动态数据
Output:小波分析结果
1.ais←HDFS
2.aisf←function=db3,layer=3
3.aisf,s←aisf.wlen
4.aisf,s,m←aisf,s.wlen
5.mumstate←aisf,s,m.wlen
其中,ais表示从HDFS中读取的原始AIS数据,aisf表示选择的小波函数为db3小波函数、分解层数为3层处理后的AIS数据,aisf,s、aisf,s,m、mumstate分别表示通过wlen函数分解处理后的结果。
(4)
通过剩余误差e求解出待检测数据的标准差σ,计算公式为
(5)
如果小波分析中的异常数据和标准差之间的关系为vi≤3σ,此时可以对异常数据进行修复;如果异常数据与标准差之间的关系为vi>3σ,表示异常数据与标准差之间的差值过大,此时选择对异常数据进行剔除。该部分的处理算法为
算法5 异常数据分析算法
Input:待检测异常数据
Output:异常数据分析结果
1.ais←HDFS
图3 3种集群环境下的数据处理时间
2.aisf←e,σ
3.If(aisf<=3σ)
aisf,s←aisf
4.If(aisf>3σ)
delete aisf
5.out←aisf,s
本文选取数据的地理位置是经纬度分布在(东经120.725°,北纬32.049°)到(东经120.835°,北纬31.981°)之间的一个矩形区域内的航道,时间范围为2017年2月6日—2月10日。这个范围的航道区域是南通航段,在长江航道中位置特殊且十分重要,它紧靠长江入海口,是长江航道上船舶入海的必经航道,也是海上船舶进入长江的必经航道。南通航段内船舶通航量巨大,船舶水上作业十分繁忙。
AIS大数据处理的集群环境包括了主节点Master和3个从节点Slave1、Slave2、Slave3,分别通过算法1、算法2、算法3对数据进行处理,通过Spark自身的算子分布式计算,分别计算得出船舶交通流量、区域船舶平均速度和船舶状态。数据处理时间如图3所示。
从图中处理时间可以得出,通过reduceBykey算子对船舶交通流量进行统计时,随着数据量的增长,数据处理时间也有所增加,此时4节点的处理性能最好,处理时间最短;通过aggragate函数对船舶平均速度进行求解时,当数据量大于100万条时,2节点与4节点的处理时间基本稳定在2000 ms;对不同船舶的状态进行统计过程中,多节点的处理优势也随着数据量的增长而逐步体现,数据量达到100万条时,数据处理时间基本稳定。
通过算法1、算法2、算法3对数据处理的时间可以得出,Spark大数据平台运用分布式集群的处理机制,能够提高数据的处理速度,并且对于大规模数据来说,分布式的处理环境比单机的处理环境处理性能更好,处理时间更短,数据处理过程更加稳定高效。
通过小波分析对AIS数据进行分解和重构后,可以检测出异常的交通信息,检测结果如图4所示。分别对船舶交通流量信号、船舶平均速度信号和船舶状态信号通过小波分析检测异常数据并进行分析后,采用最小二乘的方法,对检测出的AIS数据中的异常信息进行分析,分别记录检测出的不同交通数据类型中的异常数据、检测中的误判数据和漏判数据,并计算出系统对于异常数据检测的准确率。表1为各交通数据类型的异常数据分析结果。
图4 异常数据检测结果
数据类型异常数据/条误判数据/条漏判数据/条准确率/%船舶流量62332194.6船舶速度54281695.6船舶状态71432493.3
通过对异常数据进行分析可以得出,检测出的异常数据基本符合AIS数据的异常特征,异常数据的检测准确度较高,能够对AIS数据的异常信息进行快速、准确的检测和识别,相比传统的数据检测方法,本系统在准确性和实时性方面有更大的优势。
为了对水上交通的异常数据进行检测,本文提出了一种基于大数据处理平台的AIS异常数据处理与分析方法,通过对庞大的AIS数据集进行在线、实时地处理和分析,找出交通数据中的异常数据并对异常数据进行分析。实验结果表示,异常数据符合水上交通数据的实际情况,并且能够在短时间内快速、稳定地完成数据的处理和检测。
由于AIS数据量大,包含船舶动态数据和静态数据,传统的数据处理模式无法满足处理速度的要求。因此,本文首先构建基于Spark的大数据处理平台,通过Spark自身的算子,在集群环境中对AIS数据进行并行化的处理,处理结果写入分布式文件系统HDFS中,从而保证了数据处理的实时性。
其次,本文对水上交通数据中的3个重要指标:水上交通流量、区域船舶平均速度和船舶状态,通过3种算法分别进行处理。采用小波分析的方法将AIS数据进行多层分解和重构,通过重构后的异常信号检测结果和残差序列信号图找出交通数据中的异常数据。
最后,通过对异常数据进行检测和分析,确定对异常数据是否检测完成,是否存在误判、漏判的现象。统计各个交通指标的异常数据情况和系统的检测准确率。
通过大数据处理平台和小波分析相结合的方法,本文提出的水上交通异常数据监测分析系统能够满足水上交通的实际情况。然而,本文主要是集中在对异常数据的检测和分析,未来的研究中,将会更多地引入对异常数据的修正和预测,并且将通过可视化的工具实时接收处理后的交通数据,以更加直观的方式展示处理结果,为海事部门与海洋环境保护机构提供更加高效、稳定的监管服务。