添加拓恒接入

This commit is contained in:
孙小云 2026-02-10 10:51:42 +08:00
parent ae6b8bcc93
commit b97ba6c332
20 changed files with 1336 additions and 0 deletions

View File

@ -0,0 +1,102 @@
# TuohengMqtt 模块说明
## 概述
本模块提供对大疆设备MQTT消息的订阅和处理功能支持监听机场和无人机的各类Topic消息。
## 包结构
```
com.ruoyi.device.domain.impl.tuohengmqtt
├── callback/ # 数据回调接口
│ ├── IAirportOsdCallback.java # 机场OSD数据回调
│ ├── IDroneOsdCallback.java # 无人机OSD数据回调
│ ├── IAirportStateCallback.java # 机场State数据回调
│ ├── IEventsCallback.java # Events数据回调
│ ├── IServicesReplyCallback.java # 服务回复回调
│ └── IRequestsCallback.java # 设备请求回调
├── config/ # 配置类
│ └── TuohengMqttClientConfig.java
├── handler/ # 消息处理器
│ └── TuohengMqttMessageHandler.java
├── manager/ # 客户端管理器
│ └── TuohengMqttClientManager.java
├── model/ # 数据模型
│ ├── TuohengMqttMessage.java # MQTT消息基础结构
│ ├── AirportOsdData.java # 机场OSD数据
│ ├── DroneOsdData.java # 无人机OSD数据
│ ├── AirportStateData.java # 机场State数据
│ └── EventsData.java # Events数据
├── service/ # MQTT客户端服务
│ └── TuohengMqttClientService.java
└── example/ # 使用示例
└── TuohengMqttUsageExample.java
```
## 支持的Topic
| Topic | 说明 | 示例 |
|-------|------|------|
| `thing/product/{AirportSn}/osd` | 机场OSD数据 | thing/product/7CTDM3D00BVY4C/osd |
| `thing/product/{DroneSn}/osd` | 无人机OSD数据 | thing/product/1581F6Q8D243100C605L/osd |
| `thing/product/{AirportSn}/state` | 机场State数据 | thing/product/7CTDM3D00BVY4C/state |
| `thing/product/{AirportSn}/events` | 机场Events数据 | thing/product/7CTDM3D00BVY4C/events |
| `thing/product/{AirportSn}/services_reply` | 服务回复 | thing/product/7CTDM3D00BVY4C/services_reply |
| `sys/product/{AirportSn}/status` | 设备状态 | sys/product/7CTDM3D00BVY4C/status |
| `thing/product/{AirportSn}/requests` | 设备请求 | thing/product/7CTDM3D00BVY4C/requests |
## 使用方法
### 1. 创建MQTT客户端配置
```java
TuohengMqttClientConfig config = TuohengMqttClientConfig.builder()
.host("your-mqtt-host")
.port(1883)
.clientId("tuoheng-client-1")
.username("your-username")
.password("your-password")
.build();
```
### 2. 创建客户端
```java
@Autowired
private TuohengMqttClientManager manager;
String clientId = manager.createClient(config);
```
### 3. 注册回调
```java
var handler = manager.getHandler(clientId);
// 设置机场与无人机的SN映射
Map<String, String> mapping = new HashMap<>();
mapping.put("7CTDM3D00BVY4C", "1581F6Q8D243100C605L");
handler.setAirportDroneMapping(mapping);
// 注册各种回调
handler.registerAirportOsdCallback(new IAirportOsdCallback() {
@Override
public void onAirportOsdData(String airportSn, AirportOsdData data) {
// 处理机场OSD数据
}
});
```
### 4. 发送消息
```java
var client = manager.getClient(clientId);
client.publish("thing/product/7CTDM3D00BVY4C/services", message);
```
## 注意事项
1. 机场SN以 `7C` 开头无人机SN以 `158` 开头
2. 需要正确配置机场与无人机的SN映射关系
3. 使用MQTT 5.0协议 (Eclipse Paho MQTT v5)
4. 支持共享订阅,可通过配置开启

View File

@ -0,0 +1,13 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData;
/**
* 机场OSD数据回调接口
*
* @author ruoyi
*/
public interface IAirportOsdCallback {
void onAirportOsdData(String airportSn, AirportOsdData data);
}

View File

@ -0,0 +1,13 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData;
/**
* 机场State数据回调接口
*
* @author ruoyi
*/
public interface IAirportStateCallback {
void onAirportStateData(String airportSn, String droneSn, AirportStateData data);
}

View File

@ -0,0 +1,13 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData;
/**
* 无人机OSD数据回调接口
*
* @author ruoyi
*/
public interface IDroneOsdCallback {
void onDroneOsdData(String droneSn, String airportSn, DroneOsdData data);
}

View File

@ -0,0 +1,13 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData;
/**
* 机场Events数据回调接口
*
* @author ruoyi
*/
public interface IEventsCallback {
void onEventsData(String airportSn, EventsData data);
}

View File

@ -0,0 +1,13 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import java.util.Map;
/**
* 设备请求数据回调接口
*
* @author ruoyi
*/
public interface IRequestsCallback {
void onRequestsData(String airportSn, String method, Map<String, Object> data);
}

View File

@ -0,0 +1,13 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import java.util.Map;
/**
* 服务回复数据回调接口
*
* @author ruoyi
*/
public interface IServicesReplyCallback {
void onServicesReplyData(String airportSn, Map<String, Object> data);
}

View File

@ -0,0 +1,44 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.config;
import lombok.Builder;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* Tuoheng MQTT客户端配置
*
* @author ruoyi
*/
@Data
@Builder
public class TuohengMqttClientConfig {
private String host;
private Integer port;
private String clientId;
private String username;
private String password;
@Builder.Default
private Integer connectionTimeout = 30;
@Builder.Default
private Integer keepAliveInterval = 60;
@Builder.Default
private Boolean autoReconnect = true;
@Builder.Default
private Boolean cleanSession = true;
@Builder.Default
private Boolean useSharedSubscription = true;
@Builder.Default
private String sharedGroupName = "tuoheng-group";
}

View File

@ -0,0 +1,90 @@
//package com.ruoyi.device.domain.impl.tuohengmqtt.example;
//
//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportOsdCallback;
//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportStateCallback;
//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IEventsCallback;
//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneOsdCallback;
//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRequestsCallback;
//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IServicesReplyCallback;
//import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig;
//import com.ruoyi.device.domain.impl.tuohengmqtt.manager.TuohengMqttClientManager;
//import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData;
//import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData;
//import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData;
//import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
//import java.util.HashMap;
//import java.util.Map;
//
//@Slf4j
//@Component
//public class TuohengMqttUsageExample {
//
// @Autowired
// private TuohengMqttClientManager manager;
//
// public void initExample() {
// TuohengMqttClientConfig config = TuohengMqttClientConfig.builder()
// .host("your-mqtt-host")
// .port(1883)
// .clientId("tuoheng-client-1")
// .username("your-username")
// .password("your-password")
// .build();
//
// String clientId = manager.createClient(config);
// log.info("创建MQTT客户端: {}", clientId);
//
// var handler = manager.getHandler(clientId);
//
// Map<String, String> mapping = new HashMap<>();
// mapping.put("7CTDM3D00BVY4C", "1581F6Q8D243100C605L");
// handler.setAirportDroneMapping(mapping);
//
// handler.registerAirportOsdCallback(new IAirportOsdCallback() {
// @Override
// public void onAirportOsdData(String airportSn, AirportOsdData data) {
// log.info("收到机场OSD数据 - AirportSN: {}, Temperature: {}", airportSn, data.getTemperature());
// }
// });
//
// handler.registerDroneOsdCallback(new IDroneOsdCallback() {
// @Override
// public void onDroneOsdData(String droneSn, String airportSn, DroneOsdData data) {
// log.info("收到无人机OSD数据 - DroneSN: {}, AirportSN: {}, Latitude: {}, Longitude: {}",
// droneSn, airportSn, data.getLatitude(), data.getLongitude());
// }
// });
//
// handler.registerAirportStateCallback(new IAirportStateCallback() {
// @Override
// public void onAirportStateData(String airportSn, String droneSn, AirportStateData data) {
// log.info("收到机场State数据 - AirportSN: {}, DroneSN: {}", airportSn, droneSn);
// }
// });
//
// handler.registerEventsCallback(new IEventsCallback() {
// @Override
// public void onEventsData(String airportSn, EventsData data) {
// log.info("收到Events数据 - AirportSN: {}, Event: {}", airportSn, data.getEvent());
// }
// });
//
// handler.registerServicesReplyCallback(new IServicesReplyCallback() {
// @Override
// public void onServicesReplyData(String airportSn, Map<String, Object> data) {
// log.info("收到ServicesReply数据 - AirportSN: {}", airportSn);
// }
// });
//
// handler.registerRequestsCallback(new IRequestsCallback() {
// @Override
// public void onRequestsData(String airportSn, String method, Map<String, Object> data) {
// log.info("收到Requests数据 - AirportSN: {}, Method: {}", airportSn, method);
// }
// });
// }
//}

View File

@ -0,0 +1,288 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportStateCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IEventsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRequestsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IServicesReplyCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengMqttMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@Slf4j
@Component
public class TuohengMqttMessageHandler {
private final ObjectMapper objectMapper = new ObjectMapper();
private final List<IAirportOsdCallback> airportOsdCallbacks = new ArrayList<>();
private final List<IDroneOsdCallback> droneOsdCallbacks = new ArrayList<>();
private final List<IAirportStateCallback> airportStateCallbacks = new ArrayList<>();
private final List<IEventsCallback> eventsCallbacks = new ArrayList<>();
private final List<IServicesReplyCallback> servicesReplyCallbacks = new ArrayList<>();
private final List<IRequestsCallback> requestsCallbacks = new ArrayList<>();
private static final Pattern DRONE_SN_PATTERN = Pattern.compile("^158[0-9A-Z]+$");
private static final Pattern AIRPORT_SN_PATTERN = Pattern.compile("^7C[0-9A-Z]+$");
private Map<String, String> airportDroneMapping;
public void setAirportDroneMapping(Map<String, String> mapping) {
this.airportDroneMapping = mapping;
}
public void registerAirportOsdCallback(IAirportOsdCallback callback) {
if (callback != null && !airportOsdCallbacks.contains(callback)) {
airportOsdCallbacks.add(callback);
log.info("注册机场OSD数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerDroneOsdCallback(IDroneOsdCallback callback) {
if (callback != null && !droneOsdCallbacks.contains(callback)) {
droneOsdCallbacks.add(callback);
log.info("注册无人机OSD数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerAirportStateCallback(IAirportStateCallback callback) {
if (callback != null && !airportStateCallbacks.contains(callback)) {
airportStateCallbacks.add(callback);
log.info("注册机场State数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerEventsCallback(IEventsCallback callback) {
if (callback != null && !eventsCallbacks.contains(callback)) {
eventsCallbacks.add(callback);
log.info("注册Events数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerServicesReplyCallback(IServicesReplyCallback callback) {
if (callback != null && !servicesReplyCallbacks.contains(callback)) {
servicesReplyCallbacks.add(callback);
log.info("注册ServicesReply数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerRequestsCallback(IRequestsCallback callback) {
if (callback != null && !requestsCallbacks.contains(callback)) {
requestsCallbacks.add(callback);
log.info("注册Requests数据回调: {}", callback.getClass().getSimpleName());
}
}
public void handleMessage(String topic, String payload) {
try {
log.debug("收到MQTT消息 - Topic: {}, Payload: {}", topic, payload);
String deviceSn = extractDeviceSnFromTopic(topic);
String messageType = extractMessageTypeFromTopic(topic);
if (deviceSn == null || messageType == null) {
log.warn("无法从Topic解析设备SN或消息类型: {}", topic);
return;
}
String droneSn = null;
String airportSn = null;
if (isAirportSn(deviceSn)) {
airportSn = deviceSn;
if (airportDroneMapping != null) {
droneSn = airportDroneMapping.get(deviceSn);
}
} else if (isDroneSn(deviceSn)) {
droneSn = deviceSn;
}
@SuppressWarnings("unchecked")
TuohengMqttMessage<Map<String, Object>> message = objectMapper.readValue(
payload,
objectMapper.getTypeFactory().constructParametricType(
TuohengMqttMessage.class,
Map.class
)
);
if ("osd".equals(messageType)) {
handleOsdMessage(deviceSn, droneSn, airportSn, message);
} else if ("state".equals(messageType)) {
handleStateMessage(deviceSn, droneSn, airportSn, message);
} else if ("events".equals(messageType)) {
handleEventsMessage(deviceSn, message);
} else if ("services_reply".equals(messageType)) {
handleServicesReplyMessage(deviceSn, message);
} else if ("requests".equals(messageType)) {
handleRequestsMessage(deviceSn, message);
} else if ("status".equals(messageType)) {
handleStatusMessage(deviceSn, message);
}
} catch (Exception e) {
log.error("处理MQTT消息失败 - Topic: {}, Error: {}", topic, e.getMessage(), e);
}
}
private void handleOsdMessage(String deviceSn, String droneSn, String airportSn, TuohengMqttMessage<Map<String, Object>> message) {
try {
if (isDroneSn(deviceSn) && droneSn != null) {
DroneOsdData droneOsdData = objectMapper.convertValue(message.getData(), DroneOsdData.class);
log.debug("处理无人机OSD数据 - SN: {}, Airport: {}", droneSn, airportSn);
for (IDroneOsdCallback callback : droneOsdCallbacks) {
try {
callback.onDroneOsdData(droneSn, airportSn, droneOsdData);
} catch (Exception e) {
log.error("无人机OSD数据回调执行失败: {}", e.getMessage(), e);
}
}
} else if (isAirportSn(deviceSn) && airportSn != null) {
AirportOsdData airportOsdData = objectMapper.convertValue(message.getData(), AirportOsdData.class);
log.debug("处理机场OSD数据 - SN: {}", airportSn);
for (IAirportOsdCallback callback : airportOsdCallbacks) {
try {
callback.onAirportOsdData(airportSn, airportOsdData);
} catch (Exception e) {
log.error("机场OSD数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理OSD消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleStateMessage(String deviceSn, String droneSn, String airportSn, TuohengMqttMessage<Map<String, Object>> message) {
try {
if (isAirportSn(deviceSn) && airportSn != null) {
AirportStateData airportStateData = objectMapper.convertValue(message.getData(), AirportStateData.class);
log.debug("处理机场State数据 - SN: {}, Drone: {}", airportSn, droneSn);
for (IAirportStateCallback callback : airportStateCallbacks) {
try {
callback.onAirportStateData(airportSn, droneSn, airportStateData);
} catch (Exception e) {
log.error("机场State数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理State消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleEventsMessage(String deviceSn, TuohengMqttMessage<Map<String, Object>> message) {
try {
if (isAirportSn(deviceSn)) {
EventsData eventsData = objectMapper.convertValue(message.getData(), EventsData.class);
log.debug("处理Events数据 - SN: {}", deviceSn);
for (IEventsCallback callback : eventsCallbacks) {
try {
callback.onEventsData(deviceSn, eventsData);
} catch (Exception e) {
log.error("Events数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理Events消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleServicesReplyMessage(String deviceSn, TuohengMqttMessage<Map<String, Object>> message) {
try {
if (isAirportSn(deviceSn)) {
log.debug("处理ServicesReply数据 - SN: {}, Method: {}", deviceSn, message.getMethod());
for (IServicesReplyCallback callback : servicesReplyCallbacks) {
try {
callback.onServicesReplyData(deviceSn, message.getData());
} catch (Exception e) {
log.error("ServicesReply数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理ServicesReply消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleRequestsMessage(String deviceSn, TuohengMqttMessage<Map<String, Object>> message) {
try {
if (isAirportSn(deviceSn)) {
log.debug("处理Requests数据 - SN: {}, Method: {}", deviceSn, message.getMethod());
for (IRequestsCallback callback : requestsCallbacks) {
try {
callback.onRequestsData(deviceSn, message.getMethod(), message.getData());
} catch (Exception e) {
log.error("Requests数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理Requests消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleStatusMessage(String deviceSn, TuohengMqttMessage<Map<String, Object>> message) {
log.debug("处理Status数据 - SN: {}", deviceSn);
}
private String extractDeviceSnFromTopic(String topic) {
if (topic == null) {
return null;
}
String[] parts = topic.split("/");
if (parts.length >= 3) {
return parts[2];
}
return null;
}
private String extractMessageTypeFromTopic(String topic) {
if (topic == null) {
return null;
}
String[] parts = topic.split("/");
if (parts.length >= 4) {
return parts[3];
}
return null;
}
private boolean isDroneSn(String sn) {
if (sn == null) {
return false;
}
return DRONE_SN_PATTERN.matcher(sn).matches();
}
private boolean isAirportSn(String sn) {
if (sn == null) {
return false;
}
return AIRPORT_SN_PATTERN.matcher(sn).matches();
}
}

View File

@ -0,0 +1,47 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.manager;
import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig;
import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler;
import com.ruoyi.device.domain.impl.tuohengmqtt.service.TuohengMqttClientService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TuohengMqttClientManager {
private TuohengMqttClientService clientService;
private TuohengMqttMessageHandler messageHandler;
public void initClient(TuohengMqttClientConfig config) {
if (clientService != null && clientService.isConnected()) {
log.info("MQTT客户端已连接无需重复创建");
return;
}
messageHandler = new TuohengMqttMessageHandler();
clientService = new TuohengMqttClientService(config, messageHandler);
clientService.connect();
log.info("MQTT客户端创建并连接成功: {}", config.getClientId());
}
public TuohengMqttMessageHandler getHandler() {
return messageHandler;
}
public TuohengMqttClientService getClient() {
return clientService;
}
public void disconnect() {
if (clientService != null) {
clientService.disconnect();
log.info("MQTT客户端已断开连接");
}
}
public boolean isConnected() {
return clientService != null && clientService.isConnected();
}
}

View File

@ -0,0 +1,66 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.Map;
/**
* 机场OSD数据
*
* @author ruoyi
*/
@Data
public class AirportOsdData {
@JsonProperty("working_current")
private String workingCurrent;
@JsonProperty("drc_state")
private String drcState;
@JsonProperty("longitude")
private String longitude;
@JsonProperty("latitude")
private String latitude;
@JsonProperty("height")
private String height;
@JsonProperty("air_conditioner")
private Map<String, Object> airConditioner;
@JsonProperty("humidity")
private String humidity;
@JsonProperty("temperature")
private String temperature;
@JsonProperty("environment_temperature")
private String environmentTemperature;
@JsonProperty("wind_speed")
private String windSpeed;
@JsonProperty("rainfall")
private String rainfall;
@JsonProperty("drone_in_dock")
private String droneInDock;
@JsonProperty("cover_state")
private String coverState;
@JsonProperty("flighttask_step_code")
private String flighttaskStepCode;
@JsonProperty("mode_code")
private String modeCode;
@JsonProperty("position_state")
private Map<String, Object> positionState;
@JsonProperty("drone_battery_maintenance_info")
private Map<String, Object> droneBatteryMaintenanceInfo;
}

View File

@ -0,0 +1,21 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.Map;
/**
* 机场State数据
*
* @author ruoyi
*/
@Data
public class AirportStateData {
@JsonProperty("live_status")
private String liveStatus;
@JsonProperty("live_capacity")
private String liveCapacity;
}

View File

@ -0,0 +1,57 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.Map;
/**
* 无人机OSD数据
*
* @author ruoyi
*/
@Data
public class DroneOsdData {
@JsonProperty("longitude")
private String longitude;
@JsonProperty("latitude")
private String latitude;
@JsonProperty("elevation")
private String elevation;
@JsonProperty("height")
private String height;
@JsonProperty("wind_speed")
private String windSpeed;
@JsonProperty("wind_direction")
private String windDirection;
@JsonProperty("position_state")
private Map<String, Object> positionState;
@JsonProperty("low_battery_warning_threshold")
private String lowBatteryWarningThreshold;
@JsonProperty("battery")
private Map<String, Object> battery;
@JsonProperty("mode_code")
private String modeCode;
@JsonProperty("horizontal_speed")
private String horizontalSpeed;
@JsonProperty("vertical_speed")
private String verticalSpeed;
@JsonProperty("home_distance")
private String homeDistance;
@JsonProperty("cameras")
private String cameras;
}

View File

@ -0,0 +1,24 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.Map;
/**
* 机场Events数据
*
* @author ruoyi
*/
@Data
public class EventsData {
@JsonProperty("event")
private String event;
@JsonProperty("result")
private Integer result;
@JsonProperty("output")
private Map<String, Object> output;
}

View File

@ -0,0 +1,37 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* Tuoheng MQTT消息基础结构
*
* @author ruoyi
*/
@Data
public class TuohengMqttMessage<T> {
@JsonProperty("tid")
private String tid;
@JsonProperty("bid")
private String bid;
@JsonProperty("timestamp")
private Long timestamp;
@JsonProperty("method")
private String method;
@JsonProperty("data")
private T data;
@JsonProperty("gateway")
private String gateway;
@JsonProperty("source")
private String source;
@JsonProperty("need_reply")
private Integer needReply;
}

View File

@ -0,0 +1,202 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.service;
import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig;
import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
@Slf4j
public class TuohengMqttClientService {
private final TuohengMqttClientConfig config;
private final TuohengMqttMessageHandler messageHandler;
private MqttClient mqttClient;
private static final String AIRPORT_OSD_TOPIC = "thing/product/+/osd";
private static final String AIRPORT_STATE_TOPIC = "thing/product/+/state";
private static final String AIRPORT_EVENTS_TOPIC = "thing/product/+/events";
private static final String SERVICES_REPLY_TOPIC = "thing/product/+/services_reply";
private static final String STATUS_TOPIC = "sys/product/+/status";
private static final String REQUESTS_TOPIC = "thing/product/+/requests";
public TuohengMqttClientService(TuohengMqttClientConfig config, TuohengMqttMessageHandler messageHandler) {
this.config = config;
this.messageHandler = messageHandler;
}
public void connect() {
try {
if (mqttClient != null && mqttClient.isConnected()) {
log.info("MQTT客户端[{}]已连接,无需重复连接", config.getClientId());
return;
}
String broker = String.format("tcp://%s:%d", config.getHost(), config.getPort());
log.info("开始连接Tuoheng MQTT服务器[{}]: {}", config.getClientId(), broker);
mqttClient = new MqttClient(broker, config.getClientId(), new MemoryPersistence());
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(config.getUsername());
options.setPassword(config.getPassword().getBytes());
options.setConnectionTimeout(config.getConnectionTimeout());
options.setKeepAliveInterval(config.getKeepAliveInterval());
options.setAutomaticReconnect(config.getAutoReconnect());
options.setCleanStart(config.getCleanSession());
mqttClient.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
log.error("MQTT客户端[{}]连接丢失: {}", config.getClientId(),
disconnectResponse.getReasonString());
if (config.getAutoReconnect()) {
log.info("MQTT客户端[{}]将自动重连...", config.getClientId());
}
}
@Override
public void mqttErrorOccurred(MqttException exception) {
log.error("MQTT客户端[{}]发生错误: {}", config.getClientId(),
exception.getMessage(), exception);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String payload = new String(message.getPayload());
messageHandler.handleMessage(topic, payload);
} catch (Exception e) {
log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(),
e.getMessage(), e);
}
}
@Override
public void deliveryComplete(IMqttToken token) {
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
log.info("MQTT客户端[{}]重连成功: {}", config.getClientId(), serverURI);
subscribe();
} else {
log.info("MQTT客户端[{}]首次连接成功: {}", config.getClientId(), serverURI);
}
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {
}
});
mqttClient.connect(options);
log.info("MQTT客户端[{}]成功连接到服务器", config.getClientId());
subscribe();
} catch (Exception e) {
log.error("MQTT客户端[{}]连接失败: {}", config.getClientId(), e.getMessage(), e);
}
}
private void subscribe() {
try {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("MQTT客户端[{}]未连接,无法订阅主题", config.getClientId());
return;
}
String airportOsdTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_OSD_TOPIC)
: AIRPORT_OSD_TOPIC;
String airportStateTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_STATE_TOPIC)
: AIRPORT_STATE_TOPIC;
String airportEventsTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_EVENTS_TOPIC)
: AIRPORT_EVENTS_TOPIC;
String servicesReplyTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), SERVICES_REPLY_TOPIC)
: SERVICES_REPLY_TOPIC;
String statusTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), STATUS_TOPIC)
: STATUS_TOPIC;
String requestsTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), REQUESTS_TOPIC)
: REQUESTS_TOPIC;
mqttClient.subscribe(airportOsdTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportOsdTopic);
mqttClient.subscribe(airportStateTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportStateTopic);
mqttClient.subscribe(airportEventsTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportEventsTopic);
mqttClient.subscribe(servicesReplyTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), servicesReplyTopic);
mqttClient.subscribe(statusTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), statusTopic);
mqttClient.subscribe(requestsTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), requestsTopic);
} catch (Exception e) {
log.error("MQTT客户端[{}]订阅主题失败: {}", config.getClientId(), e.getMessage(), e);
}
}
public void disconnect() {
try {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
mqttClient.close();
log.info("MQTT客户端[{}]已断开连接", config.getClientId());
}
} catch (Exception e) {
log.error("MQTT客户端[{}]断开连接失败: {}", config.getClientId(), e.getMessage(), e);
}
}
public boolean isConnected() {
return mqttClient != null && mqttClient.isConnected();
}
public String getClientId() {
return config.getClientId();
}
public void publish(String topic, String message) {
try {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("MQTT客户端[{}]未连接,无法发送消息", config.getClientId());
return;
}
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(1);
mqttClient.publish(topic, mqttMessage);
log.debug("MQTT客户端[{}]发送消息 - Topic: {}, Message: {}", config.getClientId(), topic, message);
} catch (MqttException e) {
log.error("MQTT客户端[{}]发送消息失败 - Topic: {}, Error: {}", config.getClientId(), topic, e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,66 @@
package com.ruoyi.device.service.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* DJI MQTT配置属性
*
* @author ruoyi
*/
@Data
@Component
@ConfigurationProperties(prefix = "tuoheng.mqtt")
public class TuohengMqttProperties {
/**
* MQTT服务器地址
*/
private String host = "mqtt.t-aaron.com";
/**
* MQTT服务器端口
*/
private Integer port = 10883;
/**
* MQTT协议版本
*/
private Integer version = 5;
/**
* 客户端ID
*/
private String clientId = "ThingsBoard_gateway";
/**
* 用户名
*/
private String username = "admin";
/**
* 密码
*/
private String password = "admin";
/**
* 连接超时时间
*/
private Integer connectionTimeout = 30;
/**
* 保持连接时间
*/
private Integer keepAliveInterval = 60;
/**
* 自动重连
*/
private Boolean autoReconnect = true;
/**
* 清除会话
*/
private Boolean cleanSession = false;
}

View File

@ -0,0 +1,202 @@
package com.ruoyi.device.service.impl;
import com.ruoyi.device.domain.api.IDockAircraftDomain;
import com.ruoyi.device.domain.api.IDockDomain;
import com.ruoyi.device.domain.api.IAircraftDomain;
import com.ruoyi.device.domain.api.IDeviceDomain;
import com.ruoyi.device.domain.model.DockAircraft;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportStateCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IEventsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRequestsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IServicesReplyCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig;
import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler;
import com.ruoyi.device.domain.impl.tuohengmqtt.manager.TuohengMqttClientManager;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData;
import com.ruoyi.device.domain.model.Aircraft;
import com.ruoyi.device.domain.model.Device;
import com.ruoyi.device.domain.model.Dock;
import com.ruoyi.device.service.config.TuohengMqttProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
public class TuohengService {
@Autowired
private TuohengMqttClientManager clientManager;
@Autowired
private TuohengMqttProperties mqttProperties;
@Autowired
private IDockAircraftDomain dockAircraftDomain;
@Autowired
private IDockDomain dockDomain;
@Autowired
private IAircraftDomain aircraftDomain;
@Autowired
private IDeviceDomain deviceDomain;
private final ObjectMapper objectMapper = new ObjectMapper();
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
TuohengMqttClientConfig config = TuohengMqttClientConfig.builder()
.host(mqttProperties.getHost())
.port(mqttProperties.getPort())
.clientId(mqttProperties.getClientId())
.username(mqttProperties.getUsername())
.password(mqttProperties.getPassword())
.connectionTimeout(mqttProperties.getConnectionTimeout())
.keepAliveInterval(mqttProperties.getKeepAliveInterval())
.autoReconnect(mqttProperties.getAutoReconnect())
.cleanSession(mqttProperties.getCleanSession())
.useSharedSubscription(true)
.sharedGroupName("tuoheng-group")
.build();
clientManager.initClient(config);
TuohengMqttMessageHandler handler = clientManager.getHandler();
Map<String, String> mapping = loadAirportDroneMapping();
handler.setAirportDroneMapping(mapping);
handler.registerAirportOsdCallback(new IAirportOsdCallback() {
@Override
public void onAirportOsdData(String airportSn, AirportOsdData data) {
log.info("========== 收到机场OSD数据 ==========");
log.info("机场SN: {}", airportSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
handler.registerDroneOsdCallback(new IDroneOsdCallback() {
@Override
public void onDroneOsdData(String droneSn, String airportSn, DroneOsdData data) {
log.info("========== 收到无人机OSD数据 ==========");
log.info("无人机SN: {}, 机场SN: {}", droneSn, airportSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
handler.registerAirportStateCallback(new IAirportStateCallback() {
@Override
public void onAirportStateData(String airportSn, String droneSn, AirportStateData data) {
log.info("========== 收到机场State数据 ==========");
log.info("机场SN: {}, 无人机SN: {}", airportSn, droneSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
handler.registerEventsCallback(new IEventsCallback() {
@Override
public void onEventsData(String airportSn, EventsData data) {
log.info("========== 收到Events数据 ==========");
log.info("机场SN: {}", airportSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
handler.registerServicesReplyCallback(new IServicesReplyCallback() {
@Override
public void onServicesReplyData(String airportSn, Map<String, Object> data) {
log.info("========== 收到ServicesReply数据 ==========");
log.info("机场SN: {}", airportSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
handler.registerRequestsCallback(new IRequestsCallback() {
@Override
public void onRequestsData(String airportSn, String method, Map<String, Object> data) {
log.info("========== 收到Requests数据 ==========");
log.info("机场SN: {}, Method: {}", airportSn, method);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
log.info("TuohengService 初始化完成,已注册所有回调");
}
private Map<String, String> loadAirportDroneMapping() {
Map<String, String> mapping = new HashMap<>();
try {
List<DockAircraft> dockAircraftList = dockAircraftDomain.selectDockAircraftList(new DockAircraft());
for (DockAircraft dockAircraft : dockAircraftList) {
Dock dock = dockDomain.selectDockByDockId(dockAircraft.getDockId());
Aircraft aircraft = aircraftDomain.selectAircraftByAircraftId(dockAircraft.getAircraftId());
if (dock != null && aircraft != null) {
Device dockDevice = deviceDomain.selectDeviceByDeviceId(dock.getDeviceId());
Device aircraftDevice = deviceDomain.selectDeviceByDeviceId(aircraft.getDeviceId());
if (dockDevice != null && aircraftDevice != null) {
String airportSn = dockDevice.getDeviceSn();
String droneSn = aircraftDevice.getDeviceSn();
if (airportSn != null && droneSn != null) {
mapping.put(airportSn, droneSn);
log.info("加载机场-无人机映射: {} -> {}", airportSn, droneSn);
}
}
}
}
log.info("从数据库加载机场-无人机映射完成,共 {} 条记录", mapping.size());
} catch (Exception e) {
log.error("从数据库加载机场-无人机映射失败", e);
}
return mapping;
}
}

View File

@ -44,6 +44,18 @@ machine:
# DJI MQTT配置 # DJI MQTT配置
dji: dji:
mqtt:
host: mqtt.t-aaron.com
port: 10883
version: 5
client-id: mqttx_c1c67436
username: admin
password: admin
connection-timeout: 30
keep-alive-interval: 60
auto-reconnect: true
clean-session: false
tuoheng:
mqtt: mqtt:
host: mqtt.t-aaron.com host: mqtt.t-aaron.com
port: 10883 port: 10883