diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java index e543508..ef9192f 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.List; import java.util.regex.Pattern; +import static com.ruoyi.device.domain.impl.tuohengmqtt.service.TuohengMqttClientService.*; + @Slf4j @Component public class TuohengMqttMessageHandler { @@ -38,6 +40,7 @@ public class TuohengMqttMessageHandler { private final List airportFlyControlDataCallbacks = new ArrayList<>(); private static final Pattern TUOHENG_SN_PATTERN = Pattern.compile("^TH[0-9A-Z]+"); + /** * 设置 MQTT 回调注册中心 @@ -100,34 +103,23 @@ public class TuohengMqttMessageHandler { try { log.debug("收到MQTT消息 - Topic: {}", topic); - // 如果是 confirm 消息,打印详细日志 -// if (topic.contains("/control/confirm")) { -// log.info("【收到confirm消息】Topic: {}, Payload: {}", topic, payload); -// } - // 通知 MqttCallbackRegistry 处理回调(用于指令回调) if (machineCallBackRegistry != null) { try { // 将 payload 解析为 JSON 对象传递给回调注册中心 Object messageBody = objectMapper.readValue(payload, Object.class); machineCallBackRegistry.handleMessage(topic, messageBody); - -// // 如果是 confirm 消息,打印回调处理结果 -// if (topic.contains("/control/confirm")) { -// log.info("【confirm消息已传递给回调注册中心】Topic: {}", topic); -// } } catch (Exception e) { log.debug("通知回调注册中心失败: {}", e.getMessage()); } } String deviceSn = extractDeviceSnFromTopic(topic); - if (deviceSn == null) { log.warn("无法从Topic解析设备SN: {}", topic); return; } - + if (isProductTopic(topic)) { if (!isTuohengSn(deviceSn)) { log.debug("跳过大疆设备 - SN: {}", deviceSn); @@ -135,34 +127,29 @@ public class TuohengMqttMessageHandler { } } - String messageType = extractMessageTypeFromTopic(topic); - - if (messageType == null) { - log.warn("无法从Topic解析消息类型: {}", topic); - return; - } - - switch (messageType) { - case "realTime/data": - handleRealTimeData(deviceSn, payload, topic); - break; - case "realTime/basic": - handleRealTimeBasicData(deviceSn, payload); - break; - case "heartbeat/message": - handleHeartbeatMessage(deviceSn, payload); - break; - case "osd": - handleOsdData(deviceSn, payload); - break; - case "events": - handleEventsData(deviceSn, payload); - break; - case "control/data": - handleAirportFlyControlData(deviceSn, payload, topic); - break; - default: - log.debug("未知消息类型: {}", messageType); + // 根据Topic模式匹配分发到不同的处理方法 + if (matchTopic(topic, AIRPORT_NEST_REALTIME_TOPIC)) { + handleAirportNestRealTimeData(deviceSn, payload); + } else if (matchTopic(topic, AIRPORT_NEST_BASIC_TOPIC)) { + handleRealTimeBasicData(deviceSn, payload); + } else if (matchTopic(topic, AIRPORT_NEST_CONFIRM_TOPIC)) { + // 机场控制确认消息(已由 MqttCallbackRegistry 处理) + log.debug("机场控制确认消息 - SN: {}", deviceSn); + } else if (matchTopic(topic, AIRPORT_DRONE_REALTIME_TOPIC)) { + handleAirportDroneRealTimeData(deviceSn, payload); + } else if (matchTopic(topic, AIRPORT_FLY_DATA_TOPIC)) { + handleAirportFlyControlData(deviceSn, payload, topic); + } else if (matchTopic(topic, AIRPORT_FLY_CONFIRM_TOPIC)) { + // 飞行控制确认消息(已由 MqttCallbackRegistry 处理) + log.debug("飞行控制确认消息 - SN: {}", deviceSn); + } else if (matchTopic(topic, HEARTBEAT_MESSAGE_TOPIC)) { + handleHeartbeatMessage(deviceSn, payload); + } else if (matchTopic(topic, PRODUCT_OSD_TOPIC)) { + handleOsdData(deviceSn, payload); + } else if (matchTopic(topic, PRODUCT_EVENTS_TOPIC)) { + handleEventsData(deviceSn, payload); + } else { + log.debug("未知的消息类型 - Topic: {}", topic); } } catch (Exception e) { @@ -170,31 +157,35 @@ public class TuohengMqttMessageHandler { } } - private void handleRealTimeData(String deviceSn, String payload, String topic) { + private void handleAirportNestRealTimeData(String deviceSn, String payload) { try { - if (topic.contains("airportNest")) { - TuohengRealTimeData data = objectMapper.readValue(payload, TuohengRealTimeData.class); - log.debug("处理机场实时数据 - 设备SN: {}", deviceSn); - for (ITuohengRealTimeDataCallback callback : realTimeDataCallbacks) { - try { - callback.onRealTimeData(deviceSn, data); - } catch (Exception e) { - log.error("实时数据回调执行失败: {}", e.getMessage(), e); - } - } - } else if (topic.contains("airportDrone")) { - DroneRealTimeData data = objectMapper.readValue(payload, DroneRealTimeData.class); - log.debug("处理无人机实时数据 - 设备SN: {}", deviceSn); - for (IDroneRealTimeCallback callback : droneRealTimeCallbacks) { - try { - callback.onDroneRealTimeData(deviceSn, data); - } catch (Exception e) { - log.error("无人机实时数据回调执行失败: {}", e.getMessage(), e); - } + TuohengRealTimeData data = objectMapper.readValue(payload, TuohengRealTimeData.class); + log.debug("处理机场实时数据 - 设备SN: {}", deviceSn); + for (ITuohengRealTimeDataCallback callback : realTimeDataCallbacks) { + try { + callback.onRealTimeData(deviceSn, data); + } catch (Exception e) { + log.error("实时数据回调执行失败: {}", e.getMessage(), e); } } } catch (Exception e) { - log.error("处理实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); + log.error("处理机场实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); + } + } + + private void handleAirportDroneRealTimeData(String deviceSn, String payload) { + try { + DroneRealTimeData data = objectMapper.readValue(payload, DroneRealTimeData.class); + log.debug("处理无人机实时数据 - 设备SN: {}", deviceSn); + for (IDroneRealTimeCallback callback : droneRealTimeCallbacks) { + try { + callback.onDroneRealTimeData(deviceSn, data); + } catch (Exception e) { + log.error("无人机实时数据回调执行失败: {}", e.getMessage(), e); + } + } + } catch (Exception e) { + log.error("处理无人机实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); } } @@ -305,38 +296,7 @@ public class TuohengMqttMessageHandler { return null; } - private String extractMessageTypeFromTopic(String topic) { - if (topic == null) { - return null; - } - String[] parts = topic.split("/"); - - // /topic/v1/heartbeat/{deviceSn}/message - // parts[0]="", parts[1]="topic", parts[2]="v1", parts[3]="heartbeat", - // parts[4]=deviceSn, parts[5]="message" - if (topic.startsWith("/topic/v1/heartbeat/")) { - if (parts.length >= 6) { - return "heartbeat/message"; - } - } - // /topic/v1/airportNest/{deviceSn}/realTime/data - // parts[0]="", parts[1]="topic", parts[2]="v1", parts[3]="airportNest", - // parts[4]=deviceSn, parts[5]="realTime", parts[6]="data" - else if (topic.startsWith("/topic/v1/")) { - if (parts.length >= 7) { - return parts[5] + "/" + parts[6]; // "realTime/data" or "realTime/basic" - } - } - // thing/product/{deviceSn}/osd - // parts[0]="thing", parts[1]="product", parts[2]=deviceSn, parts[3]="osd" - else if (topic.startsWith("thing/product/")) { - if (parts.length >= 4) { - return parts[3]; // "osd" or "events" - } - } - return null; - } - + private boolean isProductTopic(String topic) { return topic != null && topic.startsWith("thing/product/"); } @@ -347,4 +307,19 @@ public class TuohengMqttMessageHandler { } return TUOHENG_SN_PATTERN.matcher(sn).matches(); } + + /** + * 匹配Topic模式 + * @param topic 实际topic + * @param pattern 模式topic(支持+通配符) + * @return 是否匹配 + */ + private boolean matchTopic(String topic, String pattern) { + if (topic == null || pattern == null) { + return false; + } + // 将模式中的+替换为正则表达式(匹配除斜杠外的任意字符) + String regex = pattern.replace("+", "[^/]+"); + return topic.matches(regex); + } } diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java index 27e1d3b..5d1010f 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java @@ -20,15 +20,15 @@ public class TuohengMqttClientService { private final TuohengMqttMessageHandler messageHandler; private MqttClient mqttClient; - private static final String AIRPORT_NEST_REALTIME_TOPIC = "/topic/v1/airportNest/+/realTime/data"; - private static final String AIRPORT_NEST_BASIC_TOPIC = "/topic/v1/airportNest/+/realTime/basic"; - private static final String AIRPORT_NEST_CONFIRM_TOPIC = "/topic/v1/airportNest/+/control/confirm"; - private static final String AIRPORT_DRONE_REALTIME_TOPIC = "/topic/v1/airportDrone/+/realTime/data"; - private static final String AIRPORT_FLY_DATA_TOPIC = "/topic/v1/airportFly/+/control/data"; - private static final String AIRPORT_FLY_CONFIRM_TOPIC = "/topic/v1/airportFly/+/control/confirm"; - private static final String HEARTBEAT_MESSAGE_TOPIC = "/topic/v1/heartbeat/+/message"; - private static final String PRODUCT_OSD_TOPIC = "thing/product/+/osd"; - private static final String PRODUCT_EVENTS_TOPIC = "thing/product/+/events"; + public static final String AIRPORT_NEST_REALTIME_TOPIC = "/topic/v1/airportNest/+/realTime/data"; + public static final String AIRPORT_NEST_BASIC_TOPIC = "/topic/v1/airportNest/+/realTime/basic"; + public static final String AIRPORT_NEST_CONFIRM_TOPIC = "/topic/v1/airportNest/+/control/confirm"; + public static final String AIRPORT_DRONE_REALTIME_TOPIC = "/topic/v1/airportDrone/+/realTime/data"; + public static final String AIRPORT_FLY_DATA_TOPIC = "/topic/v1/airportFly/+/control/data"; + public static final String AIRPORT_FLY_CONFIRM_TOPIC = "/topic/v1/airportFly/+/control/confirm"; + public static final String HEARTBEAT_MESSAGE_TOPIC = "/topic/v1/heartbeat/+/message"; + public static final String PRODUCT_OSD_TOPIC = "thing/product/+/osd"; + public static final String PRODUCT_EVENTS_TOPIC = "thing/product/+/events"; public TuohengMqttClientService(TuohengMqttClientConfig config, TuohengMqttMessageHandler messageHandler) { this.config = config;