许森
摘要 文章针对传统方式存储与处理大数据的局限性,提出了基于Flink和Hadoop的卡口数据分析方法,通过Flink对原始卡口数据流进行高效的初步处理和异常数据过滤,得到高质量原始卡口数据,采用HBase读写海量卡口数据,利用MapReduce计算指定时间内所有车辆的路段旅行时间,并利用Mahout中的K-Means进行聚类分析获取有效且符合实际的结果值,通过实验验证,证明方法的有效性和合理性。
关键词 大数据;卡口分析;Flink;Hadoop;分布式框架
中图分类号 TP311.13文献标识码 A文章编号 2096-8949(2023)09-0004-03
0 引言
隨着物联网和云计算的快速崛起,全球数据量大幅度地增加,标志着进入了大数据时代。海量的数据必然无法用人脑来推算和估测,或者用单台的计算机进行处理,必须采用分布式计算架构,依托云计算的分布式处理、分布式数据库、云存储和虚拟化技术。
传统的流处理框架基于微批量处理,时效性不能做到真正的毫秒级别,并且只支持处理时间,即数据到达系统的时间,而不是数据本身的时间戳,这在面对海量实时流数据时会导致数据的乱序和不一致的问题。传统关系型数据库存储与分析大数据时的开销严重影响了数据库性能。
该文针对海量交通卡口数据的过滤存储与基于卡口数据的精准数据分析挖掘进行了大量的研究,利用Flink[1]对海量的初始数据进行初步过滤和计算,利用HBase[2]存储卡口数据,在Hadoop[3]平台上利用MapReduce[4]和Mahout进行卡口数据的数据挖掘分析,获取路段的有效旅行时间,克服了传统手段难过滤海量数据,传统数据库空间扩展性差、查询时间过长等问题。
1 概述
Flink是一种分布式计算框架,可以对无界有界的数据流进行有状态计算。所以,它可以处理实时的流数据,也可以处理离线的批数据,而且可以保证数据的正确性和一致性。在面对城市海量卡口数据的时候能够精准有效地进行实时过滤,筛掉部分无效初始数据的同时可以进行字段的整合计算等,为后面大数据平台进行复杂计算和分析提供更加有序且有效的数据。
HBase是Apache Hadoop的数据库,能够对大数据提供随机、实时的读写访问功能,具有开源、分布式、可扩展及面向对象的特点。城市交通网中具有上百个路口,各个方向的卡口在机动车通过时,都会向中心传输大量数据,传统的数据库扩展性差,因此该文采取HBase作为数据库。
MapReduce计算框架简单,可以通过它轻易地编写出同时在多台主机上运行的程序,还可以在任何安装Hadoop的集群中运行同样的程序,不论这个集群有多少台主机,因此该文采用MapReduce作为卡口数据的处理框架。
Mahout作为一个机器学习软件库,充分利用了Hadoop平台,它主要关注于推荐引擎、聚类和分类。该文需要对卡口数据进行聚类分析,因此使用Mahout作为数据挖掘的工具。
Hive作为构建在Hadoop上的数据仓库,可以用来进行数据提取转化加载(ETL),这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制,该文将计算结果存储于HDFS中,利用Hive进行数据读取和展示。
该文通过Flink过滤数据,处理HBase中的卡口数据表,通过MapReduce过程和K-Means聚类分析,输出指定路径上的有效旅行时间。整个过程如图1。
2 卡口数据过滤和存储
2.1 卡口存储数据表设计
卡口数据数量庞大,将其存储在非关系型数据库HBase中。HBase中rowkey的设计应该便于数据的快速查询。
旅行时间查询是统计指定时间段it内所有车辆通过指定路径(p1,…pn)需要的时间(其中pi为交叉口i),由于行健的设计规则,行健的第一部分不能为到达时间,那么为了增加按时间查询旅行时间的查询速度,需要利用二级索引。
为方便查询将车牌号+车牌颜色+车牌类型+通过时间作为Rowkey[5],利用HBase的主键索引功能进行快速查询。卡口表逻辑视图设计如表1。
索引数据的存储利用协处理器,在添加一条卡口数据的之前,向卡口表中写入一行包含查询条件的索引数据,索引数据表的设计如表2。
2.2 Flink过滤卡口数据设计
卡口海量初始数据流不可避免地会有明显异常的数据,剔除这些异常数据能够明显提升需要计算的数据集质量。filter算子是Flink的过滤筛选器,将符合条件的数据输出到下一步进行计算。这里设置过滤条件为卡口编号、车牌号、车牌颜色、车牌类型和通过时间等字段不允许为空,不符合条件的数据在输入到Flink作业的时候就会被剔除,有效提升数据质量。
为达到2.1rowkey的设计要求,就需要对卡口数据进行分组。Flink keyby是一个基于key的分组转换算子,可以根据指定的key对数据流中的元素进行重新划分。这里设置keyby为车牌号,Flink会在处理实时卡口数据时将车牌号相同的数据分到同一组。卡口原始数据的字段都是单独的,在设计process时,把车牌号+车牌颜色+车牌类型+通过时间合成一个字段,最后把处理好的结果通过Flink sink直接写入到HBase。
3 卡口数据处理-MapReduce
MapReduce数据处理模型非常简单,Map和Reduce过程的输入和输出是键值对(key,value),即遵循如下常规模式:Map:(k1,v1)→(k2,v2) Reduce(k3,list(v3))→(k4,v4)
该文MapReduce的输入是HBase的查询结果,通过巧妙的Rowkey设计与二级索引使批量获取记录集合中的元素挨在一起(应该在同一个Region下),可以在遍历结果时获得很好的性能。Hadoop平台上的编程模式是将程序拆分成Map过程和Reduce过程,Map过程将HBase中的行数据进行处理,为Reduce过程提供输入。Reduce过程收集map过程的输出,对其进行汇总与计算,并将结果写入HDFS[6]。
3.1 Map过程
Map过程利用Scan类读取HBase中的数据,从HBase表中读取的作业以[rowkey:result]格式接收(key,value)键值对,即CarNum+TypeColor+CarType+PassDate作为key,列族cf1作为value输入。经过程序的简单处理,将CarNum+TypeColor+CarType作为key,PointCode+PassDate作为value输出。
3.2 Reduce过程
经过Shuffle过程的处理,Reduce过程将Map过程的输出作为输入,对车牌号为cnx的车辆的所有通过实践(PassDate)进行排序,排序后的时间为(st1,…sty),对应的交叉口为(pc1,…pcy),定义PC(sti)为在时间sti到达的路口。定义RD(xi,…xj)为移除了(xi,…xj)中相邻重复点的向量(保留第一个重复点)。cnx在下的旅行时间TTx如下计算:
TTx={stj?sti|RD(PC(sti),…,PC(stj))=}
在Reduce过程完成后,按照CarNum+StartTime作为key,TTx作为value输出,将数据持久化存储于HDFS中。
3.3 实验
在包含1 000 000條卡口数据的数据集上进行该次实验,取两个相邻路口组成,根据路段距离,将旅行时间上限阈值设置为2 min,下限阈值设置为0.5 min,统计的时间为00:00:00—00:30:00,在以上输入参数下运行MapReduce过程,计算结果如图2中横坐标为车辆在路线的起始时间,纵轴为完成路线的总时间(min),例如在00:10:00有5辆车通过卡口,这5辆车通过路线的时间分别为(0.87,0.88,0.9,0.88,0.87),表示为图2的3个较大的圆点。
4 卡口数据处理-K-Means
4.1 利用Hive读取数据
在MapReduce处理卡口数据后,结果被存储于HDFS中,可以通过建立外部表的方式读取HDFS上的数据。可以通过如下方式建立Hive外部表:
Create external table TGS_DATA_HIS_TRAVELTIME
(data string,time string)
row format delimited
fields terminated by ‘@
location‘/user/TGS_DATA_HIS_TRAVELTIME_RESULT
MapReduce处理后的结果数据按行写入HDFS目录/user/TGS_DATA_HIS_TRAVELTIME_RESULT中,键值对以@分隔。利用select * from TGS_DATA_HIS_TRAVELTIME可以获取结果数据。
4.2 K-Means过程
K-Means方法作为聚类分析的一种,是在没有给定划分类别的情况下,根据数据相似度进行样本分组的一种方法。K-Means的处理过程包含输入格式化,随机生成Cluster,聚类计算,结果输出等过程。算法过程如下:
(1)读取MapReduce过程输出的数据,利用一个Map过程将HDFS上的数据转换成SequenceFile文件,数据以向量形式保存。
(2)随后采用蓄水池抽样的方法在上面的向量里随机抽样k个序列族Cluster。
(3)分别计算各向量到各个聚类中心的距离,将向量分配到距离最近的聚类中。
(4)完成所有向量的分配后,重新计算K个聚类的中心。
(5)不断迭代,当满足迭代结束条件后,聚类完成。
采用包含向量个数最多的簇的中心点作为旅行时间的有效值。
4.3 实验
实验中设k=3,最大迭代次数为3,收敛阈值设为1,K-Means过程处理卡口数据后的结果如图3,图 中横坐标为车辆在路线的起始时间,纵轴为完成路线的总时间(min)。图中各点的集合为含有最多向量的簇集,该簇集的中心点为0.84 m,取该值表示的50 s作为路线旅行时间的有效值,该值与多次人工实地跑车的有效结果时间在评价应用中的误差可以忽略,可知使用该文的方法可以准确地对路线的旅行时间进行分析计算。
5 结语
该文针对传统存储与处理大数据的局限性,提出了基于Flink和Hadoop的卡口数据挖掘分析方法,通过Flink框架来过滤原始卡口数据流,提高用于分析的数据集质量,利用MapReduce和K-Means聚类算法计算指定时间内所有车辆的路段旅行时间,并利用实验验证了方法的有效性。该值可用于区域交通方案的评估分析,如绿波效果的评估等,为用户提供快速的辅助分析决策,克服了传统人工实地跑车的方式进行统计,对实现智能交通管理具有重要的意义。
参考文献
[1]Garcia-Gil D, Ramfrez-Gallego S, Garcia S, et al. A Comparison on Scalability Forbatch Big Data Processing on Apache Spark and Apache Flink[J]. Big DataAnalytics,2017(1): 20-21.
[2]Chang F, Dean J, Ghemawat S, et al. Bigtable: A distributed Storage System for Structured Data[J]. ACM Transactions on Computer Systems (TOCS), 2008(2): 1-26.
[3]Borthakur D, Gray J, Sarma J S, et al. Apache Hadoop Goes Realtime at Facebook[C].Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data. ACM, 2011: 1071-1080.
[4]Mahout A. Scalable Machine Learning and Data Mining[J]. 2013-4-24. http: //mahout. Apache.org, 2012.
[5]Ghemawat S, Gobioff H, Leung S T. The Google File System[C].ACM SIGOPS Operating Systems Review. ACM, 2003(5): 29-43.
[6]戴传友. 基于云计算的交通卡口数据分析[D].广州:华南理工大学, 2016.