應用場景
MRS支持在大數據存儲容量大、計算資源需要彈性擴展的場景下,用戶將數據存儲在OBS服務中,使用MRS集群僅做數據計算處理的存算分離模式。
本文將向您介紹如何在MRS集群中運行Flink作業來處理OBS中存儲的數據。
方案架構
Flink是一個批處理和流處理結合的統一計算框架,其核心是一個提供了數據分發以及并行化計算的流數據處理引擎。它的最大亮點是流處理,是業界最頂級的開源流處理引擎。
Flink最適合的應用場景是低時延的數據處理(Data Processing)場景:高并發pipeline處理數據,時延毫秒級,且兼具可靠性。


在本示例中,我們使用MRS集群內置的Flink WordCount作業程序,來分析OBS文件系統中保存的源數據,以統計源數據中的單詞出現次數。
操作流程
本示例操作流程如下:


創建MRS集群
創建并購買一個包含有Flink組件的MRS集群,詳情請參見購買自定義集群。
說明本文以購買的MRS 3.1.0版本的集群為例,集群未開啟Kerberos認證。
在本示例中,由于我們要分析處理OBS文件系統中的數據,因此在集群的高級配置參數中要為MRS集群綁定IAM權限委托,使得集群內組件能夠對接OBS并具有對應文件系統目錄的操作權限。
您可以直接選擇系統默認的“MRS_ECS_DEFAULT_AGENCY”,也可以自行創建其他具有OBS文件系統操作權限的自定義委托。


集群購買成功后,在MRS集群的任一節點內,使用omm用戶安裝集群客戶端,具體操作可參考安裝并使用集群客戶端。
例如客戶端安裝目錄為“/opt/client”。
準備測試數據
在創建Flink作業進行數據分析前,我們需要在提前準備待分析的測試數據,并將該數據上傳至OBS文件系統中。
- 本地創建一個“mrs_flink_test.txt”文件,例如文件內容如下:
This is a test demo for MRS Flink. Flink is a unified computing framework that supports both batch processing and stream processing. It provides a stream data processing engine that supports data distribution and parallel computing.
- 在云服務列表中選擇“存儲 > 對象存儲服務”,登錄OBS管理控制臺。
- 單擊“并行文件系統”,創建一個并行文件系統,并上傳測試數據文件。
詳見下圖:創建并行文件系統


例如創建的文件系統名稱為“mrs-demo-data”,單擊系統名稱,在“文件”頁面中,新建一個文件夾“flink”,上傳測試數據至該目錄中。
則本示例的測試數據完整路徑為“obs://mrs-demo-data/flink/mrs_flink_test.txt”。
詳見下圖:上傳測試數據


- (可選)上傳數據分析應用程序。
使用管理臺界面直接提交作業時,將已開發好的Flink應用程序jar文件也可以上傳至OBS文件系統中,或者MRS集群內的HDFS文件系統中。
本示例中我們使用MRS集群內置的Flink WordCount樣例程序,可從MRS集群的客戶端安裝目錄中獲取,即“/opt/client/Flink/flink/examples/batch/WordCount.jar”。
將“WordCount.jar”上傳至“mrs-demo-data/program”目錄下。
創建并運行Flink作業
方式 1 :在控制臺界面在線提交作業。
- 登錄MRS管理控制臺,單擊MRS集群名稱,進入集群詳情頁面。
- 在集群詳情頁的“概覽”頁簽,單擊“IAM用戶同步”右側的“單擊同步”進行IAM用戶同步。
- 單擊“作業管理”,進入“作業管理”頁簽。
- 單擊“添加”,添加一個Flink作業。
- 作業類型:Flink
- 作業名稱:自定義,例如flink_obs_test。
- 執行程序路徑:本示例使用Flink客戶端的WordCount程序為例。
- 運行程序參數:使用默認值。
- 執行程序參數:設置應用程序的輸入參數,“input”為待分析的測試數據,“output”為結果輸出文件。
例如本示例中,我們設置為“--input obs://mrs-demo-data/flink/mrs_flink_test.txt --output
obs://mrs-demo-data/flink/output”。
- 服務配置參數:使用默認值即可,如需手動配置作業相關參數,可參考運行Flink作業。


- 確認作業配置信息后,單擊“確定”,完成作業的新增,并等待運行完成。
方式 2 :通過集群客戶端提交作業。
- 使用root用戶登錄集群客戶端節點,進入客戶端安裝目錄。
su - omm
cd /opt/client
source bigdata_env
- 執行以下命令驗證集群是否可以訪問OBS。
hdfs dfs -ls obs://mrs-demo-data/flink
- 提交Flink作業,指定源文件數據進行消費。
flink run -m yarn-cluster /opt/client/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demo-data/flink/mrs_flink_test.txt --output obs://mrs-demo/data/flink/output2
...
Cluster started: Yarn cluster with application id application_1654672374562_0011
Job has been submitted with JobID a89b561de5d0298cb2ba01fbc30338bc
Program execution finished
Job with JobID a89b561de5d0298cb2ba01fbc30338bc has finished.
Job Runtime: 1200 ms
查看作業執行結果
- 作業提交成功后,登錄MRS集群的FusionInsight Manager界面,單擊“集群 > 服務 > Yarn”。
- 點擊“ResourceManager WebUI”后的鏈接進入Yarn Web UI界面,在Applications頁面查看當前Yarn作業的詳細運行情況及運行日志。


- 等待作業運行完成后,在OBS文件系統中指定的結果輸出文件中可查看數據分析輸出的結果。


下載“output”文件到本地并打開,可查看輸出的分析結果。
a 3
and 2
batch 1
both 1
computing 2
data 2
demo 1
distribution 1
engine 1
flink 2
for 1
framework 1
is 2
it 1
mrs 1
parallel 1
processing 3
provides 1
stream 2
supports 2
test 1
that 2
this 1
unified 1
使用集群客戶端命令行提交作業時,若不指定輸出目錄,在作業運行界面也可直接查看數據分析結果。
Job with JobID xxx has finished.
Job Runtime: xxx ms
Accumulator Results:
- e6209f96ffa423974f8c7043821814e9 (java.util.ArrayList) [31 elements]
(a,3)
(and,2)
(batch,1)
(both,1)
(computing,2)
(data,2)
(demo,1)
(distribution,1)
(engine,1)
(flink,2)
(for,1)
(framework,1)
(is,2)
(it,1)
(mrs,1)
(parallel,1)
(processing,3)
(provides,1)
(stream,2)
(supports,2)
(test,1)
(that,2)
(this,1)
(unified,1)