亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

hudi技術預研與原理分析

2023-05-26 09:32:18
119
0

一、環境信息:

Hudi:0.12.0,spark:3.0.3,flink:1.13.6,hive:3.1.2,hadoop:3.2.1

二、Hudi數據源實現:

前提:引入hudi第三方包,將hudi-hadoop-mr-bundle-0.12.0.jar放到hive的auxlib目錄

需要參數:

Kerberos認證相關的keytab,conf,principal

Core-site.xml,hdfs-site.xml路徑

 

三、Hudi元數據獲取:

可以通過構建HoodieTableMetaClient實現,參數只需要hadoop conf和hudi表路徑,獲取表的表名、路徑、分區字段等。

schema的獲取可以new 一個TableSchemaResolver實現,通過scheam獲取表字段相關信息。

四、spark、flink、javaclient三種寫入方式的比較:

Java client:

支持insert/upsert/delete,暫不支持bulkInsert。

Insert、upsert方法不支持mor表

支持完整的寫Hudi操作,包括rollback、clean、archive等

要自己實現攢批落hudi,按數量和時間攢批

 

Spark client:

前提:引入hudi第三方包,將hudi-spark3.1-bundle_2.12-0.12.0.jar放到spark的jars目錄

Hudi支持在Spark SQL上使用CTAS (Create Table As Select)創建hudi表(為了獲得更好的性能來加載數據到hudi表,CTAS使用批量插入(bulk insert)作為寫操作)。

支持rdd和spark sql方式,支持bulkinsert

構造HoodieRecord Rdd對象:hudi會根據元數據信息構造HoodieRecord Rdd 對象,方便后續數據去重和數據合并

Spark支持并發寫機制,集群模式

Spark的upsert支持兩種模式的寫入Copy On Write和Merge On Read

使用Spark DataFrame并發寫入

 

Flink client:

前提:引入hudi第三方包,將hudi-flink1.13-bundle-0.12.0.jar放到flink的lib目錄

支持集群模式

Flink暫不支持bulkInsert

Insert、upsert方法不支持mor表

Flink 的 writer 將數據放到內存,按以下 3 種策略刷數據到磁盤:

a.當某個 bucket 在內存積攢到一定大小 (可配,默認 64MB)

b.當總的 buffer 大小積攢到一定大小(可配,默認 1GB)

 

c.當 checkpoint 觸發,將內存里的數據全部 flush 出去:用于寫入HoodieRecord到文件系統中。

可以調整并行度來提升寫入速度

為了提高并發寫的吞吐量,會給每個bucket assign task分配一套獨立的bucket管理策略,并利用Hash算法把bucket ID以固定的規則hash到每個bucket assign task 下面,做到了并發決策。因此,控制bucket assign task并發度就相對控制了寫入小文件數量,在寫入吞吐量和小文件之間的權衡

小文件策略:優先選擇小的file group寫入,如果是insert數據,策略是每次選擇當前剩余空間最多的bucket寫入

 

五、寫入步驟分析:

Spark 寫hudi步驟:

1、開始提交:判斷上次任務是否失敗,如果失敗會觸發回滾操作。 然后會根據當前時間生成一個事務開始的請求標識元數據。

2、構造HoodieRecord Rdd對象:hudi 會根據元數據信息構造HoodieRecord Rdd 對象,方便后續數據去重和數據合并。

3、數據去重:一批增量數據中可能會有重復的數據,hudi會根據主鍵對數據進行去重避免重復數據寫入hudi 表。

4、數據fileId位置信息獲取:在修改記錄中可以根據索引獲取當前記錄所屬文件的fileid,在數據合并時需要知道數據update操作向那個fileId文件寫入新的快照文件。

5、數據合并:hudi 有兩種模式cow和mor。在cow模式中會重寫索引命中的fileId快照文件;在mor 模式中根據fileId 追加到分區中的log 文件。

6、完成提交:在元數據中生成xxxx.commit文件,只有生成commit 元數據文件,查詢引擎才能根據元數據查詢到剛剛upsert 后的數據。

7、compaction壓縮:主要是mor 模式中才會有,他會將mor模式中的xxx.log 數據合并到xxx.parquet 快照文件中去。

8、hive元數據同步:hive 的元素數據同步這個步驟需要配置非必需操作,主要是對于hive 和spark 等查詢引擎,需要依賴hive 元數據才能進行查詢。所以在hive 中的同步就是構造外表提供查詢。

 

flink 寫hudi步驟:

分為三個模塊:數據寫入、數據壓縮與數據清理。

數據寫入

基礎數據封裝:將數據流中flink的RowData封裝成Hoodie實體;

BucketAssigner:桶分配器,主要是給數據分配寫入的文件地址:若為插入操作,則取大小最小的FileGroup對應的FileId文件內進行插入;在此文件的后續寫入中文件 ID 保持不變,并且提交時間會更新以顯示最新版本。這也意味著記錄的任何特定版本,給定其分區路徑,都可以使用文件 ID 和 instantTime進行唯一定位;若為更新操作,則直接在當前location進行數據更新

Hoodie Stream Writer: 數據寫入,將數據緩存起來,在超過設置的最大flushSize或是做checkpoint時進行刷新到文件中;

Oprator Coordinator:主要與Hoodie Stream Writer進行交互,處理checkpoint等事件,在做checkpoint時,提交instant到timeLine上,并生成下一個instant的時間,算法為取當前最新的commi時間,比對當前時間與commit時間,若當前時間大于commit時間,則返回,否則一直循環等待生成。

數據壓縮

壓縮(compaction)用于在 MergeOnRead存儲類型時將基于行的log日志文件轉化為parquet列式數據文件,用于加快記錄的查找。compaction首先會遍歷各分區下最新的parquet數據文件和其對應的log日志文件進行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance

數據清理

隨著用戶向表中寫入更多數據,對于每次更新,Hudi會生成一個新版本的數據文件用于保存更新后的記錄(COPY_ON_WRITE)或將這些增量更新寫入日志文件以避免重寫更新版本的數據文件(MERGE_ON_READ)。在這種情況下,根據更新頻率,文件版本數可能會無限增長,但如果不需要保留無限的歷史記錄,則必須有一個流程(服務)來回收舊版本的數據,這就是 Hudi 的清理服務。具體清理策略可參考官網,一般使用的清理策略為:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 個文件版本而不受時間限制的效果。會刪除N之外的FileSlice。

 

Java 寫hudi步驟:

InitTable,首先獲取table,這里的table為HoodieJavaCopyOnWriteTable

PreWrite,寫之前的一些步驟,比如設置操作類型

insert,調用table.insert執行寫數據操作,返回result

postWrite,最后調用postWrite執行archive、clean等操作返回WriteStatuses

 

說明:由于java client 要實現攢批落,所以造數按一批13萬大小來驗證,spark和flink 寫入效率沒java client快的原因可能是提交任務需要時間,并且造的數據量不大,沒發揮出它的并發效果,對于大數據量的實時寫入,spark和flink的效率應該會更高。

六、寫入效率:

寫入方式

寫入數據量

寫入耗時

平均速度

Java-client

13萬

26秒

5000條/秒

Spark-client

13萬

27秒

4814條/秒

Flink-client

13萬

29秒

4482條/秒

 

 

七、總結:

 

Java Client和Spark、Flink客戶端核心邏輯是一樣的。不同的是比如Spark的入口是DF和SQL,多了一層API封裝。

Hudi Java Client和Spark、Flink一樣都可以實現完整的寫Hudi的邏輯,但是目前功能支持還不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,畢竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起來比較方便,集成到NIFI的好處是,可以通過拖來拽配置參數的形式完成歷史數據和增量數據寫入Hudi。也可以自己實現多線程,提升性能,我們目前測試的性能是Insert可以達到5000條/s,而upsert因為需要讀取索引,還有歷史數據的更新,可能需要重寫整個表,所以當歷史數據比較大且更新占比比較高時,單線程的性能會非常差,但是可以基于源碼改造,將布隆索引和寫數據的部分改為多線程。對于數據量不是很大,一般大表幾十億,性能還是可以滿足要求的。

將Hudi Java Client封裝成了一個NIFI processor,然后用NIFI調度,其性能和穩定性都能夠滿足項目需求。

0條評論
0 / 1000
l****n
7文章數
0粉絲數
l****n
7 文章 | 0 粉絲
原創

hudi技術預研與原理分析

2023-05-26 09:32:18
119
0

一、環境信息:

Hudi:0.12.0,spark:3.0.3,flink:1.13.6,hive:3.1.2,hadoop:3.2.1

二、Hudi數據源實現:

前提:引入hudi第三方包,將hudi-hadoop-mr-bundle-0.12.0.jar放到hive的auxlib目錄

需要參數:

Kerberos認證相關的keytab,conf,principal

Core-site.xml,hdfs-site.xml路徑

 

三、Hudi元數據獲取:

可以通過構建HoodieTableMetaClient實現,參數只需要hadoop conf和hudi表路徑,獲取表的表名、路徑、分區字段等。

schema的獲取可以new 一個TableSchemaResolver實現,通過scheam獲取表字段相關信息。

四、spark、flink、javaclient三種寫入方式的比較:

Java client:

支持insert/upsert/delete,暫不支持bulkInsert。

Insert、upsert方法不支持mor表

支持完整的寫Hudi操作,包括rollback、clean、archive等

要自己實現攢批落hudi,按數量和時間攢批

 

Spark client:

前提:引入hudi第三方包,將hudi-spark3.1-bundle_2.12-0.12.0.jar放到spark的jars目錄

Hudi支持在Spark SQL上使用CTAS (Create Table As Select)創建hudi表(為了獲得更好的性能來加載數據到hudi表,CTAS使用批量插入(bulk insert)作為寫操作)。

支持rdd和spark sql方式,支持bulkinsert

構造HoodieRecord Rdd對象:hudi會根據元數據信息構造HoodieRecord Rdd 對象,方便后續數據去重和數據合并

Spark支持并發寫機制,集群模式

Spark的upsert支持兩種模式的寫入Copy On Write和Merge On Read

使用Spark DataFrame并發寫入

 

Flink client:

前提:引入hudi第三方包,將hudi-flink1.13-bundle-0.12.0.jar放到flink的lib目錄

支持集群模式

Flink暫不支持bulkInsert

Insert、upsert方法不支持mor表

Flink 的 writer 將數據放到內存,按以下 3 種策略刷數據到磁盤:

a.當某個 bucket 在內存積攢到一定大小 (可配,默認 64MB)

b.當總的 buffer 大小積攢到一定大小(可配,默認 1GB)

 

c.當 checkpoint 觸發,將內存里的數據全部 flush 出去:用于寫入HoodieRecord到文件系統中。

可以調整并行度來提升寫入速度

為了提高并發寫的吞吐量,會給每個bucket assign task分配一套獨立的bucket管理策略,并利用Hash算法把bucket ID以固定的規則hash到每個bucket assign task 下面,做到了并發決策。因此,控制bucket assign task并發度就相對控制了寫入小文件數量,在寫入吞吐量和小文件之間的權衡

小文件策略:優先選擇小的file group寫入,如果是insert數據,策略是每次選擇當前剩余空間最多的bucket寫入

 

五、寫入步驟分析:

Spark 寫hudi步驟:

1、開始提交:判斷上次任務是否失敗,如果失敗會觸發回滾操作。 然后會根據當前時間生成一個事務開始的請求標識元數據。

2、構造HoodieRecord Rdd對象:hudi 會根據元數據信息構造HoodieRecord Rdd 對象,方便后續數據去重和數據合并。

3、數據去重:一批增量數據中可能會有重復的數據,hudi會根據主鍵對數據進行去重避免重復數據寫入hudi 表。

4、數據fileId位置信息獲取:在修改記錄中可以根據索引獲取當前記錄所屬文件的fileid,在數據合并時需要知道數據update操作向那個fileId文件寫入新的快照文件。

5、數據合并:hudi 有兩種模式cow和mor。在cow模式中會重寫索引命中的fileId快照文件;在mor 模式中根據fileId 追加到分區中的log 文件。

6、完成提交:在元數據中生成xxxx.commit文件,只有生成commit 元數據文件,查詢引擎才能根據元數據查詢到剛剛upsert 后的數據。

7、compaction壓縮:主要是mor 模式中才會有,他會將mor模式中的xxx.log 數據合并到xxx.parquet 快照文件中去。

8、hive元數據同步:hive 的元素數據同步這個步驟需要配置非必需操作,主要是對于hive 和spark 等查詢引擎,需要依賴hive 元數據才能進行查詢。所以在hive 中的同步就是構造外表提供查詢。

 

flink 寫hudi步驟:

分為三個模塊:數據寫入、數據壓縮與數據清理。

數據寫入

基礎數據封裝:將數據流中flink的RowData封裝成Hoodie實體;

BucketAssigner:桶分配器,主要是給數據分配寫入的文件地址:若為插入操作,則取大小最小的FileGroup對應的FileId文件內進行插入;在此文件的后續寫入中文件 ID 保持不變,并且提交時間會更新以顯示最新版本。這也意味著記錄的任何特定版本,給定其分區路徑,都可以使用文件 ID 和 instantTime進行唯一定位;若為更新操作,則直接在當前location進行數據更新

Hoodie Stream Writer: 數據寫入,將數據緩存起來,在超過設置的最大flushSize或是做checkpoint時進行刷新到文件中;

Oprator Coordinator:主要與Hoodie Stream Writer進行交互,處理checkpoint等事件,在做checkpoint時,提交instant到timeLine上,并生成下一個instant的時間,算法為取當前最新的commi時間,比對當前時間與commit時間,若當前時間大于commit時間,則返回,否則一直循環等待生成。

數據壓縮

壓縮(compaction)用于在 MergeOnRead存儲類型時將基于行的log日志文件轉化為parquet列式數據文件,用于加快記錄的查找。compaction首先會遍歷各分區下最新的parquet數據文件和其對應的log日志文件進行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance

數據清理

隨著用戶向表中寫入更多數據,對于每次更新,Hudi會生成一個新版本的數據文件用于保存更新后的記錄(COPY_ON_WRITE)或將這些增量更新寫入日志文件以避免重寫更新版本的數據文件(MERGE_ON_READ)。在這種情況下,根據更新頻率,文件版本數可能會無限增長,但如果不需要保留無限的歷史記錄,則必須有一個流程(服務)來回收舊版本的數據,這就是 Hudi 的清理服務。具體清理策略可參考官網,一般使用的清理策略為:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 個文件版本而不受時間限制的效果。會刪除N之外的FileSlice。

 

Java 寫hudi步驟:

InitTable,首先獲取table,這里的table為HoodieJavaCopyOnWriteTable

PreWrite,寫之前的一些步驟,比如設置操作類型

insert,調用table.insert執行寫數據操作,返回result

postWrite,最后調用postWrite執行archive、clean等操作返回WriteStatuses

 

說明:由于java client 要實現攢批落,所以造數按一批13萬大小來驗證,spark和flink 寫入效率沒java client快的原因可能是提交任務需要時間,并且造的數據量不大,沒發揮出它的并發效果,對于大數據量的實時寫入,spark和flink的效率應該會更高。

六、寫入效率:

寫入方式

寫入數據量

寫入耗時

平均速度

Java-client

13萬

26秒

5000條/秒

Spark-client

13萬

27秒

4814條/秒

Flink-client

13萬

29秒

4482條/秒

 

 

七、總結:

 

Java Client和Spark、Flink客戶端核心邏輯是一樣的。不同的是比如Spark的入口是DF和SQL,多了一層API封裝。

Hudi Java Client和Spark、Flink一樣都可以實現完整的寫Hudi的邏輯,但是目前功能支持還不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,畢竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起來比較方便,集成到NIFI的好處是,可以通過拖來拽配置參數的形式完成歷史數據和增量數據寫入Hudi。也可以自己實現多線程,提升性能,我們目前測試的性能是Insert可以達到5000條/s,而upsert因為需要讀取索引,還有歷史數據的更新,可能需要重寫整個表,所以當歷史數據比較大且更新占比比較高時,單線程的性能會非常差,但是可以基于源碼改造,將布隆索引和寫數據的部分改為多線程。對于數據量不是很大,一般大表幾十億,性能還是可以滿足要求的。

將Hudi Java Client封裝成了一個NIFI processor,然后用NIFI調度,其性能和穩定性都能夠滿足項目需求。

文章來自個人專欄
文章 | 訂閱
0條評論
0 / 1000
請輸入你的評論
0
0