鄭曉東 王 梅 陳德華 張碧瑩
(東華大學(xué)計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院 上海 201620)
大數(shù)據(jù)時(shí)代已然來臨。時(shí)間作為客觀事物的固有屬性,幾乎所有信息都顯式或隱式地具備時(shí)態(tài)特征。大數(shù)據(jù)的產(chǎn)生往往也是經(jīng)過時(shí)間累積形成,因此大數(shù)據(jù)也被理解為針對(duì)某個(gè)對(duì)象在時(shí)空兩個(gè)維度上的“全息”數(shù)據(jù)。若對(duì)數(shù)據(jù)進(jìn)行時(shí)態(tài)的關(guān)聯(lián)分析,有望挖掘出數(shù)據(jù)的巨大價(jià)值,從而使得研究和使用數(shù)據(jù)的“時(shí)態(tài)”信息意義日顯突出,對(duì)時(shí)態(tài)數(shù)據(jù)管理查詢技術(shù)的研究十分迫切。
在考慮數(shù)據(jù)的時(shí)態(tài)信息時(shí),被打上時(shí)間標(biāo)簽的數(shù)據(jù),會(huì)伴隨著時(shí)間的延伸而生效或失效。因此,對(duì)于一個(gè)實(shí)體來說,它會(huì)擁有多個(gè)數(shù)據(jù)記錄,很明顯傳統(tǒng)默認(rèn)時(shí)間點(diǎn)的靜態(tài)“快照”式數(shù)據(jù)存儲(chǔ)模式難以滿足時(shí)態(tài)查詢需求。一些時(shí)態(tài)數(shù)據(jù)庫模型被相繼提出,如TimeDB[1]等。與此同時(shí),Oracle、IBM DB2[2]、SAP HANA[3]等知名的商業(yè)數(shù)據(jù)庫也開始支持一些簡單的時(shí)態(tài)查詢功能,包括時(shí)間旅行,時(shí)態(tài)聚合以及時(shí)態(tài)連接等。以時(shí)態(tài)連接為例,檢索“在相同時(shí)間點(diǎn)中,兩個(gè)銀行賬戶數(shù)據(jù)表中存款相同的客戶存款額度”,需要在時(shí)間以及對(duì)象兩個(gè)維度上進(jìn)行雙向篩選,如何在龐大的數(shù)據(jù)集進(jìn)行快速、高效的時(shí)態(tài)操作執(zhí)行成為一個(gè)十分有挑戰(zhàn)性的問題。
索引是數(shù)據(jù)庫中加速查詢的有效技術(shù)。為此,研究者從不同角度為時(shí)態(tài)數(shù)據(jù)模型提出了多種時(shí)態(tài)索引技術(shù)。文獻(xiàn)[4]提出的基于B+樹Time Index索引以及文獻(xiàn)[5]中提出的Historical R樹索引對(duì)傳統(tǒng)樹型索引進(jìn)行擴(kuò)展已支持時(shí)態(tài)數(shù)據(jù),文獻(xiàn)[6]針對(duì)時(shí)態(tài)XML數(shù)據(jù),構(gòu)建不同的時(shí)態(tài)索引模型以支持時(shí)態(tài)查詢。近來,SAP HANA提出的Timeline索引[7]以及雙時(shí)態(tài)索引技術(shù)[8]采用順序結(jié)構(gòu)實(shí)現(xiàn)高效的時(shí)態(tài)查詢。然而,目前所設(shè)計(jì)的大部分索引均在單機(jī)環(huán)境下實(shí)現(xiàn),當(dāng)數(shù)據(jù)集規(guī)模較大時(shí),上述方法將花費(fèi)較高的代價(jià)在遍歷龐大的索引樹或索引隊(duì)列上。同時(shí),其索引結(jié)構(gòu)和查詢算法的執(zhí)行效率對(duì)機(jī)器性能的要求極高,以支持SAP HANA的Timeline時(shí)態(tài)索引為例,其執(zhí)行默認(rèn)的機(jī)器內(nèi)存達(dá)192 GB[7]。上述問題極大地限制了索引技術(shù)的普遍應(yīng)用和查詢效率的提升。
在數(shù)據(jù)規(guī)模不斷增長的今天,越來越多的應(yīng)用都是基于存儲(chǔ)在分布式系統(tǒng)中的大規(guī)模數(shù)據(jù)。Spark分布式處理框架通過將海量數(shù)據(jù)分發(fā)至各個(gè)計(jì)算節(jié)點(diǎn),并基于彈性數(shù)據(jù)集RDD及MapReduce技術(shù)極大地加快了數(shù)據(jù)的處理速度[9]。然而該框架中并沒有對(duì)hdfs中文件的行級(jí)記錄進(jìn)行索引,本文基于Spark分布式計(jì)算平臺(tái)設(shè)計(jì)并實(shí)現(xiàn)了一種分布式時(shí)態(tài)索引方法。該方法首先提出時(shí)態(tài)數(shù)據(jù)集的分段索引構(gòu)造策略,進(jìn)一步設(shè)計(jì)基于Spark的時(shí)態(tài)索引構(gòu)建方法及基于Spark RDD的并行查詢策略。本文主要內(nèi)容如下:
1) 以Timeline索引為例,提出時(shí)態(tài)索引在Spark集群上的構(gòu)建方法,設(shè)計(jì)分布式時(shí)態(tài)索引的并行查詢框架。根據(jù)時(shí)態(tài)查詢所涉及的Spark RDD分區(qū)模式的不同,將其分為分區(qū)獨(dú)立查詢,跨區(qū)查詢以及跨段查詢,并對(duì)不同模式的時(shí)態(tài)查詢提出了優(yōu)化的分布式查詢算法,提高查詢效率。
2) 優(yōu)化輔助索引結(jié)構(gòu),加快“給定時(shí)間點(diǎn)查詢”這一原子操作的效率,從而極大地提升時(shí)態(tài)聚合,時(shí)態(tài)旅行等分區(qū)獨(dú)立查詢的查詢效率。進(jìn)一步針對(duì)跨RDD分區(qū)及跨段的時(shí)態(tài)連接復(fù)雜查詢,分別設(shè)計(jì)復(fù)合索引結(jié)構(gòu)輔助查詢,確保本文方法對(duì)數(shù)據(jù)規(guī)模的有效自擴(kuò)展性同時(shí)降低集群硬件配置需求。
3) 在基準(zhǔn)數(shù)據(jù)上,進(jìn)行本文所提方法的有效性驗(yàn)證,證明了本文提出的索引策略的實(shí)用性和高效性。
1.1 時(shí)態(tài)索引
傳統(tǒng)數(shù)據(jù)庫中索引技術(shù)是提高查詢效率的關(guān)鍵技術(shù)之一,同樣對(duì)于時(shí)態(tài)數(shù)據(jù),合適的索引結(jié)構(gòu)將有效地支持時(shí)態(tài)操作。時(shí)態(tài)數(shù)據(jù)索引大致可歸為兩類:樹索引和隊(duì)列索引。最早提出的時(shí)態(tài)數(shù)據(jù)索引結(jié)構(gòu)Time Index就是基于B+樹構(gòu)建的,該索引針對(duì)每個(gè)時(shí)間間隔的結(jié)束時(shí)刻作為索引值建立B+樹,每個(gè)葉子結(jié)點(diǎn)記錄該時(shí)間點(diǎn)各事件的生效失效狀態(tài)[4]。但是該索引在時(shí)間跨度較大的情況下會(huì)引起較大的空間開銷,同時(shí)這種基于B樹的時(shí)態(tài)索引在構(gòu)建時(shí)依賴于數(shù)據(jù)的時(shí)間有序性。另一種樹形結(jié)構(gòu)R樹雖然最初的設(shè)計(jì)思想是為了索引空間數(shù)據(jù),但是它被用來存儲(chǔ)時(shí)間同樣適合。文獻(xiàn)[5]提出了Historical R樹,每個(gè)時(shí)間戳都建立一個(gè)R樹,且一個(gè)Historical R樹中未發(fā)生變化的記錄對(duì)應(yīng)的結(jié)點(diǎn)只保存一個(gè)以節(jié)省空間。同時(shí),除了這種以單一一個(gè)數(shù)據(jù)結(jié)構(gòu)來索引全部數(shù)據(jù)外,還有以樹形結(jié)構(gòu)為基礎(chǔ),為每一個(gè)時(shí)間版本構(gòu)建一個(gè)樹來索引時(shí)態(tài)數(shù)據(jù)的方法,如MVBT索引[10]和MV3D-RT索引[11]。相對(duì)于樹形時(shí)態(tài)索引,基于隊(duì)列的時(shí)態(tài)索引研究較少,文獻(xiàn)[7]提出了Timeline時(shí)間線索引,該索引應(yīng)用于HANA,在內(nèi)存中為時(shí)態(tài)數(shù)據(jù)建立一張時(shí)態(tài)表,每一個(gè)時(shí)態(tài)表對(duì)應(yīng)一個(gè)Timeline時(shí)間線索引,在時(shí)間線上保存著各個(gè)數(shù)據(jù)的生效失效時(shí)間,該索引支持多種時(shí)態(tài)操作。
目前對(duì)于時(shí)態(tài)數(shù)據(jù)的操作主要分為時(shí)間旅行,時(shí)態(tài)聚合和時(shí)態(tài)連接。時(shí)間旅行是查詢數(shù)據(jù)在過去或未來某個(gè)時(shí)間點(diǎn)或時(shí)間段所具有的狀態(tài),可以通過時(shí)間旅行回溯到某個(gè)時(shí)間版本的數(shù)據(jù)庫狀態(tài)。時(shí)態(tài)聚合和時(shí)態(tài)連接都是基于時(shí)間旅行所進(jìn)行的操作,時(shí)態(tài)聚合是指對(duì)某個(gè)時(shí)間點(diǎn)或時(shí)間區(qū)間的有效記錄進(jìn)行聚合操作。時(shí)態(tài)連接是指對(duì)指定時(shí)間點(diǎn)下的有效記錄進(jìn)行連接操作。如今對(duì)于時(shí)態(tài)索引的研究還處于一個(gè)發(fā)展階段,現(xiàn)階段已提出的時(shí)態(tài)索引大多數(shù)僅僅支持有限的時(shí)態(tài)操作,部分更是僅支持時(shí)間旅行,無法有效地支持時(shí)態(tài)聚合和時(shí)態(tài)連接等復(fù)雜查詢操作。
1.2 分布式索引
分布式計(jì)算框架如Spark、Hadoop,采用的是分布式的文件系統(tǒng),通過將大文件進(jìn)行分片,切分成多個(gè)分片文件存儲(chǔ)在多個(gè)存儲(chǔ)節(jié)點(diǎn)上。平臺(tái)在需要對(duì)該文件數(shù)據(jù)進(jìn)行計(jì)算時(shí),各個(gè)分片文件加載至多個(gè)計(jì)算節(jié)點(diǎn)上進(jìn)行獨(dú)立計(jì)算,計(jì)算子結(jié)果將返回主節(jié)點(diǎn)進(jìn)行后續(xù)操作。這種存儲(chǔ)計(jì)算模式雖然解決了大數(shù)據(jù)計(jì)算處理的效率問題,但是在對(duì)數(shù)據(jù)文件進(jìn)行行級(jí)訪問時(shí),盡管數(shù)據(jù)文件的切分且分布式的讀取加快了讀取速度,其大量的時(shí)間浪費(fèi)在了文件的尋道遍歷上。Haojun Liao提出了應(yīng)用在Hadoop的分布式文件系統(tǒng)上的多維索引,考慮到HDFS中塊大小、索引節(jié)點(diǎn)大小、網(wǎng)絡(luò)中的數(shù)據(jù)通信等影響查詢速度的因素,通過構(gòu)建類似于R樹的層次結(jié)構(gòu)索引,避免查詢時(shí)無意義的窮舉減少查詢響應(yīng)時(shí)間[12]。Gankidi等[13]提出通過構(gòu)建B+樹管理HDFS數(shù)據(jù),索引維持在SqlSever中,用戶可以通過SQL查詢HDFS數(shù)據(jù)。Xie等[14]提出建立在Spark平臺(tái)上用于管理空間數(shù)據(jù)的兩級(jí)索引結(jié)構(gòu),該索引維持在RDD中,區(qū)域索引保存空間區(qū)域中的數(shù)據(jù)信息,而全局索引管理著每個(gè)區(qū)域索引的邊界信息。
總的來說,現(xiàn)有的時(shí)態(tài)索引技術(shù)尚處于研究階段,大多數(shù)索引結(jié)構(gòu)無法有效地支持時(shí)態(tài)操作,并且在面對(duì)大規(guī)模時(shí)態(tài)數(shù)據(jù)時(shí),索引空間代價(jià)較大,效率變低,而傳統(tǒng)的分布式索引大多是面向隱性時(shí)間屬性數(shù)據(jù),無法有效地支持對(duì)歷史信息的相關(guān)操作。
2.1 定義和符號(hào)
定義1時(shí)態(tài)表(Temporal Table)。包含具有時(shí)態(tài)屬性的數(shù)據(jù)的表。
定義2時(shí)間區(qū)間(Period)。表示一個(gè)數(shù)據(jù)合法持續(xù)的時(shí)間,包括它的開始和結(jié)束。本文使用包含-排除方法來建模時(shí)間區(qū)間,使用[start, end)來表示。在實(shí)現(xiàn)過程中分別通過時(shí)態(tài)表的兩列來分別表示記錄的開始時(shí)間和結(jié)束時(shí)間。
定義3時(shí)間版本(versionID)。指時(shí)間區(qū)間內(nèi)離散的時(shí)間點(diǎn),這些時(shí)間點(diǎn)是單調(diào)遞增的,本文將這些時(shí)間點(diǎn)稱之為versionID。
表1給出了一個(gè)時(shí)態(tài)表的例子。該實(shí)例模擬了一個(gè)銀行系統(tǒng),Alice在101時(shí)間點(diǎn)在銀行存入$100,Alice在銀行的存款為$100這個(gè)狀態(tài)在數(shù)據(jù)庫中維持到了103時(shí)間點(diǎn),即103時(shí)間點(diǎn)后(包含103時(shí)間點(diǎn))第一條記錄失效,第四條記錄生效,表明Alice在103時(shí)間點(diǎn)在銀行存入$500使銀行的存款到達(dá)$600。同理,Ann在102時(shí)間點(diǎn)在銀行存入$500,第二條記錄在102時(shí)間點(diǎn)到107時(shí)間點(diǎn)有效,第三條記錄表示Grace在103時(shí)間點(diǎn)在銀行存入$300,直到目前為止,Grace在銀行的存款都為$300。
表1 時(shí)態(tài)表
2.2 索引結(jié)構(gòu)
不論是基于樹結(jié)構(gòu)或是順序結(jié)構(gòu)的時(shí)態(tài)索引,均需在其索引結(jié)點(diǎn)中記錄各時(shí)間點(diǎn)下事件的生效失效狀態(tài)。以圖1(a)給出的Timeline索引結(jié)構(gòu)為例,其中“+”表示該數(shù)據(jù)在對(duì)應(yīng)時(shí)間版本下開始生效,“-”表示數(shù)據(jù)失效。上述索引可以有效地支持多種時(shí)態(tài)操作,然而其也存在著明顯的缺陷。在執(zhí)行時(shí)態(tài)操作時(shí),由于索引只保存了各個(gè)記錄的生效失效狀態(tài),如果要獲取到某一時(shí)間版本下的有效數(shù)據(jù),如查詢“106時(shí)間版本下的有效數(shù)據(jù)”,系統(tǒng)無法避免地需要從頭遍歷事務(wù)層至查詢時(shí)間版本,以得到“106時(shí)間版本下第二條記錄及第五條記錄有效”的信息。每一次的時(shí)態(tài)操作都會(huì)重復(fù)性的遍歷索引,這種低效的重復(fù)性工作造成了極大的時(shí)間代價(jià)。
圖1 checkpoint結(jié)構(gòu)
因此,在Timeline時(shí)態(tài)索引中,同時(shí)維護(hù)檢查點(diǎn)(checkpoint)輔助索引結(jié)構(gòu)。checkpoint中保存了當(dāng)前時(shí)間下所有記錄的有效狀態(tài),這種類似于快照的設(shè)計(jì)結(jié)構(gòu)省去了在每次時(shí)態(tài)操作時(shí)需要從頭遍歷時(shí)間線索引所引起的大量開銷,可以較為快速地獲取指定時(shí)間版本下的有效記錄,對(duì)于優(yōu)化索引的檢索效率至關(guān)重要。由于checkpoint的創(chuàng)建代價(jià)較大,一般在一個(gè)較大的時(shí)間間隔下創(chuàng)建一個(gè)新的checkpoint結(jié)構(gòu)。
2.3 基于Spark的分段時(shí)態(tài)索引框架
Spark上的所有數(shù)據(jù)操作都依賴于彈性分布式數(shù)據(jù)集(RDD),RDD被分布式的存儲(chǔ)在集群的各個(gè)節(jié)點(diǎn)上,程序方能分布式的執(zhí)行。給定當(dāng)前時(shí)態(tài)表T,由于時(shí)態(tài)表中數(shù)據(jù)是以時(shí)間特性排序的,為了保證各分區(qū)中數(shù)據(jù)的時(shí)態(tài)均衡性,應(yīng)用Spark自帶Hash Partition方法針對(duì)ROW_ID列進(jìn)行哈希劃分,從而得到RDD的各個(gè)分區(qū)。各RDD迭代本分區(qū)數(shù)據(jù)并行創(chuàng)建索引,因此RDD類型為一個(gè)二元組(TT,TI),其中TT即Temporal Table是本數(shù)據(jù)分區(qū)中的時(shí)態(tài)表,TI即Temporal Index是針對(duì)于本分區(qū)時(shí)態(tài)數(shù)據(jù)構(gòu)建的時(shí)態(tài)索引。很明顯,隨著時(shí)間推移,時(shí)態(tài)表T中的數(shù)據(jù)不斷累積。為保證本文方法對(duì)數(shù)據(jù)規(guī)模的有效自擴(kuò)展性。同時(shí),為了避免當(dāng)數(shù)據(jù)增大,RDD在進(jìn)行迭代計(jì)算時(shí)產(chǎn)生的內(nèi)存溢出問題,首先將數(shù)據(jù)進(jìn)行切分。給定當(dāng)前時(shí)態(tài)表T={t1,t2,tn},n為數(shù)據(jù)記錄條數(shù),將其切分為T={T1,T2,Tm},m為數(shù)據(jù)段個(gè)數(shù)。對(duì)每個(gè)數(shù)據(jù)段Ti按如上所述方法構(gòu)建索引,通過設(shè)定m的大小以控制各段RDD的大小。同時(shí),在Spark集群中維持全局索引來調(diào)度對(duì)應(yīng)查詢時(shí)段所涉及的各數(shù)據(jù)段索引RDD進(jìn)行相關(guān)查詢操作。
時(shí)態(tài)數(shù)據(jù)的查詢操作可分為時(shí)間旅行、時(shí)態(tài)聚合、時(shí)態(tài)連接三種時(shí)態(tài)操作。根據(jù)操作所涉及的Spark RDD數(shù)據(jù)對(duì)象,本文將查詢分為分區(qū)獨(dú)立,跨區(qū)查詢,跨段查詢,如圖2所示。并且對(duì)于每類查詢,本文均針對(duì)性地提出優(yōu)化的輔助索引結(jié)構(gòu)及查詢算法,以充分發(fā)揮分布式的優(yōu)點(diǎn),提高時(shí)態(tài)檢索效率。
圖2 構(gòu)建在Spark集群上的時(shí)態(tài)查詢分類
3.1 分區(qū)獨(dú)立查詢
本文將時(shí)態(tài)索引構(gòu)建在Spark集群中的RDD上,當(dāng)集群讀取至RDD上后,各個(gè)分區(qū)根據(jù)分區(qū)數(shù)據(jù)構(gòu)建本分區(qū)索引。分區(qū)獨(dú)立查詢即指各個(gè)分區(qū)數(shù)據(jù)之間不必進(jìn)行通信,各個(gè)分區(qū)在檢索完本分區(qū)索引后直接返回結(jié)果集。
在幾種時(shí)態(tài)操作中,時(shí)間旅行屬于典型的分區(qū)獨(dú)立型查詢。時(shí)間旅行是查詢某一時(shí)間版本versionID下數(shù)據(jù)庫記錄的狀態(tài)信息,也就是得到該時(shí)間版本數(shù)據(jù)庫的有效數(shù)據(jù)集。以時(shí)間旅行為例,詳細(xì)描述分區(qū)獨(dú)立查詢?cè)赟park集群的執(zhí)行流程:在獲取到待查versionID后,IndexRDD各分區(qū)索引先檢索本分區(qū)的checkpoint集合,獲取到最鄰近且早于查詢時(shí)間版本的checkpoint,根據(jù)checkpoint中的標(biāo)志位查詢對(duì)應(yīng)行向量分段數(shù)據(jù),獲取到該時(shí)間版本下的有效數(shù)據(jù)。若checkpoint的時(shí)間版本和查詢時(shí)間相同,該分區(qū)即可返回結(jié)果集;若與查詢時(shí)間不等,則根據(jù)checkpoint的時(shí)間版本在時(shí)態(tài)索引中的位置信息接著向下遍歷時(shí)態(tài)索引,直到遍歷至查詢時(shí)間在時(shí)態(tài)索引中的位置,整合計(jì)算得到該分區(qū)的有效數(shù)據(jù)集后返回給驅(qū)動(dòng)節(jié)點(diǎn)。
傳統(tǒng)的checkpoint結(jié)構(gòu)需要遍歷行向量來查看對(duì)應(yīng)時(shí)間版本的數(shù)據(jù)庫狀態(tài),由于checkpoint中會(huì)存儲(chǔ)和時(shí)態(tài)表同等大小的行向量,若設(shè)時(shí)態(tài)表大小為n,其遍歷次數(shù)為n。然而由于數(shù)據(jù)的時(shí)效性,在給定時(shí)間版本下同一對(duì)象的時(shí)態(tài)數(shù)據(jù)只有一條是生效的,且有效數(shù)據(jù)具有區(qū)域性(即一個(gè)時(shí)間版本下的有效數(shù)據(jù)大多會(huì)集中在行向量的一個(gè)分段內(nèi)),造成大量的無效遍歷。為此,本文首先利用Spark 分布式和并行處理特點(diǎn),對(duì)checkpoint結(jié)構(gòu)進(jìn)行優(yōu)化,即對(duì)checkpoint中行向量進(jìn)行固定長度的分段,并添加記錄各分區(qū)有無有效記錄的標(biāo)記flag結(jié)構(gòu)。如圖3所示,checkpoint分為標(biāo)志位和有效數(shù)據(jù)位兩部分,在有效數(shù)據(jù)部分按行號(hào)分段,并在標(biāo)志位增設(shè)標(biāo)志,記錄該分段在指定時(shí)間版本下是否存在有效的行號(hào)。在具體查找時(shí),即可根據(jù)標(biāo)志位大量過濾無效記錄。
圖3 優(yōu)化checkpoint
優(yōu)化checkpoint中的分段標(biāo)志位簡化了遍歷復(fù)雜度,分段標(biāo)志位為真再去遍歷對(duì)應(yīng)分段數(shù)據(jù)的有效性,若為假則直接跳過該分段。假定分段步長為l,則時(shí)間復(fù)雜度近乎簡化為(n/l+l),且各個(gè)分段可并行遍歷,極大地降低獲取checkpoint下有效數(shù)據(jù)的時(shí)間開銷。同時(shí),由于數(shù)據(jù)記錄隨時(shí)間的累計(jì)特性,在下一個(gè)時(shí)刻點(diǎn)對(duì)checkpoint創(chuàng)建時(shí),僅需追加新增段,原有分段可重用,大大降低了checkpoint的創(chuàng)建復(fù)雜度。
3.2 跨區(qū)查詢
跨區(qū)查詢涉及到存儲(chǔ)索引信息的RDD各分區(qū)數(shù)據(jù)間的通信,各分區(qū)間需要進(jìn)行聯(lián)合計(jì)算。時(shí)態(tài)連接就是最為典型的跨區(qū)查詢,時(shí)態(tài)連接在時(shí)態(tài)操作中是一種較為復(fù)雜的操作,同時(shí)涉及到時(shí)間維度和對(duì)象維度的連接操作,如查詢“在相同時(shí)間點(diǎn)中,兩個(gè)銀行賬戶數(shù)據(jù)表中存款相同的客戶存款額度”。上述時(shí)態(tài)連接可以劃分成兩個(gè)階段,第一個(gè)階段利用時(shí)態(tài)索引以及checkpoint得到每個(gè)時(shí)間版本下兩個(gè)數(shù)據(jù)集各分區(qū)的有效記錄集合;第二個(gè)階段是在相同查詢點(diǎn)的有效記錄集合中,根據(jù)查詢條件以一定的連接規(guī)則對(duì)兩個(gè)數(shù)據(jù)集的有效記錄進(jìn)行連接以得到查詢結(jié)果。通過改進(jìn)checkpoint結(jié)構(gòu)雖然加快了第一階段獲取當(dāng)前時(shí)間版本下有效記錄的速度,但第二階段低效的兩兩條件匹配付出了O(n2)的時(shí)間開銷,盡管將索引應(yīng)用在Spark集群,采用分布式的計(jì)算方法加快了匹配速度,但是存在集群中節(jié)點(diǎn)相互通信、數(shù)據(jù)拉取等問題,Spark集群上的時(shí)態(tài)連接速度并不理想。
針對(duì)于時(shí)態(tài)連接過程中出現(xiàn)的低效的兩兩匹配以及大量p的數(shù)據(jù)通信問題,本文采用面向?qū)ο缶S度的輔助索引來優(yōu)化時(shí)態(tài)連接效率。如圖4所示,在通過第一階段獲取到有效記錄后,在時(shí)態(tài)連接的第二階段,各個(gè)數(shù)據(jù)節(jié)點(diǎn)不再進(jìn)行數(shù)據(jù)廣播,而是采用將全部有效數(shù)據(jù)進(jìn)行混洗重分區(qū),以各個(gè)記錄的對(duì)象維度字段為key值進(jìn)行hash混洗重分區(qū),因此新的數(shù)據(jù)分區(qū)具有封閉性。即將連接的記錄限制在本地分區(qū),不用再進(jìn)行數(shù)據(jù)通信來進(jìn)行連接,這樣的輔助索引結(jié)構(gòu)不僅減少了數(shù)據(jù)通信引起的極大開銷,而且縮減連接次數(shù),提高了時(shí)態(tài)連接效率。具體的連接算法如算法1所示。
圖4 優(yōu)化后的時(shí)態(tài)連接查詢過程
算法1時(shí)態(tài)連接
輸入:versionIDRange時(shí)間版本范圍,joinCondition連接條件;
輸出:Result查詢結(jié)果集;
1.for versionID_i in versionIDRange
2.RA,RA←temporalTravel(versionID_i);
/*時(shí)間旅行獲取數(shù)據(jù)表的有效記錄*/
3.Repartition(RA) and Repartition(RB);
/*以對(duì)象維度對(duì)RA和RB重分區(qū)*/
4.for partitionA in RA
5.for partition in RB
6.if partitionA.partitionNum==partition.partitionNum
7.Result←Join(partitionA,partitonB);
/*擁有分區(qū)號(hào)相同的RDD進(jìn)行連接*/
8.end if
9.end for
10.end for
11.end for
3.3 跨段查詢
Spark集群的一切操作都是基于RDD的。在根據(jù)數(shù)據(jù)集構(gòu)建索引時(shí),RDD需要將所有數(shù)據(jù)都讀進(jìn)內(nèi)存進(jìn)行迭代計(jì)算,由于集群節(jié)點(diǎn)可以申請(qǐng)到的內(nèi)存是有限的,當(dāng)分區(qū)數(shù)據(jù)的計(jì)算量到達(dá)節(jié)點(diǎn)上限時(shí)就會(huì)內(nèi)存溢出導(dǎo)致計(jì)算失敗,為了避免這種情況,我們采用跨段的查詢方式。
跨段查詢就是對(duì)大數(shù)據(jù)進(jìn)行分段處理,各個(gè)分段獨(dú)立構(gòu)建索引,將分段數(shù)據(jù)集及索引打包存儲(chǔ)至HDFS??紤]到時(shí)態(tài)數(shù)據(jù)具有時(shí)效性,每一個(gè)數(shù)據(jù)集同樣有一個(gè)時(shí)效性,數(shù)據(jù)集中記錄最早的有效時(shí)間以及最晚的失效時(shí)間即為該數(shù)據(jù)集的時(shí)間跨度。將時(shí)間跨度的相關(guān)信息設(shè)計(jì)到索引結(jié)構(gòu)中將有效地加快查詢速度,避免遍歷過多的時(shí)間相錯(cuò)的數(shù)據(jù)集。因此,本文設(shè)計(jì)雙hash結(jié)構(gòu)來索引HDFS中的相關(guān)的索引文件,當(dāng)執(zhí)行時(shí)態(tài)操作時(shí),由全局索引調(diào)配符合時(shí)間跨度的數(shù)據(jù)集及索引至Spark集群進(jìn)行后續(xù)操作。
如圖5所示,全局索引包括索引表和索引矩陣兩部分,索引表保存各個(gè)索引結(jié)構(gòu)的存儲(chǔ)號(hào)、時(shí)間跨度,以及HDFS中的存儲(chǔ)地址。索引矩陣將各個(gè)索引結(jié)構(gòu)的存儲(chǔ)號(hào)保存到對(duì)應(yīng)時(shí)間跨度的塊中,索引矩陣的橫縱坐標(biāo)為索引文件跨度的對(duì)應(yīng)的hash值,橫坐標(biāo)為開始時(shí)間,縱坐標(biāo)為失效時(shí)間。
圖5 全局索引
當(dāng)進(jìn)行時(shí)態(tài)操作時(shí),先訪問全局索引的索引矩陣,根據(jù)時(shí)態(tài)操作的時(shí)間字段查詢矩陣的對(duì)應(yīng)區(qū)域,獲取到與操作時(shí)間有交錯(cuò)的索引的存儲(chǔ)號(hào)。再根據(jù)存儲(chǔ)號(hào)在索引表中查詢到其數(shù)據(jù)及索引在HDFS中的儲(chǔ)存地址,Spark集群逐一加載涉及到的數(shù)據(jù)集和索引至內(nèi)存中執(zhí)行時(shí)態(tài)操作,最后將結(jié)果集進(jìn)行合并即為查詢結(jié)果。如“查詢時(shí)間為2 005至3 078各個(gè)時(shí)間點(diǎn)的數(shù)據(jù)庫信息”,索引矩陣以1 000個(gè)時(shí)間版本為步長,邊長6為例,則計(jì)算其開始結(jié)束的hash值為2和3,根據(jù)hash值訪問索引矩陣的對(duì)應(yīng)區(qū)域(即藍(lán)色陰影標(biāo)注區(qū)域)得到有時(shí)間交錯(cuò)索引的存儲(chǔ)號(hào),即可檢索對(duì)應(yīng)索引表找到其HDFS地址。
跨段查詢算法如算法2所示,其中g(shù)etHashCode(versionID)為計(jì)算時(shí)間的哈希值;serachIndexMartix(hashCode,indexMartix)為掃描索引矩陣,獲取對(duì)應(yīng)位置的索引塊號(hào)。judgeTimeRegion(item,querytime,indexTable)為檢索索引表,獲取時(shí)間區(qū)域有重疊的索引塊號(hào)。之后調(diào)用readIndexFromHDFS(path)從HDFS中讀取時(shí)態(tài)索引,進(jìn)而進(jìn)行時(shí)間旅行temporalTravel(temporalIndex,versionID)得到查詢結(jié)果。
算法2跨段查詢
輸入:versionID時(shí)間版本,indexMartix索引矩陣,indexTable索引表;
輸出:Result查詢結(jié)果集;
1.hashCode←getHashCode(versionID);
2.I←searchIndexMartix(hashCode);
3.For otem in I
4.P←judgeTimeRegion(item,versionID,indexTable);
5.End for
6.For path in P
7.temporalIndex←readIndexFromHDFS(path);
8.Result←temporalTravel(temporalIndex,versionID);
9.End for
3.4 索引更新
時(shí)態(tài)數(shù)據(jù)是帶有時(shí)間屬性的,這種特性也預(yù)示著時(shí)態(tài)數(shù)據(jù)會(huì)隨著時(shí)間不斷地增加,集群需要在新數(shù)據(jù)上添加索引以加快訪問速度,并且集群無法將不斷增大的數(shù)據(jù)及索引都維護(hù)在Spark集群中,需要一個(gè)索引的更新管理策略來控制Spark讀取對(duì)應(yīng)時(shí)間點(diǎn)或時(shí)間段所涉及的索引結(jié)構(gòu)。時(shí)態(tài)數(shù)據(jù)的時(shí)效性特點(diǎn)導(dǎo)致了時(shí)態(tài)記錄存在時(shí)間跨度且這種時(shí)間跨度是無法預(yù)料的,集群無法預(yù)知同一個(gè)對(duì)象所產(chǎn)生的下條記錄的時(shí)間跨度長短。如果將新產(chǎn)生的時(shí)態(tài)記錄索引到原有的索引結(jié)構(gòu)中,需要在索引結(jié)構(gòu)中進(jìn)行頻繁的隊(duì)列移動(dòng),將新的索引節(jié)點(diǎn)插入到對(duì)應(yīng)位置,并且這種插入式的方法無法批量操作,索引只允許逐一的進(jìn)行插入,這種低效的更新策略是無法滿足當(dāng)今的查詢需要的。
由于將新的時(shí)態(tài)數(shù)據(jù)索引到原有索引結(jié)構(gòu)上這種索引更新策略所引起的時(shí)間開銷要大于直接重構(gòu)索引,本文擬采用滯后更新的索引更新策略。原有索引不去索引新的數(shù)據(jù),而是當(dāng)新的數(shù)據(jù)積累到一定量時(shí),對(duì)新數(shù)據(jù)單獨(dú)構(gòu)建索引結(jié)構(gòu)。
為了證明在Spark集群上構(gòu)建時(shí)態(tài)索引的高效性,本節(jié)進(jìn)行了以下幾個(gè)方面的實(shí)驗(yàn):(1) 在Spark集群環(huán)境下利用優(yōu)化的checkpoint和原始checkpoint性能對(duì)比;(2) 在Spark環(huán)境下時(shí)態(tài)連接的性能效率;(3) 在Spark環(huán)境下分段索引性能分析。
4.1 實(shí)驗(yàn)環(huán)境
本次實(shí)驗(yàn)中Spark部分試驗(yàn)是在是由5臺(tái)機(jī)器構(gòu)成的Spark on standalone集群系統(tǒng)上完成的,1個(gè)master節(jié)點(diǎn)和4個(gè)slave節(jié)點(diǎn),各節(jié)點(diǎn)CPU為Intel(R) Core(TM) i7-3770 CPU @ 3.40 GHz,6 GB內(nèi)存空間,采用CentOS 6.8系統(tǒng),Scala版本為2.11.8,Spark版本為2.0.0。
4.2 實(shí)驗(yàn)數(shù)據(jù)集
本實(shí)驗(yàn)在TPC-H基準(zhǔn)數(shù)據(jù)集的基礎(chǔ)上擴(kuò)展時(shí)態(tài)屬性,并通過TPC-C事務(wù)生成基準(zhǔn)數(shù)據(jù)集的歷史數(shù)據(jù)[15]。本實(shí)驗(yàn)采用數(shù)據(jù)集如表2所示。
表2 實(shí)驗(yàn)數(shù)據(jù)集
其中,SF_0為TPC-H生成數(shù)據(jù)的比例因子;SF_H為TPC-C生成數(shù)據(jù)的比例因子,即數(shù)據(jù)集中時(shí)間版本的規(guī)模;lineitem表示數(shù)據(jù)集的總行數(shù),version表示時(shí)間版本規(guī)模。
4.3 實(shí)驗(yàn)結(jié)果與分析
4.3.1 checkpoint優(yōu)化前后性能對(duì)比
為了規(guī)避時(shí)態(tài)操作時(shí)分段查詢干擾checkpoint性能判斷的問題,本實(shí)驗(yàn)在小型數(shù)據(jù)集基礎(chǔ)上對(duì)比checkpoint優(yōu)化前后的性能變化,確保數(shù)據(jù)在一個(gè)數(shù)據(jù)段中。實(shí)驗(yàn)中查詢給定versionID的數(shù)據(jù)庫有效記錄。實(shí)驗(yàn)結(jié)果如圖6所示,圖中橫坐標(biāo)為查詢的versionID。
圖6 checkpoint優(yōu)化性能對(duì)比
圖中查詢時(shí)間隨著查詢時(shí)間版本的增長出現(xiàn)折現(xiàn)波動(dòng),這是因?yàn)閏heckpoint保存了對(duì)應(yīng)時(shí)間版本下的數(shù)據(jù)庫狀態(tài),無需遍歷索引,而兩個(gè)checkpoint中的時(shí)間版本數(shù)據(jù)庫狀態(tài)需要遍歷時(shí)間線索引的對(duì)應(yīng)區(qū)間,所以在checkpoint時(shí)間點(diǎn)上查詢時(shí)間突然下降,查詢兩個(gè)checkpoint間的時(shí)間版本數(shù)據(jù)庫狀態(tài)查詢時(shí)間是線性增長的。實(shí)驗(yàn)結(jié)果表明,優(yōu)化后的checkpoint相對(duì)于原始checkpoint結(jié)構(gòu)在時(shí)態(tài)旅行上提升40%左右。由于優(yōu)化后的checkpoint將visible rows分段,并添加對(duì)應(yīng)的標(biāo)識(shí)位,減去了不必要的遍歷的時(shí)間,這對(duì)于查詢性能的提升是顯著的。并且當(dāng)數(shù)據(jù)集變大時(shí),checkpoint中需要保存的記錄號(hào)變多了,則利用checkpoint進(jìn)行時(shí)態(tài)操作時(shí)的遍歷量變大,而優(yōu)化的checkpoint規(guī)避了這種遍歷所引起的時(shí)間開銷,也就是說數(shù)據(jù)量越大時(shí),優(yōu)化的checkpoint對(duì)于性能的提升越明顯。
4.3.2 跨區(qū)查詢時(shí)間開銷
實(shí)驗(yàn)結(jié)果如圖7所示,優(yōu)化后連接算法的執(zhí)行效率要明顯由于原有連接算法。進(jìn)行連接優(yōu)化后,時(shí)間開銷隨連接記錄數(shù)增加呈線性增長,而未優(yōu)化的連接算法呈幾乎指數(shù)級(jí)的增長態(tài)勢。在連接量較少時(shí),原有連接算法略優(yōu)于優(yōu)化算法,這是因?yàn)閿?shù)據(jù)混洗引起的時(shí)間開銷高于優(yōu)化的時(shí)間代價(jià)。而連接的記錄數(shù)越多,優(yōu)化的連接算法優(yōu)勢越大,其原因主要在于未優(yōu)化連接操作低效地兩兩連接,而優(yōu)化后連接算法通過hash成功規(guī)避了低效連接操作,縮小了連接范圍。
圖7 跨區(qū)查詢性能對(duì)比
4.3.3 跨段查詢時(shí)間開銷
實(shí)驗(yàn)結(jié)果如圖8所示,在四次不同時(shí)間版本跨度的查詢中,有三次查詢的耗時(shí)在5秒左右,只有最后一次查詢的時(shí)間開銷為10秒,這是因?yàn)榍叭畏秶樵冎簧婕耙粋€(gè)數(shù)據(jù)段,最后一次范圍查詢涉及到了兩個(gè)數(shù)據(jù)段。Spark集群將時(shí)態(tài)數(shù)據(jù)分段構(gòu)建索引,在進(jìn)行時(shí)態(tài)操作時(shí),集群首先查詢?nèi)炙饕?,將時(shí)態(tài)區(qū)間與查詢時(shí)間版本有交叉的索引文件逐一從HDFS讀入Spark集群進(jìn)行查詢。同時(shí),由實(shí)驗(yàn)數(shù)據(jù)可知,在進(jìn)行范圍查詢時(shí)往往只涉及到一至兩個(gè)數(shù)據(jù)段,這是因?yàn)閿?shù)據(jù)本身的時(shí)態(tài)特性就決定了數(shù)據(jù)本身在時(shí)間維度上是聚集連續(xù)的,在進(jìn)行時(shí)態(tài)范圍查詢時(shí)只涉及到少數(shù)的數(shù)據(jù)段。因此,分段查詢?cè)诮鉀Q了數(shù)據(jù)量過大無法構(gòu)建索引問題的同時(shí),還保證了查詢的效率。
圖8 跨段查詢時(shí)間開銷
本文針對(duì)現(xiàn)有時(shí)態(tài)索引在數(shù)據(jù)量過大,普通硬件環(huán)境難以支持的問題,基于時(shí)態(tài)數(shù)據(jù)的特性,提出基于Spark的分布式時(shí)態(tài)索引方法。該方法在Spark集群上按數(shù)據(jù)分區(qū)單獨(dú)構(gòu)建時(shí)態(tài)索引,在此基礎(chǔ)上優(yōu)化輔助索引checkpoint結(jié)構(gòu),設(shè)計(jì)面向?qū)ο缶S度的輔助索引以及全局雙hash結(jié)構(gòu),全面高效地支持各種時(shí)態(tài)查詢操作。最后,通過基準(zhǔn)數(shù)據(jù)集上的實(shí)驗(yàn)驗(yàn)證了所提方法的有效性。目前本文所實(shí)現(xiàn)的Spark時(shí)間索引是在離線數(shù)據(jù)的基礎(chǔ)上,在接下來的研究中,我們準(zhǔn)備實(shí)現(xiàn)實(shí)時(shí)監(jiān)控的流式構(gòu)建時(shí)態(tài)索引系統(tǒng),并進(jìn)一步優(yōu)化在跨區(qū)查詢時(shí)面對(duì)大量混洗數(shù)據(jù)的查詢算法。
參 考 文 獻(xiàn)
[1] Yong Tang.TimeDB-A Temporal Relational DBMS[OL].[2015-03-05].http://www.timeconsult.com/software/software.html, 1999.
[2] Saracco C M,Nicola M,Gandhi L.A matter of time:Temporal data management in DB2 10[R].Technical report,IBM,2012.
[3] Kaufmann M,Fischer P M,May N,et al.TPC-BiH:A Benchmark for Bitemporal Databases[M]//Performance Characterization and Benchmarking.Springer International Publishing,2013:16-31.
[4] Elmasri R,Wuu G T J,Kim Y J.The time index:An access structure for temporal data[C]//Proceedings of the 16th International Conference on Very Large Data Bases.San Francisco,CA:Morgan Kaufmann Publishers,1990:1-12.
[5] Tao Y,Papadias D.Efficient historical R-trees[C]//Scientific and Statistical Database Management,2001.SSDBM 2001.Proceedings.Thirteenth International Conference on.IEEE,2001:223-232.
[6] Zhang F,Wang X,Ma S.Temporal XML Indexing Based on Suffix Tree[C]//Acis International Conference on Software Engineering Research,Management and Applications.IEEE,2009:140-144.
[7] Kaufmann M,Manjili A A,Vagenas P,et al.Timeline index:a unified data structure for processing queries on temporal data in SAP HANA[C]//ACM SIGMOD International Conference on Management of Data.ACM,2013:1173-1184.
[8] Kaufmann M,Fischer P M,May N,et al.Bi-temporal Timeline Index:A data structure for Processing Queries on bi-temporal data[C]//IEEE,International Conference on Data Engineering.IEEE,2015:471-482.
[9] Zaharia M,Chowdhury M,Franklin M J,et al.Spark:cluster computing with working sets[C]//Usenix Conference on Hot Topics in Cloud Computing.USENIX Association,2010:10-10.
[10] Becker B,Gschwind S,Ohler T,et al.An asymptotically optimal multiversion B-tree[J].The VLDB Journal,1996,5(4):264-275.
[11] Tao Y,Papadias D.MV3R-Tree:A Spatio-Temporal Access Method for Timestamp and Interval Queries[C]//VLDB 2001,Proceedings of,International Conference on Very Large Data Bases,September 11-14,2001,Roma,Italy.DBLP,2001:431-440.
[12] Liao H,Han J,Fang J.Multi-dimensional Index on Hadoop Distributed File System[C]//International Conference on Networking,Architecture,and Storage,Nas 2010,Macau,China,July.2010:240-249.
[13] Gankidi V R,Teletia N,Patel J M,et al.Indexing HDFS data in PDW[J].Proceedings of the VLDB Endowment,2014,7(13):1520-1528.
[14] Xie D,Li F,Yao B,et al.Simba:Efficient In-Memory Spatial Analytics[C]//International Conference on Management of Data.ACM,2016:1071-1085.
[15] Funke F,Kemper A,Krompass S,et al.Metrics for measuring the performance of the mixed workload CH-benCHmark[C]//Tpc Technology Conference on Topics in PERFORMANCE Evaluation,Measurement and Characterization.Springer-Verlag,2011:10-30.