消息生產
- 消息壓縮:將較大的消息進行壓縮后發送到服務端,有效利用帶寬。
- 延遲消息:設計消費時延,消息發送到服務端后,過了預設時間才可以被消費。
- 事務消息:根據預設的事務,事務消息可保證分布式系統之間的數據最終一致。
消息消費
- 有序消費:支持普通有序消息和嚴格有序消息兩種方式。
- 集群消費:一個主題可被一個或多個消費者組消費,消費者組中消費者實例可平均分攤消費信息。
- 消費位置設置:支持設置消費組首次啟動消費的位置,包括隊列頭、隊列尾及由客戶端指定。
- 消息回溯:支持按時間回溯消費進度,將訂閱組在某主題上的消費進度重置到過去或者未來。
完善的運維能力
- 應用用戶管理:集群租戶隔離,應用接入集群權限管理。
- 主題管理:支持對實例下的主題進行管理,執行創建刪除等操作。
- 訂閱組管理:支持對實例下的訂閱組進行管理。
- 生產者和消費者管理:用戶可查看當前實例下的生產者和消費者信息,并實時更新。
- 消息查詢:按消息ID、消息邏輯偏移量、消息key。
- 完善的運維功能,節點狀態檢測、啟停;實例狀態檢測、啟停;SLA監控等。
順序消息
順序消息是指消費消息的順序要同發送消息的順序一致,在 RocketMQ 中,主要有兩種有序消息:全局有序消息和局部有序消息(又稱普通有序消息、分區有序消息)。
普通有序消息:在正常情況下可以保證完全的順序消息,但是一旦發生通信異常造成Broker 重啟,隊列總數發生變化,哈希取模后定位的隊列會變化,因此會產生短暫的消息順序不一致。如果業務能容忍在集群異常情況(如某個 Broker 宕機或者重啟)下消息短暫的亂序,使用普通順序方式比較合適。
嚴格有序消息:無論正常異常情況都能保證順序,但是犧牲了分布式Failover 特性,即 Broker 集群中只要有一臺機器不可用,則整個集群都不可用(或者影響hash 值對應隊列的使用),服務可用性大大降低。如果服務器部署為同步雙寫模式,此缺陷可通過備機自動切換為主避免,不過仍然會存在幾分鐘的服務不可用。
事務消息
消息隊列 RocketMQ 提供類似 X/Open XA 的分布事務功能。半事務消息發送后,根據預設的事務進行判斷,滿足事務的消息將會被服務端確認,不滿足的事務的消息不會被服務端接收,從而實現在分布式場景下保障消息生產和本地事務的最終一致性。
半消息: 暫不能投遞的消息,發送方已經將消息成功發送到了消息隊列服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處于該種狀態下的消息即半消息。
消息回查: 由于網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列 RocketMQ服務端通過掃描發現某條消息長期處于“半消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該過程即消息回查。
延時消息
生產者將消息發送到消息隊列RocketMQ服務端,設計消費時延,在預設的時間后才可以被消費者消費。發送延時消息時需要設定一個延時時間長度,消息將從當前發送時間點開始延遲固定時間之后才開始投遞,實現分布式場景的延時調度觸發效果。
廣播消費/集群消費
廣播消費:在廣播消費模式下,一條消息被多個Consumer消費,即使這些Consumer屬于同一個ConsumerGroup,消息也會被ConsumerGroup中的每個Consumer都消費一次,廣播消費中的ConsumerGroup 概念可以認為在消息劃分方面無意義。
集群消費:一個Topic可以被一個或多個ConsumerGroup消費,每個ConsumerGroup有自己獨立的消費進度,消費進度是保存在服務端的。 一個ConsumerGroup中的消費者實例可以平均分攤消費消息,做到負載均衡。例如某個Topic有9條消息,其中一個ConsumerGroup有3個不同的消費者實例(可能是3個進程,或者3臺機器),那么每個實例只消費其中的3條消息。在此消費模式下,可以做到Point-To-Point的消費,也可以做到JMS里面廣播消費,能滿足絕大部分場景,推薦使用此消費模式。
消息重試
對于有序消息:有序消息不能跳躍簽收,當消費者消費消息失敗后,消息隊列RocketMQ會自動不斷進行消息重試 (每次間隔時間為1秒),此時應用會出現消息消費被阻塞的情況。因此建議使用有序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。
對于無序消息:消息隊列RocketMQ默認允許每條消息最多重試16次。
每次重試的間隔時間如下:
| 第幾次重試 | 與上次重試的間隔時間 |
|---|---|
| 1 | 10秒 |
| 2 | 30秒 |
| 3 | 1分鐘 |
| 4 | 2分鐘 |
| 5 | 3分鐘 |
| 6 | 4分鐘 |
| 7 | 5分鐘 |
| 8 | 6分鐘 |
| 9 | 7分鐘 |
| 10 | 8分鐘 |
| 11 | 9分鐘 |
| 12 | 10分鐘 |
| 13 | 20分鐘 |
| 14 | 30分鐘 |
| 15 | 1小時 |
| 16 | 2小時 |
如果消息重試16次后仍然失敗,消息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,將會在接下來的4小時46分鐘之內進行16次重試,超過這個時間范圍消息將不再重試投遞。
消息過濾
消費者訂閱了某個Topic后,消息隊列RocketMQ會將該主題中的所有消息投遞給消費者。若消費者只需要關注部分消息,可通過設置過濾條件在消息隊列RocketMQ版服務端進行過濾,只獲取到需要關注的消息子集,避免接收到大量無效的消息。
消息過濾主要通過以下幾個關鍵流程實現:
- 生產者:生產者在初始化消息時預先為消息設置一些屬性和標簽,用于后續消費時指定過濾目標。
- 消費者:消費者在初始化及后續消費流程中向服務端上報需要訂閱指定主題的哪些消息,即過濾條件。
- 服務端:消息隊列RocketMQ服務端根據消費者上報的過濾條件的表達式進行匹配,將符合條件的消息投遞給消費者進行消費。
消息隊列RocketMQ支持兩種過濾方式:
- 通過Tag進行過濾:生產者在發送消息時,設置消息的Tag標簽,消費者通過 Tag標簽指定需要消費的信息。
- 通過SQL屬性過濾:通過生產者為消息設置的屬性(Key)及屬性值(Value)進行匹配。生產者在發送消息時可設置多個屬性,消費者訂閱時可設置SQL語法的過濾表達式過濾多個屬性。