闞京+陳彩+梁毅
摘 要:伴隨著數(shù)據(jù)的爆炸式增長(zhǎng),越來(lái)越多的大數(shù)據(jù)業(yè)務(wù)分析與處理選擇分布式計(jì)算平臺(tái)。目前針對(duì)大數(shù)據(jù)的分布式計(jì)算框架都支持DAG式的任務(wù)編排。由于大數(shù)據(jù)采集來(lái)源以及分布式存儲(chǔ)系統(tǒng)特點(diǎn),很多使用DAG框架進(jìn)行計(jì)算的應(yīng)用都是增量式的大數(shù)據(jù)集,但現(xiàn)有的DAG框架對(duì)這樣的數(shù)據(jù)集進(jìn)行計(jì)算時(shí)有許多冗余,造成計(jì)算資源浪費(fèi)。提出了在DAG框架上進(jìn)行增量式復(fù)用的方法,并針對(duì)FILTER算子特點(diǎn)提出了基于FILTER算子匹配的間接復(fù)用機(jī)制。
關(guān)鍵詞:增量計(jì)算;分布式計(jì)算;計(jì)算復(fù)用;查詢優(yōu)化;DAG計(jì)算
DOIDOI:10.11907/rjdk.171282
中圖分類號(hào):TP301
文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):1672-7800(2017)007-0026-03
0 引言
利用大數(shù)據(jù)進(jìn)行業(yè)務(wù)分析與處理越來(lái)越多,Google的Map-Reduce、Apache Hadoop MR以及Apache Spark是應(yīng)用甚廣的分布式計(jì)算框架[1-2],即便缺乏分布式系統(tǒng)經(jīng)驗(yàn)的大數(shù)據(jù)工作者,也能充分利用分布式計(jì)算所帶來(lái)的性能提升。
早期這些分布式計(jì)算框架主要用于批處理作業(yè),隨著數(shù)據(jù)量的增加以及業(yè)務(wù)需求的演進(jìn),更復(fù)雜高效的數(shù)據(jù)操作手段成為大數(shù)據(jù)分析與處理的重要需求。Pig、Hive以及Spark SQL都提供了相較于MPI及Map-Reduce等傳統(tǒng)分布式計(jì)算手段更高層次的分布式數(shù)據(jù)集接口,通過這些接口可以更輕松地對(duì)大數(shù)據(jù)集進(jìn)行分析與處理[3–5]。
大數(shù)據(jù)計(jì)算平臺(tái)的分布式存儲(chǔ)系統(tǒng),如GFS、HDFS都具備一些共同特征:數(shù)據(jù)的更新主要通過附加新數(shù)據(jù)方式完成,而這類增量式數(shù)據(jù)進(jìn)行處理時(shí),往往會(huì)產(chǎn)生較多的冗余計(jì)算[6-7]。
本文提出了在增量計(jì)算中進(jìn)行間接復(fù)用的方法,并通過解構(gòu)、分析FILTER算子方式,達(dá)成比算子級(jí)更低粒度的計(jì)算復(fù)用,從而提升算子復(fù)用機(jī)會(huì),使整體計(jì)算效率得到提升。
1 相關(guān)工作
增量計(jì)算中的復(fù)用方法有以下兩種方式:
(1)對(duì)結(jié)果集進(jìn)行局部更新,即通過針對(duì)特定的分布式作業(yè)管線設(shè)計(jì)一系列動(dòng)態(tài)算法,并依據(jù)該算法對(duì)此前的結(jié)果集進(jìn)行小規(guī)模的局部更新,從而使剩余大部分結(jié)果集得以復(fù)用。Google設(shè)計(jì)的可用來(lái)更新Page Rank的Percolator就是采用這種方法的代表[8]。該種方式主要針對(duì)特定的分布式計(jì)算任務(wù)。
(2)對(duì)作業(yè)管線中的子任務(wù)進(jìn)行緩存及復(fù)用,從而透明地重用此前的計(jì)算結(jié)果,由系統(tǒng)自動(dòng)地將可復(fù)用的子任務(wù)計(jì)算結(jié)果進(jìn)行緩存,在緩存命中的情況下,就可避免相關(guān)子任務(wù)計(jì)算。
ReStore通過分析并改寫作業(yè)的物理計(jì)劃達(dá)到復(fù)用之前計(jì)算的目的[9]。ReStore不僅可復(fù)用完整的計(jì)算作業(yè),還可復(fù)用作業(yè)中的子作業(yè),有效提升了復(fù)用發(fā)生幾率。
與Restore不同,Nectar系統(tǒng)所對(duì)應(yīng)的輸入是DAG作業(yè),而非傳統(tǒng)的MR作業(yè)[10]。Nectar在DryadInc的兩個(gè)DAG作業(yè)復(fù)用算法IDE及MER基礎(chǔ)上,闡述了整個(gè)復(fù)用體系的工作機(jī)制,并重點(diǎn)闡述了對(duì)數(shù)據(jù)中心的數(shù)據(jù)管理方法[7],在復(fù)用粒度上與Restore相似。Nectar在查詢?nèi)蝿?wù)的邏輯計(jì)劃上進(jìn)行分析與復(fù)用優(yōu)化,將計(jì)算邏輯與計(jì)算數(shù)據(jù)進(jìn)行復(fù)合并作為緩存鍵,將計(jì)算結(jié)果存儲(chǔ)于分布式文件系統(tǒng)中,并通過緩存服務(wù)器對(duì)其進(jìn)行索引。
2 基于FILTER算子匹配的間接復(fù)用
2.1 基本概念
大數(shù)據(jù)規(guī)模龐大,在以事務(wù)為中心的數(shù)據(jù)庫(kù)中難以進(jìn)行批量計(jì)算和處理,非關(guān)系式的數(shù)據(jù)模型索引建立和維護(hù)較為困難,在檢索上往往面臨性能上的挑戰(zhàn)。目前分布式計(jì)算框架是通過多節(jié)點(diǎn)的并行運(yùn)算來(lái)提升計(jì)算效能的。
增量數(shù)據(jù)往往是對(duì)事物的客觀記錄和描述,通常都是歷史信息,這類數(shù)據(jù)具有不會(huì)更新、刪除的特點(diǎn)。增量型數(shù)據(jù)可定義為:
其中,D(t)表示t時(shí)刻數(shù)據(jù)集。當(dāng)需要多次對(duì)這樣的數(shù)據(jù)集進(jìn)行非連接型操作(例如過濾、投影以及聚合操作等)時(shí),往往要付出巨大代價(jià)。為了針對(duì)增量式數(shù)據(jù)設(shè)計(jì)增量式DAG計(jì)算復(fù)用策略,首先對(duì)DAG中的算子進(jìn)行劃分。在這些算子中,可合并計(jì)算算子可以有效地進(jìn)行增量計(jì)算復(fù)用??珊喜⒂?jì)算算子定義為這樣一類函數(shù):
則稱f為可合并計(jì)算算子, merge為其合并函數(shù)。在DAG運(yùn)算中,F(xiàn)ILTER、SORT、PROJECT,AGGREGATE等算子都具備該性質(zhì)。
2.2 間接復(fù)用
在對(duì)FILTER算子進(jìn)行復(fù)用時(shí),除了在算子完全相同時(shí)直接復(fù)用計(jì)算結(jié)果外,還存在間接復(fù)用計(jì)算結(jié)果的可能性。下面介紹基于FILTER算子匹配的間接復(fù)用方法。
例:在所有人中找年齡大于30的人,若在此前已經(jīng)找過年齡大于20的人,且該結(jié)果已經(jīng)緩存,那么這部分被緩存的數(shù)據(jù)仍然可以利用,即便算子Filter(Person.Age > 30)與Filter(Person.Age > 20)并不匹配,如圖1所示。
虛線表示計(jì)算上的邏輯需求:為尋找年齡大于30的人,就必須遍歷整個(gè)Person集合,而目前已經(jīng)存在一個(gè)篩選好的年齡大于20的集合,那么在計(jì)算過程中就無(wú)需去遍歷全集,只需從大于20的人的集合進(jìn)行篩選即可,這就減少了I/O占用及實(shí)際計(jì)算規(guī)模,這種復(fù)用方式稱為間接復(fù)用。間接復(fù)用充分利用了緩存中的小規(guī)模數(shù)據(jù)集,從而使這部分被緩存的數(shù)據(jù)集可以直接復(fù)用。在相同的緩存空間下,使得緩存數(shù)據(jù)利用率得以提高。因此,從DAG的復(fù)用分析角度來(lái)講,間接復(fù)用是一種比算子級(jí)復(fù)用粒度更小的復(fù)用手段。若采用間接復(fù)用手段,除了算子要具備可合并計(jì)算性質(zhì),還要求被復(fù)用的算子及其依賴的計(jì)算流程具備更特殊的性質(zhì)——包含。這一包含關(guān)系可定義為:
對(duì)于兩個(gè)函數(shù)G1:A→B 和G2:A→B ,若對(duì)于任意的數(shù)據(jù)D∈A,若總是有:
則稱函數(shù)G1包含G2。這一包含關(guān)系實(shí)際上表示了G2的結(jié)果集總是可以由G1的結(jié)果集導(dǎo)出。
對(duì)于具備這種形式的可復(fù)用計(jì)算算子,可采用如下方式進(jìn)行增量式復(fù)用,首先若給定t1時(shí)刻的數(shù)據(jù)集I,其輸出為R1,記為:
當(dāng)G2試圖間接復(fù)用該結(jié)果時(shí),即將R1置入緩存,在t2時(shí)刻(t2≥ t1),若數(shù)據(jù)集I的增量為Δ,則新的計(jì)算結(jié)果可根據(jù)緩存中的R1和Δ導(dǎo)出:
雖然間接復(fù)用過程中對(duì)緩存數(shù)據(jù)應(yīng)用了G2算子,但是若被復(fù)用的算子具備壓縮數(shù)據(jù)規(guī)模能力,則對(duì)于占大比例的復(fù)用數(shù)據(jù)而言,G2所需處理的數(shù)據(jù)規(guī)模就大大縮小了,而這實(shí)質(zhì)上就減少了每個(gè)計(jì)算單元的任務(wù)量和I/O消耗。
基于此,為實(shí)現(xiàn)基于FILTER算子匹配的間接復(fù)用,只需找到一種算法來(lái)判斷兩個(gè)FILTER算子是否具有包含關(guān)系即可。對(duì)于給定的兩個(gè)謂詞表達(dá)式P1(x)和P2(x),若滿足:
即二者具備包含關(guān)系。對(duì)于這樣的一對(duì)FILTER算子,若算子FilterP2是新到達(dá)的DAG算子,而FilterP1是緩存了的算子,則可根據(jù)復(fù)用算法,以間接方式利用緩存中的數(shù)據(jù)。
將兩個(gè)FILTER中的謂詞表達(dá)式都轉(zhuǎn)換為等價(jià)的CNF(Conjunctive Normal Form),設(shè)一個(gè)FILTER的謂詞表達(dá)式F為CNF,則F可表示為多個(gè)簡(jiǎn)單析取式的合取,即該FILTER的謂詞表達(dá)式可表示為如下形式:
對(duì)于已經(jīng)按合取運(yùn)算符AND拆分的簡(jiǎn)單析取式,可表示為:
對(duì)于F而言,每個(gè)簡(jiǎn)單析取式所對(duì)應(yīng)的真值集的交集就是F所對(duì)應(yīng)的真值集,由此可得判定F能否包含另一個(gè)謂詞表達(dá)式F'的充分條件就是:
即對(duì)任意的Fi,都存在至少一個(gè)Fj'蘊(yùn)含F(xiàn)i,那么F′蘊(yùn)含F(xiàn),本文根據(jù)該條件對(duì)FILTER算子的包含關(guān)系進(jìn)行判定,證明如下:
對(duì)于給定的F和F',若式(9)為真,則有:
基于該判定條件,欲解決該問題,只需對(duì)兩個(gè)簡(jiǎn)單析取式的蘊(yùn)含關(guān)系進(jìn)行判定即可。
基于上述描述,判斷兩個(gè)簡(jiǎn)單析取式Fi和Fj之間是否存在蘊(yùn)含關(guān)系的算法如下:
(1)Fi和Fj的所有基本項(xiàng)中的鍵名存在不同,判定為不一定蘊(yùn)含,退出。
(2)按照表 1計(jì)算Fi∧Fj,為真時(shí)保留f',為假時(shí)用False表示,若結(jié)果不為Fj,判定為不一定蘊(yùn)含,退出。
(3)判定為Fj'蘊(yùn)含F(xiàn)i,退出。
由于算法中的(1)和(2)對(duì)計(jì)算提出了一定的條件,因此其結(jié)果均為不一定蘊(yùn)含,這意味著該算法仍然為一個(gè)充分性的判定。
通過上文給出的判斷兩個(gè)FILTER算子是否相互包含的判定算法,就可以在新的DAG計(jì)算到達(dá)且不存在直接復(fù)用條件時(shí),通過FILTER的包含關(guān)系來(lái)進(jìn)行間接復(fù)用,從而增加數(shù)據(jù)的復(fù)用機(jī)會(huì)。
3 實(shí)驗(yàn)
3.1 環(huán)境
增量條件下的DAG計(jì)算要求原始數(shù)據(jù)集是增量的,為此本文設(shè)計(jì)了一個(gè)增量數(shù)據(jù)集,該數(shù)據(jù)集模擬日志類的數(shù)據(jù),每輪實(shí)驗(yàn)結(jié)束后,為該數(shù)據(jù)增長(zhǎng)約1.7 GB的數(shù)據(jù)。實(shí)驗(yàn)使用3個(gè)節(jié)點(diǎn),各節(jié)點(diǎn)配置如表 2所示。
實(shí)驗(yàn)平臺(tái)為Spark SQL,緩存使用Spark SQL默認(rèn)的緩存管理器。
3.2 結(jié)果與分析
實(shí)驗(yàn)在開始階段,采樣密度為每次增量都采集一次實(shí)驗(yàn)數(shù)據(jù),在后半段維持增量與計(jì)算不變,僅減少采樣頻率。
DAG中的FILTER在本試驗(yàn)中以隨機(jī)方式生成,對(duì)全部的實(shí)驗(yàn)負(fù)載,覆蓋100%數(shù)據(jù)集計(jì)算,66%的實(shí)驗(yàn)負(fù)載,覆蓋10%的數(shù)據(jù)集計(jì)算,這樣的負(fù)載模擬了實(shí)際負(fù)載中熱點(diǎn)數(shù)據(jù)被更多關(guān)注的特點(diǎn)。實(shí)驗(yàn)對(duì)直接復(fù)用方法和加入FILTER算子匹配的間接復(fù)用方法的增量計(jì)算進(jìn)行對(duì)比測(cè)評(píng),結(jié)果如圖 2所示。
通過使用基于FILTER算子識(shí)別的間接復(fù)用,相較于直接復(fù)用方法,時(shí)間開銷平均降低84.91%,具體復(fù)用情況見表3。
由此可見,引入基于FILTER算子識(shí)別的間接復(fù)用可以大幅度提升緩存匹配的成功率,提升緩存利用率及系統(tǒng)整體運(yùn)行效率。
4 結(jié)語(yǔ)
本文提出了基于FILTER算子匹配的增量式DAG計(jì)算復(fù)用方法,給出了通過識(shí)別FILTER算子的包含關(guān)系來(lái)達(dá)成對(duì)FILTER算子更細(xì)粒度的增量計(jì)算復(fù)用手段,并通過實(shí)驗(yàn)驗(yàn)證了該復(fù)用方法可提升增量計(jì)算復(fù)用中緩存被命中的機(jī)會(huì),進(jìn)而提高計(jì)算平臺(tái)整體運(yùn)行性能。
參考文獻(xiàn):
[1] DEAN J,GHEMAWAT S.MapReduce:simplified data processing on large clusters[J].In Proceedings of Operating Systems Design and Implementation,2004,51(1):107-113.
[2] ZAHARIA M,CHOWDHURY M,DAS T,et al.Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[J].In-Memory Cluster Computing.USENIX Symposium on Networked Systems Design and Implementation,2012,70(2):141-146.
[3] ARMBRUST M,XIN R S,LIAN C,et al.Spark SQL:relational data processing in Spark[M].ACM,2015:1383-1394.
[4] OLSTON C,REED B,SRIVASTAVA U,et al.PigLatin:a not-so-foreign language for data processing[J].Science China Information Sciences,2008(1):1099-1110.
[5] THUSOO A,SARMA J S,JAIN N,et al.Hive:a warehousing solution over a map-reduce framework[J].Proceedings of the Vldb Endowment,2009,2(2):1626-1629.
[6] GHEMAWAT S,GOBIOFF H,LEUNG S T.The Google file system[J].ACM Press,2003(5):29-43.
[7] POPA L,BUDIU M,YU Y,et al.DryadInc:reusing work in large-scale computations[EB/OL].https://link.springer.com/article/10.1007%2Fs13222-012-0109-3.
[8] PENG D,DABEK F.Large-scale incremental processing using distributed transactions and notifications[EB/OL].https://www.hanspub.org/reference/ReferencePapers.aspx?PaperID=9501&ReferenceID=23068.
[9] ELGHANDOUR I,ABOULNAGA A.Restore:reusing results of MapReduce jobs[J].Proceedings of the VLDB Endowment,2012,5(6):586-597.
[10] GUNDA P K,RAVINDRANATH L,THEKKATH C A,et al.Nectar:automatic management of data and computation in datacenters[EB/OL].http://www.doc88.com/p-3149057701380.html.