This commit is contained in:
孙小云 2026-01-28 15:03:05 +08:00
parent d4b3d86b3c
commit faa7ca1790
2 changed files with 102 additions and 81 deletions

View File

@ -111,6 +111,13 @@
<artifactId>spring-integration-mqtt</artifactId> <artifactId>spring-integration-mqtt</artifactId>
</dependency> </dependency>
<!-- Eclipse Paho MQTT v5 Client (支持MQTT 5.0) -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -3,17 +3,19 @@ package com.ruoyi.device.domain.impl.djimqtt.service;
import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig; import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig;
import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler; import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.eclipse.paho.mqttv5.client.MqttClient;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.springframework.integration.channel.DirectChannel; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.springframework.messaging.Message; import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
/** /**
* DJI MQTT客户端服务基于Spring Integration MQTT * DJI MQTT客户端服务基于Eclipse Paho MQTT v5
* 支持动态创建多个客户端自动重连和重新订阅 * 支持MQTT 5.0协议动态创建多个客户端
* *
* @author ruoyi * @author ruoyi
*/ */
@ -22,7 +24,7 @@ public class DjiMqttClientService {
private final DjiMqttClientConfig config; private final DjiMqttClientConfig config;
private final DjiMqttMessageHandler messageHandler; private final DjiMqttMessageHandler messageHandler;
private MqttPahoMessageDrivenChannelAdapter adapter; private MqttClient mqttClient;
/** /**
* 无人机OSD主题 * 无人机OSD主题
@ -50,7 +52,7 @@ public class DjiMqttClientService {
*/ */
public void connect() { public void connect() {
try { try {
if (adapter != null && adapter.isRunning()) { if (mqttClient != null && mqttClient.isConnected()) {
log.info("MQTT客户端[{}]已连接,无需重复连接", config.getClientId()); log.info("MQTT客户端[{}]已连接,无需重复连接", config.getClientId());
return; return;
} }
@ -58,91 +60,102 @@ public class DjiMqttClientService {
String broker = String.format("tcp://%s:%d", config.getHost(), config.getPort()); String broker = String.format("tcp://%s:%d", config.getHost(), config.getPort());
log.info("开始连接DJI MQTT服务器[{}]: {}", config.getClientId(), broker); log.info("开始连接DJI MQTT服务器[{}]: {}", config.getClientId(), broker);
// 创建MQTT客户端工厂 mqttClient = new MqttClient(broker, config.getClientId(), new MemoryPersistence());
MqttPahoClientFactory clientFactory = createClientFactory(broker);
// 构建订阅主题 MqttConnectionOptions options = new MqttConnectionOptions();
String[] topics = buildTopics(); options.setUserName(config.getUsername());
options.setPassword(config.getPassword().getBytes());
options.setConnectionTimeout(config.getConnectionTimeout());
options.setKeepAliveInterval(config.getKeepAliveInterval());
options.setAutomaticReconnect(config.getAutoReconnect());
options.setCleanStart(config.getCleanSession());
// 创建消息驱动适配器 mqttClient.setCallback(new MqttCallback() {
adapter = new MqttPahoMessageDrivenChannelAdapter( @Override
config.getClientId(), public void disconnected(MqttDisconnectResponse disconnectResponse) {
clientFactory, log.error("MQTT客户端[{}]连接丢失: {}", config.getClientId(),
topics disconnectResponse.getReasonString());
);
// 设置消息转换器 if (config.getAutoReconnect()) {
adapter.setConverter(new DefaultPahoMessageConverter()); log.info("MQTT客户端[{}]将自动重连...", config.getClientId());
}
}
// 设置QoS @Override
adapter.setQos(1); public void mqttErrorOccurred(MqttException exception) {
log.error("MQTT客户端[{}]发生错误: {}", config.getClientId(),
exception.getMessage(), exception);
}
// 创建消息通道并设置消息处理器 @Override
DirectChannel channel = new DirectChannel(); public void messageArrived(String topic, MqttMessage message) {
channel.subscribe(this::handleMessage); try {
adapter.setOutputChannel(channel); String payload = new String(message.getPayload());
messageHandler.handleMessage(topic, payload);
} catch (Exception e) {
log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(),
e.getMessage(), e);
}
}
// 启动适配器 @Override
adapter.start(); public void deliveryComplete(IMqttToken token) {
// 不需要处理
}
log.info("MQTT客户端[{}]成功连接到服务器并订阅主题", config.getClientId()); @Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
log.info("MQTT客户端[{}]重连成功: {}", config.getClientId(), serverURI);
// 重连后重新订阅
subscribe();
} else {
log.info("MQTT客户端[{}]首次连接成功: {}", config.getClientId(), serverURI);
}
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {
// 不需要处理
}
});
mqttClient.connect(options);
log.info("MQTT客户端[{}]成功连接到服务器", config.getClientId());
subscribe();
} catch (Exception e) { } catch (Exception e) {
log.error("MQTT客户端[{}]连接失败: {}", config.getClientId(), e.getMessage(), e); log.error("MQTT客户端[{}]连接失败: {}", config.getClientId(), e.getMessage(), e);
throw new RuntimeException("MQTT连接失败", e);
} }
} }
/** /**
* 创建MQTT客户端工厂 * 订阅主题
*/ */
private MqttPahoClientFactory createClientFactory(String broker) { private void subscribe() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{broker});
options.setUserName(config.getUsername());
options.setPassword(config.getPassword().toCharArray());
options.setConnectionTimeout(config.getConnectionTimeout());
options.setKeepAliveInterval(config.getKeepAliveInterval());
options.setAutomaticReconnect(config.getAutoReconnect());
options.setCleanSession(config.getCleanSession());
// 设置MQTT版本为5.0
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_5_0);
factory.setConnectionOptions(options);
return factory;
}
/**
* 构建订阅主题
*/
private String[] buildTopics() {
String osdTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_OSD_TOPIC)
: DRONE_OSD_TOPIC;
String stateTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_STATE_TOPIC)
: DRONE_STATE_TOPIC;
return new String[]{osdTopic, stateTopic};
}
/**
* 处理接收到的MQTT消息
*/
private void handleMessage(Message<?> message) {
try { try {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic"); if (mqttClient == null || !mqttClient.isConnected()) {
String payload = (String) message.getPayload(); log.warn("MQTT客户端[{}]未连接,无法订阅主题", config.getClientId());
return;
if (topic != null) {
messageHandler.handleMessage(topic, payload);
} }
String osdTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_OSD_TOPIC)
: DRONE_OSD_TOPIC;
String stateTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_STATE_TOPIC)
: DRONE_STATE_TOPIC;
mqttClient.subscribe(osdTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), osdTopic);
mqttClient.subscribe(stateTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), stateTopic);
} catch (Exception e) { } catch (Exception e) {
log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(), e.getMessage(), e); log.error("MQTT客户端[{}]订阅主题失败: {}", config.getClientId(), e.getMessage(), e);
} }
} }
@ -151,8 +164,9 @@ public class DjiMqttClientService {
*/ */
public void disconnect() { public void disconnect() {
try { try {
if (adapter != null && adapter.isRunning()) { if (mqttClient != null && mqttClient.isConnected()) {
adapter.stop(); mqttClient.disconnect();
mqttClient.close();
log.info("MQTT客户端[{}]已断开连接", config.getClientId()); log.info("MQTT客户端[{}]已断开连接", config.getClientId());
} }
} catch (Exception e) { } catch (Exception e) {
@ -164,7 +178,7 @@ public class DjiMqttClientService {
* 检查连接状态 * 检查连接状态
*/ */
public boolean isConnected() { public boolean isConnected() {
return adapter != null && adapter.isRunning(); return mqttClient != null && mqttClient.isConnected();
} }
/** /**