岑凱倫,于紅巖,楊騰霄
(1.上海海事大學(xué)信息工程學(xué)院,上海201306;2.上海海事大學(xué)交通運輸學(xué)院,上海 201306;3.上海紐盾科技有限公司研發(fā)部,上?!?00092)
大數(shù)據(jù)下基于Spark的電商實時推薦系統(tǒng)的設(shè)計與實現(xiàn)
岑凱倫1,于紅巖2,楊騰霄3
(1.上海海事大學(xué)信息工程學(xué)院,上海201306;2.上海海事大學(xué)交通運輸學(xué)院,上海201306;3.上海紐盾科技有限公司研發(fā)部,上海200092)
隨著互聯(lián)網(wǎng)規(guī)模的迅速增長,導(dǎo)致用戶在面對海量的互聯(lián)網(wǎng)信息時,無法從中獲取自己真正感興趣的信息,產(chǎn)生“信息超載”問題。個性化推薦在此問題上彌補了搜索引擎的不足,即代替用戶評估其所有未看過的產(chǎn)品,并通過分析用戶的興趣愛好和歷史行為,主動推薦符合用戶喜好的項目。目前個性化推薦系統(tǒng)已在電子商務(wù)、電影、音樂網(wǎng)站等領(lǐng)域取得了顯著的成功。
根據(jù)IDC發(fā)布的數(shù)字宇宙報告顯示,至2020年數(shù)字宇宙將超出預(yù)期,達到40ZB,相當(dāng)于地球上人均產(chǎn)生5247GB的數(shù)據(jù)[1]。面對未來如此巨大規(guī)模的數(shù)據(jù)量,傳統(tǒng)單機環(huán)境下的推薦系統(tǒng)存在著兩大問題:一是單機節(jié)點的推薦模型訓(xùn)練由于單機硬件條件的限制,無法存儲所有需要運算的數(shù)據(jù)量;二是由于訓(xùn)練數(shù)據(jù)集規(guī)模的增大,單機節(jié)點進行訓(xùn)練的時長不斷增長。傳統(tǒng)單機環(huán)境下的推薦系統(tǒng)無法滿足大數(shù)據(jù)時間推薦的需求,Hadoop[2]平臺能夠處理高達上TB級別的海量數(shù)據(jù)。目前有大量的學(xué)者對單機的機器學(xué)習(xí)算法使用Hadoop平臺編寫進行擴展以實現(xiàn)對大規(guī)模數(shù)據(jù)集的處理。江小平[3]等基于MapReduce編程模型對樸素貝葉斯文本分類算法進行并行化擴展。劉義[4]等基于Map-Reduce編程模型在Hadoop平臺上實現(xiàn)了基于R-樹的k-近鄰連接算法。對于推薦內(nèi)容的計算,大量的學(xué)者將推薦系統(tǒng)和Hadoop平臺進行集成,Yu[5]等采集用戶之間傳遞的信息以及發(fā)表的游記文本作為訓(xùn)練數(shù)據(jù),利用Hadoop平臺構(gòu)建旅游推薦系統(tǒng)。 Walunj[6]等利用基于MapReduce實現(xiàn)的Mathout算法庫構(gòu)建電子商務(wù)推薦系統(tǒng),該算法庫集成了協(xié)同過濾算法,具有更好的操作性。
Hadoop平臺解決了海量數(shù)據(jù)計算推薦模型的問題,但是Hadoop平臺在并行計算時必須將中間結(jié)果存儲在磁盤中,并且需要從磁盤中再次讀取,導(dǎo)致Hadoop平臺構(gòu)建的推薦系統(tǒng)存在如下不足:一是離線推薦模型在面對海量數(shù)據(jù)時會出現(xiàn)訓(xùn)練時間較長的問題;二是無法對用戶的實時日志行為做出實時處理。由于基于Hadoop平臺構(gòu)建的推薦系統(tǒng)存在的不足,無法滿足實時推薦的需求,使得用戶對于電商網(wǎng)站的推薦反饋速度提出了更高的要求。Spark是新興的大數(shù)據(jù)處理引擎,其很好地解決了Hadoop平臺在計算時需要將運算的中間結(jié)果存入磁盤所導(dǎo)致的計算速度緩慢問題。從2009年Spark誕生至今,作為開源項目已經(jīng)在流處理、圖計算、機器學(xué)習(xí)、結(jié)構(gòu)化數(shù)據(jù)查詢等各個方面,取得了很多重要的成果[7]。Spark平臺為迭代式數(shù)據(jù)處理提供更好的支持,每次迭代的數(shù)據(jù)可以保存在內(nèi)存中,而不是寫入文件。Spark平臺提供了集群的分布式內(nèi)存抽象,即RDD[8],一個不可變的帶分區(qū)集合,以實現(xiàn)數(shù)據(jù)操作方式的多樣性。目前針對Spark平臺的相關(guān)研究論文較少,Lu[9]等利用遠程內(nèi)存提升Spark平臺在處理大數(shù)據(jù)時的速度。Qi[10]等利用Spark平臺將用于配對測試檢測的基因算法進行兩階段并行處理,提升了配對測試的體積大小和計算的效率。Yang[11]等基于Spark平臺提出了分批處理的梯度下降算法,并對深度置信網(wǎng)絡(luò)進行訓(xùn)練,提升了收斂速度。國內(nèi)對于Spark平臺的研究目前主要集中在一些互聯(lián)網(wǎng)行業(yè),如阿里巴巴、百度、騰訊、網(wǎng)易、搜狐等。騰訊公司數(shù)據(jù)倉庫已經(jīng)大量使用 Spark平臺替代原來的Hadoop平臺的MapReduce,并使系統(tǒng)性能大大提高。曹波[12]等將傳統(tǒng)關(guān)聯(lián)分析中的FP-Growth算法在Spark平臺實現(xiàn)了并行處理,解決了識別大數(shù)據(jù)的伴隨車輛組問題。王虹旭[13]等設(shè)計了在Spark平臺上的并行數(shù)據(jù)分析系統(tǒng),來解決海量數(shù)據(jù)分析問題。嚴(yán)玉良[14]等提出了一種基于Spark的大規(guī)模單圖頻繁子集挖掘算法,通過次優(yōu)樹構(gòu)建并行計算的候選子圖,在給定最小支持度時挖掘出所有的頻繁子圖。王詔遠[15]等基于Spark平臺提出一種并行蟻群優(yōu)化算法,通過將螞蟻轉(zhuǎn)換為彈性分布式數(shù)據(jù)集,由此給出一系列轉(zhuǎn)換算子,實現(xiàn)螞蟻構(gòu)造過程的并行化。
目前基于Hadoop平臺的推薦系統(tǒng)解決了推薦模型并行訓(xùn)練的問題,但離線訓(xùn)練速度慢。通過對Spark平臺的研究,Spark平臺擁有比Hadoop平臺更強大的計算能力,能更快速地處理并行數(shù)據(jù),但目前的研究僅是針對大數(shù)據(jù)下電商網(wǎng)站離線推薦系統(tǒng)的設(shè)計,并未提出基于Spark平臺的實時推薦流程和算法。本文設(shè)計和實現(xiàn)了應(yīng)對大數(shù)據(jù)的基于Spark平臺的電商實時推薦系統(tǒng),設(shè)計了實時推薦系統(tǒng)流程,提出了分布式日志實時采集、分布式日志實時傳輸、實時日志過濾和基于Spark平臺的實時推薦模型的關(guān)鍵技術(shù)。實驗結(jié)果表明,本系統(tǒng)具有高可靠性和穩(wěn)定性,能夠滿足大數(shù)據(jù)下實時推薦的需求。
1.1系統(tǒng)架構(gòu)設(shè)計
(1)設(shè)計思想
電商網(wǎng)站存在著大量的用戶隱式行為 (例如用戶瀏覽商品、用戶下單、用戶取消訂單、用戶將商品加入購物車和用戶將商品從購物車刪除),此外,由于電商系統(tǒng)規(guī)模的擴大和各個業(yè)務(wù)系統(tǒng)的拆分,使得系統(tǒng)日志文件散落在各個服務(wù)器上。傳統(tǒng)基于Hadoop平臺的推薦系統(tǒng)無法有效地匯總用戶隱式行為日志,并對隱式行為日志進行有效分析,無法滿足系統(tǒng)實時推薦的需求。本文的設(shè)計思想是根據(jù)電商網(wǎng)站的顯式用戶行為相對稀缺這一特點,采用用戶隱式行為來構(gòu)建用戶評分,并在隱式數(shù)據(jù)源的基礎(chǔ)上將傳統(tǒng)基于Hadoop平臺構(gòu)建的推薦系統(tǒng)移植到Spark平臺,同時在傳統(tǒng)離線推薦的基礎(chǔ)上結(jié)合用戶實時點擊流,實時分析用戶行為,并融合離線推薦模型,以反饋最適合當(dāng)前用戶的實時推薦列表。本文設(shè)計的基于Spark平臺的電商實時推薦系統(tǒng)架構(gòu),如圖1所示。
在圖1中,基于Spark平臺的電商實時推薦系統(tǒng)架構(gòu)分為3層:離線處理層、服務(wù)層和實時處理層。在服務(wù)層,首先系統(tǒng)將訪問各個業(yè)務(wù)系統(tǒng)的請求交由多臺應(yīng)用網(wǎng)關(guān)進行下發(fā),在應(yīng)用網(wǎng)關(guān)集群前通過HTTP服務(wù)器進行負(fù)載均衡。然后通過構(gòu)建分布式日志框架,在應(yīng)用網(wǎng)關(guān)服務(wù)器上安裝分布式日志采集Agent,采集訪問各個業(yè)務(wù)系統(tǒng)的日志信息。由于電商網(wǎng)站的日志產(chǎn)出量巨大,需要可靠的消息傳送中間件作為模型訓(xùn)練與數(shù)據(jù)源采集之間的紐帶,系統(tǒng)構(gòu)建了基于Kafka集群的消息分發(fā)中間件,實現(xiàn)日志數(shù)據(jù)的統(tǒng)一下發(fā)。由于日志數(shù)據(jù)中包含著各個業(yè)務(wù)系統(tǒng)的日志以及用戶點擊流的日志,在進入離線或?qū)崟r推薦階段前,需經(jīng)過統(tǒng)一的數(shù)據(jù)清洗。與以往將日志數(shù)據(jù)存儲于某一固定介質(zhì)、統(tǒng)一做離線批處理完成清洗不同,本系統(tǒng)采用Spark平臺的Spark Streaming技術(shù)實現(xiàn)日志的實時處理。Spark Streaming技術(shù)可以按照時間分片,對固定時間間隔內(nèi)收到的數(shù)據(jù)進行統(tǒng)一批處理,能達到實時處理的效果,并具有很高的吞吐量。
圖1 基于Spark平臺的電商實時推薦系統(tǒng)架構(gòu)
在離線處理層,作為實時推薦的數(shù)據(jù)源收集完畢后,對數(shù)據(jù)源中的用戶行為進行權(quán)重的分級,得到用戶對于某商品的基本評分,并輸入推薦模型訓(xùn)練。傳統(tǒng)的方案是使用Hadoop平臺的離線推薦模型訓(xùn)練,但Hadoop平臺存在三個問題:一是抽象層次低,需要編寫很冗余的代碼完成操作;二是Hadoop平臺只提供Map和Reduce兩個操作,表達能力欠缺;三是處理中間結(jié)果存儲在HDFS文件系統(tǒng)中,使得計算迭代式任務(wù)速度緩慢。本設(shè)計采用的Spark平臺利用RDD進行抽象,實現(xiàn)的數(shù)據(jù)邏輯相比Hadoop平臺更簡短,同時提供多種轉(zhuǎn)換和操作,具有很強的表達力。同時,相對于Hadoop平臺,Spark平臺的中間計算結(jié)果可以緩存在內(nèi)存中,對于需要很多迭代計算的推薦任務(wù),提高了計算效率。此外,基于Spark計算框架和Spark Mlib機器學(xué)習(xí)庫提供了ALS推薦模型,可以構(gòu)建新的離線推薦系統(tǒng),并且將電商網(wǎng)站所有用戶推薦列表寫入Redis緩存系統(tǒng)中,緩解電商網(wǎng)站系統(tǒng)壓力。
在電商網(wǎng)站中,如果只進行離線的模型訓(xùn)練,用戶當(dāng)天的訪問行為并不能實時地反映在推薦列表上,無法更好地滿足用戶需求以及提高電商網(wǎng)站商品的轉(zhuǎn)化率。因此,在實時處理層,系統(tǒng)需要對實時的用戶行為進行處理,將其與離線推薦的結(jié)果進行混合,從而提高實時推薦的效果。Hadoop平臺由于存儲的特性,只適用于批處理的場景,而采用了Spark Streaming(Spark流技術(shù))的Spark平臺,針對用戶的每次訪問,可以實時過濾日志信息,抽出所需要的信息,獲得與該商品相似的前N位商品列表,并與離線模型進行混合處理,進行重排序,使得電商網(wǎng)站可以感知到用戶最新的行為,提升電商網(wǎng)站的轉(zhuǎn)化率。
與以往基于Hadoop平臺的離線推薦系統(tǒng)相比,本文構(gòu)建的基于Spark平臺的電商實時推薦系統(tǒng)具有比以往更快的反饋速度和訓(xùn)練速度。
(2)實時推薦系統(tǒng)流程
基于以上設(shè)計思想,系統(tǒng)從Spark Streaming端獲取所需要的數(shù)據(jù),并復(fù)用了日志數(shù)據(jù)源端提供的數(shù)據(jù),經(jīng)過數(shù)據(jù)聚合、數(shù)據(jù)傳輸和數(shù)據(jù)過濾后,進行離線和實時推薦,返回融合了離線推薦和實時推薦結(jié)果的推薦列表。系統(tǒng)設(shè)計的實時推薦流程如下。
步驟1:計算隱式評分。電商網(wǎng)站通過HTTP服務(wù)器Nginx,根據(jù)配置好的響應(yīng)規(guī)則,將用戶的請求分發(fā)到多臺應(yīng)用網(wǎng)關(guān)中,由應(yīng)用網(wǎng)關(guān)完成向各個業(yè)務(wù)系統(tǒng)的請求調(diào)用,如購物車、交易以及商品系統(tǒng)。在應(yīng)用網(wǎng)關(guān)中植入分布式日志采集工具Agent,收集發(fā)向各個業(yè)務(wù)系統(tǒng)的日志信息,并匯集后發(fā)向Kafka消息集群。Kafka集群會接入Spark Streaming實時處理框架進行日志過濾,抽取出用戶交易行為、用戶瀏覽行為和用戶對購物車操作行為,并寫入Hive表。使用Shark讀取Hive表。其中Shark是基于Spark平臺上且兼容Hive語法的SQL執(zhí)行引擎,其底層調(diào)用Spark并行實現(xiàn)。在調(diào)用Shark時,系統(tǒng)賦予每一種用戶行為的不同權(quán)重,利用Shark計算用戶對商品的評分。
步驟2:離線推薦模型訓(xùn)練。計算完隱式評分,即可以得到(用戶ID-商品ID-評分)三元組,作為離線推薦模型的數(shù)據(jù)源,由于單一用戶在網(wǎng)站上的購買數(shù)據(jù)占商品總量很低,因此使用交替最小二乘(ALS)算法,計算出隱式因子,填補用戶未購買的商品的預(yù)測評分,然后訓(xùn)練出離線推薦模型。
步驟3:生成離線推薦列表。將電商網(wǎng)站上的用戶依次輸入模型,得到所有電商網(wǎng)站注冊用戶的離線推薦列表,設(shè)置推薦列表長度,為了減低數(shù)據(jù)庫訪問的壓力,系統(tǒng)將所有的推薦列表放入Redis緩存系統(tǒng)中,同時提供獲取推薦列表的接口,供PC端、移動網(wǎng)頁端和移動App端調(diào)用。其中Redis是一款基于內(nèi)存存儲的,可持久化的鍵值對數(shù)據(jù)庫。
步驟4:生成實時推薦列表。首先利用 Spark Streming技術(shù),將Kafka集群傳來的日志信息過濾出日志點擊流,從中抽取出用戶產(chǎn)生行為操作的商品ID和用戶ID。然后根據(jù)步驟2訓(xùn)練好的離線推薦模型,進行商品相似度排序,可得到相似度排名前5的商品。最后根據(jù)得到的用戶ID和商品ID的推薦列表,構(gòu)建商品ID和用戶ID的列表,即商品被推薦到用戶的鍵值對,定位到相關(guān)用戶ID,并將用戶推薦列表的前5個替換為步驟5得到的TOP 5商品,以此減少Redis的更新次數(shù),來優(yōu)化系統(tǒng)的響應(yīng)速度。
1.2系統(tǒng)架構(gòu)設(shè)計
本文設(shè)計和實現(xiàn)的基于Spark平臺的電商實時推薦系統(tǒng),主要會經(jīng)歷如下階段:日志數(shù)據(jù)的采集;日志數(shù)據(jù)的聚合;日志數(shù)據(jù)的傳輸;日志數(shù)據(jù)的過濾;用戶隱式行為的實時推薦。
(1)分布式日志的實時采集
電商實時推薦系統(tǒng)需要大量隱式的用戶行為作為基礎(chǔ)數(shù)據(jù),而且每種用戶行為的源日志信息分布在不同的業(yè)務(wù)系統(tǒng)中,需要構(gòu)建分布式日志匯總系統(tǒng)將日志進行收集,以備后續(xù)流程使用。本系統(tǒng)基于開源的分布式日志收集工具Logstash,實現(xiàn)對各業(yè)務(wù)子系統(tǒng)的日志進行收集。分布式日志采集模塊如圖2所示。
圖2 分布式日志采集模塊
在圖2中,系統(tǒng)植入在應(yīng)用網(wǎng)關(guān)處的日志監(jiān)控可以實時監(jiān)測日志文件的變化,并根據(jù)偏移量,讀取來自交易系統(tǒng)、商品系統(tǒng)和購物車系統(tǒng)的最新日志信息,然后將日志輸出到Redis中緩存起來。日志聚合索引目錄是日志的存儲者,負(fù)責(zé)從Redis緩存中收集日志,并格式化處理,輸出給所需要的用戶。分布式日志采集模塊的自定義輸出為Kafka消息集群。
(2)基于Kafka集群的數(shù)據(jù)傳輸
通過構(gòu)建分布式日志實時采集模塊,完成了用戶行為日志的采集。但是在進入日志過濾階段之前,由于日志流并發(fā)產(chǎn)生且數(shù)量很大,如何保證數(shù)據(jù)的實時性以及盡量減少數(shù)據(jù)丟失,這些都給隱式用戶行為日志數(shù)據(jù)的收集帶來了巨大的挑戰(zhàn)。LinkedIn公司開發(fā)了一套專用的分布式消息訂閱和發(fā)布系統(tǒng)——Kafka,于2010年開源,并且成為Apache的開源項目之一。本文設(shè)計和實現(xiàn)的電商實時推薦系統(tǒng)中,構(gòu)建Kafka集群,來承載上千萬的用戶行為日志信息,為后續(xù)的日志過濾階段提供了安全可靠的消息傳輸。由于Kafka集群是一套分布式系統(tǒng),其吞吐量可以隨著集群的擴展而線性增加。圖3為基于Kafka集群的數(shù)據(jù)分發(fā)架構(gòu)。
圖3 基于Kafka集群的數(shù)據(jù)傳輸
在圖3中,Kafka集群由三個部分構(gòu)成:生產(chǎn)者(Producer),代表日志的來源;代理(Broker),代表消息的中間存儲層;消費者(Consumer),代表消息的使用者。其中,Producer負(fù)責(zé)將消息收集并推送(Push)到Broker,而Broker則負(fù)責(zé)接收Producer發(fā)送來的消息,并將消息本地持久化,Consumer則是消息的真正使用者,從Broker拉?。≒ull)消息并進行處理。系統(tǒng)中植入在應(yīng)用網(wǎng)關(guān)的 Logstash日志監(jiān)控會將處理完的日志發(fā)送至LogStash日志聚合索引,由LogStash日志聚合索引作為生產(chǎn)者將日志數(shù)據(jù)發(fā)送至Kafka集群,Spark節(jié)點作為消費者,啟動Spark Spreaming處理實時傳來的日志流,并根據(jù)實時推薦的需求做不同的過濾處理。
(3)基于Spark Streaming的日志過濾
數(shù)據(jù)傳輸后,系統(tǒng)統(tǒng)一使用Spark Streaming過濾數(shù)據(jù),并根據(jù)流程將日志做不同的處理,實現(xiàn)離線和實時推薦的復(fù)用的日志過濾模塊。Spark Streaming接收到的是實時收集到的日志信息,含有很多的噪聲數(shù)據(jù),需要從中抽取出所需要的信息。在實時推薦流程中,需要獲取點擊流的日志數(shù)據(jù),從中抽取出用戶ID和商品ID。用戶點擊商品所調(diào)用的接口方法用于獲取商品詳情,根據(jù)預(yù)先定義的日志信息的主題,從Kafka代理層中拉取日志信息。其中在日志信息中記錄了用戶這次請求調(diào)用的接口。LogStash展示的是商品詳情查看源日志的格式化日志,具體如表1所示。
表1 LogStash的格式化日志
由于表1只是LogStash提供的前端展現(xiàn),在系統(tǒng)流程中,需要調(diào)用Spark Sreaming對所有接受到的日志調(diào)用filter函數(shù),將消息中包含獲取商品詳情方法的日志過濾出來,過濾后得到所有的商品詳情的請求日志,在消息中解析變量字段對應(yīng)的內(nèi)容,從中獲得itemId,即商品ID,然后獲取用戶行為字段,并從字段對應(yīng)的內(nèi)容中獲取_cip(用戶IP),_uid(用戶id)等關(guān)鍵信息,為后續(xù)的實時推薦提供了數(shù)據(jù)源。
(4)基于Spark平臺的實時推薦算法
本文設(shè)計的大數(shù)據(jù)電商實時推薦系統(tǒng)主要分為離線處理和實時處理兩個不同的流程,基于Spark平臺對已有離線推薦系統(tǒng)進行優(yōu)化,并且在實時性上進一步加強,將離線推薦的結(jié)果和實時推薦的結(jié)果進行融合,實現(xiàn)電商網(wǎng)站的實時推薦。系統(tǒng)首先進行離線模型的訓(xùn)練,離線推薦主要基于對用戶隱式行為的挖掘,如支付、未支付、增刪購物車和瀏覽詳情等操作,因而需要獲得用戶的隱式行為,得到用戶對商品的隱式評分。隱式用戶行為表如表2所示。
表2 隱式用戶行為表
基于Spark平臺的實時推薦算法如下:
(1)讀取用戶行為表。系統(tǒng)運用Shark從Hive中獲取3個用戶行為表,即用戶交易表、用戶購物車數(shù)據(jù)表和用戶瀏覽商品記錄表。
(2)構(gòu)建訓(xùn)練數(shù)據(jù)源。讀入用戶行為表,根據(jù)用戶點擊行為的權(quán)重,得到(用戶ID,商品ID),評分)鍵值對。讀入交易表,對支付行為以及非支付行為進行分別處理,根據(jù)對應(yīng)的權(quán)重,得到(用戶ID,商品ID),評分)鍵值對。讀入購物車數(shù)據(jù)表,因為購物車有多種不同的行為,本系統(tǒng)只需要增加物品至購物車,以及從購物車刪除商品,購物車表進行過濾,篩選出需要的記錄,得到(用戶ID,商品ID),評分)。讀入用戶瀏覽商品記錄表,根據(jù)對應(yīng)的權(quán)重,得到(用戶ID,商品ID),評分)鍵值對。
(3)離線推薦模型訓(xùn)練。處理完3個用戶行為表,調(diào)用union函數(shù),將用戶行為表中得到的(用戶ID,商品ID),評分)鍵值對進行融合,去掉重復(fù)鍵值對,并構(gòu)建Spark Mlib機器學(xué)習(xí)庫中基于ALS的協(xié)同過濾算法的數(shù)據(jù)源,即(用戶ID,商品ID,評分)三元組。設(shè)置ALS迭代的次數(shù)以及相關(guān)參數(shù),ALS算法會對用戶-商品評分矩陣進行分解,利用隱語義因子進行表達,同時用于預(yù)測缺失的元素。
(4)實時推薦模型。離線模型訓(xùn)練完畢后,首先電商網(wǎng)站將網(wǎng)站所有的用戶輸入模型,將推薦列表寫入Redis緩存系統(tǒng),優(yōu)化網(wǎng)站性能。然后啟動實時推薦任務(wù),根據(jù)從點擊流中取得的商品ID,利用離線推薦模型,取得與之最相似的前5個商品。最后在Redis緩存系統(tǒng)中找到對應(yīng)用戶ID的推薦列表,剔除原有列表的最后5個,將第2步中得出的5個商品放入Redis中推薦列表的隊首。
通過上述離線推薦與實時推薦的融合,完成基于Spark平臺的實時推薦模型,達到實時響應(yīng)用戶請求,實現(xiàn)實時推薦反饋的目的。
本文的實驗環(huán)境如下:基于Spark平臺的電商實時推薦系統(tǒng)搭建了3臺云服務(wù)器,托管在阿里云上,承擔(dān)每日的用戶訪問;每臺服務(wù)器配置8核CPU,16GB內(nèi)存和300GB硬盤。軟件配置如下:采用Spark 1.5.2版本用于大數(shù)據(jù)處理;Java 1.8版本用于編寫Spark程序;Logstash 2.1.1版本用于分布式日志采集;Kafka 0.8.2.2版本用于分布式日志數(shù)據(jù)傳輸;此外Hadoop 2.6版本用于分布式文件系統(tǒng)并與Spark平臺進行測試對比。本文對分布式日志采集、分布式日志傳輸、實時日志過濾和實時推薦等系統(tǒng)關(guān)鍵技術(shù)進行實驗。
2.1分布式日志采集
大數(shù)據(jù)下電商網(wǎng)站每天為大量的用戶提供服務(wù),圖4顯示了電商網(wǎng)站的每天采集的日志總量,達到1600 萬/天的日志吞吐量。海量的用戶行為日志數(shù)據(jù)為實時推薦提供了足量的訓(xùn)練數(shù)據(jù)。本文所構(gòu)建的分布式日志采集模塊解決了大數(shù)據(jù)電商網(wǎng)站跨系統(tǒng)收集用戶訪問日志的問題。
圖4 電商網(wǎng)站每天日志總量
2.2分布式日志數(shù)據(jù)傳輸
分布式日志采集系統(tǒng)每天會采集到1600萬的日志信息,其中絕大部分會交給Kafka集群進行傳遞,作為實時推薦的數(shù)據(jù)源,因此需要對Kafka集群進行吞吐量的測試,以保證數(shù)據(jù)可靠、實時傳輸。多個Producer可同時向同一個主題發(fā)送數(shù)據(jù),在Broker負(fù)載飽和前,Producer數(shù)量越多,集群每秒收到的消息量越大,并且呈線性增漲,不同個數(shù)Producer時的總吞吐率如圖5所示。
圖5 Kafka集群的生產(chǎn)者性能實驗
由圖5可以看出,單個Producer每秒可成功發(fā)送約128萬條負(fù)載為100字節(jié)的消息,并且隨著Producer個數(shù)的提升,每秒總共發(fā)送的消息量線性提升。系統(tǒng)中有4臺Producer,每天產(chǎn)生的日志總量是1600萬。系統(tǒng)構(gòu)建的Kafka集群,經(jīng)實驗證明,足以接收穩(wěn)定傳輸分布式采集到的數(shù)據(jù)。在穩(wěn)定接收的前提下,對Kafka集群又進行消費測試,在集群中已有大量消息的情況下,使用1-3個Consumer時的Kafka集群總吞吐量如圖6所示。
圖6 Kafka集群的消費者性能實驗
由圖6可知,單個Consumer每秒可消費306萬條消息,該數(shù)量遠大于單個Producer每秒可消費的消息數(shù)量,這保證了在默認(rèn)配置下,消息可被及時處理。并且隨著Consumer數(shù)量的增加,系統(tǒng)集群的總吞吐量呈線性增加,能夠滿足用戶訪問量增大,日志傳輸量增大的需求。
2.3基于Spark Streaming的日志過濾
Kafka集群可以穩(wěn)定負(fù)載本文構(gòu)建的實時推薦系統(tǒng)的日志傳輸量,因此需要對Spark Streaming實時處理日志,對提取出所需要數(shù)據(jù)的力進行測試。Spark-
Streaming處理日志速率如圖7所示。
圖7 SparkStreaming處理日志速率
在圖7中,Spark Streaming平均每秒處理202條記錄,且運行狀況良好。同時根據(jù)系統(tǒng)運行15小時的日志顯示,Spark Streaming一共完成18557次實時批處理,提取了13533355條記錄,能夠滿足實時日志的處理需求。
利用Spark Streaming對實時日志流進行實時過濾,從日志中抽取出對應(yīng)的商品ID和用戶ID,供實時推薦流程使用,抽取出的日志信息如表3所示。
表3 實時抽取的日志信息
2.4基于Spark平臺的實時推薦
由于Spark平臺在處理任務(wù)上相對于Hadoop平臺的優(yōu)越性,本文采用Spark以及其生態(tài)系統(tǒng)中的ALS模型作為實時推薦平臺的計算平臺與訓(xùn)練模型。為了測試Hadoop平臺與Spark平臺在處理計算任務(wù)時的性能差異,本系統(tǒng)選用了電商平臺采集的數(shù)據(jù)集對Spark平臺與Hadoop平臺的MapReduce在執(zhí)行作業(yè)性能上做了對比實驗。Spark與Hadoop執(zhí)行作業(yè)時間對比如圖8所示。
從圖8中可以看出,Spark平臺在進行不同作業(yè)類型的計算時,性能都相對于Hadoop平臺的MapReduce平均提升4倍以上。但對于WordCount、UserBased及ItemBased此類迭代次數(shù)不多的任務(wù)時,相對于Hadoop平臺的MapReduce計算速率提升幅度較小,平均提升3倍以上。
在進行本系統(tǒng)所使用的ALS模型訓(xùn)練時,因為其需要多次迭代運算,性能提升非常顯著。ALS模型在Hadoop平臺及Spark平臺上訓(xùn)練性能對比,如圖9所示。
圖8 Spark平臺與Hadoop平臺執(zhí)行作業(yè)時間對比
圖9 ALS模型在Hadoop及Spark平臺上訓(xùn)練性能對比
從圖9中可以發(fā)現(xiàn),在多次迭代后,Spark平臺的效率相比Hadoop平臺要提高10倍以上,這是由于Hadoop平臺的Mapreduce每次迭代后,都要重新讀取HDFS,使得作業(yè)完成的時間和迭代次數(shù)成線性增長,而Spark平臺由于其將中間結(jié)果緩存在內(nèi)存中,即使進行多次迭代,時間也不會出現(xiàn)明顯增加。
圖8和圖9的對比實驗顯示了Spark平臺作為推薦平臺的基礎(chǔ)架構(gòu)相對于傳統(tǒng)推薦系統(tǒng)的優(yōu)越性。實驗最后對離線推薦的結(jié)果進行了測試。圖10顯示的是在移動App端基于測試用戶的用戶行為的離線推薦結(jié)果。當(dāng)測試用戶點擊了巧克力的類目,通過實時獲取用戶訪問的信息,實時推薦模塊會啟動,抽取出與該商品最相似的商品,并與離線推薦列表進行融合,產(chǎn)生實時推薦列表。測試用戶的實時推薦結(jié)果如圖11所示。
圖10 離線推薦結(jié)果
圖11 實時推薦結(jié)果
圖10和圖11的實驗結(jié)果驗證了本文設(shè)計的基于Spark的電商實時推薦系統(tǒng)能夠有效承載網(wǎng)站的日志信息,并根據(jù)用戶的實時用戶行為做出實時推薦反饋,優(yōu)化了用戶體驗,提升了網(wǎng)站的銷售額。根據(jù)日志采集系統(tǒng),電商網(wǎng)站推薦模塊的交易轉(zhuǎn)化率提升了5%,有效優(yōu)化了用戶體驗。
基于Hadoop實現(xiàn)的推薦系統(tǒng)存在著離線訓(xùn)練速度慢,并且無法對用戶實時行為做出推薦反饋,不能滿足大數(shù)據(jù)時代用戶對實時推薦系統(tǒng)的需求。以往研究表明,Spark平臺在并行處理大數(shù)據(jù)上擁有比Hadoop平臺更強的運算性能,但目前未有一套完整的實現(xiàn)流程解決Spark平臺下針對用戶隱式行為日志做出實時推薦的問題。本文設(shè)計和實現(xiàn)了大數(shù)據(jù)下基于Spark平臺的電商實時推薦系統(tǒng);提出了一套新的實時推薦流程;針對跨系統(tǒng)用戶隱式行為日志的收集及傳輸?shù)男枨螅O(shè)計并實現(xiàn)了分布式日志采集模塊和分布式日志傳輸模塊;并且通過基于Spark Streaming的日志實時過濾模塊完成日志數(shù)據(jù)的過濾。在統(tǒng)一數(shù)據(jù)源的基礎(chǔ)上,本文創(chuàng)新地提出了大數(shù)據(jù)下電商網(wǎng)站的實時推薦算法,將離線推薦推薦的推薦結(jié)果和實時流計算出的推薦結(jié)果進行融合,生成實時推薦列表。最后用實驗驗證了系統(tǒng)的可靠性、穩(wěn)定性以及相對于Hadoop平臺的高效性。下一步的工作將針對大數(shù)據(jù)下電商網(wǎng)站越來越多種類的用戶行為,設(shè)計多樣的數(shù)據(jù)處理方式,以提升系統(tǒng)的通用性。
[1]IDC.The Digital Universe of Opportunities:Rich Data and the Incdreasing Value of the Internet of Things[EB/OL].[2014-04]. http://www.emc.com/leadership/digital-universe/2014iview/executive-summary.htm
[2]FERRERIA C R L,Traina J C,MACHADO T A J,et al.Clustering Very Large Multi-Dimensional Datasets with Mapreduce[C]. 17th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining,2011 ACM.San Diego:ACM Press,2011:690-698.
[3]江小平,李成華,向文等.云計算環(huán)境下樸素貝葉斯文本分類算法的實現(xiàn)[J].計算機應(yīng)用,2011,31(9):2551-2555.
[4]劉義,景寧,陳犖,熊偉.MapReduce框架下基于R-樹的k-近鄰連接算法.軟件學(xué)報,2013,24(8):1836-1851.
[5]YU Y,HUANG C,LEE Y.An Intelligent Touring System Based on Mobile Social Network and Cloud Computing for Travel Recommendation[C].28th International Conference on Advanced Information Networking and Applications Workshops(AINA),2014 IEEE. Victoria,Canada:IEEE Press,2014:19-24.
[6]WALUNJ S G,SADAFALE K.An Online Recommendation System for E-commerce Based on Apache Mahout Framework[C].2013 Annual Conference on Computers and People Research,2013 ACM.Cincinnati:ACM Press,2013:153-158.
[7]ZAHARIA M,CHOWDHURY M,F(xiàn)RANKLIN M J,et al.Spark:Cluster Computing with Working Sets[C].Proceedings of the 2nd USENIX Conference on Hot Topics in Cloud Computing,2010:10-10.
[8]ZAHARIA M,CHOWDHURY M,DAS T,et al.Resilient Distributed Datasets:A Fault-Tolerant Abstraction for in-Memory Cluster Computing[C].Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation.USENIX Association,2012:2-2.
[9]X.LU,M.W.U.RAHMAN,N.ISLAM,D.SHANKAR.Accelerating Spark with RDMA for Big Data Processing:Early Experiences[C]. Proceedings of the 22nd Annual Symposium on High-Performance Interconnects.2010:9-16.
[10]QI RZ,WANG ZJ,LI SY.A Parallel Genetic Algorithm Based on Spark for Pairwise Test Suite Generation[J].Journal of ComputerScience and Technology,2016,31(2):417-27.
[11]YANG J,HE SQ.The Optimization of Parallel DBN Based on Spark[C].Proceedings of the 19th Asia Pacific Symposium on Intelligent and Evolutionary Systems,2016:157-169.
[12]曹波,韓燕波,王桂玲.基于車牌識別大數(shù)據(jù)的伴隨車輛組發(fā)現(xiàn)方法[J].計算機應(yīng)用,2015,35(11):3203-3207.
[13]王虹旭,吳斌,劉旸.基于Spark的并行圖數(shù)據(jù)分析系統(tǒng)[J].計算機科學(xué)與探索,2015,9(9):1066-1074.
[14]嚴(yán)玉良,董一鴻,何賢芒等.FSMBUS:一種基于Spark的大規(guī)模頻繁子圖挖掘算法[J].計算機研究與發(fā)展,2015,52(8):1768-1783.
[15]王詔遠,王宏杰,刑煥來等.基于Spark的蟻群優(yōu)化算法[J].計算機應(yīng)用,2015,35(10):2777-2780,2797.
Big-Data;Spark Platform;Hadoop Platform;Real-Time Recommendation;Implicit User Behavior
Design and Implement of E-Commerce Real-Time Recommender System with Spark Based on Big Data
CEN Kai-lun1,YU Hong-yan2,YANG Teng-xiao3
(1.College of Information Engineering,Shanghai Maritime University,Shanghai 201306;2.College of Transport and Communications,Shanghai Maritime University,Shanghai 201306;3.Research and Department,Shanghai Newdon Technology Company Limited,Shanghai 200092)
國家自然科學(xué)基金(No.61562056)、教育部人文社科青年基金資助項目(No.13YJC630210)、2014年上海市科技型技術(shù)創(chuàng)新基金項目(No.1401H164800)、上海市楊浦區(qū)國家創(chuàng)新型試點城區(qū)建設(shè)與管理專項資金項目(No.2015YPCX03-002)
1007-1423(2016)24-0061-09DOI:10.3969/j.issn.1007-1423.2016.24.015
岑凱倫(1991-),男,上海人,碩士研究生,研究方向為云計算、大數(shù)據(jù)處理
于紅巖(1979-),女,山東文登人,講師,博士,研究方向為電子商務(wù)、云計算安全
楊騰霄(1977-),男,山西長治人,工程師,碩士,研究方向為云計算安全
2016-05-12
2016-07-25
大數(shù)據(jù)下基于Hadoop平臺構(gòu)建的電商推薦系統(tǒng)存在著計算緩慢、無法根據(jù)用戶實時行為作出推薦的問題。針對以上問題,設(shè)計和實現(xiàn)基于Spark平臺的電商實時推薦系統(tǒng)。與Hadoop平臺構(gòu)建的推薦系統(tǒng)相比,系統(tǒng)首先基于Spark平臺構(gòu)建了分布式日志采集模塊和分布式日志數(shù)據(jù)傳輸模塊,用于采集和傳輸用戶隱式行為日志,解決電子商務(wù)跨系統(tǒng)數(shù)據(jù)源收集問題;其次在統(tǒng)一數(shù)據(jù)源的基礎(chǔ)上,采用基于Spark的矩陣分解推薦模型進行離線訓(xùn)練,提升離線推薦訓(xùn)練的效率;進而在離線推薦的基礎(chǔ)上,提出一種使用Spark Streaming實時流技術(shù)對電商日志數(shù)據(jù)做實時過濾,獲取用戶當(dāng)前所需商品,并將離線推薦結(jié)果與實時推薦結(jié)果通過統(tǒng)一介質(zhì)融合的方案,實現(xiàn)對用戶隱式行為進行實時推薦反饋的功能。最后經(jīng)實驗證明,基于Spark平臺的電商實時推薦系統(tǒng)相對于Hadoop平臺的電商推薦系統(tǒng)具有更高的可靠性和穩(wěn)定性,能夠承載大規(guī)模數(shù)據(jù)量,離線推薦訓(xùn)練速度相對于Hadoop平臺提高10倍,并且對用戶的實時行為也能夠作出實時推薦反饋,提升5%的交易轉(zhuǎn)化率,增強電商網(wǎng)站的用戶體驗。
大數(shù)據(jù);Spark平臺;Hadoop平臺;實時推薦;用戶隱式行為
Concerns the problem that the e-commerce recommendation system which based on Hadoop platform has low computing speed and can't make recommendation based on real-time user behavior.In order to solve the problem,designs real-time e-commerce recommendation system which is based on Spark platform.What is different from the previous system is that distributed log collection module and distributed log data transmission module are designed to collect and transfer log data of implicit user behavior,which solves the problem of collecting the log data come from different system.On the basis of a unified data source,the matrix decomposition model based on Spark is used to do off-line training and Spark streaming is used to do real-time log filtering to get the most similar goods to the good which included in the log.The result of real-time recommendation and off-line recommendation is merged in the system as feedback to the realtime user behavior.The experimental results show that the system which can carry massive amounts of data has the higher reliability and stability than the system which is based on Hadoop,the training speed of the off-line recommendation is 10 times as fast as that of the Hadoop platform,can make real-time recommended feedback to real-time user behavior which increase the user experience and the percent conversion of trade can be increased 5%.