• 
    

    
    

      99热精品在线国产_美女午夜性视频免费_国产精品国产高清国产av_av欧美777_自拍偷自拍亚洲精品老妇_亚洲熟女精品中文字幕_www日本黄色视频网_国产精品野战在线观看 ?

      Hive和Kafka在數(shù)據(jù)稽核和同步中的應(yīng)用

      2021-07-01 17:23:10曹建華徐晨敏郭昱含
      中國(guó)新通信 2021年6期

      曹建華 徐晨敏 郭昱含

      【摘要】? ? 中國(guó)電信自主測(cè)評(píng)管理平臺(tái)使用了Hadoop數(shù)據(jù)倉(cāng)庫(kù)工具Hive對(duì)基礎(chǔ)數(shù)據(jù)進(jìn)行合規(guī)性稽核,稽核后的數(shù)據(jù)通過(guò)Sqoop工具同步至Oracle關(guān)系數(shù)據(jù)庫(kù)。針對(duì)多批次百萬(wàn)級(jí)數(shù)據(jù)量并行同步會(huì)導(dǎo)致Oracle負(fù)載過(guò)大影響正常OLTP的情況,通過(guò)應(yīng)用Kafka消息隊(duì)列,將Hive與Oracle之間的數(shù)據(jù)并行同步改為異步模式下可按需設(shè)置串行/并行同步,問(wèn)題得到有效解決。

      【關(guān)鍵詞】? ? Hadoop? ? Hive? ? Sqoop? ? Kafka

      Application of hive and Kafka in data audit and synchronization

      Cao jianhua, Xu chenmin, Guo yuhan(Customer service operation support center of China Telecom Group,Shanghai 200041)

      Abstract:China Telecom independent evaluation management platform uses Hive which is a Hadoop data warehouse tool to audit the basic data, and the audited data is synchronized to Oracle relational database through sqoop tool. When multiple batches of millions of data are synchronized in parallel, Oracle load will be too large, which will affect the normal OLTP. By applying Kafka message queue, the data parallel synchronization between hive and Oracle can be changed to asynchronous mode, and either serial or parallel synchronization can be set on demand. The problem has been effectively solved.

      Key words:Hadoop、Hive、Sqoop、Kafka

      引言

      中國(guó)電信自主測(cè)評(píng)管理平臺(tái)用于支撐建立“客戶(hù)說(shuō)了算”的服務(wù)評(píng)價(jià)體系,負(fù)責(zé)對(duì)測(cè)評(píng)數(shù)據(jù)進(jìn)行全流程管理,其中包括基礎(chǔ)數(shù)據(jù)質(zhì)量稽核和用戶(hù)免打擾處理,以提升整體測(cè)評(píng)質(zhì)量。用戶(hù)免打擾處理是指對(duì)已標(biāo)識(shí)的特殊用戶(hù)不納入測(cè)評(píng),對(duì)曾經(jīng)測(cè)評(píng)過(guò)的用戶(hù)在一定期限內(nèi)避免做二次測(cè)評(píng)。用戶(hù)滿(mǎn)意度測(cè)評(píng)分若干個(gè)指標(biāo),其中綜合滿(mǎn)意度測(cè)評(píng)單批次基礎(chǔ)數(shù)據(jù)達(dá)百萬(wàn)級(jí),為對(duì)基礎(chǔ)數(shù)據(jù)進(jìn)行高效的稽核,本平臺(tái)采用了Hadoop分布式系統(tǒng)中的Hive數(shù)據(jù)倉(cāng)庫(kù)工具和oracle關(guān)系數(shù)據(jù)庫(kù)相結(jié)合的技術(shù)方案,前者用于數(shù)據(jù)稽核和免打擾處理,后者用于存儲(chǔ)稽核后的數(shù)據(jù)便于數(shù)據(jù)查詢(xún)應(yīng)用。

      本文重點(diǎn)介紹采用Hive進(jìn)行數(shù)據(jù)稽核及與Oracle之間進(jìn)行數(shù)據(jù)同步的技術(shù)實(shí)現(xiàn)和優(yōu)化方案。

      一、技術(shù)方案

      1.1技術(shù)方案選型

      目前中國(guó)電信自主測(cè)評(píng)覆蓋了公眾、政企、觸點(diǎn)、以及NPS等20多個(gè)指標(biāo),在對(duì)用戶(hù)進(jìn)行滿(mǎn)意度測(cè)評(píng)前,需要對(duì)基礎(chǔ)測(cè)評(píng)數(shù)據(jù)進(jìn)行稽核、精準(zhǔn)抽樣,以提升整體測(cè)評(píng)質(zhì)量。數(shù)據(jù)稽核和免打擾處理包括號(hào)碼長(zhǎng)度校驗(yàn)、是否數(shù)字化校驗(yàn)、省份和本地網(wǎng)歸屬校驗(yàn)、批次內(nèi)重復(fù)數(shù)據(jù)校驗(yàn)、與測(cè)評(píng)免打擾庫(kù)數(shù)據(jù)重復(fù)性校驗(yàn),其中公眾綜合滿(mǎn)意度測(cè)評(píng)基礎(chǔ)數(shù)據(jù)量單批次達(dá)上百萬(wàn),并且根據(jù)測(cè)評(píng)場(chǎng)景要求多個(gè)批次數(shù)據(jù)經(jīng)常要并行稽核,因此對(duì)數(shù)據(jù)稽核處理能力提出了較高的要求。傳統(tǒng)上數(shù)據(jù)處理通常使用關(guān)系型數(shù)據(jù)庫(kù)如Oracle,本平臺(tái)還搭建了Hadoop分布式系統(tǒng)(版本2.7.1),其中包含支持SQL的Hive工具。Hive是基于Hadoop的一個(gè)數(shù)據(jù)倉(cāng)庫(kù)工具,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫(kù)表,并提供類(lèi)SQL查詢(xún)語(yǔ)言(稱(chēng)為HiveQL)。下表是Hive與關(guān)系數(shù)據(jù)庫(kù)數(shù)據(jù)處理的對(duì)比:

      本次數(shù)據(jù)稽核不對(duì)原數(shù)據(jù)字段做修改,而是根據(jù)稽核情況追加數(shù)據(jù)標(biāo)簽??紤]到單批次處理的數(shù)據(jù)規(guī)模和并行處理要求,采用Hive作為數(shù)據(jù)稽核處理工具,稽核之后的數(shù)據(jù)通過(guò)sqoop工具同步至oracle關(guān)系數(shù)據(jù)庫(kù),方便數(shù)據(jù)的查詢(xún)和應(yīng)用。具體處理過(guò)程如圖1。

      1.2稽核和免打擾處理具體實(shí)現(xiàn)

      如圖2所示,數(shù)據(jù)稽核和同步采用HDFS Shell腳本嵌HiveSQL文件的方式,通過(guò)Linux的crontab定時(shí)任務(wù)工具觸發(fā)。各個(gè)測(cè)評(píng)指標(biāo)的稽核腳本相互獨(dú)立,只要掃描到有需要稽核的數(shù)據(jù)便開(kāi)始執(zhí)行。

      下面是以其中某個(gè)測(cè)評(píng)指標(biāo)為例的部分關(guān)鍵代碼:

      #step1:執(zhí)行加載數(shù)據(jù)到Hive的sql腳本文件

      hive -hiveconf dt=$op_time -hiveconf type1=$type1 -hiveconf file1=$file1 -f /app/data/shell/10000load.sql

      #其中10000load.sql代碼如下

      load data local inpath ‘${hiveconf:file1} overwrite into table ctd.table_temp partition(dt=${hiveconf:dt},type1=${hiveconf:type1});

      #step2:執(zhí)行稽核打標(biāo)和免打擾處理的sql腳本文件

      hive? ?-hiveconf? ?dt=$dt? -hiveconf type1=$type1 -hiveconf file_batch1=$file_batch1 -f /app/data/ shell/business_yhfw.sql

      由于該sql文件語(yǔ)句較為復(fù)雜,由于篇幅所限此處不再詳細(xì)展開(kāi)。

      #step3:執(zhí)行數(shù)據(jù)導(dǎo)出至Oracle數(shù)據(jù)庫(kù)的shell腳本

      sh /app/data/shell/Sqoop_Export.sh $dt $type1

      #其中Sqoop_Export.sh腳本中關(guān)鍵代碼如下

      sqoop export --table oracle_tablename --connect ***:thin:*** --username *** --password *** --export-dir '/apps/hive/warehouse/ctd.db/dt='$op_time'/type1='$type1''? ? ? ?\

      --columns UUID,COL1,COL2,…,COLN --input-fields-terminated-by '\001' --input-lines-terminated-by '\n' --input-null-string '\\N' --input-null-non-string '\\N'

      需要說(shuō)明的是,HiveSQL語(yǔ)法、表模型設(shè)計(jì)、執(zhí)行計(jì)劃和計(jì)算引擎是影響Hive執(zhí)行性能的主要因素,具體調(diào)優(yōu)方法可見(jiàn)本文參考文獻(xiàn)[2]。

      1.3數(shù)據(jù)同步優(yōu)化方案

      上述方案在具體應(yīng)用過(guò)程中,各類(lèi)測(cè)評(píng)指標(biāo)數(shù)據(jù)稽核任務(wù)獨(dú)立進(jìn)行,稽核完畢后即調(diào)用Sqoop工具將數(shù)據(jù)同步至oracle數(shù)據(jù)庫(kù)。當(dāng)超過(guò)百萬(wàn)數(shù)據(jù)量的多個(gè)任務(wù)并行寫(xiě)入Oracle時(shí),會(huì)導(dǎo)致其OLTP(On-Line Transaction Processing)受到嚴(yán)重影響。為了解決這個(gè)問(wèn)題,引入了Hadoop中的Kafka消息隊(duì)列,將數(shù)據(jù)并行同步優(yōu)化為異步模式下可按數(shù)據(jù)量規(guī)模設(shè)置串行/并行同步,具體流程圖如圖3。

      Kafka是一個(gè)分布式的、高吞吐量、高可擴(kuò)展性的消息系統(tǒng),它基于發(fā)布/訂閱模式,通過(guò)消息解耦,使生產(chǎn)者和消費(fèi)者異步交互,無(wú)需彼此等待。Kafka 基于頁(yè)緩存和磁盤(pán)順序?qū)懙姆绞綄?shí)現(xiàn)了寫(xiě)數(shù)據(jù)的超高性能,還具有數(shù)據(jù)壓縮、同時(shí)支持離線(xiàn)和實(shí)時(shí)數(shù)據(jù)處理等優(yōu)點(diǎn),適用于大批量日志壓縮收集、監(jiān)控?cái)?shù)據(jù)聚合等需要異步處理的場(chǎng)。應(yīng)用Kafka要避免消息不丟失不重復(fù)消費(fèi),需要設(shè)置生產(chǎn)者和消費(fèi)者的相關(guān)配置參數(shù),其生產(chǎn)者和消費(fèi)者默認(rèn)模式都采用at least once(至少一次),即消息不會(huì)丟失,但可能被處理多次。本方案使用的Kafka版本為2.11-0.9.0.1,可支持在生產(chǎn)者設(shè)置enable.idempotent參數(shù)為true,同時(shí)在消費(fèi)者設(shè)置enable.auto.commit參數(shù)為false,并自行控制offset(偏移量)的提交,來(lái)實(shí)現(xiàn)exactly once(精確一次)模式。

      本方案中通過(guò)shell腳本實(shí)現(xiàn)生產(chǎn)者向Kafka發(fā)布稽核完成的任務(wù)主題消息。為便于對(duì)Kafka中的partition進(jìn)行offset操作,應(yīng)用Java語(yǔ)言實(shí)現(xiàn)消費(fèi)者訂閱主題消息,以獲取到需要數(shù)據(jù)同步的具體任務(wù)消息,再通過(guò)shell腳本調(diào)用sqoop工具實(shí)現(xiàn)數(shù)據(jù)從Hadoop同步至oracle數(shù)據(jù)庫(kù)。

      消費(fèi)者監(jiān)聽(tīng)到主題消息時(shí),會(huì)先行判斷該消息對(duì)應(yīng)的數(shù)據(jù)稽核任務(wù)中數(shù)據(jù)量規(guī)模,當(dāng)超過(guò)設(shè)定閾值時(shí),采用單線(xiàn)程執(zhí)行數(shù)據(jù)串行同步,降低對(duì)oralce數(shù)據(jù)庫(kù)的壓力;當(dāng)?shù)陀谠O(shè)定閾值時(shí),采用多線(xiàn)程執(zhí)行數(shù)據(jù)并行同步,提升同步效率。

      關(guān)鍵代碼如下:

      #調(diào)整2.2中step3代碼:由執(zhí)行數(shù)據(jù)同步改為發(fā)送數(shù)據(jù)同步消息,異步處理

      #sh /app/data/shell/Sqoop_Export.sh $dt $type1

      sh /app/data/shell/ kafkaproject/start_kafka.sh $dt $topic $path $file_count $type1 $limit

      #其中start_kafka.sh消息發(fā)送的關(guān)鍵代碼

      cat /app/data/ctd/shell/dsfcpeq/kafkaproject/batchMessage.txt | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic} | > /dev/null

      #step4:應(yīng)用java實(shí)現(xiàn)消費(fèi)者的關(guān)鍵代碼

      @Component

      public class KafkaConsumer {

      ...

      // 多線(xiàn)程池

      ExecutorService fixedThreadPool = null;

      // 單線(xiàn)程池

      final ExecutorService singleThreadPool = ThreadPoolFactory.getNewSingleThreadPool();

      //監(jiān)聽(tīng)Kafka消息

      @KafkaListener(topics = “#{‘${spring.kafka.consumer.topics}.split(‘,)}”)

      public void onMessage(ConsumerRecord<?, ?> record) {

      // 獲取任務(wù)消息內(nèi)容

      ReqPara reqPara = JSON.parseObject(record.value().toString(), ReqPara.class);

      ...

      //根據(jù)數(shù)據(jù)稽核任務(wù)中數(shù)據(jù)規(guī)模等條件設(shè)置數(shù)據(jù)同步方式(單線(xiàn)程串行/多線(xiàn)程并行)

      if (!StringUtils.isEmpty(size) && !StringUtils.isEmpty(limit) && Integer.parseInt(size) > Integer.parseInt(limit)) {

      singleThreadPool.execute(runnable);

      } else {

      ...

      fixedThreadPool = ThreadPoolFactory.getNewFixedThreadPool(CpuCores * 2);

      fixedThreadPool.execute(runnable);

      }

      }

      //線(xiàn)程實(shí)現(xiàn),調(diào)用shell腳本觸發(fā)sqoop同步數(shù)據(jù)

      private Runnable newThread(List pathAndParams) {

      return new Runnable() {

      @Override

      public void run() {

      ProcessBuilder processBuilder = new ProcessBuilder(pathAndParams);

      processBuilder.redirectErrorStream(true);

      exec = processBuilder.start();

      }

      }

      }

      }

      本案例只是Kafka應(yīng)用的其中一角,在自主測(cè)評(píng)管理平臺(tái)中,還借助Kafka實(shí)現(xiàn)了對(duì)全網(wǎng)測(cè)評(píng)執(zhí)行能力的統(tǒng)籌管理,基于工作流和數(shù)據(jù)流統(tǒng)一調(diào)度CATI測(cè)評(píng)、智能語(yǔ)音測(cè)評(píng)、互聯(lián)網(wǎng)測(cè)評(píng)等能力平臺(tái)。Kafka是成長(zhǎng)最快的開(kāi)源項(xiàng)目之一,正在成為管理和處理流式數(shù)據(jù)的利器。它雖然類(lèi)似于A(yíng)ctiveMQ、RabbitMQ等消息隊(duì)列產(chǎn)品,但它以集群的方式運(yùn)行可以自由伸縮,可以滿(mǎn)足數(shù)據(jù)個(gè)性化存儲(chǔ)的要求,其流式處理能力可支持動(dòng)態(tài)地處理派生流和數(shù)據(jù)集。更多關(guān)于其安裝配置、消息生產(chǎn)與消費(fèi)、管理監(jiān)控的知識(shí)可詳見(jiàn)本文參考文獻(xiàn)[3]。

      1.4更換Hive引擎提升數(shù)據(jù)處理效率

      HiveSQL最后都會(huì)轉(zhuǎn)化成各個(gè)計(jì)算引擎所能執(zhí)行的任務(wù),目前Hive支持MapReduce(MR)、Tez和Spark 3種計(jì)算引擎。本平臺(tái)使用了Hive1.2.1版本,其默認(rèn)使用MR作為執(zhí)行引擎。由于MapReduce中間計(jì)算均需要寫(xiě)入磁盤(pán),而Spark是放在內(nèi)存中整體處理效率更高,所以可通過(guò)修改Hive的引擎即設(shè)置成Hive on Spark模式來(lái)提升數(shù)據(jù)稽核處理的效率。

      需要提醒的是Hive與Spark存在版本兼容的要求,安裝配置過(guò)程較為復(fù)雜,且上述使用的shell腳本也需要同步調(diào)整,具體本文不再贅述。

      二、總結(jié)

      本文介紹了基于Hive的大批量數(shù)據(jù)稽核處理的技術(shù)實(shí)現(xiàn)方案,并通過(guò)優(yōu)化HiveSQL語(yǔ)法和更換計(jì)算引擎進(jìn)一步提升了數(shù)據(jù)處理效率。針對(duì)多個(gè)大批量數(shù)據(jù)并行同步導(dǎo)致oracle的OLTP受到嚴(yán)重影響的問(wèn)題,并通過(guò)引入Kafka將數(shù)據(jù)并行同步優(yōu)化為異步模式下可按數(shù)據(jù)量規(guī)模設(shè)置串行/并行同步,兼顧了性能和效率。本應(yīng)用案例對(duì)于大數(shù)據(jù)量稽核和異步處理場(chǎng)景具有較高的可參考性。

      參? 考? 文? 獻(xiàn)

      [1]張良均,樊哲,位文超,劉名軍. Hadoop與大數(shù)據(jù)挖掘[M].北京:機(jī)械工業(yè)出版社,2016:25-27

      [2]林志煌. Hive性能調(diào)優(yōu)[M].北京:機(jī)械工業(yè)出版社·華章圖文,2020:2-10

      [3]Neha Narkhede等著.Kafka權(quán)威指南[M].薛命燈譯.北京:人民郵電出版社,2017:15-35

      安溪县| 襄垣县| 孟连| 左贡县| 莱芜市| 资源县| 甘谷县| 古蔺县| 镇江市| 亚东县| 泰顺县| 万年县| 霍邱县| 渝中区| 鄄城县| 铁岭市| 扶余县| 内江市| 礼泉县| 宿迁市| 马公市| 贺兰县| 霍城县| 大港区| 四子王旗| 政和县| 阿鲁科尔沁旗| 江阴市| 阜平县| 碌曲县| 惠东县| 高密市| 漳州市| 始兴县| 凤台县| 蓝田县| 石景山区| 宾川县| 佛学| 犍为县| 阿拉善右旗|