接入大疆MQTT

This commit is contained in:
孙小云 2026-01-28 14:05:09 +08:00
parent a6b37bd269
commit bbda3b541d
5 changed files with 384 additions and 269 deletions

View File

@ -1,120 +0,0 @@
# DJI MQTT 模块使用说明
## 概述
本模块实现了大疆MQTT消息的接收和处理功能采用共享订阅方式支持多实例部署。
## 功能特性
- ✅ 自动连接和重连
- ✅ 共享订阅(多实例部署时只有一个实例消费消息)
- ✅ 自动区分无人机和机场数据
- ✅ 回调接口设计使用者无需关心osd和state的区别
- ✅ 完整的数据模型定义
- ✅ 原始数据保留rawData字段
## 架构设计
```
DjiMqttClientService (MQTT客户端)
DjiMqttMessageHandler (消息处理器)
IDroneDataCallback / IDockDataCallback (回调接口)
使用者实现
```
## 配置说明
`bootstrap.yml` 中配置:
```yaml
dji:
mqtt:
host: mqtt.t-aaron.com
port: 10883
version: 5
client-id: ThingsBoard_gateway
username: admin
password: admin
connection-timeout: 30
keep-alive-interval: 60
auto-reconnect: true
clean-session: false
```
## 使用方法
### 1. 注入消息处理器
```java
@Autowired
private DjiMqttMessageHandler messageHandler;
```
### 2. 实现回调接口
```java
// 无人机数据回调
messageHandler.registerDroneDataCallback(new IDroneDataCallback() {
@Override
public void onDroneData(DroneData droneData) {
// 处理无人机数据
String sn = droneData.getDeviceSn();
Double latitude = droneData.getLatitude();
Double longitude = droneData.getLongitude();
// 访问原始数据
Map<String, Object> rawData = droneData.getRawData();
}
});
// 机场数据回调
messageHandler.registerDockDataCallback(new IDockDataCallback() {
@Override
public void onDockData(DockData dockData) {
// 处理机场数据
String sn = dockData.getDeviceSn();
Integer modeCode = dockData.getModeCode();
Float temperature = dockData.getTemperature();
// 访问原始数据
Map<String, Object> rawData = dockData.getRawData();
}
});
```
## 数据模型
### DroneData无人机数据
主要字段:
- `deviceSn`: 设备SN
- `messageType`: 消息类型osd/state
- `latitude/longitude`: 位置信息
- `elevation/height`: 高度信息
- `modeCode`: 飞行器状态
- `rawData`: 原始数据(包含所有字段)
### DockData机场数据
主要字段:
- `deviceSn`: 设备SN
- `messageType`: 消息类型osd/state
- `latitude/longitude`: 位置信息
- `modeCode`: 机场状态
- `temperature/humidity`: 环境信息
- `coverState`: 舱盖状态
- `rawData`: 原始数据(包含所有字段)
## 注意事项
1. **部分字段推送**每次MQTT消息可能只包含部分字段使用时需要判空
2. **原始数据访问**:所有字段都保存在`rawData`中可以通过Map访问
3. **共享订阅**:多实例部署时,同一条消息只会被一个实例消费
4. **自动重连**:连接断开后会自动重连
## 示例代码
参考 `DjiMqttUsageExample.java` 获取完整示例。

View File

@ -0,0 +1,76 @@
package com.ruoyi.device.domain.impl.djimqtt.config;
import lombok.Builder;
import lombok.Data;
/**
* DJI MQTT客户端配置
* 用于动态创建MQTT客户端
*
* @author ruoyi
*/
@Data
@Builder
public class DjiMqttClientConfig {
/**
* MQTT服务器地址
*/
private String host;
/**
* MQTT服务器端口
*/
private Integer port;
/**
* 客户端ID必须唯一
*/
private String clientId;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接超时时间
*/
@Builder.Default
private Integer connectionTimeout = 30;
/**
* 保持连接时间
*/
@Builder.Default
private Integer keepAliveInterval = 60;
/**
* 自动重连
*/
@Builder.Default
private Boolean autoReconnect = true;
/**
* 清除会话
*/
@Builder.Default
private Boolean cleanSession = false;
/**
* 是否使用共享订阅
*/
@Builder.Default
private Boolean useSharedSubscription = true;
/**
* 共享订阅组名
*/
@Builder.Default
private String sharedGroupName = "dji-group";
}

View File

@ -2,7 +2,9 @@ package com.ruoyi.device.domain.impl.djimqtt.example;
import com.ruoyi.device.domain.impl.djimqtt.callback.IDockDataCallback; import com.ruoyi.device.domain.impl.djimqtt.callback.IDockDataCallback;
import com.ruoyi.device.domain.impl.djimqtt.callback.IDroneDataCallback; import com.ruoyi.device.domain.impl.djimqtt.callback.IDroneDataCallback;
import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig;
import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler; import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler;
import com.ruoyi.device.domain.impl.djimqtt.manager.DjiMqttClientManager;
import com.ruoyi.device.domain.impl.djimqtt.model.DockData; import com.ruoyi.device.domain.impl.djimqtt.model.DockData;
import com.ruoyi.device.domain.impl.djimqtt.model.DroneData; import com.ruoyi.device.domain.impl.djimqtt.model.DroneData;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -12,13 +14,14 @@ import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* DJI MQTT使用示例 * DJI MQTT使用示例支持多客户端
* *
* 使用说明 * 使用说明
* 1. 注入 DjiMqttMessageHandler * 1. 注入 DjiMqttClientManager
* 2. 实现 IDroneDataCallback IDockDataCallback 接口 * 2. 使用 DjiMqttClientConfig.builder() 创建配置
* 3. 在应用启动后注册回调 * 3. 调用 manager.createClient(config) 创建客户端
* 4. 在回调方法中处理接收到的数据 * 4. 通过 manager.getHandler(clientId) 获取消息处理器
* 5. 注册回调处理数据
* *
* @author ruoyi * @author ruoyi
*/ */
@ -26,91 +29,125 @@ import org.springframework.stereotype.Component;
@Component @Component
public class DjiMqttUsageExample { public class DjiMqttUsageExample {
@Autowired // @Autowired
private DjiMqttMessageHandler messageHandler; // private DjiMqttClientManager clientManager;
//
/** // /**
* 应用启动后注册回调 // * 应用启动后创建MQTT客户端
*/ // */
@EventListener(ApplicationReadyEvent.class) // @EventListener(ApplicationReadyEvent.class)
public void registerCallbacks() { // public void onApplicationReady() {
// 注册无人机数据回调 // // 示例1创建第一个MQTT客户端
messageHandler.registerDroneDataCallback(new IDroneDataCallback() { // createClient1();
@Override //
public void onDroneData(DroneData droneData) { // // 示例2创建第二个MQTT客户端不同的服务器
handleDroneData(droneData); // createClient2();
} // }
}); //
// /**
// 注册机场数据回调 // * 创建第一个MQTT客户端
messageHandler.registerDockDataCallback(new IDockDataCallback() { // */
@Override // private void createClient1() {
public void onDockData(DockData dockData) { // // 构建配置
handleDockData(dockData); // DjiMqttClientConfig config = DjiMqttClientConfig.builder()
} // .host("mqtt.t-aaron.com")
}); // .port(10883)
// .clientId("client_1")
log.info("DJI MQTT回调已注册"); // .username("admin")
} // .password("admin")
// .useSharedSubscription(true)
/** // .sharedGroupName("dji-group-1")
* 处理无人机数据 // .build();
*/ //
private void handleDroneData(DroneData droneData) { // // 创建客户端
log.info("收到无人机数据 - SN: {}, Type: {}", // String clientId = clientManager.createClient(config);
droneData.getDeviceSn(), //
droneData.getMessageType()); // // 获取消息处理器
// DjiMqttMessageHandler handler = clientManager.getHandler(clientId);
// 示例处理位置信息 //
if (droneData.getLatitude() != null && droneData.getLongitude() != null) { // // 注册无人机数据回调
log.info("无人机位置 - 纬度: {}, 经度: {}, 高度: {}", // handler.registerDroneDataCallback(new IDroneDataCallback() {
droneData.getLatitude(), // @Override
droneData.getLongitude(), // public void onDroneData(DroneData droneData) {
droneData.getElevation()); // handleDroneDataForClient1(droneData);
} // }
// });
// 示例处理电池信息从rawData中获取 //
if (droneData.getRawData() != null && droneData.getRawData().containsKey("battery")) { // // 注册机场数据回调
Object battery = droneData.getRawData().get("battery"); // handler.registerDockDataCallback(new IDockDataCallback() {
log.info("无人机电池信息: {}", battery); // @Override
} // public void onDockData(DockData dockData) {
// handleDockDataForClient1(dockData);
// 示例处理相机信息 // }
if (droneData.getRawData() != null && droneData.getRawData().containsKey("cameras")) { // });
Object cameras = droneData.getRawData().get("cameras"); //
log.info("无人机相机信息: {}", cameras); // log.info("客户端1已创建并注册回调");
} // }
} //
// /**
/** // * 创建第二个MQTT客户端连接到不同的服务器
* 处理机场数据 // */
*/ // private void createClient2() {
private void handleDockData(DockData dockData) { // // 构建配置
log.info("收到机场数据 - SN: {}, Type: {}", // DjiMqttClientConfig config = DjiMqttClientConfig.builder()
dockData.getDeviceSn(), // .host("mqtt.another-server.com")
dockData.getMessageType()); // .port(1883)
// .clientId("client_2")
// 示例处理机场状态 // .username("user2")
if (dockData.getModeCode() != null) { // .password("pass2")
log.info("机场状态: {}", dockData.getModeCode()); // .useSharedSubscription(false) // 不使用共享订阅
} // .build();
//
// 示例处理环境信息 // // 创建客户端
if (dockData.getTemperature() != null) { // String clientId = clientManager.createClient(config);
log.info("机场温度: {}°C, 湿度: {}%", //
dockData.getTemperature(), // // 获取消息处理器
dockData.getHumidity()); // DjiMqttMessageHandler handler = clientManager.getHandler(clientId);
} //
// // 注册回调
// 示例处理舱盖状态 // handler.registerDroneDataCallback(droneData -> handleDroneDataForClient2(droneData));
if (dockData.getCoverState() != null) { // handler.registerDockDataCallback(dockData -> handleDockDataForClient2(dockData));
log.info("舱盖状态: {}", dockData.getCoverState()); //
} // log.info("客户端2已创建并注册回调");
// }
// 示例处理飞行器充电状态从rawData中获取 //
if (dockData.getRawData() != null && dockData.getRawData().containsKey("drone_charge_state")) { // /**
Object chargeState = dockData.getRawData().get("drone_charge_state"); // * 处理客户端1的无人机数据
log.info("飞行器充电状态: {}", chargeState); // */
} // private void handleDroneDataForClient1(DroneData droneData) {
} // log.info("[客户端1] 收到无人机数据 - SN: {}, Type: {}",
// droneData.getDeviceSn(),
// droneData.getMessageType());
//
// // 处理位置信息
// if (droneData.getLatitude() != null && droneData.getLongitude() != null) {
// log.info("[客户端1] 无人机位置 - 纬度: {}, 经度: {}, 高度: {}",
// droneData.getLatitude(),
// droneData.getLongitude(),
// droneData.getElevation());
// }
// }
//
// /**
// * 处理客户端1的机场数据
// */
// private void handleDockDataForClient1(DockData dockData) {
// log.info("[客户端1] 收到机场数据 - SN: {}, Type: {}",
// dockData.getDeviceSn(),
// dockData.getMessageType());
// }
//
// /**
// * 处理客户端2的无人机数据
// */
// private void handleDroneDataForClient2(DroneData droneData) {
// log.info("[客户端2] 收到无人机数据 - SN: {}", droneData.getDeviceSn());
// }
//
// /**
// * 处理客户端2的机场数据
// */
// private void handleDockDataForClient2(DockData dockData) {
// log.info("[客户端2] 收到机场数据 - SN: {}", dockData.getDeviceSn());
// }
} }

View File

@ -0,0 +1,125 @@
package com.ruoyi.device.domain.impl.djimqtt.manager;
import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig;
import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler;
import com.ruoyi.device.domain.impl.djimqtt.service.DjiMqttClientService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* DJI MQTT客户端管理器
* 管理多个MQTT客户端实例
*
* @author ruoyi
*/
@Slf4j
@Component
public class DjiMqttClientManager {
/**
* 客户端映射表 clientId -> DjiMqttClientService
*/
private final Map<String, DjiMqttClientService> clients = new ConcurrentHashMap<>();
/**
* 消息处理器映射表 clientId -> DjiMqttMessageHandler
*/
private final Map<String, DjiMqttMessageHandler> handlers = new ConcurrentHashMap<>();
/**
* 创建并连接MQTT客户端
*
* @param config 客户端配置
* @return 客户端ID
*/
public String createClient(DjiMqttClientConfig config) {
String clientId = config.getClientId();
if (clients.containsKey(clientId)) {
log.warn("MQTT客户端[{}]已存在", clientId);
return clientId;
}
// 为每个客户端创建独立的消息处理器
DjiMqttMessageHandler handler = new DjiMqttMessageHandler();
handlers.put(clientId, handler);
// 创建客户端
DjiMqttClientService client = new DjiMqttClientService(config, handler);
clients.put(clientId, client);
// 连接
client.connect();
log.info("成功创建MQTT客户端[{}]", clientId);
return clientId;
}
/**
* 获取消息处理器
*
* @param clientId 客户端ID
* @return 消息处理器
*/
public DjiMqttMessageHandler getHandler(String clientId) {
return handlers.get(clientId);
}
/**
* 获取客户端
*
* @param clientId 客户端ID
* @return 客户端服务
*/
public DjiMqttClientService getClient(String clientId) {
return clients.get(clientId);
}
/**
* 断开并移除客户端
*
* @param clientId 客户端ID
*/
public void removeClient(String clientId) {
DjiMqttClientService client = clients.remove(clientId);
if (client != null) {
client.disconnect();
log.info("已移除MQTT客户端[{}]", clientId);
}
handlers.remove(clientId);
}
/**
* 断开所有客户端
*/
public void disconnectAll() {
clients.forEach((clientId, client) -> {
try {
client.disconnect();
log.info("已断开MQTT客户端[{}]", clientId);
} catch (Exception e) {
log.error("断开MQTT客户端[{}]失败: {}", clientId, e.getMessage(), e);
}
});
clients.clear();
handlers.clear();
}
/**
* 获取所有客户端ID
*/
public java.util.Set<String> getAllClientIds() {
return clients.keySet();
}
/**
* 检查客户端是否存在
*/
public boolean hasClient(String clientId) {
return clients.containsKey(clientId);
}
}

View File

@ -1,48 +1,43 @@
package com.ruoyi.device.domain.impl.djimqtt.service; package com.ruoyi.device.domain.impl.djimqtt.service;
import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttProperties; import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig;
import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler; import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import jakarta.annotation.PreDestroy;
/** /**
* DJI MQTT客户端服务 * DJI MQTT客户端服务可实例化
* 采用共享订阅方式支持多实例部署 * 支持动态创建多个客户端
* *
* @author ruoyi * @author ruoyi
*/ */
@Slf4j @Slf4j
@Service
@RequiredArgsConstructor
public class DjiMqttClientService { public class DjiMqttClientService {
private final DjiMqttProperties mqttProperties; private final DjiMqttClientConfig config;
private final DjiMqttMessageHandler messageHandler; private final DjiMqttMessageHandler messageHandler;
private MqttClient mqttClient; private MqttClient mqttClient;
/** /**
* 无人机OSD主题共享订阅 * 无人机OSD主题
*/ */
private static final String DRONE_OSD_TOPIC = "$share/dji-group/thing/product/+/osd"; private static final String DRONE_OSD_TOPIC = "thing/product/+/osd";
/** /**
* 无人机State主题共享订阅 * 无人机State主题
*/ */
private static final String DRONE_STATE_TOPIC = "$share/dji-group/thing/product/+/state"; private static final String DRONE_STATE_TOPIC = "thing/product/+/state";
/** /**
* 应用启动后自动连接 * 构造函数
*
* @param config 客户端配置
* @param messageHandler 消息处理器
*/ */
@EventListener(ApplicationReadyEvent.class) public DjiMqttClientService(DjiMqttClientConfig config, DjiMqttMessageHandler messageHandler) {
public void onApplicationReady() { this.config = config;
connect(); this.messageHandler = messageHandler;
} }
/** /**
@ -51,36 +46,28 @@ public class DjiMqttClientService {
public void connect() { public void connect() {
try { try {
if (mqttClient != null && mqttClient.isConnected()) { if (mqttClient != null && mqttClient.isConnected()) {
log.info("MQTT客户端已连接,无需重复连接"); log.info("MQTT客户端[{}]已连接,无需重复连接", config.getClientId());
return; return;
} }
String broker = String.format("tcp://%s:%d", mqttProperties.getHost(), mqttProperties.getPort()); String broker = String.format("tcp://%s:%d", config.getHost(), config.getPort());
log.info("开始连接DJI MQTT服务器: {}", broker); log.info("开始连接DJI MQTT服务器[{}]: {}", config.getClientId(), broker);
// 创建MQTT客户端 mqttClient = new MqttClient(broker, config.getClientId(), new MemoryPersistence());
mqttClient = new MqttClient(broker, mqttProperties.getClientId(), new MemoryPersistence());
// 配置连接选项
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername()); options.setUserName(config.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray()); options.setPassword(config.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getConnectionTimeout()); options.setConnectionTimeout(config.getConnectionTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); options.setKeepAliveInterval(config.getKeepAliveInterval());
options.setAutomaticReconnect(mqttProperties.getAutoReconnect()); options.setAutomaticReconnect(config.getAutoReconnect());
options.setCleanSession(mqttProperties.getCleanSession()); options.setCleanSession(config.getCleanSession());
// 设置MQTT版本
if (mqttProperties.getVersion() == 5) {
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
log.info("使用MQTT协议版本: 3.1.1");
}
// 设置回调
mqttClient.setCallback(new MqttCallback() { mqttClient.setCallback(new MqttCallback() {
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
log.error("MQTT连接丢失: {}", cause.getMessage(), cause); log.error("MQTT客户端[{}]连接丢失: {}", config.getClientId(), cause.getMessage(), cause);
} }
@Override @Override
@ -89,25 +76,23 @@ public class DjiMqttClientService {
String payload = new String(message.getPayload()); String payload = new String(message.getPayload());
messageHandler.handleMessage(topic, payload); messageHandler.handleMessage(topic, payload);
} catch (Exception e) { } catch (Exception e) {
log.error("处理MQTT消息失败: {}", e.getMessage(), e); log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(), e.getMessage(), e);
} }
} }
@Override @Override
public void deliveryComplete(IMqttDeliveryToken token) { public void deliveryComplete(IMqttDeliveryToken token) {
// 不需要处理发送完成事件 // 不需要处理
} }
}); });
// 连接
mqttClient.connect(options); mqttClient.connect(options);
log.info("成功连接到DJI MQTT服务器"); log.info("MQTT客户端[{}]成功连接到服务器", config.getClientId());
// 订阅主题
subscribe(); subscribe();
} catch (Exception e) { } catch (Exception e) {
log.error("连接DJI MQTT服务器失败: {}", e.getMessage(), e); log.error("MQTT客户端[{}]连接失败: {}", config.getClientId(), e.getMessage(), e);
} }
} }
@ -117,36 +102,41 @@ public class DjiMqttClientService {
private void subscribe() { private void subscribe() {
try { try {
if (mqttClient == null || !mqttClient.isConnected()) { if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("MQTT客户端未连接,无法订阅主题"); log.warn("MQTT客户端[{}]未连接,无法订阅主题", config.getClientId());
return; return;
} }
// 订阅无人机OSD主题共享订阅 String osdTopic = config.getUseSharedSubscription()
mqttClient.subscribe(DRONE_OSD_TOPIC, 1); ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_OSD_TOPIC)
log.info("成功订阅主题: {}", DRONE_OSD_TOPIC); : DRONE_OSD_TOPIC;
// 订阅无人机State主题共享订阅 String stateTopic = config.getUseSharedSubscription()
mqttClient.subscribe(DRONE_STATE_TOPIC, 1); ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_STATE_TOPIC)
log.info("成功订阅主题: {}", DRONE_STATE_TOPIC); : DRONE_STATE_TOPIC;
mqttClient.subscribe(osdTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), osdTopic);
mqttClient.subscribe(stateTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), stateTopic);
} catch (Exception e) { } catch (Exception e) {
log.error("订阅MQTT主题失败: {}", e.getMessage(), e); log.error("MQTT客户端[{}]订阅主题失败: {}", config.getClientId(), e.getMessage(), e);
} }
} }
/** /**
* 断开连接 * 断开连接
*/ */
@PreDestroy
public void disconnect() { public void disconnect() {
try { try {
if (mqttClient != null && mqttClient.isConnected()) { if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect(); mqttClient.disconnect();
mqttClient.close(); mqttClient.close();
log.info("已断开DJI MQTT连接"); log.info("MQTT客户端[{}]已断开连接", config.getClientId());
} }
} catch (Exception e) { } catch (Exception e) {
log.error("断开DJI MQTT连接失败: {}", e.getMessage(), e); log.error("MQTT客户端[{}]断开连接失败: {}", config.getClientId(), e.getMessage(), e);
} }
} }
@ -156,4 +146,11 @@ public class DjiMqttClientService {
public boolean isConnected() { public boolean isConnected() {
return mqttClient != null && mqttClient.isConnected(); return mqttClient != null && mqttClient.isConnected();
} }
/**
* 获取客户端ID
*/
public String getClientId() {
return config.getClientId();
}
} }