RocketMQ觸發器
更新時間 2025-07-28 09:07:35
最近更新時間: 2025-07-28 09:07:35
分享文章
RocketMQ觸發器
RocketMQ觸發器可以訂閱分布式消息服務RocketMQ并根據消息觸發關聯的工作流,借此能力,使得工作流可以消費指定topic的消息,執行自定義處理邏輯。
注意事項
RocketMQ觸發器訂閱的RocketMQ實例必須和工作流在相同地域。
前提條件
已創建工作流。
開通分布式消息服務RocketMQ實例(RocketMQ引擎類型),詳情請參考開通RocketMQ實例。
創建Topic和GroupID。
創建用戶,且默認Topic權限設置為:PUB|SUB,默認消費組權限為SUB。詳情請參考創建用戶。
觸發消息格式
有兩種消息格式:RawData和CloudEvent格式,可在觸發器配置里選擇。
CloudEvent格式:
[
{
"id": "21000777109E05EF04B574B8A1DF0001",
"source": "ctyun.faas.trigger.rocketmq",
"specversion": "1.0",
"type": "rocketmq:topic:send-message",
"datacontenttype": "application/json",
"subject": ":mq-func-hckzeddbxj-rocket-test:test-for-faas",
"time": "57361-07-03T16:18:39Z",
"data": {
"topic": "test-for-faas",
"properties": {
"CLUSTER": "1dafcb4049ba42df96d80b7dd2f99c5e",
"CONSUME_START_TIME": "1747987057130",
"KEYS": "webtest",
"MAX_OFFSET": "2",
"MIN_OFFSET": "0",
"TAGS": "1747987057097_0",
"UNIQ_KEY": "21000777109E05EF04B574B8A1DF0001"
},
"data": "WebTestTools_174798gjkS"
}
}
]| 參數 | 類型 | 示例值 | 描述 |
|---|---|---|---|
| id | string | 21000777109E05EF04B574B8A1DF0001 | 事件ID。標識事件的唯一值。提取自RocketMQ消息。 |
| source | string | ctyun.faas.trigger.rocketmq | 事件源。RocketMQ觸發器固定為ctyun.faas.trigger.rocketmq。 |
| specversion | string | 1.0 | CloudEvents協議版本。 |
| type | string | rocketmq:topic:send-message | 事件類型。 |
| datacontenttype | string | application/json | 參數data的內容形式。 |
| subject | string | mq-func-hckzeddbxj-rocket-test:test-for-faas | 事件主體。 |
| time | string | 2025-05-22T02:04:16Z | 消息被觸發的時間。 |
| data | object | - | RocketMQ觸發器獨有消息格式,詳細參見下文RawData描述。 |
RawData格式 是CloudEvent格式的子集,只包含原始rabbitmq消息的信息,消息結構相當于CloudEvent的data字段,具體如下:
[
{
"topic": "test-for-faas",
"properties": {
"CLUSTER": "1dafcb4049ba42df96d80b7dd2f99c5e",
"CONSUME_START_TIME": "1747987204637",
"KEYS": "webtest",
"MAX_OFFSET": "3",
"MIN_OFFSET": "0",
"TAGS": "1747987204605_0",
"UNIQ_KEY": "2100077510A605EF04B574BAE2080001"
},
"data": "WebTestTools_17v6fg0J"
}
]| 參數 | 類型 | 示例 | 描述 |
|---|---|---|---|
| topic | string | test-for-faas | Topic名稱。 |
| properties | map | - | 消息自定義屬性。 |
| properties.CLUSTER | string | 1dafcb4049ba42df96d80b7dd2f99c5e | RocketMQ實例ID。 |
| properties.CONSUME_START_TIME | string | 1747987204637 | Unix時間戳,毫秒。 |
| properties.KEYS | string | webtest | 消息的key。 |
| properties.MAX_OFFSET | string | 3 | 消息隊列中的最大偏移量。 |
| properties.MIN_OFFSET | string | 0 | 消息隊列中的最小偏移量。 |
| properties.TAGS | string | 1747987204605_0 | 消息標簽。 |
| properties.UNIQ_KEY | string | 2100077510A605EF04B574BAE2080001 | 消息唯一鍵。 |
| data | string | WebTestTools_17v6fg0J | 消息體內容。 |
操作步驟
登錄,點擊目標工作流,進入工作流詳情詳情。
在配置選項卡中,選擇左邊的 工作流調度 選項卡。
點擊 創建工作流調度,在彈出的右抽屜中選擇 RocketMQ觸發器,配置參數解釋如下表。
| 配置項 | 操作說明 | 示例 |
|---|---|---|
| 觸發器類型 | 選擇RocketMQ觸發器。 | RocketMQ觸發器 |
| 名稱 | 填寫自定義的觸發器名稱。 | rocketmq-trigger |
| 版本或別名 | 默認值為LATEST,支持選擇任意函數版本或函數別名。 | LATEST |
| RocketMQ 實例 | 選擇已創建的RocketMQ實例。 | - |
| Topic | 選擇已創建的RocketMQ實例的Topic。 | - |
| Group ID | 選擇已創建的RocketMQ實例的Group ID。 | - |
| 消費位點 | 選擇消息的消費位點,即觸發器從RocketMQ實例開始拉取消息的位置。取值說明如下。 最新位點:從最新位點開始消費。 最早位點:從最早位點開始消費。 指定時間戳:從指定時間戳開始消費。 | 最新位點 |
| 調用方式 | 選擇函數調用方式。 同步調用:指觸發器消費topic消息后投遞到函數是同步調用,會等待函數響應后繼續下一個消息投遞。 異步調用:指觸發器消費topic消息后投遞到函數是異步調用,不會等待函數響應,可以快速消費事件。 | 同步調用 |
| 用戶ID | RocketMQ實例用戶ID,需要在RocketMQ控制臺創建。 | - |
| 密鑰 | RocketMQ實例用戶密鑰,需要在RocketMQ控制臺創建。 | - |
| 觸發器啟用狀態 | 創建觸發器后是否立即啟用。默認選擇開啟,即創建觸發器后立即啟用觸發器。 | - |
| 推送配置 | 批量推送條數:批量推送的最大值,積壓值達到后立刻推送,取值范圍為 [1, 10000]。 批量推送間隔:批量推送的最大時間間隔,達到后立刻推送,單位秒,取值[0,15]。默認0無需等待,數據直接推送。 推送格式:函數收到的事件格式,詳情請查閱觸發器事件消息格式。 | - |
| 重試策略 | 消息推送函數失敗后重試的策略,共兩種: 指數退避:指數退避重試,重試5次,重試周期為2,4,8,16,32(秒)。 線性退避:線性退避重試,重試5次,重試周期為1,2,3,4,5(秒)。 | - |
| 容錯策略 | 當重試次數耗盡后仍然失敗時的處理方式: 允許容錯:當異常發生并超過重試策略配置時直接丟棄。 禁止容錯:當異常發生并超過重試策略配置時繼續阻塞執行。 | - |
| 死信隊列 | 當容錯策略為:允許容錯時,可以額外開啟死信隊列。當開啟死信隊列時且異常發生并超過重試策略配置時,消息會被投遞到指定的消息隊列里,當前只支持投遞到kafka和rocketmq | - |