丁夢蘇,陳世敏
(計算機(jī)體系結(jié)構(gòu)國家重點實驗室(中國科學(xué)院計算技術(shù)研究所),北京 100190)
(*通信作者電子郵箱chensm@ict.ac.cn)
輕量級大數(shù)據(jù)運算系統(tǒng)Helius
丁夢蘇,陳世敏*
(計算機(jī)體系結(jié)構(gòu)國家重點實驗室(中國科學(xué)院計算技術(shù)研究所),北京 100190)
(*通信作者電子郵箱chensm@ict.ac.cn)
針對Spark數(shù)據(jù)集不可變,以及Java虛擬機(jī)(JVM)依賴環(huán)境引起的代碼執(zhí)行、內(nèi)存管理、數(shù)據(jù)序列化/反序列化等開銷過多的不足,采用C/C++語言,設(shè)計并實現(xiàn)了一種輕量級的大數(shù)據(jù)運算系統(tǒng)——Helius。Helius支持Spark的基本操作,同時允許數(shù)據(jù)集整體修改;同時,Helius利用C/C++優(yōu)化內(nèi)存管理和網(wǎng)絡(luò)傳輸,并采用stateless worker機(jī)制簡化分布式計算平臺的容錯恢復(fù)過程。實驗結(jié)果顯示:5次迭代中,Helius運行PageRank算法的時間僅為Spark的25.12%~53.14%,運行TPCH Q6的時間僅為Spark的57.37%;在PageRank迭代1次的基礎(chǔ)上,運行在Helius系統(tǒng)下時,master節(jié)點IP接收和發(fā)送數(shù)據(jù)量約為運行于Spark系統(tǒng)的40%和15%,而且200 s的運行過程中,Helius占用的總內(nèi)存約為Spark的25%。實驗結(jié)果與分析表明,與Spark相比,Helius具有節(jié)約內(nèi)存、不需要序列化和反序列化、減少網(wǎng)絡(luò)交互以及容錯簡單等優(yōu)點。
內(nèi)存計算;大數(shù)據(jù)運算;分布式計算;有向無環(huán)圖調(diào)度;容錯恢復(fù)
在科學(xué)研究和產(chǎn)業(yè)實踐中,MapReduce[1]集群編程模型已經(jīng)廣泛應(yīng)用于大規(guī)模數(shù)據(jù)處理。MapReduce系統(tǒng)把用戶編制的串行Map和Reduce程序自動地分布并行執(zhí)行,在每次運算前,系統(tǒng)需要從分布式文件系統(tǒng)中讀取輸入數(shù)據(jù),運算完成后,系統(tǒng)要將計算結(jié)果寫入分布式文件系統(tǒng)中。如此一來,多個MapReduce運算之間只能通過分布式文件系統(tǒng)才能共享數(shù)據(jù),這不僅產(chǎn)生了大量的中間文件,而且反復(fù)讀寫磁盤大幅降低了運算性能。隨著內(nèi)存容量指數(shù)級增長和單位內(nèi)存價格不斷下降,大容量內(nèi)存正成為服務(wù)器的標(biāo)準(zhǔn)配置,于是內(nèi)存計算逐漸被主流商用系統(tǒng)和開源工具所接受。以內(nèi)存計算為核心思想的Spark[2-4]在性能上遠(yuǎn)超基于MapReduce的Hadoop[5]:迭代計算性能和數(shù)據(jù)分析性能分別可以提高20倍和40倍。Spark在保持MapReduce自動容錯、位置感知調(diào)度、可擴(kuò)展性等優(yōu)點的同時,高效地支持多個運算通過內(nèi)存重用中間結(jié)果,從而避免了外存訪問的開銷。Spark的基本數(shù)據(jù)模型是彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset, RDD)[3]。一個RDD是一個只讀的數(shù)據(jù)集合,生成之后不能修改。RDD支持粗粒度的運算,即集合中的每個數(shù)據(jù)元素都進(jìn)行統(tǒng)一的運算。RDD可以劃分為分區(qū)分布在多個機(jī)器節(jié)點上,所以一個運算可以在多個節(jié)點上分布式地執(zhí)行。Spark通過記錄計算間的沿襲(Lineage)以支持容錯,當(dāng)出現(xiàn)故障導(dǎo)致RDD分區(qū)丟失時,Spark根據(jù)記錄的計算沿襲,重新計算并重建丟失的分區(qū)。
然而,Spark的設(shè)計和實現(xiàn)存在著一定的局限性。首先, RDD被設(shè)計成只讀的數(shù)據(jù)集,既不支持重寫,也不支持?jǐn)?shù)據(jù)追加。于是,Spark需要為每個新創(chuàng)建的RDD分配內(nèi)存空間,尤其在迭代計算時,每個循環(huán)都產(chǎn)生一組新的RDD,這加大了內(nèi)存開銷。其次,Spark采用Scala程序設(shè)計語言實現(xiàn),在Java虛擬機(jī)(Java Virtual Machine, JVM)[6]上運行,繼承了Java的一系列問題。程序編譯后生成字節(jié)碼,執(zhí)行時再由JVM解釋執(zhí)行或進(jìn)行即時(Just-In-Time, JIT)編譯成為機(jī)器碼。內(nèi)存管理無法主動釋放內(nèi)存,必須由JVM的垃圾回收機(jī)制才能釋放內(nèi)存。數(shù)據(jù)傳輸時,需要經(jīng)歷數(shù)據(jù)的序列化和反序列化,不僅增加了轉(zhuǎn)換的計算代價,而且序列化的數(shù)據(jù)通常增加了類型等信息,引起網(wǎng)絡(luò)傳輸數(shù)據(jù)量的增加。這些問題在一定程度上限制了系統(tǒng)的性能。
為此,用C/C++語言設(shè)計并實現(xiàn)了一種輕量級的大數(shù)據(jù)運算系統(tǒng)——Helius。Helius采用了一種類似于RDD的數(shù)據(jù)模型,稱為BPD(Bulk Parallel Dataset)。BPD與RDD的區(qū)別在于BPD可寫,RDD只可讀。用戶可以選擇重寫B(tài)PD,系統(tǒng)無須重新分配內(nèi)存,而是直接覆蓋原來的區(qū)域。這樣不僅節(jié)省了內(nèi)存開銷,而且提高了運算性能。BPD在多個計算間提供了一種高效的共享方式,計算結(jié)果存入內(nèi)存,其他計算通過直接訪問內(nèi)存快速地獲取輸入。
與Spark相同,Helius也采用master-worker分布式架構(gòu),通過記錄各個BPD操作之間的計算沿襲構(gòu)建依賴關(guān)系,動態(tài)生成計算的有向無環(huán)圖(Directed Acyclic Graph, DAG)[7],劃分計算階段,在每個階段中多個計算任務(wù)并行執(zhí)行。Helius支持Spark的各項計算、自動容錯、感知調(diào)度和可擴(kuò)展性。相對Spark而言,Helius的優(yōu)勢具體如下:
1)降低內(nèi)存開銷。Helius采用C/C++實現(xiàn),程序運行時能夠?qū)崟r回收內(nèi)存;此外,BPD的可變性支持系統(tǒng)在計算過程中充分利用已有的內(nèi)存空間,減少了不必要的內(nèi)存開銷。
2)不需要序列化和反序列化。數(shù)據(jù)在系統(tǒng)中以二進(jìn)制字節(jié)的方式存儲,當(dāng)集群節(jié)點都是x86機(jī)器時,網(wǎng)絡(luò)傳輸時可以直接發(fā)送二進(jìn)制數(shù)據(jù),不需要進(jìn)行Endian轉(zhuǎn)換和序列化/反序列化。
3)減少網(wǎng)絡(luò)交互。Helius使用一種類似于push的方式傳遞數(shù)據(jù),master直接操控數(shù)據(jù)的傳輸,worker之間不需要互相發(fā)送請求,從而減少了網(wǎng)絡(luò)請求的交互。
4)簡化容錯恢復(fù)。Helius應(yīng)用了一種stateless worker的思想,worker遵循網(wǎng)絡(luò)請求進(jìn)行工作,請求包含計算所需的狀態(tài)信息,而worker除BPD數(shù)據(jù)分區(qū)外,不保存計算狀態(tài)。這樣,系統(tǒng)將多點故障集中到了對單點master的故障處理。
1.1 BPD數(shù)據(jù)模型
類似于Spark系統(tǒng)中的RDD,BPD是一種分布式的數(shù)據(jù)集合,可以劃分為多個數(shù)據(jù)分區(qū),存放在多個worker節(jié)點上。BPD支持粗粒度的運算,集合中的每個數(shù)據(jù)元素都進(jìn)行統(tǒng)一的操作。這樣,不同worker節(jié)點上的BPD分區(qū)可以并行執(zhí)行相同的運算。
與RDD不同,BPD是可變數(shù)據(jù)集??紤]一個簡單的例子,對所有數(shù)據(jù)元素自增,因RDD只讀性的限制,Spark需要分配內(nèi)存空間,為這個操作創(chuàng)建一個新的RDD;而Helius避免了額外的空間開銷,新的結(jié)果可以直接填充覆蓋原始的數(shù)據(jù)集。
BPD遵循一套嚴(yán)格且靈活的可變機(jī)制。嚴(yán)格性是系統(tǒng)層考慮的問題,體現(xiàn)在只有用戶計算產(chǎn)生的BPD可變,并且要求計算過程中新產(chǎn)生的數(shù)據(jù)元素占用的內(nèi)存空間維持不變。Helius針對少部分遵循可變機(jī)制的用戶計算函數(shù)(UDFListCombine函數(shù))實現(xiàn)更新接口,其他函數(shù)不提供BPD更新支持。靈活性針對用戶層而言,用戶調(diào)用支持BPD更新的函數(shù)(如UDFListCombine)時,可以通過設(shè)置函數(shù)參數(shù)(真或假)指示該BPD是否在該運算中更新。若不更新,系統(tǒng)將新創(chuàng)建一個BPD;若更新,新結(jié)果將覆蓋待計算的BPD,無需重新分配空間。對于一系列不改變數(shù)據(jù)結(jié)構(gòu)的操作而言,系統(tǒng)只需覆蓋相應(yīng)數(shù)值,在處理大量的數(shù)據(jù)時節(jié)省了時空開銷。
RDD的只讀性簡化了數(shù)據(jù)一致性的實現(xiàn)。在Helius中則需要考慮如何保持BPD的一致性。與Spark相似,在Helius中,用戶提供一個主驅(qū)動程序,BPD體現(xiàn)為程序中的特殊變量,變量之間的運算對應(yīng)于BPD分布式運算。Helius的master節(jié)點加載主驅(qū)動程序,按照執(zhí)行步驟,執(zhí)行相應(yīng)的分布式運算。從概念上看,雖然BPD運算是分布并行的,但是這個主驅(qū)動程序?qū)嶋H上是一個串行程序(當(dāng)然可以包含循環(huán)、分支等控制流語句),它描述了BPD運算步驟之間的串行執(zhí)行順序和依賴關(guān)系。所以,單一的主驅(qū)動程序可以保證BPD數(shù)據(jù)的一致性。對于多個并發(fā)執(zhí)行的主驅(qū)動程序,Helius禁止發(fā)生修改的BPD在多個并發(fā)程序之間共享,只有當(dāng)進(jìn)行修改操作的程序執(zhí)行完畢后,被修改的BPD才可以被其他程序所使用。
具體實現(xiàn)時,master記錄BPD的元數(shù)據(jù),主要包含依賴關(guān)系、分區(qū)數(shù)據(jù)、分區(qū)劃分信息和存儲方式。依賴關(guān)系記錄了父子BPD之間的轉(zhuǎn)換關(guān)系,分區(qū)數(shù)據(jù)記錄了子分區(qū)與一個或多個父分區(qū)之間的生成規(guī)則。一個BPD的多個分區(qū)大小可以不等,存儲在worker節(jié)點上。worker把內(nèi)存劃分為等長的數(shù)據(jù)塊,一個BPD分區(qū)由一個或多個數(shù)據(jù)塊組成,這些數(shù)據(jù)塊分布在內(nèi)存或文件中,具體的存儲位置由BPD的存儲方式?jīng)Q定。用戶可調(diào)用系統(tǒng)接口選擇數(shù)據(jù)的存儲方式。BPD可以按照用戶指定的劃分方式重新哈希散列成指定個數(shù)的分區(qū)。
1.2 BPD記錄數(shù)據(jù)類型
Helius系統(tǒng)將BPD按二進(jìn)制數(shù)據(jù)進(jìn)行存儲和處理。一個BPD數(shù)據(jù)集中的所有記錄都具有相同的結(jié)構(gòu),可以是鍵值對Key-Value元組,也可以是無key或是無value的單個元素。對于key或是value,它的結(jié)構(gòu)可以是定長或變長的數(shù)據(jù),可以表達(dá)C/C++中的原子類型(數(shù)值類型、字符串類型等)、struct和class數(shù)據(jù)(內(nèi)部不允許指針、沒有虛函數(shù))。key或value也可以進(jìn)一步有內(nèi)部嵌套結(jié)構(gòu),可以嵌套包含兩個值或是多個值。嵌套主要發(fā)生在Join等操作的結(jié)果BPD上。系統(tǒng)提供方法獲取BPD記錄的key或value的二進(jìn)制數(shù)據(jù),二進(jìn)制數(shù)據(jù)與正確的數(shù)值類型的轉(zhuǎn)換依賴于用戶的代碼。通常在C/C++程序中,只需要對相應(yīng)類型的指針變量賦值即可,不需要額外的轉(zhuǎn)換和拷貝。
1.3 編程模型
用戶將C/C++的主驅(qū)動程序編譯成動態(tài)庫后提交給master,與此同時指定master的運行入口函數(shù)。master解析庫文件,依次執(zhí)行函數(shù)體內(nèi)的語句。在主驅(qū)動程序中,一個BPD表現(xiàn)為一個可操作的C++對象。各種計算通過調(diào)用該對象相應(yīng)的方法而實現(xiàn),計算可以生成新的BPD對象,或者修改已有的BPD對象。而這些BPD對象上的操作,就被Helius對應(yīng)為對BPD多個分區(qū)上的分布式運算。
Helius提供兩大類計算,用以處理BPD數(shù)據(jù)集:系統(tǒng)計算和用戶計算。
1)系統(tǒng)計算:完全由系統(tǒng)實現(xiàn)的計算,包括union、cartesianProduct、partitionBy、join、groupBy等,用戶可以直接調(diào)用系統(tǒng)計算函數(shù)處理BPD數(shù)據(jù)集,這些計算都不改變輸入的BPD數(shù)據(jù)集。
union(A,B) →A∪BcartesianProduct({
2)用戶計算:系統(tǒng)提供應(yīng)用程序編程接口(ApplicationProgrammingInterface,API),由用戶實現(xiàn)具體操作功能。用戶在處理數(shù)據(jù)集之前需要根據(jù)API實現(xiàn)函數(shù)接口。在用戶計算中,用戶可以選擇是否改變待計算的BPD數(shù)據(jù)集。
A=udfCompute(B,udf)
A=udfComputeMulti(B,C, …,udf)
A=udfListCombine(B,udf)
系統(tǒng)計算函數(shù)的語義很清晰。例如:join操作把兩組輸入BPD的Key-Value記錄,按照key進(jìn)行等值連接,輸出記錄的value部分是嵌套結(jié)構(gòu),由兩個匹配記錄的value部分組合形成;groupBy操作按照key進(jìn)行分組,把同一組的所有value表示成一個list,即一個包含多值的嵌套結(jié)構(gòu)。而用戶計算接口主要包括三類,都要求用戶提供一個根據(jù)相應(yīng)接口實現(xiàn)的函數(shù)(在表中以udf表示)。首先,udfCompute方法針對單個BPD數(shù)據(jù)集的每條Key-Value記錄進(jìn)行處理,例如可以實現(xiàn)WordCount中單詞的拆分。udfComputeMulti方法對多個BPD數(shù)據(jù)集的Key-Value記錄進(jìn)行某種運算。實際上,多個輸入的BPD數(shù)據(jù)集進(jìn)行了一次join操作,系統(tǒng)對每個join的結(jié)果調(diào)用一次用戶實現(xiàn)的udfComputeMulti函數(shù)。這兩類操作對數(shù)據(jù)集的結(jié)構(gòu)沒有要求,可為1.2節(jié)提及的任意一種存在形式。udfListCombine與udfCompute的處理對象類似,不同的是數(shù)據(jù)集key、value必須同時存在,并且value為包含多值的嵌套結(jié)構(gòu)。它實現(xiàn)對每個key的多個value值進(jìn)行聚合的操作,類似MapReduce系統(tǒng)中的Reduce操作。
1.4 實例介紹
以WordCount為例,統(tǒng)計文本中所有單詞出現(xiàn)的次數(shù),用戶的主驅(qū)動程序如下:
BPD*lines=sc.loadFile(file);BPD*words=udfCompute(lines,newmySplit());BPD*wordgroup=words->groupBy();BPD*wordcount=udfListCombine(wordgroup,newmyCombine());
loadFile函數(shù)用于從文本生成一個BPD對象——lines,它的每個記錄是一行文本。udfCompute函數(shù)調(diào)用用戶自定義的mySplit函數(shù)對每行文本記錄進(jìn)行處理,在該示例中表現(xiàn)為將字符串拆分成多個單詞,產(chǎn)生的words結(jié)果記錄中key為單詞,value為數(shù)值1。groupBy函數(shù)將Key-Value數(shù)據(jù)集按key進(jìn)行分組。在這里,每個不同的單詞為一組。最后,udfListCombine函數(shù)調(diào)用用戶自定義的myCombine函數(shù)對同一key的所有value進(jìn)行某種運算(在該示例中為求和)。
下面以udfListCombine函數(shù)為例,介紹udf函數(shù)的實現(xiàn)。用戶自定義實現(xiàn)的函數(shù)如下:
classmyCombine:publicUDFListCombine{voidcall(ValueIterator*it,Value*out){intsum=0;while(it->hasNext()){int*val=(int*)(it->next());sum+=*val;
}
out->put(&sum,sizeof(int));
}};
用戶實現(xiàn)了一個myCombine類,它繼承了UDFListCombine類,實現(xiàn)UDFListCombine中的虛函數(shù)call()。call()的第一個輸入?yún)?shù)是一個定義在輸入BPD記錄value列表上的Iterator迭代器。上述實現(xiàn)在while循環(huán)中通過這個Iterator依次訪問列表中的每個value,把value的地址賦值給相應(yīng)類型的指針,就可以直接操作。call()的第二個參數(shù)用于輸出結(jié)果的BPD的value部分。在這里,把求和的結(jié)果寫入out。
從上面的示例可見,用戶可以使用C/C++程序簡潔地表達(dá)大數(shù)據(jù)的運算。
Helius分布式運行的基礎(chǔ)是表達(dá)BPD運算關(guān)系的DAG。用戶的主驅(qū)動程序提交給master執(zhí)行時,系統(tǒng)通過BPD變量獲取具體運算及依賴關(guān)系,形成運算DAG。然后,Helius把一個DAG劃分成多個階段,每個階段內(nèi)部的運算可以在一起執(zhí)行,從而減少中間結(jié)果的生成。一個階段的輸出結(jié)果為另一個階段的輸入。其中,最后一個階段的輸入來自原始數(shù)據(jù)源(例如文件),第一個階段的計算結(jié)果是程序最終的輸出結(jié)果。按照這種層次依賴關(guān)系,系統(tǒng)自上而下檢查各個階段(首先檢查第一個階段),當(dāng)前階段運行時將自動檢測其依賴的其他階段,若其他階段準(zhǔn)備就緒,則提交該階段的任務(wù);否則,迭代檢查依賴的所有階段,直至所有依賴階段準(zhǔn)備就緒后提交。每個階段包含了一系列任務(wù),系統(tǒng)將這些任務(wù)分配到最佳節(jié)點位置,并確保所有數(shù)據(jù)就緒。
2.1 DAG的生成及階段的創(chuàng)建
在Helius系統(tǒng)中,DAG的生成過程以及階段的創(chuàng)建過程與Spark系統(tǒng)類似,都是根據(jù)用戶的主驅(qū)動程序進(jìn)行的。用戶主驅(qū)動程序執(zhí)行時,系統(tǒng)先記錄BPD的運算和依賴關(guān)系,并不立即執(zhí)行所對應(yīng)的分布式運算,只有當(dāng)遇到lookup、collect和程序結(jié)束時,才執(zhí)行之前記錄的所有BPD運算。
與Spark不同的是,Helius將數(shù)據(jù)的shuffle操作單獨抽取出來,顯示地表達(dá)在DAG中,而非表示在其他的操作里。這樣,DAG可以記錄shuffle的狀態(tài)信息,而不需要每個worker在實現(xiàn)BPD運算(例如groupBy)時,記錄shuffle的狀態(tài)信息。
記錄的BPD形成了一個運算有向無環(huán)圖(DAG),如圖1所示。圖的每個頂點是一個BPD或者BPD的版本(若被修改),頂點之間的有向邊代表BPD運算的生成關(guān)系。有向邊從輸入BPD指向結(jié)果BPD。
圖1中每個頂點代表一個BPD,其中BPD1和BPD2的union操作生成BPD3,BPD3的groupBy操作產(chǎn)生BPD4,BPD4是最終的計算目標(biāo)。系統(tǒng)在執(zhí)行用戶主驅(qū)動程序時,記錄BPD的運算和依賴關(guān)系。在這個例子中,當(dāng)程序結(jié)束時,才生成DAG開始分布式計算。需要注意,圖1中BPD3和BPD4之間的邊是虛線,實際上DAG中刪除了這條邊。這也正體現(xiàn)了Helius與Spark的不同點。因為groupBy操作隱含地需要shuffle數(shù)據(jù),系統(tǒng)自動生成了BPD5(圖中深色填充表示),并修改了圖,使BPD3的輸出指向BPD5,BPD5的輸出指向BPD4。
圖1 DAG生成過程
階段的創(chuàng)建由目標(biāo)頂點和shuffle操作確定。在圖1所示的DAG基礎(chǔ)上,master開始自下而上創(chuàng)建階段,如圖2所示。master首先為目標(biāo)頂點BPD4創(chuàng)建一個階段(記為階段0),并從該位置開始迭代遍歷其父BPD。檢測發(fā)現(xiàn)BPD4依賴于shuffle的結(jié)果,而shuffle必然需要網(wǎng)絡(luò)傳輸,所以master以shuffle對應(yīng)的BPD5為目的創(chuàng)建一個新的階段(記為階段1)。依此類推,master將DAG以shuffle為邊界分為多個階段,每個階段內(nèi)的BPD運算可以整合在一起執(zhí)行,以提高運算的性能。
圖2 階段創(chuàng)建過程
所有階段創(chuàng)建完畢后,master從上向下依次遞歸提交階段:在嘗試提交階段0,master檢測到該階段依賴于階段1,于是master掛起階段0重新提交階段1;由于階段1無依賴階段,因此階段1順利被提交;master開始提交階段1對應(yīng)的所有任務(wù),階段1完成后遞歸提交階段0;目標(biāo)階段完成后,結(jié)束調(diào)度。
2.2 任務(wù)提交
當(dāng)一個階段成功提交后,master將為該階段的目標(biāo)BPD創(chuàng)建并提交任務(wù)。BPD的每個分區(qū)作為一個任務(wù),各個分區(qū)獨立地執(zhí)行相同的計算,這使得多個任務(wù)可以在多個worker節(jié)點上并行執(zhí)行。
在分布式運算環(huán)境下,基于位置感知分配任務(wù)到存儲數(shù)據(jù)的節(jié)點會大幅提高運算的性能,減小網(wǎng)絡(luò)傳輸?shù)膸?。Helius提供位置感知調(diào)度。在DAG的基礎(chǔ)上,master進(jìn)一步確定父子BPD每個分區(qū)之間的映射關(guān)系(shuffle過程除外)。在分配任務(wù)時,遞歸計算該任務(wù)所在的分區(qū)依賴的父分區(qū)的位置,直到找到已經(jīng)緩存的父分區(qū)后,將該任務(wù)發(fā)送到該父分區(qū)所在的worker節(jié)點,完成位置感知調(diào)度。
如果一個任務(wù)同時依賴兩個父分區(qū),并且兩個父分區(qū)均已緩存時,那么默認(rèn)將該任務(wù)分配到第一個依賴的父分區(qū)上。當(dāng)兩個父分區(qū)的數(shù)據(jù)在不同節(jié)點上,并且第二個父分區(qū)的數(shù)據(jù)量遠(yuǎn)大于第一個父分區(qū)時,將任務(wù)分發(fā)給第一個父分區(qū)所處的工作節(jié)點會增加網(wǎng)絡(luò)開銷,降低系統(tǒng)性能。一種優(yōu)化方法是根據(jù)多個依賴的父分區(qū)的數(shù)據(jù)量確定最佳分配節(jié)點。
2.3 數(shù)據(jù)傳輸
當(dāng)一個任務(wù)依賴多個數(shù)據(jù)源(多個父BPD分區(qū)),并且多個數(shù)據(jù)源不在同一工作節(jié)點時,worker節(jié)點需要獲得所有的輸入數(shù)據(jù),才能開始計算任務(wù)。
Spark提供一種類似pull的獲取方式,如圖3所示。workerA向master請求數(shù)據(jù),master定位數(shù)據(jù)所在的workerB,由workerB將數(shù)據(jù)發(fā)送給workerA。workerA完成任務(wù)后回答master。
圖3 Spark 數(shù)據(jù)傳輸機(jī)制
值得注意的是,在Spark系統(tǒng)中,對依賴數(shù)據(jù)的獲取是計算任務(wù)的一部分,worker在運行提交的任務(wù)時,可能需要遠(yuǎn)程獲取數(shù)據(jù)。在發(fā)送消息1和消息4之間,workerA需要保持相應(yīng)的狀態(tài)信息,使worker的工作和故障處理變得相對復(fù)雜。
Helius提出一種statelessworker的機(jī)制,對數(shù)據(jù)的獲取過程類似于push。在該機(jī)制下,worker不負(fù)責(zé)獲取數(shù)據(jù),而是由master指示其進(jìn)行操作。worker對于每個網(wǎng)絡(luò)請求,只完成相應(yīng)的操作,而在網(wǎng)絡(luò)請求之間,不記錄額外的狀態(tài)。如果一個任務(wù)所需數(shù)據(jù)在本節(jié)點不存在時,向master報錯。
將提交任務(wù)分成兩個步驟:傳輸數(shù)據(jù)和提交作業(yè)。數(shù)據(jù)傳輸?shù)倪^程如圖4所示:master告訴workerB傳輸數(shù)據(jù)給workerA,workerB傳輸指定的數(shù)據(jù),workerA接收完數(shù)據(jù)后回復(fù)master傳輸完成;master接收到傳輸完畢信號后,緊接著提交作業(yè)。這樣一來,系統(tǒng)保證了在分配工作之前,工作節(jié)點有需要的數(shù)據(jù)支持工作的進(jìn)行,同時worker不需要保持額外的狀態(tài)。這種statelessworker的機(jī)制簡化了系統(tǒng)的容錯處理,由于worker嚴(yán)格地按照master的指示工作,worker的工作機(jī)制相對來說簡單了許多,在該點的故障及故障處理隨之簡化。系統(tǒng)將故障處理主要集中在master執(zhí)行。
圖4 Helius數(shù)據(jù)傳輸機(jī)制
2.4 數(shù)據(jù)重組
不同于數(shù)據(jù)傳輸(transfer)操作,數(shù)據(jù)重組(shuffle)操作需要將數(shù)組重組分發(fā)到所有的工作節(jié)點,可能會占用大量的內(nèi)存空間和網(wǎng)絡(luò)帶寬。
為了減少shuffle對系統(tǒng)性能的影響,采用一種基于雙緩沖的邊計算邊發(fā)送的策略。worker為每個shuffle目標(biāo)worker節(jié)點都維持著一個緩沖區(qū),包含2個數(shù)據(jù)塊空間(分區(qū)數(shù)據(jù)由多個等長數(shù)據(jù)塊組成)。在處理shuffle時,將數(shù)據(jù)寫入相應(yīng)worker的緩沖區(qū)的數(shù)據(jù)塊中。當(dāng)緩沖區(qū)中一個數(shù)據(jù)塊已滿,可以發(fā)送這個數(shù)據(jù)塊,同時將數(shù)據(jù)寫入另一個數(shù)據(jù)塊。
圖5呈現(xiàn)的是針對workerA單方面shuffle產(chǎn)生的數(shù)據(jù)發(fā)送的過程。圖中的連線表示worker之間的連接狀態(tài),填充灰色部分代表該部分內(nèi)存已滿。
圖5 數(shù)據(jù)shuffle過程
3.1 實驗環(huán)境
集群環(huán)境由5臺服務(wù)器組成,其中1臺master,4臺worker,服務(wù)器的處理器為IntelXeonCPUES- 2650v2 @2.60GHz×8, 內(nèi)存128GB, 硬盤1TB,操作系統(tǒng)為Ubuntu14.04 64位。集群中的工作節(jié)點均單線程運行。Helius和Spark的實驗版本分別為0.0.1 和1.6.1。Helius編譯器為G++ 4.8.1, -o2選項優(yōu)化,Spark編譯器為Sbt0.13.12。
實驗以PageRank[8-9]算法和TPCH[10]基準(zhǔn)為例,從時間、網(wǎng)絡(luò)、內(nèi)存三方面開銷比較Helius和Spark的性能,并在最后對BPD的更新性能以及Helius的可擴(kuò)展性進(jìn)行評估。
3.2 PageRank
實驗輸入文本為1.1GB, 包含網(wǎng)頁4 847 570個,鏈接記錄68 993 773條。Spark集群運行PageRank算法的配置選項為:spark.driver.memory=16g,spark.executor.memory=16g。
3.2.1 時間開銷
運行PageRank算法時,分別記錄迭代1、2、3、4、5次的時間開銷。表1呈現(xiàn)的實現(xiàn)結(jié)果表明,在迭代5次的過程中,Helius運行PageRank算法的時間僅為Spark的25.12%~53.14%。因為Helius在實現(xiàn)PageRank算法時,采用的是一種建立在數(shù)據(jù)塊內(nèi)有序、塊間無序的基礎(chǔ)上優(yōu)化join操作的策略,在每次更新rank值時直接重寫舊值,而非重新創(chuàng)建新的BPD。
表1 Helius和Spark迭代時間對比
3.2.2 網(wǎng)絡(luò)開銷
在PageRank迭代1次的基礎(chǔ)上,記錄master節(jié)點在程序運行過程中接收到的字節(jié)數(shù)和發(fā)送的字節(jié)數(shù),結(jié)果如表2所示。在分布式環(huán)境中,運行在Helius系統(tǒng)下時,master節(jié)點IP接收和發(fā)送數(shù)據(jù)量約為運行于Spark系統(tǒng)的40%和15%。
表2 Helius和Spark網(wǎng)絡(luò)開銷對比
3.2.3 內(nèi)存開銷
在PageRank迭代1次的基礎(chǔ)上, 每隔5s記錄worker節(jié)點內(nèi)存剩余情況。表3呈現(xiàn)的是以20s為間隔記錄的worker節(jié)點使用的內(nèi)存量(單位:MB)。Helius在50s左右運行結(jié)束,逐漸回收內(nèi)存;此時,Spark仍處于工作狀態(tài),直到210s左右結(jié)束。在worker運行的過程中,Helius占用內(nèi)存6 758MB,Spark占用內(nèi)存26 648MB,Helius約為Spark的25%。
表3 Helius和Spark內(nèi)存開銷對比
3.3 TPCH Q6性能
以TPCH的ForecastingRevenueChangeQuery(Q6)為例,取ScaleFactor為100(文本79.8GB),測試Helius和Spark的運行時間。Spark在該例中為默認(rèn)配置。實驗結(jié)果為Helius花費271.595s,Spark花費473.382s,Helius消耗時間僅為Spark的57.37%。
Helius從文本獲取輸入數(shù)據(jù)是一種篩選-丟棄的過程,根據(jù)用戶提供的查詢字段的列值,在讀取文本記錄時選取相應(yīng)的字段值構(gòu)成數(shù)據(jù)集,后續(xù)所有操作都建立在已篩選字段的數(shù)據(jù)集的基礎(chǔ)上;而Spark程序在加載文件時沒有對字段進(jìn)行篩選,運行過程中,所有的數(shù)據(jù)集中的每條記錄都保持了輸入文本的所有字段。
3.4 BPD更新性能
在PageRank迭代1次的基礎(chǔ)上,測試在BPD更新與不更新的情況下,worker運行UDFListCombine函數(shù)的開銷時間,以及master運行用戶提交的驅(qū)動程序所用的總時間。PageRank在迭代1次的基礎(chǔ)上會運行1次UDFListCombine函數(shù)。
從表4可以看出,在BPD更新的情況下,worker運行UDFListCombine的速度比不更新稍快;master運行整個程序也稍快。就表4的結(jié)果而言,BPD更新在運行時間方面的性能提升不大,這種結(jié)果很大程度上受到Helius實現(xiàn)的限制,我們將在后續(xù)的工作中進(jìn)一步研究BPD的更新。
表4 有否BPD更新時運行時間對比
3.5 可擴(kuò)展性
以3.3節(jié)中的TPCHQ6為例,測試Helius集群分別搭建在2、4、6、8臺worker的運行時間,結(jié)果如表5所示。在當(dāng)前實驗條件考慮的擴(kuò)展情況下,當(dāng)worker節(jié)點數(shù)增加1倍時,Helius運行任務(wù)所需的時間減少50%左右。
表5 Helius可擴(kuò)展性性能
本文介紹了一種輕量級的基于內(nèi)存計算的大數(shù)據(jù)運算系統(tǒng)Helius。Helius由C/C++語言實現(xiàn),避免了Spark因JVM運行環(huán)境引起的開銷,利用數(shù)據(jù)集整體修改這一特性實現(xiàn)高效計算,采用一種statelessworker的機(jī)制簡化容錯處理,并通過維持一套嚴(yán)格的修改機(jī)制確保了數(shù)據(jù)一致性。Helius在時間、網(wǎng)絡(luò)、內(nèi)存三方面性能相對Spark均有所提升。就數(shù)據(jù)集更新性能而言,Helius存在很大的提升空間。此外,目前Helius還未實現(xiàn)節(jié)點故障恢復(fù),故障處理以及深層次的一致性管理問題有待后續(xù)深入研究。
)
[1]DEANJ,GHEMAWATS.MapReduce:simplifieddataprocessingonlargecluster[J].CommunicationoftheACM— 50thAnniversaryIssue: 1958-2008, 2008, 51(1): 107-113.
[2]ZAHARIAM.Anarchitectureforfastandgeneraldataprocessingonlargeclusters,UCB/EECS- 2014- 12 [R].Berkeley:UniversityofCaliforniaatBerkeley, 2014.
[3]ZAHARIAM,CHOWDHURYM,DAST,etal.Resilientdistributeddatasets:afault-tolerantabstractionforin-memoryclustercomputing[C]//NSDI’12:Proceedingsofthe9thUSENIXConferenceonNetworkedSystemsDesignandImplementation.Berkeley,CA:USENIXAssociation, 2012: 15-28.
[4]TheApacheSoftwareFoundation.ApacheSpark[EB/OL].[2016- 05- 30].http://spark.apache.org/.
[5]TheApacheSoftwareFoundation.ApacheHadoop[EB/OL].[2016- 05- 30].http://hadoop.apache.org/.
[6]SARIMBEKOVA,STADLERL,BULEJL,etal.WorkloadcharacterizationofJVMlanguages[J].Software:PracticeandExperience, 2016, 46(8): 1053-1089.
[7]ISARDM,BUDIUM,YUY,etal.Dryad:distributeddata-parallelprogramsforsequentialbuildingblocks[C]//EuroSys’07:Proceedingsofthe2ndACMSIGOPS/EuroSysEuropeanConferenceonComputerSystems2007.NewYork:ACM, 2007: 59-72.
[8]BERKHIUTJ.Google’sPageRankalgorithmforrankingnodesingeneralnetworks[C]//Proceedingsofthe2016 13thInternationalWorkshoponDiscreteEventSystems.Piscataway,NJ:IEEE, 2016: 163-172.
[9]PAGEL,BRINS,MOTWANIR,etal.ThePageRankcitationranking:bringingordertotheWeb,TechnicalReport1999- 66 [R/OL].California:StanfordUniversity, 1999 [2016- 04- 11].http://ilpubs.stanford.edu:8090/422/1/1999- 66.pdf.
[10]TransactionProcessingPerformanceCouncil.TPCBenchmarkTMHStandardSpecificationRevision2.17.1 [S/OL].[2016- 05- 30].http://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf.
[11]MALEWIEZG,AUSTEMMH,BIKAJC,etal.Pregel:asystemforlarge-scalegraphprocessing[C]//SIGMOD’10:Proceedingsofthe2010ACMSIGMODInternationalConferenceonManagementofData.NewYork:ACM, 2010: 135-146.
[12]CARSTOIUD,LEPADATUE,GASPARM.Hbase-non-SQLdatabase,performancesevaluation[J].InternationalJournalofAdvancementsinComputingTechnology, 2010: 2(5): 42-52.
ThisworkispartiallysupportedbytheCASHundredTalentsProgram,theGeneralProjectoftheNationalNaturalScienceFoundationofChina(61572468),theInnovativeCommunityProjectoftheNationalNaturalScienceFoundationofChina(61521092).
DING Mengsu, born in 1993, M.S.candidate.Her research interests include big data processing, parallel distributed computing.
CHEN Shimin, born in 1973, Ph.D., professor.His research interests include data management system, big data processing, computer architecture.
Helius: a lightweight big data processing system
DING Mengsu, CHEN Shimin*
(KeyLaboratoryofComputerSystemandArchitecture(InstituteofComputingTechnology,ChineseAcademyofSciences),Beijing100190,China)
Concerning the limitations of Spark, including immutable datasets and significant costs of code execution, memory management and data serialization/deserialization caused by running environment of Java Virtual Machine (JVM), a light-weight big data processing system, named Helius, was implemented in C/C++.Helius supports the basic operations of Spark, while allowing the data set to be modified as a whole.In Helius, the C/C++ is utilized to optimize the memory management and network communication, and a stateless worker mechanism is utilized to simplify the fault tolerance and recovery process of the distributed computing platform.The experimental results showed that in 5 iterations, the running time in Helius was only 25.12% to 53.14% of that in Spark when running PageRank iterative jobs, and the running time in Helius was only 57.37% of that in Spark when processing TPCH Q6.On the basis of one iteration of PageRank, the IP incoming and outcoming data sizes of master node in Helius were about 40% and 15% of those in Sparks, and the total memory consumed in the worker node in Helius was only 25% of that in Spark.Compared with Spark, Helius has the advantages of saving memory, eliminating the need for serialization and deserialization, reducing network interaction and simplifying fault tolerance.
in-memory computation; big data processing; distributed computation; Directed Acyclic Graph (DAG) scheduling; fault tolerance and recovery
2016- 08- 12;
2016- 10- 22。
中國科學(xué)院“百人計劃”項目;國家自然科學(xué)基金面上項目(61572468);國家自然科學(xué)基金創(chuàng)新群體項目(61521092)。
丁夢蘇(1993—),女,江西吉安人,碩士研究生,主要研究方向:大數(shù)據(jù)處理、并行分布式計算; 陳世敏(1973—),男,北京人,研究員,博士,主要研究方向:數(shù)據(jù)管理系統(tǒng)、大數(shù)據(jù)處理、計算機(jī)體系結(jié)構(gòu)。
1001- 9081(2017)02- 0305- 06
10.11772/j.issn.1001- 9081.2017.02.0305
TP311.133.1
A