Flink SQL作業的消費能力如何,即一天可以處理多大的數據量?
Flink SQL作業的消費能力與源端的數據發送、隊列大小、作業參數配置均有關系,每秒10M峰值。
實際處理數據量,與您使用時長相關。
Flink SQL中的temp流中數據是否需要定期清理,如何清理?
不需要定期清理。
Flink SQL中的temp流類似于子查詢,只是邏輯意義上的流,用于簡化SQL邏輯,不會產生數據存儲,因而不存在清理問題。
創建FlinkSQL作業時選擇OBS桶,提示未授權
問題描述
用戶創建Flink SQL作業,配置參數時,選擇自己創建的OBS桶,提示“該OBS桶未授權。立即授權”,單擊“立即授權”后提示“服務器內部出錯了,請聯系客服或者稍后重試”,無法授權。
解決方案
在報錯頁面,通過F12查看錯誤詳細信息:
{"error_code":"DLI.10001","error_msg":"服務內部出錯了。{0} 請聯系客服或者稍后重試","error_json_opt":{"error":" Unexpected exception[NoSuchElementException: None.get]"}}
查看用戶是否創建DLI委托,發現用戶沒有創建委托權限,在“全局配置”>“服務授權”頁面勾選“Tenant Administrator(全局服務)”權限后,重試可以給OBS桶授權。
Flink SQL作業將OBS表映射為DLI的分區表
場景概述
用戶使用Flink SQL作業時,需要創建OBS分區表,用于后續進行批處理。
操作步驟
該示例將car_info數據,以day字段為分區字段,parquet為編碼格式(目前僅支持parquet格式),轉儲數據到OBS。
create sink stream car_infos (
carId string,
carOwner string,
average_speed double,
day string
) partitioned by (day)
with (
type = "filesystem",
file.path = "obs://obs-sink/car_infos",
encode = "parquet",
ak = "{{myAk}}",
sk = "{{mySk}}"
);
數據最終在OBS中的存儲目錄結構為:obs://obs-sink/car_infos/day=xx/part-x-x。
數據生成后,可通過如下SQL語句建立OBS分區表,用于后續批處理:
1.創建OBS分區表。
create table car_infos (
carId string,
carOwner string,
average_speed double
)
partitioned by (day string)
stored as parquet
location 'obs://obs-sink/car-infos';
2.從關聯OBS路徑中恢復分區信息。
alter table car_infos recover partitions;
OBS表如何映射為DLI的分區表?
該示例將car_info數據,以day字段為分區字段,parquet為編碼格式(目前僅支持parquet格式),轉儲數據到OBS。
create sink stream car_infos (
carId string,
carOwner string,
average_speed double,
day string
) partitioned by (day)
with (
type = "filesystem",
file.path = "obs://obs-sink/car_infos",
encode = "parquet",
ak = "{{myAk}}",
sk = "{{mySk}}"
);
數據最終在OBS中的存儲目錄結構為:obs://obs-sink/car_infos/day=xx/part-x-x。
數據生成后,可通過如下SQL語句建立OBS分區表,用于后續批處理:
1.創建OBS分區表。
create table car_infos (
carId string,
carOwner string,
average_speed double
)
partitioned by (day string)
stored as parquet
location 'obs://obs-sink/car-infos';
2.從關聯OBS路徑中恢復分區信息。
alter table car_infos recover partitions;
在Flink SQL作業中創建表使用EL表達式,作業運行報DLI.0005錯誤
問題現象
Flink SQL作業創建表時,表名使用EL表達式,運行作業時報如下錯誤:
DLI.0005: AnalysisException: t_user_message_input_#{date_format(date_sub(current_date(), 1), 'yyyymmddhhmmss')} is not a valid name for tables/databases. Valid names only contain alphabet characters, numbers and _.
解決方案
需要將SQL中表名的“#”字符改成“
”即可。DLI中使用EL表達式的格式為:
{ expr } 。
Flink作業輸出流寫入數據到OBS,通過該OBS文件路徑創建的DLI表查詢無數據
問題現象
使用Flink作業輸出流寫入數據到了OBS中,通過該OBS文件路徑創建的DLI表進行數據查詢時,無法查詢到數據。
例如,使用如下Flink結果表將數據寫入到OBS的“obs://obs-sink/car_infos”路徑下。
create sink stream car_infos_sink (
carId string,
carOwner string,
average_speed double,
buyday string
) partitioned by (buyday)
with (
type = "filesystem",
file.path = "obs://obs-sink/car_infos",
encode = "parquet",
ak = "{{myAk}}",
sk = "{{mySk}}"
);
通過該OBS文件路徑創建DLI分區表,在DLI查詢car_infos表數據時沒有查詢到數據。
create table car_infos (
carId string,
carOwner string,
average_speed double
)
partitioned by (buyday string)
stored as parquet
location 'obs://obs-sink/car_infos';
解決方案
1.在DLI創建Flink結果表到OBS的作業時,如上述舉例中的car_infos_sink表,是否開啟了Checkpoint。如果未開啟則需要開啟Checkpoint參數,重新運行作業生成OBS數據文件。
開啟Checkpoint步驟如下。
a.到DLI管理控制臺,左側導航欄選擇“作業管理 > Flink作業”,在對應的Flink作業所在行,操作列下單擊“編輯”。
b.在“運行參數”下,查看“開啟Checkpoint”參數是否開啟。
2.確認Flink結果表的表結構和DLI分區表的表結構是否保持一致。如問題描述中car_infos_sink和car_infos表的字段是否一致。
3.通過OBS文件創建DLI分區表后,是否執行以下命令從OBS路徑中恢復分區信息。如下,在創建完DLI分區表后,需要恢復DLI分區表car_infos分區信息。
alter table car_infos recover partitions;
Flink SQL作業運行失敗,日志中有connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null錯誤
問題現象
在DLI上提交Flink SQL作業,作業運行失敗,在作業日志中有如下報錯信息:
connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null
問題根因
該Flink SQL作業在配置作業運行參數時,有選擇保存作業日志或開啟Checkpoint,配置了OBS桶保存作業日志和Checkpoint。但是運行該Flink SQL作業的IAM用戶沒有OBS寫入權限導致該問題。
解決方案
1.登錄IAM控制臺頁面,單擊“用戶”,在搜索框中選擇“用戶名”,輸入運行作業的IAM用戶名。
2.單擊查詢到用戶名,查看該用戶對應的用戶組。
3.單擊“用戶組”,輸入查詢到的用戶組查詢,單擊用戶組名稱,在“授權記錄”中查看當前用戶的權限。
4.確認當前用戶所屬用戶組下的權限是否包含OBS寫入的權限,比如“OBS OperateAccess”。如果沒有OBS寫入權限,則給對應的用戶組進行授權。
5.授權完成后,等待5到10分鐘等待權限生效。再次運行失敗的Flink SQL作業,查看作業運行狀態。
Flink SQL作業讀取DIS數據報Not authorized錯誤
問題現象
Flink SQL作業讀取DIS數據,運行該作業時,語義校驗失敗。具體作業失敗提示信息如下:
Get dis channel xxx info failed. error info: Not authorized, please click the overview page to do the authorize action
問題原因
運行Flink作業前,沒有對運行的用戶賬號授權獲取DIS數據的權限。
解決方案
1.登錄到DLI管理控制臺,左側導航欄選擇“全局配置 > 服務授權”。
2.在服務授權管理界面,勾選“DIS Administrator”權限,單擊“更新委托授權”完成對當前用戶的DIS權限授權。
3.在“作業管理 > Flink作業”,單擊對應的Flink SQL作業,重新啟動和運行該作業。
Flink SQL作業消費Kafka后sink到es集群,作業執行成功,但未寫入數據
問題現象
客戶創建Flink SQL作業,消費Kafka后sink到es集群,作業執行成功,但無數據。
原因分析
查看客戶作業腳本內容,排查無問題,作業執行成功,出現該問題可能的原因如下:
- 數據不準確。
- 數據處理有問題。
處理步驟
1.在Flink UI查看task日志,發現報錯中提到json體,基本確定原因為數據格式問題。
2.排查客戶實際數據,發現客戶Kafka數據存在多層嵌套的復雜json體。不支持解析。
3.有兩種方式解決此問題:
- 通過udf成jar包的形式
- 修改配置
4.修改源數據格式,再次執行作業,無問題。