严磊 汪小可
摘 要:基于Hadoop平台的实时电影推荐系统在需要大量迭代计算时运行速度明显变慢,无法根据用户行为作出实时反馈。针对以上问题,设计基于Spark流式计算的实时电影推荐系统,可更好地满足用户实时需求。基于Spark流式计算的实时电影推荐系统将传统电影推荐算法与Spark流式计算方法相结合,在线部分使用Spark Streaming实时接收用户模拟评分,并使用Scoket编程模拟用户浏览商品时产生的实时日志数据。日志数据包括用户当前浏览电影、观看电影次数、停留时间与是否购买该商品,再使用Spark Streaming构建实时数据处理系统,计算出当前用户相关度最高的电影并进行推荐。实验结果表明,基于Spark 平台的电影实时推荐系统在离线推荐训练过程中,训练速度相对于Hadoop 平台有明显提高,能根据用户行为作出实时反馈,并向用户进行电影推荐。
关键词:电影推荐;Spark Streaming;Spark;实时推荐
DOI:10. 11907/rjdk. 182121
中图分类号:TP301 文献标识码:A 文章编号:1672-7800(2019)005-0044-05
Abstract:The real-time movie recommendation system of the Hadoop platform can't make the feedback in real time according to the users' behavior. The real-time movie recommendation system based on Spark flow calculation can better meet the users' real-time demand. The real time movie recommendation based on Spark flow calculation is to combine the traditional movie recommendation algorithm with the spark streaming computing film attention. The online part uses Scoket to simulate the user's browsing products to produce real time data. The data includes the movies that the user is currently browsing and the number and stay time of watching the movie and the purchase of the product. Then Spark Streaming is used to build real-time data processing system to calculate current users' biggest concerns about those movies. The implementation results show that compared to the Hadoop platform, Spark platform based on real-time recommendation system achieves the speed of the off-line recommendation training significantly higher than that of the Hadoop platform, and can make real-time feedback according to user behavior, and want users to carry out real-time recommendation.
Key Words:movie recommendations; Spark Streaming; Spark; real-time recommendation
1 Spark與Hadoop简介
根据 IDC 发布的数字宇宙报告显示,至 2020 年数字宇宙将超出预期,达到 40ZB,相当于地球上人均产生 5 247GB数据[1]。如何对海量数据进行及时、高效的存取并挖掘出其中的有效信息一直是学术界的研究热点[2-3]。从计算的角度看,目前大数据处理框架主要分为Spark框架与MapReduce框架(属于Hadoop生态系统)。
Hadoop是一个高效、可靠、可扩展的开源分布式软件框架,主要用于大规模数据存储与业务计算处理[4];Spark是一个具备低延迟、易用性等特点的大数据处理框架,并且引入了RDD(Resilient Distributed Datasets)[5]的抽象。因此,与Hadoop相比,其应用于内存中的运行速度提升了上百倍,在磁盘上的运行速度也得到了大幅提升。
很多学者对Spark平台进行了大量研究,如王虹旭等[6]在 Spark 平台上设计一个能够对海量数据进行高效分析的并行数据分析系统;曹波等[7]在 Spark平台上实现FP-Growth 算法的并行计算,利用车牌记录跟踪车辆;Lu等[8]创新性地在Spark上使用远程内存提高对海量数据的处理速度;Yang等[9]研究分批处理的梯度下降算法在Spark 平台上的并行计算问题,提升了深度置信网络的训练收敛速度。
随着电子商务的快速发展,推荐系统得到了越来越多公司重视[10]。Amazon、Facebook和 Yahoo 是最早将 Spark应用于推荐领域的公司。例如:Amazon会根据用户历史浏览记录在每个页面下方作相应推荐,还会根据用户最近一次商品浏览记录,根据其它物品与该物品相似度作商品推荐。国内将 Spark 应用于推荐领域的公司有阿里、优酷土豆、豆瓣等。
2 Spark流式电影推荐系统设计
2.1 系统架构设计
Sprak平台采用Spark Streanming技术,在用户每次访问网站时,Spark Streaming 的输入数据按照 batch size(如1s)分成一段段数据(Discretized Stream,简称DStream)[11],每一段数据都转换成 Spark中的 RDD,可根据访问日志实时计算关注度,并与离线推荐结果合并进行推荐,从而使电影网站推荐结果可根据用户行为实时改变。
如图1所示,系统主要分为离线计算与在线计算两部分[12]。离线部分使用基于Spark MLlib 平台的协同过滤算法,首先对海量静态数据进行处理,然后进行离线推荐;在线计算部分使用Spark流式计算电影关注度并进行推荐。
系统使用Java进行开发,整体架构如图2 所示。
将基于Spark MLlib平台的协同过滤算法推荐结果与Spark流式计算电影关注度相结合进行推荐。将离线模型推荐的前10部电影存储到Redis数据库中,利用Socket2实时计算用户对电影的关注度,然后将Redis数据库推荐列表中的前5部电影替换成关注度最高的5部电影,得到最后的实时推荐列表。
2.2 离线计算设计
离线部分使用基于Spark MLlib平台的协同过滤算法,协同过滤可分为:基于用户的协同过滤(UserCF)[13]、基于商品的协同过滤(ItemCF)[14]与基于模型的协同过滤(ModelCF)[15]。本文选用基于模型的协同过滤算法,根据用户喜好电影数据集预测用户可能喜欢的电影,然后进行推荐。
(1)数据集准备。数据集包含films.dat、score.dat、users.dat。films数据集格式为:电影ID::电影名称::电影类型;score数据集格式为:用户ID::电影ID::评分::时间戳;users数据集格式为:用户ID::性别::年龄::职业编号:邮编。“我自己的评分数据”保存在my.txt中,格式为:我的ID::电影ID::我的评分::评分时间。数据集中总共包含6 039个用户、3 952部电影,以及100多万条评分数据。
(2)训练数据集推荐。首先记载数据集,按照“::”切分数据,缓存之后统计得分最高的前10部电影,在Web界面的“猜你喜欢”栏目向未登录用户进行推荐。伪代码片段如下:
//根据文件夹位置加载数据集
val scoreRdd = sc.textFile(数据位置)
//根据::切分数据,缓存
val score = scoreRdd.map(“::”)。cache
//统计得分最高的前10个电影
val topK10ScoreMovie = score.map(统计函数)。take(10)。foreach(println)
然后,训练模型进行离线预测。按照score.dat数据集中的时间戳将数据集划分为训练(55%,加入用户评分)、校验 (15%)与测试(30%)3部分。设置多个训练参数,其中ranks、lambdas、iters都设置两个参数,以便于三层嵌套循环产生8个组合(也即8个推荐模型),MLlib使用交替最小二乘法(ALS)学习这些隐性因子[17]。一般使用RMSE(Root-Mean-Square Error)评估误差是否收敛[18],如公式(2)所示。
其中,N为三元组
最后,剔除已观看电影,并使用最佳模型推荐10部用户可能感兴趣的电影。离线推荐伪代码如下:
//分别加载样本评分数据、我的评分数据、电影数据
score = sc.textFile(数据位置)
myRatings = addRatings(数据位置)
movies = sc.textFile(数据位置)
//将样本评分数据划分为训练(55%,加入用户评分)、校验 (15%)与测试(30%)数据,并进行缓存
training = socre.filter(x => x. _1 < 6). cache
validation = score.filter(x => x. _1 >= 6 && x. _1 < 8). cache
test = score.filter(x => x. _1 >= 8). cache
//设置ranks、num Iters、lambdas等参数,ranks 是模型中隐语义因子个数,num Iters为迭代次数,Lambdas为正则化参数
ranks = List(8, 12)
lambdas = List(0.1, 10.0)
numIters = List(10, 20)
//三层嵌套产生8个模型,计算RMSE值
model = ALS.train(training, rank, numIter, lambda)
bestModel=RMSE最小
//使用最佳模型预测评分,对用户进行推荐
println("推荐前10的电影")
bestModel.get.predict(). collect.sortBy()
2.3 在线计算设计
Spark流式计算电影关注度Spark Streaming 是现有 Spark 核心 API 的一种扩展,适用于实时数据在可扩展、高吞吐、高容错等特性下的流处理[19]。Spark Streaming的内部处理机制为:接收实时流数据,根据一定时间间隔拆分成一批批数据并通过Spark Engine进行处理,最终得到处理后的结果[20]。在线计算框架如图3所示。
本文通过Java Socket编程模拟用户浏览电影网站产生的实时日志数据。Socket1发送信息格式为:电影ID::浏览次数::停留时间::是否收藏::观看次数。Spark Streaming 实时接收Socket1发送的用户数据流,并将其划分为 Batch(可理解为各个批次的数据块)。引入Spark相关jar包,用Spark引擎处理Batch数据,再以Batch形式输出。创建Socket2接收Socket1发送的数据,因为用户不同行为对关注度的影响权重不同,所以需要定义一个计算公式。本文设定浏览次数权重为0.8,浏览时间权重为0.6,是否收藏权重为1,观看次数权重为1。使用Spark Sreaming 实时接收模拟用户日志信息并分析其关注度,得到推荐列表。伪代码如下:
//先定义一个JavaStreamingContext
SparkConf sparkConf = new SparkConf(). setAppName("job的名字"). setMaster("local[2]")
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,窗口时间);
//创建一个服务器端,监听指定端口
ServerSocket SerScoket = new ServerSocket(端口号);
//获取模拟数据
JavaReceiverInputDStream
//设定浏览次数权重为0.8,浏览时间权重为0.6,是否收藏权重为1,观看次数权重为1
followValue = Double.parseDouble(lineSplit[1])*0.8+Double.parseDouble(lineSplit[2])*0.6 +Double.parseDouble(lineSplit[3])*1+Double.parseDouble(lineSplit[4])*1;
//对初始化的DStream进行事务级别的处理,通过updateStateByKey以Batch Interval为单位对历史状态进行更新
UpdateFollowValue = splitMess.updateStateByKey(函数操作)
//将
JavaPairRDD
在离线模型训练完毕后得到离线推荐列表,将推荐列表的前10个推荐结果写入Redis 缓存中,以提高数据存取速度,提升网站性能;然后启动实时推荐任务,找到在线关注度最高的5部电影;根据用户ID在Redis 缓存系统中找到离线推荐列表,以此为基础构建新的推荐列表;去掉离线推荐列表的后5个推荐结果,将在线推荐的5部电影放在推荐列表开头,构成最终的在线推荐列表。
3 实验测试
由于Spark平台在处理任务时,相对于Hadoop平台在速度上更具有优势,因此本文采用 Spark 平台进行离线与在线推荐。为了测试 Hadoop与 Spark 平台在处理计算任务时的性能差异,本文选用离线训练方式对使用的电影数据集进行训练,然后对两个平台执行不同任务的作业时间进行对比。实验结果如图4所示,结果表明在执行Word Count、User Based 及Item Based等迭代次数不多的任务时,Spark平台运行效率相对于Hadoop平台有明显提升。ALS 模型在Hadoop与Spark平台上的训练性能对比如图5所示,表明在迭代次数不断增加的情况下,Spark平台的优势越来越明显,运行效率是Hadoop平台的10倍以上。
以上测试验证了以Spark平台作为系统基础架构的优越性,继续对系统性能进行测试。系统要求在Ubuntu 17.04 操作系统上运行,并安装 JDK1.8、Tomcat1.7、MySQL5.5、Hadoop2.2.0、Scala2.10.4、Spark1.0.0、HBase- 0.98.11-hadoop2、eclipse等软件,且客户端与服务器需保持网络连接通畅[21]。
首先对离线与在线部分分别进行测试。统计评分前10的电影,登录后利用协同过滤算法为用户作离线推荐,如图6、图7所示。
在线推荐部分测试如图8、图9所示,分别为Socket1模拟用户新操作与Socket2计算关注度。
完成对系统各功能模块的详细设计后,接下来对系统整体进行测试,验证实时推荐系统的可行性。对Web界面进行操作,分别测试系统功能是否符合预期。在Web上操作与Web界面反映的测试用例如图10所示。
JavaWeb显示电影推荐结果,用户登录后界面如图11所示,用户点击刷新后界面如图12所示。由于技术限制,只显示了推荐列表的前9部电影。
实验结果表明,根据用户行为变化可对电影进行实时更新,相比于传统电影推荐算法,本文创新地提出大数据下电影网站的实时推荐算法,将离线推荐结果与实时流计算的推荐结果进行融合,生成实时推荐列表。实验验证发现,Spark相比于Hadoop具有更快的运行速度,系统能正常运行并实时对用户进行电影推荐。
4 结语
本文设计并实现了一套基于Spark平台的电影推荐系统,可分析用户行为日志信息并实时计算关注度,产生在线推荐列表,然后与离线推荐相结合对用户进行推荐。但是系统尚有一些不足之处,本文在線计算中采用模拟器模拟用户行为日志,将来需要加强系统对实际用户行为日志的采集与传输。另外本系统没有使用Spark集群对训练任务进行分配,因而未能实现负载均衡,下一步需要研究并解决Spark的集群负载不均衡问题。
参考文献:
[1] IDC. The digital universe of opportunities:rich data and the incdreasing value of the internet of things [EB/OL]. http://www.emc.com/leadership/digital-universe/2014iview/executive-summary.htm.
[2] CAO J,WU Z,WANG Y,et al. Hybrid collaborative filtering algorithm for bidirectional Web service recommendation[J].Knowledge and Information Systems,2013,36(3):607-627.
[3] GE Y, XIONG H, TUZHILIN A, et al. Cost-aware collaborative filtering for travel tour recommendations[J]. ACM Transactions on Information Systems,2014,32(1):479-496.
[4] 赵铁柱,袁华强. 基于并发策略的分布式文件系统性能优化方案[J]. 网络安全技术与应用,2013(7):17-18.
[5] REYNOLD X S, JOSH R, MATEI Z, et al. Shark: SQL and rich analytics at scale[J]. Computer Science, 2012:13-24.
[6] 王虹旭,吴斌,刘旸. 基于Spark的并行图数据分析系统[J]. 计算机科学与探索,2015,9(9):1066-1074.
[7] 曹波,韩燕波,王桂玲. 基于車牌识别大数据的伴随车辆组发现方法[J]. 计算机应用, 2015,35(11):3203-3207.
[8] LU X, RAHMAN M W U, ISLAM N, et al. Accelerating spark with RDMA for big data processing: early experiences[C]. Proceedings of the 22nd Annual Symposium on High-Performance Interconnects,2010:9-16.
[9] YANG J,HE SQ. The optimization of parallel DBN based on spark[C]. Proceedings of the 19th Asia Pacific Symposium on Intelligent and Evolutionary Systems,2016:157-169.
[10] 单明. 基于个性化推荐的电子商务推荐系统的设计与实现[D]. 长春:吉林大学, 2014.
[11] ZAHARIA M, DAS T, LI H, et al. Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters[C]. Proceedings of the 4th USENIX conference on Hot Topics in Cloud Computing,2012:10.
[12] 张贤德. 基于Spark平台的实时流计算推荐系统的研究与实现[D]. 镇江:江苏大学, 2016.
[13] 俞美华. 融合用户兴趣度与项目相关度的电影推荐算法研究[J]. 电脑知识与技术,2017,13(8):22-26.
[14] RESNICK P, IACOVOU N, SUCHAK M, et al. GroupLens:an open architecture for collaborative filtering of netnews[C]. ACM Conference on Computer Supported Cooperative Work. ACM,1994:175-186.
[15] SARWAR B, KARYPIS G, KONSTAN J, et al. Item-based collabora-tive filtering recommendation algorithms[C]. Proceedings of the 10th International Conference on World Wide Web. ACM,2001:285-295.
[16] 阎辉,张学工,李衍达. 支持向量机与最小二乘法的关系研究[J]. 清华大学学报:自然科学版, 2001,41(9):77-80.
[17] SU X, KHOSHGOFTAAR T M. A survey of collaborative filtering techniques[M]. Hindawi Publishing Corp,2009.
[18] DE REZENDE R. Giving flexibility to the nelson-siegel class of term structure models[R]. Available at SSRN1290784, 2011.
[19] 赵文芳, 刘旭林. Spark Streaming框架下的气象自动站数据实时处理系统[J]. 计算机应用, 2018(1): 38-43.
[20] 李天喜. 基于Spark Streaming的试验数据处理系统的研究与实现[D]. 西安:西安电子科技大学,2015.
[21] 周斯波,程广,赵宇杰. 计算机软件的测试方法和装置[P]. CN 106126426 A,2016.
(责任编辑:黄 健)