收發普通消息
更新時間 2025-07-03 12:03:18
最近更新時間: 2025-07-03 12:03:18
分享文章
本章節介紹普通消息的收發方法和示例代碼。其中,普通消息發送方式分為同步發送、異步發送、單向發送。
- 同步發送:同步發送是指消息發送方發出一條消息后,會在收到服務端同步響應之后才發下一條消息的通訊方式。
- 異步發送:異步發送是指發送方發出一條消息后,不等服務端返回響應,接著發送下一條消息的通訊方式。
- 單向發送:發送方只負責發送消息,不等待服務端返回響應且沒有回調函數觸發。
- 收發消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
開源的Java客戶端支持連接分布式消息服務RocketMQ版,推薦使用的客戶端版本為4.9.7。
通過以下任意一種方式引入依賴:
-
使用Maven方式引入依賴。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.7</version> </dependency> </dependencies> -
點擊下載依賴JAR包:rocketmq-all-4.9.7-bin-release.zip
同步發送
同步發送是最常用的方式,是指消息發送方發出一條消息后,會在收到服務端同步響應之后才發下一條消息的通訊方式,可靠的同步傳輸被廣泛應用于各種場景,如重要的通知消息、短消息通知等。
參考如下示例代碼
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerNormalExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制臺獲取NAMESRV接入點地址
producer.setNamesrvAddr("XXX:xxx");
//producer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("YOUR TOPIC",
"TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
異步發送
消息發送方在發送了一條消息后,不需要等待服務端響應即可發送第二條消息,發送方通過回調接口接收服務端響應,并處理響應結果。異步發送一般用于鏈路耗時較長,對響應時間較為敏感的業務場景。例如,視頻上傳后通知啟動轉碼服務,轉碼完成后通知推送轉碼結果等。
參考如下示例代碼
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducerNormalExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制臺獲取NAMESRV接入點地址
producer.setNamesrvAddr("XXX:xxx");
//producer.setUseTLS(true); //如果需要開啟SSL,請增加此行代碼
producer.start();
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"TagA", // 設置消息的TAG,若無可設置為空
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.println("send message success. msgId= " + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println("send message failed.");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
單向發送
發送方只負責發送消息,不等待服務端返回響應且沒有回調函數觸發,即只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。
參考如下示例代碼
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducerNormalExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制臺獲取NAMESRV接入點地址
producer.setNamesrvAddr("XXX:xxx");
//producer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
producer.start();
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest",
"TagA", // 設置消息的TAG,若無可設置為空
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
訂閱普通消息
參考如下示例代碼
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class ConsumerNormalExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
/*
* 創建Consumer,如果想開啟消息軌跡,可以按照如下方式創建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely(), true, null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely());
// 填入控制臺NAMESRV接入點地址
consumer.setNamesrvAddr("XXX:xxx");
// consumer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
/*
* 如果想要消費指定TAG的消息,可以按照如下方式訂閱:*為訂閱所有的TAG
* pushConsumer.subscribe(TOPIC_NAME, "Tag1");
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}