◎王正迅
基于傳統(tǒng)關(guān)系型數(shù)據(jù)庫(kù)的穩(wěn)定性,目前還有很多企業(yè)將數(shù)據(jù)存儲(chǔ)在關(guān)系型數(shù)據(jù)庫(kù)中,但是關(guān)系型數(shù)據(jù)庫(kù)的數(shù)據(jù)模型較簡(jiǎn)單,不適合表達(dá)復(fù)雜的數(shù)據(jù)關(guān)系,在處理大量數(shù)據(jù)、半結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù),以及系統(tǒng)容錯(cuò)和系統(tǒng)擴(kuò)展性方面受到了一定的限制,Hadoop 下的系列工具則有較大優(yōu)勢(shì),早期由于工具的缺乏,Hadoop 集群與傳統(tǒng)數(shù)據(jù)庫(kù)之間的數(shù)據(jù)傳輸非常困難?;谶@些方面的考慮,需要一個(gè)能在傳統(tǒng)關(guān)系型數(shù)據(jù)庫(kù)和Hadoop 之間進(jìn)行數(shù)據(jù)遷移的工具,Sqoop 應(yīng)運(yùn)而生,Apache 提供的Sqoop 工具,能實(shí)現(xiàn)自動(dòng)化數(shù)據(jù)遷移,依托于數(shù)據(jù)庫(kù)相關(guān)的schema 描述信息,遷移的過(guò)程則使用MapReduce(后面都簡(jiǎn)寫(xiě)為MR)來(lái)進(jìn)行。Sqoop 作為一個(gè)跨平臺(tái)抽取和輸出數(shù)據(jù)的工具,在關(guān)系型數(shù)據(jù)庫(kù)(MySQL、O-ralce 等)和大數(shù)據(jù)平臺(tái)(HDFS、Hive、HBase)之間常用。作為ETL 過(guò)程中重要的一環(huán),加載作業(yè)的性能也是需要關(guān)注和優(yōu)化的。本文將主要闡明如何在異構(gòu)環(huán)境中使用Sqoop 方法進(jìn)行數(shù)據(jù)遷移。
Sqoop 是一款用于在Hadoop 和關(guān)系型數(shù)據(jù)庫(kù)之間高效遷移大批量數(shù)據(jù)的開(kāi)源工具,類似于其他ETL 工具,Sqoop 使用元數(shù)據(jù)模型來(lái)判斷數(shù)據(jù)類型,并在數(shù)據(jù)從數(shù)據(jù)源轉(zhuǎn)移到Hadoop 時(shí)確保傳輸安全的數(shù)據(jù)處理,專為大數(shù)據(jù)批量傳輸設(shè)計(jì),能夠分割數(shù)據(jù)集并創(chuàng)建Maptask 任務(wù)來(lái)處理每個(gè)區(qū)塊。以RDBMS 和HDFS 之間數(shù)據(jù)傳輸為例,Sqoop 借助于MR 導(dǎo)入和導(dǎo)出數(shù)據(jù),用戶可以輕松地以命令行模式從RDBMS 如MySQL 或 Oracle 中導(dǎo)入數(shù)據(jù)到 HDFS 中,通過(guò) Hadoop 的MR 模型計(jì)算完之后,將結(jié)果導(dǎo)回RDBMS,Sqoop 能夠自動(dòng)完成整個(gè)過(guò)程中的大部分,并提供容錯(cuò)和并行化操作。
Sqoop 本質(zhì)就是遷移數(shù)據(jù),用戶在使用Sqoop 在異構(gòu)環(huán)境間遷移數(shù)據(jù)時(shí),Sqoop Client 提供了CLI 和瀏覽器兩種方式提交請(qǐng)求,然后Sqoop Server 收到請(qǐng)求后,授權(quán)MR 執(zhí)行。這個(gè)過(guò)程它高度依賴Hadoop 并行導(dǎo)入數(shù)據(jù),充分利用了MR 的并行特點(diǎn),以批處理的方式加快數(shù)據(jù)的傳輸,同時(shí)也借助MR 實(shí)現(xiàn)了容錯(cuò)。
Sqoop 把關(guān)系型數(shù)據(jù)庫(kù)(以mysql 為例)的數(shù)據(jù)導(dǎo)人到HDFS 中,主要分為兩步:一是得到元數(shù)據(jù)(mysql 數(shù)據(jù)庫(kù)中的數(shù)據(jù)),二是提交Map。在這個(gè)過(guò)程中,sqoop 會(huì)通過(guò)jdbc來(lái)獲取需要的數(shù)據(jù)庫(kù)的元數(shù)據(jù)信息,例如:導(dǎo)入的表的列名,數(shù)據(jù)類型。這些數(shù)據(jù)庫(kù)的數(shù)據(jù)類型會(huì)被映射成為java 的數(shù)據(jù)類型,根據(jù)這些信息,sqoop 會(huì)生成一個(gè)與表名相同的類,用來(lái)完成序列化工作,最后使用Java 類進(jìn)行反序列化,MR并行寫(xiě)數(shù)據(jù)到Hadoop 中,從而保存表中的每一行記錄。在導(dǎo)入數(shù)據(jù)時(shí),如果不想取出全部數(shù)據(jù),可以通過(guò)類似于where 的語(yǔ)句進(jìn)行限制。
圖1 Sqoop 數(shù)據(jù)導(dǎo)入機(jī)制
Sqoop 的導(dǎo)出通常是將 HDFS、HBase、Hive 中的數(shù)據(jù)導(dǎo)出到關(guān)系型數(shù)據(jù)庫(kù)中,關(guān)系型數(shù)據(jù)庫(kù)中的表必須提前創(chuàng)建好。底層方面,同樣是通過(guò)jdbc 讀取HDFS/HBase/Hive 數(shù)據(jù),生成Java 類(這個(gè)類主要作用是解析文本中的數(shù)據(jù)),用于序列化,最后export 程序啟動(dòng),通過(guò)Java 類反序列化,同時(shí)啟動(dòng)多個(gè)Map 將相應(yīng)值插入表中。
圖2 Sqoop 數(shù)據(jù)導(dǎo)出機(jī)制
數(shù)據(jù)導(dǎo)入分全量導(dǎo)入和增量導(dǎo)入。
(1)全量導(dǎo)入數(shù)據(jù)。全量數(shù)據(jù)導(dǎo)入就是一次性將所有需要導(dǎo)入的數(shù)據(jù),從關(guān)系型數(shù)據(jù)庫(kù)一次性地導(dǎo)入到HDFS 中(也可以是HBase、Hive 等)。全量導(dǎo)入形式使用場(chǎng)景為一次性離線分析場(chǎng)景。用sqoop import 命令,具體如下:
(2)導(dǎo)入數(shù)據(jù)庫(kù)中的部分?jǐn)?shù)據(jù)。導(dǎo)入部分?jǐn)?shù)據(jù)可以在行與列的選取上添加參數(shù)完成,列選取上添加一個(gè)--columns參數(shù),指定數(shù)據(jù)庫(kù)中需要導(dǎo)入的列,如添加--columns id,name,age,sex;行選取上添加 --where 參數(shù),增加 where 條件篩選滿足條件的行,如--where "age >= 20" ;還可以使--query 參數(shù)查詢篩選需要導(dǎo)入的數(shù)據(jù),同時(shí)實(shí)現(xiàn)行、列的選取,如 --query"select id,name,age,sex from t_user_info where age>=20 and$CONDITIONS"。
(3)增量導(dǎo)入數(shù)據(jù)。在實(shí)際生產(chǎn)環(huán)境中,系統(tǒng)可能會(huì)定期從與業(yè)務(wù)相關(guān)的關(guān)系型數(shù)據(jù)庫(kù)向Hadoop 導(dǎo)入數(shù)據(jù),導(dǎo)入數(shù)據(jù)倉(cāng)庫(kù)后進(jìn)行后續(xù)離線分析。數(shù)據(jù)量比較大,有的前期數(shù)據(jù)已經(jīng)被用于項(xiàng)目分析了,我們此時(shí)不可能再將所有數(shù)據(jù)重新導(dǎo)一遍,此時(shí)我們就需要增量數(shù)據(jù)導(dǎo)入這一模式了。增量數(shù)據(jù)導(dǎo)入分兩種,一是基于遞增列的增量數(shù)據(jù)導(dǎo)入(Append方式)。二是基于時(shí)間列的增量數(shù)據(jù)導(dǎo)入(LastModified 方式)。在--incremental 參數(shù)后通過(guò)指定Append 方式或LastModified 方式。
export 是HDFS 里的文件導(dǎo)出到關(guān)系型數(shù)據(jù)庫(kù)的工具,不能直接從hive、hbase 導(dǎo)出數(shù)據(jù)。如果要把hive 表數(shù)據(jù)導(dǎo)出到關(guān)系型數(shù)據(jù)庫(kù),需先把hive 表通過(guò)查詢寫(xiě)入到一個(gè)暫存表,臨時(shí)用文本格式,然后再?gòu)脑摃捍姹砟夸浝飳?dǎo)出數(shù)據(jù)。
執(zhí)行數(shù)據(jù)導(dǎo)出前,數(shù)據(jù)庫(kù)中必須已經(jīng)存在要導(dǎo)入的目標(biāo)表,默認(rèn)操作是從將文件中的數(shù)據(jù)使用INSERT 語(yǔ)句插入到表中,也可選擇更新模式(Sqoop 將生成UPDATE 替換數(shù)據(jù)庫(kù)中現(xiàn)有記錄的語(yǔ)句)或調(diào)用模式(Sqoop 將為每條記錄創(chuàng)建一個(gè)存儲(chǔ)過(guò)程調(diào)用)。
默認(rèn)情況下,sqoop export 將每行輸入記錄轉(zhuǎn)換成一條INSERT 語(yǔ)句,添加到目標(biāo)數(shù)據(jù)庫(kù)表中。如果數(shù)據(jù)庫(kù)中的表具有約束條件(例如,其值必須唯一的主鍵列)并且已有數(shù)據(jù)存在,則必須注意避免插入違反這些約束條件的記錄。如果INSERT 語(yǔ)句失敗,導(dǎo)出過(guò)程將失敗。此模式主要用于將記錄導(dǎo)出到可以接收這些結(jié)果的空表中。通常用于全表數(shù)據(jù)導(dǎo)出。使用如下命令可完成:
更新模式導(dǎo)出,僅僅更新已存在的數(shù)據(jù)記錄,不會(huì)插入新記錄,該模式用于更新源表與目標(biāo)表中數(shù)據(jù)的不一致,即在不一致時(shí),將源表中的數(shù)據(jù)遷移至目標(biāo)表(如MySQL、Oracle 等的目標(biāo)表中),這種不一致是指一條記錄中存在的不一致,比如HDFS 表和MySQL 中都有一個(gè)id=1 的記錄,但是其中一個(gè)字段的取值不同,則該模式會(huì)將這種忽視差異。對(duì)于“你有我無(wú)”的記錄則不做處理,通過(guò)指定update-key 并在—update-mode 參數(shù)后指定是updateonly 模式。調(diào)用模式導(dǎo)出,會(huì)更新已存在的數(shù)據(jù)記錄,同時(shí)插入一個(gè)新記錄,實(shí)質(zhì)上是插入一個(gè)update+insert 的操作,同樣是通過(guò)指定update-key 并在—update-mode 參數(shù)后指定是allowinsert模式。
Sqoop 作為一種重要的數(shù)據(jù)遷移工具,在使用過(guò)程中需要遵守?cái)?shù)據(jù)庫(kù)約束、數(shù)據(jù)庫(kù)連接機(jī)制,考慮空值、并行度、分隔符等原因?qū)е碌膫鬏斄袛?shù)和表的列數(shù)不一致等問(wèn)題。
空值問(wèn)題常見(jiàn)于Hive 與MySQL 數(shù)據(jù)遷移過(guò)程中發(fā)生。Hive 中的 Null 在底層是以“N”來(lái)存儲(chǔ),而 MySQL 中的 Null在底層就是Null,這就導(dǎo)致了兩邊進(jìn)行數(shù)據(jù)遷移時(shí)存儲(chǔ)不一致問(wèn)題,Sqoop 要求在數(shù)據(jù)遷移的時(shí)候嚴(yán)格保證兩端的數(shù)據(jù)格式、數(shù)據(jù)類型一致,否則會(huì)帶來(lái)異常。
為了保證數(shù)據(jù)兩端的一致性,數(shù)據(jù)遷移的過(guò)程中遇到null-string,null-non-string 數(shù)據(jù)都轉(zhuǎn)化成指定的類型,通常指定成"N"。依賴自身參數(shù)在導(dǎo)入數(shù)據(jù)時(shí)采用--null-string“\N”和--null-non-string“\N”,在導(dǎo)出數(shù)據(jù)時(shí)采用--input-null-string“\N”和 --input-null-non-string“\N”兩個(gè)參數(shù),在使用這些參數(shù)過(guò)程中,需要正確地將值N 轉(zhuǎn)義到\N。
(1)任務(wù)失敗導(dǎo)致數(shù)據(jù)不一致。由于Sqoop 將導(dǎo)出過(guò)程分解為多個(gè)事務(wù),因此失敗的導(dǎo)出作業(yè)可能會(huì)導(dǎo)致將部分?jǐn)?shù)據(jù)提交到數(shù)據(jù)庫(kù)。在某些情況下,這可能會(huì)導(dǎo)致后續(xù)作業(yè)由于插入沖突而失敗,或者在其他情況下導(dǎo)致重復(fù)數(shù)據(jù)。如這樣一個(gè)場(chǎng)景:export 到 Mysql 時(shí),使用 6個(gè) Map 任務(wù),過(guò)程中有3個(gè)任務(wù)失敗,那此時(shí)MySQL 中存儲(chǔ)了另外三個(gè)Map任務(wù)導(dǎo)入的數(shù)據(jù),此時(shí)會(huì)生成一個(gè)不完整的報(bào)表數(shù)據(jù)。繼續(xù)調(diào)試問(wèn)題并最終將全部數(shù)據(jù)正確的導(dǎo)入MySQL,會(huì)再次生成一個(gè)報(bào)表數(shù)據(jù),而這個(gè)報(bào)表數(shù)據(jù)與之前的報(bào)表數(shù)據(jù)是不一致,這在生產(chǎn)環(huán)境是不允許的。這種情況下,可以通過(guò)--staging-table 參數(shù)指定一個(gè)staging 表來(lái)克服這個(gè)問(wèn)題,指定的這個(gè)staging 表在單個(gè)事務(wù)中,暫存數(shù)據(jù),等到事務(wù)完全處理完畢再移動(dòng)到目標(biāo)表。為了使用暫存功能,必須在運(yùn)行導(dǎo)出作業(yè)之前創(chuàng)建暫存表,該表必須在結(jié)構(gòu)上與目標(biāo)表相同,這個(gè)表應(yīng)該在導(dǎo)出作業(yè)運(yùn)行之前為空,所以需要--clear-staging-table 這個(gè)參數(shù)配合起來(lái)使用。
(2)分隔符問(wèn)題導(dǎo)致數(shù)據(jù)不一致。Sqoop 默認(rèn)字段與字段之間是用“,”分隔開(kāi),Hive 默認(rèn)的列分隔符是 ^A(001),行與行之間的分隔符是“
”,當(dāng)然,在創(chuàng)建這些表(包括MySQL表)的時(shí)候也可以自己指定分隔符。在數(shù)據(jù)遷移過(guò)程中,由于分隔符的不一致會(huì)導(dǎo)致數(shù)據(jù)遷移失敗,由于導(dǎo)入的數(shù)據(jù)中有'
',hive 會(huì)認(rèn)為一行已經(jīng)結(jié)束,后面的數(shù)據(jù)被分割成下一行,也會(huì)導(dǎo)致數(shù)據(jù)不一致。這時(shí)可以使用--lines-terminated-by和--fields-terminated-by 這兩個(gè)參數(shù)來(lái)自定義行分隔符和列分隔符進(jìn)行解決。但是hive 只支持'
'作為行分隔符,所以在關(guān)系型數(shù)據(jù)庫(kù)與Hive 進(jìn)行數(shù)據(jù)遷移時(shí),還需要加上--hive-delims-replacement
--hive-delims-replacement
--hive-drop-import-delims:將分隔符中的/0x01 和/r/n去掉
在生產(chǎn)環(huán)境中,由于數(shù)據(jù)量巨大,數(shù)據(jù)結(jié)構(gòu)復(fù)雜,Sqoop導(dǎo)入數(shù)據(jù)報(bào)內(nèi)存溢出以及抽數(shù)時(shí)間過(guò)長(zhǎng),日志顯示有個(gè)別的reduce 執(zhí)行時(shí)間過(guò)長(zhǎng),卡在99%那個(gè)位置,例如有25個(gè)Map 中有24個(gè)Map 是在20 秒內(nèi)執(zhí)行完成,只有1個(gè)Map用了6 分多鐘,這種Map 分布不均勻,就是數(shù)據(jù)傾斜現(xiàn)象。導(dǎo)致數(shù)據(jù)傾斜的原因有可能是數(shù)據(jù)本身就不均勻,或是分隔符問(wèn)題,或是數(shù)據(jù)類型不一致等。這時(shí)需要設(shè)置--split-by、--num-Mappers 和--split-Mappers 這三個(gè)參數(shù)。
在import 時(shí),指定--split-by 參數(shù),Sqoop 根據(jù)不同的split-by 參數(shù)值來(lái)進(jìn)行切分,然后將切分出來(lái)的區(qū)域分配到不同Map 中。每個(gè)Map 中再處理數(shù)據(jù)庫(kù)中獲取的一行一行的值,寫(xiě)入到HDFS 中。split-by 根據(jù)不同的參數(shù)類型有不同的切分方法,最好使用較簡(jiǎn)單的int 型。
通過(guò)設(shè)置Map 的個(gè)數(shù)來(lái)提高吞吐量,-num-Mappers后面設(shè)置的Maptask 數(shù)目大于1 的話,那么-split-by 后面必須跟字段,因?yàn)?num-Mappers 后面要是1 的話,那么-split-Mappers 后面跟不跟字段都沒(méi)有意義,因?yàn)?,他只?huì)啟動(dòng)一個(gè)Maptask 進(jìn)行數(shù)據(jù)處理。一般來(lái)說(shuō)數(shù)據(jù)量與Map 的數(shù)量是相關(guān)的,一般建議在500w 以下使4個(gè)Map 即可,如果數(shù)據(jù)量在500w 以上可以使用8個(gè)Map,Map 數(shù)量太多會(huì)對(duì)數(shù)據(jù)庫(kù)增加運(yùn)壓力,造成其他場(chǎng)景使?性能降低。在使用并行度的時(shí)候需要了解主鍵的分布情況是否是有必要的。
隨著大數(shù)據(jù)、云計(jì)算、物聯(lián)網(wǎng)的不斷發(fā)展,信息系統(tǒng)產(chǎn)生的數(shù)據(jù)規(guī)模與日俱增,以Hadoop 平臺(tái)為代表的海量數(shù)據(jù)處理平臺(tái)通過(guò)對(duì)海量數(shù)據(jù)進(jìn)行并行處理成為一種有效的解決方案,基于Sqoop 實(shí)現(xiàn)的在關(guān)系型數(shù)據(jù)庫(kù)與Hadoop 平臺(tái)之間進(jìn)行數(shù)據(jù)遷移,它可以高效、可靠地完成數(shù)據(jù)傳輸任務(wù),是數(shù)據(jù)分析處理及挖掘前的重要一環(huán)。本文從Sqoop 工作機(jī)制、遷移方法介紹、Sqoop 常見(jiàn)問(wèn)題及解決辦法等方面進(jìn)行分析,解決了Sqoop 使用過(guò)程中的簡(jiǎn)單問(wèn)題,在實(shí)際使用過(guò)程中,還需要結(jié)合項(xiàng)目實(shí)際需求對(duì)Sqoop 做更進(jìn)一步的優(yōu)化。