李明東 王英 焦杰
摘要:本文基于Hadoop平臺(tái)設(shè)計(jì)了一個(gè)實(shí)時(shí)數(shù)據(jù)處理系統(tǒng),通過(guò)對(duì)主流實(shí)時(shí)計(jì)算框架的研究,解決了Spark,puma沒(méi)能解決的數(shù)據(jù)源主動(dòng)接入問(wèn)題.本系統(tǒng)設(shè)計(jì)主要包括核心計(jì)算模塊設(shè)計(jì)、數(shù)據(jù)接入模塊設(shè)計(jì)和存儲(chǔ)模塊設(shè)計(jì).主要用到的算法包括可靠性機(jī)制算法、信號(hào)量機(jī)制算法、事務(wù)性機(jī)制算法等.實(shí)踐結(jié)果表明,系統(tǒng)處理效率高且運(yùn)行穩(wěn)定.
關(guān)鍵詞:Hadoop;實(shí)時(shí)數(shù)據(jù)處理;可靠性機(jī)制
中圖分類號(hào):TP311.13? 文獻(xiàn)標(biāo)識(shí)碼:A? 文章編號(hào):1673-260X(2019)04-0047-03
隨著大數(shù)據(jù)技術(shù)的快速發(fā)展,要求現(xiàn)有平臺(tái)不僅能夠處理海量數(shù)據(jù),還要能夠快速的對(duì)接批量數(shù)據(jù),實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)處理與結(jié)果的展現(xiàn);本文以Hadoop主流數(shù)據(jù)處理平臺(tái)為基礎(chǔ),使用storm計(jì)算框架完成各個(gè)模塊的功能設(shè)計(jì).在整個(gè)系統(tǒng)框架下,分別對(duì)可靠性等機(jī)制進(jìn)行了內(nèi)外部環(huán)境的功能及性能的測(cè)試.為后續(xù)數(shù)據(jù)可視化的實(shí)時(shí)展現(xiàn)以及預(yù)測(cè)提供了堅(jiān)實(shí)的理論和實(shí)驗(yàn)基礎(chǔ).
1 實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)結(jié)構(gòu)與功能設(shè)計(jì)
基于ARM微處理器芯片的智能遠(yuǎn)程防盜系統(tǒng)的結(jié)構(gòu)功能設(shè)計(jì)主要包括:電源模塊、無(wú)線本系統(tǒng)開(kāi)發(fā)核心基于Hadoop架構(gòu),結(jié)合了kafka[1]、HBase、Thritf,以及Zookeeper[2]集群等開(kāi)源工具,使用Storm作為數(shù)據(jù)計(jì)算模塊.實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)的環(huán)境和服務(wù)部署框架如圖1所示.
1.1 Storm-YARN
Hadoop集群中的3臺(tái)機(jī)器提供給Storm- YARN使用,一臺(tái)作為Nimbus,另外兩臺(tái)作為Supervisor機(jī),每個(gè)開(kāi)啟4個(gè)工作進(jìn)程.
1.2 Kafka
用于提供高吞吐消息服務(wù)的Kafka隊(duì)列部署在一臺(tái)Linux物理機(jī)器上,Kafka可以有效地解決在線數(shù)據(jù)活躍導(dǎo)致與系統(tǒng)之間速度不匹配的問(wèn)題.Kafka通過(guò)追加數(shù)據(jù)的方法完成對(duì)磁盤數(shù)據(jù)的長(zhǎng)久保持,提高系統(tǒng)運(yùn)算能力的同時(shí)又能穩(wěn)定存儲(chǔ)數(shù)據(jù).
1.3 HBase
Hadoop集群中的3臺(tái)機(jī)器被劃分用于部署HBase服務(wù).HBase架構(gòu)圖如圖2所示.
HBase中客戶端通過(guò)遠(yuǎn)程過(guò)程調(diào)用機(jī)制與HRegionServer和HMaster進(jìn)行通信.當(dāng)用戶對(duì)數(shù)據(jù)進(jìn)行讀寫(xiě)操作時(shí),客戶端通過(guò)遠(yuǎn)程過(guò)程調(diào)用機(jī)制與HRegionServer通信,對(duì)數(shù)據(jù)進(jìn)行創(chuàng)建、權(quán)限、刪除等操作時(shí),客戶端通過(guò)遠(yuǎn)程過(guò)程調(diào)用機(jī)制與HMaster通信.
1.4 實(shí)時(shí)系統(tǒng)中Hadoop集群配置
本系統(tǒng)Hadoop集群包括10臺(tái)機(jī)器,一臺(tái)機(jī)器為Namenode,其余為Datanode.Namenode用于維護(hù)文件系統(tǒng)樹(shù),包括樹(shù)內(nèi)的文件和目錄,Datanode存儲(chǔ)和檢索數(shù)據(jù)塊,并維護(hù)數(shù)據(jù)塊存儲(chǔ)列表,一定周期內(nèi)將信息發(fā)送給Namenode[3].
1.5 Zookeeper集群配置
Zookeeper集群分配4臺(tái)機(jī)器,Zookeeper采用與文件系統(tǒng)相似的目錄節(jié)點(diǎn)樹(shù)來(lái)存儲(chǔ)數(shù)據(jù),數(shù)據(jù)的集群管理通過(guò)維護(hù)和檢測(cè)數(shù)據(jù)的變化以實(shí)現(xiàn),此外Zookeeper在本系統(tǒng)中為HBase等程序提供服務(wù).
2 實(shí)時(shí)數(shù)據(jù)接入
2.1 實(shí)時(shí)數(shù)據(jù)處理
首先啟動(dòng)kafka消息隊(duì)列服務(wù),將用戶數(shù)據(jù)源接入系統(tǒng)緩沖池,第二步啟動(dòng)位于數(shù)據(jù)源層的數(shù)據(jù)源接入模組,讀取配置,向外提供服務(wù).用戶向系統(tǒng)發(fā)送一項(xiàng)任務(wù)時(shí),系統(tǒng)首先對(duì)任務(wù)進(jìn)行邏輯解析,將解析后的任務(wù)發(fā)送到計(jì)算層,完成實(shí)時(shí)計(jì)算和存儲(chǔ).系統(tǒng)外應(yīng)用使用應(yīng)用程序編程接口將數(shù)據(jù)發(fā)送到系統(tǒng),并在消息隊(duì)列中進(jìn)行緩存,數(shù)據(jù)在消息隊(duì)列中排隊(duì)等待,對(duì)數(shù)據(jù)處理需要在計(jì)算層有相關(guān)的在線處理進(jìn)程.
在采用C/S架構(gòu)的數(shù)據(jù)源接入層中,外部應(yīng)用被稱為Client,系統(tǒng)即為Sever端.Client端可以通過(guò)發(fā)送數(shù)據(jù)給Server后,等待Server確定后繼續(xù)發(fā)送數(shù)據(jù)或者不經(jīng)過(guò)Server確定一直發(fā)送數(shù)據(jù)這兩種方式傳輸數(shù)據(jù)[4].實(shí)時(shí)數(shù)據(jù)接入流程圖如圖3所示.
2.2 數(shù)據(jù)處理模式設(shè)計(jì)
數(shù)據(jù)處理[4]包括對(duì)數(shù)據(jù)的統(tǒng)計(jì)、提取、過(guò)濾、計(jì)算TopN、數(shù)據(jù)聚合等,還要對(duì)中文數(shù)據(jù)流進(jìn)行分詞操作.數(shù)據(jù)處理流程如圖4所示.
2.3 實(shí)時(shí)處理系統(tǒng)實(shí)現(xiàn)
實(shí)時(shí)處理系統(tǒng)由數(shù)據(jù)接入模塊、存儲(chǔ)模塊、核心計(jì)算模塊組成.
(1)數(shù)據(jù)接入工作流程如圖5所示.
模塊中分為客戶端、服務(wù)器、通信以及消息隊(duì)列;客戶端發(fā)送流式數(shù)據(jù)至服務(wù)器,同時(shí)為了提升消息的傳輸效率和質(zhì)量,在客戶端中加入了Retey機(jī)制,并設(shè)置最大的Retry次數(shù)是5次,當(dāng)連續(xù)5次調(diào)用失敗才算最終失敗.
當(dāng)客戶端調(diào)用失敗時(shí)拋出異常,系統(tǒng)調(diào)用handleTException方法處理異服務(wù)器需要能快速響應(yīng)客戶端的請(qǐng)求,因此本文服務(wù)器采用線程池工作模式,設(shè)置最小線程數(shù)8,最大線程數(shù)256,這樣提高了服務(wù)器響應(yīng)速度,又最大程度減少了資源的消耗[5].
Kafka消息隊(duì)列部署在一臺(tái)linux機(jī)器上,Kafka將來(lái)自同一數(shù)據(jù)源的消息即同一主題,默認(rèn)分區(qū)個(gè)數(shù)為10.在Kafka中,生產(chǎn)者產(chǎn)生消息并且將消息發(fā)送給服務(wù)器;消費(fèi)者負(fù)責(zé)使用消息,這三者的關(guān)系如圖6所示.
通信部分有handleMsg以及handleMsg兩個(gè)接口方法,用戶根據(jù)需求選擇調(diào)用.
(2)實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)核心計(jì)算框架
本系統(tǒng)中的實(shí)時(shí)計(jì)算部分是基于Storm框架開(kāi)發(fā),spout組件提供數(shù)據(jù)噴發(fā)服務(wù),Bolt組件提供數(shù)據(jù)處理操作,二者構(gòu)成Storm的在線計(jì)算任務(wù).核心計(jì)算框架如圖7所示.
3 實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)算法的設(shè)計(jì)與處理
3.1 可靠性機(jī)制算法設(shè)計(jì)
在基于Storm平臺(tái)的可靠性機(jī)制算法下構(gòu)建流程如下圖所示.
如果消息處理失敗,則調(diào)用fail方法.首先將消息隊(duì)列頭中的消息移除,消息處理結(jié)果被標(biāo)記為失敗,進(jìn)行計(jì)時(shí).消息處理成功,調(diào)用ack方法,將消息隊(duì)列頭中的消息移除,消息處理結(jié)構(gòu)被標(biāo)記為成功,開(kāi)始計(jì)時(shí),計(jì)時(shí)結(jié)束調(diào)用nextTuple方法發(fā)送接下來(lái)的消息[6].
3.2 信號(hào)量機(jī)制算法
本系統(tǒng)基于Storm信號(hào)量機(jī)制開(kāi)發(fā)了一個(gè)組件Signalspout.Signalspout組件用于發(fā)送清空緩存等操作的信號(hào)給其他組件,只需signalspout組件定時(shí)發(fā)射信號(hào)就能實(shí)現(xiàn)從一個(gè)方面控制多個(gè)時(shí)間粒度[7],signalspout工作原理如圖9所示.
3.3 事務(wù)性機(jī)制算法
使用TridentTopology事務(wù)性在線任務(wù)完成該算法,Trident包括Partition-local操作、Merge/Join操作、流分組操作、Pepartitionning操作、Aggregation操作[8].
4 總結(jié)
本文基于Hadoop平臺(tái)設(shè)計(jì)了一個(gè)實(shí)時(shí)數(shù)據(jù)處理系統(tǒng),彌補(bǔ)了spark、Hadoop平臺(tái)不能供多用戶實(shí)時(shí)操作數(shù)據(jù)的不足.HBaseBolt組件實(shí)現(xiàn)了存儲(chǔ)消息序列到HBase數(shù)據(jù)庫(kù)中,將tuple數(shù)據(jù)樣例轉(zhuǎn)變?yōu)閜ut實(shí)例進(jìn)行存儲(chǔ).改進(jìn)后的實(shí)時(shí)處理系統(tǒng)確保數(shù)據(jù)源組件spout發(fā)出的信息能被bolts及時(shí)捕捉并處理.系統(tǒng)采用的信號(hào)量機(jī)制控制對(duì)時(shí)間粒度不同時(shí),控制數(shù)據(jù)分流并進(jìn)行置零計(jì)數(shù);通過(guò)多次運(yùn)行試驗(yàn),系統(tǒng)處理數(shù)據(jù)及時(shí)且運(yùn)行穩(wěn)定,提升了平臺(tái)處理數(shù)據(jù)的效率.
參考文獻(xiàn):
〔1〕曲風(fēng)富.京東基于Samza的流失計(jì)算實(shí)踐[J].程序員,2014(2):40-43.
〔2〕Yang L,Yan Z.A method to avoid single failure of Namenode in HDFSZookeeper[J].Software,2016.
〔3〕金曉軍.Trident Storm與流計(jì)算經(jīng)驗(yàn)[J].程序員,2015(10):99-103.
〔4〕朱珠.基于Hadoop的海量數(shù)據(jù)處理模型研究與應(yīng)用[D].北京郵電大學(xué),2014.
〔5〕陳飛.基于MapReduce的數(shù)據(jù)清洗算法研究[D].昆明理工大學(xué),2016.101-103.
〔6〕徐媛媛.基于MapReduce的相似性連接研究[D].寧波大學(xué),2014.22-25.
〔7〕雷斌.面向復(fù)雜距離度量的MapReduce相似性連接技術(shù)研究[D].東北大學(xué),2016.55-58.
〔8〕韓來(lái)明.基于遺傳算法的分布式數(shù)據(jù)挖MapReduce架構(gòu)研究[D].天津大學(xué),2015.31-35.