Java客戶端連接配置
更新時間 2025-06-16 11:59:53
最近更新時間: 2025-06-16 11:59:53
分享文章
本文為您介紹分布式消息服務MQTT客戶端連接配置。
引入依賴//版本號按需調整
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
連接mqtt云消息服務的示例代碼如下:
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ConnTest {
// 填入您在mqtt控制臺創建的ACL賬號密碼。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
// 是否使用tls加密傳輸
private static final Boolean isTls = true;
public static void main(String[] args) {
// 填寫mqtt云消息服務的接入點。接入點分為tls以及非tls兩種接入。tls接入格式為:ssl://{ip}:8085
String broker = "tcp://localhost:1883";
// 指定連接客戶端的id,該id可用于查詢連接會話信息以及設備軌跡信息。
String clientId = "ctg-mqtt-client-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
final MqttClient myClient = getMqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
// 設置心跳間隔(這里示例為2分鐘)
connOpts.setKeepAliveInterval(120);
// 設置自動重連
connOpts.setAutomaticReconnect(true);
// 設置tls相關配置(可選)
// 目前暫未支持自動配置ssl證書,默認的ssl證書需要客戶端進行默認證書信任。不影響正常的tls鏈路加密
if (isTls) {
SSLContext sslContext = SSLContext.getInstance("TLS");
// 默認信任服務端ssl證書
sslContext.init(null, new TrustManager[]{new X509TrustManager() {
public void checkClientTrusted(X509Certificate[] chain, String authType) {
}
public void checkServerTrusted(X509Certificate[] chain, String authType) {
}
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}}, new SecureRandom());
// 可以按照自定義的方式進行ssl證書的主機名驗證
connOpts.setHttpsHostnameVerificationEnabled(false);
connOpts.setSSLHostnameVerifier((hostname, session) -> true);
connOpts.setSocketFactory(sslContext.getSocketFactory());
}
System.out.println("Connecting to broker: " + broker);
myClient.connect(connOpts);
System.out.println("Connected");
// 這里編寫您的消息收發邏輯
myClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
// 打印詳細的錯誤信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException {
final MqttClient myClient = new MqttClient(broker, clientId, persistence);
myClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// 連接建立成功
}
@Override
public void connectionLost(Throwable cause) {
// 連接丟失,建議記錄日志,做好監控
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 收到消息的回調,這里不要進行阻塞操作,以免卡住導致連接斷開
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 成功發送消息到服務端
}
});
return myClient;
}
}