潘兆平 張建軍 魏志強(qiáng)
摘要:本文介紹了分布式流數(shù)據(jù)的在線匯聚合與統(tǒng)計(jì)的方法,該方法采用在分布式隨機(jī)采樣算法的基礎(chǔ)上增加了一個(gè)權(quán)重的概念,它可以從分布式流數(shù)據(jù)中進(jìn)行隨機(jī)采樣。該方法把多個(gè)在線查詢?nèi)蝿?wù)分解成一個(gè)多層次處理單元集合,每個(gè)處理單元負(fù)責(zé)一個(gè)時(shí)段的數(shù)據(jù)查詢,這些處理單元能夠并行處理,在并行處理過(guò)程中,流數(shù)據(jù)以事件方式封裝打包,通過(guò)處理單元之間的相互配合完成整個(gè)查詢?nèi)蝿?wù)。在多層次查詢過(guò)程中,處理單元能將一些重復(fù)性的計(jì)算進(jìn)行合并,這樣就避免重復(fù)計(jì)算帶來(lái)的消耗,提高查詢語(yǔ)句的執(zhí)行效率。
關(guān)鍵詞:匯聚與統(tǒng)計(jì);分布式;流數(shù)據(jù);隨機(jī)采樣
中圖分類(lèi)號(hào):TP311.13 文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):1007-9416(2018)09-0140-04
1 引言
過(guò)去,用戶從一個(gè)大型數(shù)據(jù)庫(kù)在線查詢數(shù)據(jù)時(shí),需等待系統(tǒng)遍歷完所有數(shù)據(jù)后才能得到最終查詢結(jié)果,處理過(guò)程中沒(méi)有任何反饋信息。
在查詢過(guò)程中,為了實(shí)時(shí)地獲得數(shù)據(jù)查詢的中間狀態(tài),可以采取降低數(shù)據(jù)的準(zhǔn)確性的策略來(lái)實(shí)現(xiàn)查詢信息的及時(shí)反饋。在線匯聚與統(tǒng)計(jì)方法就是連續(xù)為用戶提供近似查詢結(jié)果,同時(shí)輸出相應(yīng)的統(tǒng)計(jì)結(jié)果,而不是等待處理完所有數(shù)據(jù)后再給出最終結(jié)果;隨著在線處理的數(shù)據(jù)越來(lái)越多,匯聚與統(tǒng)計(jì)結(jié)果持續(xù)更新,直到結(jié)果能夠滿足用戶需求為止。
2 實(shí)現(xiàn)方法
在線匯聚與統(tǒng)計(jì)能連續(xù)地提供實(shí)時(shí)匯聚與統(tǒng)計(jì)分析結(jié)果,本文采用的流數(shù)據(jù)處理方法具有在線匯聚能力,當(dāng)在線接收到一組數(shù)據(jù)時(shí),系統(tǒng)能快速地處理這組數(shù)據(jù),并將匯聚與統(tǒng)計(jì)結(jié)果以異步的方式發(fā)送給用戶。文獻(xiàn)[1]提出,流數(shù)據(jù)操作有兩種:無(wú)狀態(tài)操作和有狀態(tài)操作。無(wú)狀態(tài)操作(如映射、并集等)根據(jù)輸入數(shù)據(jù)進(jìn)行處理,而后把結(jié)果發(fā)送給下一個(gè)操作即可,是不需要保留中間計(jì)算結(jié)果。而有狀態(tài)操作(如合并,笛卡爾乘積)一般采用滑動(dòng)窗口(可以是固定時(shí)間窗口,也可以是固定個(gè)數(shù)窗口)方法進(jìn)行處理,需要一串?dāng)?shù)據(jù)同時(shí)操作。
為了支持狀態(tài)操作,本文采用滑動(dòng)窗口方式進(jìn)行在線匯聚,通過(guò)時(shí)間窗口與順序窗口兩個(gè)條件同時(shí)觸發(fā)狀態(tài)匯聚操作。當(dāng)在線接收到流數(shù)據(jù)時(shí),系統(tǒng)將連續(xù)處理,并輸出中間結(jié)果。如圖1,系統(tǒng)包含采樣、映射、匯聚 和統(tǒng)計(jì)等處理過(guò)程。
2.1 分布式流數(shù)據(jù)的隨機(jī)采樣
文獻(xiàn)[2]指出,蓄水池采樣算法可以從N個(gè)流數(shù)據(jù)中隨機(jī)采樣,其中N是一個(gè)很大的數(shù),這些數(shù)通常不保存在內(nèi)存中。在初始狀態(tài)下,該算法有s個(gè)采樣數(shù)據(jù),當(dāng)?shù)趇個(gè)數(shù)據(jù)到達(dá)時(shí)(其中i>s),有s/i的概率選擇這個(gè)新數(shù)據(jù)。具體做法是:從數(shù)據(jù)[1,i]中隨機(jī)得到一個(gè)值j,如果j≤s,把第i個(gè)數(shù)據(jù)替換掉內(nèi)存中的第j個(gè)數(shù)據(jù),否則丟棄這個(gè)數(shù)(第i個(gè)數(shù))。這個(gè)算法只需要O(s)個(gè)空間和O(1)的采樣時(shí)間。
但是,蓄水池采樣算法無(wú)法從分布式流數(shù)據(jù)中進(jìn)行隨機(jī)采樣,原因有兩個(gè):第一個(gè)是不同流數(shù)據(jù)可能采用不同的數(shù)據(jù)分布(文獻(xiàn)[3]),如一個(gè)采用伯努利分布,另一個(gè)采用吉布斯。第二個(gè)是來(lái)自不同流數(shù)據(jù)有不同的權(quán)重,如網(wǎng)絡(luò)監(jiān)控和網(wǎng)頁(yè)服務(wù)等。
為了解決上述問(wèn)題,分布式流數(shù)據(jù)采樣算法引入了一個(gè)權(quán)重的概念,該算法不僅能從不同的流數(shù)據(jù)中進(jìn)行隨機(jī)采樣,而且這些流數(shù)據(jù)還帶有不同的權(quán)重。分布式流數(shù)據(jù)采樣算法拓?fù)浣Y(jié)構(gòu)見(jiàn)圖2,圖中有m個(gè)數(shù)據(jù)流,有m個(gè)本地采集器,每個(gè)數(shù)據(jù)流含有n個(gè)數(shù)據(jù)。本地采集器Pi對(duì)流數(shù)據(jù)Si進(jìn)行采樣,并把采樣結(jié)果發(fā)送給M(M可以是一個(gè)有狀態(tài)操作,也可以是一個(gè)無(wú)狀態(tài)操作),每個(gè)采樣結(jié)果都是帶有權(quán)重的。
圖2中,一個(gè)流數(shù)據(jù)端Si將以Pi的概率選擇接收到的數(shù)據(jù)e(該數(shù)權(quán)重為wi),并把數(shù)據(jù)e發(fā)送給M。M中維護(hù)了一個(gè)數(shù)據(jù)候選集V(匯集了多個(gè)流數(shù)據(jù)的采樣結(jié)果)。對(duì)于任何一個(gè)流數(shù)據(jù)vi(vi∈V),分配一個(gè)鍵值ki(ki=ui1/wi,ui是一個(gè)從0到1范圍內(nèi)隨機(jī)的一個(gè)值),M總是從V中選擇s個(gè)數(shù)據(jù)(其鍵值ki最大)作為最終采樣數(shù)據(jù)。由于ui是個(gè)隨機(jī)數(shù),因此,ki就是個(gè)隨機(jī)值,在加上s個(gè)數(shù)據(jù)是隨機(jī)選擇的,因此這s個(gè)數(shù)據(jù)是來(lái)自Si的隨機(jī)采樣結(jié)果。
2.2 分類(lèi)匯聚處理方法
在線匯聚系統(tǒng)框圖中,“映射器”負(fù)責(zé)對(duì)數(shù)據(jù)進(jìn)行分類(lèi),將數(shù)據(jù)分配到不同的匯聚器上。匯聚器對(duì)數(shù)據(jù)進(jìn)行過(guò)濾與篩選,而后將某個(gè)特定時(shí)間段的數(shù)據(jù)進(jìn)行“合并”操作,最后把結(jié)果數(shù)據(jù)發(fā)送給統(tǒng)計(jì)器作統(tǒng)計(jì);通過(guò)統(tǒng)計(jì)計(jì)算得出近似查詢結(jié)果。
2.2.1 數(shù)據(jù)映射
由于來(lái)自不同數(shù)據(jù)流的數(shù)據(jù)格式可能不一樣,而且存在無(wú)效值,因此,在數(shù)據(jù)進(jìn)映射前,需要對(duì)采樣到的流數(shù)據(jù)進(jìn)行清洗(即統(tǒng)一格式),將每個(gè)數(shù)據(jù)轉(zhuǎn)化成
2.2.2 流數(shù)據(jù)匯聚
在匯聚器中有許多處理單元(簡(jiǎn)稱:PE),這些PE間相互獨(dú)立,可以并行處理數(shù)據(jù);每個(gè)PE中的數(shù)據(jù)具有相同的“key”,PE對(duì)收到的數(shù)據(jù)進(jìn)行過(guò)濾與篩選,其方法與數(shù)據(jù)庫(kù)中的行過(guò)濾與列過(guò)濾相同,這樣可以減少匯聚的計(jì)算成本與數(shù)據(jù)通信成本。SQL查詢語(yǔ)句參見(jiàn)圖3。
分布式流數(shù)據(jù)匯聚操作見(jiàn)圖4,每個(gè)PE中有兩個(gè)列表(流A和流B),它們之間通過(guò)key值進(jìn)行匯聚操作得到相關(guān)結(jié)果。給定一個(gè)key,PE將從表流A和流B中檢索相關(guān)數(shù)據(jù),并對(duì)相關(guān)數(shù)據(jù)進(jìn)行匹配與連接。流A和B的數(shù)據(jù)經(jīng)過(guò)采樣后存儲(chǔ)在內(nèi)存中,系統(tǒng)在一個(gè)時(shí)間窗口內(nèi)自動(dòng)執(zhí)行合并任務(wù),當(dāng)一個(gè)時(shí)間窗口的合并操作結(jié)束后(當(dāng)前窗口的數(shù)據(jù)從內(nèi)存中釋放掉,并等待下一個(gè)時(shí)間窗口的運(yùn)行),將通知匯聚開(kāi)始這個(gè)窗口的數(shù)據(jù)匯聚。
在實(shí)際中,流數(shù)據(jù)往往存在不同步和無(wú)序的特性,這樣的數(shù)據(jù)流可能導(dǎo)致時(shí)間窗口不完整,因此,檢測(cè)一個(gè)時(shí)間窗口是否結(jié)束就變得十分復(fù)雜。在這里將采用標(biāo)記方式來(lái)指示一個(gè)時(shí)間窗口合并操作是否結(jié)束,當(dāng)匯聚操作單元接收到所有標(biāo)記通知時(shí),才開(kāi)始這個(gè)窗口的匯聚操作和更新?tīng)顟B(tài)。
由于內(nèi)存有限,本方案只把部分PE常駐于內(nèi)存中,如果某個(gè)PE已經(jīng)完成任務(wù),則移除對(duì)應(yīng)的PE,釋放出相應(yīng)的內(nèi)存,但是建立與移除PE時(shí)耗費(fèi)了一定的時(shí)間和計(jì)算能力,因此,為了提高機(jī)器的計(jì)算能力、減少不必要的消耗,本文采用計(jì)算單元重用的方法:當(dāng)內(nèi)存不足而有其它PE空閑時(shí),系統(tǒng)將自動(dòng)使用這些空閑PE;當(dāng)內(nèi)存足夠而所有的PE都在忙時(shí),那么將為該任務(wù)重新創(chuàng)建一個(gè)PE。
3 統(tǒng)計(jì)分析
匯聚操作結(jié)束后,系統(tǒng)對(duì)匯聚結(jié)果進(jìn)行統(tǒng)計(jì)評(píng)估,計(jì)算其置信區(qū)間與誤差范圍;根據(jù)單表匯聚操作與多表匯聚操作的不同,其計(jì)算置信區(qū)間與誤差范圍時(shí)的方法也有一些區(qū)別。
3.1 單表匯聚查詢
單表匯聚查詢形式可以簡(jiǎn)單表示如下:
SELECT opt(Exp(xi)) FROM A.a
流數(shù)據(jù)匯聚操作采用了一個(gè)基于時(shí)間窗口的策略:若在第i個(gè)時(shí)間窗口(τ=Ti-Ti-1)PE存有m個(gè)樣本數(shù)據(jù)(x1,x2,...,xm),那么,匯聚時(shí)該P(yáng)E需要計(jì)算四個(gè)值(Ni是第i個(gè)時(shí)間窗口的數(shù)據(jù)個(gè)數(shù),是第i個(gè)時(shí)間窗口中所有數(shù)據(jù)的平均數(shù),Sumi是第i個(gè)窗口所有數(shù)據(jù)的總和,σi2是第i個(gè)時(shí)間窗口中數(shù)據(jù)的方差值),求解公式如下:
根據(jù)公式3-4,如果給定一個(gè)置信度p,可以計(jì)算出∈的值,得到誤差范圍為[λ-∈,λ+∈];或者給定一個(gè)誤差范圍∈,也可以同樣計(jì)算出這個(gè)范圍內(nèi)置信度的值p。
3.2 多表匯聚查詢
多表匯聚查詢形式可以表示如下:
SELECT opt(Exp(xi,xj))FROM A,B
WHERE A.a=B.b
要想計(jì)算多表匯聚查詢的置信區(qū)間和誤差范圍,需要對(duì)每個(gè)表的方差值進(jìn)行分別計(jì)算,根據(jù)文獻(xiàn)[4]中的表述,多表匯聚查詢結(jié)果的計(jì)算公式如下:
其中σA和σB是表A和B的近似方差值,因此,一個(gè)基于時(shí)間窗口的匯聚結(jié)果可以輸出為:(Ti,Ti-1,Ni,,Sumi,σi2),奪標(biāo)匯聚查詢結(jié)果與單表匯聚查詢結(jié)果相似。
4 實(shí)驗(yàn)結(jié)果評(píng)估
4.1 實(shí)驗(yàn)環(huán)境配置
分布式流數(shù)據(jù)的在線匯聚與統(tǒng)計(jì)算法部署在24臺(tái)機(jī)器中,每臺(tái)機(jī)器的硬件配置:CPU因特爾E2600,內(nèi)存16GB,硬盤(pán)1TB;每臺(tái)機(jī)器的軟件配置:Apache S4 系統(tǒng)平臺(tái),編程語(yǔ)言是JAVA;機(jī)器之間采用萬(wàn)兆網(wǎng)絡(luò)。在實(shí)驗(yàn)過(guò)程中,將1臺(tái)機(jī)器配置管理節(jié)點(diǎn),另外20臺(tái)機(jī)器作為計(jì)算節(jié)點(diǎn),剩余3臺(tái)機(jī)器作為備用機(jī)器(故障檢查及容錯(cuò))。
實(shí)驗(yàn)數(shù)據(jù)有四張表,總數(shù)據(jù)量約10GB,其中Litem表含有1300萬(wàn)行記錄,Orders表有350萬(wàn)行記錄, Part表有80萬(wàn)行記錄,表Psup有160萬(wàn)行記錄。
本次實(shí)驗(yàn)采用以下三個(gè)SQL語(yǔ)句來(lái)評(píng)估分布式流數(shù)據(jù)在線匯聚與統(tǒng)計(jì)算法的實(shí)際性能,包含匯聚、統(tǒng)計(jì)和多語(yǔ)句同時(shí)查詢。實(shí)驗(yàn)的結(jié)果都是經(jīng)過(guò)多次運(yùn)行后計(jì)算所得的平均值,整個(gè)SQL的運(yùn)行完成時(shí)間是從提交SQL查詢到執(zhí)行完畢為止。
查詢模版1(Q1):
SELECT Sum(Ext*Dis),Average(Ext*Dis)
FROM Litem
WHERE Dis<0.15 and Dis>=0.07
查詢模版2(Q2):
SELECT A.ReturnFlag, A.LineStatus,B.OrderPriority,
Sum(A.Quantity),Count(A.*)
FROM Litem A, Orders B
WHERE A.OrderKey=B.OrderKey
and A.Dis<0.15 and A.Dis>=0.07
AND B.TotalPrice>=1000
AND B.TotalPrice<30000
GROUP BY A.RFlag,A.LStatus,B.OPriority
查詢模版3(Q3):
(1)SELECT B.MFGR,B.BRAND,
Sum(A.Quantity),
Average(A.Quantity)
FROM Litem A,Part B
WHERE A.PartKey = B.PartKey
AND A.Dis<0.15 AND A.Dis>=0.07
GROUP BY B.MFGR,B.BRAND
(2)SELECT A.ReturnFlag, A.LStatus,Sum(B.SupplyCost),
Average(B.SupplyCost)
FROM Litem A,Psup B
WHERE A.PartKey = B.PartKey
AND A.Dis<0.15 AND A.Dis>=0.07
GROUP BY A.RFlag, A.LStatus
在對(duì)數(shù)據(jù)進(jìn)行匯聚與統(tǒng)計(jì)計(jì)算時(shí),時(shí)間窗口中緩存的數(shù)據(jù)量大小直接影響在線匯聚與統(tǒng)計(jì)算法的運(yùn)行時(shí)間,因此首先評(píng)估了數(shù)據(jù)量大小與SQL的平均運(yùn)行時(shí)間關(guān)系,然后評(píng)估了置信區(qū)間與誤差區(qū)間之間的影響,分析數(shù)據(jù)量大小與執(zhí)行時(shí)間的關(guān)系;通過(guò)調(diào)整集群中機(jī)器的個(gè)數(shù)(從2到22)進(jìn)行比較分析,最后比較Spark與S4在處理流數(shù)據(jù)中性能。
4.2 多查詢語(yǔ)句性能分析
通過(guò)實(shí)驗(yàn)分析多查詢語(yǔ)句并行工作的性能,試驗(yàn)中實(shí)現(xiàn)的查詢語(yǔ)句為兩個(gè)Q3,這兩個(gè)查詢包含相同的濾波條件,濾波操作與結(jié)果可以合并成一個(gè);但在匯聚操作中關(guān)聯(lián)的表是不同的,因此,需要把濾波的數(shù)據(jù)結(jié)果分兩塊發(fā)送給各自的匯聚節(jié)點(diǎn),通過(guò)相同操作合并機(jī)制,可以減少不同查詢語(yǔ)句的重疊計(jì)算。
通過(guò)調(diào)整數(shù)據(jù)的大?。◤?G到10G)進(jìn)行測(cè)試,多查詢語(yǔ)句Q3與子查詢Q3-1和Q3-2分別運(yùn)行在同一集群和同一數(shù)據(jù)集中。進(jìn)行多次試驗(yàn)后平均得到結(jié)果,如圖5顯示。
當(dāng)數(shù)據(jù)量增加時(shí),所有查詢的運(yùn)行時(shí)間也隨之增加,獨(dú)立并行執(zhí)行完查詢Q3-1和Q3-2的時(shí)間比共享的運(yùn)行時(shí)間多。我們計(jì)算了通過(guò)該拓?fù)浣Y(jié)構(gòu)帶來(lái)的性能提高,假設(shè)Q3的運(yùn)行時(shí)間是t3,Q3-1與Q3-2的運(yùn)行時(shí)間分別是t31和t32,那么合并的查詢Q3的提高為(t31+t32-t3)÷(t31+t32)。圖6描述了平均提高效果為18%。
5 結(jié)語(yǔ)
在本文中,在分布式流數(shù)據(jù)系統(tǒng)S4中實(shí)現(xiàn)了一個(gè)在線匯聚的方法,采用“行為模式”簡(jiǎn)化復(fù)雜查詢,支持流水線任務(wù)和并行處理;著重分析了MapReduce Online與本章方法的區(qū)別,并比較了兩者的結(jié)果。由于MapReduce Online在處理流水線任務(wù)時(shí),會(huì)把中間結(jié)果存儲(chǔ)在硬盤(pán)中,而執(zhí)行結(jié)果直接從內(nèi)存中發(fā)送給下一個(gè)任務(wù),實(shí)驗(yàn)結(jié)果顯示該方法有效地支持實(shí)時(shí)數(shù)據(jù)匯聚;在處理多查詢方面,通過(guò)一個(gè)拓?fù)浣Y(jié)構(gòu)指導(dǎo)查詢的執(zhí)行,該拓?fù)浣Y(jié)構(gòu)能夠合并數(shù)據(jù)的重復(fù)操作,減少了系統(tǒng)計(jì)算工作量,實(shí)驗(yàn)結(jié)果顯示拓?fù)浣Y(jié)構(gòu)能夠很大程度上提高整體的查詢效率;通過(guò)公開(kāi)的基準(zhǔn)數(shù)據(jù)TPC-H進(jìn)行實(shí)驗(yàn),結(jié)果顯示該方法能夠把較準(zhǔn)確的結(jié)果快速反饋給用戶,運(yùn)行的速度與效果也都比MapReduceOnline好。
參考文獻(xiàn)
[1]ABADI D, CARNEY D, C, ETINTEMEL U, et al. Aurora: a new model and architecture fordata stream management[J]. The international Journal on VLDB,2003,12(2):120-139.
[2]VITTER J S. Random sampling with a reservoir[J]. ACM Transactions on Mathematical Software,1985,11(1):37-57.
[3]CORMODE G, MUTHUKRISHNAN S, YI K, et al. Optimal sampling from distributed streams[C].Principles of Database Systems.[S.l.]:ACM,2010:77-86.
[4]HAAS P, HELLERSTEIN J. Ripple joins for online aggregation[C].ACM SIGMOD international conference on Management of data.[S.l.]:ACM,1999,28:287-298.