收發事務消息
更新時間 2025-04-22 14:04:25
最近更新時間: 2025-04-22 14:04:25
分享文章
分布式消息服務RocketMQ版的事務消息支持在業務邏輯與發送消息之間提供事務保證,通過兩階段的方式提供對事務消息的支持,事務消息交互流程如圖1所示。
圖1 事務消息交互流程
事務消息生產者首先發送半消息,然后執行本地事務。如果執行成功,則發送事務提交,否則發送事務回滾。服務端在一段時間后如果一直收不到提交或回滾,則發起回查,生產者在收到回查后重新發送事務提交或回滾。消息只有在提交之后才投遞給消費者,消費者對回滾的消息不可見。
收發事務消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
開源的Java客戶端支持連接分布式消息服務RocketMQ版,推薦使用的客戶端版本為4.9.7。
通過以下任意一種方式引入依賴:
-
使用Maven方式引入依賴
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.7</version> </dependency> </dependencies> -
點擊下載依賴JAR包:rocketmq-all-4.9.7-bin-release.zip
發送事務消息
參考如下示例代碼
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerTransactionExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
// 執行本地事務和事務回查的接口
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("開始執行本地事務: " + message);
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("收到事務消息的回查請求, MsgId: " + messageExt.getMsgId());
return LocalTransactionState.COMMIT_MESSAGE;
}
};
/*
* 創建Producer,如果想開啟消息軌跡,可以按照如下方式創建:
* TransactionMQProducer producer = new TransactionMQProducer(null, "YOUR GROUP ID", getAclRPCHook(), true, null);
*/
TransactionMQProducer producer = new TransactionMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制臺NAMESRV接入點地址
producer.setNamesrvAddr("XXX:xxx");
//producer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
producer.setTransactionListener(transactionListener);
producer.start();
Message msg = new Message("YOUR TOPIC",
"TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
}
}
事務消息生產者需要實現執行本地事務和事務回查的接口,其中executeLocalTransaction方法在發送完半事務消息后被調用,checkLocalTransaction方法在收到事務回查時調用,兩者被調用時可返回如下三個事務狀態。
- LocalTransactionState.COMMIT_MESSAGE:提交事務,允許事務消息投遞到消費者。
- LocalTransactionState.ROLLBACK_MESSAGE:回滾事務,消息將被丟棄不允許消費。
- LocalTransactionState.UNKNOW:無法判斷狀態,服務端會向生產者再次回查該消息的狀態。
訂閱事務消息
參考如下示例代碼
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class ConsumerTransactionExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
/*
* 創建Consumer,如果想開啟消息軌跡,可以按照如下方式創建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely(), true, null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely());
// 填入控制臺NAMESRV接入點地址
consumer.setNamesrvAddr("XXX:xxx");
// consumer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
/*
* 如果想要消費指定TAG的消息,可以按照如下方式訂閱:* 為訂閱所有的TAG
* pushConsumer.subscribe(TOPIC_NAME, "Tag1");
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}