背景信息
RocketMQ的生產消費驗證是指在使用RocketMQ進行消息生產和消費時的驗證過程。具體而言,驗證包括以下幾個方面:
- 生產者驗證:RocketMQ提供了豐富的生產者API,開發人員可以使用這些API將消息發送到RocketMQ的消息隊列中。在驗證階段,可以通過發送消息并檢查返回結果來確保消息成功發送到Broker節點。此外,生產者還應該驗證消息的順序性、事務性以及可靠性等方面。
- 消費者驗證:RocketMQ的消費者可以訂閱特定的消息主題,從而消費這些主題下的消息。在驗證階段,消費者應該能夠正確地從Broker節點拉取消息并進行消費處理。消費者還可以驗證消息的順序性、重試機制以及消息過濾等功能。
操作步驟
1、 天翼云官網點擊控制中心,選擇產品分布式消息服務RocketMQ。
2、 登錄分布式消息服務RocketMQ控制臺,點擊右上角地域選擇對應資源池。
進入實例列表,點擊【管理】按鈕進入管理菜單。
3、 進入實例列表,點擊【管理】按鈕進入管理菜單。
4、 進入主題管理菜單,點擊【撥測】按鈕,進行生產消費的撥測驗證,驗證開通的消息實例和主題。

1)生產測試撥測:

- 選擇消息類型,默認普通消息。
- 填寫需要產生的測試消息數量,以及每條消息的大小,默認每條消息1KB,建議不超過4MB(4096KB)。
- 選擇已建的消息主題,若無選項,請新增主題,詳見上文創建主題和訂閱組。
- 點擊【測試】按鈕,按照已填寫規格及數量產生測試消息數據,展示消息數據的信息,包括消息ID(messageID)、發送狀態、主題名(topic名)、Broker名、隊列ID。
撥測功能涉及消息發送狀態碼,以下是RocketMQ消息發送狀態碼及其說明:
? SEND_OK(發送成功):表示消息成功發送到了消息服務器。
? FLUSH_DISK_TIMEOUT(刷新磁盤超時):表示消息已經成功發送到消息服務器,但是刷新到磁盤上超時。這可能會導致消息服務器在宕機后,尚未持久化到磁盤上的數據丟失。
? FLUSH_SLAVE_TIMEOUT(刷新從服務器超時):表示消息已經成功發送到消息服務器,但是刷新到從服務器上超時。這可能會導致主從同步不一致。
? SLAVE_NOT_AVAILABLE(從服務器不可用):表示消息已經成功發送到消息服務器,但是從服務器不可用。這可能是由于網絡故障或從服務器宕機引起的。
? UNKNOWN_ERROR(未知錯誤):表示發送消息時遇到了未知的錯誤。一般情況下建議重試發送消息。
? MESSAGE_SIZE_EXCEEDED(消息大小超過限制):表示消息的大小超過了消息服務器的限制。需要檢查消息的大小是否合適。
? PRODUCE_THROTTLE(消息生產被限流):表示消息生產者的頻率超出了消息服務器的限制。這可能是由于消息發送頻率過高引起的。
? SERVICE_NOT_AVAILABLE(服務不可用):表示消息服務器不可用。這可能是由于網絡故障或者消息服務器宕機引起的。
請注意,以上狀態碼僅適用于RocketMQ消息發送階段,并且并不代表消息是否成功被消費者接收。同時,這些狀態碼也可能因版本變化而有所不同,建議查閱官方文檔獲取最新信息。
2)消費測試撥測:

- 選擇消息順序,下拉選擇無序/有序,默認選項為無序。
RocketMQ是一種開源的分布式消息中間件,它支持有序消息和無序消息。
? 有序消息是指消息的消費順序與發送順序完全一致。在某些業務場景下,消息的處理需要保證順序性,例如訂單的處理或者任務的執行。RocketMQ提供了有序消息的支持,通過指定消息的順序屬性或使用消息隊列的分區機制,可以確保消息按照指定的順序進行消費。
? 無序消息則是指消息的消費順序與發送順序無關。無序消息的特點是高吞吐量和低延遲,適用于一些不要求嚴格順序的業務場景,如日志收集等。
在RocketMQ中,有序消息和無序消息的實現方式略有不同。有序消息需要借助MessageQueue的分區機制和消費者端的順序消息消費來實現。而無序消息則是通過消息的發送和接收的并發處理來實現的。
總的來說,RocketMQ既支持有序消息也支持無序消息,根據業務需求選擇合適的消息類型來滿足業務的要求。
- 選擇消費方式,目前僅提供pull方式。值得注意的是,RocketMQ還提供了推送(push)方式的消費模式,其中消息隊列服務器會主動將消息推送給消費者。但在當前僅限于pull方式的消費模式。
- 填寫消費數量。
- 下拉選擇選擇已建的消息主題和訂閱組,若無選項,請新增主題和訂閱組,詳見上文創建主題和訂閱組。
- 點擊【測試】按鈕,按照已填寫規格及數量產生消費數據,展示消息數據的信息,包括消息ID(messageID)、主題名稱(topicName)、生成時間、存儲時間、隊列ID、消費狀態。
撥測功能涉及消息消費狀態碼,RocketMQ消費狀態碼是指在消息消費過程中,對消費結果進行標識的狀態碼。以下是常見的RocketMQ消費狀態碼:
? CONSUME_SUCCESS(消費成功):表示消息成功被消費。
? RECONSUME_LATER(稍后重試):表示消費失敗,需要稍后再次進行消費。
? CONSUME_FAILURE(消費失敗):表示消息消費出現異常或失敗。
? SLAVE_NOT_AVAILABLE(從節點不可用):表示消費者無法訪問從節點來消費消息。
? NO_MATCHED_MESSAGE(無匹配的消息):表示當前沒有匹配的消息需要消費。
? OFFSET_ILLEGAL(偏移量非法):表示消費的偏移量參數不合法。
? BROKER_TIMEOUT(Broker超時):表示由于Broker超時導致消費失敗。
5、 用戶應用按照規范接入RocketMQ,發送、消費消息。
1)生產者示例API
以下適用于南京3、上海7、重慶2、烏魯木齊27、保定、石家莊20、內蒙6、晉中、北京5節點。
--ctgmq引擎版本,SDK下載方式詳見環境準備-其他工具章節。
package com.ctg.guide;
import com.ctg.mq.api.CTGMQFactory;
import com.ctg.mq.api.IMQProducer;
import com.ctg.mq.api.PropertyKeyConst;
import com.ctg.mq.api.bean.MQMessage;
import com.ctg.mq.api.bean.MQSendResult;
import com.ctg.mq.api.exception.MQException;
import com.ctg.mq.api.exception.MQProducerException;
import java.util.Properties;
/**
* Producer,發送消息
*/
public class Producer {
public static void main(String[] args) throws InterruptedException, MQException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ProducerGroupName, "producer_group");
properties.setProperty(PropertyKeyConst.NamesrvAddr, "10.50.208.1:9876;10.50.208.2:9876;10.50.208.3:9876");
properties.setProperty(PropertyKeyConst.NamesrvAuthID, "app4test");
properties.setProperty(PropertyKeyConst.NamesrvAuthPwd, "******"); properties.setProperty(PropertyKeyConst.ClusterName, "defaultMQBrokerCluster");
properties.setProperty(PropertyKeyConst.TenantID, "defaultMQTenantID");
IMQProducer producer = CTGMQFactory.createProducer(properties);//建議應用啟動時創建
int connectResult = producer.connect();
if(connectResult != 0){
return;
}
for (int i = 0; i < 10; i++) {
try {
MQMessage message = new MQMessage(
"test_topic_1",// topic
"ORDER_KEY_"+i,// key
"ORDER_TAG",//tag
("HELLO ORDER BODY" + i).getBytes()// body
);
MQSendResult sendResult = producer.send(message);
//System.out.println(sendResult);
//TODO
} catch (MQProducerException e) {
//TODO
}
}
producer.close();//建議應用關閉時關閉
}
以下適用于華東1、華北2、西南1、華南2、上海36、青島20、長沙42、南昌5、武漢41、杭州7、西南2-貴州、太原4、鄭州5、西安7、呼和浩特3節點。
--rocketmq引擎版本,SDK下載方式詳見環境準備-其他工具章節。
importorg.apache.rocketmq.client.exception.MQClientException;
importorg.apache.rocketmq.client.producer.DefaultMQProducer;
importorg.apache.rocketmq.client.producer.SendResult;
importorg.apache.rocketmq.common.message.Message;
importorg.apache.rocketmq.remoting.common.RemotingHelper;
importorg.apache.rocketmq.acl.common.AclClientRPCHook;
publicclassProducer{
publicstaticvoidmain(String[]?args)throwsMQClientException,InterruptedException{
AclClientRPCHook?rpcHook?=newAclClientRPCHook(
newSessionCredentials(ACCESS_KEY,?SECRET_KEY));
DefaultMQProducer?producer?=newDefaultMQProducer("ProducerGroupName",?rpcHook);
//?填入元數據地址
????????producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true);????//如果需要開啟SSL,請增加此行代碼
????????producer.start();
for(int?i?=0;?i?<128;?i++)
try{
{
Message?msg?=newMessage("TopicTest",
"TagA",
"OrderID188",
"Hello?world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult?sendResult?=?producer.send(msg);
System.out.printf("%s%n",?sendResult);
}
}catch(Exception?e){
????????????????e.printStackTrace();
}
????????producer.shutdown();
}
}
示例參數說明:
Namesrv地址

Namesrv地址可從控制臺查看,多個地址按分號分隔:

應用用戶和密碼

這個應用用戶和密碼就是控制臺創建的應用用戶和密碼。
租戶id和集群名

集群名和租戶id可以從應用用戶管理查詢:

生產組

生產組名不需要提前創建,只需創建生產者時候配置,服務端會自動創建。建議按業務規劃好生產組名,嚴禁按隨機方式生成生產組名。
6.消費者示例API
以下適用于南京3、上海7、重慶2、烏魯木齊27、保定、石家莊20、內蒙6、晉中、北京5節點。
--ctgmq引擎版本,SDK下載方式詳見環境準備-其他工具章節。
package com.ctg.guide;
import com.ctg.mq.api.enums.MQConsumeFromWhere;
import com.ctg.mq.api.CTGMQFactory;
import com.ctg.mq.api.IMQPushConsumer;
import com.ctg.mq.api.PropertyKeyConst;
import com.ctg.mq.api.bean.MQResult;
import com.ctg.mq.api.listener.ConsumerTopicListener;
import com.ctg.mq.api.listener.ConsumerTopicStatus;
import java.util.List;
import java.util.Properties;
public class PushConsumer {
public static void main(String[] args) throws Exception {
final Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ConsumerGroupName, "test_consumer_1");
properties.setProperty(PropertyKeyConst.NamesrvAddr, "10.50.208.1:9876;10.50.208.2:9876;10.50.208.3:9876");
properties.setProperty(PropertyKeyConst.NamesrvAuthID, "app4test");
properties.setProperty(PropertyKeyConst.NamesrvAuthPwd, "******"); properties.setProperty(PropertyKeyConst.ClusterName, "defaultMQBrokerCluster");
properties.setProperty(PropertyKeyConst.TenantID, "defaultMQTenantID");
IMQPushConsumer consumer = CTGMQFactory.createPushConsumer(properties);
int connectResult = consumer.connect();
if (connectResult != 0) {
return;
}
consumer.listenTopic("test_topic_1", null, new ConsumerTopicListener() {
@Override
public ConsumerTopicStatus onMessage(List<MQResult> mqResultList) {
//mqResultList 默認為1,可通過批量消費數量設置
for(MQResult result : mqResultList) {
//TODO
System.out.println(result);
}
return ConsumerTopicStatus.CONSUME_SUCCESS;//對消息批量確認(成功)
//return ConsumerTopicStatus.RECONSUME_LATER;//對消息批量確認(失敗)
}
});
}
}
以下適用于華東1、華北2、西南1、華南2、上海36、青島20、長沙42、南昌5、武漢41、杭州7、西南2-貴州、太原4、鄭州5、西安7、呼和浩特3節點。
--rocketmq引擎版本,SDK下載方式詳見環境準備-其他工具章節。
importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
importorg.apache.rocketmq.client.exception.MQClientException;
publicclassPushConsumer{
publicstaticvoidmain(String[]?args)throwsException{
AclClientRPCHook?rpcHook?=newAclClientRPCHook(
newSessionCredentials(ACCESS_KEY,?SECRET_KEY));
DefaultMQPushConsumer?consumer?=newDefaultMQPushConsumer(rpcHook);
????consumer.setConsumerGroup("ConsumerGroupName");
//?填入元數據地址
????????consumer.setNamesrvAddr("192.168.0.1:9876");
//consumer.setUseTLS(true);????//如果需要開啟SSL,請增加此行代碼
????????consumer.subscribe("TopicTest","*");
????????consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
System.out.printf("%s?Receive?New?Messages:?%s?%n",Thread.currentThread().getName(),?msgs);
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
????????consumer.start();
System.out.printf("Consumer?Started.%n");
}
}
示例參數說明:
Namesrv地址

Namesrv地址可從控制臺查看,多個地址按分號分隔。

應用用戶和密碼

這個應用用戶和密碼就是控制臺創建的應用用戶和密碼。
租戶id和集群名

集群名和租戶id可以從應用用戶管理查詢。

訂閱組

訂閱組名需要在控制臺提前創建好。
