摘? 要:本文主要論述了如何利用消息中間件ActiveMQ,構(gòu)建一個(gè)通用的數(shù)據(jù)傳輸框架,以解決同步傳輸效率低下、數(shù)據(jù)傳輸不及時(shí)、數(shù)據(jù)傳輸不可靠、難以按優(yōu)先級(jí)傳輸數(shù)據(jù)等難題,實(shí)現(xiàn)一個(gè)可跨平臺(tái)、適用于各個(gè)異構(gòu)分布式系統(tǒng)、屏蔽底層細(xì)節(jié)的數(shù)據(jù)傳輸框架的搭建。該框架在應(yīng)用過(guò)程中提高了業(yè)務(wù)處理效率、降低了開(kāi)發(fā)成本。
關(guān)鍵詞:中間件;ActiveMQ;通用數(shù)據(jù)傳輸框架;民航氣象數(shù)據(jù)傳輸
中圖分類號(hào):TP311.52? ? ? 文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):2096-4706(2019)10-0026-03
Abstract:This paper mainly discusses how to use a Message-oriented middleware ActiveMQ to build a universal data transmission framework. The framework can solve the problems of low efficiency of synchronous transmission,untimely data transmission,unreliable data transmission,and difficulty in transmitting data according to priority. A cross-platform,heterogeneous distributed system,shielding the underlying details of the data transmission framework is implemented. The framework improves business processing efficiency and reduces development costs in the application process.
Keywords:message-oriented middleware;ActiveMQ;data transmission framework;civil aviation meteorological data transmission
0? 引? 言
民用航空氣象服務(wù)與民用航空的安全和經(jīng)濟(jì)效益密切相關(guān),隨著民用航空的高速發(fā)展,對(duì)氣象服務(wù)的要求越來(lái)越高。而氣象數(shù)據(jù)種類多樣、報(bào)文格式復(fù)雜、數(shù)據(jù)量大以及高時(shí)效性、準(zhǔn)確度要求高[1]等因素給氣象服務(wù)帶來(lái)了更大的挑戰(zhàn)。而現(xiàn)如今,越來(lái)越多的部門對(duì)民航氣象數(shù)據(jù)有著迫切的需求。
在當(dāng)前的業(yè)務(wù)條件下,民航氣象數(shù)據(jù)傳輸給用戶主要通過(guò)FTP的方式,F(xiàn)TP傳輸對(duì)于點(diǎn)對(duì)點(diǎn)數(shù)據(jù)傳輸有著方便易用等優(yōu)點(diǎn),卻存在傳輸及時(shí)性不強(qiáng)、可靠性不足、無(wú)法配置優(yōu)先級(jí)等缺點(diǎn)。
民航業(yè)務(wù)運(yùn)行急需一個(gè)傳輸框架,既可以解決各部門中的分布式系統(tǒng)的異構(gòu)問(wèn)題,屏蔽操作系統(tǒng)的底層細(xì)節(jié),又能保證各類數(shù)據(jù)異步傳輸、即到即轉(zhuǎn)、滿足數(shù)據(jù)時(shí)效精確到秒的要求,同時(shí)做到100%不丟數(shù)據(jù),且能設(shè)置不同的優(yōu)先級(jí)進(jìn)行數(shù)據(jù)傳輸接收。
基于現(xiàn)實(shí)迫切的需求,構(gòu)建了一個(gè)基于消息中間件的通用傳輸框架,該框架可異步傳輸數(shù)據(jù),可跨平臺(tái)使用,屏蔽底層細(xì)節(jié),同時(shí)做到數(shù)據(jù)可靠傳輸,適用于所有需要傳輸數(shù)據(jù)的場(chǎng)合。該框架選用了基于JMS(Java領(lǐng)域的MOM規(guī)范)實(shí)現(xiàn)的中間件——ActiveMQ。
1? ActiveMQ介紹
ActiveMQ是目前最為流行和強(qiáng)大的開(kāi)源消息中間件(MOM),由Apache軟件基金會(huì)(ASF)出品。ActiveMQ是用于系統(tǒng)之間遠(yuǎn)程通信的消息代理器,它遵循JMS 1.1規(guī)范和J2EE 1.4規(guī)范中的JMS服務(wù)。ActiveMQ是用Java語(yǔ)言實(shí)現(xiàn)的,它也為多種語(yǔ)言提供了API,如Java、C、C++、C#、Ruby、Perl、Python和PHP等[2],還支持OpenWire、Stomp、REST、WS-Notification、XMPP和AMQP等多種應(yīng)用連接協(xié)議。
在JMS規(guī)范中,消息類型有六種,分別是Message、TextMessage、MapMessage、BytesMessage、Stream-Message、ObjectMessage。在ActiveMQ中,還定義了新的消息類型——BlobMessage[3],可應(yīng)用于發(fā)送大文件。
關(guān)于數(shù)據(jù)傳輸模型,ActiveMQ支持兩種:點(diǎn)對(duì)點(diǎn)模型(Point To Point)和發(fā)布訂閱模式(Publisher To Sub-scriber)。
第一種模型是點(diǎn)對(duì)點(diǎn)模型(Point To Point),這個(gè)模型的特點(diǎn)是一對(duì)一,在這個(gè)模型中,生產(chǎn)者首先封裝數(shù)據(jù)生成消息,然后將消息發(fā)送至Queue,即隊(duì)列中。Active MQ服務(wù)將根據(jù)傳送策略,將該隊(duì)列中的消息傳輸?shù)较虼岁?duì)列注冊(cè)的某一個(gè)消費(fèi)者,一個(gè)消息只能傳送一次、只能由一個(gè)消費(fèi)者使用。但是,同一個(gè)隊(duì)列允許一個(gè)至多個(gè)生產(chǎn)者向其發(fā)送消息。
第二種模型是發(fā)布訂閱模型(Publisher To Sub-scriber),該模型的特點(diǎn)是一對(duì)多,即消息只需生產(chǎn)一次,就可提供個(gè)多個(gè)消費(fèi)者即用戶使用。在這個(gè)模型中,生產(chǎn)者封裝消息發(fā)送到相應(yīng)的Topic(目標(biāo)主題),ActiveMQ會(huì)將消息發(fā)送給訂閱了該主題的消費(fèi)者??梢韵騎opic發(fā)送消息的生產(chǎn)者也是可以多個(gè)的。
發(fā)布訂閱模型還支持持久訂閱,如果消費(fèi)者向Active-MQ的某個(gè)Topic提交持久訂閱要求,即向該主題注冊(cè),注冊(cè)后,ActiveMQ將保證該主題的消息能可靠地傳輸給該消費(fèi)者。當(dāng)消費(fèi)者處于掉線狀態(tài),無(wú)法及時(shí)接收到消息,ActiveMQ將為其保存該消息,直至消息到期,或者消費(fèi)者上線,正常接收到消息。未向Topic注冊(cè)的消費(fèi)者掉線時(shí),ActiveMQ不會(huì)為其保存消息,即使消費(fèi)者再次成功連接,之前的消息也將丟失。
ActiveMQ的主要特性有:
(1)遵循JMS1.1規(guī)范。ActiveMQ能夠提供同步和異步的消息投遞、一次且僅一次的消息投遞以及消息訂閱者的消息持久訂閱等功能。
(2)支持多種連接協(xié)議和連接方式。ActiveMQ同時(shí)支持不同消息代理器之間的連接和消息代理器與客戶端之間的連接。
(3)支持消息的持久存儲(chǔ)。ActiveMQ能將消息持久化地存儲(chǔ)到本地?cái)?shù)據(jù)庫(kù),以保證消息的有效投遞。
(4)具有安全認(rèn)證和授權(quán)功能。ActiveMQ可以通過(guò)本地配置文件或者插件來(lái)實(shí)現(xiàn)客戶端的安全認(rèn)證和授權(quán),以保證消息只能被授權(quán)用戶生產(chǎn)和消費(fèi)。
(5)其他高級(jí)特性。ActiveMQ還支持許多高級(jí)特性,包括消息代理器(broker)的集群、獨(dú)享消費(fèi)者、消息組和大文件消息等等。
2? 基于ActiveMQ的通用數(shù)據(jù)傳輸框架設(shè)計(jì)與實(shí)現(xiàn)
2.1? 架構(gòu)設(shè)計(jì)
本框架分為業(yè)務(wù)邏輯層、控制層、傳輸層、數(shù)據(jù)交換層[4]、分布式系統(tǒng)等幾個(gè)部分。為了實(shí)現(xiàn)全雙工通信,保證數(shù)據(jù)即到即處理,本框架的實(shí)現(xiàn)中對(duì)數(shù)據(jù)的處理選用了異步的模式,在控制模塊采用多線程的方式,在傳輸模塊選用了Pub/Sub(發(fā)布者/訂閱者)模型。
如圖1所示,控制模塊負(fù)責(zé)與業(yè)務(wù)邏輯對(duì)接,提供數(shù)據(jù)接口,相應(yīng)業(yè)務(wù)按照指定格式,通過(guò)調(diào)用數(shù)據(jù)接口將要傳輸?shù)臄?shù)據(jù)發(fā)送給控制模塊,控制模塊根據(jù)配置文件,針對(duì)不同的業(yè)務(wù)邏輯進(jìn)行數(shù)據(jù)的封裝,由傳輸模塊作為生產(chǎn)者將消息發(fā)送到指定的主題中。
而作為消息的消費(fèi)者,傳輸模塊持續(xù)監(jiān)聽(tīng)指定主題,當(dāng)主題中有最新的消息時(shí),自動(dòng)觸發(fā)控制模塊中的處理線程,將數(shù)據(jù)解封,調(diào)用對(duì)應(yīng)控制模塊中的處理線程,將數(shù)據(jù)提供給業(yè)務(wù)邏輯。
通過(guò)異步編程接口,傳輸框架向業(yè)務(wù)邏輯提供服務(wù),統(tǒng)一編程接口,屏蔽數(shù)據(jù)傳輸細(xì)節(jié)。向下利用網(wǎng)絡(luò)配置文件、ActiveMQ、基于事件的異步API等,連接各個(gè)分布式系統(tǒng),為各個(gè)系統(tǒng)提供數(shù)據(jù)通信傳輸服務(wù),降低了各個(gè)系統(tǒng)的耦合性[5]。該框架具有較高的并發(fā)處理效率,能很好地滿足及時(shí)性處理要求。
在分布式系統(tǒng)中,接收端通過(guò)多線程、事件驅(qū)動(dòng)的架構(gòu)編碼方式(Event-Driven Programming,簡(jiǎn)稱EDP)及持久訂閱等,降低數(shù)據(jù)傳輸?shù)氖录詈闲?,降低代碼間的耦合性,保證數(shù)據(jù)及時(shí)、可靠傳輸?shù)接脩簟?/p>
2.2? 系統(tǒng)實(shí)現(xiàn)
業(yè)務(wù)層的各類程序,可通過(guò)調(diào)用框架提供的接口,將要傳輸?shù)臄?shù)據(jù)告知傳輸框架,框架將這些數(shù)據(jù)并封裝成JMS消息,發(fā)送到ActiveMQ的主題中,分布式系統(tǒng)如果已注冊(cè)監(jiān)聽(tīng)該主題,則主題收到消息后將觸發(fā)接收端接收程序,數(shù)據(jù)發(fā)送到各個(gè)分布式系統(tǒng)。在此框架中,數(shù)據(jù)發(fā)送端為消息的生產(chǎn)者,接收端為消息的消費(fèi)者。在系統(tǒng)中安裝了本框架,即可同時(shí)成為數(shù)據(jù)的發(fā)送者和接收者。如圖2所示。
2.2.1? 消息控制模塊設(shè)計(jì)
框架消息控制模塊負(fù)責(zé)框架初始化、消息收發(fā)、消息路由、處理優(yōu)先級(jí)控制、帶寬控制等功能。根據(jù)業(yè)務(wù)劃分不同的消息主題,代理線程根據(jù)配置文件中記錄的主題隊(duì)列、處理優(yōu)先級(jí)、帶寬等配置進(jìn)行處理,代理線程運(yùn)行在線程池中,將收到的消息路由到相應(yīng)的業(yè)務(wù)邏輯中,業(yè)務(wù)邏輯進(jìn)行接口調(diào)用后,主題線程根據(jù)配置文件,調(diào)用相應(yīng)的模塊,生成消息發(fā)送到對(duì)應(yīng)的隊(duì)列中。為了使配置便于擴(kuò)展及解析,文件的格式選用了XML[6]。
2.2.2? 異步API
異步API實(shí)現(xiàn)了事件驅(qū)動(dòng)的消息應(yīng)用架構(gòu),利用Pub/Sub(即發(fā)布/訂閱模型),使得業(yè)務(wù)邏輯與數(shù)據(jù)傳輸形成一種松耦合的關(guān)系,從而達(dá)到異步處理。且對(duì)于多個(gè)需求相同的分布式系統(tǒng),只需調(diào)用一次API即可達(dá)到數(shù)據(jù)的多個(gè)用戶發(fā)送。
事件驅(qū)動(dòng)消息應(yīng)用架構(gòu)驅(qū)動(dòng)著業(yè)務(wù)處理流程,框架首先進(jìn)行注冊(cè)監(jiān)聽(tīng)器來(lái)檢監(jiān)測(cè)消息事件。監(jiān)測(cè)到消息事件后,代理線程從消息隊(duì)列中獲取消息,將消息封裝成相應(yīng)主題的主題事件,交由對(duì)應(yīng)的子模塊處理,子模塊被主題事件觸發(fā),根據(jù)主題尋找配置文件信息,與相應(yīng)業(yè)務(wù)邏輯進(jìn)行交互。
通過(guò)異步API屏蔽了消息等待、消息重傳、消息細(xì)節(jié)等信息,其他模塊只需針對(duì)業(yè)務(wù)邏輯進(jìn)行編程即可,降低了使用難度,提高了數(shù)據(jù)處理的及時(shí)性,避免了串行處理阻塞等待的低效率。
2.2.3? 保證數(shù)據(jù)可靠傳輸
由于民航數(shù)據(jù)對(duì)于數(shù)據(jù)傳輸?shù)目煽啃砸蠛芨撸瑐鬏斂蚣茉诎l(fā)送端采用消息存儲(chǔ)持久化的方式,保證接收端的分布式系統(tǒng)在離線或者網(wǎng)絡(luò)不穩(wěn)定的情況下,數(shù)據(jù)也可以萬(wàn)無(wú)一失地傳輸?shù)礁飨到y(tǒng)。為了降低風(fēng)險(xiǎn),提高穩(wěn)定性,傳輸框架未選用ActiveMQ內(nèi)存對(duì)象方式保存消息,而是選用了KahaDB文件庫(kù)實(shí)現(xiàn)消息持久化。并綜合考慮服務(wù)器性能、數(shù)據(jù)量大小、頻次等因素,測(cè)試性能瓶頸,制定消息持久化的保存時(shí)間。
代碼如下:
publisher.setTimeToLive(1000*60*60*24); // 設(shè)置有效時(shí)間
publisher.setDeliveryMode(DeliveryMode.PER SISTENT); // 設(shè)置持久存儲(chǔ)消息[7]
在分布式系統(tǒng)中的消費(fèi)者采用持久訂閱方式,他們分別向ActiveMQ注冊(cè)一個(gè)識(shí)別自己身份的ID(如CSAIR),當(dāng)此ID離線或網(wǎng)絡(luò)不穩(wěn)定時(shí),傳輸框架會(huì)為這個(gè)ID保存所有發(fā)送到主題的消息,當(dāng)分布式系統(tǒng)再次正常連接到ActiveMQ時(shí),該ID離線時(shí)在ActiveMQ存儲(chǔ)的所有消息將會(huì)被自動(dòng)發(fā)送過(guò)去。傳輸框架和分布式系統(tǒng)中的消費(fèi)者建立了應(yīng)答機(jī)制,框架服務(wù)器在分布式系統(tǒng)中的消費(fèi)者返回消息ACK標(biāo)識(shí)后會(huì)更新消息的消費(fèi)狀態(tài)[8]。
2.2.4? 事件驅(qū)動(dòng)的消息應(yīng)用架構(gòu)
在傳統(tǒng)的數(shù)據(jù)傳輸模型中,會(huì)采用定時(shí)輪詢方式來(lái)檢測(cè)數(shù)據(jù)文件是否到達(dá),這樣的模式會(huì)導(dǎo)致數(shù)據(jù)處理滯后,使得框架程序需占用資源、阻塞等待數(shù)據(jù)I/O,是低效的同步處理方式。本傳輸框架利用了事件驅(qū)動(dòng)的架構(gòu)編碼方式(Event-Driven Programming,簡(jiǎn)稱EDP),在傳輸層采用注冊(cè)監(jiān)聽(tīng)器的方式,當(dāng)Topic中有新消息時(shí),將馬上調(diào)用相應(yīng)的業(yè)務(wù)處理接口,配合異步編程接口,減少因隨機(jī)因素造成的資源浪費(fèi),提高系統(tǒng)的性能和可伸縮性,同時(shí)保證數(shù)據(jù)到達(dá)后立即進(jìn)行處理,符合民航數(shù)據(jù)時(shí)效性高的需求。
2.2.5? 服務(wù)的持續(xù)性保證探究
由于ActiveMQ支持配置集群,可以采用此方式避免服務(wù)的中斷。集群可以有效應(yīng)對(duì)網(wǎng)絡(luò)中斷、服務(wù)器軟硬件故障和電力中斷等問(wèn)題,與普通的備份方法相比較,集群方式使服務(wù)器切換對(duì)用戶透明,從而提供持續(xù)的服務(wù),同時(shí)也保證了數(shù)據(jù)的可靠性和不重復(fù)性。該方式在框架中的穩(wěn)定性與可靠性正處于探究階段。
2.2.6? 框架測(cè)試
完成框架的開(kāi)發(fā)和基本測(cè)試后,在民航氣象業(yè)務(wù)系統(tǒng)中使用了該框架,并聯(lián)合中國(guó)南方航空公司一起從功能、性能等方面對(duì)該框架進(jìn)行了測(cè)試。測(cè)試結(jié)果表明該框架在數(shù)據(jù)可靠傳輸、網(wǎng)絡(luò)安全、縮短數(shù)據(jù)流轉(zhuǎn)時(shí)間、方便易用等方面有較好的效果。
3? 結(jié)? 論
該框架已應(yīng)用于民航氣象數(shù)據(jù)傳輸,這種基于消息隊(duì)列的傳輸框架能夠在提高民航氣象數(shù)據(jù)傳輸能力的同時(shí),保證民航氣象數(shù)據(jù)安全、可靠的傳輸,縮短了民航氣象數(shù)據(jù)與用戶的信息交換時(shí)間,提升了經(jīng)濟(jì)效益。民航各部門中存在很多可靠性和時(shí)效性要求較高的實(shí)時(shí)數(shù)據(jù)傳輸、交換業(yè)務(wù)應(yīng)用場(chǎng)景,該框架模型在這些方面將有一定的應(yīng)用前景。
在保證服務(wù)的持續(xù)性方面,可采用ActiveMQ的集群配置,進(jìn)一步優(yōu)化現(xiàn)有框架,同時(shí)在高并發(fā)數(shù)據(jù)訪問(wèn)時(shí),如何通過(guò)負(fù)載均衡保證ActiveMQ服務(wù)器的服務(wù)質(zhì)量也是框架研究和改進(jìn)方向之一[9]。
參考文獻(xiàn):
[1] 呂常勝,張宏偉.民航氣象報(bào)文快速恢復(fù)系統(tǒng)的實(shí)現(xiàn) [J].電腦知識(shí)與技術(shù),2019,15(2):252-253.
[2] Snyder B,Bosanac D,Davies R. Introduction to Apache Active MQ Green Paper from Active MQ in action [M].London:Manning,2017:20-23.
[3] Snyder B,Davies R,Bosnanac D . ActiveMQ in action [J]. Manning,2011:4-5.
[4] 張煒.基于J2EE分布式架構(gòu)的高性能電商交易接入平臺(tái)研究與設(shè)計(jì) [J].移動(dòng)通信,2014,38(10):90-96.
[5] 馮云姣,吳斌,曾輝,等.基于Active MQ的任務(wù)集成管理系統(tǒng)設(shè)計(jì)與實(shí)現(xiàn) [J].科技視界,2018(5):39-41.
[6] 戴俊,朱曉民.基于Active MQ的異步消息總線的設(shè)計(jì)與實(shí)現(xiàn) [J].計(jì)算機(jī)系統(tǒng)應(yīng)用,2010,19(8):254-257+215.
[7] 李剛.輕量級(jí)Java EE企業(yè)應(yīng)用實(shí)戰(zhàn)——Struts 2+Spring +Hibernate整合開(kāi)發(fā) [M].北京:電子工業(yè)出版社,2008:185-208.
[8] 周聰.基于改進(jìn)的Active MQ的通信模型的設(shè)計(jì)和實(shí)現(xiàn) [D].長(zhǎng)春:吉林大學(xué),2017.
[9] 龐佳麗.分布式系統(tǒng)中基于中間件的異步通信可靠性研究 [D].浙江:浙江工業(yè)大學(xué),2017.
作者簡(jiǎn)介:陳瑤(1987.04-),女,漢族,湖南湘潭人,工程師,碩士,研究方向:數(shù)據(jù)傳輸框架。