Python客戶端連接配置
更新時間 2025-06-16 11:59:53
最近更新時間: 2025-06-16 11:59:53
分享文章
安裝依賴
pip3 install paho-mqtt代碼示例
import ssl
import random
from paho.mqtt import client as paho_client
# 填入您在mqtt控制臺創建的ACL賬號密碼。
USER_NAME = "your-user-name"
AUTH_PASSWORD = "your-password"
# 是否使用tls加密傳輸
isTls = True
class MQTTClient:
def __init__(self):
# 填寫mqtt云消息服務的接入點,去掉:{端口號}部分
self.broker = "tcp://localhost"
# 指定連接客戶端的id
self.client_id = "ctg-mqtt-client-test"
self.client = None
def on_connect(self, client, userdata, flags, rc):
# 連接建立成功
if rc == 0:
print("Connected to MQTT Broker!")
else:
print(f"Failed to connect, return code {rc}")
def on_disconnect(self, client, userdata, rc):
# 連接丟失
print(f"Disconnected with result code {rc}")
# 可以在這里添加重連邏輯
def on_message(self, client, userdata, msg):
# 收到消息的回調
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
def on_publish(self, client, userdata, mid):
# 成功發送消息到服務端
print(f"Message {mid} published successfully")
def connect_mqtt(self):
# 創建客戶端實例
self.client = paho_client.Client(client_id=self.client_id)
self.client.username_pw_set(USER_NAME, AUTH_PASSWORD)
# 設置回調函數
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
self.client.on_publish = self.on_publish
# 設置TLS
if isTls:
# 創建不驗證證書的SSL上下文
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self.client.tls_set_context(context)
# 設置其他連接選項
self.client.connect(self.broker.replace("tcp://", "").replace("ssl://", ""),
port=8883 if isTls else 1883)
self.client.loop_start() # 啟動網絡循環
return self.client
def run(self):
try:
print(f"Connecting to broker: {self.broker}")
client = self.connect_mqtt()
# 這里可以添加訂閱或發布邏輯
# 例如: client.subscribe("topic/test")
# 例如: client.publish("topic/test", "Hello World")
# 保持連接
while True:
pass
except Exception as e:
print(f"Error occurred: {e}")
if self.client:
self.client.disconnect()
print("Disconnected")
raise
if __name__ == '__main__':
mqtt_client = MQTTClient()
mqtt_client.run()