杜華明 ,張 鵬 ,徐克付 ,譚建龍 ,李 焱
(1.中國(guó)科學(xué)技術(shù)大學(xué)軟件學(xué)院 合肥 230051;2.中國(guó)科學(xué)院信息工程研究所 北京 100093;3.信息內(nèi)容安全技術(shù)國(guó)家工程實(shí)驗(yàn)室 北京 100093;4.國(guó)家計(jì)算機(jī)網(wǎng)絡(luò)應(yīng)急技術(shù)處理協(xié)調(diào)中心 北京 100029)
隨著云計(jì)算、物聯(lián)網(wǎng)等技術(shù)的興起,數(shù)據(jù)正以前所未有的速度不斷增長(zhǎng)和積累,大數(shù)據(jù)時(shí)代已經(jīng)到來(lái),其中典型的3個(gè)特點(diǎn)就是:規(guī)模性、多樣性和高速性[1]。同時(shí),大數(shù)據(jù)主要的處理模式包括批處理和流處理兩種[2]。批處理是先存儲(chǔ)后處理,而流處理則是直接處理。流處理的基本理念是數(shù)據(jù)的價(jià)值會(huì)隨著時(shí)間的流逝而不斷減少,因此盡可能快地對(duì)最新的數(shù)據(jù)做出處理并且給出結(jié)果是所有流處理的共同目標(biāo)。流處理系統(tǒng)從處理模型上可以分為集中式、分布式以及并行分布式。然而,無(wú)論哪種處理模型,提高流處理的可靠性都是其中的熱點(diǎn)和難點(diǎn)。故障容錯(cuò)作為提高系統(tǒng)可靠性的一個(gè)方面已經(jīng)被廣泛研究,并且出現(xiàn)了很多成熟的技術(shù)。總的來(lái)說(shuō),一個(gè)故障容錯(cuò)協(xié)議必須包含兩個(gè)部分:一是節(jié)點(diǎn)的故障檢測(cè)和替換;二是故障節(jié)點(diǎn)丟失狀態(tài)的恢復(fù)。其中,檢測(cè)和替換是被動(dòng)行為,而丟失狀態(tài)的恢復(fù)則是一種需要持續(xù)保存可能丟失狀態(tài)信息的主動(dòng)行為。在參考文獻(xiàn)[3]中,給出了一個(gè)查詢(xún)算子的故障容錯(cuò)的3種技術(shù):主動(dòng)備份技術(shù)、被動(dòng)備份技術(shù)和上游備份技術(shù)。這3種技術(shù)的主要區(qū)別在于出現(xiàn)故障時(shí)保存可能丟失狀態(tài)信息的方式(也就是查詢(xún)算子的狀態(tài))。
主動(dòng)備份技術(shù)(或者主動(dòng)復(fù)制)通過(guò)一個(gè)算子的備份節(jié)點(diǎn)提供算子的故障容錯(cuò),其中這個(gè)備份節(jié)點(diǎn)中的算子處理與主節(jié)點(diǎn)的算子相同的元組。也就是說(shuō),當(dāng)主節(jié)點(diǎn)的算子出現(xiàn)故障時(shí),可以使用這個(gè)備份節(jié)點(diǎn)的算子替換它。這種容錯(cuò)技術(shù)會(huì)帶來(lái)較高的開(kāi)銷(xiāo),其中主要的開(kāi)銷(xiāo)是保存副本的空間開(kāi)銷(xiāo),因?yàn)樗鼈冊(cè)跀?shù)據(jù)處理的大部分時(shí)間中并沒(méi)有被利用。此外,元組必須發(fā)送到多個(gè)節(jié)點(diǎn),這也會(huì)帶來(lái)額外的時(shí)間開(kāi)銷(xiāo)。最后,備份節(jié)點(diǎn)的算子必須和主節(jié)點(diǎn)的算子保持相同的元組處理順序,這會(huì)產(chǎn)生額外的時(shí)間開(kāi)銷(xiāo)。此外,當(dāng)主節(jié)點(diǎn)出現(xiàn)故障時(shí),主動(dòng)備份技術(shù)需要把主節(jié)點(diǎn)的輸出流切換到備份節(jié)點(diǎn),因此故障恢復(fù)的時(shí)間也較長(zhǎng)。
被動(dòng)備份技術(shù)把屬于要備份節(jié)點(diǎn)的算子狀態(tài)周期性地復(fù)制到備份節(jié)點(diǎn)上。復(fù)制可以不斷地在備份節(jié)點(diǎn)上或者在專(zhuān)用的節(jié)點(diǎn)上進(jìn)行,當(dāng)主節(jié)點(diǎn)出現(xiàn)故障時(shí),這些備份被安裝到替換的備份節(jié)點(diǎn)上。一個(gè)算子的周期性的復(fù)制被稱(chēng)為校驗(yàn)。與主動(dòng)備份相比,周期性的校驗(yàn)減少了主節(jié)點(diǎn)和備份節(jié)點(diǎn)之間需要發(fā)送的元組個(gè)數(shù),所以被動(dòng)備份的時(shí)間開(kāi)銷(xiāo)較少。另一方面,由于在最后一個(gè)校驗(yàn)點(diǎn)和出現(xiàn)故障的這段時(shí)間內(nèi)發(fā)送到主節(jié)點(diǎn)的所有元組都沒(méi)有在備份節(jié)點(diǎn)中被維護(hù),所以這些元組需要重新發(fā)送到備份節(jié)點(diǎn)上,因此導(dǎo)致被動(dòng)備份的故障恢復(fù)時(shí)間較長(zhǎng)。
上游備份是一種不同的備份機(jī)制,它不需要使用任何備份節(jié)點(diǎn),只依賴(lài)上游節(jié)點(diǎn)和下游節(jié)點(diǎn)。上游節(jié)點(diǎn)定義了一個(gè)協(xié)議用于維護(hù)其所輸出元組的狀態(tài),直到下游節(jié)點(diǎn)確認(rèn)這些元組可以被刪除。上游備份的核心思想是當(dāng)主節(jié)點(diǎn)出現(xiàn)故障時(shí),上游節(jié)點(diǎn)把所有在輸出隊(duì)列并且還沒(méi)有被下游節(jié)點(diǎn)確認(rèn)的元組重新發(fā)送到替換的節(jié)點(diǎn)上。上游備份的唯一開(kāi)銷(xiāo)就是維護(hù)上游節(jié)點(diǎn)發(fā)送的元組的空間開(kāi)銷(xiāo)。然而,由于主節(jié)點(diǎn)狀態(tài)需要重建并且再次分別處理每個(gè)元組,因此上游備份的故障恢復(fù)時(shí)間會(huì)較長(zhǎng)(恢復(fù)時(shí)間取決于主節(jié)點(diǎn)狀態(tài)的恢復(fù)時(shí)間)。
對(duì)于數(shù)據(jù)流處理系統(tǒng),故障容錯(cuò)仍然是其中的重點(diǎn)和難點(diǎn)。在工業(yè)界中,S4和Storm是當(dāng)前流行的數(shù)據(jù)流處理系統(tǒng),其中S4使用Zookeeper來(lái)協(xié)同集群中的任務(wù)分配,集群中活躍(active)節(jié)點(diǎn)被分配具體的任務(wù),而空閑(idle)節(jié)點(diǎn)放在池中以在故障容錯(cuò)或者負(fù)載均衡時(shí)使用。特別地,一個(gè)空閑節(jié)點(diǎn)可以注冊(cè)成為分配不同任務(wù)的多個(gè)活躍節(jié)點(diǎn)的備份節(jié)點(diǎn)。對(duì)于S4在運(yùn)行一段時(shí)間后可能出現(xiàn)失敗、基礎(chǔ)設(shè)施更新、調(diào)度重新分配和應(yīng)用更新等情況,參考文獻(xiàn)[4]中提出了相應(yīng)的策略:高可用策略、基于檢查點(diǎn)的狀態(tài)恢復(fù)策略和低時(shí)延的處理策略。對(duì)于Storm,參考文獻(xiàn)[5]中介紹的容錯(cuò)技術(shù)僅是將元組不停地重發(fā),以保證每個(gè)元組至少得到一次完整的處理。
在學(xué)術(shù)界中,參考文獻(xiàn)[6]認(rèn)為現(xiàn)有的故障容錯(cuò)技術(shù)(無(wú)論被動(dòng)備份還是主動(dòng)備份)都會(huì)在運(yùn)行時(shí)增加一些時(shí)間開(kāi)銷(xiāo)。該文的一個(gè)主要工作是提出了一個(gè)預(yù)測(cè)模型,該模型會(huì)在輸入元組處理失敗時(shí)啟動(dòng)故障容錯(cuò)機(jī)制。這個(gè)模型需要分流器來(lái)持續(xù)監(jiān)控一個(gè)節(jié)點(diǎn)狀態(tài),并且把它們標(biāo)記為正常(normal)、警告(alert)和失敗(failure)。當(dāng)一個(gè)節(jié)點(diǎn)的狀態(tài)從正常變?yōu)榫鏁r(shí),分流器會(huì)啟動(dòng)故障容錯(cuò)機(jī)制。為了構(gòu)建分類(lèi)器,這個(gè)模型定義了一段訓(xùn)練時(shí)間以預(yù)測(cè)未來(lái)可能出現(xiàn)的故障。當(dāng)預(yù)測(cè)模型發(fā)現(xiàn)一個(gè)可能在短期內(nèi)發(fā)生的故障時(shí),它首先把故障節(jié)點(diǎn)的算子遷移到一個(gè)專(zhuān)用的節(jié)點(diǎn)上以減少故障和算子替換所造成的影響。當(dāng)節(jié)點(diǎn)狀態(tài)是警告時(shí),為了收集用來(lái)預(yù)測(cè)相同類(lèi)型算子未來(lái)出現(xiàn)故障的信息,可能出現(xiàn)故障的算子的監(jiān)控力度會(huì)增加。然而,該工作的不足在于實(shí)際中能夠被預(yù)測(cè)的故障類(lèi)型是很少的。不僅如此,該文提出的故障容錯(cuò)技術(shù)只有在預(yù)測(cè)模型所需的計(jì)算資源沒(méi)有超過(guò)其他故障容錯(cuò)技術(shù)所需要的計(jì)算資源時(shí)才有效。
[7]和參考文獻(xiàn)[8]中,關(guān)注非確定性算子的故障容錯(cuò)。在數(shù)據(jù)流中,算子執(zhí)行的非確定性可以通過(guò)對(duì)輸入流元組到達(dá)次序敏感的函數(shù)或者依賴(lài)時(shí)間的函數(shù)來(lái)定義。如作者所述,用來(lái)提供這種算子故障容錯(cuò)的維護(hù)信息包括到元組到達(dá)次序信息和依賴(lài)元組到達(dá)次序敏感的函數(shù)信息。該文作者也研究了如何有效維護(hù)副本,使其能夠克服嚴(yán)格同步副本中輸入元組的次序所帶來(lái)的限制,同時(shí)作者還研究了如何以多線(xiàn)程方式運(yùn)行副本。
在參考文獻(xiàn)[9]中,作者提出了一種混合主動(dòng)備份和被動(dòng)備份的容錯(cuò)協(xié)議,其中的核心思想是周期性地校驗(yàn)查詢(xún)算子的狀態(tài),這些狀態(tài)并不存放到專(zhuān)用的節(jié)點(diǎn),而是存放到這個(gè)算子的空閑副本上。也就是說(shuō),算子的一個(gè)副本正在被其他節(jié)點(diǎn)所維護(hù),這個(gè)副本不用接收主節(jié)點(diǎn)所處理的相同元組,它的狀態(tài)只需要通過(guò)增量的校驗(yàn)點(diǎn)來(lái)持續(xù)地更新。當(dāng)主節(jié)點(diǎn)出現(xiàn)臨時(shí)故障時(shí),算子的副本被啟用并且開(kāi)始處理和主節(jié)點(diǎn)所處理的相同元組。如果主節(jié)點(diǎn)出現(xiàn)永久故障時(shí),那么這個(gè)查詢(xún)會(huì)得到指示,開(kāi)始只向算子的副本發(fā)送元組并且開(kāi)始使用它的輸出元組。
參考文獻(xiàn)[10]在前期工作[11]的基礎(chǔ)上提出了一種基于異步校驗(yàn)機(jī)制(類(lèi)似于模糊校驗(yàn))的容錯(cuò)協(xié)議,該協(xié)議沒(méi)有對(duì)算子狀態(tài)和它的輸出隊(duì)列進(jìn)行校驗(yàn),而是讓所有輸出元組都包含窗口校驗(yàn)元組(專(zhuān)門(mén)用來(lái)描述窗口中間狀態(tài)的元組),并且只有輸出的數(shù)據(jù)流被持久化。當(dāng)節(jié)點(diǎn)出現(xiàn)故障時(shí),通過(guò)讀取該節(jié)點(diǎn)的算子輸出隊(duì)列來(lái)查找最近的窗口校驗(yàn)信息,以計(jì)算出從哪個(gè)位置重新發(fā)送數(shù)據(jù)流的元組。如作者討論的一樣,由于這種校驗(yàn)方式在重建故障節(jié)點(diǎn)的算子狀態(tài)時(shí)減少了重新發(fā)送元組的數(shù)量,所以可以減少故障恢復(fù)的時(shí)間。
StreamCloud[12]在參考文獻(xiàn)[10]的基礎(chǔ)上對(duì)故障容錯(cuò)以下幾個(gè)方面的改進(jìn):
·出現(xiàn)故障時(shí)重新發(fā)送元組的時(shí)間點(diǎn)的信息earliest timestamp只包含在輸出元組的頭部,減少了存儲(chǔ)開(kāi)銷(xiāo);
·earliest timestamp是在線(xiàn)維護(hù)的,因此避免了在并行文件系統(tǒng)中查找數(shù)據(jù)的不必要的讀取操作;
·數(shù)據(jù)流持久化中通過(guò)采用一種對(duì)持久化信息自識(shí)別的命名方式避免了元數(shù)據(jù)的維護(hù)[9],減少了故障容錯(cuò)過(guò)程對(duì)運(yùn)行時(shí)的影響;
·基于earliest timestamp的容錯(cuò)協(xié)議可以對(duì)重新部署期間發(fā)生的故障進(jìn)行容錯(cuò)。
然而,上述技術(shù)如果把故障粒度定為數(shù)據(jù)流中的每個(gè)元組,當(dāng)元組數(shù)量很多時(shí),跟蹤這些元組是否已經(jīng)被處理的內(nèi)存開(kāi)銷(xiāo)會(huì)很大。因此需要一種既能節(jié)約內(nèi)存又能夠保證需要處理的每個(gè)元組都被處理的可靠方案。為此本文提出了一種既能夠保證元組得到可靠處理又能夠節(jié)省內(nèi)存開(kāi)銷(xiāo)的元組跟蹤方法。該方法包括內(nèi)存分配策略、元組跟蹤單元選擇策略和校驗(yàn)值更新策略,這3個(gè)策略通過(guò)只保留元組標(biāo)識(shí)符的異或校驗(yàn)值而不是元組來(lái)減少內(nèi)存開(kāi)銷(xiāo),同時(shí)通過(guò)改進(jìn)一致性散列來(lái)實(shí)現(xiàn)元組跟蹤單元的負(fù)載均衡。
流是由具有相同數(shù)據(jù)模式且無(wú)界限的元組序列組成的。與傳統(tǒng)的數(shù)據(jù)庫(kù)系統(tǒng)不同,數(shù)據(jù)流處理系統(tǒng)處理的數(shù)據(jù)是沒(méi)有經(jīng)過(guò)持久化的流數(shù)據(jù),當(dāng)且僅當(dāng)滿(mǎn)足查詢(xún)條件時(shí),數(shù)據(jù)流處理系統(tǒng)才會(huì)將查詢(xún)結(jié)果返回給用戶(hù),所以該查詢(xún)又可以稱(chēng)作連續(xù)查詢(xún)。一個(gè)查詢(xún)可以定義為一個(gè)有向無(wú)環(huán)圖,并且圖中每個(gè)節(jié)點(diǎn)都是一個(gè)操作算子,圖中每條邊可以表示的是數(shù)據(jù)流向。本文把查詢(xún)的起點(diǎn)稱(chēng)為元組生成器(spring),處理單元稱(chēng)為元組處理器(processor)。元組生成器可以產(chǎn)生并發(fā)送數(shù)據(jù)流,其中的數(shù)據(jù)流中的元組被稱(chēng)為根元組。根元組的狀態(tài)分為正在處理狀態(tài)(pending)、處理失敗狀態(tài)(failure)和處理成功狀態(tài)(finished)3種。元組處理器接收數(shù)據(jù)流并且處理數(shù)據(jù)流,處理后的數(shù)據(jù)流也可以發(fā)送給其他的元組處理器。元組生成器和元組處理器在元組的處理過(guò)程中會(huì)啟動(dòng)多個(gè)任務(wù)線(xiàn)程來(lái)并行處理元組。
元組生成器處理根元組后會(huì)產(chǎn)生多個(gè)元組,這些元組經(jīng)過(guò)元組處理器處理后可能繼續(xù)產(chǎn)生新的元組,直到元組處理器不再產(chǎn)生新元組為止,所有元組所形成的一個(gè)樹(shù)狀結(jié)構(gòu)被稱(chēng)為元組樹(shù)。元組樹(shù)的根元組用springId唯一標(biāo)識(shí),其他元組用tupleId唯一標(biāo)識(shí)。在元組樹(shù)形成的過(guò)程中,元組生成器和元組處理器會(huì)不斷地向元組跟蹤器發(fā)送消息,元組跟蹤器根據(jù)發(fā)送來(lái)的消息構(gòu)造跟蹤記錄,并將跟蹤記錄存儲(chǔ)在元組跟蹤單元中(元組跟蹤單元的選擇策略在第2.2節(jié)介紹)。元組跟蹤單元(acker)是一個(gè)跟蹤元組處理過(guò)程的進(jìn)程。跟蹤記錄是一個(gè)三元組,表示為
圖1 元組跟蹤器的交互過(guò)程
在元組跟蹤器啟動(dòng)階段,首先是初始化元組跟蹤單元的數(shù)量,元組跟蹤單元的數(shù)量可以從配置文件中讀取,也可以在元組跟蹤器運(yùn)行狀態(tài)下,通過(guò)對(duì)外接口來(lái)修改元組跟蹤單元數(shù)量。然后查看是否能夠產(chǎn)生元組跟蹤單元。如果能夠正常產(chǎn)生元組跟蹤單元,元組跟蹤器開(kāi)始接收元組生成器發(fā)送的springId和taskId,以進(jìn)入運(yùn)行階段。否則,元組跟蹤器再次讀取配置文件重新生成相應(yīng)數(shù)量的元組跟蹤單元。
元組跟蹤器的運(yùn)行階段可以細(xì)劃為內(nèi)存分配、元組跟蹤單元選擇和校驗(yàn)值更新3個(gè)子階段。如圖1所示,當(dāng)元組跟蹤器接收到元組生成器發(fā)來(lái)的springId和taskId后,首先進(jìn)入內(nèi)存分配階段,元組跟蹤器為每個(gè)元組分配大約20 byte的內(nèi)存空間來(lái)構(gòu)造跟蹤記錄;然后進(jìn)入元組跟蹤單元選擇階段,通過(guò)元組跟蹤單元選擇策略將跟蹤記錄存儲(chǔ)在不同的元組跟蹤單元上;最后進(jìn)入校驗(yàn)值更新階段,元組生成器(元組處理器)會(huì)不斷地向元組跟蹤器發(fā)送消息(已處理的元組ID和新產(chǎn)生的元組ID),元組跟蹤器利用元組生成器(元組處理器)發(fā)來(lái)的消息不斷地更新跟蹤記錄中的校驗(yàn)值,更新策略在第3.3節(jié)介紹,當(dāng)校驗(yàn)值為0時(shí),說(shuō)明跟蹤記錄所跟蹤的元組已經(jīng)得到了完整的處理。此時(shí),元組跟蹤器將會(huì)通知元組生成器相應(yīng)的任務(wù)線(xiàn)程,任務(wù)線(xiàn)程將對(duì)應(yīng)的元組的狀態(tài)修改為已完成(finished)。最后,元組生成器會(huì)將狀態(tài)為已完成的元組從內(nèi)存中移除。當(dāng)校驗(yàn)值不為0時(shí),任務(wù)線(xiàn)程將對(duì)應(yīng)的元組的狀態(tài)修改為失?。╢ailure),最后,元組生成器會(huì)將狀態(tài)為失敗的元組重新發(fā)送。
元組跟蹤器和數(shù)據(jù)流處理引擎之間是松耦合關(guān)系,它可以獨(dú)立于數(shù)據(jù)流處理引擎運(yùn)行,當(dāng)不需要元組跟蹤器時(shí),用戶(hù)可以通過(guò)命令行來(lái)終止元組跟蹤器所對(duì)應(yīng)的進(jìn)程。
為了實(shí)現(xiàn)節(jié)省內(nèi)存、負(fù)載均衡和可靠的元組處理,元組跟蹤器采用的技術(shù)主要涉及內(nèi)存分配、元組跟蹤單元選擇和校驗(yàn)值更新,下面具體介紹這3個(gè)策略。
大數(shù)據(jù)具有規(guī)模大并且速度快的特點(diǎn),要保證從元組生成器產(chǎn)生的每個(gè)根元組都得到至少一次完整的處理,則需要對(duì)每個(gè)根元組所形成的元組樹(shù)中的每個(gè)元組進(jìn)行跟蹤,以確定元組樹(shù)中的所有元組是否都得到完整的處理。但是,如果元組樹(shù)中含有成千上萬(wàn)個(gè)節(jié)點(diǎn),對(duì)元組樹(shù)的跟蹤所占用的內(nèi)存會(huì)隨著元組樹(shù)中節(jié)點(diǎn)數(shù)的增加而呈現(xiàn)指數(shù)級(jí)增長(zhǎng),這樣會(huì)導(dǎo)致內(nèi)存溢出。針對(duì)跟蹤元組樹(shù)的內(nèi)存開(kāi)銷(xiāo)問(wèn)題,本文提出一種節(jié)約內(nèi)存的方法,該方法只保留元組標(biāo)識(shí)符的異或校驗(yàn)值而不是元組。其中,元組生成器接收根元組后,會(huì)向元組跟蹤器發(fā)送springId和taskId,然后,元組跟蹤器利用springId、taskId以及checkValue構(gòu)造跟蹤記錄。當(dāng)這個(gè)根元組得到完整處理時(shí),元組跟蹤器會(huì)通知taskId對(duì)應(yīng)的任務(wù)將根元組從內(nèi)存中移除,否則,元組跟蹤器會(huì)通知taskId對(duì)應(yīng)的任務(wù)重新發(fā)送該根元組。
由于元組生成器產(chǎn)生元組的數(shù)量多,如果僅使用一個(gè)元組跟蹤單元來(lái)跟蹤元組生成器產(chǎn)生的所有根元組,那么元組跟蹤單元的負(fù)載會(huì)很高。因此,元組跟蹤器需要使用多個(gè)元組跟蹤單元來(lái)跟蹤元組生成器產(chǎn)生的根元組。為了使各個(gè)元組跟蹤單元跟蹤元組的數(shù)量盡量均衡,需要一個(gè)將元組的跟蹤記錄分配到不同的元組跟蹤單元的分配策略,使得各個(gè)元組跟蹤單元負(fù)載相對(duì)均衡,這樣不僅可以減小單個(gè)元組跟蹤單元的負(fù)載壓力,同時(shí)也可以提高整體性能。如果某個(gè)元組跟蹤單元異常終止,元組跟蹤器會(huì)將該元組跟蹤單元的跟蹤記錄分配給其他的元組跟蹤單元。
本文中,元組跟蹤單元的選擇采用了改進(jìn)的一致性散列策略,其中的原理如下:元組跟蹤器使用散列函數(shù)將元組生成器產(chǎn)生的根元組ID映射到環(huán)上的某一個(gè)值,環(huán)是一個(gè)由0~(232-1)的數(shù)值組成的空間。然后將一個(gè)元組跟蹤單元及其副本分別映射到環(huán)上的某一個(gè)值,每個(gè)元組跟蹤單元跟蹤逆時(shí)針?lè)较蛏吓c它距離最近的元組,這樣每個(gè)元組跟蹤單元所跟蹤的元組數(shù)量就會(huì)相對(duì)均衡。具體的分配步驟如下。
首先將元組映射到一個(gè)32 bit的key值,該映射首先初始化全局變量hash=0,然后將字符串中每個(gè)字符從右到左順序執(zhí)行如式(1)所示的計(jì)算式:
最后執(zhí)行如式(2)所示的計(jì)算式:
所得的key值對(duì)應(yīng)環(huán)中的某個(gè)值。
例如根元組、根元組2…根元組6和acker A、acker B,將這6個(gè)根元組和兩個(gè)acker的ID映射到環(huán)上,散列函數(shù)為key=hash(value),該函數(shù)封裝的是式(1)和式(2)的邏輯,value是springId或ackerId,然后將映射的結(jié)果封裝成location=
對(duì)于環(huán)中的每個(gè)根元組,從根元組的key值出發(fā),沿順時(shí)針?lè)较蛐D(zhuǎn)搜索,當(dāng)遇到第一個(gè)acker時(shí),將元組的location存儲(chǔ)在該acker上,因?yàn)閟pringId和ackerId的散列值是固定的,因此這個(gè)元組和acker的關(guān)系必然是唯一和確定的。理想的散列結(jié)果是將所有元組均勻分配到各acker中,采取的策略就是將一個(gè)ackerId映射到兩(N)個(gè)位置,這樣可以保證跟蹤記錄相對(duì)均勻地分配到各個(gè)acker中。此時(shí)value為acker_id#1,acker_id#2,…,acker_id#N,hash 值 key=hash(value),hash 值在環(huán)上的分布以及分配結(jié)果如圖2(b)所示。
在元組跟蹤器運(yùn)行過(guò)程中,由于某些原因?qū)е耡cker的數(shù)量減少,根據(jù)圖 2(a)和圖2(b)所描述的映射方法,這時(shí)受影響的將僅是沿acker B1和acker B2逆時(shí)針遍歷直到下一個(gè) acker(acker A2和 acker A1)之前的 location,也是本來(lái)映射到acker B上的那些location。那么,僅需要將根元組2對(duì)應(yīng)的location分配給acker A2,根元組4和根元組6對(duì)應(yīng)的location分配給acker A1即可,元組的重新分配如圖 2(c)所示。
假如因?yàn)樵M的數(shù)目過(guò)多(僅有acker A和acker B記錄這些元組對(duì)應(yīng)的location狀態(tài)負(fù)載過(guò)大)而增加acker或者通過(guò)修改元組跟蹤器的acker數(shù)量N值增加acker(假設(shè)新增acker C)。通過(guò)圖2(b)中所提到的將ackerId映射到環(huán)的方法,acker C的兩個(gè)hash值分別映射到根元組3和根元組5、根元組4和根元組6所對(duì)應(yīng)的location之間,這時(shí)受影響的元組將僅是那些沿acker C1或acker C2逆時(shí)針遍歷直到下一個(gè)acker B2和acker A2之間的location(它們本來(lái)也是映射到 acker B1和 acker A2上的),將這些location重新映射到acker C1和acker C2上即可,映射的結(jié)果如圖 2(d)所示。
以上所描述的就是元組跟蹤單元選擇策略的全過(guò)程,通過(guò)使用改進(jìn)的一致性散列將一個(gè)ackerId映射到環(huán)上的多個(gè)位置,不僅能夠保證元組的跟蹤記錄分配的相對(duì)均衡,同時(shí)也保證增加或者減少元組跟蹤單元只會(huì)影響小部分已經(jīng)分配的跟蹤記錄。
3.3 校驗(yàn)值更新策略
為了保證元組生成器產(chǎn)生的根元組至少得到一次完整的處理,當(dāng)元組沒(méi)有被成功處理時(shí),元組跟蹤器將會(huì)重發(fā)沒(méi)有得到成功處理的根元組。元組跟蹤器通過(guò)跟蹤記錄中的校驗(yàn)值(0/非0)來(lái)判斷元組生成器產(chǎn)生的根元組是否得到完整的處理,如果某個(gè)元組沒(méi)有得到完整處理(校驗(yàn)值為非0),那么元組跟蹤器會(huì)通知元組生成器中處理該元組的任務(wù)(task)重新發(fā)送該元組。
跟蹤記錄通過(guò)對(duì)校驗(yàn)值判斷根元組是否已經(jīng)得到完整的處理。不管這棵元組樹(shù)多大,它只是簡(jiǎn)單地把這棵樹(shù)上的所有已處理的tupleId(根元組ID除外)和新產(chǎn)生的tupleId進(jìn)行異或(XOR)運(yùn)算,并以此結(jié)果更新校驗(yàn)值。當(dāng)校驗(yàn)值為0時(shí),表示跟蹤記錄中對(duì)應(yīng)的根元組被完整地處理,因?yàn)樵M樹(shù)中的每個(gè)tupleId都出現(xiàn)兩次,所以異或的結(jié)果為0,由此證明根元組產(chǎn)生的元組樹(shù)中的所有元組都得到完整處理。反之,沒(méi)有得到完整處理,這是因?yàn)槿绻谔幚碓M的過(guò)程中出現(xiàn)元組丟失,那么元組樹(shù)中每個(gè)tupleId不會(huì)出現(xiàn)兩次,所以結(jié)果不為0,此時(shí)通過(guò)跟蹤記錄中的taskId通知元組生成器中相應(yīng)的任務(wù)重發(fā)springId對(duì)應(yīng)的根元組。因?yàn)樾r?yàn)值是64 bit的,所以元組樹(shù)存在未被處理的元組但是異或結(jié)果為0的概率是1/264,因此可以忽略。下面描述一下checkValue更新的過(guò)程。
圖2 元組跟蹤單元選擇策略
首先,元組生成器產(chǎn)生具有64 bit ID的根元組,并將其置為pending狀態(tài),然后元組生成器將springId和taskId發(fā)送給元組跟蹤器,元組跟蹤器用收到的springId、taskId和checkValue(初始化為0)構(gòu)造跟蹤記錄后,將springId生成的tupleId進(jìn)行異或運(yùn)算,用得到的結(jié)果與checkValue做異或運(yùn)算,用異或運(yùn)算的結(jié)果更新checkValue。
然后,元組處理器每次處理元組后,會(huì)給元組跟蹤器發(fā)送處理元組的tupleId及新生成的元組的tupleId。同樣,將已處理的元組的tupleId和新生成的元組的tupleId進(jìn)行異或運(yùn)算,將得到的結(jié)果與checkValue進(jìn)行異或運(yùn)算以更新校驗(yàn)值。
最后,當(dāng)元組處理器不再產(chǎn)生新元組時(shí),僅將輸入的tupleId做異或運(yùn)算,將得到的結(jié)果與checkValue做異或運(yùn)算,用異或運(yùn)算的結(jié)果更新checkValue。
以上過(guò)程就是某一個(gè)跟蹤記錄中校驗(yàn)值的變化過(guò)程。判斷跟蹤記錄第一個(gè)字段springId所代表的根元組是否重發(fā)的依據(jù)就是判斷校驗(yàn)值是否為0。當(dāng)校驗(yàn)值為0時(shí),元組跟蹤器會(huì)將該springId和taskId發(fā)回給元組生成器,元組生成器將會(huì)根據(jù)taskId把springId的元組狀態(tài)置為已完成狀態(tài),并將該元組從內(nèi)存中移出。否則,將會(huì)通知元組生成器更新taskId中springId的元組狀態(tài)為失敗,那么元組生成器會(huì)重發(fā)該元組。
下面以元組springId為01001為例,介紹它的跟蹤記錄校驗(yàn)值的更新的全過(guò)程。如圖3所示,元組生成器產(chǎn)生springId為01001的根元組,元組生成器處理根元組并產(chǎn)生了tupleId為01010和01011兩個(gè)元組,此時(shí)沒(méi)有需要確認(rèn)的元組,元組生成器將這兩個(gè)新產(chǎn)生的元組ID發(fā)送給相應(yīng)的元組跟蹤單元,元組跟蹤單元將這兩個(gè)元組tupleId為01010和01011進(jìn)行異或運(yùn)算,將得到的結(jié)果與校驗(yàn)值做異或運(yùn)算。計(jì)算所得的結(jié)果更新校驗(yàn)值。將tupleId為01010的元組被發(fā)送到元組處理器1中,將tupleId為01011的元組被發(fā)送到元組處理器2中,計(jì)算過(guò)程如圖3(a)所示。
元組處理器1處理01010這個(gè)元組,處理元組的結(jié)果是產(chǎn)生了新元組,其tupleId為01100。元組處理器1將已處理的元組tupleId和新產(chǎn)生的元組tupleId發(fā)送給相應(yīng)的元組跟蹤單元,元組跟蹤單元將01010和01100進(jìn)行異或運(yùn)算,用所得的結(jié)果與校驗(yàn)值做異或運(yùn)算,用計(jì)算所得的結(jié)果更新校驗(yàn)值,將tupleId為01100的元組傳到元組處理器3中,計(jì)算過(guò)程如圖3(b)所示。
元組處理器2處理01011這個(gè)元組,處理元組的結(jié)果是產(chǎn)生了新元組,其tupleId為01101。元組處理器2將已處理的元組tupleId和新產(chǎn)生的元組tupleId發(fā)送給相應(yīng)的元組跟蹤單元,元組跟蹤單元將01011和01101進(jìn)行異或運(yùn)算,用所得的結(jié)果與校驗(yàn)值做異或運(yùn)算,用計(jì)算的結(jié)果更新校驗(yàn)值,將tupleId為01101的元組發(fā)送到元組處理器3中,計(jì)算過(guò)程如圖3(c)所示。
元組處理器3處理01100和01101這兩個(gè)元組,不再有新的元組生成。那么,元組處理器3僅將已處理的元組tupleId發(fā)送給相應(yīng)的元組跟蹤單元,元組跟蹤單元將01100和01101做異或運(yùn)算,用所得的結(jié)果與校驗(yàn)值做異或運(yùn)算,用計(jì)算所得的結(jié)果更新校驗(yàn)值,計(jì)算過(guò)程如圖3(d)所示。
下面通過(guò)實(shí)驗(yàn)驗(yàn)證元組跟蹤器的內(nèi)存開(kāi)銷(xiāo)和負(fù)載。本實(shí)驗(yàn)環(huán)境采是用4臺(tái)4核主頻2.40 GHz的PC機(jī)做服務(wù)器,內(nèi)存4 GB。本實(shí)驗(yàn)所定義的查詢(xún)僅有一個(gè)映射(map)算子和一個(gè)聚合(aggregate)算子。數(shù)據(jù)源是文件中讀取的英文文本數(shù)據(jù),本實(shí)驗(yàn)的目標(biāo)是計(jì)算出文本中不同的單詞在文本中出現(xiàn)的次數(shù)。元組生成器將文本中的每一行數(shù)據(jù)封裝為一個(gè)根元組,map算子就是將每行的文本以空格為分隔符將根元組分割成若干元組 (單詞),并將每個(gè)元組(單詞)發(fā)給aggregate算子做聚合計(jì)算。
實(shí)驗(yàn)一驗(yàn)證僅有一個(gè)元組跟蹤單元時(shí),輸入元組的數(shù)量對(duì)元組跟蹤單元占用內(nèi)存的影響。輸入的元組數(shù)從10萬(wàn)到90萬(wàn)不等,元組跟蹤單元的內(nèi)存變化情況如圖4(a)所示,當(dāng)輸入的元組個(gè)數(shù)是10萬(wàn)時(shí),上文已經(jīng)介紹了一個(gè)跟蹤記錄僅消耗20 byte的內(nèi)存,理論上10萬(wàn)應(yīng)該消耗1.9 MB內(nèi)存,而實(shí)際僅消耗0.86 MB內(nèi)存,內(nèi)存消耗減少了55%左右。當(dāng)輸入元組個(gè)數(shù)是20萬(wàn)時(shí),理論計(jì)算應(yīng)該消耗3.8 MB左右,而實(shí)際上僅消耗1.59 MB,內(nèi)存消耗減少了58%左右。出現(xiàn)以上現(xiàn)象是因?yàn)楫?dāng)元組跟蹤單元確定根元組已經(jīng)得到完整的處理后,元組跟蹤器會(huì)通知元組生成器將對(duì)應(yīng)根元組移除內(nèi)存,與此同時(shí)元組跟蹤單元也會(huì)將相對(duì)應(yīng)的跟蹤記錄移除內(nèi)存,也就是說(shuō)在不斷地構(gòu)造新跟蹤記錄的同時(shí)也將已處理的跟蹤記錄移除內(nèi)存,所以該階段內(nèi)存的消耗的增長(zhǎng)率較小,從圖4(a)中可以看出元組的數(shù)量較少時(shí),元組跟蹤器的內(nèi)存節(jié)開(kāi)銷(xiāo)較小。當(dāng)輸入的元組增大到40萬(wàn)后,內(nèi)存開(kāi)銷(xiāo)增長(zhǎng)率會(huì)平緩地增大,由于程序?qū)υM的跟蹤能力有限,所以?xún)?nèi)存開(kāi)銷(xiāo)會(huì)增加。當(dāng)輸入元組增長(zhǎng)到一定數(shù)量后,受機(jī)器內(nèi)存的影響,元組跟蹤器的內(nèi)存開(kāi)銷(xiāo)將不會(huì)增加,內(nèi)存開(kāi)銷(xiāo)增長(zhǎng)率為0。同時(shí),從圖中可以看到,隨著輸入元組數(shù)量的增多,無(wú)元組跟蹤單元的數(shù)據(jù)流處理系統(tǒng)的內(nèi)存開(kāi)銷(xiāo)線(xiàn)性增長(zhǎng),當(dāng)輸入元組個(gè)數(shù)為90萬(wàn)時(shí),無(wú)元組跟蹤單元對(duì)元組的跟蹤需要消耗大約69 MB內(nèi)存,而元組跟蹤單元對(duì)元組的跟蹤僅僅消耗大約16 MB的內(nèi)存。所以圖4(a)也從側(cè)面反映了另外的一個(gè)問(wèn)題:即使使用單個(gè)元組跟蹤單元來(lái)跟蹤元組,內(nèi)存開(kāi)銷(xiāo)也是很小的。第4.2節(jié)觀察在使用多個(gè)元組跟蹤單元時(shí),是否能夠進(jìn)一步地減少單臺(tái)機(jī)器的內(nèi)存開(kāi)銷(xiāo)。
圖3 校驗(yàn)值更新流程
實(shí)驗(yàn)二是觀察隨著元組跟蹤單元的增加,各元組跟蹤單元的內(nèi)存開(kāi)銷(xiāo)(這里所說(shuō)的內(nèi)存開(kāi)銷(xiāo)指的是集群中各機(jī)器內(nèi)存平均開(kāi)銷(xiāo))情況。將輸入元組限定在50萬(wàn)個(gè),當(dāng)元組跟蹤單元的數(shù)量為1時(shí),可以從圖4(b)中看出,內(nèi)存開(kāi)銷(xiāo)略小于7 MB,當(dāng)元組跟蹤單元增多時(shí),元組跟蹤單元內(nèi)存開(kāi)銷(xiāo)不斷減少,這是因?yàn)殡S著元組跟蹤單元數(shù)量的增加,原來(lái)分配到一個(gè)元組跟蹤單元的跟蹤記錄被多個(gè)跟蹤單元均勻分配,所以隨著元組跟蹤單元的增加,內(nèi)存開(kāi)銷(xiāo)逐漸減少。從圖中可以看出,當(dāng)元組跟蹤單元的數(shù)量超過(guò)6個(gè)時(shí),元組跟蹤單元的內(nèi)存開(kāi)銷(xiāo)減少幅度越來(lái)越小,這是被集群自身節(jié)點(diǎn)個(gè)數(shù)所限制,因?yàn)樵诩褐袡C(jī)器數(shù)量較少的情況下,隨著元組跟蹤單元的數(shù)量不斷增多,集群中每臺(tái)機(jī)器中運(yùn)行的元組跟蹤單元數(shù)量相對(duì)較多,那么內(nèi)存開(kāi)銷(xiāo)會(huì)逐漸增大。
實(shí)驗(yàn)三主要是驗(yàn)證第3.2節(jié)的元組跟蹤單元選擇策略能否使得各個(gè)元組跟蹤單元存儲(chǔ)的跟蹤記錄相對(duì)均衡,要驗(yàn)證的是輸入元組的數(shù)量對(duì)元組跟蹤單元負(fù)載均衡的影響,分別做3次實(shí)驗(yàn),每次實(shí)驗(yàn)的元組跟蹤單元的數(shù)量都設(shè)置為6個(gè),并且每次實(shí)驗(yàn)輸入元組的數(shù)量分別為10萬(wàn)個(gè)、20萬(wàn)個(gè)和30萬(wàn)個(gè)。實(shí)驗(yàn)結(jié)果如圖4(c)所示。從圖中可以看出,隨著輸入元組數(shù)量的增多,曲線(xiàn)逐漸趨于橫線(xiàn),這也說(shuō)明了輸入的元組越多,各個(gè)元組跟蹤單元存儲(chǔ)的元組的總數(shù)量相對(duì)更均衡。這主要是因?yàn)樵M跟蹤單元選擇策略是將各個(gè)元組跟蹤單元及其副本的映射值均勻分布在環(huán)上,采用元組跟蹤單元的副本策略可以使各個(gè)元組跟蹤單元在環(huán)上的分布盡量均衡,這樣輸入元組的數(shù)量越多,將元組映射到環(huán)上的位置占用率就越高,根據(jù)第3.2節(jié)所述的跟蹤記錄分配方法,在數(shù)量上每個(gè)元組跟蹤單元所存儲(chǔ)的跟蹤記錄也就相對(duì)地均衡。
圖4 元組跟蹤器的內(nèi)存開(kāi)銷(xiāo)和負(fù)載
規(guī)模性、多樣性和高速性是大數(shù)據(jù)處理必須要考慮的3個(gè)特征,很多研究工作圍繞這3個(gè)特征正在展開(kāi)。針對(duì)規(guī)模性和高速性,本文提出了一種面向數(shù)據(jù)流處理的元組跟蹤方法,其中的最大特點(diǎn)是節(jié)約內(nèi)存,該方法具體包括內(nèi)存分配策略、元組跟蹤單元選擇策略和校驗(yàn)值更新策略,這3個(gè)策略通過(guò)只保留元組標(biāo)識(shí)符的異或校驗(yàn)值而不是元組減少內(nèi)存開(kāi)銷(xiāo),同時(shí)通過(guò)改進(jìn)一致性散列實(shí)現(xiàn)元組跟蹤單元的負(fù)載均衡。內(nèi)存開(kāi)銷(xiāo)和負(fù)載均衡的相關(guān)實(shí)驗(yàn)表明,該方法有效實(shí)現(xiàn)了元組的可靠處理。
參考文獻(xiàn)
1 孟小峰,慈祥.大數(shù)據(jù)管理:概念,技術(shù)與挑戰(zhàn).計(jì)算機(jī)研究與發(fā)展,2013,50(1):146~169
2 Kumar R.Two computational paradigm for big data.http://kdd2012.sigkdd.org/sites/images/summerschool/Ravi.Kumar.pdf,2012
3 Hwang J H,Balazinska M,Rasin A,et al.High-availability algorithms for distributed stream processing.Proceedings of 21st International Conference on Data Engineering (ICDE 2005),Tokyo,Japan,2005
4 S4 distributed stream computing platform.http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance/,2013
5 Guaranteeing message processing.https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing,2013
6 Gu X H,Papadimitriou S,Yu P S,et al.Toward predictive failure management for distributed stream processing systems.Proceedings of 2008 the 28th International Conference on Distributed Computing Systems (ICDCS’08),Washington,DC,USA,2008
7 Brito A,Fetzer C,Felber P.Minimizing latency in fault-tolerant distributed stream processing systems.Proceedings of 2009 the 29th IEEE International Conference on Distributed Computing Systems(ICDCS’09),Washington,DC,USA,2009
8 Brito A,Fetzer C,Felber P.Multithreading-enabled active replication for event stream processing operators.Proceedings of the 28th IEEE International Symposium on Reliable Distributed Systems(SRDS’09),Niagara Falls,New York,USA,2009
9 Zhang Z,Gu Y,Ye F,et al.A hybrid approach to high availability in stream processing systems.Proceedings of the 30th International Conference on Distributed Computing Systems(ICDCS’10),Washington,DC,USA,2010
10 Sebepou Z,Magoutis K.CEC:continuous eventual check pointing for data stream processing operators.Proceedings of 2011 IEEE/IFIP 41st International Conference on Dependable Systems Networks(DSN),Hong Kong,China,June 2011
11 Sebepou Z,Magoutis K.Scalable storage support for data stream processing.Proceedings of the 26th Symposium on Mass Storage Systems and Technologies(MSST),Incline Village,Nevada,May 2010
12 Gulisano V, Jimenez-Peris R, Patino-Martnez M, et al.StreamCloud:an elastic and scalable data streaming system.IEEE Transactions on Parallel and Distributed Systems,2012,23(12):2351~2365