a-tuoheng-device/src/main/java/com/ruoyi/device/service/impl/SynService.java

1128 lines
50 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.ruoyi.device.service.impl;
import com.ruoyi.device.domain.api.*;
import com.ruoyi.device.domain.model.*;
import com.ruoyi.device.domain.model.thingsboard.AttributeMap;
import com.ruoyi.device.domain.model.thingsboard.DeviceInfo;
import com.ruoyi.device.domain.model.thingsboard.TelemetryMap;
import com.ruoyi.device.domain.model.thingsboard.TelemetryValue;
import com.ruoyi.device.domain.model.thingsboard.attributes.psdk.PsdkDevice;
import com.ruoyi.device.domain.model.thingsboard.constants.DeviceAttributes;
import com.ruoyi.device.domain.model.thingsboard.constants.DeviceTelemetry;
import com.ruoyi.device.domain.model.thingsboard.tuoheng.constants.TuohengDeviceAttributes;
import com.ruoyi.device.api.enums.PayloadTypeEnum;
import com.ruoyi.device.service.enums.DeviceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@Service
public class SynService {
private static final Logger log = LoggerFactory.getLogger(SynService.class);
private final IThingsBoardDomain iThingsBoardDomain;
/**
* 设备名称过滤正则表达式列表(从配置文件读取)
* 匹配这些正则表达式的设备将被跳过,不进行同步
*/
@Value("${device.sync.exclude-patterns:}")
private String excludePatterns;
/**
* 编译后的正则表达式模式列表(延迟初始化)
*/
private List<Pattern> excludePatternList;
@Autowired
private IDeviceDomain deviceDomain;
@Autowired
private IDockDomain dockDomain;
@Autowired
private IAircraftDomain aircraftDomain;
@Autowired
private IDockAircraftDomain dockAircraftDomain;
@Autowired
private IPayloadDomain payloadDomain;
@Autowired
private IAircraftPayloadDomain aircraftPayloadDomain;
@Autowired
private IGroupDomain groupDomain;
@Autowired
private IDockGroupDomain dockGroupDomain;
@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public SynService(IThingsBoardDomain iThingsBoardDomain) {
this.iThingsBoardDomain = iThingsBoardDomain;
}
/**
* 初始化设备名称过滤正则表达式列表
* 延迟初始化,在第一次使用时编译正则表达式
* 改动点: 首先不要用这个过滤了TH开头的就是拓恒无人机
*/
private void initExcludePatterns() {
if (excludePatternList == null) {
excludePatternList = new ArrayList<>();
if (StringUtils.hasText(excludePatterns)) {
String[] patterns = excludePatterns.split(",");
for (String pattern : patterns) {
String trimmedPattern = pattern.trim();
if (StringUtils.hasText(trimmedPattern)) {
try {
excludePatternList.add(Pattern.compile(trimmedPattern));
// log.info("加载设备名称过滤规则: {}", trimmedPattern);
} catch (Exception e) {
log.error("无效的正则表达式: {}, error={}", trimmedPattern, e.getMessage());
}
}
}
}
if (excludePatternList.isEmpty()) {
log.error("未配置设备名称过滤规则");
}
}
}
/**
* 判断设备名称是否应该被过滤(跳过同步)
*
* @param deviceName 设备名称
* @return true 表示应该被过滤false 表示不过滤
*/
private boolean shouldExcludeDevice(String deviceName) {
// 延迟初始化
initExcludePatterns();
// 如果没有配置过滤规则,不过滤任何设备
if (excludePatternList.isEmpty()) {
return false;
}
// 检查设备名称是否匹配任何过滤规则
for (Pattern pattern : excludePatternList) {
if (pattern.matcher(deviceName).matches()) {
log.debug("设备 {} 匹配过滤规则 {},跳过同步", deviceName, pattern.pattern());
return true;
}
}
return false;
}
/**
* 获取分布式锁
*
* @param lockKey 锁的key
* @param lockValue 锁的value用于释放锁时验证
* @param expireTime 锁的过期时间(秒)
* @return true表示获取锁成功false表示获取锁失败
*/
private boolean tryLock(String lockKey, String lockValue, long expireTime) {
if (redisTemplate == null) {
log.warn("Redis未配置跳过分布式锁使用本地同步");
return true;
}
try {
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
} catch (Exception e) {
log.error("获取分布式锁失败: lockKey={}, error={}", lockKey, e.getMessage(), e);
return false;
}
}
/**
* 释放分布式锁
*
* @param lockKey 锁的key
* @param lockValue 锁的value用于验证是否是自己持有的锁
*/
private void releaseLock(String lockKey, String lockValue) {
if (redisTemplate == null) {
return;
}
try {
String currentValue = redisTemplate.opsForValue().get(lockKey);
if (lockValue.equals(currentValue)) {
redisTemplate.delete(lockKey);
}
} catch (Exception e) {
log.error("释放分布式锁失败: lockKey={}, error={}", lockKey, e.getMessage(), e);
}
}
/**
* 定时任务:同步基础表数据
* 执行时间启动后1分钟开始每10分钟执行一次可通过配置文件修改
* 配置项device.schedule.print-devices.initial-delay 初始延迟时间(毫秒)
* device.schedule.print-devices.fixed-delay 执行间隔时间(毫秒)
*
* 优化策略:
* 1. 先识别所有网关设备
* 2. 遍历每个网关,同步网关及其子设备
* 3. 同步非网关的独立设备(机场和无人机)
*/
@Scheduled(initialDelayString = "${device.schedule.update-devices.initial-delay:60000}",
fixedDelayString = "${device.schedule.update-devices.fixed-delay:120000}")
public void updateDevicesScheduled() {
try {
log.info("========== 开始执行定时任务:同步基础表数据 ==========");
int totalCount = 0;
int skippedCount = 0;
// 第一步:直接获取所有网关设备(服务端过滤)
Iterable<List<DeviceInfo>> gatewayDevices = iThingsBoardDomain.getAllGatewayDevices();
// 第二步:遍历每个网关,同步网关及其子设备
for (List<DeviceInfo> gatewayBatch : gatewayDevices) {
for (DeviceInfo gatewayInfo : gatewayBatch) {
try {
// 按照名字过滤网关设备(已禁用,不再过滤 TH 开头的设备)
// if (shouldExcludeDevice(gatewayInfo.getName())) {
// skippedCount++;
// continue;
// }
log.info("开始同步网关: {}", gatewayInfo.getName());
syncDevice(gatewayInfo, DeviceType.GATEWAY, null);
totalCount++;
// 获取该网关的所有子设备ID
List<String> childDeviceIds = iThingsBoardDomain.getGatewayChildDevices(gatewayInfo.getId());
log.info("网关 {} 有 {} 个子设备", gatewayInfo.getName(), childDeviceIds.size());
// 遍历该网关的子设备
for (String childDeviceId : childDeviceIds) {
// 通过API获取子设备信息
DeviceInfo childDeviceInfo = iThingsBoardDomain.getDeviceInfo(childDeviceId);
if (childDeviceInfo == null) {
// log.warn("子设备 {} 不存在,跳过", childDeviceId);
skippedCount++;
continue;
}
// 按照名字过滤子设备(已禁用,不再过滤 TH 开头的设备)
// if (shouldExcludeDevice(childDeviceInfo.getName())) {
// skippedCount++;
// continue;
// }
try {
String deviceName = childDeviceInfo.getName();
// 判断是大疆还是拓恒设备(通过设备名称是否以 TH 开头)
if (deviceName.startsWith("TH")) {
// 拓恒设备处理逻辑
// log.info("检测到拓恒设备: {}", deviceName);
syncTuohengDevice(childDeviceInfo, gatewayInfo.getId());
} else {
// 大疆设备处理逻辑(原有逻辑)
// log.info("检测到大疆设备: {}", deviceName);
AttributeMap attributes = iThingsBoardDomain.getPredefinedDeviceAttributes(childDeviceId);
// log.info("大疆设备 {} 的属性: {}", deviceName, attributes);
DeviceType deviceType = determineDeviceType(childDeviceInfo, attributes);
// log.info("大疆设备 {} 的类型: {}", deviceName, deviceType);
Long deviceId = syncDevice(childDeviceInfo, deviceType, gatewayInfo.getId());
// log.info("大疆设备 {} 同步到device表deviceId={}", deviceName, deviceId);
syncDeviceByType(deviceId, childDeviceInfo.getName(), deviceType, attributes);
// log.info("大疆设备 {} 同步完成", deviceName);
}
totalCount++;
} catch (Exception e) {
log.error("同步网关子设备失败: gatewayName={}, childDeviceId={}, error={}",
gatewayInfo.getName(), childDeviceId, e.getMessage(), e);
}
}
} catch (Exception e) {
log.error("同步网关失败: gatewayName={}, error={}",
gatewayInfo.getName(), e.getMessage(), e);
}
}
}
log.info("========== 数据同步任务完成,共同步 {} 个设备,跳过 {} 个设备 ==========", totalCount, skippedCount);
// 处理没有分组的机场,将其添加到默认分组
assignDocksToDefaultGroup();
} catch (Exception e) {
log.error("数据同步任务执行失败: {}", e.getMessage(), e);
}
}
/**
* 将没有分组的机场添加到默认分组
* 默认分组名称为 "默认分组"
*/
private void assignDocksToDefaultGroup() {
try {
// log.info("========== 开始检查并分配未分组的机场到默认分组 ==========");
// 1. 查询或创建默认分组
String defaultGroupName = "默认分组";
Group defaultGroup = groupDomain.selectGroupByGroupName(defaultGroupName);
if (defaultGroup == null) {
// 默认分组不存在,创建它
// log.info("默认分组不存在,开始创建: {}", defaultGroupName);
defaultGroup = new Group();
defaultGroup.setGroupName(defaultGroupName);
defaultGroup.setCreateBy("system");
groupDomain.insertGroup(defaultGroup);
// log.info("默认分组创建成功: groupId={}, groupName={}", defaultGroup.getGroupId(), defaultGroupName);
} else {
// log.info("默认分组已存在: groupId={}, groupName={}", defaultGroup.getGroupId(), defaultGroupName);
}
Long defaultGroupId = defaultGroup.getGroupId();
// 2. 查询所有机场
Dock queryDock = new Dock();
List<Dock> allDocks = dockDomain.selectDockList(queryDock);
// log.info("查询到机场总数: {}", allDocks != null ? allDocks.size() : 0);
if (allDocks == null || allDocks.isEmpty()) {
// log.info("没有机场需要处理");
return;
}
int assignedCount = 0;
int alreadyInGroupCount = 0;
// 3. 遍历所有机场,检查是否在分组中
for (Dock dock : allDocks) {
try {
Long dockId = dock.getDockId();
// 查询该机场是否已经在任何分组中
List<DockGroup> existingGroups = dockGroupDomain.selectDockGroupByDockId(dockId);
if (existingGroups == null || existingGroups.isEmpty()) {
// 机场没有分组,添加到默认分组
DockGroup dockGroup = new DockGroup();
dockGroup.setDockId(dockId);
dockGroup.setGroupId(defaultGroupId);
dockGroup.setCreateBy("system");
dockGroupDomain.insertDockGroup(dockGroup);
assignedCount++;
// log.info("机场 [ID:{}, Name:{}] 已添加到默认分组", dockId, dock.getDockName());
} else {
alreadyInGroupCount++;
log.debug("机场 [ID:{}, Name:{}] 已在分组中,跳过", dockId, dock.getDockName());
}
} catch (Exception e) {
log.error("处理机场分组失败: dockId={}, error={}", dock.getDockId(), e.getMessage(), e);
}
}
// log.info("========== 机场分组检查完成:新分配 {} 个机场到默认分组,{} 个机场已有分组 ==========",
// assignedCount, alreadyInGroupCount);
} catch (Exception e) {
log.error("分配机场到默认分组失败: {}", e.getMessage(), e);
}
}
/**
* 根据设备类型进行不同的同步处理
*
* @param deviceId 设备主键ID
* @param deviceName 设备名称
* @param deviceType 设备类型
* @param attributes 设备属性
*/
private void syncDeviceByType(Long deviceId, String deviceName, DeviceType deviceType, AttributeMap attributes) {
if (deviceType == DeviceType.DOCK) {
// 机场:同步机场表
syncDock(deviceId, deviceName);
// 获取机场挂载的无人机SN号
Optional<String> subDeviceSnOpt = attributes.get(DeviceAttributes.SUB_DEVICE_SN);
// log.info("机场 {} 的 sub_device.device_sn 属性: {}", deviceName,
// subDeviceSnOpt.isPresent() ? subDeviceSnOpt.get() : "不存在或为空");
if (subDeviceSnOpt.isPresent() && StringUtils.hasText(subDeviceSnOpt.get())) {
String aircraftSn = subDeviceSnOpt.get();
// log.info("机场 {} 尝试查找无人机: aircraftSn={}", deviceName, aircraftSn);
Device aircraftDevice = findDeviceBySn(aircraftSn);
if (aircraftDevice != null) {
// log.info("找到无人机设备: aircraftSn={}, deviceId={}", aircraftSn, aircraftDevice.getDeviceId());
syncDockAircraft(deviceId, aircraftDevice.getDeviceId());
} else {
log.warn("未找到无人机设备: aircraftSn={}, 可能无人机尚未同步", aircraftSn);
}
} else {
log.warn("机场 {} 没有 sub_device.device_sn 属性,无法建立机场-无人机关联", deviceName);
}
} else if (deviceType == DeviceType.AIRCRAFT) {
// 无人机:先检查是否有 dock_sn 属性
Optional<String> dockSnOpt = attributes.get(DeviceAttributes.DOCK_SN);
if (dockSnOpt.isPresent() && StringUtils.hasText(dockSnOpt.get())) {
String dockSn = dockSnOpt.get();
// log.info("无人机 {} 尝试查找所属机场: dockSn={}", deviceName, dockSn);
// 通过机场SN号查找机场设备
Device dockDevice = findDeviceBySn(dockSn);
if (dockDevice != null) {
// log.info("找到机场设备: dockSn={}, deviceId={}", dockSn, dockDevice.getDeviceId());
// 只有在找到机场的情况下,才同步无人机表
syncAircraft(deviceId, deviceName);
// 建立机场-无人机关联
syncDockAircraft(dockDevice.getDeviceId(), deviceId);
} else {
log.warn("未找到机场设备: dockSn={}, 跳过无人机同步,避免产生脏数据", dockSn);
}
} else {
log.warn("无人机 {} 没有 dock_sn 属性,跳过同步,避免产生脏数据", deviceName);
}
}
// 网关类型不需要额外处理
}
/**
* 判断设备类型
* 优化后的判断逻辑:
* 1. 优先使用 ThingsBoard 标准的 additionalInfo.gateway 字段判断网关
* 2. 对于非网关设备,通过 dock_sn 属性区分机场和无人机
*
* @param deviceInfo ThingsBoard设备信息
* @param attributes 设备属性
* @return 设备类型
*/
private DeviceType determineDeviceType(DeviceInfo deviceInfo, AttributeMap attributes) {
String deviceName = deviceInfo.getName();
// 1. 使用 ThingsBoard 标准方式判断网关:检查 additionalInfo 中的 gateway 字段
if (deviceInfo.isGateway()) {
return DeviceType.GATEWAY;
}
// 2. 非网关设备:通过 dock_sn 属性区分机场和无人机
Optional<String> dockSnOpt = attributes.get(DeviceAttributes.DOCK_SN);
if (dockSnOpt.isPresent() && StringUtils.hasText(dockSnOpt.get())) {
String dockSn = dockSnOpt.get();
// dock_sn 等于设备名称 -> 机场
// dock_sn 不等于设备名称 -> 无人机(挂载在该机场下)
return deviceName.equals(dockSn) ? DeviceType.DOCK : DeviceType.AIRCRAFT;
}
// dock_sn 属性不存在或为空,无法判断设备类型
throw new IllegalStateException("无法确定设备类型:设备 " + deviceName + " 缺少 dock_sn 属性");
}
/**
* 同步设备数据
*
* @param deviceInfo ThingsBoard设备信息
* @param deviceType 设备类型
* @param gatewayId 网关设备ID从映射中获取避免重复API调用
* @return 设备主键ID
*/
private Long syncDevice(DeviceInfo deviceInfo, DeviceType deviceType, String gatewayId) {
String iotDeviceId = deviceInfo.getId();
String deviceName = deviceInfo.getName();
String deviceSn = deviceName; // 使用设备名称作为SN号,网关其实是没有SN号的
//
log.info("开始同步设备: iotDeviceId={}, deviceName={}, deviceType={}, gatewayId={}",
iotDeviceId, deviceName, deviceType, gatewayId);
// 查询设备是否已存在(通过 iotDeviceId
Device existingDevice = deviceDomain.selectDeviceByIotDeviceId(iotDeviceId);
// 如果通过 iotDeviceId 未找到,再检查 device_sn 是否已存在
if (existingDevice == null) {
Device existingDeviceBySn = deviceDomain.selectDeviceByDeviceSn(deviceSn);
if (existingDeviceBySn != null) {
log.warn("设备SN已存在但iotDeviceId不同: deviceSn={}, existingIotDeviceId={}, newIotDeviceId={}",
deviceSn, existingDeviceBySn.getIotDeviceId(), iotDeviceId);
// 返回已存在的设备ID避免重复插入
return existingDeviceBySn.getDeviceId();
}
}
if (existingDevice == null) {
// 设备不存在,使用分布式锁防止并发插入
String lockKey = "device:sync:lock:" + deviceSn;
String lockValue = UUID.randomUUID().toString();
// 尝试获取锁最多等待3秒
boolean locked = tryLock(lockKey, lockValue, 10);
if (!locked) {
// log.warn("获取设备同步锁失败,可能有其他线程正在同步该设备: deviceSn={}", deviceSn);
// 等待一小段时间后重新查询,可能已经被其他线程插入
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Device retryDevice = deviceDomain.selectDeviceByDeviceSn(deviceSn);
if (retryDevice != null) {
// log.info("重新查询到设备: deviceSn={}, deviceId={}", deviceSn, retryDevice.getDeviceId());
return retryDevice.getDeviceId();
}
log.error("获取锁失败且重新查询也未找到设备: deviceSn={}", deviceSn);
return null;
}
try {
// 双重检查:获取锁后再次查询,防止重复插入
Device doubleCheckDevice = deviceDomain.selectDeviceByDeviceSn(deviceSn);
if (doubleCheckDevice != null) {
// log.info("双重检查发现设备已存在: deviceSn={}, deviceId={}", deviceSn, doubleCheckDevice.getDeviceId());
return doubleCheckDevice.getDeviceId();
}
// 插入新设备
Device newDevice = new Device();
newDevice.setDeviceName(deviceName);
newDevice.setIotDeviceId(iotDeviceId);
newDevice.setDeviceType(deviceType.getCode());
newDevice.setDeviceSn(deviceSn);
newDevice.setGateway(gatewayId);
newDevice.setCreateBy("system");
// 根据设备名称判断厂商
String manufacturer = deviceName.startsWith("TH") ? "tuoheng" : "dajiang";
newDevice.setDeviceManufacturer(manufacturer);
// log.info("准备插入新设备: deviceName={}, deviceType={}, deviceSn={}, manufacturer={}",
// deviceName, deviceType, deviceSn, manufacturer);
deviceDomain.insertDevice(newDevice);
Long deviceId = newDevice.getDeviceId();
// log.info("插入新设备成功: iotDeviceId={}, deviceName={}, deviceType={}, 返回deviceId={}",
// iotDeviceId, deviceName, deviceType, deviceId);
return deviceId;
} finally {
// 释放锁
releaseLock(lockKey, lockValue);
}
} else {
// 设备已存在,检查是否需要更新
boolean needUpdate = false;
if (!Objects.equals(existingDevice.getDeviceName(), deviceName)) {
existingDevice.setDeviceName(deviceName);
needUpdate = true;
}
if (!Objects.equals(existingDevice.getDeviceType(), deviceType.getCode())) {
existingDevice.setDeviceType(deviceType.getCode());
needUpdate = true;
}
if (!Objects.equals(existingDevice.getDeviceSn(), deviceSn)) {
existingDevice.setDeviceSn(deviceSn);
needUpdate = true;
}
if (!Objects.equals(existingDevice.getGateway(), gatewayId)) {
existingDevice.setGateway(gatewayId);
needUpdate = true;
}
// 检查并更新厂商字段
String manufacturer = deviceName.startsWith("TH") ? "tuoheng" : "dajiang";
if (!Objects.equals(existingDevice.getDeviceManufacturer(), manufacturer)) {
existingDevice.setDeviceManufacturer(manufacturer);
needUpdate = true;
}
if (needUpdate) {
existingDevice.setUpdateBy("system");
deviceDomain.updateDevice(existingDevice);
// log.info("更新设备: iotDeviceId={}, deviceName={}, deviceType={}, manufacturer={}",
// iotDeviceId, deviceName, deviceType, manufacturer);
}
return existingDevice.getDeviceId();
}
}
/**
* 同步机场数据
* 逻辑:
* 1. 如果是新插入的机场需要将其插入到默认分组ID=0
* 2. 如果默认分组不存在,需要先创建默认分组
* 3. 如果是已存在的机场,检查机场是否在任何分组中,如果不在,将其插入默认分组
*
* @param deviceId 设备主键ID
* @param deviceName 设备名称
*/
private void syncDock(Long deviceId, String deviceName) {
// log.info("开始同步机场: deviceId={}, deviceName={}", deviceId, deviceName);
// 查询机场是否已存在
Dock existingDock = dockDomain.selectDockByDeviceId(deviceId);
boolean isNewDock = (existingDock == null);
Long dockId;
if (isNewDock) {
// 机场不存在,插入新机场
Dock newDock = new Dock();
newDock.setDockName(deviceName);
newDock.setDeviceId(deviceId);
newDock.setCreateBy("system");
// log.info("准备插入新机场: deviceId={}, dockName={}", deviceId, deviceName);
dockDomain.insertDock(newDock);
dockId = newDock.getDockId();
// log.info("插入新机场成功: deviceId={}, dockName={}, dockId={}",
// deviceId, deviceName, dockId);
} else {
dockId = existingDock.getDockId();
// log.info("机场已存在: deviceId={}, dockName={}, dockId={}", deviceId, deviceName, dockId);
}
// 确保默认分组存在
ensureDefaultGroupExists();
// 检查机场是否在任何分组中
List<DockGroup> dockGroups = dockGroupDomain.selectDockGroupByDockId(dockId);
if (dockGroups == null || dockGroups.isEmpty()) {
// 机场不在任何分组中,将其插入默认分组
insertDockToDefaultGroup(dockId, deviceName);
} else {
// log.info("机场已在分组中: dockId={}, 分组数量={}", dockId, dockGroups.size());
}
}
/**
* 同步无人机数据
*
* @param deviceId 设备主键ID
* @param deviceName 设备名称
* @return 无人机主键ID
*/
private Long syncAircraft(Long deviceId, String deviceName) {
// log.info("开始同步无人机: deviceId={}, deviceName={}", deviceId, deviceName);
// 查询无人机是否已存在
Aircraft existingAircraft = aircraftDomain.selectAircraftByDeviceId(deviceId);
if (existingAircraft == null) {
// 无人机不存在,插入新无人机
Aircraft newAircraft = new Aircraft();
newAircraft.setAircraftName(deviceName);
newAircraft.setDeviceId(deviceId);
newAircraft.setCreateBy("system");
// log.info("准备插入新无人机: deviceId={}, aircraftName={}", deviceId, deviceName);
aircraftDomain.insertAircraft(newAircraft);
Long aircraftId = newAircraft.getAircraftId();
// log.info("插入新无人机成功: deviceId={}, aircraftName={}, aircraftId={}",
// deviceId, deviceName, aircraftId);
// 新插入的无人机,直接返回 aircraftId不需要处理 PSDK 设备
return aircraftId;
}
// 无人机已存在,记录日志
// log.info("无人机已存在: deviceId={}, aircraftName={}, aircraftId={}",
// deviceId, deviceName, existingAircraft.getAircraftId());
Device device = deviceDomain.selectDeviceByDeviceId(deviceId);
TelemetryMap telemetryMap = iThingsBoardDomain.getPredefinedDeviceTelemetry(device.getIotDeviceId());
Optional<TelemetryValue<List<PsdkDevice>>>
psdkDevicesOption = telemetryMap.get(DeviceTelemetry.PSDK_WIDGET_VALUES);
if(psdkDevicesOption.isPresent()) {
List<PsdkDevice> psdkDevices = psdkDevicesOption.get().getValue();
if(!CollectionUtils.isEmpty(psdkDevices)) {
for (PsdkDevice psdkDevice : psdkDevices) {
if(Objects.nonNull(psdkDevice.getSpeaker()) &&
Objects.nonNull(psdkDevice.getPsdk_sn()) && !psdkDevice.getPsdk_sn().isEmpty()) {
// log.info("开始同步PSDK喊话器设备: psdkSn={}, psdkName={}",
// psdkDevice.getPsdk_sn(), psdkDevice.getPsdk_name());
// 第一步:同步挂载设备(喊话器)
Payload payload = new Payload();
payload.setPayloadSn(psdkDevice.getPsdk_sn());
List<Payload> payloads = payloadDomain.selectPayloadList(payload);
if(CollectionUtils.isEmpty(payloads)) {
// 挂载不存在,插入新挂载
payload.setPayloadName(psdkDevice.getPsdk_name());
payload.setPayloadDisplayName("喊话器");
payload.setPayloadType(PayloadTypeEnum.SPEAKER.getCode());
payload.setCreateBy("system");
payloadDomain.insertPayload(payload);
// log.info("插入新挂载设备: payloadSn={}, payloadName={}, payloadId={}",
// psdkDevice.getPsdk_sn(), psdkDevice.getPsdk_name(), payload.getPayloadId());
} else {
// 挂载已存在,使用已有的挂载
payload = payloads.get(0);
// log.info("挂载设备已存在: payloadSn={}, payloadId={}",
// psdkDevice.getPsdk_sn(), payload.getPayloadId());
}
// 第二步:同步无人机挂载关联表
Long payloadId = payload.getPayloadId();
Long aircraftId = existingAircraft.getAircraftId();
// 获取无人机关联的机场ID
Long dockId = getDockIdByAircraftId(aircraftId);
List<AircraftPayload> aircraftPayloads =
aircraftPayloadDomain.selectAircraftPayloadByPayloadId(payloadId);
if(!CollectionUtils.isEmpty(aircraftPayloads)) {
// 关联已存在,检查是否需要更新
AircraftPayload existingRelation = aircraftPayloads.get(0);
boolean needUpdate = false;
if(!Objects.equals(existingRelation.getAircraftId(), aircraftId)) {
existingRelation.setAircraftId(aircraftId);
needUpdate = true;
}
if(!Objects.equals(existingRelation.getDockId(), dockId)) {
existingRelation.setDockId(dockId);
needUpdate = true;
}
if(needUpdate) {
existingRelation.setUpdateBy("system");
aircraftPayloadDomain.updateAircraftPayload(existingRelation);
// log.info("更新无人机挂载关联: aircraftId={}, payloadId={}, dockId={}",
// aircraftId, payloadId, dockId);
} else {
// log.info("无人机挂载关联无需更新: aircraftId={}, payloadId={}",
// aircraftId, payloadId);
}
} else {
// 关联不存在,插入新关联
AircraftPayload newRelation = new AircraftPayload();
newRelation.setAircraftId(aircraftId);
newRelation.setPayloadId(payloadId);
newRelation.setDockId(dockId);
newRelation.setCreateBy("system");
aircraftPayloadDomain.insertAircraftPayload(newRelation);
// log.info("插入无人机挂载关联: aircraftId={}, payloadId={}, dockId={}",
// aircraftId, payloadId, dockId);
}
}
}
}
}
// 无人机已存在,无需更新
return existingAircraft.getAircraftId();
}
/**
* 根据设备SN号查找设备
*
* @param deviceSn 设备SN号
* @return 设备信息
*/
private Device findDeviceBySn(String deviceSn) {
Device queryDevice = new Device();
queryDevice.setDeviceSn(deviceSn);
List<Device> devices = deviceDomain.selectDeviceList(queryDevice);
return (devices != null && !devices.isEmpty()) ? devices.get(0) : null;
}
/**
* 根据无人机ID获取关联的机场ID
*
* @param aircraftId 无人机主键ID
* @return 机场主键ID如果没有关联则返回null
*/
private Long getDockIdByAircraftId(Long aircraftId) {
List<DockAircraft> dockAircrafts = dockAircraftDomain.selectDockAircraftByAircraftId(aircraftId);
if (dockAircrafts != null && !dockAircrafts.isEmpty()) {
Long dockId = dockAircrafts.get(0).getDockId();
log.debug("无人机 {} 关联的机场ID: {}", aircraftId, dockId);
return dockId;
}
log.debug("无人机 {} 没有关联的机场", aircraftId);
return null;
}
/**
* 同步机场无人机关联数据
* 按照一个机场只会挂载一台无人机的逻辑进行处理
*
* @param dockDeviceId 机场设备主键ID
* @param aircraftDeviceId 无人机设备主键ID
*/
private void syncDockAircraft(Long dockDeviceId, Long aircraftDeviceId) {
// log.info("开始同步机场无人机关联: dockDeviceId={}, aircraftDeviceId={}", dockDeviceId, aircraftDeviceId);
// 获取机场主键
Dock dock = dockDomain.selectDockByDeviceId(dockDeviceId);
if (dock == null) {
log.warn("机场不存在,无法同步机场无人机关联: dockDeviceId={}", dockDeviceId);
return;
}
// log.info("找到机场: dockId={}, dockName={}", dock.getDockId(), dock.getDockName());
// 获取无人机主键
Aircraft aircraft = aircraftDomain.selectAircraftByDeviceId(aircraftDeviceId);
if (aircraft == null) {
log.warn("无人机不存在,无法同步机场无人机关联: aircraftDeviceId={}", aircraftDeviceId);
return;
}
Long dockId = dock.getDockId();
Long aircraftId = aircraft.getAircraftId();
// 查询该机场是否已有关联
List<DockAircraft> existingRelations = dockAircraftDomain.selectDockAircraftByDockId(dockId);
if (existingRelations == null || existingRelations.isEmpty()) {
// 机场没有关联,插入新关联
DockAircraft newRelation = new DockAircraft();
newRelation.setDockId(dockId);
newRelation.setAircraftId(aircraftId);
newRelation.setCreateBy("system");
dockAircraftDomain.insertDockAircraft(newRelation);
// log.info("插入机场无人机关联: dockId={}, aircraftId={}", dockId, aircraftId);
} else {
// 机场已有关联,检查是否需要更新
DockAircraft existingRelation = existingRelations.get(0);
if (!Objects.equals(existingRelation.getAircraftId(), aircraftId)) {
// 无人机发生变化,更新关联
existingRelation.setAircraftId(aircraftId);
existingRelation.setUpdateBy("system");
dockAircraftDomain.updateDockAircraft(existingRelation);
// log.info("更新机场无人机关联: dockId=, aircraftId={}", dockId, aircraftId);
}
}
}
/**
* 确保默认分组存在
* 默认分组ID为0名称为"默认分组"
* 如果不存在则创建
*/
private void ensureDefaultGroupExists() {
Long defaultGroupId = 0L;
Group defaultGroup = groupDomain.selectGroupByGroupId(defaultGroupId);
if (defaultGroup == null) {
// 默认分组不存在,创建默认分组
Group newGroup = new Group();
newGroup.setGroupName("默认分组");
newGroup.setCreateBy("system");
newGroup.setRemark("系统自动创建的默认分组,用于存放未分组的机场");
groupDomain.insertGroup(newGroup);
// log.info("创建默认分组成功: groupId={}, groupName={}", defaultGroupId, "默认分组");
} else {
log.debug("默认分组已存在: groupId={}, groupName={}", defaultGroupId, defaultGroup.getGroupName());
}
}
/**
* 将机场插入到默认分组
*
* @param dockId 机场主键ID
* @param dockName 机场名称(用于日志)
*/
private void insertDockToDefaultGroup(Long dockId, String dockName) {
Long defaultGroupId = 0L;
DockGroup newDockGroup = new DockGroup();
newDockGroup.setDockId(dockId);
newDockGroup.setGroupId(defaultGroupId);
newDockGroup.setCreateBy("system");
newDockGroup.setRemark("系统自动添加到默认分组");
dockGroupDomain.insertDockGroup(newDockGroup);
// log.info("将机场添加到默认分组: dockId={}, dockName={}, groupId={}",
// dockId, dockName, defaultGroupId);
}
/**
* 同步拓恒设备(机场和无人机不区分类型,同时落三个表)
*
* @param deviceInfo 设备信息
* @param gatewayId 网关ID
*/
private void syncTuohengDevice(DeviceInfo deviceInfo, String gatewayId) {
String deviceId = deviceInfo.getId();
String deviceName = deviceInfo.getName(); // 这是SN号如 THJSQ03B2410TGJ3MCJY
// log.info("开始同步拓恒设备: deviceName(SN)={}", deviceName);
try {
// 获取拓恒设备属性
AttributeMap attributes = iThingsBoardDomain.getPredefinedTuohengDeviceAttributes(deviceId);
// 获取 airportID机场和无人机的名称
Optional<String> airportIdOpt = attributes.get(TuohengDeviceAttributes.AIRPORT_ID);
String airportId = airportIdOpt.orElse(deviceName); // 如果没有 airportID使用设备名称
// 获取固件版本
Optional<String> firmwareVersionOpt = attributes.get(TuohengDeviceAttributes.HARDWARE_VERSION);
String firmwareVersion = firmwareVersionOpt.orElse("");
// log.info("拓恒设备属性: SN={}, airportID={}, firmwareVersion={}",
// deviceName, airportId, firmwareVersion);
// 1. 同步到 device 表(作为机场)- 使用原始 SN
Long dockDeviceId = syncTuohengDeviceAsType(deviceInfo, DeviceType.DOCK, gatewayId, deviceName);
// 2. 同步到 device_dock 表
syncTuohengDock(dockDeviceId, deviceName, airportId, firmwareVersion);
// 3. 同步到 device 表(作为无人机)- 使用带后缀的 SN
String aircraftSn = deviceName + "-AIRCRAFT";
Long aircraftDeviceId = syncTuohengDeviceAsType(deviceInfo, DeviceType.AIRCRAFT, gatewayId, aircraftSn);
// 4. 同步到 device_aircraft 表
syncTuohengAircraft(aircraftDeviceId, deviceName, airportId, firmwareVersion);
// 5. 同步到 device_dock_aircraft 表(建立机场-无人机关联)
syncDockAircraft(dockDeviceId, aircraftDeviceId);
// log.info("拓恒设备同步完成: SN={}, dockDeviceId={}, aircraftDeviceId={}",
// deviceName, dockDeviceId, aircraftDeviceId);
} catch (Exception e) {
log.error("同步拓恒设备失败: deviceName={}, error={}", deviceName, e.getMessage(), e);
throw e;
}
}
/**
* 同步拓恒设备到 device 表(指定设备类型和 SN
*
* @param deviceInfo ThingsBoard设备信息
* @param deviceType 设备类型
* @param gatewayId 网关设备ID
* @param deviceSn 设备SN号可自定义
* @return 设备主键ID
*/
private Long syncTuohengDeviceAsType(DeviceInfo deviceInfo, DeviceType deviceType, String gatewayId, String deviceSn) {
String iotDeviceId = deviceInfo.getId();
String deviceName = deviceInfo.getName();
// log.info("开始同步拓恒设备到device表: iotDeviceId={}, deviceName={}, deviceType={}, deviceSn={}, gatewayId={}",
// iotDeviceId, deviceName, deviceType, deviceSn, gatewayId);
// 查询设备是否已存在(通过 iotDeviceId + deviceType 组合查询)
Device queryDevice = new Device();
queryDevice.setIotDeviceId(iotDeviceId);
queryDevice.setDeviceType(deviceType.getCode());
List<Device> existingDevices = deviceDomain.selectDeviceList(queryDevice);
Device existingDevice = (existingDevices != null && !existingDevices.isEmpty()) ? existingDevices.get(0) : null;
// 如果通过 iotDeviceId + deviceType 未找到,再检查 device_sn 是否已存在
if (existingDevice == null) {
Device existingDeviceBySn = deviceDomain.selectDeviceByDeviceSn(deviceSn);
if (existingDeviceBySn != null) {
log.warn("拓恒设备SN已存在但iotDeviceId或deviceType不同: deviceSn={}, existingIotDeviceId={}, existingDeviceType={}, newIotDeviceId={}, newDeviceType={}",
deviceSn, existingDeviceBySn.getIotDeviceId(), existingDeviceBySn.getDeviceType(), iotDeviceId, deviceType.getCode());
// 返回已存在的设备ID避免重复插入
return existingDeviceBySn.getDeviceId();
}
}
if (existingDevice == null) {
// 设备不存在,使用分布式锁防止并发插入
String lockKey = "device:sync:lock:" + deviceSn;
String lockValue = UUID.randomUUID().toString();
// 尝试获取锁
boolean locked = tryLock(lockKey, lockValue, 10);
if (!locked) {
log.warn("获取拓恒设备同步锁失败,可能有其他线程正在同步该设备: deviceSn={}", deviceSn);
// 等待后重新查询
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Device retryDevice = deviceDomain.selectDeviceByDeviceSn(deviceSn);
if (retryDevice != null) {
// log.info("重新查询到拓恒设备: deviceSn={}, deviceId={}", deviceSn, retryDevice.getDeviceId());
return retryDevice.getDeviceId();
}
log.error("获取锁失败且重新查询也未找到拓恒设备: deviceSn={}", deviceSn);
return null;
}
try {
// 双重检查
Device doubleCheckDevice = deviceDomain.selectDeviceByDeviceSn(deviceSn);
if (doubleCheckDevice != null) {
// log.info("双重检查发现拓恒设备已存在: deviceSn={}, deviceId={}", deviceSn, doubleCheckDevice.getDeviceId());
return doubleCheckDevice.getDeviceId();
}
// 插入新设备
Device newDevice = new Device();
newDevice.setDeviceName(deviceName);
newDevice.setIotDeviceId(iotDeviceId);
newDevice.setDeviceType(deviceType.getCode());
newDevice.setDeviceSn(deviceSn);
newDevice.setGateway(gatewayId);
newDevice.setDeviceManufacturer("tuoheng");
newDevice.setCreateBy("system");
// log.info("准备插入新拓恒设备: deviceName={}, deviceType={}, deviceSn={}, manufacturer=tuoheng",
// deviceName, deviceType, deviceSn);
deviceDomain.insertDevice(newDevice);
Long deviceId = newDevice.getDeviceId();
// log.info("插入新拓恒设备成功: iotDeviceId={}, deviceName={}, deviceType={}, deviceSn={}, 返回deviceId={}",
// iotDeviceId, deviceName, deviceType, deviceSn, deviceId);
return deviceId;
} finally {
// 释放锁
releaseLock(lockKey, lockValue);
}
} else {
// 设备已存在,检查是否需要更新
boolean needUpdate = false;
if (!Objects.equals(existingDevice.getDeviceName(), deviceName)) {
existingDevice.setDeviceName(deviceName);
needUpdate = true;
}
if (!Objects.equals(existingDevice.getDeviceSn(), deviceSn)) {
existingDevice.setDeviceSn(deviceSn);
needUpdate = true;
}
if (!Objects.equals(existingDevice.getGateway(), gatewayId)) {
existingDevice.setGateway(gatewayId);
needUpdate = true;
}
if (needUpdate) {
existingDevice.setUpdateBy("system");
deviceDomain.updateDevice(existingDevice);
// log.info("更新拓恒设备: iotDeviceId={}, deviceName={}, deviceType={}, deviceSn={}",
// iotDeviceId, deviceName, deviceType, deviceSn);
}
return existingDevice.getDeviceId();
}
}
/**
* 同步拓恒机场数据
*
* @param deviceId 设备主键ID
* @param sn 设备SN号设备名称
* @param airportId 机场IDairportID属性
* @param firmwareVersion 固件版本
*/
private void syncTuohengDock(Long deviceId, String sn, String airportId, String firmwareVersion) {
// log.info("开始同步拓恒机场: deviceId={}, SN={}, airportID={}", deviceId, sn, airportId);
// 查询机场是否已存在
Dock existingDock = dockDomain.selectDockByDeviceId(deviceId);
Long dockId;
if (existingDock == null) {
// 机场不存在,插入新机场
Dock newDock = new Dock();
newDock.setDockName(airportId); // 使用 airportID 作为机场名称
newDock.setDeviceId(deviceId);
newDock.setCreateBy("system");
// log.info("准备插入新拓恒机场: deviceId={}, dockName={}, SN={}", deviceId, airportId, sn);
dockDomain.insertDock(newDock);
dockId = newDock.getDockId();
// log.info("插入新拓恒机场成功: dockId={}", dockId);
} else {
dockId = existingDock.getDockId();
// log.info("拓恒机场已存在: deviceId={}, dockId={}", deviceId, dockId);
}
// 确保默认分组存在
ensureDefaultGroupExists();
// 检查机场是否在任何分组中(无论新旧机场都检查)
List<DockGroup> dockGroups = dockGroupDomain.selectDockGroupByDockId(dockId);
if (dockGroups == null || dockGroups.isEmpty()) {
// 机场不在任何分组中,将其插入默认分组
insertDockToDefaultGroup(dockId, airportId);
} else {
// log.info("拓恒机场已在分组中: dockId={}, 分组数量={}", dockId, dockGroups.size());
}
}
/**
* 同步拓恒无人机数据
*
* @param deviceId 设备主键ID
* @param sn 设备SN号设备名称
* @param airportId 机场IDairportID属性
* @param firmwareVersion 固件版本
* @return 无人机主键ID
*/
private Long syncTuohengAircraft(Long deviceId, String sn, String airportId, String firmwareVersion) {
// log.info("开始同步拓恒无人机: deviceId={}, SN={}, airportID={}", deviceId, sn, airportId);
if (deviceId == null) {
log.error("同步拓恒无人机失败: deviceId 为 null, SN={}", sn);
return null;
}
// 查询无人机是否已存在
Aircraft existingAircraft = aircraftDomain.selectAircraftByDeviceId(deviceId);
if (existingAircraft == null) {
// 无人机不存在,插入新无人机
Aircraft newAircraft = new Aircraft();
newAircraft.setAircraftName(airportId); // 使用 airportID 作为无人机名称
newAircraft.setDeviceId(deviceId);
newAircraft.setCreateBy("system");
// log.info("准备插入新拓恒无人机: deviceId={}, aircraftName={}, SN={}", deviceId, airportId, sn);
aircraftDomain.insertAircraft(newAircraft);
Long aircraftId = newAircraft.getAircraftId();
// log.info("插入新拓恒无人机成功: deviceId={}, aircraftId={}", deviceId, aircraftId);
return aircraftId;
} else {
// log.info("拓恒无人机已存在: deviceId={}, aircraftId={}", deviceId, existingAircraft.getAircraftId());
return existingAircraft.getAircraftId();
}
}
}