收發事務消息
更新時間 2025-04-22 14:04:25
最近更新時間: 2025-04-22 14:04:25
分享文章
分布式消息服務RocketMQ版的事務消息支持在業務邏輯與發送消息之間提供事務保證,通過兩階段的方式提供對事務消息的支持,事務消息交互流程如圖1所示。
圖1 事務消息交互流程
事務消息生產者首先發送半消息,然后執行本地事務。如果執行成功,則發送事務提交,否則發送事務回滾。服務端在一段時間后如果一直收不到提交或回滾,則發起回查,生產者在收到回查后重新發送事務提交或回滾。消息只有在提交之后才投遞給消費者,消費者對回滾的消息不可見。
收發事務消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
-
執行以下命令,檢查是否已安裝Go。
go?version返回如下回顯時,說明Go已經安裝。
go?version?go1.21.5?linux/amd64如果未安裝Go,請官網下載并安裝。
-
在“go.mod”中增加以下代碼,添加依賴。
module?rocketmq-example-go go?1.13 require?( github.com/apache/rocketmq-client-go/v2?v2.1.2 )
以下示例代碼中的參數說明如下,請參考收集連接信息獲取參數值。
- GROUP:表示消費組名稱。
- ENDPOINT:表示實例連接地址和端口。
- TOPIC:表示Topic名稱。
發送事務消息
參考如下示例代碼。
package?main
import?(
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
)
//?這里模擬了一個內存狀態的事務執行,實際需要更換成相應的數據庫等事務操作
type?DemoListener?struct?{
localTrans???????*sync.Map
transactionIndex?int32
}
func?NewDemoListener()?*DemoListener?{
return?&DemoListener{
localTrans:?new(sync.Map),
}
}
//?這里是執行本地事務邏輯的方法回調
func?(dl?*DemoListener)?ExecuteLocalTransaction(msg?*primitive.Message)?primitive.LocalTransactionState?{
nextIndex?:=?atomic.AddInt32(&dl.transactionIndex,?1)
fmt.Printf("nextIndex:?%v?for?transactionID:?%v\n",?nextIndex,?msg.TransactionId)
status?:=?nextIndex?%?3
dl.localTrans.Store(msg.TransactionId,?primitive.LocalTransactionState(status+1))
return?primitive.UnknowState
}
//?查詢本地事務是否執行成功,實際情況需要查詢數據庫當中的信息是否真正執行成功了
func?(dl?*DemoListener)?CheckLocalTransaction(msg?*primitive.MessageExt)?primitive.LocalTransactionState?{
fmt.Printf("%v?msg?transactionID?:?%v\n",?time.Now(),?msg.TransactionId)
v,?existed?:=?dl.localTrans.Load(msg.TransactionId)
if?!existed?{
fmt.Printf("unknow?msg:?%v,?return?Commit",?msg)
return?primitive.CommitMessageState
}
//?這里實際對應業務從數據庫中查詢消息的事務狀態
state?:=?v.(primitive.LocalTransactionState)
switch?state?{
case?1:
fmt.Printf("checkLocalTransaction?COMMIT_MESSAGE:?%v\n",?msg)
return?primitive.CommitMessageState
case?2:
fmt.Printf("checkLocalTransaction?ROLLBACK_MESSAGE:?%v\n",?msg)
return?primitive.RollbackMessageState
case?3:
fmt.Printf("checkLocalTransaction?unknow:?%v\n",?msg)
return?primitive.UnknowState
default:
fmt.Printf("checkLocalTransaction?default?COMMIT_MESSAGE:?%v\n",?msg)
return?primitive.CommitMessageState
}
}
func?main()?{
//?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint?:=?"${ENDPOINT}"
//?填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey?:=?"${ACCESS_KEY}"
//?填寫SecretKey?在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey?:=?"${SECRET_KEY}"
//?填寫Topic,在管理控制臺創建
topic?:=?"${TOPIC}"
p,?_?:=?rocketmq.NewTransactionProducer(
NewDemoListener(),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey:?accessKey,
SecretKey:?secretKey,
}),
)
err?:=?p.Start()
if?err?!=?nil?{
fmt.Printf("start?producer?error:?%s\n",?err.Error())
os.Exit(1)
}
for?i?:=?0;?i?<?4;?i++?{
msg?:=?primitive.NewMessage(topic,?[]byte("Hello?RocketMQ?"+strconv.Itoa(i)))
res,?err?:=?p.SendMessageInTransaction(context.Background(),?msg)
if?err?!=?nil?{
fmt.Printf("send?message?error:?%s\n",?err)
}?else?{
fmt.Printf("send?message?success:?result=%s\n",?res.String())
}
}
//?防止客戶端進程退出,業務自定義處理即可
time.Sleep(5?*?time.Minute)
err?=?p.Shutdown()
if?err?!=?nil?{
fmt.Printf("shutdown?producer?error:?%s",?err.Error())
}
}
注意事務消息生產者需要實現兩個回調函數,其中ExecuteLocalTransaction回調函數在發送完半事務消息后被調用,即上圖中的第3階段,CheckLocalTransaction回調函數在收到回查時調用,即上圖中的第6階段。兩個回調函數均可返回3種事務狀態:
primitive.CommitMessageState:提交事務,允許消費者消費該消息。
primitive.RollbackMessageState:回滾事務,消息將被丟棄不允許消費。
primitive.UnknowState:無法判斷狀態,期待服務端向生產者再次回查該消息的狀態。
訂閱事務消息
package?main
import?(
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func?main()?{
//?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint?:=?"${ENDPOINT}"
//?填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey?:=?"${ACCESS_KEY}"
//?填寫SecretKey?在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey?:=?"${SECRET_KEY}"
//?填寫Topic,在管理控制臺創建
topic?:=?"${TOPIC}"
//?在控制臺創建的訂閱組(Group)
group?:=?"${GROUP}"
c,?_?:=?rocketmq.NewPushConsumer(
consumer.WithGroupName(group),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
consumer.WithCredentials(primitive.Credentials{
AccessKey:?accessKey,
SecretKey:?secretKey,
}),
)
err?:=?c.Subscribe(topic,?consumer.MessageSelector{},?func(ctx?context.Context,
messages?...*primitive.MessageExt)?(consumer.ConsumeResult,?error)?{
for?i?:=?range?messages?{
//?處理消息
fmt.Printf("receive?msg:?%v?\n",?messages[i])
}
//?如果消息處理成功則返回consumer.ConsumeSuccess
//?如果消息處理失敗則返回consumer.ConsumeRetryLater
return?consumer.ConsumeSuccess,?nil
})
if?err?!=?nil?{
fmt.Println(err.Error())
}
//?Note:?start?after?subscribe
err?=?c.Start()
if?err?!=?nil?{
fmt.Println(err.Error())
os.Exit(-1)
}
waitChan?:=?make(chan?interface{},?0)
<-waitChan
}