前提條件
- MRS集群中已安裝Flink組件。
- 集群正常運行,已安裝集群客戶端,例如安裝目錄為“/opt/hadoopclient”。以下操作的客戶端目錄只是舉例,請根據實際安裝目錄修改。
使用Flink客戶端(MRS 3.x之前版本)
- 以客戶端安裝用戶,登錄安裝客戶端的節點。
- 執行以下命令,切換到客戶端安裝目錄。
cd /opt/hadoopclient
- 執行如下命令初始化環境變量。
source /opt/hadoopclient/bigdata_env
4.若集群開啟Kerberos認證,需要執行以下步驟,若集群未開啟Kerberos認證請跳過該步驟。
a.準備一個提交Flink作業的用戶。
b.登錄Manager,下載認證憑據。
登錄集群的Manager界面,具體請參見訪問MRSManager(MRS2.x及之前版本),選擇“系統設置 > 用戶管理”,在已增加用戶所在行的“操作”列,選擇“更多 > 下載認證憑據”。
c.將下載的認證憑據壓縮包解壓縮,并將得到的user.keytab文件拷貝到客戶端節點中,例如客戶端節點的“/opt/hadoopclient/Flink/flink/conf”目錄下。如果是在集群外節點安裝的客戶端,需要將得到的krb5.conf文件拷貝到該節點的“/etc/”目錄下。
d.配置安全認證,在“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”配置文件中的對應配置添加keytab路徑以及用戶名。
security.kerberos.login.keytab: <user.keytab 文件路徑 >
security.kerberos.login.principal: < 用戶名 >
例如:
security.kerberos.login.keytab:/opt/hadoopclient/Flink/flink/conf/user.keytab
security.kerberos.login.principal: test
e.參考“組件操作指南 > 使用Flink > 參考 > 簽發證書樣例”章節生成“generate_keystore.sh”腳本并放置在Flink的客戶端bin目錄下,執行如下命令進行安全加固,請參考“組件操作指南 > 使用Flink > 安全加固 > 認證和加密”,password請重新設置為一個用于提交作業的密碼。
sh generate_keystore.sh < password >
該腳本會自動替換“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中關于SSL的值,針對MRS2.x及之前版本,安全集群默認沒有開啟外部SSL,用戶如果需要啟用外部SSL,請參考“組件操作指南 >使用Flink > 安全加固”進行配置后再次運行該腳本即可。
說明
generate_keystore.sh腳本無需手動生成。
執行認證和加密后會將生成的flink.keystore、flink.truststore、security.cookie自動填充到“flink-conf.yaml”對應配置項中。
f.客戶端訪問flink.keystore和flink.truststore文件的路徑配置。
?絕對路徑:執行該腳本后,在flink-conf.yaml文件中將flink.keystore和flink.truststore文件路徑自動配置為絕對路徑“/opt/hadoopclient/Flink/flink/conf/”,此時需要將conf目錄中的flink.keystore和flink.truststore文件分別放置在Flink Client以及Yarn各個節點的該絕對路徑上。
?相對路徑:請執行如下步驟配置flink.keystore和flink.truststore文件路徑為相對路徑,并確保Flink Client執行命令的目錄可以直接訪問該相對路徑。
i.在“/opt/hadoopclient/Flink/flink/conf/”目錄下新建目錄,例如ssl。
cd /opt/hadoopclient/Flink/flink/conf/
mkdir ssl
ii. 移動flink.keystore和flink.truststore文件到“/opt/hadoopclient/Flink/flink/conf/ssl/”中。
mv flink.keystore ssl/
mv flink.truststore ssl/
iii. 修改flink-conf.yaml文件中如下兩個參數為相對路徑。
security.ssl.internal.keystore: ssl/flink.keystore
security.ssl.internal.truststore: ssl/flink.truststore
5.運行wordcount作業。
須知
用戶在Flink提交作業或者運行作業時,應具有如下權限:
- 如果啟用Ranger鑒權,當前用戶必須屬于hadoop組或者已在Ranger中為該用戶添加“/flink”的讀寫權限。
- 如果停用Ranger鑒權,當前用戶必須屬于hadoop組。
- 普通集群(未開啟Kerberos認證)
?執行如下命令啟動session,并在session中提交作業。
yarn-session.sh -nm " session-name "
flink run/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
?執行如下命令在Yarn上提交單個作業。
flink run -m yarn-cluster
/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 安全集群(開啟Kerberos認證)
?flink.keystore和flink.truststore文件路徑為絕對路徑時:
- 執行如下命令啟動session,并在session中提交作業。
yarn-session.sh -nm " session-name "
flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 執行如下命令在Yarn上提交單個作業。
flink run -m yarn-cluster
/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
?flink.keystore和flink.truststore文件路徑為相對路徑時:
- 在“ssl”的同級目錄下執行如下命令啟動session,并在session中提交作業,其中“ssl”是相對路徑,如“ssl”所在目錄是“opt/hadoopclient/Flink/flink/conf/”,則在“opt/hadoopclient/Flink/flink/conf/”目錄下執行命令。
yarn-session.sh -t ssl/ -nm " session-name "
flink run/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 執行如下命令在Yarn上提交單個作業。
flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
6.作業提交成功后,客戶端界面顯示如下。
詳見下圖: 在Yarn上提交作業成功


詳見下圖:啟動session成功


詳見下圖: 在session中提交作業成功

7. 使用運行用戶進入Yarn服務的原生頁面,具體操作參考“組件操作指南 > 使用Flink > 查看Flink作業”,找到對應作業的application,單擊application名稱,進入到作業詳情頁面。
- 若作業尚未結束,可單擊“Tracking URL”鏈接進入到Flink的原生頁面,查看作業的運行信息。
- 若作業已運行結束,對于在session中提交的作業,可以單擊“Tracking URL”鏈接登錄Flink原生頁面查看作業信息。
詳見下圖:application


使用Flink客戶端(MRS 3.x及之后版本)
1.以客戶端安裝用戶,登錄安裝客戶端的節點。
2.執行以下命令,切換到客戶端安裝目錄。
cd /opt/hadoopclient
3.執行如下命令初始化環境變量。
source/opt/hadoopclient/bigdata_env
4.若集群開啟Kerberos認證,需要執行以下步驟,若集群未開啟Kerberos認證請跳過該步驟。
a.準備一個提交Flink作業的用戶。
b.登錄Manager,下載認證憑據。
登錄集群的Manager界面,具體請參見訪問FusionInsight Manager(MRS 3.x及之后版本),選擇“系統 > 權限 > 用戶”,在已增加用戶所在行的“操作”列,選擇“更多 > 下載認證憑據”。
c.將下載的認證憑據壓縮包解壓縮,并將得到的user.keytab文件拷貝到客戶端節點中,例如客戶端節點的“/opt/hadoopclient/Flink/flink/conf”目錄下。如果是在集群外節點安裝的客戶端,需要將得到的krb5.conf文件拷貝到該節點的“/etc/”目錄下。
d.將客戶端安裝節點的業務IP、Manager的浮動IP和Master節點IP添加到配置文件“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中的“jobmanager.web.access-control-allow-origin”和“jobmanager.web.allow-access-address”配置項中,IP地址之間使用英文逗號分隔。
jobmanager.web.access-control-allow-origin:
xx.xx.xxx.xxx , xx.xx.xxx.xxx ,xx.xx.xxx.xxx
jobmanager.web.allow-access-address: xx.xx.xxx.xxx , xx.xx.xxx.xxx ,xx.xx.xxx.xxx
說明
客戶端安裝節點的業務IP獲取方法:
集群內節點:
登錄MapReduce服務管理控制臺,選擇“集群列表 > 現有集群”,選中當前的集群并單擊集群名,進入集群信息頁面。
在“節點管理”中查看安裝客戶端所在的節點IP。
- 集群外節點:安裝客戶端所在的彈性云主機的IP。
- Manager的浮動IP獲取方法:
- 登錄MapReduce服務管理控制臺,選擇“集群列表 > 現有集群”,選中當前的集群并單擊集群名,進入集群信息頁面。
在“節點管理”中查看節點名稱,名稱中包含“master1”的節點為Master1節點,名稱中包含“master2”的節點為Master2節點。
- 遠程登錄Master2節點,執行“ifconfig”命令,系統回顯中“eth0:wsom”表示MRS Manager浮動IP地址,請記錄“inet”的實際參數值。如果在Master2節點無法查詢到MRS Manager的浮動IP地址,請切換到Master1節點查詢并記錄。如果只有一個Master節點時,直接在該Master節點查詢并記錄。
e.配置安全認證,在“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”配置文件中的對應配置添加keytab路徑以及用戶名。
security.kerberos.login.keytab: <user.keytab 文件路徑 >
security.kerberos.login.principal: < 用戶名 >
例如:
security.kerberos.login.keytab:
/opt/hadoopclient/Flink/flink/conf/user.keytab
security.kerberos.login.principal: test
f.參考“組件操作指南 > 使用Flink > 參考 > 簽發證書樣例”章節生成“generate_keystore.sh”腳本并放置在Flink的客戶端bin目錄下,執行如下命令進行安全加固,請參考“組件操作指南 >使用Flink > 安全加固 > 認證和加密”,password請重新設置為一個用于提交作業的密碼。
sh generate_keystore.sh < password >
該腳本會自動替換“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中關于SSL的值。
sh generate_keystore.sh < password >
說明執行認證和加密后會在Flink客戶端的“conf”目錄下生成“flink.keystore”和“flink.truststore”文件,并且在客戶端配置文件“flink-conf.yaml”中將以下配置項進行了默認賦值:
將配置項“security.ssl.keystore”設置為“flink.keystore”文件所在絕對路徑。
將配置項“security.ssl.truststore”設置為“flink.truststore”文件所在的絕對路徑。
將配置項“security.cookie”設置為“generate_keystore.sh”腳本自動生成的一串隨機規則密碼。
默認“flink-conf.yaml”中“security.ssl.encrypt.enabled: false”,“generate_keystore.sh”腳本將配置項“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”的值設置為調用“generate_keystore.sh”腳本時輸入的密碼。
g.客戶端訪問flink.keystore和flink.truststore文件的路徑配置。
?絕對路徑:執行該腳本后,在flink-conf.yaml文件中將flink.keystore和flink.truststore文件路徑自動配置為絕對路徑“/opt/hadoopclient/Flink/flink/conf/”,此時需要將conf目錄中的flink.keystore和flink.truststore文件分別放置在Flink Client以及Yarn各個節點的該絕對路徑上。
?相對路徑:請執行如下步驟配置flink.keystore和flink.truststore文件路徑為相對路徑,并確保Flink Client執行命令的目錄可以直接訪問該相對路徑。
i.在“/opt/hadoopclient/Flink/flink/conf/”目錄下新建目錄,例如ssl。
cd /opt/hadoopclient/Flink/flink/conf/
mkdir ssl
ii. 移動flink.keystore和flink.truststore文件到“/opt/hadoopclient/Flink/flink/conf/ssl/”中。
mv flink.keystore ssl/
mv flink.truststore ssl/
iii. 修改flink-conf.yaml文件中如下兩個參數為相對路徑。
security.ssl.keystore: ssl/flink.keystore
security.ssl.truststore: ssl/flink.truststore
5.運行wordcount作業。
須知
用戶在Flink提交作業或者運行作業時,應具有如下權限:
- 如果啟用Ranger鑒權,當前用戶必須屬于hadoop組或者已在Ranger中為該用戶添加“/flink”的讀寫權限。
- 如果停用Ranger鑒權,當前用戶必須屬于hadoop組。
- 普通集群(未開啟Kerberos認證)
?執行如下命令啟動session,并在session中提交作業。
yarn-session.sh -nm " session-name "
flink run/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
?執行如下命令在Yarn上提交單個作業。
flink run -m yarn-cluster/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 安全集群(開啟Kerberos認證)
?flink.keystore和flink.truststore文件路徑為絕對路徑時:
- 執行如下命令啟動session,并在session中提交作業。
yarn-session.sh -nm " session-name "
flink run
/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 執行如下命令在Yarn上提交單個作業。
flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
?flink.keystore和flink.truststore文件路徑為相對路徑時:
- 在“ssl”的同級目錄下執行如下命令啟動session,并在session中提交作業,其中“ssl”是相對路徑,如“ssl”所在目錄是“opt/hadoopclient/Flink/flink/conf/”,則在“opt/hadoopclient/Flink/flink/conf/”目錄下執行命令。
yarn-session.sh -t ssl/ -nm " session-name "
flink run/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 執行如下命令在Yarn上提交單個作業。
flink run -m yarn-cluster -yt ssl/
/opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 作業提交成功后,客戶端界面顯示如下。
詳見下圖:在Yarn上提交作業成功


詳見下圖:啟動session成功


詳見下圖:在session中提交作業成功


- 使用運行用戶進入Yarn服務的原生頁面,具體操作參考“組件操作指南 >使用Flink > 查看Flink作業”,找到對應作業的application,單擊application名稱,進入到作業詳情頁面
- 若作業尚未結束,可單擊“Tracking URL”鏈接進入到Flink的原生頁面,查看作業的運行信息。
- 若作業已運行結束,對于在session中提交的作業,可以單擊“Tracking URL”鏈接登錄Flink原生頁面查看作業信息。
詳見下圖: application

