消息堆積對業務的影響及解決辦法
更新時間 2024-01-08 09:17:34
最近更新時間: 2024-01-08 09:17:34
分享文章
本文主要介紹消息堆積對業務的影響及解決辦法。
消息堆積對業務的影響
過量的消息堆積可能會引起內存或磁盤告警,從而造成所有connection阻塞,進而影響到其他隊列的使用,導致整體服務質量的下降。
消息堆積產生的原因
- 一般來說消息堆積是由于生產消息的速率遠大于消費消息的速率所導致的。比如某個時間段消費端處理消息異常緩慢,發送一條消息只要3秒鐘,而消費一條消息需要1分鐘,每分鐘發送20個消息,只有一個消息被消費端處理,這樣隊列中就會產生大量的消息堆積。
- 消費者出現異常,生產者一直在發送消息,但是消費者不能消費,造成消息積壓。
- 消費者沒有出現異常,但是消費者與隊列間的訂閱可能出現了異常,也會導致消息無法被消費從而造成堆積的情況。
- 消費者正常,與隊列間的訂閱也正常,但是消費端的代碼本身邏輯耗費時間長導致了消費能力降低,這時候就會出現中的情況從而導致消息堆積。
解決消息堆積的辦法
- 生產速率較快,消費速率較慢 :您可以通過以下方法解決。
- 增加消費者數量提高消費速率。
- 采用生產者確認的發送模式,并監控生產端消息生產速度和時長,當消息生產時長有明顯增加時進行流控措施。
- 消費者異常 :建議排查消費者邏輯是不是有問題,優化程序。
- 消費者與隊列間的訂閱異常 :建議排查隊列和消費者之間的訂閱是否正常。
- 消費端的代碼本身邏輯耗費時間長 :建議給消息設置過期時間,設置方法如下:
- 在生產消息時,設置消息過期時間。消息過期時間以expiration值體現。
- 在properties中設置expiration值,單位為毫秒(ms)。
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); String message = "hello rabbitmq"; channel.basicPublish(exchange, routingKey, properties, message.getBytes(StandardCharsets.UTF_8)); - 在Web界面中設置expiration值,單位為毫秒(ms)。
- 在properties中設置expiration值,單位為毫秒(ms)。
登錄Web界面,在“Exchanges”頁簽,單擊Exchange名稱,進入Exchange詳情頁。在“Publish message”區域,設置expiration值,如下圖所示。
- 設置隊列過期時間。隊列過期時間以x-message-ttl值體現。從消息進入隊列開始計算,超過了配置的隊列過期時間,消息會自動被刪除。
-
在客戶端代碼中設置x-message-ttl值,單位為毫秒(ms)。
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-message-ttl", 10000); channel.queueDeclare(queueName, true, false, false, arguments); -
在Web界面新建隊列時,設置x-message-ttl值,單位為毫秒(ms)。
登錄Web界面,在“Exchanges”頁簽,新建隊列時,設置x-message-ttl值,如下圖所示。
-