本章節基于吞吐量和可靠性兩個指標,指導您通過設置隊列長度、集群負載均衡、優先隊列數量等參數,實現RabbitMQ的高性能。
使用較小的隊列長度
隊列中存在大量消息時,會給內存使用帶來沉重的負擔,為了釋放內存,RabbitMQ會將消息刷新到磁盤。這個過程通常需要時間,由于需要重建索引,重啟包含大量消息的集群非常耗時。當刷盤的消息過多時,會阻塞隊列處理消息,從而降低隊列速度,對RabbitMQ節點的性能產生負面影響。
要獲得最佳性能,應盡可能縮短隊列。建議始終保持隊列消息堆積的數量在0左右。
對于經常受到消息峰值影響的應用程序,和對吞吐量要求較高的應用程序,建議在隊列上設置 最大長度 。這樣可以通過丟棄隊列頭部的消息來保持隊列長度,隊列長度永遠不會大于最大長度設置。
最大長度可以通過Policy設置,也可以通過在隊列聲明時使用對應參數設置。
- 在Policy中設置


在隊列聲明時使用對應參數設置
//創建隊列 HashMap<String, Object> map = new HashMap<>(); //設置隊列最大長度 map.put("x-max-length",10 ); //設置隊列溢出方式保留前10 map.put("x-overflow","reject-publish" ); channel.queueDeclare(queueName,false,false,false,map);
當隊列長度超過設置的最大長度時,RabbitMQ的默認做法是將隊列頭部的信息(隊列中最老的消息)丟棄或變成死信。可以通過設置不同的overflow值來改變這種方式,如果overflow值設置為 drop-head ,表示從隊列前面丟棄或dead-letter消息,保存后n條消息。如果overflow值設置為 reject-publish ,表示最近發布的消息將被丟棄,即保存前n條消息。
說明如果同時使用以上兩種方式設置隊列的最大長度,兩者中較小的值將被使用。
超過隊列最大長度的消息會被丟棄,請謹慎使用。
使用集群的負載均衡
隊列的性能受單個CPU內核控制,當一個RabbitMQ節點處理消息的能力達到瓶頸時,可以通過集群進行擴展,從而達到提升吞吐量的目的。
使用多個節點,集群會自動將隊列均衡的創建在各個節點上。除了使用集群模式,您還可以使用以下兩個插件優化負載均衡:
Consistent hash exchange
該插件使用交換器來平衡隊列之間的消息。根據消息的路由鍵,發送到交換器的消息一致且均勻地分布在多個隊列中。該插件創建路由鍵的散列,并將消息傳播到與該交換器具有綁定關系的隊列中。使用此插件時,需要確保消費者從所有隊列中消費。
使用示例如下:
- 使用不同的路由鍵來路由消息。
public class ConsistentHashExchangeExample1 {
private static String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash";
public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.newConnection();
Channel ch = conn.createChannel();
for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
ch.queueDeclare(q, true, false, false, null);
ch.queuePurge(q);
}
ch.exchangeDeclare("e1", CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);
for (String q : Arrays.asList("q1", "q2")) {
ch.queueBind(q, "e1", "1");
}
for (String q : Arrays.asList("q3", "q4")) {
ch.queueBind(q, "e1", "2");
}
ch.confirmSelect();
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
for (int i = 0; i < 100000; i++) {
ch.basicPublish("e1", String.valueOf(i), bldr.build(), "".getBytes("UTF-8"));
}
ch.waitForConfirmsOrDie(10000);
System.out.println("Done publishing!");
System.out.println("Evaluating results...");
// wait for one stats emission interval so that queue counters
// are up-to-date in the management UI
Thread.sleep(5);
System.out.println("Done.");
conn.close();
}
}
- 通過不同的header來路由消息,該方式需要為交換器提供“hash-header”參數設置,且消息必須帶有header,否則會被路由到相同的隊列。
public class ConsistentHashExchangeExample2 {
public static final String EXCHANGE = "e2";
private static String EXCHANGE_TYPE = "x-consistent-hash";
public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.newConnection();
Channel ch = conn.createChannel();
for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
ch.queueDeclare(q, true, false, false, null);
ch.queuePurge(q);
}
Map<String, Object> args = new HashMap<>();
args.put("hash-header", "hash-on");
ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
for (String q : Arrays.asList("q1", "q2")) {
ch.queueBind(q, EXCHANGE, "1");
}
for (String q : Arrays.asList("q3", "q4")) {
ch.queueBind(q, EXCHANGE, "2");
}
ch.confirmSelect();
for (int i = 0; i < 100000; i++) {
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
Map<String, Object> hdrs = new HashMap<>();
hdrs.put("hash-on", String.valueOf(i));
ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
}
ch.waitForConfirmsOrDie(10000);
System.out.println("Done publishing!");
System.out.println("Evaluating results...");
// wait for one stats emission interval so that queue counters
// are up-to-date in the management UI
Thread.sleep(5);
System.out.println("Done.");
conn.close();
}
}
- 使用消息屬性來路由消息,例如message_id、correlation_id或timestamp屬性。該方式需要使用“hash-property”參數來聲明交換器,且消息必須帶有所選擇的消息屬性,否則會被路由到相同的隊列。
public class ConsistentHashExchangeExample2 {
public static final String EXCHANGE = "e2";
private static String EXCHANGE_TYPE = "x-consistent-hash";
public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.newConnection();
Channel ch = conn.createChannel();
for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
ch.queueDeclare(q, true, false, false, null);
ch.queuePurge(q);
}
Map<String, Object> args = new HashMap<>();
args.put("hash-header", "hash-on");
ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);
for (String q : Arrays.asList("q1", "q2")) {
ch.queueBind(q, EXCHANGE, "1");
}
for (String q : Arrays.asList("q3", "q4")) {
ch.queueBind(q, EXCHANGE, "2");
}
ch.confirmSelect();
for (int i = 0; i < 100000; i++) {
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
Map<String, Object> hdrs = new HashMap<>();
hdrs.put("hash-on", String.valueOf(i));
ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
}
ch.waitForConfirmsOrDie(10000);
System.out.println("Done publishing!");
System.out.println("Evaluating results...");
// wait for one stats emission interval so that queue counters
// are up-to-date in the management UI
Thread.sleep(5);
System.out.println("Done.");
conn.close();
}
}
RabbitMQ sharding
該插件自動對隊列進行分區,也就是說,一旦您將一個交換器定義為sharded,那么在每個集群節點上自動創建支持隊列,并在它們之間共享消息。該插件提供了一個集中發送消息的位置,并通過向集群中的其他節點添加隊列,實現負載均衡。使用此插件時,需要確保消費者從所有隊列中消費。
配置RabbitMQ sharding插件的步驟如下:
- 創建x-modulus-hash屬性交換器。


- 為該交換器添加策略。


- 單擊該交換器詳情,查看是否配置成功。


自動刪除不再使用的隊列
客戶端可能連接失敗導致隊列被殘留,大量的殘留隊列會影響實例的性能。RabbitMQ提供三種自動刪除隊列的方法:
- 在隊列中設置TTL策略:例如TTL策略設置為28天,當持續28天隊列未被使用時,此隊列將被刪除。
- 使用auto-delete隊列:當最后一個消費者退出或通道/連接關閉(或與服務器的TCP連接丟失)時,auto-delete隊列會被刪除。
- 使用exclusive queue:exclusive queue只能在創建它的連接中使用,當此連接關閉或消失時,exclusive queue會被刪除。
設置方法如下:
boolean exclusive = true;
boolean autoDelete = true;
channel.queueDeclare(QUEUENAME, durable, exclusive, autoDelete, arguments);
限制使用優先隊列的數量
每個優先隊列會啟動一個Erlang進程,過多的優先隊列會影響性能。在大多數情況下,建議使用不超過5個優先隊列。
連接和通道
每個連接使用大約100 KB的內存(如果使用 TLS會更多),成千上萬的連接會導致RabbitMQ負載很高,極端情況下,會導致內存溢出。AMQP協議引入了通道的概念,一個連接中可以有多個通道。連接是長期存在的,AMQP連接的握手過程比較復雜,至少需要7個TCP數據包(如果使用TLS會更多)。相對連接來說,打開和關閉通道會更簡單,但是建議通道也設置為長期存在的。例如,應該為每個生產者線程重用相同的通道,不要在每次生產時都打開通道。最佳實踐是重用連接并將線程之間的連接與通道多路復用。
推薦使用Spring AMQP線程池:ConnectionFactory是Spring AMQP定義的連接工廠,負責創建連接。
不要在線程之間共享通道
大多數客戶端并未實現通道的線程安全,所以不要在線程之間共享通道。
不要頻繁打開和關閉連接或通道
頻繁打開和關閉連接或通道會發送和接收大量的TCP包,從而導致更高的延遲,確保不要頻繁打開和關閉連接或通道。
生產者和消費者使用不同的通道
生產者和消費者使用不同的連接以實現高吞吐量。當生產者發送太多消息給服務端處理時,RabbitMQ會將壓力傳遞到TCP連接上。如果在同一個TCP連接上消費,服務端可能不會收到來自客戶端的消息確認,從而影響消費性能。若消費速度過低,服務端將不堪重負。
大量的連接和通道可能會影響RabbitMQ管理接口的性能
RabbitMQ會收集每個連接和通道的數據進行分析和顯示,大量連接和通道會影響RabbitMQ管理接口的性能。
禁用未使用的插件
插件可能會消耗大量CPU或占用大量內存,建議禁用未使用的插件。