收發定時/延時消息
更新時間 2025-04-22 14:04:26
最近更新時間: 2025-04-22 14:04:26
分享文章
分布式消息服務RocketMQ版支持任意時間的定時消息,最大推遲時間可達到40天。
定時消息即生產者生產消息到分布式消息服務RocketMQ版后,消息不會立即被消費,而是延遲到設定的時間點后才會發送給消費者進行消費。
收發消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
-
在命令行輸入python,檢查是否已安裝Python。得到如下回顯,說明Python已安裝。
PS?C:\>?python Python?3.9.9?(tags/v3.9.9:ccb0e6a,?Nov?15?2021,?18:08:50)?[MSC?v.1929?64?bit?(AMD64)]?on?win32 Type?"help",?"copyright",?"credits"?or?"license"?for?more?information.如果未安裝Python,請使用以下命令安裝:
pip?install?rocketmq-client-python -
安裝librocketmq庫和rocketmq-client-python。
說明建議下載rocketmq-client-cpp-2.2.0,獲取librocketmq庫。
-
將librocketmq.so添加到系統動態庫搜索路徑。
- 查找librocketmq.so的路徑。
find?/?-name?librocketmq.so - 將librocketmq.so添加到系統動態庫搜索路徑。
ln?-s?/查找到的librocketmq.so路徑/librocketmq.so?/usr/lib sudo?ldconfig
- 查找librocketmq.so的路徑。
以下示例代碼中的參數說明如下,請參考收集連接信息獲取參數值。
- GROUP:表示消費組名稱。
- ENDPOINT:表示實例連接地址和端口。
- TOPIC:表示Topic名稱。
發送消息
參考如下示例代碼。
import?datetime
from?rocketmq.client?import?Producer,?Message
from?rocketmq.exceptions?import?RocketMQException
endpoint?=?"${ENDPOINT}"??#?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
access_key?=?"${ACCESS_KEY}"??#?填寫AccessKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
access_secret?=?"${SECRET_KEY}"??#?填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
topic?=?"${TOPIC}"??#?填寫Topic,在管理控制臺創建
producer_group?=?"${GROUP}"??#?生產者組group
#?初始化生產者
producer?=?Producer(producer_group)
producer.set_namesrv_addr(endpoint)
producer.set_session_credentials(access_key,?access_secret,?"")
#?啟動生產者
try:
????producer.start()
except?RocketMQException?as?e:
????print('start?producer?error:',?e)
????exit(1)
msg?=?Message(topic)
msg.set_body("Hello?RocketMQ")
delay_time?=?10
#?發送任意延遲消息,時間單位為毫秒,如下所示:消息將在10s后投遞
delay_timestamp?=?int((datetime.datetime.now()?+?datetime.timedelta(seconds=delay_time)).timestamp()?*?1000)
msg.set_property('__STARTDELIVERTIME',?str(delay_timestamp))
#?發送消息
try:
????result?=?producer.send_sync(msg)
????print('send?result:',?result)
except?RocketMQException?as?e:
????print('send?message?error:',?e)
????producer.shutdown()
????exit(1)
#?關閉生產者實例,釋放資源
producer.shutdown()
訂閱消息
參考如下示例代碼。
import?time
from?rocketmq.client?import?PushConsumer,?ConsumeStatus
endpoint?=?"${ENDPOINT}"??#?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
access_key?=?"${ACCESS_KEY}"??#?填寫AccessKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
access_secret?=?"${SECRET_KEY}"??#?填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
topic?=?"${TOPIC}"??#?填寫Topic,在管理控制臺創建
group?=?"${GROUP}"??#?填寫訂閱組group,在管理控制臺創建
def?callback(msg):
????print(msg.id,?msg.body)
????return?ConsumeStatus.CONSUME_SUCCESS
consumer?=?PushConsumer(group)
consumer.set_name_server_address(endpoint)
consumer.set_session_credentials(access_key,?access_secret,?"")
consumer.subscribe(topic,?callback)
consumer.start()
while?True:
????time.sleep(3600)
consumer.shutdown()