• 
    

    
    

      99热精品在线国产_美女午夜性视频免费_国产精品国产高清国产av_av欧美777_自拍偷自拍亚洲精品老妇_亚洲熟女精品中文字幕_www日本黄色视频网_国产精品野战在线观看 ?

      基于RDD關(guān)鍵度的Spark檢查點(diǎn)管理策略

      2017-12-16 05:09:32英昌甜王維慶錢育蓉
      計(jì)算機(jī)研究與發(fā)展 2017年12期
      關(guān)鍵詞:檢查點(diǎn)內(nèi)存分區(qū)

      英昌甜 于 炯 卞 琛 王維慶 魯 亮 錢育蓉

      1(新疆大學(xué)電氣工程博士后科研流動(dòng)站 烏魯木齊 830046) 2(新疆大學(xué)軟件學(xué)院 烏魯木齊 830008) 3((新疆大學(xué)電氣工程學(xué)院 烏魯木齊 830046)

      基于RDD關(guān)鍵度的Spark檢查點(diǎn)管理策略

      英昌甜1,2于 炯2卞 琛2王維慶1,3魯 亮2錢育蓉2

      1(新疆大學(xué)電氣工程博士后科研流動(dòng)站 烏魯木齊 830046)2(新疆大學(xué)軟件學(xué)院 烏魯木齊 830008)3((新疆大學(xué)電氣工程學(xué)院 烏魯木齊 830046)

      (yingct@xju.edu.com)

      Spark默認(rèn)容錯(cuò)機(jī)制由程序員設(shè)置檢查點(diǎn),并利用彈性分布式數(shù)據(jù)集(resilient distributed dataset, RDD)的血統(tǒng)(lineage)進(jìn)行計(jì)算.在應(yīng)用程序復(fù)雜度高、迭代次數(shù)多以及數(shù)據(jù)量較大時(shí),恢復(fù)過(guò)程需要耗費(fèi)大量的計(jì)算開銷.同時(shí),在執(zhí)行恢復(fù)任務(wù)時(shí),僅考慮數(shù)據(jù)本地性選擇節(jié)點(diǎn),并未考慮節(jié)點(diǎn)的計(jì)算能力,這都會(huì)導(dǎo)致恢復(fù)時(shí)間增加,無(wú)法最大化發(fā)揮集群的性能.因此,在建立Spark執(zhí)行模型、檢查點(diǎn)模型和RDD關(guān)鍵度模型的基礎(chǔ)上,提出一種基于關(guān)鍵度的檢查點(diǎn)管理(criticality checkpoint management, CCM)策略,其中包括檢查點(diǎn)設(shè)置算法、失效恢復(fù)算法和清理算法.其中檢查點(diǎn)設(shè)置算法通過(guò)分析作業(yè)中RDD的屬性以及對(duì)作業(yè)恢復(fù)時(shí)間的影響,選擇關(guān)鍵度大的RDD作為檢查點(diǎn)存儲(chǔ);恢復(fù)算法根據(jù)各節(jié)點(diǎn)的計(jì)算能力做出決策,選擇合適的節(jié)點(diǎn)執(zhí)行恢復(fù)任務(wù);清理算法在磁盤空間不足時(shí),清除關(guān)鍵度較低的檢查點(diǎn).實(shí)驗(yàn)結(jié)果表明:該策略在略增加執(zhí)行時(shí)間的情況下,能夠選擇有備份價(jià)值的RDD作為檢查點(diǎn),在節(jié)點(diǎn)失效時(shí)能夠有效地降低恢復(fù)開銷,提高節(jié)點(diǎn)的磁盤有效利用率.

      內(nèi)存計(jì)算;Spark;檢查點(diǎn)管理;失效恢復(fù);RDD屬性

      近年來(lái),隨著互聯(lián)網(wǎng)的快速發(fā)展,特別是云計(jì)算的普及,全球數(shù)據(jù)量以每年約50%的速度遞增,大數(shù)據(jù)[1-3]日益受到人們的重視.2015年9月,國(guó)務(wù)院印發(fā)《促進(jìn)大數(shù)據(jù)發(fā)展行動(dòng)綱要》,確定了大數(shù)據(jù)發(fā)展的國(guó)家頂層設(shè)計(jì),大數(shù)據(jù)與各行各業(yè)的結(jié)合已是行業(yè)未來(lái)發(fā)展的必然趨勢(shì).隨著大數(shù)據(jù)時(shí)代的到來(lái),數(shù)據(jù)每天都在急劇快速膨脹,發(fā)掘這些數(shù)據(jù)的價(jià)值,需要一種高效而穩(wěn)定的分布式計(jì)算框架和模型.在分布式計(jì)算框架中,Apache Spark[4-6]由于其基于內(nèi)存的高性能計(jì)算模式以及豐富靈活的編程接口,得到了廣泛的支持和應(yīng)用,大有逐漸取代Hadoop MapReduce成為新一代大數(shù)據(jù)計(jì)算引擎的趨勢(shì).

      Spark采用了基于分布式共享內(nèi)存的彈性分布式數(shù)據(jù)集(resilient distributed datasets, RDD)[7]作為數(shù)據(jù)結(jié)構(gòu),RDD是Spark對(duì)分布式內(nèi)存的抽象,具有可重構(gòu)性、不變性、分區(qū)局部性以及可序列化等特性[8-10].作為Apache頂級(jí)的開源項(xiàng)目,Spark基于自身的核心部分,在迭代計(jì)算、交互式查詢計(jì)算以及批量流計(jì)算等方面,開發(fā)了適應(yīng)大數(shù)據(jù)處理的多種場(chǎng)景生態(tài)組件,如SQL處理引擎Spark SQL和Shark、流式處理引擎Spark Streaming、機(jī)器學(xué)習(xí)系統(tǒng)MLLib、圖計(jì)算框架GraphX、統(tǒng)計(jì)分析工具SparkR以及分布式內(nèi)存文件系統(tǒng)Tachyon等.

      Spark是基于MapReduce算法實(shí)現(xiàn)的分布式計(jì)算框架,因此具有MapReduce的優(yōu)點(diǎn)[11-12].同時(shí)Spark在很多方面都彌補(bǔ)了MapReduce的不足,比MapReduce的通用性更好、迭代運(yùn)算效率更高、作業(yè)延遲更低.Spark的主要優(yōu)勢(shì)包括:提供了一套支持DAG圖的分布式并行計(jì)算的編程框架,減少多次計(jì)算之間中間結(jié)果寫到HDFS的開銷;提供Cache機(jī)制來(lái)支持需要反復(fù)迭代計(jì)算或者多次數(shù)據(jù)共享,減少數(shù)據(jù)讀取的I/O開銷;使用多線程池模型來(lái)減少任務(wù)啟動(dòng)開銷,shuffle過(guò)程中避免不必要的sort操作以及減少磁盤I/O操作;具有更廣泛的數(shù)據(jù)集操作類型.

      在Spark任務(wù)的執(zhí)行過(guò)程中,首先將HDFS中存儲(chǔ)的數(shù)據(jù)作為數(shù)據(jù)源加載到RDD中進(jìn)行處理,再通過(guò)一系列的操作生成計(jì)算結(jié)果,每次操作所產(chǎn)生的中間結(jié)果都以RDD的形式存儲(chǔ)在內(nèi)存中.Spark通過(guò)數(shù)據(jù)集的血統(tǒng)(lineage)實(shí)現(xiàn)容錯(cuò),當(dāng)節(jié)點(diǎn)失效導(dǎo)致數(shù)據(jù)丟失時(shí),Spark可以根據(jù)血統(tǒng)重新計(jì)算,以實(shí)現(xiàn)丟失數(shù)據(jù)的自動(dòng)重建.為了避免錯(cuò)誤恢復(fù)的代價(jià)與運(yùn)行時(shí)間成正比增長(zhǎng),Spark提供了檢查點(diǎn)(checkpoint)[13]功能,通過(guò)設(shè)置檢查點(diǎn)來(lái)記錄中間狀態(tài),避免從頭開始計(jì)算的漫長(zhǎng)恢復(fù).

      然而,Spark集群默認(rèn)的容錯(cuò)機(jī)制將檢查點(diǎn)選擇和設(shè)置的權(quán)利交給編程人員,而程序員往往根據(jù)經(jīng)驗(yàn)進(jìn)行選擇,使結(jié)果充滿不確定性.數(shù)據(jù)丟失后恢復(fù)的效果好壞和效率高低,會(huì)隨著程序員的水平不同而出現(xiàn)巨大差異.錯(cuò)誤的檢查點(diǎn)策略不僅會(huì)導(dǎo)致程序變慢,恢復(fù)效率降低,甚至?xí)速M(fèi)持久化存儲(chǔ)(磁盤/固態(tài)硬盤等)的空間,影響其他作業(yè)執(zhí)行效率.此外,由于現(xiàn)有分布式框架中的檢查點(diǎn)策略,往往根據(jù)時(shí)間周期定時(shí)設(shè)置,并未對(duì)具體作業(yè)進(jìn)行分析,考慮影響作業(yè)恢復(fù)效率的重要因素,因此選取的檢查點(diǎn)并不一定有效,反而可能影響整體系統(tǒng)的計(jì)算和容錯(cuò)性能.

      為解決這一問(wèn)題,本文主要做了3方面工作:

      1) 對(duì)內(nèi)存計(jì)算框架的作業(yè)執(zhí)行機(jī)制進(jìn)行分析,建立執(zhí)行效率模型,給出了RDD計(jì)算代價(jià)和任務(wù)執(zhí)行時(shí)間的定義.

      2) 通過(guò)分析RDD的恢復(fù)過(guò)程,建立了檢查點(diǎn)恢復(fù)模型,給出了RDD關(guān)鍵度、失效恢復(fù)比的定義,并證明這些定義與任務(wù)恢復(fù)效率的關(guān)系,為算法設(shè)計(jì)提供基礎(chǔ)模型.

      3) 在相關(guān)模型定義和證明的基礎(chǔ)上,提出了檢查點(diǎn)管理策略的問(wèn)題定義,以此作為算法設(shè)計(jì)的主要依據(jù).

      1 相關(guān)工作

      檢查點(diǎn)/恢復(fù)策略是分布式計(jì)算廣泛使用的容錯(cuò)技術(shù),在國(guó)內(nèi)外都有較多的相關(guān)研究[14-19].傳統(tǒng)的檢查點(diǎn)策略主要分為3類:應(yīng)用級(jí)實(shí)現(xiàn)、用戶級(jí)實(shí)現(xiàn)和系統(tǒng)級(jí)實(shí)現(xiàn).

      1) 應(yīng)用級(jí)實(shí)現(xiàn).由編程人員或自動(dòng)程序?qū)z查點(diǎn)代碼與應(yīng)用程序代碼整合,檢查點(diǎn)活動(dòng)由應(yīng)用程序自動(dòng)執(zhí)行.將檢查點(diǎn)存儲(chǔ)到持久化存儲(chǔ)中,并且當(dāng)節(jié)點(diǎn)故障后,從檢查點(diǎn)重啟.這種方法的挑戰(zhàn)在于需要編程人員對(duì)設(shè)置檢查點(diǎn)的應(yīng)用有透徹的了解.

      2) 用戶級(jí)實(shí)現(xiàn).需要用戶級(jí)的數(shù)據(jù)庫(kù)來(lái)設(shè)置檢查點(diǎn),并且應(yīng)用程序與數(shù)據(jù)庫(kù)相鏈接,這種方法對(duì)用戶不透明,因?yàn)閼?yīng)用的修改、編譯都鏈接到檢查點(diǎn)數(shù)據(jù)庫(kù).

      3) 系統(tǒng)級(jí)實(shí)現(xiàn).檢查點(diǎn)/恢復(fù)策略可以實(shí)現(xiàn)在系統(tǒng)級(jí)別,包括OS內(nèi)核或硬件.當(dāng)實(shí)現(xiàn)在系統(tǒng)級(jí)別,對(duì)用戶通常是透明的,并且對(duì)應(yīng)用程序不需要修改.

      另外一些研究成果主要考慮分布式環(huán)境下并行計(jì)算框架的失效恢復(fù)策略.Storm[20]是一個(gè)實(shí)時(shí)流式處理平臺(tái),運(yùn)行在Java虛擬機(jī)之上;使用Clojure和Java編寫,并且支持多語(yǔ)言編程;它依靠Nimbus在集群分發(fā)代碼,并將任務(wù)分配給工作節(jié)點(diǎn);由Zookeeper為其提供了容錯(cuò),并與Nimbus協(xié)調(diào)重新分配任務(wù)到其他可用節(jié)點(diǎn);然而,沒(méi)有設(shè)置檢查點(diǎn)機(jī)制.Apache S4[21]是一個(gè)開源的流處理平臺(tái),利用Zookeeper提供容錯(cuò).與Storm不同,它提供了檢查點(diǎn)機(jī)制.在一個(gè)節(jié)點(diǎn)出現(xiàn)故障時(shí),故障轉(zhuǎn)移機(jī)制重新啟動(dòng)一個(gè)新節(jié)點(diǎn)進(jìn)行任務(wù)恢復(fù).為了減少延遲,檢查點(diǎn)是異步的,這意味著每個(gè)獨(dú)立節(jié)點(diǎn)執(zhí)行檢查點(diǎn),沒(méi)有全局一致性.文獻(xiàn)[22-23]提出了一種基于內(nèi)存的分布式文件系統(tǒng)Tachyon,兼容包括Spark在內(nèi)的多種計(jì)算框架.Tachyon本質(zhì)上是將內(nèi)存存儲(chǔ)功能從Spark或Yarn中分離出來(lái),使上層計(jì)算框架實(shí)現(xiàn)更高的執(zhí)行效率.在Tachyon的實(shí)現(xiàn)中,文件采用兼容HDFS的Block方式進(jìn)行管理,使用不同的存儲(chǔ)媒介對(duì)數(shù)據(jù)分層次緩存,但檢查點(diǎn)算法僅保存當(dāng)前最新生成的RDD,與任務(wù)和RDD的特性無(wú)關(guān).文獻(xiàn)[24]提出了內(nèi)存文件系統(tǒng)RAMCloud,將內(nèi)存作為文件的存儲(chǔ)介質(zhì),而磁盤則作為備份介質(zhì),為了提高效率,RAMCloud使用日志結(jié)構(gòu)存儲(chǔ),利用集群的并發(fā)能力和高速帶寬的Infiniband實(shí)現(xiàn)快速恢復(fù),保證了數(shù)據(jù)的可用性和完整性,但Spark與RAMCloud并不兼容,因?yàn)镽AMCloud的高內(nèi)存占用率會(huì)影響Spark的計(jì)算性能.

      因此,為了解決檢查點(diǎn)設(shè)置的問(wèn)題,其中的重點(diǎn)就是選擇哪些有價(jià)值的RDD作為檢查點(diǎn)存儲(chǔ)在磁盤中,以及在數(shù)據(jù)丟失后應(yīng)選擇什么樣的節(jié)點(diǎn)進(jìn)行恢復(fù).在實(shí)時(shí)系統(tǒng)或數(shù)據(jù)庫(kù)系統(tǒng)中,對(duì)周期設(shè)置檢查點(diǎn)而言,所有數(shù)據(jù)的重要性相同,僅僅根據(jù)周期設(shè)置時(shí)的時(shí)間選擇最新生成的數(shù)據(jù)設(shè)置為檢查點(diǎn).與此不同的是,由于Spark基本存儲(chǔ)單元為RDD,不同RDD所具有的屬性如操作復(fù)雜度、血統(tǒng)長(zhǎng)度、計(jì)算代價(jià)和RDD數(shù)據(jù)量大小各不相同,對(duì)于作業(yè)恢復(fù)時(shí)所產(chǎn)生的恢復(fù)效果不同,因此在作業(yè)執(zhí)行過(guò)程中,異步存儲(chǔ)對(duì)恢復(fù)更有價(jià)值的RDD在磁盤中,在設(shè)置檢查點(diǎn)的同時(shí)盡量不影響作業(yè)執(zhí)行效率.通過(guò)對(duì)Spark作業(yè)執(zhí)行機(jī)制和執(zhí)行時(shí)間進(jìn)行理論分析,在恢復(fù)時(shí)根據(jù)數(shù)據(jù)本地性、節(jié)點(diǎn)空閑情況和節(jié)點(diǎn)能力選擇節(jié)點(diǎn),并執(zhí)行恢復(fù)任務(wù),從而縮短恢復(fù)時(shí)間.

      2 問(wèn)題的建模和分析

      本節(jié)通過(guò)分析Spark作業(yè)執(zhí)行機(jī)制,定義了RDD和作業(yè)的執(zhí)行開銷,提出了作業(yè)執(zhí)行模型、檢查點(diǎn)模型和關(guān)鍵度模型,并且在理論基礎(chǔ)上對(duì)目標(biāo)問(wèn)題進(jìn)行了定義.

      2.1 Spark作業(yè)執(zhí)行機(jī)制

      在Spark應(yīng)用中,整個(gè)執(zhí)行流程在邏輯上會(huì)形成有向無(wú)環(huán)圖(DAG).Action算子觸發(fā)之后,根據(jù)RDD的血統(tǒng),將所有累積的算子形成一個(gè)有向無(wú)環(huán)圖,然后由調(diào)度器調(diào)度該圖上的任務(wù)進(jìn)行運(yùn)算.Spark根據(jù)RDD之間不同的依賴關(guān)系切分形成不同的階段(stage),一個(gè)階段包含一系列函數(shù)執(zhí)行流水線.

      Spark作業(yè)DAG的典型示例如圖1所示,A,B,C,D,E,F(xiàn)和G分別代表不同的RDD,RDD內(nèi)的方框代表分區(qū),虛線框?yàn)殡A段.數(shù)據(jù)從HDFS輸入Spark,形成RDDA和RDDC,RDDC上執(zhí)行map操作,轉(zhuǎn)換為RDDD,RDDD和RDDE執(zhí)行union操作,轉(zhuǎn)換為F,而在B和F通過(guò)join連接轉(zhuǎn)化為G的過(guò)程中會(huì)執(zhí)行Shuffle,最后RDDG輸出并保存到HDFS中.

      Fig. 1 Directed acyclic graph of Spark job圖1 Spark作業(yè)的有向無(wú)環(huán)圖

      Spark在未設(shè)置檢查點(diǎn)算法時(shí),若數(shù)據(jù)失效,則需要利用血統(tǒng)重新計(jì)算來(lái)實(shí)現(xiàn)恢復(fù).若所有需要的數(shù)據(jù)都丟失,則RDD必須重新計(jì)算生成,使任務(wù)的完成時(shí)間延長(zhǎng),還增加了額外的計(jì)算開銷.若能為有價(jià)值的RDD設(shè)置檢查點(diǎn),則能有效縮減任務(wù)恢復(fù)時(shí)間.

      2.2 作業(yè)執(zhí)行模型

      定義1. RDD計(jì)算代價(jià).Spark任務(wù)中,分區(qū)由父節(jié)點(diǎn)為輸入數(shù)據(jù)計(jì)算生成.設(shè)Parentsi jk為分區(qū)Pi jk的父節(jié)點(diǎn)集合.分區(qū)Pi jk的計(jì)算代價(jià)為數(shù)據(jù)讀取代價(jià)與數(shù)據(jù)處理代價(jià)之和,即:

      TPi jk=read(Parentsi jk)+proc(Parentsi jk).

      (1)

      RDD的所有分區(qū)由集群工作節(jié)點(diǎn)并行計(jì)算生成,因此其計(jì)算代價(jià)為所有分區(qū)計(jì)算代價(jià)的最大值,即:

      TRDDi j=max(TPi j1,TPi j2,…,TPi jk).

      (2)

      定義2. 作業(yè)執(zhí)行時(shí)間.如圖1所示,Spark以寬依賴為分界點(diǎn),將作業(yè)劃分為多個(gè)階段執(zhí)行,因此階段分為窄依賴和寬依賴2類.

      由于Spark任務(wù)以分區(qū)為最小粒度單位,因此在任務(wù)分配時(shí),分區(qū)計(jì)算時(shí)間與節(jié)點(diǎn)計(jì)算能力有關(guān).設(shè)cpw為工作節(jié)點(diǎn)計(jì)算能力,單位是tuple/s;SPi jk為分區(qū)Pi jk的元組個(gè)數(shù).

      由于窄依賴的父分區(qū)為上一個(gè)RDD對(duì)應(yīng)位置的一個(gè)分區(qū),則窄依賴分區(qū)計(jì)算時(shí)間:

      (3)

      對(duì)于窄依賴階段,每個(gè)階段包括多條流水線(每條流水線包括多個(gè)RDD的不同分區(qū)).記窄依賴Stage共包含m個(gè)RDD,RDD相同位置的分區(qū)組成流水線,所有RDD劃分為x條流水線,單條流水線的分區(qū)集合為pipei x={Pi 1x,Pi 2x,…,Pi jx},那么單條流水線的執(zhí)行時(shí)間可表示為

      (4)

      對(duì)于stagei,記其流水線集合為Pipesi={pipei 1,pipei 2,…,pipei x},那么該階段的執(zhí)行時(shí)間應(yīng)為各流水線執(zhí)行時(shí)間最大值,即:

      NTstagei=max(Tpipei 1,Tpipei 2,…,Tpipei x)=

      寬依賴的父分區(qū)集合為上一個(gè)RDD的所有分區(qū),即整個(gè)RDD,且寬依賴RDD的分區(qū)計(jì)算時(shí)間大于窄依賴RDD的分區(qū)計(jì)算時(shí)間,則寬依賴分區(qū)計(jì)算時(shí)間:

      (6)

      由于寬依賴階段中僅包含1個(gè)RDD,則寬依賴階段的計(jì)算時(shí)間為

      WTstagei=max{WTPi j1,WTPi j2,…,WTPi jk}.

      (7)

      一個(gè)作業(yè)由若干個(gè)階段構(gòu)成,由于Spark以寬依賴為界劃分階段,而作業(yè)的最后一個(gè)階段必為寬依賴(所有執(zhí)行均為寬依賴操作),記作業(yè)中寬、窄依賴的個(gè)數(shù)分別為m和n,那么作業(yè)執(zhí)行時(shí)間可表示為

      (8)

      2.3 檢查點(diǎn)模型

      在節(jié)點(diǎn)故障時(shí),會(huì)使存儲(chǔ)在該節(jié)點(diǎn)內(nèi)存上的多個(gè)RDD部分甚至所有分區(qū)不可用,這將導(dǎo)致作業(yè)無(wú)法繼續(xù)正常執(zhí)行,在未設(shè)置檢查點(diǎn)的情況下,作業(yè)需要回溯,直到找到可用的RDD.極端情況下,甚至需要重新調(diào)度,之前所計(jì)算的所有工作和耗費(fèi)的系統(tǒng)資源都會(huì)浪費(fèi).因此,對(duì)于作業(yè)而言,有必要設(shè)置檢查點(diǎn),從而降低節(jié)點(diǎn)宕機(jī)后產(chǎn)生的恢復(fù)開銷.

      定義3. 作業(yè)恢復(fù)代價(jià).作業(yè)執(zhí)行到任意RDD,若集群中某個(gè)工作節(jié)點(diǎn)發(fā)生故障,作業(yè)將丟失該工作節(jié)點(diǎn)計(jì)算的所有中間結(jié)果.記故障的工作節(jié)點(diǎn)編號(hào)為k(工作節(jié)點(diǎn)編號(hào)對(duì)應(yīng)流水線序號(hào)和寬依賴RDD中的分區(qū)編號(hào)),那么對(duì)于所有窄依賴階段,每個(gè)階段丟失一條流水線的數(shù)據(jù),則在不考慮恢復(fù)調(diào)度開銷時(shí),窄依賴階段的恢復(fù)代價(jià)為丟失流水線的重新計(jì)算代價(jià),即:

      (9)

      而對(duì)于寬依賴階段而言,RDD的一個(gè)分區(qū)丟失,因此在不考慮恢復(fù)調(diào)度開銷時(shí),寬依賴階段的恢復(fù)代價(jià)為該分區(qū)的重新計(jì)算代價(jià),即:

      WRi=WTPi jk.

      (10)

      由于工作節(jié)點(diǎn)故障為隨機(jī)事件,發(fā)生故障時(shí)當(dāng)前計(jì)算的RDD可能位于任何階段內(nèi),因此,若故障發(fā)生在窄依賴階段,則作業(yè)恢復(fù)代價(jià)還應(yīng)考慮丟失分區(qū)在流水線的前續(xù)節(jié)點(diǎn)的重新計(jì)算代價(jià).而若故障發(fā)生在寬依賴階段,由于該階段僅包含一個(gè)RDD,無(wú)前續(xù)節(jié)點(diǎn)恢復(fù)代價(jià),僅需要恢復(fù)該RDD.

      設(shè)當(dāng)前作業(yè)已執(zhí)行的階段中共有x個(gè)窄依賴、y個(gè)寬依賴,正在執(zhí)行的階段共有m個(gè)RDD,當(dāng)前正在計(jì)算RDDi h,在不考慮恢復(fù)調(diào)度開銷時(shí)作業(yè)恢復(fù)代價(jià)可表示為

      (11)

      即作業(yè)恢復(fù)代價(jià)等于前續(xù)寬、窄依賴階段的恢復(fù)代價(jià)之和,再加上丟失分區(qū)在當(dāng)前階段中父分區(qū)的計(jì)算代價(jià)之和.

      定理1. 設(shè)Jobi={RDDi 1,RDDi 2,…,RDDi n},RDDi(j-1)∈Jobi且RDDi j∈Jobi.若RDDi j具有更長(zhǎng)的血統(tǒng),并且RDDi(j-1)是RDDi j唯一的parentRDD.那么,在執(zhí)行Jobi的計(jì)算節(jié)點(diǎn)都失效時(shí),恢復(fù)RDDi j的開銷較高,且為TRDDi j.

      證明. 若RDDi j具有更長(zhǎng)的血統(tǒng),RDDi(j-1)是RDDi j的parentRDD.設(shè)RDDi(j-1)的血統(tǒng)長(zhǎng)度為k,則RDDi j的血統(tǒng)長(zhǎng)度為k+1.考慮2種情況:

      1) 若Jobi設(shè)置了檢查點(diǎn),則只需要從檢查點(diǎn)處開始計(jì)算恢復(fù).若恢復(fù)所需要的最新檢查點(diǎn)為RDDi p,恢復(fù)調(diào)度開銷為常量αi的情況下,則恢復(fù)開銷分別為

      2) 若Jobi未設(shè)置檢查點(diǎn),在計(jì)算節(jié)點(diǎn)失效時(shí),則所有RDD都需要重新計(jì)算,重新部署調(diào)度開銷為常量βi,則恢復(fù)開銷分別為

      無(wú)論是情況1或情況2,明顯可看出,RRDDi j>RRDDi(j-1)且RRDDi j-RRDDi(j-1)=TRDDi j.

      證畢.

      定理2. 設(shè)Jobi={RDDi 1,RDDi 2,…,RDDi n},RDDi(j-1)∈Jobi,且RDDi j∈Jobi.RDDi(j-1)為RDDi j的唯一parentRDD.若節(jié)點(diǎn)失效時(shí),僅丟失了RDDi j的第l個(gè)分區(qū)Pi jl,那么在RDDi j的操作為窄依賴或?qū)捯蕾嚂r(shí),其恢復(fù)Pi jl的開銷不同,且寬依賴時(shí)恢復(fù)開銷為max(RPi j1,RPi j2,…,RPi jk).

      證明. 由于RDDi j的parentRDD為RDDi(j-1),則操作為窄依賴時(shí)可知丟失第l個(gè)分片Pi jl,只需通過(guò)RDDi(j-1)的對(duì)應(yīng)父分片Pi j(l-1)通過(guò)流水線計(jì)算獲得,故恢復(fù)Pi j(l-1)開銷為

      RRDDi j(narrow)=RPTi jl=
      αi+read(PTi(j-1)l)+proc(PTi jl).

      操作為寬依賴時(shí),丟失第l個(gè)分片Pi jl,需通過(guò)RDDi(j-1)所有的分片計(jì)算得到Pi jl,則恢復(fù)PTi j(l-1)的開銷為

      RRDDi j(wide)=RRDDi j=max(RPi j1,RPi j2,…,RPi jk).

      因此,RRDDi j(wide)≥RRDDi j(narrow).

      證畢.

      定理3. 設(shè)Jobi={RDDi 1,RDDi 2,…,RDDi n},RDDi(j-1),RDDi j,RDDi(j+1)∈Jobi.RDDi(j-1)是RDDi(j-1)和RDDi j的唯一parentRDD,RDDi(j-1)已備份為檢查點(diǎn)并存儲(chǔ)在正常運(yùn)行節(jié)點(diǎn),且計(jì)算代價(jià)procRDDi(j+1)≥procRDDi j.當(dāng)計(jì)算節(jié)點(diǎn)失效時(shí),RDDi j和RDDi(j+1)丟失,則從RDDi(j-1)檢查點(diǎn)進(jìn)行恢復(fù)的代價(jià)RRDDi(j+1)≥RRDDi j.

      證明. 由于procRDDi(j+1)≥procRDDi j,則對(duì)于RDDi(j+1)中任意第l個(gè)分片的計(jì)算代價(jià)procPTi(j+1)l≥procPTi jl.

      當(dāng)執(zhí)行恢復(fù)計(jì)算時(shí),RDDi(j-1)已備份為檢查點(diǎn),則恢復(fù)RDDi j的開銷為RRDDi j=max(RPi j1,RPi j2,…,RPi jk).對(duì)第l個(gè)分片而言,恢復(fù)該分片的開銷為

      RPi jl=TPi jl=αi+read(Pi(j-1)l)+proc(Pi jl).

      同樣,恢復(fù)RDDi(j+1)的開銷為RRDDi(j+1)=max(RPi(j+1)1,RPi(j+1)2,…,RPi(j+1)k).對(duì)第l個(gè)分片而言,恢復(fù)該分片的開銷為

      RPi(j+1)l=TPi(j+1)l=
      αi+read(Pi(j-1)l)+proc(Pi(j+1)l).

      由于procRDDi(j+1)≥procRDDi j,可知RPi(j+1)l≥RPi jl,同時(shí)RRDDi(j+1)≥RRDDi j.

      證畢.

      2.4 RDD關(guān)鍵度模型

      根據(jù)2.3節(jié)的分析,可以判斷出RDD與恢復(fù)開銷相關(guān)的因素有3個(gè):

      1) RDD的血統(tǒng)長(zhǎng)度.在作業(yè)中具有更長(zhǎng)血統(tǒng)的RDD恢復(fù)開銷更大,證明詳見(jiàn)定理1.因此當(dāng)設(shè)置檢查點(diǎn)時(shí),應(yīng)優(yōu)先選擇長(zhǎng)血統(tǒng)的RDD,從而降低對(duì)恢復(fù)開銷的影響.

      2) RDD類型(指寬依賴或窄依賴).在寬依賴情況下,丟失1個(gè)子RDD分區(qū),需重算的每個(gè)父RDD的每個(gè)分區(qū)會(huì)產(chǎn)生冗余計(jì)算開銷,使寬依賴在恢復(fù)時(shí)開銷更大,證明詳見(jiàn)定理2.因此,應(yīng)優(yōu)先選擇寬依賴的RDD作為檢查點(diǎn),從而降低恢復(fù)開銷.

      3) RDD計(jì)算代價(jià).計(jì)算代價(jià)越高的RDD恢復(fù)時(shí)開銷越大,應(yīng)優(yōu)先備份計(jì)算代價(jià)高的RDD,證明詳見(jiàn)定理3.另外,RDD大小對(duì)數(shù)據(jù)備份時(shí)長(zhǎng)和數(shù)據(jù)恢復(fù)讀取時(shí)長(zhǎng)都會(huì)產(chǎn)生影響,從而也作為考慮的關(guān)鍵因素.

      定義4. RDD關(guān)鍵度.表示RDD對(duì)任務(wù)恢復(fù)效率的重要程度.在傳統(tǒng)的檢查點(diǎn)策略中,并未引進(jìn)權(quán)重的概念,被選擇作為檢查點(diǎn)的數(shù)據(jù)在本質(zhì)上與其他數(shù)據(jù)沒(méi)有區(qū)別,而在Spark中則不同,不同的RDD對(duì)恢復(fù)的重要程度不同.在RDD丟失時(shí),需要重新計(jì)算,而不同的RDD恢復(fù)需要的計(jì)算開銷不同.

      綜合考慮相關(guān)因素,記LDRDDi j表示為L(zhǎng)DRDDi j血統(tǒng)長(zhǎng)度,TypeRDDi j為類型,CostRDDi j為計(jì)算代價(jià),SizeRDDi j為數(shù)據(jù)量大小.則RDD的關(guān)鍵度表示為

      (12)

      式(12)表明,血統(tǒng)長(zhǎng)度、RDD類型、計(jì)算代價(jià)與RDD關(guān)鍵度成正比關(guān)系,而容量大小則成反比關(guān)系.關(guān)鍵度越大,恢復(fù)時(shí)對(duì)恢復(fù)效率的影響越大,因此作為檢查點(diǎn)備份的必要性越大.

      定義5. 失效恢復(fù)比.用于表示恢復(fù)任務(wù)分配與節(jié)點(diǎn)計(jì)算能力的適應(yīng)程度.記恢復(fù)計(jì)算節(jié)點(diǎn)集合Workers={w1,w2,…,wn},其計(jì)算能力C={c1,c2,…,cn},RT為恢復(fù)任務(wù)總量,則執(zhí)行恢復(fù)任務(wù)的時(shí)間開銷均值可定義為

      (13)

      對(duì)于任意計(jì)算節(jié)點(diǎn)wi,分配給其恢復(fù)任務(wù)RTi時(shí),其執(zhí)行時(shí)間、均值的方差和失效恢復(fù)比分別為

      (14)

      Dwi=(Twi-E)2,

      (15)

      (16)

      其中,wi∈Workers.

      基于定義,從作業(yè)分配的角度來(lái)看,作業(yè)執(zhí)行時(shí)間也可表示為

      Tjob=max(Tw1,Tw2,…,Twn).

      (17)

      由于節(jié)點(diǎn)的失效恢復(fù)比與方差成反比,因此比值越大,方差越小,表示節(jié)點(diǎn)恢復(fù)作業(yè)完成時(shí)間越趨近均值,因此當(dāng)所有工作節(jié)點(diǎn)的失效恢復(fù)比取最大值時(shí)恢復(fù)任務(wù)的執(zhí)行時(shí)間最短.

      2.5 檢查點(diǎn)管理策略問(wèn)題定義

      2.1節(jié)至2.4節(jié)已經(jīng)對(duì)作業(yè)和檢查點(diǎn)機(jī)制作了比較詳細(xì)的闡述,基于這些定義,對(duì)我們的檢查點(diǎn)算法進(jìn)行形式化表示.

      優(yōu)化目標(biāo):

      minRjob.

      (18)

      s.t.

      (19)

      其中,j∈jobs,wi∈Workers,r∈R.

      優(yōu)化目標(biāo):

      (20)

      (21)

      s.t.

      (22)

      其中,j∈jobs,RDDj k∈jobj,wi∈Workers,r∈R.

      目標(biāo)是最大化RDD關(guān)鍵度和失效恢復(fù)比,約束條件同上.因此,在設(shè)置檢查點(diǎn)時(shí),應(yīng)選擇已生成的RDD中對(duì)恢復(fù)效率影響最大,即關(guān)鍵度最大的RDD作為檢查點(diǎn)進(jìn)行設(shè)置.在節(jié)點(diǎn)失效進(jìn)行恢復(fù)時(shí),應(yīng)選擇節(jié)點(diǎn)能力更強(qiáng)的節(jié)點(diǎn)執(zhí)行作業(yè)恢復(fù).

      3 檢查點(diǎn)管理策略

      本節(jié)提出基于關(guān)鍵度的檢查點(diǎn)管理(criticality based checkpoint management,CCM)策略,其中包括檢查點(diǎn)設(shè)置算法、失效恢復(fù)算法和檢查點(diǎn)清理算法,具體流程圖如圖2所示.

      Fig. 2 Flow chart of checkpoint strategy圖2 檢查點(diǎn)策略流程圖

      作業(yè)在設(shè)置檢查點(diǎn)和恢復(fù)算法的粒度可以分為2個(gè)級(jí)別:1)RDD級(jí)別;2)分區(qū)級(jí)別.在設(shè)置檢查點(diǎn)時(shí),檢查點(diǎn)選擇可以在RDD級(jí)別、分區(qū)級(jí)別以權(quán)重度、固定時(shí)間間隔等多種為算法以異步檢查點(diǎn)的方式進(jìn)行設(shè)置.在算法恢復(fù)時(shí),宕機(jī)節(jié)點(diǎn)上的任務(wù)需要進(jìn)行恢復(fù),以最新的檢查點(diǎn)序列為基礎(chǔ),由Spark調(diào)度器重新調(diào)度執(zhí)行.根據(jù)集群資源情況,分配需要的資源.

      RDD級(jí)別以作業(yè)的RDD為粒度,根據(jù)RDD的重要度選擇更優(yōu)的檢查點(diǎn),問(wèn)題在于粒度較大,當(dāng)某個(gè)節(jié)點(diǎn)宕機(jī)丟失某些RDD的分區(qū),需要重新計(jì)算所有的RDD,從磁盤中讀取最新檢查點(diǎn)信息的所有內(nèi)容,浪費(fèi)系統(tǒng)資源.分區(qū)級(jí)別以RDD的分區(qū)為粒度,根據(jù)分區(qū)的關(guān)鍵度,為每個(gè)分區(qū)選擇異步的檢查點(diǎn),在丟失某些分區(qū)時(shí)不需要恢復(fù)整個(gè)RDD,只需恢復(fù)丟失的分區(qū),無(wú)論是備份檢查點(diǎn)時(shí)的磁盤開銷,還是恢復(fù)時(shí)的讀取開銷和執(zhí)行開銷,都比較少.但問(wèn)題在于,以分區(qū)為粒度和異步備份檢查點(diǎn)時(shí)、若有多個(gè)任務(wù)同時(shí)在Spark集群中執(zhí)行,檢查點(diǎn)設(shè)置開銷和管理難度明顯增大.因此,采用以RDD為設(shè)置檢查點(diǎn)的基本單位,分區(qū)為恢復(fù)檢查點(diǎn)的基本單位.

      3.1 基礎(chǔ)數(shù)據(jù)構(gòu)建

      基于關(guān)鍵度的檢查點(diǎn)算法需要構(gòu)建的初始化參數(shù)與基礎(chǔ)數(shù)據(jù)如下:

      1) 生成RDD結(jié)構(gòu)樹treeRDDs

      通過(guò)在Spark源碼中插入監(jiān)聽代碼,遍歷作業(yè)的DAG圖,獲取DAG圖中點(diǎn)和邊的信息,以及輸入RDDId、操作類型、輸出RDDId,并通過(guò)DAG生成RDD結(jié)構(gòu)樹treeRDDs.

      2) 獲取RDD深度

      通過(guò)分析DAG圖即可獲得,實(shí)際運(yùn)行時(shí)血統(tǒng)長(zhǎng)度為迭代次數(shù)與血統(tǒng)長(zhǎng)度相乘.

      3) 獲取RDD類型值

      寬依賴操作的RDD具有作為檢查點(diǎn)的意義,因此遍歷RDD結(jié)構(gòu)樹,將操作為寬依賴的RDD寫入列表,并根據(jù)該RDD分片個(gè)數(shù)獲得類型值.

      4) RDD計(jì)算代價(jià)

      在實(shí)際的Spark環(huán)境中,作業(yè)的DAG圖生成后,管理節(jié)點(diǎn)監(jiān)控所有RDD的狀態(tài)變化,通過(guò)記錄各狀態(tài)變化的時(shí)間點(diǎn)即可得出起始時(shí)間和完成時(shí)間.

      5) RDD大小

      在作業(yè)執(zhí)行過(guò)程中,每計(jì)算完成一個(gè)RDD,即可根據(jù)其多個(gè)分區(qū)大小求和,進(jìn)而獲得該RDD的大小.

      6) 計(jì)算關(guān)鍵度

      通過(guò)RDD關(guān)鍵度公式,為RDD計(jì)算并保存.

      3.2 檢查點(diǎn)設(shè)置算法

      如算法1所示,根據(jù)RDD的關(guān)鍵度,在作業(yè)執(zhí)行時(shí)間內(nèi)選擇RDD作為檢查點(diǎn)進(jìn)行設(shè)置.檢查點(diǎn)設(shè)置的時(shí)間從源RDD之后第1個(gè)生成的RDD開始作為檢查點(diǎn)開始,到終點(diǎn)RDD之前最后一個(gè)生成的RDD結(jié)束為結(jié)束.在作業(yè)執(zhí)行時(shí),源RDD和終點(diǎn)RDD不需要進(jìn)行檢查點(diǎn)設(shè)置.由于源RDD存儲(chǔ)在磁盤中,終點(diǎn)RDD為任務(wù)結(jié)束的RDD,會(huì)根據(jù)需要在結(jié)束后寫入磁盤,因此不需要考慮.在作業(yè)執(zhí)行過(guò)程中進(jìn)行檢查點(diǎn)設(shè)置時(shí)對(duì)比最新生成的多個(gè)RDD,并選擇當(dāng)前關(guān)鍵度最大的RDD.選擇好需要備份的RDD,執(zhí)行檢查點(diǎn)設(shè)置操作,將該RDD備份.計(jì)算任務(wù)首先將檢查點(diǎn)臨時(shí)緩存在局部?jī)?nèi)存中,然后由另一個(gè)獨(dú)立于計(jì)算任務(wù)的并行任務(wù)負(fù)責(zé)將緩存在內(nèi)存中的檢查點(diǎn)數(shù)據(jù)文件拷貝到HDFS.

      算法1. 檢查點(diǎn)設(shè)置算法.

      輸入:結(jié)構(gòu)樹treeRDDs;

      輸出:檢查點(diǎn)列表checkpointlist.

      初始化:checkpointlist←newArray;

      checkpoint←newHash;

      cutlist←newList;

      i←0;

      j←1;

      maxCR←0.

      ①checkpointlist←Null;

      ②checkpoint(treeRDDs[i]);

      ③checkpointlist.add(treeRDDs[i]);

      /*添加生成的第1個(gè)RDD作為檢查點(diǎn)*/

      ④ while(Cfinished=1 &&Tfinished=0)

      ⑤ fori=0 tonewgenerateRDD.lengthdo

      ⑥candidateslist←getnewRDD;

      ⑦candidates[i].cost←getRDDCost;

      ⑧calculate(candidates[i].CR);

      ⑨ if (candidates[i].CR>maxCR) then

      ⑩nextcheckpoint[j]←candidates[i];

      3.3 檢查點(diǎn)恢復(fù)算法

      管理節(jié)點(diǎn)在每一個(gè)固定的時(shí)間間隔要求工作節(jié)點(diǎn)來(lái)發(fā)送它們的當(dāng)前狀態(tài).工作節(jié)點(diǎn)定期發(fā)送心跳給管理節(jié)點(diǎn),如果在一個(gè)時(shí)間限制沒(méi)有收到工作節(jié)點(diǎn)的心跳,標(biāo)記“失敗”.在這種情況下,管理節(jié)點(diǎn)指示工作節(jié)點(diǎn)利用最近的檢查點(diǎn)進(jìn)行重新啟動(dòng)恢復(fù).如算法2所示,宕機(jī)后,由Spark執(zhí)行恢復(fù)操作時(shí),從最后設(shè)置的檢查點(diǎn)RDD處進(jìn)行恢復(fù).恢復(fù)時(shí),優(yōu)先選擇離執(zhí)行恢復(fù)的工作節(jié)點(diǎn)最近的檢查點(diǎn)副本,從而降低網(wǎng)絡(luò)開銷和恢復(fù)時(shí)延.將最新的檢查點(diǎn)讀入內(nèi)存,并且可以根據(jù)需要將檢查點(diǎn)列表中的檢查點(diǎn)讀入,從而降低恢復(fù)和執(zhí)行開銷.當(dāng)某個(gè)RDD需要恢復(fù)但未設(shè)置檢查點(diǎn)時(shí),重新執(zhí)行血統(tǒng),通過(guò)其父節(jié)點(diǎn)恢復(fù);若已將該RDD設(shè)置檢查點(diǎn),可以將該RDD讀入內(nèi)存.若有寬依賴,或丟失了RDD的所有分區(qū),則需讀取所需的RDD的所有分區(qū)到內(nèi)存;若丟失了RDD的部分分區(qū),且恢復(fù)血統(tǒng)中無(wú)寬依賴,則只需讀取丟失分區(qū)所需檢查點(diǎn)到內(nèi)存進(jìn)行計(jì)算.

      算法2. 檢查點(diǎn)恢復(fù)算法.

      輸入:當(dāng)前結(jié)構(gòu)樹curtreeRDDs、檢查點(diǎn)列表checkpointlist、需要恢復(fù)的RDDrecoveryRDDs.

      初始化:parentsRDD←newHash;

      checkpointRDD←newHash;

      i,j,k←0;

      recoveryworker←assign.freeworker.

      ① fori=0 torecoveryRDDs.length-1 do

      ②parentsRDD←getLineage(

      recoveryRDDs[i],curtreeRDDs);

      /*獲取需要恢復(fù)RDD的父RDD*/

      ③ forj=0 toparentsRDD.length-1 do

      ④checkpointRDD←get(checkpointlist,

      parentsRDD);

      ⑤ fork=0 torecoveryRDDs.partitionnum-1 do

      ⑥ if (wideDependency(recoveryRDDs[i])∨lost(recoveryRDDs[i])

      ⑦read(parentsRDD[j]);

      /*操作為寬依賴或丟失所有分區(qū)*/

      ⑧ end if

      ⑩read(parentsRDD[j].partition[k]);

      /*操作為窄依賴且丟失部分分區(qū)*/

      /*根據(jù)血統(tǒng)進(jìn)行恢復(fù)計(jì)算*/

      3.4 檢查點(diǎn)清理算法

      隨著任務(wù)執(zhí)行時(shí)間的增長(zhǎng),保存的檢查點(diǎn)恢復(fù)信息越來(lái)越多,其中有些恢復(fù)信息成為過(guò)時(shí)無(wú)用的恢復(fù)信息.檢查點(diǎn)清理就是刪除這些過(guò)時(shí)無(wú)用的恢復(fù)信息.檢查點(diǎn)清理策略對(duì)恢復(fù)協(xié)議性能有直接影響.不同的恢復(fù)策略,不同的應(yīng)用所需要存儲(chǔ)的恢復(fù)信息量都會(huì)有不同,執(zhí)行檢查點(diǎn)清理會(huì)伴隨有開銷.因此有效的組織存儲(chǔ)恢復(fù)信息和執(zhí)行檢查點(diǎn)清理都會(huì)對(duì)系統(tǒng)性能產(chǎn)生影響,提高集群資源利用率.

      對(duì)Spark檢查點(diǎn)機(jī)制而言,檢查點(diǎn)存儲(chǔ)在持久化存儲(chǔ)磁盤或固態(tài)硬盤中.就磁盤空間而言,空間較大,并不是主要的瓶頸.因此,最簡(jiǎn)單的做法就是在任務(wù)執(zhí)行完畢后,將該任務(wù)所設(shè)置的所有檢查點(diǎn)刪除.但每次檢查點(diǎn)備份時(shí)為3個(gè)副本,因此,當(dāng)系統(tǒng)中并行執(zhí)行多個(gè)任務(wù)時(shí),若磁盤空間有限,那么在備份過(guò)程中需要執(zhí)行清理算法,可以根據(jù)檢查點(diǎn)的關(guān)鍵度進(jìn)行清理.

      檢查點(diǎn)清理算法,在不影響其他作業(yè)的情況下并行執(zhí)行.如算法3所示,清理的2個(gè)條件為:

      1) 當(dāng)任務(wù)的磁盤備份空間受限時(shí),在添加新檢查點(diǎn)時(shí),對(duì)已有檢查點(diǎn)進(jìn)行清理,以滿足新檢查點(diǎn)的空間需求;

      2) 系統(tǒng)設(shè)置檢查點(diǎn)清理周期,到達(dá)該時(shí)間周期時(shí)進(jìn)行清理.

      清理算法的2個(gè)步驟:

      1) 新添加一個(gè)檢查點(diǎn),磁盤空間不足,將已有的檢查點(diǎn)關(guān)鍵度進(jìn)行過(guò)濾,將權(quán)重小于新檢查點(diǎn)的對(duì)象放入候選列表.

      2) 將候選列表按關(guān)鍵度從小到大的順序排列.搜索候選列表,若存在目標(biāo),其容量滿足新檢查點(diǎn)的要求則進(jìn)行清理;否則,當(dāng)用戶設(shè)定的清理時(shí)間到達(dá)時(shí),才能將所有候列表中的檢查點(diǎn)進(jìn)行清理.

      算法3. 檢查點(diǎn)清理算法.

      輸入:檢查點(diǎn)列表checkpointlist、當(dāng)前需添加的檢查點(diǎn)RDD[i]、存儲(chǔ)空間空閑容量freecapacity.

      初始化:candidates←newHash;

      v←RDD[i].CR;

      s←RDD[i].size.

      ① forj=0 tocheckpointlist.length-1 do

      ② ifv>checkpointlist[j].CRthen

      ③candidates.add(checkpointlist[j]);

      ④ end if

      ⑤ end for

      ⑥cancadidates.orderbyCR();

      /*對(duì)關(guān)鍵度進(jìn)行排序*/

      ⑦ forj=0 tocancadidates.length-1 do

      ⑧ if (freecapacity≤mincapacity∧

      cancadidates[j].size>s)

      ⑨clean(candidates[j]);

      ⑩checkpoint(RDDs[i]);

      /*選擇空間滿足大小的檢查點(diǎn)替換*/

      /*到達(dá)系統(tǒng)清理時(shí)間*/

      /*清理所有檢查點(diǎn)*/

      4 實(shí)驗(yàn)與評(píng)價(jià)

      本節(jié)將通過(guò)實(shí)驗(yàn)進(jìn)行比較和評(píng)價(jià),驗(yàn)證檢查點(diǎn)自動(dòng)選擇算法、恢復(fù)算法和檢查點(diǎn)清理算法的有效性.

      4.1 實(shí)驗(yàn)環(huán)境

      實(shí)驗(yàn)環(huán)境用1臺(tái)服務(wù)器和8個(gè)工作節(jié)點(diǎn)建立計(jì)算集群,服務(wù)器作為Spark的Master和Hadoop的NameNode.為體現(xiàn)工作節(jié)點(diǎn)的計(jì)算能力不同,8個(gè)工作節(jié)點(diǎn)由1個(gè)高效節(jié)點(diǎn)、6個(gè)普通節(jié)點(diǎn)和1個(gè)慢節(jié)點(diǎn)組成,其中普通節(jié)點(diǎn)的配置如表1所示,高效節(jié)點(diǎn)配備4核CPU,16 GB內(nèi)存和4個(gè)千兆網(wǎng)卡,而慢任務(wù)節(jié)點(diǎn)僅有單核CPU,1 GB內(nèi)存和1個(gè)百兆網(wǎng)卡.

      Table1 Configuration Parameters of Worker Node表1 Worker節(jié)點(diǎn)配置參數(shù)

      4.2 算法綜合評(píng)估測(cè)試

      對(duì)于本文提出CCM策略進(jìn)行測(cè)試,實(shí)驗(yàn)數(shù)據(jù)首先選取WordCount,TeraSort,K-Means,PageRank 4種算法作為作業(yè)進(jìn)行分析.其中WordCount作業(yè)量為4.9 GB,TeraSort作業(yè)量為4.5 GB,K-Means輸入數(shù)據(jù)總樣本點(diǎn)為4 000 000,維度為20,點(diǎn)群個(gè)數(shù)為5,迭代1次.PageRank頁(yè)面?zhèn)€數(shù)為3 000 000,迭代3次.表2為不同算法在集群失效節(jié)點(diǎn)個(gè)數(shù)為1時(shí)執(zhí)行,且執(zhí)行到第100秒時(shí)關(guān)閉1個(gè)普通節(jié)點(diǎn)node3,對(duì)比3種策略對(duì)恢復(fù)時(shí)間(execution time, ET)以及與未優(yōu)化策略相比的加速時(shí)間(accelerate time, AT)和加速比(accelerate rate, AR)的影響.這3種策略分別為未優(yōu)化策略(Without opt)、Tachyon檢查點(diǎn)策略(Tachyon checkpoint)和CCM策略.

      Table 2 Recovery Efficiency with Different Strategies inSingle Node Failure

      在所有的作業(yè)中,CCM策略的作業(yè)完成時(shí)間都優(yōu)于傳統(tǒng)Spark,在設(shè)置檢查點(diǎn)的情況下,宕機(jī)恢復(fù)后,4類作業(yè)的總執(zhí)行時(shí)間要小于未設(shè)置檢查點(diǎn)的情況,從而證明了算法在多種類型作業(yè)下都具有良好的優(yōu)化效果.通過(guò)觀察數(shù)據(jù)發(fā)現(xiàn),不同作業(yè)對(duì)恢復(fù)效率的提高程度各不相同,這是因?yàn)椴煌愋妥鳂I(yè)寬依賴操作個(gè)數(shù)和數(shù)據(jù)大小各不相同,因此在作業(yè)類型不同的情況下算法的優(yōu)化效果無(wú)明顯規(guī)律.

      Fig. 3 Memory usage of different jobs on node3圖3 普通節(jié)點(diǎn)node3上不同作業(yè)的內(nèi)存利用率

      Fig. 4 Disk I/O rate of different jobs on node3圖4 普通節(jié)點(diǎn)node3上不同作業(yè)的磁盤I/O速率

      Fig. 5 The network I/O rate of different jobs on node3圖5 普通節(jié)點(diǎn)node3上不同作業(yè)的網(wǎng)絡(luò)I/O速率

      同樣,對(duì)于內(nèi)存利用率、磁盤I/O和網(wǎng)絡(luò)I/O的情況,在作業(yè)類型變化時(shí)也具有不同的特點(diǎn).圖3~5分別為本文提出的CCM策略下監(jiān)控普通節(jié)點(diǎn)node3在執(zhí)行過(guò)程中的內(nèi)存利用率、磁盤I/O速率和網(wǎng)絡(luò)I/O速率變化情況.在內(nèi)存利用率上,與作業(yè)的類型和輸入數(shù)據(jù)的分布情況有關(guān).對(duì)于相同的算法而言,所需處理的數(shù)據(jù)量越大,內(nèi)存資源使用量越大.由圖3可知,WordCount和TeraSort隨著執(zhí)行時(shí)間的增加,具有相對(duì)穩(wěn)定的內(nèi)存占用率;而K-Means和PageRank則隨著處理任務(wù)的階段不同,具有不同的內(nèi)存占用率.

      在磁盤I/O速率方面,無(wú)論任務(wù)是處理本地?cái)?shù)據(jù)還是網(wǎng)絡(luò)數(shù)據(jù),都會(huì)在某個(gè)節(jié)點(diǎn)上產(chǎn)生相應(yīng)的本地?cái)?shù)據(jù)讀取,消耗一定的磁盤I/O。若處理網(wǎng)絡(luò)數(shù)據(jù),還要產(chǎn)生額外的網(wǎng)絡(luò)I/O.由于作業(yè)需要從磁盤中讀取數(shù)據(jù),并在新的RDD生成時(shí)設(shè)置檢查點(diǎn),因此產(chǎn)生了較為頻繁的磁盤I/O.由圖4可知,其中WordCount的磁盤I/O更為明顯,其他3類作業(yè)則頻率較低.K-Means在100 s后磁盤I/O有明顯增加,這是由于節(jié)點(diǎn)失效,分配給node3恢復(fù)任務(wù),此時(shí)需要從磁盤中讀取相應(yīng)檢查點(diǎn),從而實(shí)現(xiàn)恢復(fù).

      對(duì)于網(wǎng)絡(luò)I/O而言,其開銷大小與作業(yè)的類型和任務(wù)并行度有關(guān).由于Spark具有數(shù)據(jù)本地性的特點(diǎn),盡可能保證節(jié)點(diǎn)上執(zhí)行的任務(wù)處理本地?cái)?shù)據(jù),因此網(wǎng)絡(luò)I/O主要源于寬依賴階段,另外小部分網(wǎng)絡(luò)I/O則是處理遠(yuǎn)程數(shù)據(jù).由圖5可看出,對(duì)于4類不同的作業(yè),網(wǎng)絡(luò)I/O開銷相差不大.其中WordCount作業(yè)的寬依賴階段只需合并少量數(shù)據(jù),因此其網(wǎng)絡(luò)I/O開銷較小.

      另外,由表2可知,在設(shè)置檢查點(diǎn)的情況下,本文提出的CCM策略的K-Means和PageRank的恢復(fù)加速情況要好于Tachyon的檢查點(diǎn)策略,而WordCount和TeraSort與Tachyon類似,這是對(duì)比K-Means和PageRank,WordCount和TeraSort中的寬依賴操作較少,因此在使用基于lineage的檢查點(diǎn)策略也能夠較好地設(shè)置失效恢復(fù)所需檢查點(diǎn).并且計(jì)算開銷與算法復(fù)雜度和輸入數(shù)據(jù)的大小相關(guān),當(dāng)作業(yè)一定時(shí)輸入數(shù)據(jù)越大,算法開銷越大,在此粗略地認(rèn)為輸入數(shù)據(jù)越大,相應(yīng)RDD的計(jì)算開銷也會(huì)相應(yīng)增加,同時(shí)增加磁盤讀寫的開銷.對(duì)應(yīng)不同的作業(yè),節(jié)點(diǎn)的處理速度也不同.

      4.3 檢查點(diǎn)設(shè)置算法

      為了進(jìn)一步對(duì)比和分析CCM策略,我們選用PageRank進(jìn)行性能測(cè)試、評(píng)價(jià)與比較.實(shí)驗(yàn)數(shù)據(jù)選用SNAP[25]提供的有向圖數(shù)據(jù)集,數(shù)據(jù)集列表如表3所示.

      利用節(jié)點(diǎn)和連接數(shù)差異較大的2個(gè)數(shù)據(jù)集Web-Google和Wiki-Talk分別迭代1~10次對(duì)該算法性能驗(yàn)證,并使用nmon監(jiān)測(cè)執(zhí)行時(shí)間和任務(wù)檢查點(diǎn)的大小.PageRank任務(wù)在多個(gè)數(shù)據(jù)集上執(zhí)行,使用PageRank有2方面的原因:1)PageRank主要用于有向圖的計(jì)算,是一個(gè)典型的迭代計(jì)算算法;2)PageRank是計(jì)算密集型算法,因此對(duì)檢查點(diǎn)系統(tǒng)更敏感,更利于驗(yàn)證算法.對(duì)于數(shù)據(jù)密集型任務(wù)而言,代價(jià)較高,因?yàn)樾柙趲掃h(yuǎn)低于內(nèi)存的集群網(wǎng)絡(luò)間拷貝大量的數(shù)據(jù),同時(shí)也將產(chǎn)生大量的存儲(chǔ)開銷.

      Table 3 Information of Datasets表3 測(cè)試數(shù)據(jù)集列表

      圖6表示與現(xiàn)有Spark內(nèi)存系統(tǒng)使用不同策略的檢查點(diǎn)設(shè)置算法的執(zhí)行效率進(jìn)行對(duì)比的情況.其中Web-Google和Wiki-Talk帶下標(biāo)Tachyon,experience,CCM分別代表:1)Tachyon設(shè)置檢查點(diǎn)的策略,即僅考慮RDD的深度,即當(dāng)選擇檢查點(diǎn)時(shí),選擇最新生成的RDD進(jìn)行持久化存儲(chǔ);2)Spark程序員選取檢查點(diǎn)的經(jīng)驗(yàn),僅考慮RDD的操作復(fù)雜度,即當(dāng)設(shè)置檢查點(diǎn)時(shí)選擇寬依賴的RDD進(jìn)行持久化存儲(chǔ);3)采用本文提出的基于關(guān)鍵度的檢查點(diǎn)設(shè)置算法,即當(dāng)選擇檢查點(diǎn)時(shí)選擇最新生成的RDD中關(guān)鍵度最大的進(jìn)行持久化存儲(chǔ).

      Fig. 6 Efficiency of checkpoint algorithm with different parameters圖6 不同策略下檢查點(diǎn)設(shè)置算法的執(zhí)行效率

      由圖6可知,對(duì)比不同數(shù)據(jù)集,Wiki-Talk具有較大的連接數(shù)和節(jié)點(diǎn)數(shù),計(jì)算代價(jià)較高,因此無(wú)論采取什么樣的參數(shù)設(shè)置,都使其具有較大的檢查點(diǎn)大小,因此執(zhí)行時(shí)間較長(zhǎng),對(duì)應(yīng)檢查點(diǎn)存儲(chǔ)設(shè)置的平均時(shí)間開銷也隨之增加,并且迭代次數(shù)的增加對(duì)檢查點(diǎn)平均時(shí)間開銷的影響較小.由于全局檢查點(diǎn)備選列表是在計(jì)算之前生成,因此對(duì)任務(wù)實(shí)際時(shí)間影響較小.隨著任務(wù)和輸入數(shù)據(jù)的規(guī)模不同,設(shè)置時(shí)間不同.檢查點(diǎn)選擇的大小,平均時(shí)間與所選的數(shù)據(jù)集有關(guān).

      而對(duì)比不同的參數(shù)情況,與其他情況相比,僅考慮RDD計(jì)算代價(jià)的情況下設(shè)置的檢查點(diǎn)時(shí)間開銷更大,因?yàn)橛?jì)算代價(jià)與RDD的大小和復(fù)雜度都相關(guān),因此具有更大的檢查點(diǎn)大小.雖然可以用于宕機(jī)后RDD的恢復(fù),但對(duì)比很長(zhǎng)血統(tǒng)的RDD來(lái)說(shuō)這樣的恢復(fù)耗時(shí)較長(zhǎng).因此,將某些RDD進(jìn)行檢查點(diǎn)操作保存在穩(wěn)定存儲(chǔ)上是有幫助的.通常情況下,對(duì)于包含寬依賴的長(zhǎng)血統(tǒng)的RDD設(shè)置檢查點(diǎn)操作是非常有用的.比如PageRank算法中的排名數(shù)據(jù)集.在這種情況下,集群中某個(gè)節(jié)點(diǎn)的故障會(huì)使從各個(gè)父RDD得出某些數(shù)據(jù)丟失,這時(shí)候就需要重算.相反,對(duì)于那些窄依賴與穩(wěn)定存儲(chǔ)上數(shù)據(jù)的父RDD來(lái)說(shuō),對(duì)其進(jìn)行檢查點(diǎn)操作就不是必要的.

      4.4 檢查點(diǎn)恢復(fù)算法

      通過(guò)實(shí)驗(yàn)在節(jié)點(diǎn)的失效率(failure rate,fr)分別取值為0.125,0.25,0.375的情況下,對(duì)比Web-Google和Wiki-Talk數(shù)據(jù)集得到PageRank在不同恢復(fù)算法下的執(zhí)行時(shí)間和恢復(fù)情況.

      由圖7可知,對(duì)比失效率不同的情況,隨著失效節(jié)點(diǎn)個(gè)數(shù)的增加,任務(wù)執(zhí)行時(shí)間也隨之增加.這是由于失效率越大,意味著失效的節(jié)點(diǎn)越多,因此要恢復(fù)的RDD越多,需要重新計(jì)算產(chǎn)生相應(yīng)的時(shí)間開銷.對(duì)比不同數(shù)據(jù)集的情況,Wiki-Talk和Web-Google在不同算法下Wiki-Talk的時(shí)間開銷差距較大,這是由于計(jì)算量大小的區(qū)別.

      Fig. 7 Execution time of different datasets with different fr圖7 不同失效率時(shí)不同數(shù)據(jù)集的執(zhí)行時(shí)間對(duì)比

      對(duì)迭代次數(shù)1~10進(jìn)行對(duì)比,由于未設(shè)置檢查點(diǎn)的Spark原生系統(tǒng)需要利用血統(tǒng)進(jìn)行恢復(fù)丟失的RDD,因此當(dāng)?shù)螖?shù)增加時(shí)時(shí)間開銷隨之增加.在節(jié)點(diǎn)失效時(shí),同時(shí)會(huì)使運(yùn)行在該節(jié)點(diǎn)的任務(wù)執(zhí)行失敗,同時(shí)丟失存儲(chǔ)在其內(nèi)存上的已生成RDD分區(qū).在失效恢復(fù)時(shí),Spark為了對(duì)丟失的RDD分區(qū)進(jìn)行恢復(fù),利用集群中其他主機(jī)重新執(zhí)行失效任務(wù).這些任務(wù)需要將輸入數(shù)據(jù)重新讀取,并利用血統(tǒng)進(jìn)行RDD重建.隨著作業(yè)迭代次數(shù)的增加,血統(tǒng)變長(zhǎng),此時(shí)需要計(jì)算RDD的時(shí)間越長(zhǎng),則恢復(fù)開銷越大.而失效恢復(fù)策略在恢復(fù)時(shí)間開銷方面沒(méi)有顯著的增加,因?yàn)樵摬呗岳脵z查點(diǎn)設(shè)置算法設(shè)置了檢查點(diǎn),可以通過(guò)從HDFS中讀取檢查點(diǎn)進(jìn)行恢復(fù),從而減少RDD的重復(fù)計(jì)算,降低恢復(fù)時(shí)間開銷.

      4.5 檢查點(diǎn)清理算法

      定義6. 有效空間利用率.在群集工作結(jié)點(diǎn)的備份所有檢查點(diǎn)RDD之中,對(duì)恢復(fù)執(zhí)行效率有加速作用的容量之和占檢查點(diǎn)總?cè)萘康谋嚷?

      檢查點(diǎn)清理(checkpoint cleaning, CC)算法是檢查點(diǎn)選擇算法的后續(xù)同步操作,同樣在磁盤趨近滿載時(shí)才能體現(xiàn)效能.圖8為對(duì)比Web-Google和Wiki-Talk數(shù)據(jù)集在使用檢查點(diǎn)清理算法策略時(shí)和未優(yōu)化時(shí)的有效空間利用率.如圖8所示,傳統(tǒng)Spark框架隨著并發(fā)應(yīng)用數(shù)的增加,有效磁盤利用率的惡化程度也越來(lái)越高;而檢查點(diǎn)清理算法的有效磁盤利用率則較為穩(wěn)定.對(duì)比來(lái)看,采用檢查點(diǎn)清理算法的Spark框架,其有效磁盤利用率普遍高于傳統(tǒng)Spark框架.根據(jù)并行任務(wù)的特性,當(dāng)其父RDD的所有子RDD都被設(shè)置檢查點(diǎn)時(shí),則該父RDD在恢復(fù)時(shí)不會(huì)被使用到,因此存儲(chǔ)該RDD對(duì)作業(yè)恢復(fù)效率沒(méi)有影響.而檢查點(diǎn)清理算法在工作節(jié)點(diǎn)的某個(gè)RDD需要清理時(shí),通知群集的其他工作節(jié)點(diǎn)清除該RDD的其他副本,因此能在不影響任務(wù)恢復(fù)效率的前提下提高了群集磁盤空間的有效利用率.

      Fig. 8 Valuable capacity rate of CC圖8 檢查點(diǎn)清理算法的有效空間利用率

      綜合比較表2以及圖3~8,在不同數(shù)據(jù)集、策略的情況下,對(duì)比任務(wù)的執(zhí)行時(shí)間和恢復(fù)時(shí)間可知,在提出失效恢復(fù)策略中,檢查點(diǎn)策略會(huì)增加少量的時(shí)間開銷,然而對(duì)比傳統(tǒng)Spark策略中,由程序員選擇檢查點(diǎn)的不確定因素甚至是異常風(fēng)險(xiǎn),這些額外時(shí)間和空間開銷是有價(jià)值的.失效恢復(fù)算法基于檢查點(diǎn)設(shè)置算法,在設(shè)置檢查點(diǎn)時(shí)考慮RDD的血統(tǒng)長(zhǎng)度、計(jì)算代價(jià)、操作復(fù)雜度和容量等因素.權(quán)重越大的RDD,重新計(jì)算的恢復(fù)成本也越高,優(yōu)先將這些恢復(fù)成本較高的RDD設(shè)置為檢查點(diǎn),可以降低任務(wù)整體的重新計(jì)算代價(jià).在執(zhí)行恢復(fù)算法時(shí),選擇計(jì)算能力強(qiáng)的節(jié)點(diǎn)進(jìn)行恢復(fù),可以對(duì)任務(wù)的恢復(fù)效率進(jìn)行有效的提高.

      5 總結(jié)與展望

      本文針對(duì)內(nèi)存計(jì)算框架Spark的失效恢復(fù)問(wèn)題,首先對(duì)內(nèi)存計(jì)算框架的任務(wù)執(zhí)行機(jī)制進(jìn)行分析,建立執(zhí)行模型和檢查點(diǎn)模型.通過(guò)分析檢查點(diǎn)恢復(fù)過(guò)程,給出了RDD關(guān)鍵度和失效恢復(fù)比的定義,并證明這些定義與任務(wù)恢復(fù)效率的關(guān)系,為算法設(shè)計(jì)提供基礎(chǔ)模型.在相關(guān)模型定義和證明的基礎(chǔ)上,提出了基于RDD關(guān)鍵度的檢查點(diǎn)管理策略問(wèn)題定義,以此作為算法設(shè)計(jì)的主要依據(jù).通過(guò)算法的問(wèn)題定義求解,計(jì)算RDD關(guān)鍵度,設(shè)計(jì)了檢查點(diǎn)設(shè)置算法、恢復(fù)算法和并行清理算法.最后,通過(guò)不同的實(shí)驗(yàn)證明算法的有效性,實(shí)驗(yàn)結(jié)果表明,該策略優(yōu)化了內(nèi)存計(jì)算框架的檢查點(diǎn)管理,提高了作業(yè)的恢復(fù)效率.

      未來(lái)工作主要集中在3個(gè)方面:

      1) 分析內(nèi)存計(jì)算框架不同類型操作資源需求的一般規(guī)律,設(shè)計(jì)適應(yīng)作業(yè)負(fù)載和類型的檢查點(diǎn)策略;

      2) 對(duì)內(nèi)存計(jì)算框架的內(nèi)存管理進(jìn)行研究,優(yōu)化Executor現(xiàn)有的任務(wù)內(nèi)存分配策略;

      3) 通過(guò)分析計(jì)算節(jié)點(diǎn)性能和作業(yè)DAG圖,利用樣本和歷史記錄執(zhí)行預(yù)測(cè)RDD和作業(yè)的計(jì)算開銷,從而協(xié)助資源分配和并行任務(wù)調(diào)度方案做決策.

      [1]Walker S J. Big data: A revolution that will transform how we live, work, and think[J]. International Journal of Advertising, 2014, 17(1): 181-183

      [2] Meng Xiaofeng, Ci Xiang. Big data management: Concepts, techniques and challenges[J]. Journal of Computer Research and Development, 2013, 50(1): 146-169 (in Chinese)(孟小峰, 慈祥. 大數(shù)據(jù)管理: 概念、技術(shù)與挑戰(zhàn)[J]. 計(jì)算機(jī)研究與發(fā)展, 2013, 50(1): 146-169)

      [3] Chen C P, Zhang Chunyang. Data-intensive applications, challenges, techniques and technologies: A survey on big data[J]. Information Sciences, 2014, 275(11): 314-347

      [4] Kambatla K, Kollias G, Kumar V, et al. Trends in big data analytics[J]. Journal of Parallel and Distributed Computing, 2014, 74(7): 2561-2573

      [5] Zaharia M, Chowdhury M, Das T, et al. Fast and interactive analytics over Hadoop data with Spark[J]. ;Login:, 2012, 37(4): 45-51

      [6] Apache. Spark overview[EB/OL]. 2011 [2016-03-18]. http://spark.apache.org

      [7] Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing[C] //Proc of the 9th USENIX Conf on Networked Systems Design and Implementation. Berkeley, CA: USENIX Association, 2012

      [8] SAP. HANA overview[EB/OL]. 2011 [2016-09-21]. http://hana.sap.com/abouthana.html

      [9] Dean J, Ghemawat S. MapReduce: Simplified data processing on large clusters[C] //Proc of the 6th Symp on Operating System Design and Implementation (OSDI). New York: ACM, 2004: 137-150

      [10] Apache. Spark machine learning library (MLlib)[EB/OL]. 2012 [2016-03-18]. http://spark.incubator.apache.org/docs/latest/mllib-guide.html

      [11] Lin Xiuqin, Wang Peng, Wu Bin. Log analysis in cloud computing environment with Hadoop and Spark[C] //Proc of the 5th IEEE Int Conf on Broadband Network & Multimedia Technology (IC-BNMT). Piscataway, NJ: IEEE, 2013: 273-276

      [12] Dong Xiangyu, Xie Yuan, Muralimanohar N, et al. Hybrid checkpointing using emerging nonvolatile memories for future exascale system[J]. ACM Trans on Architecture and Code Optimization, 2011, 8(2): Article No.6

      [13] Dimitriou I. A retrial queue for modeling fault-tolerant systems with checkpointing and rollback recovery[J]. Computers & Industrial Engineering, 2015, 79: 156-167

      [14] Ifeanyi P E, David L, Bran S, et al. A survey of fault tolerance mechanisms and checkpoint/restart implementations for high performance computing systems[J]. Journal of Supercomputing, 2013, 65(3): 1302-1326

      [15] Zhou Enqiang, Lu Yutong, Shen Zhiyu. Implementation of checkpoint system toward large scale parallel computing[J]. Journal of Computer Research and Development, 2005, 42(6): 987-992 (in Chinese)(周恩強(qiáng), 盧宇彤, 沈志宇. 一個(gè)適合大規(guī)模集群并行計(jì)算的檢查點(diǎn)系統(tǒng)[J]. 計(jì)算機(jī)研究與發(fā)展, 2005, 42(6): 987-992)

      [16] Yi Huizhan, Wang Feng, Zuo Ke, et al. Asynchronous checkpoint/restart based on memory buffer[J]. Journal of Computer Research and Development, 2014, 51(6): 1229-1239 (in Chinese)(易會(huì)戰(zhàn), 王鋒, 左克, 等. 基于內(nèi)存緩存的異步檢查點(diǎn)容錯(cuò)技術(shù)[J].計(jì)算機(jī)研究與發(fā)展, 2014, 51(6): 1229-1239)

      [17] Wan Hu, Xu Yuanchao, Yan Junfeng, et al. Mitigating log cost through non-volatile memory and checkpoint optimization[J]. Journal of Computer Research and Development, 2015, 52(6): 1351-1361 (in Chinese)(萬(wàn)虎, 徐遠(yuǎn)超, 閆俊峰, 等. 通過(guò)非易失存儲(chǔ)和檢查點(diǎn)優(yōu)化緩解日志開銷[J]. 計(jì)算機(jī)研究與發(fā)展, 2015, 52(6): 1351-1361)

      [18] Cores I, Rodríguez G, Martín M J, et al. In-memory application-level checkpoint-based migration for MPI programs[J]. Journal of Supercomputing, 2014, 70(2): 660-670

      [19] Cao T, Vaz S M, Sowell B, et al. Fast checkpoint recovery algorithms for frequently consistent applications[C] //Proc of the 2011 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2011: 265-276

      [20] Chardonnens T, Cudre-Mauroux P, Grund M, et al. Big data analytics on high velocity streams: A case study[C] //Proc of 2013 IEEE Int Conf on Big Data. Piscataway, NJ: IEEE, 2013: 784-787

      [21] Neumeyer L, Robbins B, Nair A, et al. S4: Distributed stream computing platform[C] //Proc of the 10th IEEE Int Conf on Data Mining Workshops (ICDMW 2010). Piscataway, NJ: IEEE, 2010: 170-177

      [22] Li Haoyuan, Ghodsi A, Zaharia M, et al. Tachyon: Memory throughput I/O for cluster computing frameworks[C/OL]. 2013 [2016-03-18]. https://people.eecs.berkeley.edu/~alig/papers/tachyon-workshop.pdf

      [23] Li Haoyuan, Ghodsi A, Zaharia M, et al. Tachyon: Reliable, memory speed storage for cluster computing frameworks[C] //Proc of the 2014 ACM Symp on Cloud Computing. New York: ACM, 2014

      [24] Ongaro D, Rumble S M, Stutsman R, et al. Fast crash recovery in RAMCloud[C] //Proc of the 23rd ACM Symp on Operating Systems Principles. New York: ACM, 2011: 29-41

      [25] Jure L. Stanford network analysis project[EB/OL]. 2009 [2016-03-18]. http://snap.stanford.edu

      CriticalityCheckpointManagementStrategyBasedonRDDCharacteristicsinSpark

      Ying Changtian1,2, Yu Jiong2, Bian Chen2, Wang Weiqing1,3, Lu Liang2, and Qian Yurong2

      1(PostdoctoralResearchStationofElectricalEngineering,XinjiangUniversity,Urumqi830046)2(SchoolofSoftware,XinjiangUniversity,Urumqi830008)3(SchoolofElectricalEngineering,XinjiangUniversity,Urumqi830046)

      The default fault tolerance mechanism of Spark is setting the checkpoint by programmer. When facing data loss, Spark recomputes the tasks based on the RDD lineage to recovery the data. Meanwhile, in the circumstance of complicated application with multiple iterations and large amount of input data, the recovery process may cost a lot of computation time. In addition, the recompute task only considers the data locality by default regardless the computing capabilities of nodes, which increases the length of recovery time. To reduce recovery cost, we establish and demonstrate the Spark execution model, the checkpoint model and the RDD critically model. Based on the theory, the criticality checkpoint management (CCM) strategy is proposed, which includes the checkpoint algorithm, the failure recovery algorithm and the cleaning algorithm. The checkpoint algorithm is used to analyze the RDD charactersitics and its influence on the recovery time, and selects valuable RDDs as checkpoints. The failure recovery algorithm is used to choose the appropriate nodes to recompute the lost RDDs, and cleaning algorithm cleans checkpoints when the disk space becomes insufficient. Experimental results show that: the strategy can reduce the recovery overhead efficiently, select valuable RDDs as checkpoints, and increase the efficiency of disk usage on the nodes with sacrificing the execution time slightly.

      memory computing; Spark; checkpoint management; failure recovery; RDD characteristics

      2016-09-20;

      2017-07-04

      國(guó)家自然科學(xué)基金項(xiàng)目(61262088,61462079,61363083,61562086,51667020);新疆維吾爾自治區(qū)自然科學(xué)基金項(xiàng)目(2017D01A20);新疆維吾爾自治區(qū)高??蒲杏?jì)劃(XJEDU2016S106)

      This work was supported by the National Natural Science Foundation of China (61262088, 61462079, 61363083, 61562086, 51667020), the Natural Science Foundation of Xinjiang Uygur Autonomous Region of China (2017D01A20), and the Higher Education Research Program of Xinjiang Uygur Autonomous Region (XJEDU2016S106).

      于炯(yujiong@xju.edu.cn)

      TP311

      YingChangtian, born in 1989. PhD in Xinjiang University. Student member of CCF. Her main research interests include parallel computing, distributed system, and memory computing, etc.

      YuJiong, born in 1964. Professor and PhD supervisor. Senior member of CCF. His main research interests include grid computing, parallel computing, etc.

      BianChen, born in 1981. Associate professor and PhD. Senior member of CCF. His main research interests include parallel computing, distributed system, etc.

      WangWeiqing, born in 1959. Professor and PhD supervisor. His main research interests include power system relay protection, wind power generation control and grid connection technology (wwq59@xju.edu.cn).

      LuLiang, born in 1990. PhD candidate in Xinjiang University. Student member of CCF. His main research interests include flow processing, real-time computing.

      QianYurong, born in 1981. Professor and master supervisor. Senior member of CCF. Her main research interests include data mining.

      猜你喜歡
      檢查點(diǎn)內(nèi)存分區(qū)
      Spark效用感知的檢查點(diǎn)緩存并行清理策略①
      上海實(shí)施“分區(qū)封控”
      免疫檢查點(diǎn)抑制劑相關(guān)內(nèi)分泌代謝疾病
      “春夏秋冬”的內(nèi)存
      免疫檢查點(diǎn)抑制劑在腫瘤治療中的不良反應(yīng)及毒性管理
      浪莎 分區(qū)而治
      分布式任務(wù)管理系統(tǒng)中檢查點(diǎn)的設(shè)計(jì)
      基于SAGA聚類分析的無(wú)功電壓控制分區(qū)
      基于多種群遺傳改進(jìn)FCM的無(wú)功/電壓控制分區(qū)
      基于內(nèi)存的地理信息訪問(wèn)技術(shù)
      利辛县| 微博| 巴彦县| 应城市| 汝州市| 崇左市| 巴南区| 获嘉县| 潜山县| 大姚县| 毕节市| 桃源县| 溆浦县| 三亚市| 汝南县| 聂拉木县| 盘山县| 清新县| 延安市| 成都市| 保靖县| 大方县| 平和县| 鄯善县| 永安市| 麦盖提县| 宜君县| 迁安市| 蒲江县| 西乌珠穆沁旗| 翼城县| 井研县| 永泰县| 太保市| 武宁县| 安西县| 怀远县| 喜德县| 环江| 舟山市| 镇平县|