package com.ruoyi.device.service.impl; import com.ruoyi.device.domain.api.*; import com.ruoyi.device.domain.model.*; import com.ruoyi.device.domain.model.thingsboard.AttributeMap; import com.ruoyi.device.domain.model.thingsboard.DeviceInfo; import com.ruoyi.device.domain.model.thingsboard.TelemetryMap; import com.ruoyi.device.domain.model.thingsboard.TelemetryValue; import com.ruoyi.device.domain.model.thingsboard.attributes.psdk.PsdkDevice; import com.ruoyi.device.domain.model.thingsboard.constants.DeviceAttributes; import com.ruoyi.device.domain.model.thingsboard.constants.DeviceTelemetry; import com.ruoyi.device.domain.model.thingsboard.tuoheng.constants.TuohengDeviceAttributes; import com.ruoyi.device.api.enums.PayloadTypeEnum; import com.ruoyi.device.service.enums.DeviceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @Service public class SynService { private static final Logger log = LoggerFactory.getLogger(SynService.class); private final IThingsBoardDomain iThingsBoardDomain; /** * 设备名称过滤正则表达式列表(从配置文件读取) * 匹配这些正则表达式的设备将被跳过,不进行同步 */ @Value("${device.sync.exclude-patterns:}") private String excludePatterns; /** * 编译后的正则表达式模式列表(延迟初始化) */ private List excludePatternList; @Autowired private IDeviceDomain deviceDomain; @Autowired private IDockDomain dockDomain; @Autowired private IAircraftDomain aircraftDomain; @Autowired private IDockAircraftDomain dockAircraftDomain; @Autowired private IPayloadDomain payloadDomain; @Autowired private IAircraftPayloadDomain aircraftPayloadDomain; @Autowired private IGroupDomain groupDomain; @Autowired private IDockGroupDomain dockGroupDomain; @Autowired(required = false) private StringRedisTemplate redisTemplate; public SynService(IThingsBoardDomain iThingsBoardDomain) { this.iThingsBoardDomain = iThingsBoardDomain; } /** * 初始化设备名称过滤正则表达式列表 * 延迟初始化,在第一次使用时编译正则表达式 * 改动点: 首先不要用这个过滤了,TH开头的就是拓恒无人机 */ private void initExcludePatterns() { if (excludePatternList == null) { excludePatternList = new ArrayList<>(); if (StringUtils.hasText(excludePatterns)) { String[] patterns = excludePatterns.split(","); for (String pattern : patterns) { String trimmedPattern = pattern.trim(); if (StringUtils.hasText(trimmedPattern)) { try { excludePatternList.add(Pattern.compile(trimmedPattern)); log.info("加载设备名称过滤规则: {}", trimmedPattern); } catch (Exception e) { log.error("无效的正则表达式: {}, error={}", trimmedPattern, e.getMessage()); } } } } if (excludePatternList.isEmpty()) { log.info("未配置设备名称过滤规则"); } } } /** * 判断设备名称是否应该被过滤(跳过同步) * * @param deviceName 设备名称 * @return true 表示应该被过滤,false 表示不过滤 */ private boolean shouldExcludeDevice(String deviceName) { // 延迟初始化 initExcludePatterns(); // 如果没有配置过滤规则,不过滤任何设备 if (excludePatternList.isEmpty()) { return false; } // 检查设备名称是否匹配任何过滤规则 for (Pattern pattern : excludePatternList) { if (pattern.matcher(deviceName).matches()) { log.debug("设备 {} 匹配过滤规则 {},跳过同步", deviceName, pattern.pattern()); return true; } } return false; } /** * 获取分布式锁 * * @param lockKey 锁的key * @param lockValue 锁的value(用于释放锁时验证) * @param expireTime 锁的过期时间(秒) * @return true表示获取锁成功,false表示获取锁失败 */ private boolean tryLock(String lockKey, String lockValue, long expireTime) { if (redisTemplate == null) { log.warn("Redis未配置,跳过分布式锁,使用本地同步"); return true; } try { Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.SECONDS); return Boolean.TRUE.equals(result); } catch (Exception e) { log.error("获取分布式锁失败: lockKey={}, error={}", lockKey, e.getMessage(), e); return false; } } /** * 释放分布式锁 * * @param lockKey 锁的key * @param lockValue 锁的value(用于验证是否是自己持有的锁) */ private void releaseLock(String lockKey, String lockValue) { if (redisTemplate == null) { return; } try { String currentValue = redisTemplate.opsForValue().get(lockKey); if (lockValue.equals(currentValue)) { redisTemplate.delete(lockKey); } } catch (Exception e) { log.error("释放分布式锁失败: lockKey={}, error={}", lockKey, e.getMessage(), e); } } /** * 定时任务:同步基础表数据 * 执行时间:启动后1分钟开始,每10分钟执行一次(可通过配置文件修改) * 配置项:device.schedule.print-devices.initial-delay 初始延迟时间(毫秒) * device.schedule.print-devices.fixed-delay 执行间隔时间(毫秒) * * 优化策略: * 1. 先识别所有网关设备 * 2. 遍历每个网关,同步网关及其子设备 * 3. 同步非网关的独立设备(机场和无人机) */ @Scheduled(initialDelayString = "${device.schedule.update-devices.initial-delay:60000}", fixedDelayString = "${device.schedule.update-devices.fixed-delay:120000}") public void updateDevicesScheduled() { try { log.info("========== 开始执行定时任务:同步基础表数据 =========="); int totalCount = 0; int skippedCount = 0; // 第一步:直接获取所有网关设备(服务端过滤) Iterable> gatewayDevices = iThingsBoardDomain.getAllGatewayDevices(); // 第二步:遍历每个网关,同步网关及其子设备 for (List gatewayBatch : gatewayDevices) { for (DeviceInfo gatewayInfo : gatewayBatch) { try { // 按照名字过滤网关设备(已禁用,不再过滤 TH 开头的设备) // if (shouldExcludeDevice(gatewayInfo.getName())) { // skippedCount++; // continue; // } // 同步网关设备本身 // log.info("开始同步网关: {}", gatewayInfo.getName()); syncDevice(gatewayInfo, DeviceType.GATEWAY, null); totalCount++; // 获取该网关的所有子设备ID List childDeviceIds = iThingsBoardDomain.getGatewayChildDevices(gatewayInfo.getId()); // log.info("网关 {} 有 {} 个子设备", gatewayInfo.getName(), childDeviceIds.size()); // 遍历该网关的子设备 for (String childDeviceId : childDeviceIds) { // 通过API获取子设备信息 DeviceInfo childDeviceInfo = iThingsBoardDomain.getDeviceInfo(childDeviceId); if (childDeviceInfo == null) { log.warn("子设备 {} 不存在,跳过", childDeviceId); skippedCount++; continue; } // 按照名字过滤子设备(已禁用,不再过滤 TH 开头的设备) // if (shouldExcludeDevice(childDeviceInfo.getName())) { // skippedCount++; // continue; // } try { String deviceName = childDeviceInfo.getName(); // 判断是大疆还是拓恒设备(通过设备名称是否以 TH 开头) if (deviceName.startsWith("TH")) { // 拓恒设备处理逻辑 log.info("检测到拓恒设备: {}", deviceName); syncTuohengDevice(childDeviceInfo, gatewayInfo.getId()); } else { // 大疆设备处理逻辑(原有逻辑) log.info("检测到大疆设备: {}", deviceName); AttributeMap attributes = iThingsBoardDomain.getPredefinedDeviceAttributes(childDeviceId); log.info("大疆设备 {} 的属性: {}", deviceName, attributes); DeviceType deviceType = determineDeviceType(childDeviceInfo, attributes); log.info("大疆设备 {} 的类型: {}", deviceName, deviceType); Long deviceId = syncDevice(childDeviceInfo, deviceType, gatewayInfo.getId()); log.info("大疆设备 {} 同步到device表,deviceId={}", deviceName, deviceId); syncDeviceByType(deviceId, childDeviceInfo.getName(), deviceType, attributes); log.info("大疆设备 {} 同步完成", deviceName); } totalCount++; } catch (Exception e) { log.error("同步网关子设备失败: gatewayName={}, childDeviceId={}, error={}", gatewayInfo.getName(), childDeviceId, e.getMessage(), e); } } } catch (Exception e) { log.error("同步网关失败: gatewayName={}, error={}", gatewayInfo.getName(), e.getMessage(), e); } } } log.info("========== 数据同步任务完成,共同步 {} 个设备,跳过 {} 个设备 ==========", totalCount, skippedCount); // 处理没有分组的机场,将其添加到默认分组 assignDocksToDefaultGroup(); } catch (Exception e) { log.error("数据同步任务执行失败: {}", e.getMessage(), e); } } /** * 将没有分组的机场添加到默认分组 * 默认分组名称为 "默认分组" */ private void assignDocksToDefaultGroup() { try { log.info("========== 开始检查并分配未分组的机场到默认分组 =========="); // 1. 查询或创建默认分组 String defaultGroupName = "默认分组"; Group defaultGroup = groupDomain.selectGroupByGroupName(defaultGroupName); if (defaultGroup == null) { // 默认分组不存在,创建它 log.info("默认分组不存在,开始创建: {}", defaultGroupName); defaultGroup = new Group(); defaultGroup.setGroupName(defaultGroupName); defaultGroup.setCreateBy("system"); groupDomain.insertGroup(defaultGroup); log.info("默认分组创建成功: groupId={}, groupName={}", defaultGroup.getGroupId(), defaultGroupName); } else { log.info("默认分组已存在: groupId={}, groupName={}", defaultGroup.getGroupId(), defaultGroupName); } Long defaultGroupId = defaultGroup.getGroupId(); // 2. 查询所有机场 Dock queryDock = new Dock(); List allDocks = dockDomain.selectDockList(queryDock); log.info("查询到机场总数: {}", allDocks != null ? allDocks.size() : 0); if (allDocks == null || allDocks.isEmpty()) { log.info("没有机场需要处理"); return; } int assignedCount = 0; int alreadyInGroupCount = 0; // 3. 遍历所有机场,检查是否在分组中 for (Dock dock : allDocks) { try { Long dockId = dock.getDockId(); // 查询该机场是否已经在任何分组中 List existingGroups = dockGroupDomain.selectDockGroupByDockId(dockId); if (existingGroups == null || existingGroups.isEmpty()) { // 机场没有分组,添加到默认分组 DockGroup dockGroup = new DockGroup(); dockGroup.setDockId(dockId); dockGroup.setGroupId(defaultGroupId); dockGroup.setCreateBy("system"); dockGroupDomain.insertDockGroup(dockGroup); assignedCount++; log.info("机场 [ID:{}, Name:{}] 已添加到默认分组", dockId, dock.getDockName()); } else { alreadyInGroupCount++; log.debug("机场 [ID:{}, Name:{}] 已在分组中,跳过", dockId, dock.getDockName()); } } catch (Exception e) { log.error("处理机场分组失败: dockId={}, error={}", dock.getDockId(), e.getMessage(), e); } } log.info("========== 机场分组检查完成:新分配 {} 个机场到默认分组,{} 个机场已有分组 ==========", assignedCount, alreadyInGroupCount); } catch (Exception e) { log.error("分配机场到默认分组失败: {}", e.getMessage(), e); } } /** * 根据设备类型进行不同的同步处理 * * @param deviceId 设备主键ID * @param deviceName 设备名称 * @param deviceType 设备类型 * @param attributes 设备属性 */ private void syncDeviceByType(Long deviceId, String deviceName, DeviceType deviceType, AttributeMap attributes) { if (deviceType == DeviceType.DOCK) { // 机场:同步机场表 syncDock(deviceId, deviceName); // 获取机场挂载的无人机SN号 Optional subDeviceSnOpt = attributes.get(DeviceAttributes.SUB_DEVICE_SN); log.info("机场 {} 的 sub_device.device_sn 属性: {}", deviceName, subDeviceSnOpt.isPresent() ? subDeviceSnOpt.get() : "不存在或为空"); if (subDeviceSnOpt.isPresent() && StringUtils.hasText(subDeviceSnOpt.get())) { String aircraftSn = subDeviceSnOpt.get(); log.info("机场 {} 尝试查找无人机: aircraftSn={}", deviceName, aircraftSn); Device aircraftDevice = findDeviceBySn(aircraftSn); if (aircraftDevice != null) { log.info("找到无人机设备: aircraftSn={}, deviceId={}", aircraftSn, aircraftDevice.getDeviceId()); syncDockAircraft(deviceId, aircraftDevice.getDeviceId()); } else { log.warn("未找到无人机设备: aircraftSn={}, 可能无人机尚未同步", aircraftSn); } } else { log.warn("机场 {} 没有 sub_device.device_sn 属性,无法建立机场-无人机关联", deviceName); } } else if (deviceType == DeviceType.AIRCRAFT) { // 无人机:同步无人机表 syncAircraft(deviceId, deviceName); // 获取无人机所属的机场SN号 Optional dockSnOpt = attributes.get(DeviceAttributes.DOCK_SN); if (dockSnOpt.isPresent() && StringUtils.hasText(dockSnOpt.get())) { String dockSn = dockSnOpt.get(); log.info("无人机 {} 尝试查找所属机场: dockSn={}", deviceName, dockSn); // 通过机场SN号查找机场设备 Device dockDevice = findDeviceBySn(dockSn); if (dockDevice != null) { log.info("找到机场设备: dockSn={}, deviceId={}", dockSn, dockDevice.getDeviceId()); // 建立机场-无人机关联 syncDockAircraft(dockDevice.getDeviceId(), deviceId); } else { log.warn("未找到机场设备: dockSn={}, 可能机场尚未同步", dockSn); } } else { log.warn("无人机 {} 没有 dock_sn 属性,无法建立机场-无人机关联", deviceName); } } // 网关类型不需要额外处理 } /** * 判断设备类型 * 优化后的判断逻辑: * 1. 优先使用 ThingsBoard 标准的 additionalInfo.gateway 字段判断网关 * 2. 对于非网关设备,通过 dock_sn 属性区分机场和无人机 * * @param deviceInfo ThingsBoard设备信息 * @param attributes 设备属性 * @return 设备类型 */ private DeviceType determineDeviceType(DeviceInfo deviceInfo, AttributeMap attributes) { String deviceName = deviceInfo.getName(); // 1. 使用 ThingsBoard 标准方式判断网关:检查 additionalInfo 中的 gateway 字段 if (deviceInfo.isGateway()) { return DeviceType.GATEWAY; } // 2. 非网关设备:通过 dock_sn 属性区分机场和无人机 Optional dockSnOpt = attributes.get(DeviceAttributes.DOCK_SN); if (dockSnOpt.isPresent() && StringUtils.hasText(dockSnOpt.get())) { String dockSn = dockSnOpt.get(); // dock_sn 等于设备名称 -> 机场 // dock_sn 不等于设备名称 -> 无人机(挂载在该机场下) return deviceName.equals(dockSn) ? DeviceType.DOCK : DeviceType.AIRCRAFT; } // dock_sn 属性不存在或为空,无法判断设备类型 throw new IllegalStateException("无法确定设备类型:设备 " + deviceName + " 缺少 dock_sn 属性"); } /** * 同步设备数据 * * @param deviceInfo ThingsBoard设备信息 * @param deviceType 设备类型 * @param gatewayId 网关设备ID(从映射中获取,避免重复API调用) * @return 设备主键ID */ private Long syncDevice(DeviceInfo deviceInfo, DeviceType deviceType, String gatewayId) { String iotDeviceId = deviceInfo.getId(); String deviceName = deviceInfo.getName(); String deviceSn = deviceName; // 使用设备名称作为SN号,网关其实是没有SN号的 log.info("开始同步设备: iotDeviceId={}, deviceName={}, deviceType={}, gatewayId={}", iotDeviceId, deviceName, deviceType, gatewayId); // 查询设备是否已存在(通过 iotDeviceId) Device existingDevice = deviceDomain.selectDeviceByIotDeviceId(iotDeviceId); // 如果通过 iotDeviceId 未找到,再检查 device_sn 是否已存在 if (existingDevice == null) { Device existingDeviceBySn = deviceDomain.selectDeviceByDeviceSn(deviceSn); if (existingDeviceBySn != null) { log.warn("设备SN已存在但iotDeviceId不同: deviceSn={}, existingIotDeviceId={}, newIotDeviceId={}", deviceSn, existingDeviceBySn.getIotDeviceId(), iotDeviceId); // 返回已存在的设备ID,避免重复插入 return existingDeviceBySn.getDeviceId(); } } if (existingDevice == null) { // 设备不存在,使用分布式锁防止并发插入 String lockKey = "device:sync:lock:" + deviceSn; String lockValue = UUID.randomUUID().toString(); // 尝试获取锁,最多等待3秒 boolean locked = tryLock(lockKey, lockValue, 10); if (!locked) { log.warn("获取设备同步锁失败,可能有其他线程正在同步该设备: deviceSn={}", deviceSn); // 等待一小段时间后重新查询,可能已经被其他线程插入 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } Device retryDevice = deviceDomain.selectDeviceByDeviceSn(deviceSn); if (retryDevice != null) { log.info("重新查询到设备: deviceSn={}, deviceId={}", deviceSn, retryDevice.getDeviceId()); return retryDevice.getDeviceId(); } log.error("获取锁失败且重新查询也未找到设备: deviceSn={}", deviceSn); return null; } try { // 双重检查:获取锁后再次查询,防止重复插入 Device doubleCheckDevice = deviceDomain.selectDeviceByDeviceSn(deviceSn); if (doubleCheckDevice != null) { log.info("双重检查发现设备已存在: deviceSn={}, deviceId={}", deviceSn, doubleCheckDevice.getDeviceId()); return doubleCheckDevice.getDeviceId(); } // 插入新设备 Device newDevice = new Device(); newDevice.setDeviceName(deviceName); newDevice.setIotDeviceId(iotDeviceId); newDevice.setDeviceType(deviceType.getCode()); newDevice.setDeviceSn(deviceSn); newDevice.setGateway(gatewayId); newDevice.setCreateBy("system"); // 根据设备名称判断厂商 String manufacturer = deviceName.startsWith("TH") ? "tuoheng" : "dajiang"; newDevice.setDeviceManufacturer(manufacturer); log.info("准备插入新设备: deviceName={}, deviceType={}, deviceSn={}, manufacturer={}", deviceName, deviceType, deviceSn, manufacturer); deviceDomain.insertDevice(newDevice); Long deviceId = newDevice.getDeviceId(); log.info("插入新设备成功: iotDeviceId={}, deviceName={}, deviceType={}, 返回deviceId={}", iotDeviceId, deviceName, deviceType, deviceId); return deviceId; } finally { // 释放锁 releaseLock(lockKey, lockValue); } } else { // 设备已存在,检查是否需要更新 boolean needUpdate = false; if (!Objects.equals(existingDevice.getDeviceName(), deviceName)) { existingDevice.setDeviceName(deviceName); needUpdate = true; } if (!Objects.equals(existingDevice.getDeviceType(), deviceType.getCode())) { existingDevice.setDeviceType(deviceType.getCode()); needUpdate = true; } if (!Objects.equals(existingDevice.getDeviceSn(), deviceSn)) { existingDevice.setDeviceSn(deviceSn); needUpdate = true; } if (!Objects.equals(existingDevice.getGateway(), gatewayId)) { existingDevice.setGateway(gatewayId); needUpdate = true; } // 检查并更新厂商字段 String manufacturer = deviceName.startsWith("TH") ? "tuoheng" : "dajiang"; if (!Objects.equals(existingDevice.getDeviceManufacturer(), manufacturer)) { existingDevice.setDeviceManufacturer(manufacturer); needUpdate = true; } if (needUpdate) { existingDevice.setUpdateBy("system"); deviceDomain.updateDevice(existingDevice); log.info("更新设备: iotDeviceId={}, deviceName={}, deviceType={}, manufacturer={}", iotDeviceId, deviceName, deviceType, manufacturer); } return existingDevice.getDeviceId(); } } /** * 同步机场数据 * 逻辑: * 1. 如果是新插入的机场,需要将其插入到默认分组(ID=0) * 2. 如果默认分组不存在,需要先创建默认分组 * 3. 如果是已存在的机场,检查机场是否在任何分组中,如果不在,将其插入默认分组 * * @param deviceId 设备主键ID * @param deviceName 设备名称 */ private void syncDock(Long deviceId, String deviceName) { log.info("开始同步机场: deviceId={}, deviceName={}", deviceId, deviceName); // 查询机场是否已存在 Dock existingDock = dockDomain.selectDockByDeviceId(deviceId); boolean isNewDock = (existingDock == null); Long dockId; if (isNewDock) { // 机场不存在,插入新机场 Dock newDock = new Dock(); newDock.setDockName(deviceName); newDock.setDeviceId(deviceId); newDock.setCreateBy("system"); log.info("准备插入新机场: deviceId={}, dockName={}", deviceId, deviceName); dockDomain.insertDock(newDock); dockId = newDock.getDockId(); log.info("插入新机场成功: deviceId={}, dockName={}, dockId={}", deviceId, deviceName, dockId); } else { dockId = existingDock.getDockId(); log.info("机场已存在: deviceId={}, dockName={}, dockId={}", deviceId, deviceName, dockId); } // 确保默认分组存在 ensureDefaultGroupExists(); // 检查机场是否在任何分组中 List dockGroups = dockGroupDomain.selectDockGroupByDockId(dockId); if (dockGroups == null || dockGroups.isEmpty()) { // 机场不在任何分组中,将其插入默认分组 insertDockToDefaultGroup(dockId, deviceName); } else { log.info("机场已在分组中: dockId={}, 分组数量={}", dockId, dockGroups.size()); } } /** * 同步无人机数据 * * @param deviceId 设备主键ID * @param deviceName 设备名称 * @return 无人机主键ID */ private Long syncAircraft(Long deviceId, String deviceName) { log.info("开始同步无人机: deviceId={}, deviceName={}", deviceId, deviceName); // 查询无人机是否已存在 Aircraft existingAircraft = aircraftDomain.selectAircraftByDeviceId(deviceId); if (existingAircraft == null) { // 无人机不存在,插入新无人机 Aircraft newAircraft = new Aircraft(); newAircraft.setAircraftName(deviceName); newAircraft.setDeviceId(deviceId); newAircraft.setCreateBy("system"); log.info("准备插入新无人机: deviceId={}, aircraftName={}", deviceId, deviceName); aircraftDomain.insertAircraft(newAircraft); Long aircraftId = newAircraft.getAircraftId(); log.info("插入新无人机成功: deviceId={}, aircraftName={}, aircraftId={}", deviceId, deviceName, aircraftId); // 新插入的无人机,直接返回 aircraftId,不需要处理 PSDK 设备 return aircraftId; } // 无人机已存在,记录日志 log.info("无人机已存在: deviceId={}, aircraftName={}, aircraftId={}", deviceId, deviceName, existingAircraft.getAircraftId()); Device device = deviceDomain.selectDeviceByDeviceId(deviceId); TelemetryMap telemetryMap = iThingsBoardDomain.getPredefinedDeviceTelemetry(device.getIotDeviceId()); Optional>> psdkDevicesOption = telemetryMap.get(DeviceTelemetry.PSDK_WIDGET_VALUES); if(psdkDevicesOption.isPresent()) { List psdkDevices = psdkDevicesOption.get().getValue(); if(!CollectionUtils.isEmpty(psdkDevices)) { for (PsdkDevice psdkDevice : psdkDevices) { if(Objects.nonNull(psdkDevice.getSpeaker()) && Objects.nonNull(psdkDevice.getPsdk_sn()) && !psdkDevice.getPsdk_sn().isEmpty()) { log.info("开始同步PSDK喊话器设备: psdkSn={}, psdkName={}", psdkDevice.getPsdk_sn(), psdkDevice.getPsdk_name()); // 第一步:同步挂载设备(喊话器) Payload payload = new Payload(); payload.setPayloadSn(psdkDevice.getPsdk_sn()); List payloads = payloadDomain.selectPayloadList(payload); if(CollectionUtils.isEmpty(payloads)) { // 挂载不存在,插入新挂载 payload.setPayloadName(psdkDevice.getPsdk_name()); payload.setPayloadDisplayName("喊话器"); payload.setPayloadType(PayloadTypeEnum.SPEAKER.getCode()); payload.setCreateBy("system"); payloadDomain.insertPayload(payload); log.info("插入新挂载设备: payloadSn={}, payloadName={}, payloadId={}", psdkDevice.getPsdk_sn(), psdkDevice.getPsdk_name(), payload.getPayloadId()); } else { // 挂载已存在,使用已有的挂载 payload = payloads.get(0); log.info("挂载设备已存在: payloadSn={}, payloadId={}", psdkDevice.getPsdk_sn(), payload.getPayloadId()); } // 第二步:同步无人机挂载关联表 Long payloadId = payload.getPayloadId(); Long aircraftId = existingAircraft.getAircraftId(); // 获取无人机关联的机场ID Long dockId = getDockIdByAircraftId(aircraftId); List aircraftPayloads = aircraftPayloadDomain.selectAircraftPayloadByPayloadId(payloadId); if(!CollectionUtils.isEmpty(aircraftPayloads)) { // 关联已存在,检查是否需要更新 AircraftPayload existingRelation = aircraftPayloads.get(0); boolean needUpdate = false; if(!Objects.equals(existingRelation.getAircraftId(), aircraftId)) { existingRelation.setAircraftId(aircraftId); needUpdate = true; } if(!Objects.equals(existingRelation.getDockId(), dockId)) { existingRelation.setDockId(dockId); needUpdate = true; } if(needUpdate) { existingRelation.setUpdateBy("system"); aircraftPayloadDomain.updateAircraftPayload(existingRelation); log.info("更新无人机挂载关联: aircraftId={}, payloadId={}, dockId={}", aircraftId, payloadId, dockId); } else { log.info("无人机挂载关联无需更新: aircraftId={}, payloadId={}", aircraftId, payloadId); } } else { // 关联不存在,插入新关联 AircraftPayload newRelation = new AircraftPayload(); newRelation.setAircraftId(aircraftId); newRelation.setPayloadId(payloadId); newRelation.setDockId(dockId); newRelation.setCreateBy("system"); aircraftPayloadDomain.insertAircraftPayload(newRelation); log.info("插入无人机挂载关联: aircraftId={}, payloadId={}, dockId={}", aircraftId, payloadId, dockId); } } } } } // 无人机已存在,无需更新 return existingAircraft.getAircraftId(); } /** * 根据设备SN号查找设备 * * @param deviceSn 设备SN号 * @return 设备信息 */ private Device findDeviceBySn(String deviceSn) { Device queryDevice = new Device(); queryDevice.setDeviceSn(deviceSn); List devices = deviceDomain.selectDeviceList(queryDevice); return (devices != null && !devices.isEmpty()) ? devices.get(0) : null; } /** * 根据无人机ID获取关联的机场ID * * @param aircraftId 无人机主键ID * @return 机场主键ID,如果没有关联则返回null */ private Long getDockIdByAircraftId(Long aircraftId) { List dockAircrafts = dockAircraftDomain.selectDockAircraftByAircraftId(aircraftId); if (dockAircrafts != null && !dockAircrafts.isEmpty()) { Long dockId = dockAircrafts.get(0).getDockId(); log.debug("无人机 {} 关联的机场ID: {}", aircraftId, dockId); return dockId; } log.debug("无人机 {} 没有关联的机场", aircraftId); return null; } /** * 同步机场无人机关联数据 * 按照一个机场只会挂载一台无人机的逻辑进行处理 * * @param dockDeviceId 机场设备主键ID * @param aircraftDeviceId 无人机设备主键ID */ private void syncDockAircraft(Long dockDeviceId, Long aircraftDeviceId) { log.info("开始同步机场无人机关联: dockDeviceId={}, aircraftDeviceId={}", dockDeviceId, aircraftDeviceId); // 获取机场主键 Dock dock = dockDomain.selectDockByDeviceId(dockDeviceId); if (dock == null) { log.warn("机场不存在,无法同步机场无人机关联: dockDeviceId={}", dockDeviceId); return; } log.info("找到机场: dockId={}, dockName={}", dock.getDockId(), dock.getDockName()); // 获取无人机主键 Aircraft aircraft = aircraftDomain.selectAircraftByDeviceId(aircraftDeviceId); if (aircraft == null) { log.warn("无人机不存在,无法同步机场无人机关联: aircraftDeviceId={}", aircraftDeviceId); return; } Long dockId = dock.getDockId(); Long aircraftId = aircraft.getAircraftId(); // 查询该机场是否已有关联 List existingRelations = dockAircraftDomain.selectDockAircraftByDockId(dockId); if (existingRelations == null || existingRelations.isEmpty()) { // 机场没有关联,插入新关联 DockAircraft newRelation = new DockAircraft(); newRelation.setDockId(dockId); newRelation.setAircraftId(aircraftId); newRelation.setCreateBy("system"); dockAircraftDomain.insertDockAircraft(newRelation); log.info("插入机场无人机关联: dockId={}, aircraftId={}", dockId, aircraftId); } else { // 机场已有关联,检查是否需要更新 DockAircraft existingRelation = existingRelations.get(0); if (!Objects.equals(existingRelation.getAircraftId(), aircraftId)) { // 无人机发生变化,更新关联 existingRelation.setAircraftId(aircraftId); existingRelation.setUpdateBy("system"); dockAircraftDomain.updateDockAircraft(existingRelation); log.info("更新机场无人机关联: dockId=, aircraftId={}", dockId, aircraftId); } } } /** * 确保默认分组存在 * 默认分组ID为0,名称为"默认分组" * 如果不存在则创建 */ private void ensureDefaultGroupExists() { Long defaultGroupId = 0L; Group defaultGroup = groupDomain.selectGroupByGroupId(defaultGroupId); if (defaultGroup == null) { // 默认分组不存在,创建默认分组 Group newGroup = new Group(); newGroup.setGroupName("默认分组"); newGroup.setCreateBy("system"); newGroup.setRemark("系统自动创建的默认分组,用于存放未分组的机场"); groupDomain.insertGroup(newGroup); log.info("创建默认分组成功: groupId={}, groupName={}", defaultGroupId, "默认分组"); } else { log.debug("默认分组已存在: groupId={}, groupName={}", defaultGroupId, defaultGroup.getGroupName()); } } /** * 将机场插入到默认分组 * * @param dockId 机场主键ID * @param dockName 机场名称(用于日志) */ private void insertDockToDefaultGroup(Long dockId, String dockName) { Long defaultGroupId = 0L; DockGroup newDockGroup = new DockGroup(); newDockGroup.setDockId(dockId); newDockGroup.setGroupId(defaultGroupId); newDockGroup.setCreateBy("system"); newDockGroup.setRemark("系统自动添加到默认分组"); dockGroupDomain.insertDockGroup(newDockGroup); log.info("将机场添加到默认分组: dockId={}, dockName={}, groupId={}", dockId, dockName, defaultGroupId); } /** * 同步拓恒设备(机场和无人机不区分类型,同时落三个表) * * @param deviceInfo 设备信息 * @param gatewayId 网关ID */ private void syncTuohengDevice(DeviceInfo deviceInfo, String gatewayId) { String deviceId = deviceInfo.getId(); String deviceName = deviceInfo.getName(); // 这是SN号,如 THJSQ03B2410TGJ3MCJY log.info("开始同步拓恒设备: deviceName(SN)={}", deviceName); try { // 获取拓恒设备属性 AttributeMap attributes = iThingsBoardDomain.getPredefinedTuohengDeviceAttributes(deviceId); // 获取 airportID(机场和无人机的名称) Optional airportIdOpt = attributes.get(TuohengDeviceAttributes.AIRPORT_ID); String airportId = airportIdOpt.orElse(deviceName); // 如果没有 airportID,使用设备名称 // 获取固件版本 Optional firmwareVersionOpt = attributes.get(TuohengDeviceAttributes.HARDWARE_VERSION); String firmwareVersion = firmwareVersionOpt.orElse(""); log.info("拓恒设备属性: SN={}, airportID={}, firmwareVersion={}", deviceName, airportId, firmwareVersion); // 1. 同步到 device 表(作为机场)- 使用原始 SN Long dockDeviceId = syncTuohengDeviceAsType(deviceInfo, DeviceType.DOCK, gatewayId, deviceName); // 2. 同步到 device_dock 表 syncTuohengDock(dockDeviceId, deviceName, airportId, firmwareVersion); // 3. 同步到 device 表(作为无人机)- 使用带后缀的 SN String aircraftSn = deviceName + "-AIRCRAFT"; Long aircraftDeviceId = syncTuohengDeviceAsType(deviceInfo, DeviceType.AIRCRAFT, gatewayId, aircraftSn); // 4. 同步到 device_aircraft 表 syncTuohengAircraft(aircraftDeviceId, deviceName, airportId, firmwareVersion); // 5. 同步到 device_dock_aircraft 表(建立机场-无人机关联) syncDockAircraft(dockDeviceId, aircraftDeviceId); log.info("拓恒设备同步完成: SN={}, dockDeviceId={}, aircraftDeviceId={}", deviceName, dockDeviceId, aircraftDeviceId); } catch (Exception e) { log.error("同步拓恒设备失败: deviceName={}, error={}", deviceName, e.getMessage(), e); throw e; } } /** * 同步拓恒设备到 device 表(指定设备类型和 SN) * * @param deviceInfo ThingsBoard设备信息 * @param deviceType 设备类型 * @param gatewayId 网关设备ID * @param deviceSn 设备SN号(可自定义) * @return 设备主键ID */ private Long syncTuohengDeviceAsType(DeviceInfo deviceInfo, DeviceType deviceType, String gatewayId, String deviceSn) { String iotDeviceId = deviceInfo.getId(); String deviceName = deviceInfo.getName(); log.info("开始同步拓恒设备到device表: iotDeviceId={}, deviceName={}, deviceType={}, deviceSn={}, gatewayId={}", iotDeviceId, deviceName, deviceType, deviceSn, gatewayId); // 查询设备是否已存在(通过 iotDeviceId + deviceType 组合查询) Device queryDevice = new Device(); queryDevice.setIotDeviceId(iotDeviceId); queryDevice.setDeviceType(deviceType.getCode()); List existingDevices = deviceDomain.selectDeviceList(queryDevice); Device existingDevice = (existingDevices != null && !existingDevices.isEmpty()) ? existingDevices.get(0) : null; // 如果通过 iotDeviceId + deviceType 未找到,再检查 device_sn 是否已存在 if (existingDevice == null) { Device existingDeviceBySn = deviceDomain.selectDeviceByDeviceSn(deviceSn); if (existingDeviceBySn != null) { log.warn("拓恒设备SN已存在但iotDeviceId或deviceType不同: deviceSn={}, existingIotDeviceId={}, existingDeviceType={}, newIotDeviceId={}, newDeviceType={}", deviceSn, existingDeviceBySn.getIotDeviceId(), existingDeviceBySn.getDeviceType(), iotDeviceId, deviceType.getCode()); // 返回已存在的设备ID,避免重复插入 return existingDeviceBySn.getDeviceId(); } } if (existingDevice == null) { // 设备不存在,使用分布式锁防止并发插入 String lockKey = "device:sync:lock:" + deviceSn; String lockValue = UUID.randomUUID().toString(); // 尝试获取锁 boolean locked = tryLock(lockKey, lockValue, 10); if (!locked) { log.warn("获取拓恒设备同步锁失败,可能有其他线程正在同步该设备: deviceSn={}", deviceSn); // 等待后重新查询 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } Device retryDevice = deviceDomain.selectDeviceByDeviceSn(deviceSn); if (retryDevice != null) { log.info("重新查询到拓恒设备: deviceSn={}, deviceId={}", deviceSn, retryDevice.getDeviceId()); return retryDevice.getDeviceId(); } log.error("获取锁失败且重新查询也未找到拓恒设备: deviceSn={}", deviceSn); return null; } try { // 双重检查 Device doubleCheckDevice = deviceDomain.selectDeviceByDeviceSn(deviceSn); if (doubleCheckDevice != null) { log.info("双重检查发现拓恒设备已存在: deviceSn={}, deviceId={}", deviceSn, doubleCheckDevice.getDeviceId()); return doubleCheckDevice.getDeviceId(); } // 插入新设备 Device newDevice = new Device(); newDevice.setDeviceName(deviceName); newDevice.setIotDeviceId(iotDeviceId); newDevice.setDeviceType(deviceType.getCode()); newDevice.setDeviceSn(deviceSn); newDevice.setGateway(gatewayId); newDevice.setDeviceManufacturer("tuoheng"); newDevice.setCreateBy("system"); log.info("准备插入新拓恒设备: deviceName={}, deviceType={}, deviceSn={}, manufacturer=tuoheng", deviceName, deviceType, deviceSn); deviceDomain.insertDevice(newDevice); Long deviceId = newDevice.getDeviceId(); log.info("插入新拓恒设备成功: iotDeviceId={}, deviceName={}, deviceType={}, deviceSn={}, 返回deviceId={}", iotDeviceId, deviceName, deviceType, deviceSn, deviceId); return deviceId; } finally { // 释放锁 releaseLock(lockKey, lockValue); } } else { // 设备已存在,检查是否需要更新 boolean needUpdate = false; if (!Objects.equals(existingDevice.getDeviceName(), deviceName)) { existingDevice.setDeviceName(deviceName); needUpdate = true; } if (!Objects.equals(existingDevice.getDeviceSn(), deviceSn)) { existingDevice.setDeviceSn(deviceSn); needUpdate = true; } if (!Objects.equals(existingDevice.getGateway(), gatewayId)) { existingDevice.setGateway(gatewayId); needUpdate = true; } if (needUpdate) { existingDevice.setUpdateBy("system"); deviceDomain.updateDevice(existingDevice); log.info("更新拓恒设备: iotDeviceId={}, deviceName={}, deviceType={}, deviceSn={}", iotDeviceId, deviceName, deviceType, deviceSn); } return existingDevice.getDeviceId(); } } /** * 同步拓恒机场数据 * * @param deviceId 设备主键ID * @param sn 设备SN号(设备名称) * @param airportId 机场ID(airportID属性) * @param firmwareVersion 固件版本 */ private void syncTuohengDock(Long deviceId, String sn, String airportId, String firmwareVersion) { log.info("开始同步拓恒机场: deviceId={}, SN={}, airportID={}", deviceId, sn, airportId); // 查询机场是否已存在 Dock existingDock = dockDomain.selectDockByDeviceId(deviceId); Long dockId; if (existingDock == null) { // 机场不存在,插入新机场 Dock newDock = new Dock(); newDock.setDockName(airportId); // 使用 airportID 作为机场名称 newDock.setDeviceId(deviceId); newDock.setCreateBy("system"); log.info("准备插入新拓恒机场: deviceId={}, dockName={}, SN={}", deviceId, airportId, sn); dockDomain.insertDock(newDock); dockId = newDock.getDockId(); log.info("插入新拓恒机场成功: dockId={}", dockId); } else { dockId = existingDock.getDockId(); log.info("拓恒机场已存在: deviceId={}, dockId={}", deviceId, dockId); } // 确保默认分组存在 ensureDefaultGroupExists(); // 检查机场是否在任何分组中(无论新旧机场都检查) List dockGroups = dockGroupDomain.selectDockGroupByDockId(dockId); if (dockGroups == null || dockGroups.isEmpty()) { // 机场不在任何分组中,将其插入默认分组 insertDockToDefaultGroup(dockId, airportId); } else { log.info("拓恒机场已在分组中: dockId={}, 分组数量={}", dockId, dockGroups.size()); } } /** * 同步拓恒无人机数据 * * @param deviceId 设备主键ID * @param sn 设备SN号(设备名称) * @param airportId 机场ID(airportID属性) * @param firmwareVersion 固件版本 * @return 无人机主键ID */ private Long syncTuohengAircraft(Long deviceId, String sn, String airportId, String firmwareVersion) { log.info("开始同步拓恒无人机: deviceId={}, SN={}, airportID={}", deviceId, sn, airportId); if (deviceId == null) { log.error("同步拓恒无人机失败: deviceId 为 null, SN={}", sn); return null; } // 查询无人机是否已存在 Aircraft existingAircraft = aircraftDomain.selectAircraftByDeviceId(deviceId); if (existingAircraft == null) { // 无人机不存在,插入新无人机 Aircraft newAircraft = new Aircraft(); newAircraft.setAircraftName(airportId); // 使用 airportID 作为无人机名称 newAircraft.setDeviceId(deviceId); newAircraft.setCreateBy("system"); log.info("准备插入新拓恒无人机: deviceId={}, aircraftName={}, SN={}", deviceId, airportId, sn); aircraftDomain.insertAircraft(newAircraft); Long aircraftId = newAircraft.getAircraftId(); log.info("插入新拓恒无人机成功: deviceId={}, aircraftId={}", deviceId, aircraftId); return aircraftId; } else { log.info("拓恒无人机已存在: deviceId={}, aircraftId={}", deviceId, existingAircraft.getAircraftId()); return existingAircraft.getAircraftId(); } } }