節點重啟后消費者如何重連
更新時間 2024-01-18 10:19:24
最近更新時間: 2024-01-18 10:19:24
分享文章
本文主要介紹節點重啟后消費者如何重連分布式消息服務RabbitMQ。
本章節以Java中使用的RabbitMQ客戶端amqp-client為例介紹節點重啟后消費者如何重連。
amqp-client自帶重連機制,但是自帶的重連機制只會重試一次,一次連不上后就不會再執行了,這時如果消費者沒有做額外的重試機制,那么這個消費者就徹底喪失的消費能力。
amqp-client在節點斷連后,根據與通道建立的節點不同,產生不同的錯誤。
- 如果通道連接的是隊列所在的節點,消費者就會收到一個shutdown信號,這時amqp-client的重連機制就會生效,嘗試重新連接服務端。如果連上了,這個通道就會繼續連接消費。如果連不上,就會執行channel.close方法,關閉這個通道。
- 如果通道連接的不是隊列所在的節點,消費者不會觸發關閉動作,而是由服務端發送的一個取消動作,這個動作對amqp-client來說并不是異常行為,所以日志上不會有明顯的報錯,但是連接最終還是會關閉。
amqp-client出現上面兩種錯誤時,會分別回調handleShutdownSignal以及handleCancel方法,您可以通過重寫這兩種方法,在回調時執行重寫的重連邏輯,就能在通道關閉后重新創建消費者的新通道繼續消費。
以下提供一個簡單的代碼示例,此示例能夠解決上面的兩種錯誤,實現消費者的持續消費。
package rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer {
public static void main(String... args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("100.00.000.000");
factory.setPort(5672);
factory.setUsername("name");
factory.setPassword("password");
Connection connection = factory.newConnection();
createNewConnection(connection);
}
public static void createNewConnection(Connection connection) {
try {
Thread.sleep(1000);
Channel channel = connection.createChannel();
channel.basicQos(64);
channel.basicConsume("queue-01", false, new CustomConsumer(channel, connection));
} catch (Exception e) {
// e.printStackTrace();
createNewConnection(connection);
}
}
static class CustomConsumer implements Consumer {
private final Channel _channel;
private final Connection _connection;
public CustomConsumer(Channel channel, Connection connection) {
_channel = channel;
_connection = connection;
}
@Override
public void handleConsumeOk(String consumerTag) {
}
@Override
public void handleCancelOk(String consumerTag) {
}
@Override
public void handleCancel(String consumerTag) throws IOException {
System.out.println("handleCancel");
System.out.println(consumerTag);
createNewConnection(_connection);
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
System.out.println("handleShutdownSignal");
System.out.println(consumerTag);
System.out.println(sig.getReason());
createNewConnection(_connection);
}
@Override
public void handleRecoverOk(String consumerTag) {
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
_channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}