收發順序消息
更新時間 2025-04-22 14:04:26
最近更新時間: 2025-04-22 14:04:26
分享文章
順序消息是分布式消息服務RocketMQ版提供的一種嚴格按照順序來發布和消費的消息類型。
順序消息分為全局順序消息和分區順序消息:
- 全局順序消息:對于指定的一個Topic,將隊列數量設置為1,這個隊列內所有消息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和訂閱。
- 分區順序消息:對于指定的一個Topic,同一個隊列內的消息按照嚴格的FIFO順序進行發布和訂閱。生產者指定分區選擇算法,保證需要按順序消費的消息被分配到同一個隊列。
全局順序消息和分區順序消息的區別僅為隊列數量不同,代碼沒有區別。
收發消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
-
在命令行輸入python,檢查是否已安裝Python。得到如下回顯,說明Python已安裝。
PS?C:\>?python Python?3.9.9?(tags/v3.9.9:ccb0e6a,?Nov?15?2021,?18:08:50)?[MSC?v.1929?64?bit?(AMD64)]?on?win32 Type?"help",?"copyright",?"credits"?or?"license"?for?more?information.如果未安裝Python,請使用以下命令安裝:
pip?install?rocketmq-client-python -
安裝librocketmq庫和rocketmq-client-python。
說明建議下載rocketmq-client-cpp-2.2.0,獲取librocketmq庫。
-
將librocketmq.so添加到系統動態庫搜索路徑。
- 查找librocketmq.so的路徑。
find?/?-name?librocketmq.so - 將librocketmq.so添加到系統動態庫搜索路徑。
ln?-s?/查找到的librocketmq.so路徑/librocketmq.so?/usr/lib sudo?ldconfig
- 查找librocketmq.so的路徑。
以下示例代碼中的參數說明如下,請參考收集連接信息獲取參數值。
- GROUP:表示消費組名稱。
- ENDPOINT:表示實例連接地址和端口。
- TOPIC:表示Topic名稱。
發送消息
參考如下示例代碼。
from?rocketmq.client?import?Producer,?Message
endpoint?=?"${ENDPOINT}"??#?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
access_key?=?"${ACCESS_KEY}"??#?填寫AccessKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
access_secret?=?"${SECRET_KEY}"??#?填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
topic?=?"${TOPIC}"??#?填寫Topic,在管理控制臺創建
producer_group?=?"${GROUP}"??#?生產者組group
#?創建并啟動生產者實例
producer?=?Producer(producer_group)
producer.set_name_server_address(endpoint)
producer.set_session_credentials(access_key,?access_secret,?"")
producer.start()
msg?=?Message(topic)
msg.set_body("Hello?RocketMQ")
msg.set_keys("")??#?消息key
msg.set_tags("")??#?消息tag
sharding_key?=?"key"??#?指定消息投遞的sharding?key
#?根據Sharding?Key,發送順序消息,
ret?=?producer.send_orderly_with_sharding_key(msg,?sharding_key)
print(ret.status,?ret.msg_id,?ret.offset)
#?關閉生產者實例,釋放資源
producer.shutdown()
訂閱消息
參考如下示例代碼。
import?time
from?rocketmq.client?import?PushConsumer,?ConsumeStatus
endpoint?=?"${ENDPOINT}"??#?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
access_key?=?"${ACCESS_KEY}"??#?填寫AccessKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
access_secret?=?"${SECRET_KEY}"??#?填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
topic?=?"${TOPIC}"??#?填寫Topic,在管理控制臺創建
group?=?"${GROUP}"??#?填寫訂閱組group,在管理控制臺創建
def?callback(msg):
????print(msg.id,?msg.body)
????return?ConsumeStatus.CONSUME_SUCCESS
consumer?=?PushConsumer(group,?orderly=True)??#?指定消費者為順序消費類型
consumer.set_name_server_address(endpoint)
consumer.set_session_credentials(access_key,?access_secret,?"")
consumer.subscribe(topic,?callback)
consumer.start()
while?True:
????time.sleep(3600)
consumer.shutdown()