Kafka觸發器
更新時間 2025-07-28 09:07:35
最近更新時間: 2025-07-28 09:07:35
分享文章
Kafka觸發器
Kafka觸發器可以訂閱天翼云提供的分布式消息隊列Kafka實例,并根據消息觸發關聯的工作流,借此能力,使得工作流可以消費指定topic的消息,執行特定工作流處理邏輯。
注意事項
Kafka觸發器訂閱的Kafka實例必須和工作流在相同地域。
前提條件
已創建工作流。
已開通分布式消息Kafka實例(KAFKA引擎版),詳情請參考創建分布式消息服務Kafka實例。
已創建Topic,創建GroupID(可選)
觸發消息格式
Kafka觸發器有兩種消息格式:RawData和CloudEvent格式,可在觸發器配置里選擇。 CloudEvent格式:
[
{
"id": "eca53463-6baf-4d56-8f86-cbdb748208ed",
"source": "ctyun.faas.trigger.kafka",
"specversion": "1.0",
"type": "kafka:topic:send-message",
"datacontenttype": "application/json",
"subject": "kafka-trigger-mqbjvsezbp-dial-test:test-for-faas",
"time": "2025-05-22T02:04:16Z",
"data": {
"headers": {},
"timestamp": 1747879456,
"topic": "test-for-faas",
"partition": 0,
"offset": 15280,
"key": "",
"value": "msg[9]: 154b2a0e-2c3d-4b03-ae9e-c225b5370c3b, ts=2025-05-22 02:04:16"
}
}
]| 參數 | 類型 | 示例值 | 描述 |
|---|---|---|---|
| id | string | eca53463-6baf-4d56-8f86-cbdb748208ed | 事件ID。標識事件的唯一值。 |
| source | string | ctyun.faas.trigger.kafka | 事件源。Kafka觸發器固定為ctyun.faas.trigger.kafka。 |
| specversion | string | 1.0 | CloudEvents協議版本。 |
| type | string | kafka:topic:send-message | 事件類型。 |
| datacontenttype | string | application/json | 參數data的內容形式。 |
| subject | string | kafka-trigger-mqbjvsezbp-dial-test:test-for-faas | 事件主體。格式為[SourceName]:[消息topic]。 |
| time | string | 2025-05-22T02:04:16Z | 消息被觸發的時間。 |
| data | object | - | Kafka觸發器獨有消息格式,詳細參見下文RawData描述。 |
RawData格式是CloudEvent格式的子集,只包含原始kafka消息的信息,消息結構相當于CloudEvent的data字段
[
{
"offset" : 15280,
"partition" : 0,
"headers" : {},
"topic" : "test-for-faas",
"key" : "testkey",
"timestamp" : 1747879456,
"value" : "msg[9]: 154b2a0e-2c3d-4b03-ae9e-c225b5370c3b, ts=2025-05-22 02:04:16"
}
]| 參數 | 類型 | 示例值 | 描述 |
|---|---|---|---|
| offset | int | 15280 | 消息偏移量。 |
| partition | int | 0 | 分區信息。 |
| headers | map | - | 消息攜帶的header。 |
| topic | string | test-for-faas | topic的名稱。 |
| key | string | testkey | 消息的key。 |
| timestamp | int | 1747879456 | Unix時間戳(秒)。 |
| value | string | hello,kafka | 消息的內容。 |
操作步驟
登錄,點擊目標工作流,進入工作流詳情詳情。
在配置選項卡中,選擇左邊的 工作流調度 選項卡。
點擊 創建工作流調度,在彈出的右抽屜中選擇 Kafka觸發器,配置參數解釋如下表。
| 配置項 | 操作說明 | 示例 |
|---|---|---|
| 觸發器類型 | 選擇Kafka觸發器。 | Kafka觸發器 |
| 名稱 | 填寫自定義的觸發器名稱。 | kafka-trigger |
| Kafka實例 | 選擇已創建的Kafka實例。 | - |
| Topic | 選擇已創建的Kafka實例的Topic。 | - |
| Group ID | ? 快速創建:推薦方案。自動創建以GROUP-FC-Trigger-{trigger_name}-{uuid}命名的Group ID。 ? 使用已有:選擇Kafka實例已有的GroupID,請您注意不要與已有的業務混用GroupID,否則會影響已有的消息收發。 | - |
| 消費任務并發數 | 消費者的并發數量,有效取值范圍為[1,20],建議不超過Topic的分區數。該值同時影響投遞到函數的并發數。 | - |
| 消費位點 | 選擇消息的消費位點,即觸發器從kafka消息隊列開始拉取消息的位置。 ? 最早位點:從最早位點開始消費。 ? 最新位點:從最新位點開始消費。 | 最新位點 |
| 調用方式 | 選擇函數調用方式。 ? 同步調用:指觸發器消費topic消息后投遞到函數是同步調用,會等待函數響應后繼續下一個消息投遞。但消費任務并發數大于1時,多個消費者有可能會并發消費消息并投遞,并發的情況視topic隊列本身積存的消息而定。 ? 異步調用:指觸發器消費topic消息后投遞到函數是異步調用,不會等待函數響應,可以快速消費事件。 | 同步調用 |
| 觸發器啟用狀態 | 創建觸發器后是否立即啟用。默認選擇開啟,即創建觸發器后立即啟用觸發器。 | 啟用 |
| 推送配置 | ? 批量推送條數:批量推送的最大值,積壓值達到后立刻推送,取值范圍為[1,10000]。 ? 批量推送間隔:批量推送的最大時間間隔,達到后立刻推送,單位秒,取值[0,15]。默認0無需等待,數據直接推送。 ? 推送格式:函數收到的事件格式,詳情請查閱觸發器事件消息格式。 | - |
| 重試策略 | 消息推送函數失敗后重試的策略,共兩種: ? 指數退避:指數退避重試,重試5次,重試周期為2,4,8,16,32(秒)。 ? 線性退避:線性退避重試,重試5次,重試周期為1,2,3,4,5(秒)。 | - |
| 容錯策略 | 當重試次數耗盡后仍然失敗時的處理方式: ? 允許容錯:當異常發生并超過重試策略配置時直接丟棄。 ? 禁止容錯:當異常發生并超過重試策略配置時繼續阻塞執行。 | - |
| 死信隊列 | 當容錯策略為:允許容錯時,可以額外開啟死信隊列。當開啟死信隊列時且異常發生并超過重試策略配置時,消息會被投遞到指定的消息隊列里,當前只支持投遞到kafka和rocketmq | - |