收發順序消息
更新時間 2025-04-22 14:04:25
最近更新時間: 2025-04-22 14:04:25
分享文章
順序消息是分布式消息服務RocketMQ版提供的一種嚴格按照順序來發布和消費的消息類型。
順序消息分為全局順序消息和分區順序消息:
- 全局順序消息:對于指定的一個Topic,將隊列數量設置為1,這個隊列內所有消息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和訂閱。
- 分區順序消息:對于指定的一個Topic,同一個隊列內的消息按照嚴格的FIFO順序進行發布和訂閱。生產者指定分區選擇算法,保證需要按順序消費的消息被分配到同一個隊列。
全局順序消息和分區順序消息的區別僅為隊列數量不同,代碼沒有區別。
收發消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
開源的Java客戶端支持連接分布式消息服務RocketMQ版,推薦使用的客戶端版本為4.9.7。
通過以下任意一種方式引入依賴:
-
使用Maven方式引入依賴。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.7</version> </dependency> </dependencies> -
點擊下載依賴JAR包:rocketmq-all-4.9.7-bin-release.zip
發送順序消息
參考如下示例代碼
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerFifoExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
/*
* 創建Producer,如果想開啟消息軌跡,可以按照如下方式創建:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制臺NAMESRV接入點地址
producer.setNamesrvAddr("XXX:xxx");
//producer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
producer.start();
for (int i = 0; i < 128; i++) {
try {
int orderId = i % 10;
Message msg = new Message("TopicTest",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
// 選擇適合自己的分區選擇算法,保證同一個參數得到的結果相同。
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, orderId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
注意上述代碼中,相同id的消息需要保證順序,不同id的消息不需要保證順序,所以在分區選擇算法中以“id/隊列個數的余數”作為消息發送的隊列。
訂閱順序消息
參考如下示例代碼
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class ConsumerFifoExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
/*
* 創建Consumer,如果想開啟消息軌跡,可以按照如下方式創建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely(), true, null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely());
// 填入控制臺NAMESRV接入點地址
consumer.setNamesrvAddr("XXX:xxx");
// consumer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
/*
* 如果想要消費指定TAG的消息,可以按照如下方式訂閱:* 為訂閱所有的TAG
* pushConsumer.subscribe(TOPIC_NAME, "Tag1");
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}