一、環境信息:
Hudi:0.12.0,spark:3.0.3,flink:1.13.6,hive:3.1.2,hadoop:3.2.1
二、Hudi數據源實現:
前提:引入hudi第三方包,將hudi-hadoop-mr-bundle-0.12.0.jar放到hive的auxlib目錄
需要參數:
Kerberos認證相關的keytab,conf,principal
Core-site.xml,hdfs-site.xml路徑
三、Hudi元數據獲取:
可以通過構建HoodieTableMetaClient實現,參數只需要hadoop conf和hudi表路徑,獲取表的表名、路徑、分區字段等。
schema的獲取可以new 一個TableSchemaResolver實現,通過scheam獲取表字段相關信息。
四、spark、flink、javaclient三種寫入方式的比較:
Java client:
支持insert/upsert/delete,暫不支持bulkInsert。
Insert、upsert方法不支持mor表
支持完整的寫Hudi操作,包括rollback、clean、archive等
要自己實現攢批落hudi,按數量和時間攢批
Spark client:
前提:引入hudi第三方包,將hudi-spark3.1-bundle_2.12-0.12.0.jar放到spark的jars目錄
Hudi支持在Spark SQL上使用CTAS (Create Table As Select)創建hudi表(為了獲得更好的性能來加載數據到hudi表,CTAS使用批量插入(bulk insert)作為寫操作)。
支持rdd和spark sql方式,支持bulkinsert
構造HoodieRecord Rdd對象:hudi會根據元數據信息構造HoodieRecord Rdd 對象,方便后續數據去重和數據合并
Spark支持并發寫機制,集群模式
Spark的upsert支持兩種模式的寫入Copy On Write和Merge On Read
使用Spark DataFrame并發寫入
Flink client:
前提:引入hudi第三方包,將hudi-flink1.13-bundle-0.12.0.jar放到flink的lib目錄
支持集群模式
Flink暫不支持bulkInsert
Insert、upsert方法不支持mor表
Flink 的 writer 將數據放到內存,按以下 3 種策略刷數據到磁盤:
a.當某個 bucket 在內存積攢到一定大小 (可配,默認 64MB)
b.當總的 buffer 大小積攢到一定大小(可配,默認 1GB)
c.當 checkpoint 觸發,將內存里的數據全部 flush 出去:用于寫入HoodieRecord到文件系統中。
可以調整并行度來提升寫入速度
為了提高并發寫的吞吐量,會給每個bucket assign task分配一套獨立的bucket管理策略,并利用Hash算法把bucket ID以固定的規則hash到每個bucket assign task 下面,做到了并發決策。因此,控制bucket assign task并發度就相對控制了寫入小文件數量,在寫入吞吐量和小文件之間的權衡
小文件策略:優先選擇小的file group寫入,如果是insert數據,策略是每次選擇當前剩余空間最多的bucket寫入
五、寫入步驟分析:
Spark 寫hudi步驟:
1、開始提交:判斷上次任務是否失敗,如果失敗會觸發回滾操作。 然后會根據當前時間生成一個事務開始的請求標識元數據。
2、構造HoodieRecord Rdd對象:hudi 會根據元數據信息構造HoodieRecord Rdd 對象,方便后續數據去重和數據合并。
3、數據去重:一批增量數據中可能會有重復的數據,hudi會根據主鍵對數據進行去重避免重復數據寫入hudi 表。
4、數據fileId位置信息獲取:在修改記錄中可以根據索引獲取當前記錄所屬文件的fileid,在數據合并時需要知道數據update操作向那個fileId文件寫入新的快照文件。
5、數據合并:hudi 有兩種模式cow和mor。在cow模式中會重寫索引命中的fileId快照文件;在mor 模式中根據fileId 追加到分區中的log 文件。
6、完成提交:在元數據中生成xxxx.commit文件,只有生成commit 元數據文件,查詢引擎才能根據元數據查詢到剛剛upsert 后的數據。
7、compaction壓縮:主要是mor 模式中才會有,他會將mor模式中的xxx.log 數據合并到xxx.parquet 快照文件中去。
8、hive元數據同步:hive 的元素數據同步這個步驟需要配置非必需操作,主要是對于hive 和spark 等查詢引擎,需要依賴hive 元數據才能進行查詢。所以在hive 中的同步就是構造外表提供查詢。
flink 寫hudi步驟:
分為三個模塊:數據寫入、數據壓縮與數據清理。
數據寫入
基礎數據封裝:將數據流中flink的RowData封裝成Hoodie實體;
BucketAssigner:桶分配器,主要是給數據分配寫入的文件地址:若為插入操作,則取大小最小的FileGroup對應的FileId文件內進行插入;在此文件的后續寫入中文件 ID 保持不變,并且提交時間會更新以顯示最新版本。這也意味著記錄的任何特定版本,給定其分區路徑,都可以使用文件 ID 和 instantTime進行唯一定位;若為更新操作,則直接在當前location進行數據更新
Hoodie Stream Writer: 數據寫入,將數據緩存起來,在超過設置的最大flushSize或是做checkpoint時進行刷新到文件中;
Oprator Coordinator:主要與Hoodie Stream Writer進行交互,處理checkpoint等事件,在做checkpoint時,提交instant到timeLine上,并生成下一個instant的時間,算法為取當前最新的commi時間,比對當前時間與commit時間,若當前時間大于commit時間,則返回,否則一直循環等待生成。
數據壓縮
壓縮(compaction)用于在 MergeOnRead存儲類型時將基于行的log日志文件轉化為parquet列式數據文件,用于加快記錄的查找。compaction首先會遍歷各分區下最新的parquet數據文件和其對應的log日志文件進行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance
數據清理
隨著用戶向表中寫入更多數據,對于每次更新,Hudi會生成一個新版本的數據文件用于保存更新后的記錄(COPY_ON_WRITE)或將這些增量更新寫入日志文件以避免重寫更新版本的數據文件(MERGE_ON_READ)。在這種情況下,根據更新頻率,文件版本數可能會無限增長,但如果不需要保留無限的歷史記錄,則必須有一個流程(服務)來回收舊版本的數據,這就是 Hudi 的清理服務。具體清理策略可參考官網,一般使用的清理策略為:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 個文件版本而不受時間限制的效果。會刪除N之外的FileSlice。
Java 寫hudi步驟:
InitTable,首先獲取table,這里的table為HoodieJavaCopyOnWriteTable
PreWrite,寫之前的一些步驟,比如設置操作類型
insert,調用table.insert執行寫數據操作,返回result
postWrite,最后調用postWrite執行archive、clean等操作返回WriteStatuses
說明:由于java client 要實現攢批落,所以造數按一批13萬大小來驗證,spark和flink 寫入效率沒java client快的原因可能是提交任務需要時間,并且造的數據量不大,沒發揮出它的并發效果,對于大數據量的實時寫入,spark和flink的效率應該會更高。
六、寫入效率:
| 寫入方式 | 寫入數據量 | 寫入耗時 | 平均速度 | 
| Java-client | 13萬 | 26秒 | 5000條/秒 | 
| Spark-client | 13萬 | 27秒 | 4814條/秒 | 
| Flink-client | 13萬 | 29秒 | 4482條/秒 | 
七、總結:
Java Client和Spark、Flink客戶端核心邏輯是一樣的。不同的是比如Spark的入口是DF和SQL,多了一層API封裝。
Hudi Java Client和Spark、Flink一樣都可以實現完整的寫Hudi的邏輯,但是目前功能支持還不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,畢竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起來比較方便,集成到NIFI的好處是,可以通過拖來拽配置參數的形式完成歷史數據和增量數據寫入Hudi。也可以自己實現多線程,提升性能,我們目前測試的性能是Insert可以達到5000條/s,而upsert因為需要讀取索引,還有歷史數據的更新,可能需要重寫整個表,所以當歷史數據比較大且更新占比比較高時,單線程的性能會非常差,但是可以基于源碼改造,將布隆索引和寫數據的部分改為多線程。對于數據量不是很大,一般大表幾十億,性能還是可以滿足要求的。
將Hudi Java Client封裝成了一個NIFI processor,然后用NIFI調度,其性能和穩定性都能夠滿足項目需求。