Java
更新時間 2025-07-03 13:57:39
最近更新時間: 2025-07-03 13:57:39
分享文章
使用Maven方式引入依賴
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.0</version>
</dependency>
生產消息
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?java.io.IOException;
import?java.nio.charset.StandardCharsets;
import?java.util.concurrent.TimeUnit;
import?java.util.concurrent.TimeoutException;
public?class?RabbitmqProducer?{
????//??private??final??static??String??EXCHANGE_NAME??=??"exchangeTest";
????private?final?static?String?QUEUE_NAME?=?"helloMQ";
????//??private??final??static??String??ROUTING_KEY??=??"test";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
????????//??創建連接工廠
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????//??設置主機ip
????????factory.setHost("127.0.0.1");
????????//??設置amqp的tcp端口號
????????factory.setPort(5672);
????????//??設置用戶名密碼
????????factory.setUsername("YOUR?USERNAME");
????????factory.setPassword("YOUR?PASSWORD");
????????//??設置Vhost,需要在控制臺先創建
????????factory.setVirtualHost("/");
????????//??基于網絡環境合理設置超時時間
????????factory.setConnectionTimeout(30?*?1000);
????????factory.setHandshakeTimeout(30?*?1000);
????????factory.setShutdownTimeout(0);
????????//??創建一個連接
????????Connection?connection?=?factory.newConnection();
????????//??創建一個頻道
????????Channel?channel?=?connection.createChannel();
????????//??指定一個隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????for?(int?i?=?0;?i?<?100;?i++)?{
????????????//??發送的消息
????????????String?message?=?"Hello??rabbitMQ!_"?+?i;
????????????//??往隊列中發送一條消息,使用默認的交換器
????????????channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes(StandardCharsets.UTF_8));
????????????System.out.println("?Sent?message:?'"?+?message?+?"'");
????????????TimeUnit.MILLISECONDS.sleep(100);
????????}
????????//關閉頻道和連接
????????channel.close();
????????connection.close();
????}
}
消費消息
import?com.rabbitmq.client.*;
import?java.io.IOException;
import?java.nio.charset.StandardCharsets;
import?java.util.concurrent.TimeoutException;
public?class?RabbitmqConsumer?{
????//隊列名稱
????private?final?static?String?QUEUE_NAME?=?"helloMQ";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//創建連接工廠
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????//設置主機ip
????????factory.setHost("127.0.0.1");
????????//設置amqp的tcp端口號
????????factory.setPort(5672);
????????//設置用戶名密碼
????????factory.setUsername("YOUR?USERNAME");
????????factory.setPassword("YOUR?PASSWORD");
????????//設置Vhost
????????factory.setVirtualHost("/");
????????//基于網絡環境合理設置超時時間
????????factory.setConnectionTimeout(30?*?1000);
????????factory.setHandshakeTimeout(30?*?1000);
????????factory.setShutdownTimeout(0);
????????Connection?connection?=?factory.newConnection();
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????System.out.println("??[*]??Waiting??for??messages.??To??exit??press??CTRL+C");
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?message?=?new?String(body,?StandardCharsets.UTF_8);
????????????????System.out.println("Received?message:?'"?+?message?+?"'");
????????????}
????????};
????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
????}
}
SSL生產消息
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?com.rabbitmq.client.DefaultSaslConfig;
import?javax.net.ssl.KeyManagerFactory;
import?javax.net.ssl.SSLContext;
import?javax.net.ssl.TrustManagerFactory;
import?java.io.FileInputStream;
import?java.io.IOException;
import?java.nio.charset.StandardCharsets;
import?java.security.*;
import?java.security.cert.CertificateException;
import?java.util.concurrent.TimeUnit;
import?java.util.concurrent.TimeoutException;
public?class?RabbitmqProducerSsl?{
????//??private??final??static??String??EXCHANGE_NAME??=??"exchangeTest";
????private?final?static?String?QUEUE_NAME?=?"helloMQ";
????//??private??final??static??String??ROUTING_KEY??=??"test";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
????????//??創建連接工廠
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????//??設置主機ip
????????factory.setHost("127.0.0.1");
????????//??設置amqp的ssl端口號
????????factory.setPort(5671);
????????String?ksFile?=?"/sslpath/ssl/client_rabbitmq_key.p12";
????????String?tksFile?=?"/sslpath/ssl/truststore";
????????SSLContext?c?=?null;
????????try?{
????????????char[]?keyPassphrase?=?"YOUR?PASSPHRASE".toCharArray();
????????????KeyStore?ks?=?KeyStore.getInstance("PKCS12");
????????????ks.load(new?FileInputStream(ksFile),?keyPassphrase);
????????????KeyManagerFactory?kmf?=?KeyManagerFactory.getInstance("SunX509");
????????????kmf.init(ks,?keyPassphrase);
????????????char[]?trustPassphrase?=?"YOUR?PASSPHRASE".toCharArray();
????????????KeyStore?tks?=?KeyStore.getInstance("JKS");
????????????tks.load(new?FileInputStream(tksFile),?trustPassphrase);
????????????TrustManagerFactory?tmf?=?TrustManagerFactory.getInstance("SunX509");
????????????tmf.init(tks);
????????????c?=?SSLContext.getInstance("tlsv1.2");
????????????c.init(kmf.getKeyManagers(),?tmf.getTrustManagers(),?null);
????????}?catch?(KeyStoreException?e)?{
????????????throw?new?RuntimeException(e);
????????}?catch?(NoSuchAlgorithmException?e)?{
????????????throw?new?RuntimeException(e);
????????}?catch?(CertificateException?e)?{
????????????throw?new?RuntimeException(e);
????????}?catch?(UnrecoverableKeyException?e)?{
????????????throw?new?RuntimeException(e);
????????}?catch?(KeyManagementException?e)?{
????????????throw?new?RuntimeException(e);
????????}
????????factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
????????factory.useSslProtocol(c);
????????//??設置Vhost,需要在控制臺先創建
????????factory.setVirtualHost("/");
????????//??基于網絡環境合理設置超時時間
????????factory.setConnectionTimeout(30?*?1000);
????????factory.setHandshakeTimeout(30?*?1000);
????????factory.setShutdownTimeout(0);
????????//??創建一個連接
????????Connection?connection?=?factory.newConnection();
????????//??創建一個頻道
????????Channel?channel?=?connection.createChannel();
????????//??指定一個隊列
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????for?(int?i?=?0;?i?<?100;?i++)?{
????????????//??發送的消息
????????????String?message?=?"Hello??rabbitMQ!_"?+?i;
????????????//??往隊列中發送一條消息,使用默認的交換器
????????????channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes(StandardCharsets.UTF_8));
????????????System.out.println("?Sent?message:?'"?+?message?+?"'");
????????????TimeUnit.MILLISECONDS.sleep(100);
????????}
????????//關閉頻道和連接
????????channel.close();
????????connection.close();
????}
}
SSL消費消息
import?com.rabbitmq.client.*;
import?javax.net.ssl.KeyManagerFactory;
import?javax.net.ssl.SSLContext;
import?javax.net.ssl.TrustManagerFactory;
import?java.io.FileInputStream;
import?java.io.IOException;
import?java.nio.charset.StandardCharsets;
import?java.security.*;
import?java.security.cert.CertificateException;
import?java.util.concurrent.TimeoutException;
public?class?RabbitmqConsumerSsl?{
????//隊列名稱????
????private?final?static?String?QUEUE_NAME?=?"helloMQ";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//創建連接工廠????
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????//設置主機ip????
????????factory.setHost("127.0.0.1");
????????//??設置amqp的ssl端口號
????????factory.setPort(5671);
????????String?ksFile?=?"/sslpath/ssl/client_rabbitmq_key.p12";
????????String?tksFile?=?"/sslpath/ssl/truststore";
????????SSLContext?c?=?null;
????????try?{
????????????char[]?keyPassphrase?=?"YOUR?PASSPHRASE".toCharArray();
????????????KeyStore?ks?=?KeyStore.getInstance("PKCS12");
????????????ks.load(new?FileInputStream(ksFile),?keyPassphrase);
????????????KeyManagerFactory?kmf?=?KeyManagerFactory.getInstance("SunX509");
????????????kmf.init(ks,?keyPassphrase);
????????????char[]?trustPassphrase?=?"YOUR?PASSPHRASE".toCharArray();
????????????KeyStore?tks?=?KeyStore.getInstance("JKS");
????????????tks.load(new?FileInputStream(tksFile),?trustPassphrase);
????????????TrustManagerFactory?tmf?=?TrustManagerFactory.getInstance("SunX509");
????????????tmf.init(tks);
????????????c?=?SSLContext.getInstance("tlsv1.2");
????????????c.init(kmf.getKeyManagers(),?tmf.getTrustManagers(),?null);
????????}?catch?(KeyStoreException?e)?{
????????????throw?new?RuntimeException(e);
????????}?catch?(NoSuchAlgorithmException?e)?{
????????????throw?new?RuntimeException(e);
????????}?catch?(CertificateException?e)?{
????????????throw?new?RuntimeException(e);
????????}?catch?(UnrecoverableKeyException?e)?{
????????????throw?new?RuntimeException(e);
????????}?catch?(KeyManagementException?e)?{
????????????throw?new?RuntimeException(e);
????????}
????????factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
????????factory.useSslProtocol(c);
????????//設置Vhost,需要在控制臺先創建????
????????factory.setVirtualHost("vhost");
????????//基于網絡環境合理設置超時時間????
????????factory.setConnectionTimeout(30?*?1000);
????????factory.setHandshakeTimeout(30?*?1000);
????????factory.setShutdownTimeout(0);
????????Connection?connection?=?factory.newConnection();
????????Channel?channel?=?connection.createChannel();
????????//聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。????
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????System.out.println("??[*]??Waiting??for??messages.??To??exit??press??CTRL+C");
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?message?=?new?String(body,?StandardCharsets.UTF_8);
????????????????System.out.println("Received?message:?'"?+?message?+?"'");
????????????}
????????};
????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
????}
}