Flink Jar作業配置checkpoint保存到OBS
Flink Jar作業配置checkpoint保存到OBS步驟如下:
1.在Flink Jar作業的Jar包代碼中加入如下代碼:
//StreamExecutionEnvironment 依賴的pom文件配置參考后續說明
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(40000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${bucket}/jobs/checkpoint/my_jar"), false);
rocksDbBackend.setOptions(new OptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
return currentOptions
.setMaxLogFileSize(64 * 1024 * 1024)
.setKeepLogFileNum(3);
}
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
return currentOptions;
}
});
env.setStateBackend(rocksDbBackend);
說明上述代碼含義是以EXACTLY_ONCE模式,每隔40s保存checkpoint到OBS的${bucket}桶中的jobs/checkpoint/my_jar路徑。
其中,最重要的是保存checkpoint路徑。一般是將checkpoint存入OBS桶中,路徑格式如下:
- 路徑格式:obs://${bucket}/xxx/xxx/xxx
- StreamExecutionEnvironment依賴的包需要在pom文件中添加如下配置。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
2.在DLI Flink Jar作業中配置“ 優化參數 ”和“從checkpoint恢復”功能。
?配置優化參數
i.在DLI控制臺,選擇“作業管理>Flink作業”。
ii.在對應Flink Jar作業操作列單擊“編輯”,進入Flink Jar作業編輯頁面。
iii.在優化參數中,添加如下兩個參數:
fs.obs.access.key=xxx
fs.obs.secret.key=xxx
優化參數說明
| 配置項 | 默認值 | 是否必填 | 說明 |
|---|---|---|---|
| fs.obs.access.key | 無 | 是 | AK(Access Key Id),需要具備訪問OBS對應桶的權限。 |
| fs.obs.secret.key | 無 | 是 | SK(Secret Access Key),需要具備訪問OBS對應桶的權限。 |
?配置從checkpoint恢復
i.勾選“異常自動重啟”。
ii.勾選“從checkpoint恢復”,填寫“Checkpoint路徑”。
Checkpoint路徑與用戶在Jar包中設置的checkpoint路徑相對應,格式如下:
“Checkpoint路徑”格式為:${bucket}/xxx/xxx/xxx
示例:
如果Jar包中代碼配置為:obs://mybucket/jobs/checkpoint/jar-3
那么“Checkpoint路徑”填寫為: mybucket/jobs/checkpoint/jar-3
說明l? 每個Flink Jar作業配置的Checkpoint路徑要保持不同,否則無法從準確的checkpoint路徑恢復。
l? checkpoint路徑中的OBS桶需要給DLI授權,DLI服務才能訪問此桶下的文件。
3.查看作業是否從checkpoint恢復。
Flink Jar作業是否支持上傳配置文件,要如何操作?
自定義(JAR)作業支持上傳配置文件。
1.將配置文件通過程序包管理上傳到DLI;
2.在Flink jar作業的其他依賴文件參數中,選擇創建的DLI程序包;
3.在代碼中通過ClassName.class.getClassLoader().getResource("userData/fileName")加載該文件(其中,“fileName”為需要訪問的文件名,“ClassName”為需要訪問該文件的類名)。
Flink Jar 包沖突,導致提交失敗
問題描述
用戶Flink程序的依賴包與DLI Flink平臺的內置依賴包沖突,導致提交失敗。
解決方案
查看是否已包含DLI Flink運行平臺中已經存在的包,如果存在,則需要將自己的Jar包刪除。
Flink Jar作業訪問DWS啟動異常,提示客戶端連接數太多錯誤
問題描述
提交Flink Jar作業訪問DWS數據倉庫服務時,提示啟動失敗,作業日志報如下錯誤信息。
FATAL: Already too many clients, active/non-active/reserved: 5/508/3
原因分析
當前訪問的DWS數據庫連接已經超過了最大連接數。錯誤信息中,non-active的個數表示空閑連接數,例如,non-active為508,說明當前有大量的空閑連接。
解決方案
出現該問題時建議通過以下操作步驟解決。
1.登錄DWS命令執行窗口,執行以下SQL命令,臨時將所有non-active的連接釋放掉。
SELECT PG_TERMINATE_BACKEND(pid) from pg_stat_activity WHERE state='idle';
2.檢查應用程序是否未主動釋放連接,導致連接殘留。建議優化代碼,合理釋放連接。
3.在GaussDB(DWS) 控制臺設置會話閑置超時時長session_timeout,在閑置會話超過所設定的時間后服務端將主動關閉連接。
session_timeout默認值為600秒,設置為0表示關閉超時限制,一般不建議設置為0。
session_timeout設置方法如下:
a.登錄GaussDB(DWS) 管理控制臺。
b.在左側導航欄中,單擊“集群管理”。
c.在集群列表中找到所需要的集群,單擊集群名稱,進入集群“基本信息”頁面。
d.單擊“參數修改”頁簽,修改參數“session_timeout”,然后單擊“保存”。
e.在“修改預覽”窗口,確認修改無誤后,單擊“保存”。
Flink Jar作業運行報錯,報錯信息為Authentication failed
問題現象
Flink Jar作業運行異常,作業日志中有如下報錯信息:
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
問題原因
因為帳號沒有在全局配置中配置服務授權,導致該帳號在創建跨源連接訪問外部數據時因為權限不足而導致跨源訪問失敗。
解決方案
1.登錄DLI管理控制臺,選擇“全局配置 > 服務授權”。
2.在服務授權界面,全選委托權限。
3.單擊“更新委托授權”。界面會提示“委托權限更新成功”,表示修改成功。
4.委托授權完成后,重新創建跨源連接和運行作業。
Flink Jar作業設置backend為OBS,報錯不支持OBS文件系統
問題現象
客戶執行Flink Jar作業,通過設置checkpoint存儲在OBS桶中,作業一直提交失敗,并伴有報錯提交日志,提示OBS桶名不合法。
原因分析
1.確認OBS桶名是否正確。
2.確認所用AKSK是否有權限。
3.設置依賴關系provided防止Jar包沖突。
4.確認客戶esdk-obs-java-3.1.3.jar的版本。
5.確認是集群存在問題。
處理步驟
1.設置依賴關系provided。
2.重啟clusteragent應用集群升級后的配置。
3.去掉OBS依賴,否則checkpoint會寫不進OBS。
Hadoop jar包沖突,導致Flink提交失敗
問題現象
Flink 提交失敗,異常為:
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.obs.metrics.OBSAMetricsProvider not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2664)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
... 31 common frames omitted
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.obs.metrics.OBSAMetricsProvider not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2568)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2662)
... 32 common frames omitted
原因分析
Flink jar包沖突。用戶提交的flink jar 與 DLI 集群中的hdfs jar包存在沖突。
處理步驟
1.將用戶pom文件中的的hadoop-hdfs設置為:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<scope> provided </scope>
</dependency>
或使用exclusions標簽將其排除關聯。
2.若使用到hdfs的配置文件,則需要將core-site.xml、hdfs-site.xml、yarn-site.xml 修改為mrs-core-site.xml、mrs-hdfs-site.xml、mrs-hbase-site.xml。
conf.addResource(HBaseUtil.class.getClassLoader().getResourceAsStream("mrs-core-site.xml"), false);
conf.addResource(HBaseUtil.class.getClassLoader().getResourceAsStream("mrs-hdfs-site.xml"), false);
conf.addResource(HBaseUtil.class.getClassLoader().getResourceAsStream("mrs-hbase-site.xml"), false);
Flink jar 如何連接SASL_SSL?
使用Flink Jar連接開啟SASL_SSL認證的Kafka。