/*
 * a-tuoheng-device/src/main/java/com/ruoyi/device/service/impl/TuohengService.java
 *
 * 490 lines
 * 20 KiB
 * Java
 * Raw Blame History
 *
 * This file contains ambiguous Unicode characters
 *
 * This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
 */

package com.ruoyi.device.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.device.domain.api.IDockAircraftDomain;
import com.ruoyi.device.domain.api.IDockDomain;
import com.ruoyi.device.domain.api.IAircraftDomain;
import com.ruoyi.device.domain.api.IDeviceDomain;
import com.ruoyi.device.domain.impl.machine.mqtt.MqttCallbackRegistry;
import com.ruoyi.device.domain.impl.machine.state.CoverState;
import com.ruoyi.device.domain.impl.machine.state.DroneState;
import com.ruoyi.device.domain.impl.machine.state.AirportState;
import com.ruoyi.device.domain.impl.machine.state.MachineStates;
import com.ruoyi.device.domain.impl.machine.statemachine.MachineStateManager;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRealTimeBasicCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengEventsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengRealTimeDataCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig;
import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler;
import com.ruoyi.device.domain.impl.tuohengmqtt.manager.TuohengMqttClientManager;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.RealTimeBasicData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengRealTimeData;
import com.ruoyi.device.domain.model.Aircraft;
import com.ruoyi.device.domain.model.Device;
import com.ruoyi.device.domain.model.Dock;
import com.ruoyi.device.domain.model.DockAircraft;
import com.ruoyi.device.service.config.TuohengMqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
public class TuohengService {
@Autowired
private TuohengMqttClientManager clientManager;
@Autowired
private TuohengMqttProperties mqttProperties;
@Autowired
private IDockAircraftDomain dockAircraftDomain;
@Autowired
private IDockDomain dockDomain;
@Autowired
private IAircraftDomain aircraftDomain;
@Autowired
private IDeviceDomain deviceDomain;
@Autowired
private MachineStateManager stateManager;
@Autowired
private MqttCallbackRegistry mqttCallbackRegistry;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 机场心跳时间戳记录 (deviceSn -> lastHeartbeatTime)
*/
private final Map<String, Long> airportHeartbeatMap = new java.util.concurrent.ConcurrentHashMap<>();
/**
* 心跳超时时间5分钟
*/
private static final long HEARTBEAT_TIMEOUT = 5 * 60 * 1000;
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
TuohengMqttClientConfig config = TuohengMqttClientConfig.builder()
.host(mqttProperties.getHost())
.port(mqttProperties.getPort())
.clientId(mqttProperties.getClientId())
.username(mqttProperties.getUsername())
.password(mqttProperties.getPassword())
.connectionTimeout(mqttProperties.getConnectionTimeout())
.keepAliveInterval(mqttProperties.getKeepAliveInterval())
.autoReconnect(mqttProperties.getAutoReconnect())
.cleanSession(mqttProperties.getCleanSession())
.useSharedSubscription(true)
.sharedGroupName("tuoheng-group")
.build();
clientManager.initClient(config);
TuohengMqttMessageHandler handler = clientManager.getHandler();
// 设置 MqttCallbackRegistry 到 handler用于指令回调
handler.setMqttCallbackRegistry(mqttCallbackRegistry);
Map<String, String> mapping = loadAirportDroneMapping();
handler.registerRealTimeDataCallback(new ITuohengRealTimeDataCallback() {
@Override
public void onRealTimeData(String deviceSn, TuohengRealTimeData data) {
// log.info("========== 收到拓恒实时数据 ==========");
// log.info("收到拓恒实时数据 设备SN: {}", deviceSn);
try {
// log.info("数据内容: {}", objectMapper.writeValueAsString(data));
// 更新机场心跳时间戳
updateAirportHeartbeat(deviceSn);
// 同步无人机开关机状态
syncDronePowerState(deviceSn, data);
// 同步舱门状态
syncCoverStateFromRealTimeData(deviceSn, data);
} catch (Exception e) {
log.error("处理实时数据失败", e);
}
// log.info("=====================================");
}
});
handler.registerOsdCallback(new ITuohengOsdCallback() {
@Override
public void onOsdData(String deviceSn, AirportOsdData data) {
// log.info("========== 收到拓恒OSD数据 ==========");
// log.info("收到拓恒OSD数据 设备SN: {}", deviceSn);
try {
// log.info("数据内容: {}", objectMapper.writeValueAsString(data));
// 同步飞行状态到 MachineStateManager
syncFlightState(deviceSn, data);
} catch (Exception e) {
log.error("序列化数据失败", e);
}
// log.info("=====================================");
}
});
handler.registerEventsCallback(new ITuohengEventsCallback() {
@Override
public void onEventsData(String deviceSn, EventsData data) {
// log.info("========== 收到拓恒Events数据 ==========");
// log.info("收到拓恒Events数据 设备SN: {}", deviceSn);
try {
// log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
// log.info("=====================================");
}
});
handler.registerRealTimeBasicCallback(new IRealTimeBasicCallback() {
@Override
public void onRealTimeBasicData(String deviceSn, RealTimeBasicData data) {
try {
// 同步舱门状态到 MachineStateManager
syncCoverState(deviceSn, data);
} catch (Exception e) {
log.error("处理实时基础数据失败", e);
}
}
});
handler.registerHeartbeatMessageCallback(new com.ruoyi.device.domain.impl.tuohengmqtt.callback.IHeartbeatMessageCallback() {
@Override
public void onHeartbeatMessage(String deviceSn, com.ruoyi.device.domain.impl.tuohengmqtt.model.HeartbeatMessageData data) {
try {
// 同步舱门状态到 MachineStateManager
syncCoverStateFromHeartbeat(deviceSn, data);
} catch (Exception e) {
log.error("处理心跳消息失败", e);
}
}
});
log.info("TuohengService 初始化完成,已注册所有回调");
}
private Map<String, String> loadAirportDroneMapping() {
Map<String, String> mapping = new HashMap<>();
try {
List<DockAircraft> dockAircraftList = dockAircraftDomain.selectDockAircraftList(new DockAircraft());
for (DockAircraft dockAircraft : dockAircraftList) {
Dock dock = dockDomain.selectDockByDockId(dockAircraft.getDockId());
Aircraft aircraft = aircraftDomain.selectAircraftByAircraftId(dockAircraft.getAircraftId());
if (dock != null && aircraft != null) {
Device dockDevice = deviceDomain.selectDeviceByDeviceId(dock.getDeviceId());
Device aircraftDevice = deviceDomain.selectDeviceByDeviceId(aircraft.getDeviceId());
if (dockDevice != null && aircraftDevice != null) {
String airportSn = dockDevice.getDeviceSn();
String droneSn = aircraftDevice.getDeviceSn();
if (airportSn != null && droneSn != null) {
if (airportSn.startsWith("THJS") || droneSn.startsWith("THJS")) {
log.debug("跳过大疆设备 - 机场SN: {}, 无人机SN: {}", airportSn, droneSn);
continue;
}
mapping.put(airportSn, droneSn);
log.info("加载机场-无人机映射: {} -> {}", airportSn, droneSn);
}
}
}
}
log.info("从数据库加载机场-无人机映射完成,共 {} 条记录", mapping.size());
} catch (Exception e) {
log.error("从数据库加载机场-无人机映射失败", e);
}
return mapping;
}
/**
* 同步飞行状态到 MachineStateManager
* 根据 OSD 数据中的 flighttask_step_code 和 mode_code 判断飞行状态
*/
private void syncFlightState(String deviceSn, AirportOsdData data) {
try {
if (data == null) {
return;
}
String flighttaskStepCode = data.getFlighttaskStepCode();
String modeCode = data.getModeCode();
// 同步无人机状态
DroneState droneState = determineDroneState(flighttaskStepCode, modeCode);
if (droneState != null) {
stateManager.setDroneState(deviceSn, droneState);
log.debug("同步飞行状态: sn={}, flighttaskStepCode={}, modeCode={}, state={}",
deviceSn, flighttaskStepCode, modeCode, droneState);
}
// 注意:机场在线状态由 IOT 平台的心跳机制判断5分钟超时
// 不在这里简单地根据收到数据就判断为在线
} catch (Exception e) {
log.error("同步飞行状态失败: sn={}", deviceSn, e);
}
}
/**
* 根据任务状态码和模式码判断无人机状态
*/
private DroneState determineDroneState(String flighttaskStepCode, String modeCode) {
// 优先根据 flighttask_step_code 判断
if (flighttaskStepCode != null) {
switch (flighttaskStepCode) {
case "1":
// 飞行作业中
return DroneState.FLYING;
case "2":
// 作业后状态恢复,可能是返航或已到达
return DroneState.RETURNING;
case "5":
// 任务空闲
return DroneState.ONLINE;
case "255":
// 飞行器异常
return DroneState.UNKNOWN;
}
}
// 根据 mode_code 辅助判断
if (modeCode != null) {
if (modeCode.equals("3") || modeCode.equals("4") || modeCode.equals("5")) {
// 飞行中状态
return DroneState.FLYING;
}
}
return null;
}
/**
* 更新机场心跳时间戳并设置在线状态
*/
private void updateAirportHeartbeat(String deviceSn) {
long currentTime = System.currentTimeMillis();
airportHeartbeatMap.put(deviceSn, currentTime);
// 收到心跳,设置机场为在线
stateManager.setAirportState(deviceSn, AirportState.ONLINE);
log.debug("更新机场心跳: sn={}, time={}", deviceSn, currentTime);
}
/**
* 同步无人机开关机状态
*/
private void syncDronePowerState(String deviceSn, TuohengRealTimeData data) {
try {
if (data == null || data.getDroneBattery() == null || data.getDroneBattery().getData() == null) {
return;
}
Integer powerOn = data.getDroneBattery().getData().getBPowerON();
if (powerOn == null) {
return;
}
log.info("【状态同步】收到无人机电源状态: sn={}, bPowerON={}", deviceSn, powerOn);
// 根据 bPowerON 值更新无人机状态
// 1 = 开机, 2 = 关机
if (powerOn == 1) {
// 开机时,只有在特定状态下才更新为 ONLINE
// 只有从 POWER_OFF、UNKNOWN、ARRIVED、RETURNING 这些状态才能转为 ONLINE
MachineStates currentStates = stateManager.getStates(deviceSn);
if (currentStates != null && currentStates.getDroneState() != null) {
DroneState currentState = currentStates.getDroneState();
if (currentState == DroneState.POWER_OFF ||
currentState == DroneState.UNKNOWN ||
currentState == DroneState.ARRIVED ||
currentState == DroneState.RETURNING) {
stateManager.setDroneState(deviceSn, DroneState.ONLINE);
log.info("【状态同步】同步无人机开机状态: sn={}, powerOn={}, 从 {} 更新为 ONLINE",
deviceSn, powerOn, currentState);
} else {
log.debug("无人机已开机,但当前状态为 {},不更新为 ONLINE", currentState);
}
} else {
// 如果没有当前状态,直接设置为 ONLINE
stateManager.setDroneState(deviceSn, DroneState.ONLINE);
log.debug("同步无人机开机状态: sn={}, powerOn={}, 初始化为 ONLINE", deviceSn, powerOn);
}
} else if (powerOn == 2) {
stateManager.setDroneState(deviceSn, DroneState.POWER_OFF);
log.info("【状态同步】同步无人机关机状态: sn={}, powerOn={}", deviceSn, powerOn);
}
} catch (Exception e) {
log.error("同步无人机开关机状态失败: sn={}", deviceSn, e);
}
}
/**
* 从 realTime/data 主题同步舱门状态
*/
private void syncCoverStateFromRealTimeData(String deviceSn, TuohengRealTimeData data) {
try {
if (data == null || data.getNestDoor() == null || data.getNestDoor().getData() == null) {
return;
}
Integer status = data.getNestDoor().getData().getStatus();
if (status == null) {
return;
}
log.info("【状态同步】收到舱门状态: sn={}, status={}", deviceSn, status);
// 根据 status 值更新舱门状态
// 1 = 开仓, 2 = 关仓
CoverState coverState;
if (status == 1) {
coverState = CoverState.OPENED;
log.info("【状态同步】同步舱门开启状态: sn={}, status={}", deviceSn, status);
} else if (status == 2) {
coverState = CoverState.CLOSED;
log.info("【状态同步】同步舱门关闭状态: sn={}, status={}", deviceSn, status);
} else {
log.warn("未知的舱门状态值: sn={}, status={}", deviceSn, status);
return;
}
stateManager.setCoverState(deviceSn, coverState);
} catch (Exception e) {
log.error("同步舱门状态失败: sn={}", deviceSn, e);
}
}
/**
* 同步舱门状态(从 realTime/basic 主题,已废弃)
*/
private void syncCoverState(String deviceSn, com.ruoyi.device.domain.impl.tuohengmqtt.model.RealTimeBasicData data) {
try {
if (data == null || data.getData() == null || data.getData().getNestDoor() == null) {
return;
}
RealTimeBasicData.NestDoorInfo nestDoor = data.getData().getNestDoor();
if (nestDoor.getData() == null) {
return;
}
Integer status = nestDoor.getData().getStatus();
if (status == null) {
return;
}
log.info("【状态同步】收到舱门状态: sn={}, status={}", deviceSn, status);
// 根据 status 值更新舱门状态
// 1 = 开仓, 2 = 关仓
CoverState coverState;
if (status == 1) {
coverState = CoverState.OPENED;
log.info("【状态同步】同步舱门开启状态: sn={}, status={}", deviceSn, status);
} else if (status == 2) {
coverState = CoverState.CLOSED;
log.info("【状态同步】同步舱门关闭状态: sn={}, status={}", deviceSn, status);
} else {
log.warn("未知的舱门状态值: sn={}, status={}", deviceSn, status);
return;
}
stateManager.setCoverState(deviceSn, coverState);
} catch (Exception e) {
log.error("同步舱门状态失败: sn={}", deviceSn, e);
}
}
/**
* 从 heartbeat/message 主题同步舱门状态
*/
private void syncCoverStateFromHeartbeat(String deviceSn, com.ruoyi.device.domain.impl.tuohengmqtt.model.HeartbeatMessageData data) {
try {
if (data == null || data.getNestDoor() == null || data.getNestDoor().getData() == null) {
return;
}
Integer status = data.getNestDoor().getData().getStatus();
if (status == null) {
return;
}
log.info("【状态同步】收到舱门状态(heartbeat): sn={}, status={}", deviceSn, status);
// 根据 status 值更新舱门状态
// 0 = 开仓, 1 = 关仓
CoverState coverState;
if (status == 0) {
coverState = CoverState.OPENED;
log.info("【状态同步】同步舱门开启状态(heartbeat): sn={}, status={}", deviceSn, status);
} else if (status == 1) {
coverState = CoverState.CLOSED;
log.info("【状态同步】同步舱门关闭状态(heartbeat): sn={}, status={}", deviceSn, status);
} else {
log.warn("未知的舱门状态值(heartbeat): sn={}, status={}", deviceSn, status);
return;
}
stateManager.setCoverState(deviceSn, coverState);
} catch (Exception e) {
log.error("同步舱门状态失败(heartbeat): sn={}", deviceSn, e);
}
}
/**
* 定时检查机场心跳超时
* 每分钟执行一次
*/
@Scheduled(fixedRate = 60000)
public void checkAirportHeartbeatTimeout() {
long currentTime = System.currentTimeMillis();
for (Map.Entry<String, Long> entry : airportHeartbeatMap.entrySet()) {
String deviceSn = entry.getKey();
Long lastHeartbeatTime = entry.getValue();
long timeDiff = currentTime - lastHeartbeatTime;
if (timeDiff > HEARTBEAT_TIMEOUT) {
// 超时,设置为离线
stateManager.setAirportState(deviceSn, AirportState.OFFLINE);
log.warn("机场心跳超时,设置为离线: sn={}, 超时时长={}秒",
deviceSn, timeDiff / 1000);
}
}
}
}