使用場景
生產者在發布消息后,怎么確認消息已正確發布到服務器端?服務器端又怎么確認消息已成功被消費?RabbitMQ提供的消息確認機制可以解決此問題。
在使用RabbitMQ時,生產者確認和消費者確認對于確保數據可靠性至關重要。如果連接失敗,傳輸中的消息可能會丟失,需要重新傳輸。確認機制可以讓服務器和客戶端知道何時重新傳輸消息。客戶端可以在收到消息時確認消息,也可以在客戶端完全處理完消息后確認。生產者確認會影響性能,如果需要很高的吞吐量,應禁用生產者確認。注意,不使用生產者確認會導致可靠性下降。
更多關于消息確認機制的說明,請參考。
生產者確認
生產者確認,即服務端在收到來自生產者的消息時進行確認。
以下示例演示在Java客戶端配置生產者確認:
try {
channel.confirmSelect() ; //將信道置為publisher confirm模式
//之后正常發送消息
channel . basicPublish( "exchange " , " routingKey" , null , "publisher confirm test " .getBytes());
if (!channel.waitForConfirms()) {
System.out.println( "send message failed " ) ;
// do something else....
}
} catch (InterruptedException e) {
e.printStackTrace() ;
}
調用channel .waitForConfirms方法之后,會等待服務端確認,這是一種同步等待的方式,會對性能產生影響。如果生產者要滿足at least once,就必須使用同步等待方式。
消費者確認
消費者確認是指服務端通過確認消息是否成功被消費者接收,來判斷是否刪除隊列中的此消息。
消費者確認對數據可靠性十分重要,接收重要消息的消費應用程序在未處理完消息前不應確認消息,以便消費者有足夠的時間處理消息,無需擔心消息處理過程中由于消費者進程異常(如工作程序崩潰、重啟等)導致消息丟失。
消費者確認在客戶端上配置,通過配置basicConsume方法啟用確認。在channel中啟用消費者確認適用于大多數場景。
以下示例演示在Java客戶端配置消費者確認(使用Channel#basicAck設置basic.ack為肯定):
// this example assumes an existing channel instanceboolean autoAck = false;
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
});
未確認的消息緩存在內存中,如果未確認的消息過多,會導致內存使用率過高,此時可以在客戶端配置預取值來限制消費者預取的消息數量,具體方法請參見預取值。