diff --git a/pom.xml b/pom.xml index b6a0a18..b59866c 100644 --- a/pom.xml +++ b/pom.xml @@ -105,11 +105,10 @@ provided - + - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.2.5 + org.springframework.integration + spring-integration-mqtt diff --git a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java index b98cb34..d08d7c4 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java +++ b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java @@ -3,12 +3,17 @@ package com.ruoyi.device.domain.impl.djimqtt.service; import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig; import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.messaging.Message; /** - * DJI MQTT客户端服务(可实例化) - * 支持动态创建多个客户端 + * DJI MQTT客户端服务(基于Spring Integration MQTT) + * 支持动态创建多个客户端,自动重连和重新订阅 * * @author ruoyi */ @@ -17,7 +22,7 @@ public class DjiMqttClientService { private final DjiMqttClientConfig config; private final DjiMqttMessageHandler messageHandler; - private MqttClient mqttClient; + private MqttPahoMessageDrivenChannelAdapter adapter; /** * 无人机OSD主题 @@ -45,7 +50,7 @@ public class DjiMqttClientService { */ public void connect() { try { - if (mqttClient != null && mqttClient.isConnected()) { + if (adapter != null && adapter.isRunning()) { log.info("MQTT客户端[{}]已连接,无需重复连接", config.getClientId()); return; } @@ -53,75 +58,88 @@ public class DjiMqttClientService { String broker = String.format("tcp://%s:%d", config.getHost(), config.getPort()); log.info("开始连接DJI MQTT服务器[{}]: {}", config.getClientId(), broker); - mqttClient = new MqttClient(broker, config.getClientId(), new MemoryPersistence()); + // 创建MQTT客户端工厂 + MqttPahoClientFactory clientFactory = createClientFactory(broker); - MqttConnectOptions options = new MqttConnectOptions(); - options.setUserName(config.getUsername()); - options.setPassword(config.getPassword().toCharArray()); - options.setConnectionTimeout(config.getConnectionTimeout()); - options.setKeepAliveInterval(config.getKeepAliveInterval()); - options.setAutomaticReconnect(config.getAutoReconnect()); - options.setCleanSession(config.getCleanSession()); - options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); + // 构建订阅主题 + String[] topics = buildTopics(); - mqttClient.setCallback(new MqttCallback() { - @Override - public void connectionLost(Throwable cause) { - log.error("MQTT客户端[{}]连接丢失: {}", config.getClientId(), cause.getMessage(), cause); - } + // 创建消息驱动适配器 + adapter = new MqttPahoMessageDrivenChannelAdapter( + config.getClientId(), + clientFactory, + topics + ); - @Override - public void messageArrived(String topic, MqttMessage message) { - try { - String payload = new String(message.getPayload()); - messageHandler.handleMessage(topic, payload); - } catch (Exception e) { - log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(), e.getMessage(), e); - } - } + // 设置消息转换器 + adapter.setConverter(new DefaultPahoMessageConverter()); - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - // 不需要处理 - } - }); + // 设置QoS + adapter.setQos(1); - mqttClient.connect(options); - log.info("MQTT客户端[{}]成功连接到服务器", config.getClientId()); + // 创建消息通道并设置消息处理器 + DirectChannel channel = new DirectChannel(); + channel.subscribe(this::handleMessage); + adapter.setOutputChannel(channel); - subscribe(); + // 启动适配器 + adapter.start(); + + log.info("MQTT客户端[{}]成功连接到服务器并订阅主题", config.getClientId()); } catch (Exception e) { log.error("MQTT客户端[{}]连接失败: {}", config.getClientId(), e.getMessage(), e); + throw new RuntimeException("MQTT连接失败", e); } } /** - * 订阅主题 + * 创建MQTT客户端工厂 */ - private void subscribe() { + private MqttPahoClientFactory createClientFactory(String broker) { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setServerURIs(new String[]{broker}); + options.setUserName(config.getUsername()); + options.setPassword(config.getPassword().toCharArray()); + options.setConnectionTimeout(config.getConnectionTimeout()); + options.setKeepAliveInterval(config.getKeepAliveInterval()); + options.setAutomaticReconnect(config.getAutoReconnect()); + options.setCleanSession(config.getCleanSession()); + + factory.setConnectionOptions(options); + return factory; + } + + /** + * 构建订阅主题 + */ + private String[] buildTopics() { + String osdTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_OSD_TOPIC) + : DRONE_OSD_TOPIC; + + String stateTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_STATE_TOPIC) + : DRONE_STATE_TOPIC; + + return new String[]{osdTopic, stateTopic}; + } + + /** + * 处理接收到的MQTT消息 + */ + private void handleMessage(Message message) { try { - if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("MQTT客户端[{}]未连接,无法订阅主题", config.getClientId()); - return; + String topic = (String) message.getHeaders().get("mqtt_receivedTopic"); + String payload = (String) message.getPayload(); + + if (topic != null && payload != null) { + messageHandler.handleMessage(topic, payload); } - - String osdTopic = config.getUseSharedSubscription() - ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_OSD_TOPIC) - : DRONE_OSD_TOPIC; - - String stateTopic = config.getUseSharedSubscription() - ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_STATE_TOPIC) - : DRONE_STATE_TOPIC; - - mqttClient.subscribe(osdTopic, 1); - log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), osdTopic); - - mqttClient.subscribe(stateTopic, 1); - log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), stateTopic); - } catch (Exception e) { - log.error("MQTT客户端[{}]订阅主题失败: {}", config.getClientId(), e.getMessage(), e); + log.error("MQTT客户端[{}]处理消息失败: ", config.getClientId(), e.getMessage(), e); } } @@ -130,9 +148,8 @@ public class DjiMqttClientService { */ public void disconnect() { try { - if (mqttClient != null && mqttClient.isConnected()) { - mqttClient.disconnect(); - mqttClient.close(); + if (adapter != null && adapter.isRunning()) { + adapter.stop(); log.info("MQTT客户端[{}]已断开连接", config.getClientId()); } } catch (Exception e) { @@ -144,7 +161,7 @@ public class DjiMqttClientService { * 检查连接状态 */ public boolean isConnected() { - return mqttClient != null && mqttClient.isConnected(); + return adapter != null && adapter.isRunning(); } /**