处理机场飞行控制数据
This commit is contained in:
parent
cae0018b88
commit
f62d2798f6
|
|
@ -22,6 +22,8 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import static com.ruoyi.device.domain.impl.tuohengmqtt.service.TuohengMqttClientService.*;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class TuohengMqttMessageHandler {
|
public class TuohengMqttMessageHandler {
|
||||||
|
|
@ -38,6 +40,7 @@ public class TuohengMqttMessageHandler {
|
||||||
private final List<IAirportFlyControlDataCallback> airportFlyControlDataCallbacks = new ArrayList<>();
|
private final List<IAirportFlyControlDataCallback> airportFlyControlDataCallbacks = new ArrayList<>();
|
||||||
|
|
||||||
private static final Pattern TUOHENG_SN_PATTERN = Pattern.compile("^TH[0-9A-Z]+");
|
private static final Pattern TUOHENG_SN_PATTERN = Pattern.compile("^TH[0-9A-Z]+");
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设置 MQTT 回调注册中心
|
* 设置 MQTT 回调注册中心
|
||||||
|
|
@ -100,34 +103,23 @@ public class TuohengMqttMessageHandler {
|
||||||
try {
|
try {
|
||||||
log.debug("收到MQTT消息 - Topic: {}", topic);
|
log.debug("收到MQTT消息 - Topic: {}", topic);
|
||||||
|
|
||||||
// 如果是 confirm 消息,打印详细日志
|
|
||||||
// if (topic.contains("/control/confirm")) {
|
|
||||||
// log.info("【收到confirm消息】Topic: {}, Payload: {}", topic, payload);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// 通知 MqttCallbackRegistry 处理回调(用于指令回调)
|
// 通知 MqttCallbackRegistry 处理回调(用于指令回调)
|
||||||
if (machineCallBackRegistry != null) {
|
if (machineCallBackRegistry != null) {
|
||||||
try {
|
try {
|
||||||
// 将 payload 解析为 JSON 对象传递给回调注册中心
|
// 将 payload 解析为 JSON 对象传递给回调注册中心
|
||||||
Object messageBody = objectMapper.readValue(payload, Object.class);
|
Object messageBody = objectMapper.readValue(payload, Object.class);
|
||||||
machineCallBackRegistry.handleMessage(topic, messageBody);
|
machineCallBackRegistry.handleMessage(topic, messageBody);
|
||||||
|
|
||||||
// // 如果是 confirm 消息,打印回调处理结果
|
|
||||||
// if (topic.contains("/control/confirm")) {
|
|
||||||
// log.info("【confirm消息已传递给回调注册中心】Topic: {}", topic);
|
|
||||||
// }
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.debug("通知回调注册中心失败: {}", e.getMessage());
|
log.debug("通知回调注册中心失败: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String deviceSn = extractDeviceSnFromTopic(topic);
|
String deviceSn = extractDeviceSnFromTopic(topic);
|
||||||
|
|
||||||
if (deviceSn == null) {
|
if (deviceSn == null) {
|
||||||
log.warn("无法从Topic解析设备SN: {}", topic);
|
log.warn("无法从Topic解析设备SN: {}", topic);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isProductTopic(topic)) {
|
if (isProductTopic(topic)) {
|
||||||
if (!isTuohengSn(deviceSn)) {
|
if (!isTuohengSn(deviceSn)) {
|
||||||
log.debug("跳过大疆设备 - SN: {}", deviceSn);
|
log.debug("跳过大疆设备 - SN: {}", deviceSn);
|
||||||
|
|
@ -135,34 +127,29 @@ public class TuohengMqttMessageHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String messageType = extractMessageTypeFromTopic(topic);
|
// 根据Topic模式匹配分发到不同的处理方法
|
||||||
|
if (matchTopic(topic, AIRPORT_NEST_REALTIME_TOPIC)) {
|
||||||
if (messageType == null) {
|
handleAirportNestRealTimeData(deviceSn, payload);
|
||||||
log.warn("无法从Topic解析消息类型: {}", topic);
|
} else if (matchTopic(topic, AIRPORT_NEST_BASIC_TOPIC)) {
|
||||||
return;
|
handleRealTimeBasicData(deviceSn, payload);
|
||||||
}
|
} else if (matchTopic(topic, AIRPORT_NEST_CONFIRM_TOPIC)) {
|
||||||
|
// 机场控制确认消息(已由 MqttCallbackRegistry 处理)
|
||||||
switch (messageType) {
|
log.debug("机场控制确认消息 - SN: {}", deviceSn);
|
||||||
case "realTime/data":
|
} else if (matchTopic(topic, AIRPORT_DRONE_REALTIME_TOPIC)) {
|
||||||
handleRealTimeData(deviceSn, payload, topic);
|
handleAirportDroneRealTimeData(deviceSn, payload);
|
||||||
break;
|
} else if (matchTopic(topic, AIRPORT_FLY_DATA_TOPIC)) {
|
||||||
case "realTime/basic":
|
handleAirportFlyControlData(deviceSn, payload, topic);
|
||||||
handleRealTimeBasicData(deviceSn, payload);
|
} else if (matchTopic(topic, AIRPORT_FLY_CONFIRM_TOPIC)) {
|
||||||
break;
|
// 飞行控制确认消息(已由 MqttCallbackRegistry 处理)
|
||||||
case "heartbeat/message":
|
log.debug("飞行控制确认消息 - SN: {}", deviceSn);
|
||||||
handleHeartbeatMessage(deviceSn, payload);
|
} else if (matchTopic(topic, HEARTBEAT_MESSAGE_TOPIC)) {
|
||||||
break;
|
handleHeartbeatMessage(deviceSn, payload);
|
||||||
case "osd":
|
} else if (matchTopic(topic, PRODUCT_OSD_TOPIC)) {
|
||||||
handleOsdData(deviceSn, payload);
|
handleOsdData(deviceSn, payload);
|
||||||
break;
|
} else if (matchTopic(topic, PRODUCT_EVENTS_TOPIC)) {
|
||||||
case "events":
|
handleEventsData(deviceSn, payload);
|
||||||
handleEventsData(deviceSn, payload);
|
} else {
|
||||||
break;
|
log.debug("未知的消息类型 - Topic: {}", topic);
|
||||||
case "control/data":
|
|
||||||
handleAirportFlyControlData(deviceSn, payload, topic);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
log.debug("未知消息类型: {}", messageType);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
@ -170,31 +157,35 @@ public class TuohengMqttMessageHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleRealTimeData(String deviceSn, String payload, String topic) {
|
private void handleAirportNestRealTimeData(String deviceSn, String payload) {
|
||||||
try {
|
try {
|
||||||
if (topic.contains("airportNest")) {
|
TuohengRealTimeData data = objectMapper.readValue(payload, TuohengRealTimeData.class);
|
||||||
TuohengRealTimeData data = objectMapper.readValue(payload, TuohengRealTimeData.class);
|
log.debug("处理机场实时数据 - 设备SN: {}", deviceSn);
|
||||||
log.debug("处理机场实时数据 - 设备SN: {}", deviceSn);
|
for (ITuohengRealTimeDataCallback callback : realTimeDataCallbacks) {
|
||||||
for (ITuohengRealTimeDataCallback callback : realTimeDataCallbacks) {
|
try {
|
||||||
try {
|
callback.onRealTimeData(deviceSn, data);
|
||||||
callback.onRealTimeData(deviceSn, data);
|
} catch (Exception e) {
|
||||||
} catch (Exception e) {
|
log.error("实时数据回调执行失败: {}", e.getMessage(), e);
|
||||||
log.error("实时数据回调执行失败: {}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (topic.contains("airportDrone")) {
|
|
||||||
DroneRealTimeData data = objectMapper.readValue(payload, DroneRealTimeData.class);
|
|
||||||
log.debug("处理无人机实时数据 - 设备SN: {}", deviceSn);
|
|
||||||
for (IDroneRealTimeCallback callback : droneRealTimeCallbacks) {
|
|
||||||
try {
|
|
||||||
callback.onDroneRealTimeData(deviceSn, data);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("无人机实时数据回调执行失败: {}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
|
log.error("处理机场实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleAirportDroneRealTimeData(String deviceSn, String payload) {
|
||||||
|
try {
|
||||||
|
DroneRealTimeData data = objectMapper.readValue(payload, DroneRealTimeData.class);
|
||||||
|
log.debug("处理无人机实时数据 - 设备SN: {}", deviceSn);
|
||||||
|
for (IDroneRealTimeCallback callback : droneRealTimeCallbacks) {
|
||||||
|
try {
|
||||||
|
callback.onDroneRealTimeData(deviceSn, data);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("无人机实时数据回调执行失败: {}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理无人机实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -305,38 +296,7 @@ public class TuohengMqttMessageHandler {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String extractMessageTypeFromTopic(String topic) {
|
|
||||||
if (topic == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
String[] parts = topic.split("/");
|
|
||||||
|
|
||||||
// /topic/v1/heartbeat/{deviceSn}/message
|
|
||||||
// parts[0]="", parts[1]="topic", parts[2]="v1", parts[3]="heartbeat",
|
|
||||||
// parts[4]=deviceSn, parts[5]="message"
|
|
||||||
if (topic.startsWith("/topic/v1/heartbeat/")) {
|
|
||||||
if (parts.length >= 6) {
|
|
||||||
return "heartbeat/message";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// /topic/v1/airportNest/{deviceSn}/realTime/data
|
|
||||||
// parts[0]="", parts[1]="topic", parts[2]="v1", parts[3]="airportNest",
|
|
||||||
// parts[4]=deviceSn, parts[5]="realTime", parts[6]="data"
|
|
||||||
else if (topic.startsWith("/topic/v1/")) {
|
|
||||||
if (parts.length >= 7) {
|
|
||||||
return parts[5] + "/" + parts[6]; // "realTime/data" or "realTime/basic"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// thing/product/{deviceSn}/osd
|
|
||||||
// parts[0]="thing", parts[1]="product", parts[2]=deviceSn, parts[3]="osd"
|
|
||||||
else if (topic.startsWith("thing/product/")) {
|
|
||||||
if (parts.length >= 4) {
|
|
||||||
return parts[3]; // "osd" or "events"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isProductTopic(String topic) {
|
private boolean isProductTopic(String topic) {
|
||||||
return topic != null && topic.startsWith("thing/product/");
|
return topic != null && topic.startsWith("thing/product/");
|
||||||
}
|
}
|
||||||
|
|
@ -347,4 +307,19 @@ public class TuohengMqttMessageHandler {
|
||||||
}
|
}
|
||||||
return TUOHENG_SN_PATTERN.matcher(sn).matches();
|
return TUOHENG_SN_PATTERN.matcher(sn).matches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 匹配Topic模式
|
||||||
|
* @param topic 实际topic
|
||||||
|
* @param pattern 模式topic(支持+通配符)
|
||||||
|
* @return 是否匹配
|
||||||
|
*/
|
||||||
|
private boolean matchTopic(String topic, String pattern) {
|
||||||
|
if (topic == null || pattern == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// 将模式中的+替换为正则表达式(匹配除斜杠外的任意字符)
|
||||||
|
String regex = pattern.replace("+", "[^/]+");
|
||||||
|
return topic.matches(regex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,15 +20,15 @@ public class TuohengMqttClientService {
|
||||||
private final TuohengMqttMessageHandler messageHandler;
|
private final TuohengMqttMessageHandler messageHandler;
|
||||||
private MqttClient mqttClient;
|
private MqttClient mqttClient;
|
||||||
|
|
||||||
private static final String AIRPORT_NEST_REALTIME_TOPIC = "/topic/v1/airportNest/+/realTime/data";
|
public static final String AIRPORT_NEST_REALTIME_TOPIC = "/topic/v1/airportNest/+/realTime/data";
|
||||||
private static final String AIRPORT_NEST_BASIC_TOPIC = "/topic/v1/airportNest/+/realTime/basic";
|
public static final String AIRPORT_NEST_BASIC_TOPIC = "/topic/v1/airportNest/+/realTime/basic";
|
||||||
private static final String AIRPORT_NEST_CONFIRM_TOPIC = "/topic/v1/airportNest/+/control/confirm";
|
public static final String AIRPORT_NEST_CONFIRM_TOPIC = "/topic/v1/airportNest/+/control/confirm";
|
||||||
private static final String AIRPORT_DRONE_REALTIME_TOPIC = "/topic/v1/airportDrone/+/realTime/data";
|
public static final String AIRPORT_DRONE_REALTIME_TOPIC = "/topic/v1/airportDrone/+/realTime/data";
|
||||||
private static final String AIRPORT_FLY_DATA_TOPIC = "/topic/v1/airportFly/+/control/data";
|
public static final String AIRPORT_FLY_DATA_TOPIC = "/topic/v1/airportFly/+/control/data";
|
||||||
private static final String AIRPORT_FLY_CONFIRM_TOPIC = "/topic/v1/airportFly/+/control/confirm";
|
public static final String AIRPORT_FLY_CONFIRM_TOPIC = "/topic/v1/airportFly/+/control/confirm";
|
||||||
private static final String HEARTBEAT_MESSAGE_TOPIC = "/topic/v1/heartbeat/+/message";
|
public static final String HEARTBEAT_MESSAGE_TOPIC = "/topic/v1/heartbeat/+/message";
|
||||||
private static final String PRODUCT_OSD_TOPIC = "thing/product/+/osd";
|
public static final String PRODUCT_OSD_TOPIC = "thing/product/+/osd";
|
||||||
private static final String PRODUCT_EVENTS_TOPIC = "thing/product/+/events";
|
public static final String PRODUCT_EVENTS_TOPIC = "thing/product/+/events";
|
||||||
|
|
||||||
public TuohengMqttClientService(TuohengMqttClientConfig config, TuohengMqttMessageHandler messageHandler) {
|
public TuohengMqttClientService(TuohengMqttClientConfig config, TuohengMqttMessageHandler messageHandler) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue