章琦皓 王 楓 王月婷
1(中科院等離子體物理研究所 安徽 合肥 230031)2(中國科學(xué)技術(shù)大學(xué) 安徽 合肥 230026)
MDSplus作為EAST聚變實驗數(shù)據(jù)存儲的主要工具之一,每年有大量的聚變科學(xué)家對MDSplus實驗數(shù)據(jù)進行訪問[1-2]。伴隨著實驗室的MDSplus存儲的數(shù)據(jù)量日益增長,訪問MDSplus的用戶也隨之增加。防止用戶惡意訪問MDSplus中某單一節(jié)點數(shù)據(jù)從而導(dǎo)致服務(wù)器負載過大、監(jiān)控MDSplus服務(wù)器的流量的出入變得尤為重要。目前正在使用的MDSplus服務(wù),其日志系統(tǒng)只是單一的記錄了用戶的TCP/IP連接記錄,并沒有記錄用戶任何其他相關(guān)的操作記錄,這給MDSplus的監(jiān)控帶來一定的盲區(qū)。如果能對MDSplus上所有用戶的操作進行監(jiān)控,并且及時、沒有偏差地記錄下來,就可以通過統(tǒng)計知道用戶對聚變實驗?zāi)承?shù)據(jù)的偏好,體現(xiàn)該信號量所具有的研究價值。對MDSplus日志進行數(shù)據(jù)分析,提取有效的日志信息,采用現(xiàn)有的大數(shù)據(jù)技術(shù)上的機器學(xué)習(xí)等方法,搭建出一套可用的MDSplus日志應(yīng)用平臺。
據(jù)統(tǒng)計目前每天EAST上的MDSplus日志大概有3萬條日志記錄,這些還只是單一的TCP/IP記錄,如果通過完善目前的MDSplus日志系統(tǒng),可以記錄所有用戶的操作,那么每天會有百萬條日志記錄。當(dāng)然這一日志記錄很有可能在實驗期間某一時間段呈現(xiàn)爆發(fā)式增長,在秒級別內(nèi)產(chǎn)生百萬條日志記錄,在為了應(yīng)對未來海量的數(shù)據(jù)日志消息的產(chǎn)生,本文借助大數(shù)據(jù)技術(shù)進行海量日志的分析。
另外,MDSplus聚變實驗數(shù)據(jù)存儲量很快要達到PB級,未來聚變實驗數(shù)據(jù)很有可能使用類似于Hadoop這樣的大數(shù)據(jù)框架進行存儲,使用大數(shù)據(jù)技術(shù)進行日志分析迎合了未來數(shù)據(jù)存儲的發(fā)展趨勢。
目前國內(nèi)外所有使用MDSplus的實驗室或者研究機構(gòu)沒有針對MDSplus日志這一項功能進行相關(guān)的技術(shù)上的完善,更沒有相關(guān)的日志上數(shù)據(jù)的分析,所以在原有日志的基礎(chǔ)上構(gòu)建一個基于大數(shù)據(jù)技術(shù)的MDSplus日志分析系統(tǒng)具有技術(shù)上的挑戰(zhàn)和實際實驗中的意義和價值。
作為EAST實驗數(shù)據(jù)重要的存儲工具,MDSplus日志系統(tǒng)需要改變以往的簡單的記錄方式。EAST上MDSplus access日志原有的格式如下:
1) {date} (pid number) Connection received{or disconnected} from {username}@{ipAddress}
2) Invalid message
格式只是單一的記錄下了簡單的遠程用戶的連接記錄,包括用戶名和IP地址,其次還有一些無效的日志信息參雜在日志中。實際情況中,MDSplus日志系統(tǒng)需要記錄下更多有效的日志信息。如表1所示,希望能夠記錄更多關(guān)于用戶在MDSplus上的數(shù)據(jù)操作類型,如GetData、GetSegment等操作。
表1 MDSplus日志設(shè)計需求
完善現(xiàn)有的MDSplus日志系統(tǒng),增加更多的日志記錄信息之后才能建立一個集離線與實時于一體的日志分析系統(tǒng)。整個架構(gòu)系統(tǒng)能夠達到線上實時預(yù)警、流量監(jiān)控。線下提取有效信息,采取應(yīng)對手段的功能。整個工作分成四步:
1) 完善的MDSplus日志功能。
2) 針對MDSplus日志進行離線。
3) 針對MDSplus日志進行實時分析。
4) 日志數(shù)據(jù)可視化。
日志分析系統(tǒng)整個軟件架構(gòu)如圖1所示。整個系統(tǒng)的設(shè)計是在擁有完整的日志信息前提下借助于現(xiàn)有的大數(shù)據(jù)技術(shù)對日志信息進行處理。
圖1 系統(tǒng)總體架構(gòu)圖
系統(tǒng)用到的大數(shù)據(jù)技術(shù)和概念包含以下幾個方面:
1) Flume:分布式、可靠、高可用的海量日志聚合的系統(tǒng),支持在系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù)[3]。
2) Kafka:高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),形成流式數(shù)據(jù),供Spark Streaming 進行流式計算[4]。
3) Hadoop:大數(shù)據(jù)分布式開發(fā)框架,使用HDFS進行數(shù)據(jù)存儲。
4) Spark Streaming:基于Spark生態(tài)圈的準實時流數(shù)據(jù)計算框架[5]。
5) Zeppelin:大數(shù)據(jù)可視化工具,除了能夠接入傳統(tǒng)的MYSQL數(shù)據(jù)源以外,還能很好地接入Hadoop和Spark的數(shù)據(jù)源。
6) Web端:傳統(tǒng)的數(shù)據(jù)展示手段,其中使用到了Echarts等開源插件。
由圖1可看出,遠程用戶針對MDSplus服務(wù)器進行數(shù)據(jù)訪問,產(chǎn)生大量的數(shù)據(jù)訪問日志。該日志信息實時被Flume服務(wù)監(jiān)聽,監(jiān)聽到日志的變化,一方面將其發(fā)送到Hadoop集群中的HDFS(Hadoop分布式文件系統(tǒng))中進行持久化存儲,方便日后的離線處理,另一方面將日志信息發(fā)送到Kafka服務(wù)中,轉(zhuǎn)換成實時數(shù)據(jù)流供Spark Streaming進行實時的流數(shù)據(jù)處理。因為原有的MDSplus日志信息在一段時間后會自動被新的日志信息給覆蓋,所以在數(shù)據(jù)采集方面為了保證原有的MDSplus日志信息持久化,在數(shù)據(jù)收集時先在Flume端進行自定義正則過濾器,將不必要的日志信息過濾掉。一方面在離線數(shù)據(jù)存儲時候?qū)⑷罩拘畔凑漳暝氯諘r間被歸分到HDFS存儲下不同的目錄中,另一方面在實時數(shù)據(jù)傳輸中將日志信息按照不同的日志類型存儲在Kafka不同的Topic中。采用這樣方式進行數(shù)據(jù)采集,將傳統(tǒng)的服務(wù)器日志信息和新生的大數(shù)據(jù)采集框架有機地結(jié)合起來,從離線和實時兩個方面使得日志信息的收集存儲有很好的條理性和邏輯性。數(shù)據(jù)還可以持久化到MYSQL數(shù)據(jù)庫中,中間利用到Zeppelin數(shù)據(jù)可視化工具和Web數(shù)據(jù)展示工具。至此,整個流程將日志的產(chǎn)生、處理、展現(xiàn)綜合起來,形成了一個完整的EAST實驗數(shù)據(jù)訪問日志分析系統(tǒng)。
根據(jù)現(xiàn)有的MDSplus源碼接口,采用鉤子函數(shù)監(jiān)聽的方式對整個MDSplus服務(wù)器進行監(jiān)聽[6]。設(shè)計對應(yīng)的鉤子函數(shù)可以對所需要的信息進行鉤取。將需要監(jiān)聽MDSplus的用戶操作使用枚舉的方式存儲,然后對應(yīng)到相應(yīng)的Notify通知中,針對不同的操作調(diào)用CallHookback函數(shù),通知到MDSplus的日志文件中去。整個MDSplus日志架構(gòu)和流程如圖2所示。
圖2 MDSplus日志完善架構(gòu)圖
1) 遠程客戶發(fā)出請求,連接到MDSplus服務(wù)器;
2) 客戶在服務(wù)器上進行一系列的TreeOpen、GetData等操作;
3) 對應(yīng)的操作觸發(fā)TreeCallHook函數(shù);
4) TreeCallHook函數(shù)觸發(fā)libTreeShrHook.so動態(tài)鏈接庫;
5) 動態(tài)鏈接庫將相應(yīng)的日志內(nèi)容以刷新緩沖的方式打印到日志文件中;
6) 客戶端斷開與服務(wù)器的連接。
完善的日志系統(tǒng)中調(diào)用的動態(tài)鏈接庫LibTreeShrHook.so使用到的鉤子函數(shù)算法如下(偽代碼):
int Notify(TreeshrHookType Htype, char *tree, int shot, int nid)
{
SomeVarDefine();
//定義一些記錄信息變量
switch (Htype) {
//匹配對應(yīng)的數(shù)據(jù)操作類型
case OpenTree:
name=″OpenTree″;
Operation_1();
break;
…………………… 省略……………………
case GetSegment:
name=″GetSegment″;
Opreation_n();
break;
}
printf(Meassage);
//打印日志信息
fflush(stdout);
//刷新日志信息到標準輸出中
if (path != na && path != (char *)0)
free(path);
//釋放節(jié)點路徑
return 1;
}
值得注意的是,目前鉤子函數(shù)的觸發(fā)條件是遠程的客戶端連接方式,暫不支持本地操作日志記錄功能。該算法基本能夠?qū)崿F(xiàn)目前所需要的MDSplus Log的功能。圖3是目前完善后的日志文件能夠記錄到的日志內(nèi)容,新增加了{date} (pid number) HookType called for {node absolutely path}日志格式,使得日志信息更加完整、可靠。
圖3 MDSplus日志內(nèi)容
MDSplus日志內(nèi)容作為Flume的代理對象Agent的數(shù)據(jù)來源,將日志信息緩沖到Channel中。采用了Flume的選擇分流模式,將事件流向兩個目的地。在離線模式下,Channel1介質(zhì)設(shè)置為磁盤介質(zhì),一旦達到緩沖大小,就將日志內(nèi)容發(fā)送到下游Sink1指定的HDFS中進行存儲。分流模式如圖4所示。
圖4 Flume分流模式
HDFS中的存儲內(nèi)容是Flume過濾后以天數(shù)為單位存儲的日志信息,使用Hadoop的MapReduce計算框架將日志信息分解成兩種不同的數(shù)據(jù)模型[7]:
(1) 客戶模型。記錄著當(dāng)前用戶的信息,包括用戶連接或者斷開連接的時間、當(dāng)前連接的進程號、用戶名、IP地址、當(dāng)前用戶狀態(tài)等信息:
client(linkTime:String,pid:Integer,user:String,host:String, status:String)
(2) 操作模型。記錄著用戶連接MDSplus服務(wù)器后一系列的操作信息,包括操作時間、進程號、數(shù)據(jù)操作類型、操作的樹名、炮號名等信息:
operation(linkTime:String,pid:Integer,hooktype:String, tree:String, shot:Integer, nodepath:String)
考慮到日志信息中含有多種不同類型的日志信息種類,所以在MapReduce的map過程需要接收兩種不同的輸入數(shù)據(jù)類型進行序列化,分別是client數(shù)據(jù)類型和operation數(shù)據(jù)類型。然后繼承Hadoop接口中的GenericWritable類,將兩種數(shù)據(jù)類型結(jié)合起來,這樣就解決了map過程中可能出現(xiàn)不同的數(shù)據(jù)類型的情況,具體如下:
public class logWritable extends GenericWritable {
private static Class extends Writable>[] CLASSES=null;
static {
CLASSES=(Class extends Writable>[]) new Class[] {
org.apache.hadoop.io.Text.class,
ClientWritable.class,
//自定義client類型
OperationWritable.class
//自定義operation類型
};
}
……………………………省略……………………………
}
經(jīng)過數(shù)據(jù)的ETL過程,可以看到MDSplus日志信息被提取出來放在以下兩個數(shù)據(jù)庫表中。每個表中部分信息如圖5所示。
圖5 離線處理結(jié)果
考慮到要從MDSplus日志中獲取到實時的信息,從而即時地采取手段進行應(yīng)對惡意的服務(wù)器攻擊等行為。關(guān)于實時計算框架在Spark Streaming和Storm之間的選擇,可以清楚地看見,Storm對于消息的處理是純實時的,是一條一條消息進行處理,但是相比較于Spark Streaming吞吐量比較低[8]?;谝韵聨c的考慮,實時處理框架最終選擇了Spark Streaming計算模型:
1) MDSplus日志分析不需要達到純實時的精確度。
2) Spark生態(tài)對實時計算、離線批處理、交互式查詢等業(yè)務(wù)功能可拓展性強。
3) Spark生態(tài)圈很容易和現(xiàn)有的Hadoop生態(tài)圈結(jié)合。
結(jié)合圖4,很容易看到MDSplus的日志流的下一個目的地是Kafka,其中采用的緩沖通道Channel2是內(nèi)存緩沖。為了避免Flume直接將日志文件直接發(fā)送給Spark Streaming處理導(dǎo)致的計算框架崩潰的情況,其將消息流先發(fā)送給Kafka這個消息中間件,日志數(shù)據(jù)以發(fā)布-訂閱的模式實時記錄到對應(yīng)的topic里,Spark Streaming從相對應(yīng)的Topic中讀取數(shù)據(jù)流進行流數(shù)據(jù)計算。
在整個準實時數(shù)據(jù)處理流程中,采用Spark原生的編程語言Scala進行編程,降低了代碼的冗余。處理的過程中,根據(jù)流數(shù)據(jù)的內(nèi)容進行過濾,提取日志內(nèi)容中有效字段。將原有的RDD(resilient distributed dataset)轉(zhuǎn)換成以RDD為基礎(chǔ)的分布式數(shù)據(jù)集的DataFrame形式。其中DataFrame應(yīng)用于使用SQL處理數(shù)據(jù)的場景,在系統(tǒng)中采用了Spark的SQLContext類,將處理后的字段寫入到MySQL數(shù)據(jù)庫中。部分處理過程如下所示:
//開始處理整個日志內(nèi)容
logs.foreachRDD(logs=>{
//創(chuàng)建一個sqlcontext單例模式
val sQLContext=
SQLContextSingleton.getInstance(logs.sparkContext)
import sQLContext.implicits._
//client日志內(nèi)容處理
var flag =″OFF″
val logClient=logs.filter({s=>
s.contains(″Connection″)
}).map({k=>
k.split(″ ″)
}).map({t=>
if(t(9)==″received″)
flag=″ON″
else
flag=″OFF″
new client(
linkTime=t(0)+″ ″+t(1)+″ ″+t(2)+″ ″+t(3)+″ ″+t(4),
//后面轉(zhuǎn)換成timeStamp
pid=t(7).replace(″)″,″″).toInt,
user=t(11).split(″@″)(0),
host=t(11).split(″@″)(1),
status=flag
)
}).toDF()
logClient.registerTempTable(″client″)
經(jīng)過SparkStreaming處理后提取出來的字段放置在不同的DataFrame中,最終的結(jié)果存到MYSQL數(shù)據(jù)庫中,供數(shù)據(jù)展示前端進行可視化。
無論是離線的數(shù)據(jù)處理,還是涉及到的實時數(shù)據(jù)處理,都需要將數(shù)據(jù)進行可視化,方便大家快速直觀地了解目前MDSplus服務(wù)相關(guān)的信息。在前端的展示上采取了Zeppelin數(shù)據(jù)可視化工具和傳統(tǒng)的Web展示工具兩種方式相結(jié)合的手段。Zeppelin作為大數(shù)據(jù)可視化工具,不僅能夠很好地支持Spark和Hadoop,還能和傳統(tǒng)的MYSql相互連接。Web展現(xiàn)的方式采用了Echarts插件,將MDSplus服務(wù)器狀態(tài)能夠直觀展現(xiàn)出來[9]。圖6是Web端日志數(shù)據(jù)可視化的內(nèi)容之一,顯示當(dāng)前各國在線人數(shù)以及當(dāng)前最長在線的用戶。
圖6 數(shù)據(jù)可視化Web展示
系統(tǒng)測試過程中,采取多線程并發(fā)式模擬多用戶訪問MDSplus數(shù)據(jù)庫,并對MDSplus數(shù)據(jù)庫進行各種數(shù)據(jù)讀取等操作。模擬并發(fā)用戶量1 000多名,每個用戶的操作平均產(chǎn)生20條日志,共計產(chǎn)生約3萬條數(shù)據(jù)。測試采取兩種不同的方式進行日志處理,分別是MapReduce方式的離線數(shù)據(jù)處理、Spark Streaming的準實時數(shù)據(jù)處理方式。表2是兩種不同的處理方式的時間上的對比。
表2 MDSplus日志處理方式對比
由于離線數(shù)據(jù)在處理的過程中,需要啟動系統(tǒng)的資源,所以耗費比較長的時間,但是在數(shù)據(jù)量達到海量時,該處理方式具有一定的優(yōu)勢。而準實時處理是按照時間切片進行數(shù)據(jù)拉取和處理,所以在實時性方面占有優(yōu)勢。該工作系第一次對MDSplus日志進行功能完善和日志信息處理的工作,目前還沒有其他相關(guān)的工作對MDSplus進行日志完善和處理,屬于原創(chuàng)性工作,所以暫時沒有和其他的工作在時間和結(jié)果上進行對比。
本文介紹了利用大數(shù)據(jù)技術(shù)對EAST數(shù)據(jù)訪問日志分析系統(tǒng)的設(shè)計和實踐。該系統(tǒng)極大地方便了聚變科研人員對EAST實驗數(shù)據(jù)的管理。首次對MDSplus的日志系統(tǒng)進行改進,完善了MDSplus日志信息。針對用戶行為產(chǎn)生的海量日志數(shù)據(jù),使用大數(shù)據(jù)技術(shù)中比較成熟的HadoopMR、SparkStreaming等技術(shù)很好地完成了日志的離線和在線的解析。這項工作不僅為 聚變領(lǐng)域中數(shù)據(jù)訪問工作提供了借鑒,還對其他的海量日志的處理工作具有一定的參考價值。