嚴磊 汪小可
摘 要:基于Hadoop平臺的實時電影推薦系統(tǒng)在需要大量迭代計算時運行速度明顯變慢,無法根據(jù)用戶行為作出實時反饋。針對以上問題,設(shè)計基于Spark流式計算的實時電影推薦系統(tǒng),可更好地滿足用戶實時需求。基于Spark流式計算的實時電影推薦系統(tǒng)將傳統(tǒng)電影推薦算法與Spark流式計算方法相結(jié)合,在線部分使用Spark Streaming實時接收用戶模擬評分,并使用Scoket編程模擬用戶瀏覽商品時產(chǎn)生的實時日志數(shù)據(jù)。日志數(shù)據(jù)包括用戶當前瀏覽電影、觀看電影次數(shù)、停留時間與是否購買該商品,再使用Spark Streaming構(gòu)建實時數(shù)據(jù)處理系統(tǒng),計算出當前用戶相關(guān)度最高的電影并進行推薦。實驗結(jié)果表明,基于Spark 平臺的電影實時推薦系統(tǒng)在離線推薦訓(xùn)練過程中,訓(xùn)練速度相對于Hadoop 平臺有明顯提高,能根據(jù)用戶行為作出實時反饋,并向用戶進行電影推薦。
關(guān)鍵詞:電影推薦;Spark Streaming;Spark;實時推薦
DOI:10. 11907/rjdk. 182121
中圖分類號:TP301 文獻標識碼:A 文章編號:1672-7800(2019)005-0044-05
Abstract:The real-time movie recommendation system of the Hadoop platform can't make the feedback in real time according to the users' behavior. The real-time movie recommendation system based on Spark flow calculation can better meet the users' real-time demand. The real time movie recommendation based on Spark flow calculation is to combine the traditional movie recommendation algorithm with the spark streaming computing film attention. The online part uses Scoket to simulate the user's browsing products to produce real time data. The data includes the movies that the user is currently browsing and the number and stay time of watching the movie and the purchase of the product. Then Spark Streaming is used to build real-time data processing system to calculate current users' biggest concerns about those movies. The implementation results show that compared to the Hadoop platform, Spark platform based on real-time recommendation system achieves the speed of the off-line recommendation training significantly higher than that of the Hadoop platform, and can make real-time feedback according to user behavior, and want users to carry out real-time recommendation.
Key Words:movie recommendations; Spark Streaming; Spark; real-time recommendation
1 Spark與Hadoop簡介
根據(jù) IDC 發(fā)布的數(shù)字宇宙報告顯示,至 2020 年數(shù)字宇宙將超出預(yù)期,達到 40ZB,相當于地球上人均產(chǎn)生 5 247GB數(shù)據(jù)[1]。如何對海量數(shù)據(jù)進行及時、高效的存取并挖掘出其中的有效信息一直是學術(shù)界的研究熱點[2-3]。從計算的角度看,目前大數(shù)據(jù)處理框架主要分為Spark框架與MapReduce框架(屬于Hadoop生態(tài)系統(tǒng))。
Hadoop是一個高效、可靠、可擴展的開源分布式軟件框架,主要用于大規(guī)模數(shù)據(jù)存儲與業(yè)務(wù)計算處理[4];Spark是一個具備低延遲、易用性等特點的大數(shù)據(jù)處理框架,并且引入了RDD(Resilient Distributed Datasets)[5]的抽象。因此,與Hadoop相比,其應(yīng)用于內(nèi)存中的運行速度提升了上百倍,在磁盤上的運行速度也得到了大幅提升。
很多學者對Spark平臺進行了大量研究,如王虹旭等[6]在 Spark 平臺上設(shè)計一個能夠?qū)A繑?shù)據(jù)進行高效分析的并行數(shù)據(jù)分析系統(tǒng);曹波等[7]在 Spark平臺上實現(xiàn)FP-Growth 算法的并行計算,利用車牌記錄跟蹤車輛;Lu等[8]創(chuàng)新性地在Spark上使用遠程內(nèi)存提高對海量數(shù)據(jù)的處理速度;Yang等[9]研究分批處理的梯度下降算法在Spark 平臺上的并行計算問題,提升了深度置信網(wǎng)絡(luò)的訓(xùn)練收斂速度。
隨著電子商務(wù)的快速發(fā)展,推薦系統(tǒng)得到了越來越多公司重視[10]。Amazon、Facebook和 Yahoo 是最早將 Spark應(yīng)用于推薦領(lǐng)域的公司。例如:Amazon會根據(jù)用戶歷史瀏覽記錄在每個頁面下方作相應(yīng)推薦,還會根據(jù)用戶最近一次商品瀏覽記錄,根據(jù)其它物品與該物品相似度作商品推薦。國內(nèi)將 Spark 應(yīng)用于推薦領(lǐng)域的公司有阿里、優(yōu)酷土豆、豆瓣等。
2 Spark流式電影推薦系統(tǒng)設(shè)計
2.1 系統(tǒng)架構(gòu)設(shè)計
Sprak平臺采用Spark Streanming技術(shù),在用戶每次訪問網(wǎng)站時,Spark Streaming 的輸入數(shù)據(jù)按照 batch size(如1s)分成一段段數(shù)據(jù)(Discretized Stream,簡稱DStream)[11],每一段數(shù)據(jù)都轉(zhuǎn)換成 Spark中的 RDD,可根據(jù)訪問日志實時計算關(guān)注度,并與離線推薦結(jié)果合并進行推薦,從而使電影網(wǎng)站推薦結(jié)果可根據(jù)用戶行為實時改變。
如圖1所示,系統(tǒng)主要分為離線計算與在線計算兩部分[12]。離線部分使用基于Spark MLlib 平臺的協(xié)同過濾算法,首先對海量靜態(tài)數(shù)據(jù)進行處理,然后進行離線推薦;在線計算部分使用Spark流式計算電影關(guān)注度并進行推薦。
系統(tǒng)使用Java進行開發(fā),整體架構(gòu)如圖2 所示。
將基于Spark MLlib平臺的協(xié)同過濾算法推薦結(jié)果與Spark流式計算電影關(guān)注度相結(jié)合進行推薦。將離線模型推薦的前10部電影存儲到Redis數(shù)據(jù)庫中,利用Socket2實時計算用戶對電影的關(guān)注度,然后將Redis數(shù)據(jù)庫推薦列表中的前5部電影替換成關(guān)注度最高的5部電影,得到最后的實時推薦列表。
2.2 離線計算設(shè)計
離線部分使用基于Spark MLlib平臺的協(xié)同過濾算法,協(xié)同過濾可分為:基于用戶的協(xié)同過濾(UserCF)[13]、基于商品的協(xié)同過濾(ItemCF)[14]與基于模型的協(xié)同過濾(ModelCF)[15]。本文選用基于模型的協(xié)同過濾算法,根據(jù)用戶喜好電影數(shù)據(jù)集預(yù)測用戶可能喜歡的電影,然后進行推薦。
(1)數(shù)據(jù)集準備。數(shù)據(jù)集包含films.dat、score.dat、users.dat。films數(shù)據(jù)集格式為:電影ID::電影名稱::電影類型;score數(shù)據(jù)集格式為:用戶ID::電影ID::評分::時間戳;users數(shù)據(jù)集格式為:用戶ID::性別::年齡::職業(yè)編號:郵編?!拔易约旱脑u分數(shù)據(jù)”保存在my.txt中,格式為:我的ID::電影ID::我的評分::評分時間。數(shù)據(jù)集中總共包含6 039個用戶、3 952部電影,以及100多萬條評分數(shù)據(jù)。
(2)訓(xùn)練數(shù)據(jù)集推薦。首先記載數(shù)據(jù)集,按照“::”切分數(shù)據(jù),緩存之后統(tǒng)計得分最高的前10部電影,在Web界面的“猜你喜歡”欄目向未登錄用戶進行推薦。偽代碼片段如下:
//根據(jù)文件夾位置加載數(shù)據(jù)集
val scoreRdd = sc.textFile(數(shù)據(jù)位置)
//根據(jù)::切分數(shù)據(jù),緩存
val score = scoreRdd.map(“::”)。cache
//統(tǒng)計得分最高的前10個電影
val topK10ScoreMovie = score.map(統(tǒng)計函數(shù))。take(10)。foreach(println)
然后,訓(xùn)練模型進行離線預(yù)測。按照score.dat數(shù)據(jù)集中的時間戳將數(shù)據(jù)集劃分為訓(xùn)練(55%,加入用戶評分)、校驗 (15%)與測試(30%)3部分。設(shè)置多個訓(xùn)練參數(shù),其中ranks、lambdas、iters都設(shè)置兩個參數(shù),以便于三層嵌套循環(huán)產(chǎn)生8個組合(也即8個推薦模型),MLlib使用交替最小二乘法(ALS)學習這些隱性因子[17]。一般使用RMSE(Root-Mean-Square Error)評估誤差是否收斂[18],如公式(2)所示。
其中,N為三元組
最后,剔除已觀看電影,并使用最佳模型推薦10部用戶可能感興趣的電影。離線推薦偽代碼如下:
//分別加載樣本評分數(shù)據(jù)、我的評分數(shù)據(jù)、電影數(shù)據(jù)
score = sc.textFile(數(shù)據(jù)位置)
myRatings = addRatings(數(shù)據(jù)位置)
movies = sc.textFile(數(shù)據(jù)位置)
//將樣本評分數(shù)據(jù)劃分為訓(xùn)練(55%,加入用戶評分)、校驗 (15%)與測試(30%)數(shù)據(jù),并進行緩存
training = socre.filter(x => x. _1 < 6). cache
validation = score.filter(x => x. _1 >= 6 && x. _1 < 8). cache
test = score.filter(x => x. _1 >= 8). cache
//設(shè)置ranks、num Iters、lambdas等參數(shù),ranks 是模型中隱語義因子個數(shù),num Iters為迭代次數(shù),Lambdas為正則化參數(shù)
ranks = List(8, 12)
lambdas = List(0.1, 10.0)
numIters = List(10, 20)
//三層嵌套產(chǎn)生8個模型,計算RMSE值
model = ALS.train(training, rank, numIter, lambda)
bestModel=RMSE最小
//使用最佳模型預(yù)測評分,對用戶進行推薦
println("推薦前10的電影")
bestModel.get.predict(). collect.sortBy()
2.3 在線計算設(shè)計
Spark流式計算電影關(guān)注度Spark Streaming 是現(xiàn)有 Spark 核心 API 的一種擴展,適用于實時數(shù)據(jù)在可擴展、高吞吐、高容錯等特性下的流處理[19]。Spark Streaming的內(nèi)部處理機制為:接收實時流數(shù)據(jù),根據(jù)一定時間間隔拆分成一批批數(shù)據(jù)并通過Spark Engine進行處理,最終得到處理后的結(jié)果[20]。在線計算框架如圖3所示。
本文通過Java Socket編程模擬用戶瀏覽電影網(wǎng)站產(chǎn)生的實時日志數(shù)據(jù)。Socket1發(fā)送信息格式為:電影ID::瀏覽次數(shù)::停留時間::是否收藏::觀看次數(shù)。Spark Streaming 實時接收Socket1發(fā)送的用戶數(shù)據(jù)流,并將其劃分為 Batch(可理解為各個批次的數(shù)據(jù)塊)。引入Spark相關(guān)jar包,用Spark引擎處理Batch數(shù)據(jù),再以Batch形式輸出。創(chuàng)建Socket2接收Socket1發(fā)送的數(shù)據(jù),因為用戶不同行為對關(guān)注度的影響權(quán)重不同,所以需要定義一個計算公式。本文設(shè)定瀏覽次數(shù)權(quán)重為0.8,瀏覽時間權(quán)重為0.6,是否收藏權(quán)重為1,觀看次數(shù)權(quán)重為1。使用Spark Sreaming 實時接收模擬用戶日志信息并分析其關(guān)注度,得到推薦列表。偽代碼如下:
//先定義一個JavaStreamingContext
SparkConf sparkConf = new SparkConf(). setAppName("job的名字"). setMaster("local[2]")
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,窗口時間);
//創(chuàng)建一個服務(wù)器端,監(jiān)聽指定端口
ServerSocket SerScoket = new ServerSocket(端口號);
//獲取模擬數(shù)據(jù)
JavaReceiverInputDStream
//設(shè)定瀏覽次數(shù)權(quán)重為0.8,瀏覽時間權(quán)重為0.6,是否收藏權(quán)重為1,觀看次數(shù)權(quán)重為1
followValue = Double.parseDouble(lineSplit[1])*0.8+Double.parseDouble(lineSplit[2])*0.6 +Double.parseDouble(lineSplit[3])*1+Double.parseDouble(lineSplit[4])*1;
//對初始化的DStream進行事務(wù)級別的處理,通過updateStateByKey以Batch Interval為單位對歷史狀態(tài)進行更新
UpdateFollowValue = splitMess.updateStateByKey(函數(shù)操作)
//將
JavaPairRDD
在離線模型訓(xùn)練完畢后得到離線推薦列表,將推薦列表的前10個推薦結(jié)果寫入Redis 緩存中,以提高數(shù)據(jù)存取速度,提升網(wǎng)站性能;然后啟動實時推薦任務(wù),找到在線關(guān)注度最高的5部電影;根據(jù)用戶ID在Redis 緩存系統(tǒng)中找到離線推薦列表,以此為基礎(chǔ)構(gòu)建新的推薦列表;去掉離線推薦列表的后5個推薦結(jié)果,將在線推薦的5部電影放在推薦列表開頭,構(gòu)成最終的在線推薦列表。
3 實驗測試
由于Spark平臺在處理任務(wù)時,相對于Hadoop平臺在速度上更具有優(yōu)勢,因此本文采用 Spark 平臺進行離線與在線推薦。為了測試 Hadoop與 Spark 平臺在處理計算任務(wù)時的性能差異,本文選用離線訓(xùn)練方式對使用的電影數(shù)據(jù)集進行訓(xùn)練,然后對兩個平臺執(zhí)行不同任務(wù)的作業(yè)時間進行對比。實驗結(jié)果如圖4所示,結(jié)果表明在執(zhí)行Word Count、User Based 及Item Based等迭代次數(shù)不多的任務(wù)時,Spark平臺運行效率相對于Hadoop平臺有明顯提升。ALS 模型在Hadoop與Spark平臺上的訓(xùn)練性能對比如圖5所示,表明在迭代次數(shù)不斷增加的情況下,Spark平臺的優(yōu)勢越來越明顯,運行效率是Hadoop平臺的10倍以上。
以上測試驗證了以Spark平臺作為系統(tǒng)基礎(chǔ)架構(gòu)的優(yōu)越性,繼續(xù)對系統(tǒng)性能進行測試。系統(tǒng)要求在Ubuntu 17.04 操作系統(tǒng)上運行,并安裝 JDK1.8、Tomcat1.7、MySQL5.5、Hadoop2.2.0、Scala2.10.4、Spark1.0.0、HBase- 0.98.11-hadoop2、eclipse等軟件,且客戶端與服務(wù)器需保持網(wǎng)絡(luò)連接通暢[21]。
首先對離線與在線部分分別進行測試。統(tǒng)計評分前10的電影,登錄后利用協(xié)同過濾算法為用戶作離線推薦,如圖6、圖7所示。
在線推薦部分測試如圖8、圖9所示,分別為Socket1模擬用戶新操作與Socket2計算關(guān)注度。
完成對系統(tǒng)各功能模塊的詳細設(shè)計后,接下來對系統(tǒng)整體進行測試,驗證實時推薦系統(tǒng)的可行性。對Web界面進行操作,分別測試系統(tǒng)功能是否符合預(yù)期。在Web上操作與Web界面反映的測試用例如圖10所示。
JavaWeb顯示電影推薦結(jié)果,用戶登錄后界面如圖11所示,用戶點擊刷新后界面如圖12所示。由于技術(shù)限制,只顯示了推薦列表的前9部電影。
實驗結(jié)果表明,根據(jù)用戶行為變化可對電影進行實時更新,相比于傳統(tǒng)電影推薦算法,本文創(chuàng)新地提出大數(shù)據(jù)下電影網(wǎng)站的實時推薦算法,將離線推薦結(jié)果與實時流計算的推薦結(jié)果進行融合,生成實時推薦列表。實驗驗證發(fā)現(xiàn),Spark相比于Hadoop具有更快的運行速度,系統(tǒng)能正常運行并實時對用戶進行電影推薦。
4 結(jié)語
本文設(shè)計并實現(xiàn)了一套基于Spark平臺的電影推薦系統(tǒng),可分析用戶行為日志信息并實時計算關(guān)注度,產(chǎn)生在線推薦列表,然后與離線推薦相結(jié)合對用戶進行推薦。但是系統(tǒng)尚有一些不足之處,本文在線計算中采用模擬器模擬用戶行為日志,將來需要加強系統(tǒng)對實際用戶行為日志的采集與傳輸。另外本系統(tǒng)沒有使用Spark集群對訓(xùn)練任務(wù)進行分配,因而未能實現(xiàn)負載均衡,下一步需要研究并解決Spark的集群負載不均衡問題。
參考文獻:
[1] IDC. The digital universe of opportunities:rich data and the incdreasing value of the internet of things [EB/OL]. http://www.emc.com/leadership/digital-universe/2014iview/executive-summary.htm.
[2] CAO J,WU Z,WANG Y,et al. Hybrid collaborative filtering algorithm for bidirectional Web service recommendation[J].Knowledge and Information Systems,2013,36(3):607-627.
[3] GE Y, XIONG H, TUZHILIN A, et al. Cost-aware collaborative filtering for travel tour recommendations[J]. ACM Transactions on Information Systems,2014,32(1):479-496.
[4] 趙鐵柱,袁華強. 基于并發(fā)策略的分布式文件系統(tǒng)性能優(yōu)化方案[J]. 網(wǎng)絡(luò)安全技術(shù)與應(yīng)用,2013(7):17-18.
[5] REYNOLD X S, JOSH R, MATEI Z, et al. Shark: SQL and rich analytics at scale[J]. Computer Science, 2012:13-24.
[6] 王虹旭,吳斌,劉旸. 基于Spark的并行圖數(shù)據(jù)分析系統(tǒng)[J]. 計算機科學與探索,2015,9(9):1066-1074.
[7] 曹波,韓燕波,王桂玲. 基于車牌識別大數(shù)據(jù)的伴隨車輛組發(fā)現(xiàn)方法[J]. 計算機應(yīng)用, 2015,35(11):3203-3207.
[8] LU X, RAHMAN M W U, ISLAM N, et al. Accelerating spark with RDMA for big data processing: early experiences[C]. Proceedings of the 22nd Annual Symposium on High-Performance Interconnects,2010:9-16.
[9] YANG J,HE SQ. The optimization of parallel DBN based on spark[C]. Proceedings of the 19th Asia Pacific Symposium on Intelligent and Evolutionary Systems,2016:157-169.
[10] 單明. 基于個性化推薦的電子商務(wù)推薦系統(tǒng)的設(shè)計與實現(xiàn)[D]. 長春:吉林大學, 2014.
[11] ZAHARIA M, DAS T, LI H, et al. Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters[C]. Proceedings of the 4th USENIX conference on Hot Topics in Cloud Computing,2012:10.
[12] 張賢德. 基于Spark平臺的實時流計算推薦系統(tǒng)的研究與實現(xiàn)[D]. 鎮(zhèn)江:江蘇大學, 2016.
[13] 俞美華. 融合用戶興趣度與項目相關(guān)度的電影推薦算法研究[J]. 電腦知識與技術(shù),2017,13(8):22-26.
[14] RESNICK P, IACOVOU N, SUCHAK M, et al. GroupLens:an open architecture for collaborative filtering of netnews[C]. ACM Conference on Computer Supported Cooperative Work. ACM,1994:175-186.
[15] SARWAR B, KARYPIS G, KONSTAN J, et al. Item-based collabora-tive filtering recommendation algorithms[C]. Proceedings of the 10th International Conference on World Wide Web. ACM,2001:285-295.
[16] 閻輝,張學工,李衍達. 支持向量機與最小二乘法的關(guān)系研究[J]. 清華大學學報:自然科學版, 2001,41(9):77-80.
[17] SU X, KHOSHGOFTAAR T M. A survey of collaborative filtering techniques[M]. Hindawi Publishing Corp,2009.
[18] DE REZENDE R. Giving flexibility to the nelson-siegel class of term structure models[R]. Available at SSRN1290784, 2011.
[19] 趙文芳, 劉旭林. Spark Streaming框架下的氣象自動站數(shù)據(jù)實時處理系統(tǒng)[J]. 計算機應(yīng)用, 2018(1): 38-43.
[20] 李天喜. 基于Spark Streaming的試驗數(shù)據(jù)處理系統(tǒng)的研究與實現(xiàn)[D]. 西安:西安電子科技大學,2015.
[21] 周斯波,程廣,趙宇杰. 計算機軟件的測試方法和裝置[P]. CN 106126426 A,2016.
(責任編輯:黃 ?。?/p>