收發普通消息
更新時間 2025-04-22 14:04:25
最近更新時間: 2025-04-22 14:04:25
分享文章
本章節介紹普通消息的收發方法和示例代碼。其中,普通消息發送方式分為同步發送、異步發送、單向發送。
- 同步發送:同步發送是指消息發送方發出一條消息后,會在收到服務端同步響應之后才發下一條消息的通訊方式。
- 異步發送:異步發送是指發送方發出一條消息后,不等服務端返回響應,接著發送下一條消息的通訊方式。
- 單向發送:發送方只負責發送消息,不等待服務端返回響應且沒有回調函數觸發。
- 收發消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
-
執行以下命令,檢查是否已安裝Go。
go version返回如下回顯時,說明Go已經安裝。
go version go1.21.5 linux/amd64如果未安裝Go,請官網下載并安裝。
-
在“go.mod”中增加以下代碼,添加依賴。
module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 )
以下示例代碼中的參數說明如下,請參考收集連接信息獲取參數值。
- GROUP:表示消費組名稱。
- ENDPOINT:表示實例連接地址和端口。
- TOPIC:表示Topic名稱。
同步發送
同步發送是指消息發送方發出一條消息到服務端,服務端接收并處理消息,然后返回響應給發送方,發送方收到響應后才會發送下一條消息的通訊方式。
參考如下示例代碼。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
)
func main() {
// 填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint := "${ENDPOINT}"
// 填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey := "${ACCESS_KEY}"
// 填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey := "${SECRET_KEY}"
// 填寫Topic,在管理控制臺創建
topic := "${TOPIC}"
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 4; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ! " + strconv.Itoa(i)),
}
// 為消息添加Tag
// msg.WithTag("TagA")
// 為消息添加Key
// msg.WithKeys([]string{"KeyA"})
// 使用同步方式發送消息
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
異步發送
異步發送是指消息發送方發出一條消息后,不等服務端返回響應,接著發送下一條消息的通訊方式。
使用異步發送需要客戶端實現異步發送回調接口(SendCallback)。即消息發送方在發送了一條消息后,不需要等待服務端響應接著發送第二條消息。發送方通過回調接口接收服務端響應,并處理響應結果。
參考如下示例代碼。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"sync"
)
func main() {
// 填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint := "${ENDPOINT}"
// 填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey := "${ACCESS_KEY}"
// 填寫SecretKey 在在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey := "${SECRET_KEY}"
// 填寫Topic,在管理控制臺創建
topic := "${TOPIC}"
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
// 使用異步方式發送消息
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
wg.Add(1)
err := p.SendAsync(context.TODO(), func(ctx context.Context, result *primitive.SendResult, e error) {
if e != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", result.String())
}
wg.Done()
}, primitive.NewMessage(topic, []byte("Hello RocketMQ")))
if err != nil {
fmt.Printf("send message error: %s\n", err)
}
}
wg.Wait()
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
單向發送
發送方只負責發送消息,不等待服務端返回響應且沒有回調函數觸發,即只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。
參考如下示例代碼。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
)
func main() {
// 填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint := "${ENDPOINT}"
// 填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey := "${ACCESS_KEY}"
// 填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey := "${SECRET_KEY}"
// 填寫Topic,在管理控制臺創建
topic := "${TOPIC}"
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 4; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ! " + strconv.Itoa(i)),
}
// 使用單向方式發送消息
err := p.SendOneWay(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success")
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
訂閱普通消息
參考如下示例代碼。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func main() {
// 填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint := "${ENDPOINT}"
// 填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey := "${ACCESS_KEY}"
// 填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey := "${SECRET_KEY}"
// 填寫Topic,在管理控制臺創建
topic := "${TOPIC}"
// 在控制臺創建的訂閱組(Group)
group := "${GROUP}"
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName(group),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
)
err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range messages {
// 處理消息
fmt.Printf("receive msg: %v \n", messages[i])
}
// 如果消息處理成功則返回consumer.ConsumeSuccess
// 如果消息處理失敗則返回consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
waitChan := make(chan interface{}, 0)
<-waitChan
}