收發定時/延時消息
更新時間 2025-04-22 14:04:26
最近更新時間: 2025-04-22 14:04:26
分享文章
分布式消息服務RocketMQ版支持任意時間的定時消息,最大推遲時間可達到40天。
定時消息即生產者生產消息到分布式消息服務RocketMQ版后,消息不會立即被消費,而是延遲到設定的時間點后才會發送給消費者進行消費。
發送定時消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
適用場景
定時消息適用于以下場景:
- 消息對應的業務邏輯有時間窗口要求,如電商交易中超時未支付關閉訂單的場景。在訂單創建時發送一條定時消息,5分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應訂單是否完成支付,如果未完成支付,則關閉訂單。如果已完成,則忽略。
- 通過消息觸發定時任務的場景,如在某些固定時間點向用戶發送提醒消息。
注意
定時消息的最大延遲時間為40天,延遲超過40天的消息將會發送失敗。
定時消息的定時時間如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。
定時消息的精度有1s~2s的延遲誤差
無法確保定時消息僅投遞一次,定時消息可能會重復投遞。
定時消息的定時時間是服務端開始向消費端投遞的時間。如果消費者當前有消息堆積,那么定時消息會排在堆積消息后面,將不能嚴格按照配置的時間進行投遞。
由于客戶端和服務端可能存在時間差,消息的實際投遞時間與客戶端設置的投遞時間之間可能存在偏差,以服務端時間為準。
設置定時消息的投遞時間后,依然受消息老化時間限制,默認消息過期時間為7天。例如,設置定時消息5天后才能被消費,如果第5天后一直沒被消費,那么這條消息將在第12天被刪除。
定時消息將占用普通消息約3倍的存儲空間,大量使用定時消息時需要注意存儲空間占用。
準備環境
-
執行以下命令,檢查是否已安裝Go。
go?version返回如下回顯時,說明Go已經安裝。
go?version?go1.21.5?linux/amd64如果未安裝Go,請官網下載并安裝。
-
在“go.mod”中增加以下代碼,添加依賴。
module?rocketmq-example-go go?1.13 require?( github.com/apache/rocketmq-client-go/v2?v2.1.2 )
以下示例代碼中的參數說明如下,請參考收集連接信息獲取參數值。
- GROUP:表示消費組名稱。
- ENDPOINT:表示實例連接地址和端口。
- TOPIC:表示Topic名稱。
發送定時/延時消息
發送定時/延時消息的示例代碼如下。
package?main
import?(
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
"time"
)
func?main()?{
//?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint?:=?"${ENDPOINT}"
//?填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey?:=?"${ACCESS_KEY}"
//?填寫SecretKey?在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey?:=?"${SECRET_KEY}"
//?填寫Topic,在管理控制臺創建
topic?:=?"${TOPIC}"
p,?_?:=?rocketmq.NewProducer(
producer.WithNameServer([]string{endpoint}),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey:?accessKey,
SecretKey:?secretKey,
}),
)
if?err?:=?p.Start();?err?!=?nil?{
fmt.Println("start?producer?error:",?err)
os.Exit(1)
}
//?創建消息
msg?:=?&primitive.Message{
Topic:?topic,
Body:??[]byte("Hello?RocketMQ"),
}
//?設置延遲等級
//?等級與時間對應關系:
//?1s、?5s、?10s、?30s、??1m、?2m、?3m、?4m、?5m、?6m、?7m、?8m、?9m、?10m、?20m、?30m、?1h、?2h;
//?1????2????3?????4?????5????6???7????8???9???10???11???12??13???14????15????16???17???18
//?如果想用延遲級別,那么設置下面這個方法
msg.WithDelayTimeLevel(3)
//發送任意延遲消息,則使用下面的方法,WithDelayTimeLevel就不要設置了,?時間單位為毫秒,如下所示:消息將在10s后投遞
delayMills?:=?int64(10?*?1000)
msg.WithProperty("__STARTDELIVERTIME",?strconv.FormatInt(time.Now().Unix()+delayMills,?10))
res,?err?:=?p.SendSync(context.Background(),?msg)
if?err?!=?nil?{
fmt.Println("send?message?error:",?err)
return
}?else?{
fmt.Printf("send?message?success:?result=%s\n",?res.String())
}
err?=?p.Shutdown()
if?err?!=?nil?{
fmt.Printf("shutdown?producer?error:?%s",?err.Error())
}
}
訂閱定時/延時消息
訂閱定時/延時消息的示例代碼如下。
package?main
import?(
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func?main()?{
//?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint?:=?"${ENDPOINT}"
//?填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey?:=?"${ACCESS_KEY}"
//?填寫SecretKey?在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey?:=?"${SECRET_KEY}"
//?填寫Topic,在管理控制臺創建
topic?:=?"${TOPIC}"
//?在控制臺創建的訂閱組(Group)
group?:=?"${GROUP}"
c,?_?:=?rocketmq.NewPushConsumer(
consumer.WithGroupName(group),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
consumer.WithCredentials(primitive.Credentials{
AccessKey:?accessKey,
SecretKey:?secretKey,
}),
)
err?:=?c.Subscribe(topic,?consumer.MessageSelector{},?func(ctx?context.Context,
messages?...*primitive.MessageExt)?(consumer.ConsumeResult,?error)?{
for?i?:=?range?messages?{
//?處理消息
fmt.Printf("receive?msg:?%v?\n",?messages[i])
}
//?如果消息處理成功則返回consumer.ConsumeSuccess
//?如果消息處理失敗則返回consumer.ConsumeRetryLater
return?consumer.ConsumeSuccess,?nil
})
if?err?!=?nil?{
fmt.Println(err.Error())
}
//?Note:?start?after?subscribe
err?=?c.Start()
if?err?!=?nil?{
fmt.Println(err.Error())
os.Exit(-1)
}
waitChan?:=?make(chan?interface{},?0)
<-waitChan
}