本文介紹如何應用事件總線EventBridge的事件流功能實現分布式消息服務RocketMQ的消息路由。
前提條件
開通分布式消息服務RocketMQ并創建最少兩個主題。
背景信息
事件流作為更輕量、實時端到端的流式事件通道,提供輕量級的流式數據的過濾和轉換的能力,在不同的數據倉庫之間、數據處理程序之間、數據分析和處理系統之間進行數據同步。源端分布式消息服務RocketMQ生產的消息可以通過事件流這個通道被路由到目標端的分布式消息服務RocketMQ。
步驟一:創建事件流
登錄事件總線EventBridge控制臺。
在左側導航欄,單擊事件流。
在事件流頁面,單擊創建事件流。
在創建事件流面板,設置任務名稱和描述,配置以下參數,然后單擊保存。
a.在Source(源)配置向導,選擇數據提供方為分布式消息服務RocketMQ,設置以下參數,然后單擊下一步。
參數 說明 示例 實例名稱 前提條件中已創建的分布式消息服務RocketMQ版實例。 xxx Topic 當前實例中的Topic。 topic1 Group 消費組名。
快速創建:自動創建以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:選擇當前實例中已創建的Group,請不要與已有業務的Group混用,以免影響已有的消息收發。
group1 消費位點 開始消費的位置。
最新位點:從最新位點開始消費。
最新位點 Tag 用于過濾消息的Tag值,非必填。 tag1 b.在Filtering(過濾)配置向導,設置事件過濾規則,單擊下一步。
c.在Sink(目標)配置向導,選擇服務類型為分布式消息服務RocketMQ,配置以下參數,單擊保存,如圖1所示。
參數 說明 示例 實例 選擇分布式消息服務RocketMQ實例。 instance-xxx Topic 選擇RocketMQ實例的Topic。 topic1 消息體 選擇消息體(Body)的內容,更多參考“事件內容轉換”。 完整事件 自定義屬性 選擇自定義屬性(Properties)的內容,更多參考“事件內容轉換”。 空 索引 選擇索引(Keys)的內容,更多參考“事件內容轉換”。 空 標簽 選擇標簽(Tags)的內容,更多參考“事件內容轉換”。 空 圖1 創建事件流時選擇服務類型為分布式消息服務RocketMQ的事件目標
創建事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。
步驟二:測試驗證
登錄分布式消息服務RocketMQ管理控制臺。
在左側導航欄,單擊實例列表,選擇事件流的源實例。
在主題管理頁面,選擇源的目標主題,操作列點擊更多,然后點擊測試。
在對話框輸入想要發送的消息,然后點擊發送消息。
發送消息后返回實例列表,選擇事件流的目標實例,進入管理。
在消息查詢頁面,選擇目標實例的目標主題,然后按主題查詢,查詢目標主題對應的消息。
查看查詢到的Key和Value值是否與生產的消息一致,如圖2所示。
圖2 在分布式消息服務RocketMQ管理控制臺中查看消息