diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IHeartbeatMessageCallback.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IHeartbeatMessageCallback.java new file mode 100644 index 0000000..272bb7b --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IHeartbeatMessageCallback.java @@ -0,0 +1,8 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.callback; + +import com.ruoyi.device.domain.impl.tuohengmqtt.model.HeartbeatMessageData; + +public interface IHeartbeatMessageCallback { + + void onHeartbeatMessage(String deviceSn, HeartbeatMessageData data); +} \ No newline at end of file diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java index feb8b08..5e1074a 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java @@ -3,6 +3,7 @@ package com.ruoyi.device.domain.impl.tuohengmqtt.handler; import com.fasterxml.jackson.databind.ObjectMapper; import com.ruoyi.device.domain.impl.machine.mqtt.MqttCallbackRegistry; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneRealTimeCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IHeartbeatMessageCallback; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRealTimeBasicCallback; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengEventsCallback; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengOsdCallback; @@ -10,6 +11,7 @@ import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengRealTimeDataCal import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData; import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneRealTimeData; import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.HeartbeatMessageData; import com.ruoyi.device.domain.impl.tuohengmqtt.model.RealTimeBasicData; import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengRealTimeData; import lombok.extern.slf4j.Slf4j; @@ -31,6 +33,7 @@ public class TuohengMqttMessageHandler { private final List eventsCallbacks = new ArrayList<>(); private final List realTimeBasicCallbacks = new ArrayList<>(); private final List droneRealTimeCallbacks = new ArrayList<>(); + private final List heartbeatMessageCallbacks = new ArrayList<>(); private static final Pattern TUOHENG_SN_PATTERN = Pattern.compile("^TH[0-9A-Z]+"); @@ -77,6 +80,13 @@ public class TuohengMqttMessageHandler { } } + public void registerHeartbeatMessageCallback(IHeartbeatMessageCallback callback) { + if (callback != null && !heartbeatMessageCallbacks.contains(callback)) { + heartbeatMessageCallbacks.add(callback); + log.info("注册心跳消息回调: {}", callback.getClass().getSimpleName()); + } + } + public void handleMessage(String topic, String payload) { try { log.debug("收到MQTT消息 - Topic: {}", topic); @@ -130,6 +140,9 @@ public class TuohengMqttMessageHandler { case "realTime/basic": handleRealTimeBasicData(deviceSn, payload); break; + case "heartbeat/message": + handleHeartbeatMessage(deviceSn, payload); + break; case "osd": handleOsdData(deviceSn, payload); break; @@ -224,6 +237,23 @@ public class TuohengMqttMessageHandler { } } + private void handleHeartbeatMessage(String deviceSn, String payload) { + try { + HeartbeatMessageData data = objectMapper.readValue(payload, HeartbeatMessageData.class); + log.debug("处理心跳消息数据 - 设备SN: {}", deviceSn); + + for (IHeartbeatMessageCallback callback : heartbeatMessageCallbacks) { + try { + callback.onHeartbeatMessage(deviceSn, data); + } catch (Exception e) { + log.error("心跳消息回调执行失败: {}", e.getMessage(), e); + } + } + } catch (Exception e) { + log.error("处理心跳消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); + } + } + private String extractDeviceSnFromTopic(String topic) { if (topic == null) { return null; diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/HeartbeatMessageData.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/HeartbeatMessageData.java new file mode 100644 index 0000000..c834fb8 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/HeartbeatMessageData.java @@ -0,0 +1,52 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class HeartbeatMessageData { + + @JsonProperty("nestDoor") + private NestDoorInfo nestDoor; + + @JsonProperty("pingBaidu") + private Object pingBaidu; + + @JsonProperty("cangneiwai") + private Object cangneiwai; + + @JsonProperty("nestBasic") + private Object nestBasic; + + @JsonProperty("zongguan") + private Object zongguan; + + @JsonProperty("livestream") + private Object livestream; + + @Data + public static class NestDoorInfo { + @JsonProperty("msg") + private String msg; + + @JsonProperty("code") + private Integer code; + + @JsonProperty("data") + private NestDoorData data; + + @JsonProperty("sender") + private String sender; + + @JsonProperty("fun") + private String fun; + } + + @Data + public static class NestDoorData { + @JsonProperty("status") + private Integer status; // 1=开仓, 2=关仓 + } +} \ No newline at end of file diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java index 8a6d23e..33ba680 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java @@ -24,6 +24,7 @@ public class TuohengMqttClientService { private static final String AIRPORT_NEST_BASIC_TOPIC = "/topic/v1/airportNest/+/realTime/basic"; private static final String AIRPORT_NEST_CONFIRM_TOPIC = "/topic/v1/airportNest/+/control/confirm"; private static final String AIRPORT_DRONE_REALTIME_TOPIC = "/topic/v1/airportDrone/+/realTime/data"; + private static final String HEARTBEAT_MESSAGE_TOPIC = "/topic/v1/heartbeat/+/message"; private static final String PRODUCT_OSD_TOPIC = "thing/product/+/osd"; private static final String PRODUCT_EVENTS_TOPIC = "thing/product/+/events"; @@ -123,6 +124,7 @@ public class TuohengMqttClientService { AIRPORT_NEST_BASIC_TOPIC, AIRPORT_NEST_CONFIRM_TOPIC, AIRPORT_DRONE_REALTIME_TOPIC, + HEARTBEAT_MESSAGE_TOPIC, PRODUCT_OSD_TOPIC, PRODUCT_EVENTS_TOPIC }; diff --git a/src/main/java/com/ruoyi/device/service/impl/TuohengService.java b/src/main/java/com/ruoyi/device/service/impl/TuohengService.java index b0aa632..52945a3 100644 --- a/src/main/java/com/ruoyi/device/service/impl/TuohengService.java +++ b/src/main/java/com/ruoyi/device/service/impl/TuohengService.java @@ -171,6 +171,18 @@ public class TuohengService { } }); + handler.registerHeartbeatMessageCallback(new com.ruoyi.device.domain.impl.tuohengmqtt.callback.IHeartbeatMessageCallback() { + @Override + public void onHeartbeatMessage(String deviceSn, com.ruoyi.device.domain.impl.tuohengmqtt.model.HeartbeatMessageData data) { + try { + // 同步舱门状态到 MachineStateManager + syncCoverStateFromHeartbeat(deviceSn, data); + } catch (Exception e) { + log.error("处理心跳消息失败", e); + } + } + }); + log.info("TuohengService 初始化完成,已注册所有回调"); } @@ -415,6 +427,43 @@ public class TuohengService { } } + /** + * 从 heartbeat/message 主题同步舱门状态 + */ + private void syncCoverStateFromHeartbeat(String deviceSn, com.ruoyi.device.domain.impl.tuohengmqtt.model.HeartbeatMessageData data) { + try { + if (data == null || data.getNestDoor() == null || data.getNestDoor().getData() == null) { + return; + } + + Integer status = data.getNestDoor().getData().getStatus(); + if (status == null) { + return; + } + + log.info("【状态同步】收到舱门状态(heartbeat): sn={}, status={}", deviceSn, status); + + // 根据 status 值更新舱门状态 + // 1 = 开仓, 2 = 关仓 + CoverState coverState; + if (status == 1) { + coverState = CoverState.OPENED; + log.info("【状态同步】同步舱门开启状态(heartbeat): sn={}, status={}", deviceSn, status); + } else if (status == 2) { + coverState = CoverState.CLOSED; + log.info("【状态同步】同步舱门关闭状态(heartbeat): sn={}, status={}", deviceSn, status); + } else { + log.warn("未知的舱门状态值(heartbeat): sn={}, status={}", deviceSn, status); + return; + } + + stateManager.setCoverState(deviceSn, coverState); + + } catch (Exception e) { + log.error("同步舱门状态失败(heartbeat): sn={}", deviceSn, e); + } + } + /** * 定时检查机场心跳超时 * 每分钟执行一次