王 寧,郝雅青,甘人才
(中國鐵道科學(xué)研究院集團(tuán)有限公司 電子計(jì)算技術(shù)研究所,北京 100081)
鐵路電子支付平臺(tái)(簡稱:支付平臺(tái))承載著鐵路客、貨運(yùn)輸?shù)碾娮又Ц逗蛯?duì)賬結(jié)算業(yè)務(wù)。自2010年投入運(yùn)行以來,隨著客、貨運(yùn)業(yè)務(wù)量的增長和業(yè)務(wù)種類的增多,交易量從最初日均100萬筆提升至日均千萬筆,對(duì)賬處理效率較低的問題日漸凸顯。同時(shí),隨著二維碼掃碼支付、鐵路e卡通等多種新型支付的增加,對(duì)賬業(yè)務(wù)和資金核算邏輯更為復(fù)雜,對(duì)支付平臺(tái)對(duì)賬業(yè)務(wù)與資金核算的靈活性和擴(kuò)展性也提出更高要求。
為此,研究提出基于分布式并行計(jì)算架構(gòu)的對(duì)賬業(yè)務(wù)數(shù)據(jù)處理方案,對(duì)支付平臺(tái)的對(duì)賬與資金核算業(yè)務(wù)數(shù)據(jù)處理過程進(jìn)行技術(shù)升級(jí)改造,將各接入渠道業(yè)務(wù)系統(tǒng)、支付機(jī)構(gòu)、支付平臺(tái)三方與對(duì)賬相關(guān)的數(shù)據(jù)均匯集到Hadoop平臺(tái)中,采用Spark、Kafka等組件搭建分布式并行計(jì)算環(huán)境,用于完成對(duì)賬和資金核算的數(shù)據(jù)處理任務(wù);通過敏捷查詢引擎(AQE,Agile Query Engine),Hadoop數(shù)據(jù)可以通過JDBC和REST等多種方式,提供給電子支付管理平臺(tái),支持后續(xù)的交易查詢、報(bào)表統(tǒng)計(jì)、偏差處理等業(yè)務(wù);并在實(shí)驗(yàn)室搭建測(cè)試環(huán)境,對(duì)該數(shù)據(jù)處理方案進(jìn)行測(cè)試驗(yàn)證。
支付平臺(tái)對(duì)賬和資金核算功能主要完成各類交易信息的匯總、核對(duì)、差異處理、資金報(bào)表生成等業(yè)務(wù)處理[1],主要業(yè)務(wù)流程如圖1所示。
圖1 支付平臺(tái)對(duì)賬與資金核算業(yè)務(wù)流程示意
1.1.1 接入渠道
接入渠道是指為鐵路客戶提供客貨業(yè)務(wù)服務(wù)的各類業(yè)務(wù)系統(tǒng),如鐵路12306互聯(lián)網(wǎng)售票系統(tǒng)(簡稱:12306)、鐵路貨運(yùn)電子商務(wù)系統(tǒng)(簡稱:貨運(yùn)系統(tǒng));各接入渠道業(yè)務(wù)系統(tǒng)在執(zhí)行交易時(shí),可通過接口將交易信息發(fā)送至支付平臺(tái)。
1.1.2 支付機(jī)構(gòu)
鐵路客戶在辦理鐵路客、貨運(yùn)業(yè)務(wù)時(shí),可選用不同支付機(jī)構(gòu)的支付服務(wù),各個(gè)支付機(jī)構(gòu)通過接口向支付平臺(tái)提供交易接入服務(wù)。
1.1.3 支付平臺(tái)
支付平臺(tái)主要完成交易處理、對(duì)賬、交易查詢、偏差處理、報(bào)表統(tǒng)計(jì)等功能,對(duì)賬業(yè)務(wù)主要流程如下:
(1)對(duì)賬文件入庫:各接入渠道業(yè)務(wù)系統(tǒng)在約定的時(shí)間將固定格式的對(duì)賬文件傳輸至支付平臺(tái)的文件服務(wù)器,文件服務(wù)器上的程序自動(dòng)檢查對(duì)賬文件格式和數(shù)據(jù)的正確性,將正確的對(duì)賬文件數(shù)據(jù)寫入數(shù)據(jù)庫;若有對(duì)賬文件存在異常,則給出提示,以轉(zhuǎn)由相關(guān)業(yè)務(wù)人員進(jìn)行人工處理;
(2)對(duì)賬處理:支付平臺(tái)每日自動(dòng)執(zhí)行對(duì)賬處理,按照設(shè)定的規(guī)則,將各接入渠道業(yè)務(wù)系統(tǒng)的數(shù)據(jù)與支付平臺(tái)的數(shù)據(jù)、各支付機(jī)構(gòu)的數(shù)據(jù)與支付平臺(tái)的數(shù)據(jù),分別進(jìn)行逐筆交易兩兩對(duì)比核查;核對(duì)的數(shù)據(jù)項(xiàng)主要包括交易流水號(hào)、交易金額、交易狀態(tài)、交易時(shí)間等;
(3)對(duì)賬結(jié)果處理:對(duì)賬結(jié)果分為對(duì)賬成功、單邊交易賬、金額不一致等類型;對(duì)于單邊交易賬和金額不一致的交易數(shù)據(jù),按系統(tǒng)設(shè)置的審核規(guī)則進(jìn)行數(shù)據(jù)偏差審核,判定是否需要給客戶退款;對(duì)于無法自動(dòng)完成審核的交易數(shù)據(jù),由業(yè)務(wù)人員手工處理[2];
(4)資金核算:資金核算處理程序每月對(duì)各接入渠道業(yè)務(wù)系統(tǒng)的當(dāng)月結(jié)賬資金、支付平臺(tái)記錄的當(dāng)月資金、支付機(jī)構(gòu)的當(dāng)月資金進(jìn)行核對(duì),資金核算結(jié)果包括月切交易數(shù)據(jù)、單邊交易數(shù)據(jù)、交易類型對(duì)比核查;通過資金核對(duì),找出存在資金差異的交易數(shù)不一致數(shù)據(jù)、金額不一致數(shù)據(jù)等類型;業(yè)務(wù)人員將依據(jù)資金核算結(jié)果,進(jìn)行欠款追款、調(diào)賬等多種處理;
(5)報(bào)表統(tǒng)計(jì)與資金上繳:根據(jù)交易處理、對(duì)賬、資金核算分類處理,支付平臺(tái)統(tǒng)計(jì)生成業(yè)務(wù)數(shù)據(jù)匯總、銀行數(shù)據(jù)匯總、平臺(tái)數(shù)據(jù)匯總、電子支付日?qǐng)?bào)、資金差異明細(xì)表等多種業(yè)務(wù)報(bào)表和資金報(bào)表,業(yè)務(wù)人員依據(jù)相關(guān)報(bào)表進(jìn)行資金上繳[3]。
(1)數(shù)據(jù)處理量及性能要求
支付平臺(tái)需要對(duì)賬的交易數(shù)據(jù)來自多個(gè)接入渠道業(yè)務(wù)系統(tǒng)和10多家支付機(jī)構(gòu)。對(duì)賬業(yè)務(wù)處理是逐日滾動(dòng)執(zhí)行的,考慮到接入支付平臺(tái)的不同來源交易數(shù)據(jù)可能存在一定時(shí)間延遲,為保證每日數(shù)據(jù)核對(duì)的有效性,每日?qǐng)?zhí)行對(duì)賬處理時(shí)一般對(duì)連續(xù)5天(即T-2、T-1、T、T+1、T+2)的交易數(shù)據(jù)進(jìn)行核對(duì);目前,每日對(duì)賬處理的交易數(shù)據(jù)記錄約為2億條,對(duì)賬處理全流程耗時(shí)應(yīng)少于2 h。
(2)可擴(kuò)展性
隨著支付平臺(tái)可支持的支付方式日益多樣化,以及所接入的支付機(jī)構(gòu)和各接入渠道業(yè)務(wù)系統(tǒng)的不斷增加,對(duì)賬處理應(yīng)能快速、靈活地適應(yīng)數(shù)據(jù)源增加和接口類型增多。
基于分布式并行計(jì)算架構(gòu)的對(duì)賬業(yè)務(wù)數(shù)據(jù)處理方案的邏輯框架如圖2所示。
圖2 對(duì)賬業(yè)務(wù)數(shù)據(jù)處理方案的邏輯框架示意
支付平臺(tái)對(duì)賬業(yè)務(wù)的數(shù)據(jù)處理主要包括:數(shù)據(jù)采集模塊、對(duì)賬模塊、數(shù)據(jù)存儲(chǔ)模塊、數(shù)據(jù)查詢模塊等功能模塊,以及運(yùn)行監(jiān)控和任務(wù)調(diào)度等輔助管理模塊。
各接入渠道業(yè)務(wù)系統(tǒng)的對(duì)賬文件,經(jīng)解析代碼處理為結(jié)構(gòu)化數(shù)據(jù),并存儲(chǔ)到高性能消息中間件Kafka;采用Spark Streaming組件定期從Kafka拉取數(shù)據(jù),然后存儲(chǔ)在分布式文件系統(tǒng)HDFS中,這些數(shù)據(jù)為待對(duì)賬數(shù)據(jù);增量數(shù)據(jù)采用Hudi組件,實(shí)現(xiàn)數(shù)據(jù)增量更新、數(shù)據(jù)版本管理和數(shù)據(jù)痕跡追蹤。
待對(duì)賬數(shù)據(jù)和對(duì)賬結(jié)果數(shù)據(jù)均存儲(chǔ)在分布式文件系統(tǒng)HDFS中,采用Hudi組件可構(gòu)建和管理PB級(jí)數(shù)據(jù),為各類業(yè)務(wù)提供高效和低延遲的數(shù)據(jù)連接,支持文件級(jí)、記錄級(jí)的插入、刪除、更新操作,可按時(shí)間版本查詢數(shù)據(jù),有效地改善存儲(chǔ)管理和查詢性能。
對(duì)賬業(yè)務(wù)數(shù)據(jù)處理按日進(jìn)行,各接入渠道業(yè)務(wù)數(shù)據(jù)、支付平臺(tái)交易數(shù)據(jù)、各支付機(jī)構(gòu)數(shù)據(jù)均按日期分區(qū)存儲(chǔ),并進(jìn)行數(shù)據(jù)分片。
對(duì)賬邏輯程序采用Spark 批處理完成,Spark 運(yùn)行在Hadoop Yarn上,由Yarn管理Spark集群,負(fù)責(zé)資源統(tǒng)一管理,任務(wù)調(diào)度與監(jiān)控。
為充分利用Spark的并行計(jì)算能力,按照處理數(shù)據(jù)分片規(guī)則,將當(dāng)天對(duì)賬數(shù)據(jù)進(jìn)行分類后,再行執(zhí)行對(duì)賬邏輯計(jì)算,可顯著提高計(jì)算效率。
AQE作為數(shù)據(jù)查詢核心組件,將存儲(chǔ)在HDFS中的數(shù)據(jù)暴露為JDBC接口或者REST接口,為其它業(yè)務(wù)系統(tǒng)提供數(shù)據(jù)查詢服務(wù)。
為了便于日常運(yùn)行維護(hù),采用Airflow平臺(tái)完成任務(wù)調(diào)度和監(jiān)控;每日定時(shí)運(yùn)行對(duì)賬處理邏輯代碼,可查看任務(wù)執(zhí)行歷史,對(duì)任務(wù)執(zhí)行異常進(jìn)行提示與警告。
監(jiān)控模塊主要有3類監(jiān)控對(duì)象:服務(wù)器和組件、對(duì)賬任務(wù)、對(duì)賬數(shù)據(jù)。
(1)服務(wù)器和組件:實(shí)時(shí)監(jiān)控虛擬機(jī)的CPU、內(nèi)存等使用情況;采集服務(wù)器級(jí)別的告警信息;實(shí)時(shí)監(jiān)控Kafka、Spark、Hadoop、Airflow 等組件的運(yùn)行情況。
(2)對(duì)賬任務(wù)監(jiān)控:監(jiān)控Kafka寫入Hadoop的實(shí)時(shí)流任務(wù)和對(duì)賬任務(wù)的運(yùn)行情況;監(jiān)控定時(shí)任務(wù)是否成功執(zhí)行。
(3)對(duì)賬數(shù)據(jù)監(jiān)控:監(jiān)控Kafka寫入Hadoop的數(shù)據(jù)量、監(jiān)控所采集的數(shù)據(jù)量;監(jiān)控當(dāng)日各接入渠道數(shù)據(jù)同步的完成情況;監(jiān)控對(duì)賬處理完成情況、對(duì)賬相符數(shù)據(jù)量、對(duì)賬差異數(shù)據(jù)量、退款對(duì)賬數(shù)據(jù)量等業(yè)務(wù)數(shù)據(jù)。
分布式通用數(shù)據(jù)計(jì)算引擎Spark是專為大規(guī)模數(shù)據(jù)快速處理而設(shè)計(jì)的通用計(jì)算引擎[4],基于類Hadoop MapReduce的開源通用并行框架,但不同于MapReduce的是,計(jì)算任務(wù)的中間結(jié)果可保存于內(nèi)存中,無需讀寫HDFS。Spark能夠提供交互式數(shù)據(jù)處理,還可以優(yōu)化迭代工作負(fù)載,適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等需要迭代的MapReduce算法,常用于構(gòu)建大型的低延遲數(shù)據(jù)處理應(yīng)用[5]。
支付平臺(tái)對(duì)賬處理采用Spark計(jì)算框架,將對(duì)賬數(shù)據(jù)加載到內(nèi)存中,并采用Spark分片和多任務(wù)并行計(jì)算方式,能夠極大地提升對(duì)賬處理效率。
分布式文件系統(tǒng)HDFS基于流數(shù)據(jù)模式,可運(yùn)行于廉價(jià)服務(wù)器[6],具有高容錯(cuò)、高可靠性、高可擴(kuò)展性、高可用性、高吞吐率等特性[7],且安裝和維護(hù)簡單。采用分布式文件系統(tǒng)HDFS存儲(chǔ)和管理海量結(jié)構(gòu)化分析型數(shù)據(jù),能夠以較低的成本實(shí)現(xiàn)安全、可靠的數(shù)據(jù)存儲(chǔ),并保證數(shù)據(jù)存儲(chǔ)規(guī)模具有良好的持續(xù)擴(kuò)展性。
采用分布式文件系統(tǒng)HDFS存儲(chǔ)支付平臺(tái)的對(duì)賬業(yè)務(wù)數(shù)據(jù),解決了關(guān)系型數(shù)據(jù)庫因讀寫磁盤速率低造成的數(shù)據(jù)讀寫速度慢的問題,提高了數(shù)據(jù)處理吞吐率。HDFS提供的數(shù)據(jù)分片存儲(chǔ)方式,也為支持未來業(yè)務(wù)數(shù)據(jù)量的持續(xù)增長提供保障。
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),可提供高吞吐、低延遲的實(shí)時(shí)數(shù)據(jù)處理平臺(tái)[8]。其持久化層是一個(gè)按照分布式事務(wù)架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊(duì)列,常作為企業(yè)消息總線、實(shí)時(shí)數(shù)據(jù)管道,主要起到削峰填谷、系統(tǒng)解耦以及冗余的作用。
采用Kafka消息中間件實(shí)現(xiàn)支付平臺(tái)對(duì)賬業(yè)務(wù)的數(shù)據(jù)采集,提升了數(shù)據(jù)采集環(huán)節(jié)的效率和彈性,可為數(shù)據(jù)處理提供緩沖池,以避免對(duì)賬業(yè)務(wù)數(shù)據(jù)處理程中各環(huán)節(jié)間互相影響。
AQE是基于Apache Arrows內(nèi)存列式存儲(chǔ)的大數(shù)據(jù)查詢引擎,采用MPP技術(shù),能接入HDFS、Hive、Kafka、RDBMS等多種外部數(shù)據(jù)源,支持JDBC、ODBC、Rest的查詢接口和基于Arrow Flight的查詢API接口,支持異構(gòu)數(shù)據(jù)源的混合查詢計(jì)算[9]。AQE使用物化視圖和SQL重寫技術(shù),實(shí)現(xiàn)查詢性能優(yōu)化和提升。
采用AQE查詢引擎實(shí)現(xiàn)支付平臺(tái)對(duì)賬業(yè)務(wù)數(shù)據(jù)處理的查詢功能,可將多種數(shù)據(jù)源的訪問進(jìn)行統(tǒng)一封裝和優(yōu)化,為前端業(yè)務(wù)功能提供統(tǒng)一、高效的查詢接口。
為驗(yàn)證基于分布式并行計(jì)算架構(gòu)的對(duì)賬業(yè)務(wù)數(shù)據(jù)處理方案的有效性,在實(shí)驗(yàn)室搭建對(duì)賬業(yè)務(wù)數(shù)據(jù)處理的測(cè)試環(huán)境,如圖3所示。
圖3 對(duì)賬業(yè)務(wù)數(shù)據(jù)處理的測(cè)試環(huán)境構(gòu)成示意
組件監(jiān)控:由2臺(tái)服務(wù)器部署相應(yīng)監(jiān)控程序組成,負(fù)責(zé)監(jiān)控測(cè)試環(huán)境中相關(guān)資源的運(yùn)行情況。
數(shù)據(jù)采集和對(duì)賬處理:消息隊(duì)列Kafka功能相對(duì)獨(dú)立,由2臺(tái)服務(wù)器組成;并行計(jì)算Spark、系統(tǒng)管理Yarn、文件系統(tǒng)Hadoop、數(shù)據(jù)存儲(chǔ)Hive、任務(wù)調(diào)度Airflow 等組件部署在4臺(tái)服務(wù)器上,構(gòu)成對(duì)賬數(shù)據(jù)處理運(yùn)行環(huán)境。
數(shù)據(jù)查詢:AQE部署在2臺(tái)服務(wù)器上,通過外部接口調(diào)用數(shù)據(jù)存儲(chǔ)完成數(shù)據(jù)查詢。
(1)對(duì)賬數(shù)據(jù)采集:將現(xiàn)有數(shù)據(jù)轉(zhuǎn)儲(chǔ)至大數(shù)據(jù)平臺(tái)中,包括各接入渠道、支付機(jī)構(gòu)和支付平臺(tái)的交易數(shù)據(jù),為后續(xù)對(duì)賬處理準(zhǔn)備好測(cè)試數(shù)據(jù)集。
(2)對(duì)賬處理:采用Spark多任務(wù)并行計(jì)算進(jìn)行對(duì)賬處理,將對(duì)賬結(jié)果存放在Hive庫中。
(3)對(duì)賬結(jié)果查詢:通過AQE提供的JDBC接口和REST接口,按多種條件快速、靈活的查詢對(duì)賬數(shù)據(jù)。
采用分布式并行計(jì)算架構(gòu)后,相較于原來基于Oracle關(guān)系型數(shù)據(jù)庫存儲(chǔ)的運(yùn)行環(huán)境,技術(shù)升級(jí)改造前后對(duì)賬業(yè)務(wù)數(shù)據(jù)處理測(cè)試項(xiàng)目指標(biāo)對(duì)比見表1。
表1 技術(shù)升級(jí)改造前后測(cè)試項(xiàng)目指標(biāo)對(duì)比
(1)數(shù)據(jù)采集:采用Spark和Kafka相結(jié)合的數(shù)據(jù)采集處理方式,速度提高近4.5倍。此外,相對(duì)于現(xiàn)有系統(tǒng)完全基于數(shù)據(jù)庫的處理方式,采用Spark和Kafka組件采集數(shù)據(jù),還能減輕數(shù)據(jù)庫負(fù)載,測(cè)試過程中數(shù)據(jù)庫服務(wù)器CPU監(jiān)測(cè)指標(biāo)表明,可釋放出約30%資源能力。
(2)對(duì)賬計(jì)算:采用Spark并行任務(wù)進(jìn)行基于內(nèi)存的分布式計(jì)算,數(shù)據(jù)處理速度提升達(dá)十倍量級(jí)。
(3)對(duì)賬結(jié)果查詢:在現(xiàn)有Oracle關(guān)系型數(shù)據(jù)庫中,采用按時(shí)間分區(qū)存儲(chǔ)方案,查詢跨分區(qū)數(shù)據(jù)時(shí),需進(jìn)行全表掃描,數(shù)據(jù)讀取耗時(shí)較長,一般查詢響應(yīng)時(shí)間約15 s。基于AQE進(jìn)行查詢時(shí),可自動(dòng)根據(jù)分區(qū)條件確定數(shù)據(jù)查詢范圍,無需全表掃描,平均查詢響應(yīng)時(shí)間約300 ms;且數(shù)據(jù)量越大,查詢速度差距越明顯。另外,Oracle關(guān)系型數(shù)據(jù)庫僅提供基于JDBC接口的SQL查詢方式,而AQE查詢可提供多種類型的數(shù)據(jù)源訪問接口,還可提供REST接口,能夠支持更為靈活的查詢需求。
針對(duì)鐵路電子支付平臺(tái)對(duì)賬處理面臨的問題,分析現(xiàn)有對(duì)賬業(yè)務(wù)數(shù)據(jù)處理流程及性能要求,研究分布式并行計(jì)算相關(guān)的關(guān)鍵技術(shù),提出基于Hadoop、Spark等技術(shù)的分布式并行計(jì)算方案;設(shè)計(jì)了數(shù)據(jù)采集、對(duì)賬、數(shù)據(jù)查詢等業(yè)務(wù)處理模塊,以及數(shù)據(jù)存儲(chǔ)、任務(wù)調(diào)度和運(yùn)行監(jiān)控等輔助管理模塊;在實(shí)驗(yàn)室搭建測(cè)試環(huán)境,對(duì)測(cè)試數(shù)據(jù)集進(jìn)行處理。測(cè)試表明:該方案可顯著提高對(duì)賬核算業(yè)務(wù)處理效率,增強(qiáng)支付平臺(tái)對(duì)業(yè)務(wù)需求靈活支撐的能力。
該方案中所使用的相關(guān)組件多為開源技術(shù),若要投入實(shí)際運(yùn)用,如何有效確保其穩(wěn)定可靠運(yùn)行,需要盡一步開展深入研究。