消息數據存儲
終端消息數據按父topic存儲至kafka隊列,需先在控制臺創建父topic;對未創建父topic的消息可正常收發,但不會存儲至kafka隊列。
Kafka存儲內容格式:
{
"clientId": 設備clientId,
"topic": 主題,
"payload": 消息內容,
"ts": 發送的時間戳
}
會話機制
終端 clean session=true,斷線后會話信息清除,再次上線后之前所有的訂閱關系以及離線消息丟失。 clean session=false斷開連接的情況下,MQTT Broker也會為斷連客戶端保存一個會話,默認2小時,超期未重連訂購關系清除;對于clean session=false的客戶端斷線重連后可接收Qos>0的離線消息。對于客戶端因網絡等各種原因斷線,需要加上重連和訂購關系重新訂購機制。
離線消息
對于clean session=false的客戶端,在未超出會話失效期,斷線重連后可接收Qos>0的離線消息。
系統主題
| 系統主題 | 說明 |
|---|---|
| mq2mqtt | 用于云端服務向終端發送消息。發往該主題消息會轉發至MQTT Broker實現云端與移動端互通 |
| mqtt-device-connect | 設備上線主題,內容 {"clientid":客戶端ID,"ts":上線時間戳 } |
| mqtt-device-disconnect | 設備下線主題,內容 {"clientid":客戶端ID,"ts":下線時間戳 } |
SDK支持
分布式消息服務MQTT支持標準的MQTT協議,理論上適配所有的MQTT客戶端SDK。
推薦對應的第三方 SDK 如下表:
| 語言/平臺 | 推薦的第三方SDK |
|---|---|
| Java | Eclipse Paho SDK |
| iOS | MQTT-Client-Framework |
| Android | Eclipse Paho SDK |
| JavaScript | Eclipse?Paho JavaScript |
| Python | Eclipse Paho Python SDK |
| C | Eclipse Paho C SDK |
| C# | Eclipse Paho C# SDK |
| Golang | Eclipse Paho Golang SDK |
| Node.js | MQTT-JS |
| PHP | Mosquitto-PHP |
主題規則
主題形式:父topic/子級topic1/子級topic2…。(父topic需要先創建)
使用MQTT消息隊列發消息,會把消息以父Topic主題分類保存在kafka上,應用服務可通過kafka 客戶端以父Topic為主題消費消息。
云端應用服務統一發送到kafka topic為mq2mqtt的主題隊列上,移動端topic、會話屬性Qos和cleansession保存在Record Header中,MQTT設備通過訂閱移動端topic,實現云端到移動端通訊。
Kakfa header與mqtt屬性映射如下表:
| Kafka Header | MQTT屬性 |
|---|---|
| qoslevel | Qos |
| cleanSession | cleanSession |
| mqttTopic | 主題 |
生產消費消息
MQTT客戶端收發
使用MQTT SDK接入終端連接地址進行消息生產消費。
生產消息代碼示例:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class PubMsgTest {
// 填入您在mqtt控制臺創建的ACL賬號密碼。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
public static void main(String[] args) {
String topic = "topic-1/a/b/c";
String content = "hello ctg-mqtt service";
int qos = 2;
// 填寫mqtt云消息服務的接入點。
String broker = "tcp://localhost:1883";
// 指定連接客戶端的id,該id可用于查詢連接會話信息以及設備軌跡信息。
String clientId = "ctg-mqtt-client-pub-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient myClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
System.out.println("Connecting to broker: " + broker);
myClient.connect(connOpts);
System.out.println("Connected");
for (int i = 0; i < 10; i++) {
System.out.println("Publishing message: " + content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
myClient.publish(topic, message);
System.out.println("Message published");
}
myClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
// 打印詳細的錯誤信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
接收消息代碼示例:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SubMsgTest {
// 填入您在mqtt控制臺創建的ACL賬號密碼。
private static final String USER_NAME = "your-user-name";
private static final String AUTH_PASSWORD = "your-password";
static String topic = "topic-1/a/b/c";
static int qos = 2;
public static void main(String[] args) {
// 填寫mqtt云消息服務的接入點。
String broker = "tcp://localhost:1883";
// 指定連接客戶端的id,該id可用于查詢連接會話信息以及設備軌跡信息。
String clientId = "ctg-mqtt-client-sub-test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient myClient = getMqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(USER_NAME);
connOpts.setPassword(AUTH_PASSWORD.toCharArray());
myClient.connect(connOpts);
} catch (MqttException me) {
// 打印詳細的錯誤信息。
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException {
MqttClient myClient = new MqttClient(broker, clientId, persistence);
myClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connected to broker: " + broker);
try {
myClient.subscribe(topic, qos);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("connection lost");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("message is :" + message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
return myClient;
}
}
MQTT發送Kafka接收
終端設備使用MQTT SDK接入終端連接地址進行消息發布,云端應用服務使用Kafka sdk接入
服務端連接地址按父主題進行數據消費。
MQTT發送順序消息kafka接收順序消息
創建父主題,類型分區順序,父主題以orderMsg2mq-開頭。
終端設備使用MQTT SDK接入終端連接地址進行消息發布,云端應用服務使用kafka sdk接入
服務端連接地址按父主題進行分區順序數據消費
Kafka發送MQTT接收
云端應用服務使用kafka sdk接入服務端連接地址往系統主題mq2mqtt下發指令,終端設備使用MQTT SDK接入終端連接地址接收;
示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import java.util.Properties;
public class PubMsgTest {
public static void main(String[] args) {
Properties props = new Properties();
// 填入您在mqtt控制臺查看到的kafka接入點信息。
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<Object, String> producer = new KafkaProducer<>(props);
// 這里需要指定系統內置的流轉到mqtt服務端的特殊kafka主題,一般命名為:mq2mqtt。
String topic = "mq2mqtt";
// kafka消息需要在header中指定需要發往mqtt訂閱的主題以及一些會話屬性、qos等級等信息。
RecordHeaders headers = new RecordHeaders();
headers.add(new RecordHeader("qosLevel", "2".getBytes()));
headers.add(new RecordHeader("cleanSession", "true".getBytes()));
// 這里需要指定您的mqtt客戶端訂閱的主題,支持topic filter。
headers.add(new RecordHeader("mqttTopic", "topic-1/a/b/c".getBytes()));
byte[] payload = new byte[1026 * 1024];
for (int i = 0; i < 1026 * 1024; i++) {
payload[i] = 'x';
}
try {
for (int i = 0; i < 1000; i++) {
String message = "Message- " + i + " " + new String(payload);
producer.send(new ProducerRecord<>(topic, null, null, null, message, headers));
// System.out.println("Sent: " + message);
System.out.println("sent successfully");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}