武守曉,房 俊
(1.北方工業(yè)大學 大規(guī)模流數(shù)據(jù)集成與分析技術北京市重點實驗室 北京 100144;2.北方工業(yè)大學 數(shù)據(jù)工程研究院 北京 100144)
目前處理亂序流的主流方法是基于緩存的方法。基于緩存的方法使用開辟的緩沖區(qū)等待遲到數(shù)據(jù),對緩沖區(qū)內(nèi)數(shù)據(jù)進行排序,以避免系統(tǒng)處理亂序數(shù)據(jù)。傳統(tǒng)的基于緩存的K-slack[1]方法和MP-K-slack[2]方法無法做到對緩沖區(qū)大小自適應,在時延變小的情況下會浪費系統(tǒng)資源。AQ-K-slack[3]雖然實現(xiàn)了緩沖區(qū)自適應,但無法應用于top-k連續(xù)查詢這類復雜的聚合函數(shù)。在具體的top-k連續(xù)查詢算法中,SMA算法[4]利用k-skyband對象在有序流上快速進行top-k連續(xù)查詢,但需要維護大量的k-skyband對象,內(nèi)存耗費大,并且該方法只能在有序流上使用。MinTopk算法[5]維護了一個最小的top-k候選集,每次計算都從該候選集中得出結果,大大減少了計算量,但在亂序流上使用會有誤差。GSTopk算法[6]對MinTopk算法進行了一些改良,使其能在亂序流下立刻給出近似結果,可以在時效性要求較高的情況下使用。在上述算法的基礎上,本文提出一種面向高速亂序數(shù)據(jù)流的top-k連續(xù)查詢方法。首先使用基于緩存的亂序流處理技術,舍棄緩存數(shù)據(jù)重排序步驟,緩存時長的確定使用緩存時長自適應算法,在保證用戶允許的最小正確率的情況下計算出最小緩存時長,其次使用改造的MinTopk算法計算當前窗口的top-k結果集。實驗結果表明,該方法能有效權衡查詢精度和查詢時延之間的關系,對窗口內(nèi)數(shù)據(jù)執(zhí)行快速且高效的查詢并得出結果,使亂序流下的top-k連續(xù)查詢收到了良好的效果。
在亂序數(shù)據(jù)流處理方面,研究工作按處理機制的不同大致分為基于緩存的方法[1-3, 7-8]、基于標點的方法[9-12]、基于推測的方法和基于近似的方法?;诰彺娴姆椒ㄊ情_辟緩沖區(qū)來緩存亂序數(shù)據(jù)以等待遲到數(shù)據(jù),以一定延遲開銷換取結果質量的提升。K-slack[1]是基于緩存的典型方法,其中參數(shù)K與緩沖區(qū)的大小密切相關。具體來說,K-slack技術維護緩沖區(qū)用來緩存到達的元組,緩沖區(qū)內(nèi)的數(shù)據(jù)最多等待K個時間單位,然后被提交至查詢處理模塊進行查詢。MP-K-slack方法[2]是基于流元組延遲的動態(tài)變化來不斷調整K值,如果延遲不斷增大,會使數(shù)據(jù)越積越多,導致查詢時延的上升和查詢吞吐量的下降。AQ-K-slack方法[3]以用戶給定的結果精度為目標,通過聚合函數(shù)與窗口覆蓋率的定值關系,動態(tài)調整K值大小。但由于top-k查詢這類聚合函數(shù)過于復雜,會造成AQ-K-slack方法難以實施。另外,基于緩存的方法大多會對緩存的數(shù)據(jù)進行排序,以保證計算時的有序,代價比較大?;跇它c的方法依賴于數(shù)據(jù)流內(nèi)被稱為標點的特殊元組,表示沒有時間戳小于標點的元組。當收到一個標點,查詢算子確定未來將沒有數(shù)據(jù)到達,然后得到這些窗口的查詢結果[10],如心跳[12]、部分排序[11]是標點的特殊類型。標點顯式地通知查詢算子什么時候返回窗口的結果,因此查詢算子能夠直接消費無序輸入。然而,查詢結果的準確性從根本上會受到標點準確性的限制[9]。假定標點是由外部數(shù)據(jù)源提供,或者是由應用程序語義和數(shù)據(jù)流亂序特征的先驗知識通過系統(tǒng)非常簡單地生成,但這個假設不一定在現(xiàn)實世界的場景中成立?;谕茰y的方法和基于近似的方法基本上采用了激進處理方法。激進處理方法與保守等待方法相反,它不管亂序問題是否存在,總是優(yōu)先快速地處理數(shù)據(jù)流,直到遲到元組出現(xiàn)以后再彌補錯誤。激進處理方法通常應用于實時性要求較高且急需獲取處理結果的分析處理系統(tǒng)。但是這種方法的場景局限性很大,并且有可能得不到正確結果。
在top-k連續(xù)查詢具體算法方面,SMA算法[4]根據(jù)數(shù)據(jù)特征提出k-skyband對象的概念。該算法需要維護k-skyband對象之間的支配關系,總體代價較大,并且不具備過濾新增數(shù)據(jù)(即新流入窗口的數(shù)據(jù))的能力,不能處理亂序數(shù)據(jù)流。MinTopk算法[5]維護了一個最小top-k候選集,對于流入窗口的新元組,高效地過濾掉不可能成為top-k結果的元組,將可能成為top-k結果的元組插入候選集,每次只要查找候選集即可找到top-k結果。但是該算法只能處理順序流,在亂序流中會導致查詢錯誤。GSTopk算法[6]改造了MinTopk算法,使其能夠快速地處理亂序數(shù)據(jù)流,但是該算法得出的僅僅是當前窗口內(nèi)的top-k結果,沒有對當前窗口的遲到數(shù)據(jù)進行處理,導致其計算結果往往不夠準確。由于該算法的高效性,在正確率要求不高而實時性要求特別高的情況下可以使用?;谝陨涎芯?,本文使用基于緩存的亂序處理方法等待遲到元組,但不對緩沖區(qū)內(nèi)數(shù)據(jù)進行排序,配合使用改造的MinTopk算法,保證top-k連續(xù)查詢正確率在用戶可接受范圍內(nèi),減少了查詢時延。
圖1為面向亂序流的top-k連續(xù)查詢算法流程。為了解決亂序數(shù)據(jù)流中top-k連續(xù)查詢結果不準確的問題,使用基于緩存的亂序流處理方法,該方法的難點在于緩存時長的確定?;诰彺娴姆椒ú豢赡軣o限等待遲到元組,不能保證查詢的絕對正確性。使用緩存時長自適應算法對top-k查詢進行正確率和緩存時長的統(tǒng)計,在保證用戶允許的最小正確率的情況下,周期性地計算出所需要的最小緩存時長。接下來通過具體的top-k查詢方法,計算出當前窗口的top-k結果。為了方便計算,靈活地實施緩存時長自適應算法,使用元組的Event Time[13]劃分窗口,也就是使用元組自身的時間戳作為滑動窗口的劃分依據(jù)。
圖1 面向亂序流的top-k連續(xù)查詢算法流程
圖2為基于緩存的亂序流處理方法。當前滑動窗口為W0,W0在tend時刻閉合,閉合后等待K個時間單位,即在tlate時刻計算并輸出W0的top-k結果。在這K個時間單位中,對于到來的每一個元組,若其屬于當前滑動窗口W0,該元組就會被發(fā)送到W0處理;若其屬于W0前面或后面的窗口,則進行相應的處理。
圖2 基于緩存的亂序流處理方法
基于緩存的亂序流處理方法,其難點在于緩存時長K的確定。緩存時間越長,時延越高,正確率也就越高。網(wǎng)絡延遲的制約因素有很多,不可能準確地計算出最晚元組到達的時間。另外,在高速亂序數(shù)據(jù)流下,數(shù)據(jù)流量巨大,緩存時間越長,對緩沖區(qū)和系統(tǒng)吞吐量造成的壓力越大。因此,在保證用戶允許的最小正確率的情況下選擇一個恰當?shù)木彺鏁r長K,可以有效地緩解系統(tǒng)壓力,減少查詢時延。
計算單次的top-k結果的正確性是沒有意義的,但統(tǒng)計多次的top-k結果的正確率足以證明某種方法的有效性。所以,通過統(tǒng)計不同緩存時長下top-k查詢結果的正確率,以質量驅動的方式[3]選出最小緩存時長,即在保證用戶允許的最小正確率的情況下計算出最小緩存時長。具體步驟如下。
1)參數(shù)初始化。系統(tǒng)指定一個初始緩存時長K,即窗口的緩存時長到達K時輸出查詢結果。初始化用于計算恰當緩存時長的區(qū)間,用(Kdown,Kup]表示。在初始情況下,(Kdown,Kup]將被初始化為(0,K]。另外,需要用戶指定能承受的最小正確率εmin。
1.解戒人員社區(qū)康復時間長短與操守率之間的關系。為分析社區(qū)康復是否對保持操守率存在積極影響,筆者以廣州市某強制隔離戒毒所2017年7月至2018年3月期間解戒的221名解戒人員為樣本,協(xié)同禁毒社工赴解戒人員所在戶籍街道,通過現(xiàn)場訪談、電話訪問和尿樣檢測等方式,于2018年5月及2018年9月分兩次,對同一批221名解戒人員進行跟蹤調查,來了解戒斷鞏固率情況。
2)統(tǒng)計計算。將(Kdown,Kup]平均劃分得到m個緩存時長,即{Kdown+(Kup-Kdown)/m,Kdown+2*(Kup-Kdown)/m, …,Kdown+(m-1)*(Kup-Kdown)/m,Kup}。對于每次top-k查詢,記錄下這m個不同緩存時長得到的top-k結果集,同時,后臺等待所有的遲到元組計算出此次查詢正確的top-k結果集。對于每一個緩存時長對應的top-k結果集,將其與正確的結果集進行比較,計算該top-k結果集的命中率,即top-k結果集與正確結果集一致的項數(shù)與總項數(shù)的比值。經(jīng)過n次top-k查詢?nèi)∑骄?,就能計算出不同緩存時長的查詢準確率。
3)求最小緩存時長。根據(jù)用戶給定的所能承受的最小正確率εmin,即可定位出可以達到該正確率的最小緩存時長所在的區(qū)間(Kdown,Kup],那么最小緩存時長Kmin改為Kup。若此時符合要求的緩存時長不在(Kdown,Kup]內(nèi),則區(qū)間相應前移或者后移(Kup-Kdown)/m個單位。為了避免區(qū)間太小,收斂速度太慢,(Kup-Kdown)/m不能太小。重復上一個步驟,統(tǒng)計出(Kdown,Kup]中不同緩存時長對應的正確率。
表1為根據(jù)不同緩存時長統(tǒng)計的top-3查詢結果示例。當前緩存時長區(qū)間為(0 s, 4.5 s],Kmin=4.5 s,m=3,n=20,εmin=0.8,則平均劃分為1.5 s、3 s、4.5 s三個緩存時長。在每次top-3查詢中,記錄下這三個緩存時長對應的top-3結果,最后和此次查詢正確的top-3結果集進行比較,得到這三個緩存時長對應結果的命中率。這個過程重復20次,每一個緩存時長會得到一個查詢正確率。其中,緩存時長為1.5 s的正確率為72%,緩存時長為3 s的正確率為83%,緩存時長為4.5 s的正確率為94%。由于用戶允許的最小正確率εmin=0.8,所以下一次用于計算最小緩存時長的區(qū)間改為(1.5 s, 3 s],最小緩存時長Kmin改為3 s,重復進行以上操作。
表1 不同緩存時長的統(tǒng)計結果示例
Top-k連續(xù)查詢依托于滑動窗口模型,給定滑動窗口W和top-k查詢q,每當窗口滑動后,q返回W中分值最高的k個元組。由于算法需要實時處理大量數(shù)據(jù),且每次窗口滑動前后有大量重疊數(shù)據(jù),計算這些重復數(shù)據(jù)耗時費力。因此,本文借鑒MinTopk算法的思想,利用滑動窗口的特性過濾掉大量對結果無貢獻的元組,維護一個top-k結果候選集C。當窗口滑動后,更新候選集C,只需要訪問候選集C便可得出top-k結果集,這樣既大大削減了數(shù)據(jù)規(guī)模[14],又保證了查詢結果的準確性。
圖3展示了相鄰滑動窗口的數(shù)據(jù)歸屬,其中Wi表示某編號窗口,si表示由滑動步長劃分的某批數(shù)據(jù)。每次窗口滑動后,最早的一批數(shù)據(jù)被釋放,最新的一批數(shù)據(jù)流入窗口。新來的元組有可能一直成為top-k結果,直到它被窗口釋放。如s3中的某元組可能成為W3或W2、W1、W0的top-k結果??梢钥闯觯琖0包含所有批次數(shù)據(jù),W1包括批次s1、批次s2、批次s3的數(shù)據(jù),W2包括批次s2、批次s3的數(shù)據(jù),W3僅包括批次s3的數(shù)據(jù)。對于W0中的數(shù)據(jù),為了避免重復計算,首先計算出W3(s3)的top-k結果集,然后計算出W2(s2和s3)的top-k結果集,再計算出W1(s1、s2和s3)的top-k結果集,最后計算出W0的top-k結果集。如此計算則可以充分利用上一次的計算結果,避免重復計算。
圖3 相鄰滑動窗口的數(shù)據(jù)歸屬
圖4為不同窗口的示例數(shù)據(jù),圖5為候選集C和候選集D。如圖4(a)所示,當前窗口為W0,每個元組的標簽表示元組的到達順序。如圖5(a)所示,僅對于W0窗口中的元組,計算出窗口W0、W1、W2、W3的top-3結果集,使用一個有序列表來維護這些元組。如圖5(b)所示, 按元組分值從大到小排列,元組右側表示該元組會對哪些窗口做出貢獻,這個有序列表就是候選集C。由于一個元組作出貢獻的窗口集合是連續(xù)的,只維護起始貢獻窗口id和結束貢獻窗口id即可。同時,為了快速過濾掉不作貢獻的元組,還需要維護各個窗口的最小元組指針。由于候選集列表C是有序的,所以當前窗口的top-k結果集為候選集C前k個元組的集合。
圖4 不同窗口的示例數(shù)據(jù)
圖5 候選集C和候選集D
由于本文算法的復雜性,為了避免在極端情況下對維護的候選集列表C進行頻繁插入,需要對原MinTopk算法進行改造后使用。當窗口滑動后,對應圖2中[tstart,tlate]時刻,對于其中的每一個元組有可能屬于前面窗口,或屬于當前窗口,或屬于下一個窗口。并且由于需要維護不同緩存時長的top-k結果,所以不僅要維護前面窗口不同緩存時長的top-k結果,還需要維護當前窗口的候選集C和下一批次數(shù)據(jù)的top-k結果D。下面給出tlate時刻執(zhí)行計算的具體流程。
1)獲取當前top-k結果。此時,當前窗口候選集列表C中的前k個元組為當前窗口的top-k結果。創(chuàng)建一個空的候選集列表,將當前窗口的top-k結果復制到該列表,用以后臺繼續(xù)記錄不同緩存時長的top-k結果。
2)刪除過期元組。把當前窗口候選集列表C中最早一批元組(即候選集列表前k個元組)的起始貢獻窗口id加1。當起始貢獻窗口id大于結束貢獻窗口id時,該元組被淘汰,從列表中刪除。
3)合并候選集列表D到候選集列表C。通過指針索引,對于候選集列表D中的每一個元組,按從小到大順序和各個窗口的元組最小分值進行比較,快速計算出起始貢獻窗口id和結束貢獻窗口id。若其對某個窗口有貢獻,將其插入到候選集C中,使C保持有序,同時刪除對候選集C不作貢獻的元組。
對于當前窗口之前的窗口,針對不同的緩存時長,記錄每個窗口的top-k結果,對于晚到的元組持續(xù)進行處理,直到該窗口沒有元組到達。如圖4(b)所示,窗口滑動后,當前窗口由W0變?yōu)閃1,新流入了元組21~25,元組1~5被釋放。圖5(b)展示了候選集C和候選集D的合并過程。對于元組25,依次和元組17、元組15、元組10進行比較,得出元組25的起始貢獻窗口為W1,結束貢獻窗口為W4,并將其有序地插入候選集C中。
實驗環(huán)境使用CPU為3.2 GHz,內(nèi)存為16 GB的ubuntu18.04電腦。緩存時長自適應算法的實驗參數(shù)如下:緩存時長K初始為2 s,緩存時長劃分份數(shù)m為10,迭代次數(shù)n為30,允許的最小正確率εmin為0.95;top-k查詢的實驗參數(shù)如下:偏好函數(shù)為求元組最大值,k值為5,滑動窗口總大小為60 s,滑動窗口的滑動步長為5 s。
由于網(wǎng)絡延遲通常遵循指數(shù)分布等長拖尾型概率分布[15],故使用指數(shù)分布生成亂序數(shù)據(jù)。為了營造高速亂序流的環(huán)境,盡量增大窗口中的元組數(shù)目。另外,生成的數(shù)據(jù)應包含時間戳字段、值字段。通過指數(shù)分布生成了充足的亂序數(shù)據(jù),選擇SMA算法、MinTopk算法、GSTopk算法作為本實驗的對比算法,使用相同的數(shù)據(jù)進行top-k連續(xù)查詢,記錄下運行參數(shù),得出top-k結果正確率與算法運行時間的關系以及top-k結果查詢時延與算法運行時間的關系。
為了測試算法對亂序程度不同的數(shù)據(jù)的有效性,構造一個定量的數(shù)據(jù)集,并將其打亂為三種不同亂序程度的數(shù)據(jù)集。本文算法和對比算法分別使用三種不同亂序程度的數(shù)據(jù)集作為輸入進行top-k連續(xù)查詢,并記錄下運行參數(shù),得出top-k結果正確率與數(shù)據(jù)亂序程度的關系。另外,為了避免偶然性所帶來的實驗誤差,以上所述實驗均在參數(shù)及數(shù)據(jù)不變的情況下進行多次,并取平均值作為最終實驗結果。
不同算法的實驗結果對比如圖6所示。圖6(a)展示了隨著運行時間的增加,不同算法的正確率變化情況。可以看出,本文算法比其他算法的正確率高很多,顯示了本文算法處理亂序流的優(yōu)越性。這是由于本文算法使用基于緩存的方法等待遲到元組,使邊界元組被包含在正確的滑動窗口內(nèi),提高了正確率。本文算法的正確率隨著運行時間一直在變化,這是由于本文算法可以根據(jù)數(shù)據(jù)流的亂序程度自適應緩存時長。若當前緩存時長的正確率大于允許的最小正確率,則應減小緩存時長;否則進行相反的操作。圖6(b)展示了隨著運行時間的增加,不同算法的查詢時延變化情況。查詢時延表示當滑動窗口閉合后到計算得出該滑動窗口的top-k結果集所需要的時間??梢钥闯觯疚乃惴ǖ牟樵儠r延比其他三種算法的查詢時延要高。這是由于本文算法使用了基于緩存的亂序流處理方法,等待屬于當前滑動窗口的遲到元組,以時延換取了正確率的上升。隨著運行時間的改變,查詢時延還會不斷變化。這是由于本文算法可以自適應改變緩存時長,使得時延增大,正確率提高。當面臨實時性要求特別高而正確率要求不太高的情況,應盡量避免使用本文算法。GSTopk算法可以快速處理亂序數(shù)據(jù)流,只是結果不是那么精確。由此可見,本文算法的一個缺點是查詢時延較高。圖6(c)展示了數(shù)據(jù)集輕度、中度和重度亂序時,不同算法的正確率變化情況。在亂序數(shù)據(jù)集上,隨著亂序程度的增加,SMA算法和MinTopk算法的正確率偏低,這是由于這兩種算法沒有針對亂序數(shù)據(jù)流做處理,屬于當前滑動窗口的遲到數(shù)據(jù)被丟棄或者是延遲到下一個窗口執(zhí)行,導致查詢結果不準確。GSTopk算法可以處理亂序流,但正確率也較低,而本文算法的正確率較高,證明本文算法適合處理亂序數(shù)據(jù)流下的top-k連續(xù)查詢問題。
圖6 不同算法的實驗結果對比
本文研究了高速亂序流環(huán)境下的top-k連續(xù)查詢問題,盡管已有一些相關方法研究了此類問題,但是查詢結果誤差較大。本文通過已有的亂序流處理方法和滑動窗口的數(shù)據(jù)特征,首先使用基于緩存的方法等待遲到元組,但不對緩沖區(qū)排序,并運用統(tǒng)計的思想實現(xiàn)了緩存時長自適應。然后使用改造的MinTopk算法,在保證用戶允許的最小正確率的情況下計算出最小緩存時長,減少了查詢時延。后續(xù)工作將優(yōu)化緩存時長自適應算法,減小算法資源消耗,進一步加快算法的計算速度。