概述
本文介紹使用 RocketMQ C++(4.x) 客戶端 SDK,訪問分布式消息服務RocketMQ,幫助您更好地理解消息收發的完整過程。
前置條件
安裝gcc-c++ 4.8.2 及以上版本,需支持C++11。
安裝cmake 2.8.0及以上版本。
安裝automake 1.11.1及以上版本。
安裝autoconf 2.65及以上版本。
安裝libtool 2.2.6 及以上版本。
環境準備
需要在客戶端環境安裝 RocketMQ-Client-CPP 庫,根據官方文檔進行安裝即可: 安裝 CPP 動態庫,推薦使用 master 分支構建。
在項目中引入 RocketMQ-Client-CPP 相關頭文件及動態庫,詳見實例代碼頭文件。
使用g++命令獲得可執行文件,如:
g++ -o xxxProducer xxxProducer.cpp -lrocketmq -lpthread -lz -ldl -lrt
使用 C++ SDK 收發普通消息
發送普通消息
#include <iostream>
#include <chrono>
#include <thread>
#include <string>
#include "rocketmq/DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
int main(){
DefaultMQProducer producer("group_name");
//填寫分布式消息服務RocketMQ版的接入點
producer.setNamesrvAddr("your access point");
//填寫分布式消息服務RocketMQ版的ak、sk
producer.setSessionCredentials("ak", "sk", "channel");
producer.start();
int count = 64;
for (int i = 0; i < count; ++i)
{
//填入主題名、tag名、消息body
MQMessage msg("topic_name", "TAG", "msg content");
try
{
SendResult sendResult = producer.send(msg);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
this_thread::sleep_for(chrono::seconds(1));
}
catch (MQException e)
{
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
std::cout << "Send " << count << " messages OK, costs" << std::endl;
producer.shutdown();
return 0;
}
收取普通消息
#include <iostream>
#include <thread>
#include "rocketmq/DefaultMQPushConsumer.h"
using namespace rocketmq;
class ConcurrentMessageListener : public MessageListenerConcurrently
{
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs)
{
for (auto item = msgs.begin(); item != msgs.end(); item++)
{
std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]){
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("consumer_group");
consumer->setNamesrvAddr("your access point");
consumer->setSessionCredentials("ak", "sk");
ConcurrentMessageListener *messageListener = new ConcurrentMessageListener();
consumer->subscribe("topic_name", "tag");
consumer->registerMessageListener(messageListener);
consumer->start();
std::this_thread::sleep_for(std::chrono::seconds(60));
consumer->shutdown();
return 0;
}
使用C++客戶端收發順序消息
簡介
順序消息分為兩類,全局順序消息和分區順序消息,通過隊列數區分。
全局順序:
對于指定的一個 Topic,所有消息的生產和消費需要遵循一定的順序,消息的消費順序必須和生產順序一致,即需要嚴格的先入先出 FIFO的順序進行發布和消費。
分區順序:
對于指定的一個 Topic,其中每一個分區的消息生產與消費是有序的,同一個隊列內的消息按照嚴格的 FIFO 順序進行發布和訂閱。消息投遞到哪一個分區由消息的 Sharding Key 來進行區分。在 SDK 中可以通過指定 Sharding Key 和回調函數來控制消息投遞到哪個分區。
發送順序消息
#include <iostream>
#include <chrono>
#include <thread>
#include "rocketmq/DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
class DefineSelectMessageQueue : public MessageQueueSelector
{
public:
MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg)
{
//若希望全局有序,請修改對應index
int orderId = *static_cast<int *>(arg);
int index = orderId % mqs.size();
return mqs[index];
}
};
int main(){
DefaultMQProducer producer("group_name");
//填寫分布式消息服務RocketMQ版的接入點
producer.setNamesrvAddr("your access point");
//填寫分布式消息服務RocketMQ版的ak、sk
producer.setSessionCredentials("ak", "sk", "channel");
producer.start();
DefineSelectMessageQueue *queueSelector = new DefineSelectMessageQueue();
int count = 64;
for (int i = 0; i < count; ++i)
{
MQMessage msg("you_topic_name", "TAG", "msg content");
try
{
SendResult sendResult = producer.send(msg, queueSelector, &i, 3, false);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
this_thread::sleep_for(chrono::seconds(1));
}
catch (MQException e)
{
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
std::cout << "Send " << count << " messages OK, costs" << std::endl;
producer.shutdown();
return 0;
}
消費順序消息
#include <iostream>
#include <thread>
#include "rocketmq/DefaultMQPushConsumer.h"
using namespace rocketmq;
class OrderlyMessageListener : public MessageListenerOrderly
{
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs)
{
for (auto item = msgs.begin(); item != msgs.end(); item++)
{
std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]){
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_group");
consumer->setNamesrvAddr("your access point");
consumer->setSessionCredentials("ak", "sk", "VOLC");
OrderlyMessageListener *messageListener = new OrderlyMessageListener();
consumer->subscribe("topic_name", "tag");
consumer->registerMessageListener(messageListener);
consumer->start();
std::this_thread::sleep_for(std::chrono::seconds(60));
consumer->shutdown(); return 0;
}
使用C++客戶端收發事務消息
簡介
業務側通過 sendMessageInTransaction 發送消息到 RocketMQ 服務端。
業務側通過 executeLocalTransaction 執行本地事務。
實現業務查詢事務執行是否成功的接口 checkLocalTransaction。
使用C++客戶端發送事務消息
#include <iostream>
#include <chrono>
#include <thread>
#include "rocketmq/TransactionMQProducer.h"
#include "rocketmq/MQClientException.h"
#include "rocketmq/TransactionListener.h"
using namespace std;
using namespace rocketmq;
class DefineTransactionListener : public TransactionListener
{
public:
LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg)
{
/*
執行本地事務
1. 成功返回COMMIT_MESSAGE
2. 失敗返回ROLLBACK_MESSAGE
3. 不確定返回UNKNOWN。服務端會觸發定時任務回查狀態
*/
std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic()
<< ", transactionId:" << msg.getTransactionId() << std::endl;
return UNKNOWN;
}
LocalTransactionState checkLocalTransaction(const MQMessageExt &msg)
{
/*
回查本地事務執行情況
1. 成功返回COMMIT_MESSAGE
2. 失敗返回ROLLBACK_MESSAGE
3. 不確定返回UNKNOWN。則等待下次定時任務回查。
*/
std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic()
<< ", MsgId:" << msg.getMsgId() << std::endl;
return COMMIT_MESSAGE;
}
};
int main(){
TransactionMQProducer producer("producer_group_name");
producer.setNamesrvAddr("accesspoint");
producer.setSessionCredentials("ak", "sk", "channel");
DefineTransactionListener *exampleTransactionListener = new DefineTransactionListener();
producer.setTransactionListener(exampleTransactionListener);
producer.start();
int count = 3;
for (int i = 0; i < count; ++i)
{
MQMessage msg("TRANSACTION TOPIC", "TAG", "Transaction content");
try
{
SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
std::cout << "SendResult:" << sendResult.getSendStatus()
<< ", Message ID: " << sendResult.getMsgId()
<< std::endl;
this_thread::sleep_for(chrono::seconds(1));
}
catch (MQException e)
{
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
std::cout << "Send " << count << " messages OK " << endl; std::cout << "Wait for local transaction check..... " << std::endl;
for (int i = 0; i < 6; ++i)
{
this_thread::sleep_for(chrono::seconds(10));
std::cout << "Running " << i * 10 + 10 << " Seconds......" << std::endl;
}
producer.shutdown();
return 0;
}
使用C++客戶端消費事務消息
和消費普通消息一致,請參考對應部分。
使用C++客戶端收發延時消息
使用C++客戶端發送延時消息
#include <iostream>
#include <chrono>
#include <thread>
#include <string>
#include "rocketmq/DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
int main(){
DefaultMQProducer producer("producer_group_name");
producer.setNamesrvAddr("accesspoint");
producer.setSessionCredentials("ak", "sk", "volc");
producer.start();
int count = 64;
for (int i = 0; i < count; ++i)
{
MQMessage msg("you topic name", "TAG", "msg content");
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(5);
try
{
SendResult sendResult = producer.send(msg);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
this_thread::sleep_for(chrono::seconds(1));
}
catch (MQException e)
{
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
}
std::cout << "Send " << count << " messages OK, costs" << std::endl;
producer.shutdown();
return 0;
}
使用C++客戶端消費延時消息
和消費普通消息一致,請參考對應部分。