徐 達(dá),曾 樂,王英杰
(國家氣象信息中心,北京 100081)
氣象綜合業(yè)務(wù)實(shí)時(shí)監(jiān)控系統(tǒng)—“天鏡”[1]是國家氣象信息中心為建設(shè)統(tǒng)一數(shù)據(jù)環(huán)境、整合分散獨(dú)立的監(jiān)視業(yè)務(wù)建立的通用、綜合、高效的集約化監(jiān)視平臺(tái)?!疤扃R”能夠?yàn)槿珖鴼庀蟛块T在收集、分發(fā)、入庫、數(shù)據(jù)同步各個(gè)環(huán)節(jié)提供實(shí)時(shí)觀測數(shù)據(jù)和產(chǎn)品的數(shù)據(jù)全流程監(jiān)視服務(wù)。目前“天鏡”每小時(shí)接收處理氣象業(yè)務(wù)監(jiān)視全流程[2]數(shù)據(jù)記錄達(dá)到3千萬條,累計(jì)接入的數(shù)據(jù)資料超過400種,為了使目前的數(shù)據(jù)全流程監(jiān)視業(yè)務(wù)可以更高效地在大數(shù)據(jù)計(jì)算和分布式存儲(chǔ)架構(gòu)上運(yùn)行,需要對目前海量監(jiān)視數(shù)據(jù)的處理中加大對計(jì)算策略和存儲(chǔ)策略的研究力度。
Spark[3]是對海量數(shù)據(jù)計(jì)算處理的重要工具和手段,是基于彈性分布式數(shù)據(jù)集(RDD)的數(shù)據(jù)結(jié)構(gòu),具有數(shù)據(jù)流模型特點(diǎn)。RDD將數(shù)據(jù)保留在內(nèi)存中,且允許用戶程序多次查詢,降低了對磁盤和網(wǎng)絡(luò)的開銷,適用于在線計(jì)算和迭代計(jì)算?!疤扃R”系統(tǒng)使用Spark計(jì)算全流程數(shù)據(jù),并按全國、省、市縣維度的統(tǒng)計(jì)指標(biāo)進(jìn)行匯聚。氣象資料的接入和監(jiān)視環(huán)節(jié)的擴(kuò)展使得需要計(jì)算和處理的監(jiān)視信息激增,使得Spark運(yùn)行作業(yè)時(shí)間變長,這對于滿足時(shí)效性要求而言需要縮短計(jì)算任務(wù)的運(yùn)行時(shí)間,一種方式是從Spark集群框架和配置參數(shù)進(jìn)行修改和優(yōu)化,另一種方式則是通過對程序代碼進(jìn)行改動(dòng),采用最優(yōu)的計(jì)算策略來提升計(jì)算效率。
2017年10月,中國氣象局批復(fù)了由國家氣象信息中心牽頭,國家級(jí)各業(yè)務(wù)單位共同參與建設(shè)的氣象綜合業(yè)務(wù)實(shí)時(shí)監(jiān)控系統(tǒng)(一期)項(xiàng)目。該項(xiàng)目旨在建立技術(shù)先進(jìn)的監(jiān)控系統(tǒng)技術(shù)框架,實(shí)現(xiàn)綜合監(jiān)視和告警運(yùn)維核心功能,建立規(guī)范的監(jiān)控信息采集接口,監(jiān)視范圍橫向覆蓋氣象資料現(xiàn)有數(shù)據(jù)流程各環(huán)節(jié),縱向覆蓋信息系統(tǒng)從網(wǎng)絡(luò)及安全、服務(wù)器、存儲(chǔ)、中間件、應(yīng)用軟件運(yùn)行狀態(tài)。氣象綜合業(yè)務(wù)實(shí)時(shí)監(jiān)控系統(tǒng)(一期)計(jì)劃2018年底建成后,將完成氣象綜合業(yè)務(wù)實(shí)時(shí)監(jiān)控的基礎(chǔ)框架,建立系統(tǒng)的硬件平臺(tái)和技術(shù)平臺(tái),從技術(shù)上解決了原MCP系統(tǒng)面臨的性能瓶頸問題,建立規(guī)范化的監(jiān)視信息采集接口,實(shí)現(xiàn)監(jiān)視告警的核心功能,實(shí)現(xiàn)國家級(jí)基于CIMISS數(shù)據(jù)環(huán)境的資料數(shù)據(jù)流程的收集、分發(fā)、解碼入庫、接口服務(wù)等環(huán)節(jié)的監(jiān)視,以及CMACast衛(wèi)星廣播系統(tǒng)、部際系統(tǒng)等系統(tǒng)的監(jiān)視。但是隨著監(jiān)視信息不斷增長,現(xiàn)有的運(yùn)行環(huán)境在處理計(jì)算上會(huì)有延遲,尤其是在中國地面分鐘級(jí)資料的實(shí)時(shí)監(jiān)視上會(huì)出現(xiàn)頁面為0的情況[4]。
國外氣象行業(yè)的監(jiān)視系統(tǒng)也是主要圍繞著數(shù)據(jù)傳輸網(wǎng)絡(luò)、數(shù)據(jù)收集生成、數(shù)據(jù)質(zhì)量、觀測設(shè)備狀態(tài)進(jìn)行監(jiān)控,如美國國家海洋和大氣管理局(NOAA)建設(shè)了觀測系統(tǒng)監(jiān)控中心(OSMC)實(shí)時(shí)監(jiān)測全球海洋觀測系統(tǒng)的性能[5],歐洲中期天氣預(yù)報(bào)中心(ECMWF)通過常規(guī)觀測告警系統(tǒng)檢測數(shù)據(jù)可用性和質(zhì)量問題[6],美國國家環(huán)境預(yù)報(bào)中心(NCEP)的實(shí)時(shí)數(shù)據(jù)監(jiān)測系統(tǒng)(RTDMS)主要監(jiān)測數(shù)據(jù)的數(shù)量和時(shí)效性[7]。國外的數(shù)據(jù)監(jiān)視系統(tǒng)是基于傳統(tǒng)的數(shù)據(jù)資料文件入庫,并對該文件資料進(jìn)行質(zhì)量評(píng)估后,繪制該類觀測資料的打點(diǎn)時(shí)序圖,對資料進(jìn)行分類監(jiān)視。ECMWF和NOAA更加側(cè)重資料到報(bào)后的質(zhì)量情況,通過設(shè)計(jì)測試的數(shù)值預(yù)報(bào)模式來校驗(yàn)到報(bào)的觀測資料是否合格,通過地圖打點(diǎn)的方式提供數(shù)據(jù)服務(wù),并用顏色來區(qū)分該類資料的數(shù)據(jù)質(zhì)量情況。
圍繞《全國氣象發(fā)展“十三五”規(guī)劃》提出的“智慧氣象”發(fā)展目標(biāo),氣象業(yè)務(wù)在實(shí)施現(xiàn)代化、信息化、集約化、標(biāo)準(zhǔn)化的進(jìn)程中,都需要監(jiān)控系統(tǒng)來保障業(yè)務(wù)的高效穩(wěn)定運(yùn)行。但是,各氣象業(yè)務(wù)的現(xiàn)有監(jiān)控系統(tǒng)都是獨(dú)立開發(fā)和運(yùn)維,監(jiān)控系統(tǒng)分散且數(shù)量龐大,運(yùn)行維護(hù)人力成本高;各監(jiān)控系統(tǒng)僅監(jiān)控業(yè)務(wù)流程中的獨(dú)立環(huán)節(jié),上下游監(jiān)控信息無法共享,缺乏對業(yè)務(wù)全流程的總體監(jiān)控,出現(xiàn)故障時(shí)準(zhǔn)確定位故障位置困難、分析故障原因不及時(shí),導(dǎo)致業(yè)務(wù)監(jiān)控運(yùn)維效率低。因此,急需實(shí)現(xiàn)對觀測、信息、預(yù)報(bào)預(yù)測、公共服務(wù)及政務(wù)的全流程、全要素、全過程的一體化監(jiān)控和運(yùn)維,以提升氣象業(yè)務(wù)運(yùn)行管理的質(zhì)量和效率。2016年底,按照中國氣象局統(tǒng)一部署,由預(yù)報(bào)司牽頭組織與協(xié)調(diào),觀測司配合,信息中心作為實(shí)施技術(shù)組組長單位,協(xié)同各成員單位上下一心,通力合作,共同推動(dòng)氣象綜合業(yè)務(wù)實(shí)時(shí)監(jiān)控系統(tǒng)建設(shè),樹立和打造氣象綜合業(yè)務(wù)監(jiān)控品牌——“天鏡”[8]。
氣象全流程監(jiān)控實(shí)現(xiàn)對數(shù)據(jù)從收集、分發(fā)、入庫、數(shù)據(jù)同步到應(yīng)用的全流程、全生命周期監(jiān)控。在收集環(huán)節(jié)由國內(nèi)氣象傳輸系統(tǒng)(CTS)收到氣象資料后,經(jīng)過文件打包處理后,把文件分發(fā)給業(yè)務(wù)系統(tǒng)和用戶。在入庫環(huán)節(jié)中解碼入庫程序按照氣象要素、時(shí)次等條件進(jìn)行拆解,按照存儲(chǔ)規(guī)則錄入不同的數(shù)據(jù)庫中。為了提供氣象資料查詢服務(wù),需要將解碼后的數(shù)據(jù)在不同類型庫中進(jìn)行同步。在氣象資料全流程監(jiān)視設(shè)計(jì)中需要對收集、分發(fā)、入庫、同步環(huán)節(jié)進(jìn)行監(jiān)視。全流程實(shí)時(shí)指標(biāo)見表1,計(jì)算依賴于節(jié)目表信息和總控配置信息,節(jié)目表信息用來指定該類氣象資料資料是否為考核資料,總控配置信息主要包含:資料業(yè)務(wù)時(shí)次配置信息、單站的單環(huán)節(jié)的單時(shí)次及時(shí)配置信息、統(tǒng)計(jì)規(guī)則(時(shí)次、時(shí)次截日、時(shí)次截小時(shí)、小時(shí)、日)、各個(gè)環(huán)節(jié)之間的關(guān)聯(lián)關(guān)系、文件級(jí)資料的應(yīng)收數(shù)、檢測告警開始時(shí)間、需要告警指標(biāo)、告警持續(xù)時(shí)間等相關(guān)配置。
表1 全流程實(shí)時(shí)計(jì)算收集環(huán)節(jié)核心指標(biāo)
Spark是一種快速、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開源,2013年6月成為Apache孵化項(xiàng)目。目前Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目的集合,包含SparkSQL、Spark Streaming、GraphX、MLlib、等子項(xiàng)目。
Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架,與Hadoop的MapReduce相比,Spark基于內(nèi)存的運(yùn)算速度更快,同時(shí)保證了高容錯(cuò)性和高可伸縮性,Spark實(shí)現(xiàn)了高效的DAG執(zhí)行引擎,從而可以通過內(nèi)存來高效處理數(shù)據(jù)流[9-10]。
在“天鏡”中,Spark的體系架構(gòu)如圖1所示?!疤扃R”采用Standlone模式部署Spark集群,通過Zookeeper,一個(gè)開源的分布式應(yīng)用程序協(xié)調(diào)服務(wù)軟件進(jìn)行集群管理,在Spark集群上創(chuàng)建常駐的SparkSession即常駐的Driver進(jìn)程用于交互Spark程序,SparkSession中包含開源的ActorSystem,一套開源的用于設(shè)計(jì)跨處理器和網(wǎng)絡(luò)的可擴(kuò)展彈性系統(tǒng)。服務(wù)端的ActorSystem向Zookeeper注冊自身的地址。在外部調(diào)度任務(wù)模塊的驅(qū)動(dòng)下,將獲取服務(wù)端的Actor-System地址,隨機(jī)選擇其中一個(gè)地址,提交SprakSQL任務(wù),SparkSQL任務(wù)提交成功后,會(huì)把任務(wù)和接收提交的ActorSystem信息注冊到Zookeeper,用于后續(xù)查看SparkSQL任務(wù)狀態(tài)和取消任務(wù)。
圖1 “天鏡”中Spark的體系架構(gòu)
全流程各環(huán)節(jié)監(jiān)視信息通過接口網(wǎng)關(guān)進(jìn)入后至高速緩沖通道,一路數(shù)據(jù)直接入庫進(jìn)行持久化,一路數(shù)據(jù)進(jìn)行標(biāo)準(zhǔn)化構(gòu)建和數(shù)據(jù)清洗形成中間結(jié)果表(見表2)。
表2 臺(tái)站級(jí)資料預(yù)處理后中間結(jié)果表
根據(jù)總控配置表的業(yè)務(wù)頻次(cron表達(dá)式[0 0 0/1 * * ? ]、統(tǒng)計(jì)規(guī)則[時(shí)次、時(shí)次截小時(shí)、時(shí)次截日、小時(shí)、日])信息計(jì)算出業(yè)務(wù)時(shí)次,并生成一個(gè)sparkSQL文件存入到HDFS中,提交給spark計(jì)算,計(jì)算考核指標(biāo)的SparkSQL語句如下:
1.--考核應(yīng)收
2.sum(coalesce(CO_CHECK_TD,0))AS CO_CHECK_TD,
3.--考核及時(shí)收
4.sum( casewhen CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_INTIME_ACTUAL,
5.--考核逾期收
6.sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_LATETIME_ACTUAL,
7.--考核實(shí)收數(shù)
8.(sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)) AS CO_CHECK_ACTUAL,
9.--考核缺收數(shù)
10.(sum(coalesce(CO_CHECK_TD,0)) - (sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))) AS CO_CHECK_LOC,
11.--考核及時(shí)率
12.(sum( casewhen CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)/sum(coalesce(CO_CHECK_TD,2147483646))) AS CO_CHECK_INTIME_RATE,
13.--考核到報(bào)率
14.((sum( casewhen CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))/sum(coalesce(CO_CHECK_TD,2147483646))) AS CO_CHECK_RATE.
“天鏡”系統(tǒng)部署在36臺(tái)IntelX86物理服務(wù)器上(見圖2),其中5臺(tái)服務(wù)器用于部署網(wǎng)關(guān)模塊(gateway),數(shù)據(jù)預(yù)處理模塊(standardizer)主要負(fù)責(zé)接收監(jiān)視信息的收集和全流程中間結(jié)果(指標(biāo)詳情)的處理,3臺(tái)服務(wù)器用于部署消息中間件(kafka)集群,用于數(shù)據(jù)的高速緩存,避免因數(shù)據(jù)量過大導(dǎo)致后端數(shù)據(jù)庫寫入壓力過大。18臺(tái)服務(wù)器部署分布式日志數(shù)據(jù)庫用于對監(jiān)視信息的原始指標(biāo),中間結(jié)果,最終計(jì)算指標(biāo)進(jìn)行存儲(chǔ)。用于計(jì)算的Spark集群(版本2.3.1)[11]部署在5臺(tái)CPU 24核,內(nèi)存256G,3.2TSAS磁盤,操作系統(tǒng)為Centos7.3服務(wù)器上。
圖2 “天鏡”-氣象數(shù)據(jù)全流程系統(tǒng)架構(gòu)
基于Spark計(jì)算引擎對氣象全流程監(jiān)視信息進(jìn)行實(shí)時(shí)處理,作業(yè)調(diào)度任務(wù)每分鐘執(zhí)行一次,按照臺(tái)站級(jí)氣象資料(StationDiStaticJob)和文件級(jí)氣象資料(FileDiStaticJob)分為兩個(gè)計(jì)算任務(wù)。隨著接入的氣象資料種類越來越多,每分鐘處理的監(jiān)視信息也呈幾何級(jí)增長,執(zhí)行的Spark任務(wù)的耗時(shí)在20分鐘以上,導(dǎo)致氣象全流程監(jiān)視界面中氣象區(qū)域站資料無法及時(shí)顯示。與此同時(shí),運(yùn)維人員發(fā)現(xiàn)Spark集群中有個(gè)別節(jié)點(diǎn)的負(fù)載特別高,這種情況是因?yàn)閿?shù)據(jù)源單個(gè)spark input read數(shù)據(jù)量過大,或者單個(gè)task相對于其他task spark input read較大的情況,導(dǎo)致的讀取數(shù)據(jù)源明顯不均勻[12]。因此盡量使用可切割的文本存儲(chǔ),生成盡量多的task進(jìn)行并行計(jì)算,可以從數(shù)據(jù)源避免傾斜,并從源頭增大并行度[13]。通過觀察Spark任務(wù)管理頁面可以看到已完成的計(jì)算任務(wù)資源使用和耗時(shí)情況,如表3所示,正常計(jì)算任務(wù)需要分配計(jì)算資源10核,內(nèi)存5 GB。
表3 優(yōu)化前Spark任務(wù)運(yùn)行監(jiān)視結(jié)果
進(jìn)行Spark計(jì)算任務(wù)的優(yōu)化的目的,是為了充分利用硬件本身的性能,最大限度地提升Spark中Executor的執(zhí)行效率[14-17]。依據(jù)氣象全流程監(jiān)視界面資料展示情況,拆分為地面資料、海洋資料、高空資料、輻射資料、農(nóng)業(yè)與生態(tài)資料、大氣成分、雷達(dá)數(shù)據(jù)、衛(wèi)星數(shù)據(jù)、氣象服務(wù)產(chǎn)品、數(shù)值預(yù)報(bào)產(chǎn)品共10類資料,每類資料又分為考核資料和非考核資料。相較于優(yōu)化前,雖然增加了SparkSQL模板的復(fù)雜度,但是提升了氣象考核資料的計(jì)算效率,該文以傳輸環(huán)節(jié)考核資料為例,新增的SparkSQL模板如下:
1.base.sql.co.checks=sum( casewhen CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END) AS CO_CHECK_TD, sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_INTIME_ACTUAL, sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) AS CO_CHECK_LATETIME_ACTUAL, (sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)) AS CO_CHECK_ACTUAL, (sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END) - (sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))) AS CO_CHECK_LOC,(sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END)/coalesce(sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END), 2147483646)) AS CO_CHECK_INTIME_RATE,((sum( case when CHECK = '1' then coalesce(CO_LATETIME_ACTUAL,0) ELSE 0 END) + sum( case when CHECK = '1' then coalesce(CO_INTIME_ACTUAL,0) ELSE 0 END))/coalesce(sum( case when CHECK = '1' then coalesce(CO_TD,0) ELSE 0 END), 2147483646)) AS CO_CHECK_RATE
在向Spark進(jìn)行任務(wù)提交時(shí),客戶端處理程序需要將氣象資料按照上述分類進(jìn)行拆解,核心代碼如下:
1.…
2.for (TabMcmConfig config : tabOminCmCcSubsystem Allconfigs) {
3.// 1、資料編碼
4.String ctsCode = config.getcCtsCode();
5.String sodCode = config.getcSodCode();
6.//資料大類
7.String dataClass = config.getDataClass();
8.String ctsSodCode = ctsCode.concat(":").concat(sodCode);
9.//文件級(jí)還是站點(diǎn)級(jí)
10.String dataSourceType = config.getcDataSource();
11.if ("1".equals(dataSourceType)) {
12.if(!fileDiComputeEnabled) {
13.continue;
14.}
15.dataSourceType = "file";
16.}else {
17.dataSourceType = "station";
18.}
19.…
此段代碼通過獲取總控配置后對每類氣象資料進(jìn)行分類,分類后生成的計(jì)算任務(wù)與生成的SparkSQL模板匹配,從而完成計(jì)算任務(wù)拆解,單個(gè)SparkSQL只計(jì)算一類考核資料或者一類非考核資料。
該文采用自動(dòng)化測試的方法,由于對程序代碼結(jié)構(gòu)進(jìn)行了修改和微調(diào),因此需要對優(yōu)化后的全流程指標(biāo)計(jì)算結(jié)果正確性進(jìn)行驗(yàn)證。正確性可以根據(jù)監(jiān)視頁面中資料的統(tǒng)計(jì)指標(biāo)和系統(tǒng)告警進(jìn)行判斷,如圖3所示,可以通過查看Spark作業(yè)任務(wù)日志進(jìn)行驗(yàn)證,如表4所示。該文展示的全流程監(jiān)視界面與優(yōu)化前資料監(jiān)視統(tǒng)計(jì)指標(biāo)計(jì)算結(jié)果一致,并且中國地面分鐘降水?dāng)?shù)據(jù)在一級(jí)界面中可以顯示正常。優(yōu)化后單個(gè)計(jì)算任務(wù)的計(jì)算時(shí)間控制2分鐘以內(nèi)。
圖3 氣象綜合業(yè)務(wù)實(shí)時(shí)監(jiān)控系統(tǒng)—“天鏡”全流程監(jiān)視界面
表4 優(yōu)化后Spark任務(wù)運(yùn)行監(jiān)視結(jié)果
通過拆分計(jì)算任務(wù),生成盡可能多的task增加Spark計(jì)算并行度,成功將氣象全流程計(jì)算框架優(yōu)化并業(yè)務(wù)運(yùn)行,如表5所示,獲得了10倍的加速效果,提高了程序的運(yùn)行效率。但是“天鏡”系統(tǒng)在處理大數(shù)據(jù)計(jì)算時(shí)還是有瓶頸,原因是地面區(qū)域站氣象資料會(huì)產(chǎn)生大量重復(fù)數(shù)據(jù),要能夠高效處理海量的監(jiān)視數(shù)據(jù),除了對計(jì)算任務(wù)拆分,還需要對計(jì)算任務(wù)設(shè)置優(yōu)先級(jí),針對核心資料優(yōu)先分配計(jì)算資源計(jì)算,這就需要業(yè)務(wù)人員對資料的監(jiān)視等級(jí)進(jìn)行配置,同時(shí)要熟悉Spark資源分配機(jī)制,在此基礎(chǔ)上來做系統(tǒng)優(yōu)化,能夠較好地提升優(yōu)化效果。
表5 “天鏡”全流程Spark計(jì)算任務(wù)優(yōu)化前后運(yùn)行時(shí)間