From bbda3b541d59c88ac6d7970d1aaaedb5791e8471 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=B0=8F=E4=BA=91?= Date: Wed, 28 Jan 2026 14:05:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=85=A5=E5=A4=A7=E7=96=86MQTT?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/domain/impl/djimqtt/README.md | 120 ---------- .../djimqtt/config/DjiMqttClientConfig.java | 76 ++++++ .../djimqtt/example/DjiMqttUsageExample.java | 221 ++++++++++-------- .../djimqtt/manager/DjiMqttClientManager.java | 125 ++++++++++ .../djimqtt/service/DjiMqttClientService.java | 111 +++++---- 5 files changed, 384 insertions(+), 269 deletions(-) delete mode 100644 src/main/java/com/ruoyi/device/domain/impl/djimqtt/README.md create mode 100644 src/main/java/com/ruoyi/device/domain/impl/djimqtt/config/DjiMqttClientConfig.java create mode 100644 src/main/java/com/ruoyi/device/domain/impl/djimqtt/manager/DjiMqttClientManager.java diff --git a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/README.md b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/README.md deleted file mode 100644 index bd935c8..0000000 --- a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/README.md +++ /dev/null @@ -1,120 +0,0 @@ -# DJI MQTT 模块使用说明 - -## 概述 - -本模块实现了大疆MQTT消息的接收和处理功能,采用共享订阅方式,支持多实例部署。 - -## 功能特性 - -- ✅ 自动连接和重连 -- ✅ 共享订阅(多实例部署时只有一个实例消费消息) -- ✅ 自动区分无人机和机场数据 -- ✅ 回调接口设计,使用者无需关心osd和state的区别 -- ✅ 完整的数据模型定义 -- ✅ 原始数据保留(rawData字段) - -## 架构设计 - -``` -DjiMqttClientService (MQTT客户端) - ↓ -DjiMqttMessageHandler (消息处理器) - ↓ -IDroneDataCallback / IDockDataCallback (回调接口) - ↓ -使用者实现 -``` - -## 配置说明 - -在 `bootstrap.yml` 中配置: - -```yaml -dji: - mqtt: - host: mqtt.t-aaron.com - port: 10883 - version: 5 - client-id: ThingsBoard_gateway - username: admin - password: admin - connection-timeout: 30 - keep-alive-interval: 60 - auto-reconnect: true - clean-session: false -``` - -## 使用方法 - -### 1. 注入消息处理器 - -```java -@Autowired -private DjiMqttMessageHandler messageHandler; -``` - -### 2. 实现回调接口 - -```java -// 无人机数据回调 -messageHandler.registerDroneDataCallback(new IDroneDataCallback() { - @Override - public void onDroneData(DroneData droneData) { - // 处理无人机数据 - String sn = droneData.getDeviceSn(); - Double latitude = droneData.getLatitude(); - Double longitude = droneData.getLongitude(); - - // 访问原始数据 - Map rawData = droneData.getRawData(); - } -}); - -// 机场数据回调 -messageHandler.registerDockDataCallback(new IDockDataCallback() { - @Override - public void onDockData(DockData dockData) { - // 处理机场数据 - String sn = dockData.getDeviceSn(); - Integer modeCode = dockData.getModeCode(); - Float temperature = dockData.getTemperature(); - - // 访问原始数据 - Map rawData = dockData.getRawData(); - } -}); -``` - -## 数据模型 - -### DroneData(无人机数据) - -主要字段: -- `deviceSn`: 设备SN -- `messageType`: 消息类型(osd/state) -- `latitude/longitude`: 位置信息 -- `elevation/height`: 高度信息 -- `modeCode`: 飞行器状态 -- `rawData`: 原始数据(包含所有字段) - -### DockData(机场数据) - -主要字段: -- `deviceSn`: 设备SN -- `messageType`: 消息类型(osd/state) -- `latitude/longitude`: 位置信息 -- `modeCode`: 机场状态 -- `temperature/humidity`: 环境信息 -- `coverState`: 舱盖状态 -- `rawData`: 原始数据(包含所有字段) - -## 注意事项 - -1. **部分字段推送**:每次MQTT消息可能只包含部分字段,使用时需要判空 -2. **原始数据访问**:所有字段都保存在`rawData`中,可以通过Map访问 -3. **共享订阅**:多实例部署时,同一条消息只会被一个实例消费 -4. **自动重连**:连接断开后会自动重连 - -## 示例代码 - -参考 `DjiMqttUsageExample.java` 获取完整示例。 diff --git a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/config/DjiMqttClientConfig.java b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/config/DjiMqttClientConfig.java new file mode 100644 index 0000000..94b395f --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/config/DjiMqttClientConfig.java @@ -0,0 +1,76 @@ +package com.ruoyi.device.domain.impl.djimqtt.config; + +import lombok.Builder; +import lombok.Data; + +/** + * DJI MQTT客户端配置 + * 用于动态创建MQTT客户端 + * + * @author ruoyi + */ +@Data +@Builder +public class DjiMqttClientConfig { + + /** + * MQTT服务器地址 + */ + private String host; + + /** + * MQTT服务器端口 + */ + private Integer port; + + /** + * 客户端ID(必须唯一) + */ + private String clientId; + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 连接超时时间(秒) + */ + @Builder.Default + private Integer connectionTimeout = 30; + + /** + * 保持连接时间(秒) + */ + @Builder.Default + private Integer keepAliveInterval = 60; + + /** + * 自动重连 + */ + @Builder.Default + private Boolean autoReconnect = true; + + /** + * 清除会话 + */ + @Builder.Default + private Boolean cleanSession = false; + + /** + * 是否使用共享订阅 + */ + @Builder.Default + private Boolean useSharedSubscription = true; + + /** + * 共享订阅组名 + */ + @Builder.Default + private String sharedGroupName = "dji-group"; +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/example/DjiMqttUsageExample.java b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/example/DjiMqttUsageExample.java index 9838ea6..9b34bde 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/example/DjiMqttUsageExample.java +++ b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/example/DjiMqttUsageExample.java @@ -2,7 +2,9 @@ package com.ruoyi.device.domain.impl.djimqtt.example; import com.ruoyi.device.domain.impl.djimqtt.callback.IDockDataCallback; import com.ruoyi.device.domain.impl.djimqtt.callback.IDroneDataCallback; +import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig; import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler; +import com.ruoyi.device.domain.impl.djimqtt.manager.DjiMqttClientManager; import com.ruoyi.device.domain.impl.djimqtt.model.DockData; import com.ruoyi.device.domain.impl.djimqtt.model.DroneData; import lombok.extern.slf4j.Slf4j; @@ -12,13 +14,14 @@ import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; /** - * DJI MQTT使用示例 + * DJI MQTT使用示例(支持多客户端) * * 使用说明: - * 1. 注入 DjiMqttMessageHandler - * 2. 实现 IDroneDataCallback 或 IDockDataCallback 接口 - * 3. 在应用启动后注册回调 - * 4. 在回调方法中处理接收到的数据 + * 1. 注入 DjiMqttClientManager + * 2. 使用 DjiMqttClientConfig.builder() 创建配置 + * 3. 调用 manager.createClient(config) 创建客户端 + * 4. 通过 manager.getHandler(clientId) 获取消息处理器 + * 5. 注册回调处理数据 * * @author ruoyi */ @@ -26,91 +29,125 @@ import org.springframework.stereotype.Component; @Component public class DjiMqttUsageExample { - @Autowired - private DjiMqttMessageHandler messageHandler; - - /** - * 应用启动后注册回调 - */ - @EventListener(ApplicationReadyEvent.class) - public void registerCallbacks() { - // 注册无人机数据回调 - messageHandler.registerDroneDataCallback(new IDroneDataCallback() { - @Override - public void onDroneData(DroneData droneData) { - handleDroneData(droneData); - } - }); - - // 注册机场数据回调 - messageHandler.registerDockDataCallback(new IDockDataCallback() { - @Override - public void onDockData(DockData dockData) { - handleDockData(dockData); - } - }); - - log.info("DJI MQTT回调已注册"); - } - - /** - * 处理无人机数据 - */ - private void handleDroneData(DroneData droneData) { - log.info("收到无人机数据 - SN: {}, Type: {}", - droneData.getDeviceSn(), - droneData.getMessageType()); - - // 示例:处理位置信息 - if (droneData.getLatitude() != null && droneData.getLongitude() != null) { - log.info("无人机位置 - 纬度: {}, 经度: {}, 高度: {}", - droneData.getLatitude(), - droneData.getLongitude(), - droneData.getElevation()); - } - - // 示例:处理电池信息(从rawData中获取) - if (droneData.getRawData() != null && droneData.getRawData().containsKey("battery")) { - Object battery = droneData.getRawData().get("battery"); - log.info("无人机电池信息: {}", battery); - } - - // 示例:处理相机信息 - if (droneData.getRawData() != null && droneData.getRawData().containsKey("cameras")) { - Object cameras = droneData.getRawData().get("cameras"); - log.info("无人机相机信息: {}", cameras); - } - } - - /** - * 处理机场数据 - */ - private void handleDockData(DockData dockData) { - log.info("收到机场数据 - SN: {}, Type: {}", - dockData.getDeviceSn(), - dockData.getMessageType()); - - // 示例:处理机场状态 - if (dockData.getModeCode() != null) { - log.info("机场状态: {}", dockData.getModeCode()); - } - - // 示例:处理环境信息 - if (dockData.getTemperature() != null) { - log.info("机场温度: {}°C, 湿度: {}%", - dockData.getTemperature(), - dockData.getHumidity()); - } - - // 示例:处理舱盖状态 - if (dockData.getCoverState() != null) { - log.info("舱盖状态: {}", dockData.getCoverState()); - } - - // 示例:处理飞行器充电状态(从rawData中获取) - if (dockData.getRawData() != null && dockData.getRawData().containsKey("drone_charge_state")) { - Object chargeState = dockData.getRawData().get("drone_charge_state"); - log.info("飞行器充电状态: {}", chargeState); - } - } +// @Autowired +// private DjiMqttClientManager clientManager; +// +// /** +// * 应用启动后创建MQTT客户端 +// */ +// @EventListener(ApplicationReadyEvent.class) +// public void onApplicationReady() { +// // 示例1:创建第一个MQTT客户端 +// createClient1(); +// +// // 示例2:创建第二个MQTT客户端(不同的服务器) +// createClient2(); +// } +// +// /** +// * 创建第一个MQTT客户端 +// */ +// private void createClient1() { +// // 构建配置 +// DjiMqttClientConfig config = DjiMqttClientConfig.builder() +// .host("mqtt.t-aaron.com") +// .port(10883) +// .clientId("client_1") +// .username("admin") +// .password("admin") +// .useSharedSubscription(true) +// .sharedGroupName("dji-group-1") +// .build(); +// +// // 创建客户端 +// String clientId = clientManager.createClient(config); +// +// // 获取消息处理器 +// DjiMqttMessageHandler handler = clientManager.getHandler(clientId); +// +// // 注册无人机数据回调 +// handler.registerDroneDataCallback(new IDroneDataCallback() { +// @Override +// public void onDroneData(DroneData droneData) { +// handleDroneDataForClient1(droneData); +// } +// }); +// +// // 注册机场数据回调 +// handler.registerDockDataCallback(new IDockDataCallback() { +// @Override +// public void onDockData(DockData dockData) { +// handleDockDataForClient1(dockData); +// } +// }); +// +// log.info("客户端1已创建并注册回调"); +// } +// +// /** +// * 创建第二个MQTT客户端(连接到不同的服务器) +// */ +// private void createClient2() { +// // 构建配置 +// DjiMqttClientConfig config = DjiMqttClientConfig.builder() +// .host("mqtt.another-server.com") +// .port(1883) +// .clientId("client_2") +// .username("user2") +// .password("pass2") +// .useSharedSubscription(false) // 不使用共享订阅 +// .build(); +// +// // 创建客户端 +// String clientId = clientManager.createClient(config); +// +// // 获取消息处理器 +// DjiMqttMessageHandler handler = clientManager.getHandler(clientId); +// +// // 注册回调 +// handler.registerDroneDataCallback(droneData -> handleDroneDataForClient2(droneData)); +// handler.registerDockDataCallback(dockData -> handleDockDataForClient2(dockData)); +// +// log.info("客户端2已创建并注册回调"); +// } +// +// /** +// * 处理客户端1的无人机数据 +// */ +// private void handleDroneDataForClient1(DroneData droneData) { +// log.info("[客户端1] 收到无人机数据 - SN: {}, Type: {}", +// droneData.getDeviceSn(), +// droneData.getMessageType()); +// +// // 处理位置信息 +// if (droneData.getLatitude() != null && droneData.getLongitude() != null) { +// log.info("[客户端1] 无人机位置 - 纬度: {}, 经度: {}, 高度: {}", +// droneData.getLatitude(), +// droneData.getLongitude(), +// droneData.getElevation()); +// } +// } +// +// /** +// * 处理客户端1的机场数据 +// */ +// private void handleDockDataForClient1(DockData dockData) { +// log.info("[客户端1] 收到机场数据 - SN: {}, Type: {}", +// dockData.getDeviceSn(), +// dockData.getMessageType()); +// } +// +// /** +// * 处理客户端2的无人机数据 +// */ +// private void handleDroneDataForClient2(DroneData droneData) { +// log.info("[客户端2] 收到无人机数据 - SN: {}", droneData.getDeviceSn()); +// } +// +// /** +// * 处理客户端2的机场数据 +// */ +// private void handleDockDataForClient2(DockData dockData) { +// log.info("[客户端2] 收到机场数据 - SN: {}", dockData.getDeviceSn()); +// } } diff --git a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/manager/DjiMqttClientManager.java b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/manager/DjiMqttClientManager.java new file mode 100644 index 0000000..200e633 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/manager/DjiMqttClientManager.java @@ -0,0 +1,125 @@ +package com.ruoyi.device.domain.impl.djimqtt.manager; + +import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig; +import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler; +import com.ruoyi.device.domain.impl.djimqtt.service.DjiMqttClientService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * DJI MQTT客户端管理器 + * 管理多个MQTT客户端实例 + * + * @author ruoyi + */ +@Slf4j +@Component +public class DjiMqttClientManager { + + /** + * 客户端映射表 clientId -> DjiMqttClientService + */ + private final Map clients = new ConcurrentHashMap<>(); + + /** + * 消息处理器映射表 clientId -> DjiMqttMessageHandler + */ + private final Map handlers = new ConcurrentHashMap<>(); + + /** + * 创建并连接MQTT客户端 + * + * @param config 客户端配置 + * @return 客户端ID + */ + public String createClient(DjiMqttClientConfig config) { + String clientId = config.getClientId(); + + if (clients.containsKey(clientId)) { + log.warn("MQTT客户端[{}]已存在", clientId); + return clientId; + } + + // 为每个客户端创建独立的消息处理器 + DjiMqttMessageHandler handler = new DjiMqttMessageHandler(); + handlers.put(clientId, handler); + + // 创建客户端 + DjiMqttClientService client = new DjiMqttClientService(config, handler); + clients.put(clientId, client); + + // 连接 + client.connect(); + + log.info("成功创建MQTT客户端[{}]", clientId); + return clientId; + } + + /** + * 获取消息处理器 + * + * @param clientId 客户端ID + * @return 消息处理器 + */ + public DjiMqttMessageHandler getHandler(String clientId) { + return handlers.get(clientId); + } + + /** + * 获取客户端 + * + * @param clientId 客户端ID + * @return 客户端服务 + */ + public DjiMqttClientService getClient(String clientId) { + return clients.get(clientId); + } + + /** + * 断开并移除客户端 + * + * @param clientId 客户端ID + */ + public void removeClient(String clientId) { + DjiMqttClientService client = clients.remove(clientId); + if (client != null) { + client.disconnect(); + log.info("已移除MQTT客户端[{}]", clientId); + } + + handlers.remove(clientId); + } + + /** + * 断开所有客户端 + */ + public void disconnectAll() { + clients.forEach((clientId, client) -> { + try { + client.disconnect(); + log.info("已断开MQTT客户端[{}]", clientId); + } catch (Exception e) { + log.error("断开MQTT客户端[{}]失败: {}", clientId, e.getMessage(), e); + } + }); + clients.clear(); + handlers.clear(); + } + + /** + * 获取所有客户端ID + */ + public java.util.Set getAllClientIds() { + return clients.keySet(); + } + + /** + * 检查客户端是否存在 + */ + public boolean hasClient(String clientId) { + return clients.containsKey(clientId); + } +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java index 8ac6054..b98cb34 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java +++ b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java @@ -1,48 +1,43 @@ package com.ruoyi.device.domain.impl.djimqtt.service; -import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttProperties; +import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig; import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Service; -import jakarta.annotation.PreDestroy; /** - * DJI MQTT客户端服务 - * 采用共享订阅方式,支持多实例部署 + * DJI MQTT客户端服务(可实例化) + * 支持动态创建多个客户端 * * @author ruoyi */ @Slf4j -@Service -@RequiredArgsConstructor public class DjiMqttClientService { - private final DjiMqttProperties mqttProperties; + private final DjiMqttClientConfig config; private final DjiMqttMessageHandler messageHandler; - private MqttClient mqttClient; /** - * 无人机OSD主题(共享订阅) + * 无人机OSD主题 */ - private static final String DRONE_OSD_TOPIC = "$share/dji-group/thing/product/+/osd"; + private static final String DRONE_OSD_TOPIC = "thing/product/+/osd"; /** - * 无人机State主题(共享订阅) + * 无人机State主题 */ - private static final String DRONE_STATE_TOPIC = "$share/dji-group/thing/product/+/state"; + private static final String DRONE_STATE_TOPIC = "thing/product/+/state"; /** - * 应用启动后自动连接 + * 构造函数 + * + * @param config 客户端配置 + * @param messageHandler 消息处理器 */ - @EventListener(ApplicationReadyEvent.class) - public void onApplicationReady() { - connect(); + public DjiMqttClientService(DjiMqttClientConfig config, DjiMqttMessageHandler messageHandler) { + this.config = config; + this.messageHandler = messageHandler; } /** @@ -51,36 +46,28 @@ public class DjiMqttClientService { public void connect() { try { if (mqttClient != null && mqttClient.isConnected()) { - log.info("MQTT客户端已连接,无需重复连接"); + log.info("MQTT客户端[{}]已连接,无需重复连接", config.getClientId()); return; } - String broker = String.format("tcp://%s:%d", mqttProperties.getHost(), mqttProperties.getPort()); - log.info("开始连接DJI MQTT服务器: {}", broker); + String broker = String.format("tcp://%s:%d", config.getHost(), config.getPort()); + log.info("开始连接DJI MQTT服务器[{}]: {}", config.getClientId(), broker); - // 创建MQTT客户端 - mqttClient = new MqttClient(broker, mqttProperties.getClientId(), new MemoryPersistence()); + mqttClient = new MqttClient(broker, config.getClientId(), new MemoryPersistence()); - // 配置连接选项 MqttConnectOptions options = new MqttConnectOptions(); - options.setUserName(mqttProperties.getUsername()); - options.setPassword(mqttProperties.getPassword().toCharArray()); - options.setConnectionTimeout(mqttProperties.getConnectionTimeout()); - options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); - options.setAutomaticReconnect(mqttProperties.getAutoReconnect()); - options.setCleanSession(mqttProperties.getCleanSession()); + options.setUserName(config.getUsername()); + options.setPassword(config.getPassword().toCharArray()); + options.setConnectionTimeout(config.getConnectionTimeout()); + options.setKeepAliveInterval(config.getKeepAliveInterval()); + options.setAutomaticReconnect(config.getAutoReconnect()); + options.setCleanSession(config.getCleanSession()); + options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); - // 设置MQTT版本 - if (mqttProperties.getVersion() == 5) { - options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); - log.info("使用MQTT协议版本: 3.1.1"); - } - - // 设置回调 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { - log.error("MQTT连接丢失: {}", cause.getMessage(), cause); + log.error("MQTT客户端[{}]连接丢失: {}", config.getClientId(), cause.getMessage(), cause); } @Override @@ -89,25 +76,23 @@ public class DjiMqttClientService { String payload = new String(message.getPayload()); messageHandler.handleMessage(topic, payload); } catch (Exception e) { - log.error("处理MQTT消息失败: {}", e.getMessage(), e); + log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(), e.getMessage(), e); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { - // 不需要处理发送完成事件 + // 不需要处理 } }); - // 连接 mqttClient.connect(options); - log.info("成功连接到DJI MQTT服务器"); + log.info("MQTT客户端[{}]成功连接到服务器", config.getClientId()); - // 订阅主题 subscribe(); } catch (Exception e) { - log.error("连接DJI MQTT服务器失败: {}", e.getMessage(), e); + log.error("MQTT客户端[{}]连接失败: {}", config.getClientId(), e.getMessage(), e); } } @@ -117,36 +102,41 @@ public class DjiMqttClientService { private void subscribe() { try { if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("MQTT客户端未连接,无法订阅主题"); + log.warn("MQTT客户端[{}]未连接,无法订阅主题", config.getClientId()); return; } - // 订阅无人机OSD主题(共享订阅) - mqttClient.subscribe(DRONE_OSD_TOPIC, 1); - log.info("成功订阅主题: {}", DRONE_OSD_TOPIC); + String osdTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_OSD_TOPIC) + : DRONE_OSD_TOPIC; - // 订阅无人机State主题(共享订阅) - mqttClient.subscribe(DRONE_STATE_TOPIC, 1); - log.info("成功订阅主题: {}", DRONE_STATE_TOPIC); + String stateTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_STATE_TOPIC) + : DRONE_STATE_TOPIC; + + mqttClient.subscribe(osdTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), osdTopic); + + mqttClient.subscribe(stateTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), stateTopic); } catch (Exception e) { - log.error("订阅MQTT主题失败: {}", e.getMessage(), e); + log.error("MQTT客户端[{}]订阅主题失败: {}", config.getClientId(), e.getMessage(), e); } } /** * 断开连接 */ - @PreDestroy public void disconnect() { try { if (mqttClient != null && mqttClient.isConnected()) { mqttClient.disconnect(); mqttClient.close(); - log.info("已断开DJI MQTT连接"); + log.info("MQTT客户端[{}]已断开连接", config.getClientId()); } } catch (Exception e) { - log.error("断开DJI MQTT连接失败: {}", e.getMessage(), e); + log.error("MQTT客户端[{}]断开连接失败: {}", config.getClientId(), e.getMessage(), e); } } @@ -156,4 +146,11 @@ public class DjiMqttClientService { public boolean isConnected() { return mqttClient != null && mqttClient.isConnected(); } + + /** + * 获取客户端ID + */ + public String getClientId() { + return config.getClientId(); + } }