package com.ruoyi.device.domain.impl;

import com.ruoyi.device.domain.api.IThingsBoardDomain;
import com.ruoyi.device.domain.model.thingsboard.*;
import com.ruoyi.device.domain.model.thingsboard.constants.DeviceAttributes;
import com.ruoyi.device.domain.model.thingsboard.constants.DeviceTelemetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.rest.client.RestClient;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationInfo;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

/**
 * ThingsBoard device service implementation.
 *
 * <p>Every query method is best-effort: any exception raised while talking to
 * ThingsBoard (including a malformed device id string) is logged and converted
 * into a {@code null} or empty result rather than being propagated.
 *
 * <p>NOTE(review): the generic type arguments in this file were reconstructed
 * from usage; confirm the ones involving project types ({@code DeviceInfo},
 * {@code AttributeKey}, {@code TelemetryKey}) against {@code IThingsBoardDomain}
 * and the model classes.
 */
@Component
public class ThingsBoardDomainImpl implements IThingsBoardDomain {

    private static final Logger log = LoggerFactory.getLogger(ThingsBoardDomainImpl.class);

    private final RestClient client;
    private final int pageSize;

    /**
     * Creates the service; intended to be wired by Spring.
     *
     * @param clientManager manager that supplies the ThingsBoard {@link RestClient}
     * @param pageSize      page size passed to the device iterators; read from
     *                      {@code thingsboard.page-size}, defaulting to 10
     */
    public ThingsBoardDomainImpl(RestClientManager clientManager,
                                 @Value("${thingsboard.page-size:10}") int pageSize) {
        this.client = clientManager.getClient();
        this.pageSize = pageSize;
    }

    /**
     * Returns an iterable over all devices, backed by a {@code DeviceIterator}
     * created with the configured page size.
     *
     * <p>NOTE(review): element type assumed to be {@code List<DeviceInfo>}
     * (one list per page) - confirm against the interface declaration.
     */
    @Override
    public Iterable<List<DeviceInfo>> getAllDevices() {
        return new DeviceIterator(client, pageSize);
    }

    /**
     * Returns an iterable over all gateway devices, backed by a
     * {@code GatewayDeviceIterator} created with the configured page size.
     *
     * <p>NOTE(review): element type assumed to be {@code List<DeviceInfo>} -
     * confirm against the interface declaration.
     */
    @Override
    public Iterable<List<DeviceInfo>> getAllGatewayDevices() {
        return new GatewayDeviceIterator(client, pageSize);
    }

    /**
     * Looks up a single device by id.
     *
     * @param deviceId device UUID in string form
     * @return the device info, or {@code null} when the device does not exist
     *         or the lookup fails for any reason (the failure is logged)
     */
    @Override
    public DeviceInfo getDeviceInfo(String deviceId) {
        try {
            DeviceId id = new DeviceId(UUID.fromString(deviceId));
            Optional<Device> deviceOptional = client.getDeviceById(id);
            if (deviceOptional.isEmpty()) {
                log.warn("设备不存在: deviceId={}", deviceId);
                return null;
            }

            // Unwrap once instead of calling Optional.get() for every field.
            Device device = deviceOptional.get();
            return new DeviceInfo(
                    device.getName(),
                    device.getId().getId().toString(),
                    device.getType(),
                    device.getId(),
                    device.getAdditionalInfo()
            );
        } catch (Exception e) {
            log.error("获取设备信息失败: deviceId={}, error={}", deviceId, e.getMessage(), e);
            return null;
        }
    }

    /**
     * Fetches every attribute of a device.
     *
     * @param deviceId device UUID in string form
     * @return a map of the attributes that could be parsed; empty when the
     *         device has no attributes or the request fails (never {@code null})
     */
    @Override
    public AttributeMap getDeviceAttributes(String deviceId) {
        AttributeMap attributeMap = new AttributeMap();
        try {
            DeviceId id = new DeviceId(UUID.fromString(deviceId));

            // Fetch all attribute keys first.
            List<String> attributeKeys = client.getAttributeKeys(id);
            if (attributeKeys == null || attributeKeys.isEmpty()) {
                log.debug("设备 {} 没有属性", deviceId);
                return attributeMap;
            }

            // Then fetch the values for those keys.
            List<AttributeKvEntry> attributeKvEntries = client.getAttributeKvEntries(id, attributeKeys);
            if (attributeKvEntries == null || attributeKvEntries.isEmpty()) {
                log.debug("设备 {} 的属性值为空", deviceId);
                return attributeMap;
            }

            // Parse each entry into the map; a failing entry is skipped, not fatal.
            for (AttributeKvEntry entry : attributeKvEntries) {
                parseAndPutAttribute(attributeMap, entry);
            }
        } catch (Exception e) {
            log.error("获取设备属性失败: deviceId={}, error={}", deviceId, e.getMessage(), e);
        }
        return attributeMap;
    }

    /**
     * Fetches the latest value of every telemetry key of a device.
     *
     * @param deviceId device UUID in string form
     * @return a map of the telemetry entries that could be parsed; empty when
     *         the device has no telemetry or the request fails (never {@code null})
     */
    @Override
    public TelemetryMap getDeviceTelemetry(String deviceId) {
        TelemetryMap telemetryMap = new TelemetryMap();
        try {
            DeviceId id = new DeviceId(UUID.fromString(deviceId));

            // Fetch all telemetry keys first.
            List<String> timeseriesKeys = client.getTimeseriesKeys(id);
            if (timeseriesKeys == null || timeseriesKeys.isEmpty()) {
                log.debug("设备 {} 没有遥测数据", deviceId);
                return telemetryMap;
            }

            // Then fetch the latest entry for each key.
            List<TsKvEntry> latestTimeseries = client.getLatestTimeseries(id, timeseriesKeys);
            if (latestTimeseries == null || latestTimeseries.isEmpty()) {
                log.debug("设备 {} 的遥测数据为空", deviceId);
                return telemetryMap;
            }

            // Parse each entry into the map; a failing entry is skipped, not fatal.
            for (TsKvEntry entry : latestTimeseries) {
                parseAndPutTelemetry(telemetryMap, entry);
            }
        } catch (Exception e) {
            log.error("获取设备遥测数据失败: deviceId={}, error={}", deviceId, e.getMessage(), e);
        }
        return telemetryMap;
    }

    /**
     * Fetches the device attributes and keeps only those whose key name is
     * listed by {@code DeviceAttributes.getPredefinedKeys()}.
     *
     * @param deviceId device UUID in string form
     * @return a new map holding only predefined attributes (never {@code null})
     */
    @Override
    public AttributeMap getPredefinedDeviceAttributes(String deviceId) {
        // getDeviceAttributes already absorbs failures and returns an empty map.
        AttributeMap allAttributes = getDeviceAttributes(deviceId);
        AttributeMap predefinedAttributes = new AttributeMap();

        // A set gives constant-time membership checks inside the loop below.
        Set<String> predefinedKeyNames = DeviceAttributes.getPredefinedKeys()
                .stream()
                .map(AttributeKey::getName)
                .collect(Collectors.toSet());

        for (AttributeKey<?> key : allAttributes.keySet()) {
            if (predefinedKeyNames.contains(key.getName())) {
                // Copy the entry into the new map when a value is present.
                allAttributes.get(key).ifPresent(value -> {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> objKey = (AttributeKey<Object>) key;
                    predefinedAttributes.put(objKey, value);
                });
            }
        }
        return predefinedAttributes;
    }

    /**
     * Fetches the device telemetry and keeps only those entries whose key name
     * is listed by {@code DeviceTelemetry.getPredefinedKeys()}.
     *
     * @param deviceId device UUID in string form
     * @return a new map holding only predefined telemetry (never {@code null})
     */
    @Override
    public TelemetryMap getPredefinedDeviceTelemetry(String deviceId) {
        // getDeviceTelemetry already absorbs failures and returns an empty map.
        TelemetryMap allTelemetry = getDeviceTelemetry(deviceId);
        TelemetryMap predefinedTelemetry = new TelemetryMap();

        // A set gives constant-time membership checks inside the loop below.
        Set<String> predefinedKeyNames = DeviceTelemetry.getPredefinedKeys()
                .stream()
                .map(TelemetryKey::getName)
                .collect(Collectors.toSet());

        for (TelemetryKey<?> key : allTelemetry.keySet()) {
            if (predefinedKeyNames.contains(key.getName())) {
                // Copy value and timestamp into the new map when present.
                allTelemetry.get(key).ifPresent(telemetryValue -> {
                    @SuppressWarnings("unchecked")
                    TelemetryKey<Object> objKey = (TelemetryKey<Object>) key;
                    predefinedTelemetry.put(objKey, telemetryValue.getValue(), telemetryValue.getTimestamp());
                });
            }
        }
        return predefinedTelemetry;
    }

    /**
     * Finds the gateway a device belongs to by looking for a "Contains"
     * relation that points at the device (gateway -> device).
     *
     * @param deviceId device UUID in string form
     * @return the id of the "from" entity of the first matching relation, or
     *         {@code null} when there is no such relation or the lookup fails
     */
    @Override
    public String getDeviceGatewayId(String deviceId) {
        try {
            DeviceId id = new DeviceId(UUID.fromString(deviceId));

            List<EntityRelation> relations = client.findByTo(
                    id,
                    EntityRelation.CONTAINS_TYPE,
                    RelationTypeGroup.COMMON
            );
            if (relations == null || relations.isEmpty()) {
                log.debug("设备 {} 不属于任何网关", deviceId);
                return null;
            }

            // Only the first relation is considered; any further ones are ignored.
            EntityId gatewayEntityId = relations.get(0).getFrom();
            String gatewayId = gatewayEntityId.getId().toString();
            log.debug("设备 {} 属于网关 {}", deviceId, gatewayId);
            return gatewayId;
        } catch (Exception e) {
            log.error("获取设备网关关系失败: deviceId={}, error={}", deviceId, e.getMessage(), e);
            return null;
        }
    }

    /**
     * Lists the ids of the entities directly related from a gateway device.
     *
     * <p>NOTE(review): the query sets no relation-type or entity-type filter,
     * whereas {@link #getDeviceGatewayId(String)} restricts itself to
     * "Contains" relations. Presumably this returns every level-1 COMMON
     * relation target of the gateway - confirm that this is intended.
     *
     * @param gatewayDeviceId gateway device UUID in string form
     * @return ids of the relation targets; empty when there are none or the
     *         query fails (never {@code null})
     */
    @Override
    public List<String> getGatewayChildDevices(String gatewayDeviceId) {
        List<String> childDeviceIds = new ArrayList<>();
        try {
            DeviceId gatewayId = new DeviceId(UUID.fromString(gatewayDeviceId));

            // Search outward from the gateway (gateway -> child), one level deep.
            RelationsSearchParameters parameters = new RelationsSearchParameters(
                    gatewayId,                  // rootId: the gateway device
                    EntitySearchDirection.FROM, // direction: relations starting at rootId
                    1,                          // maxLevel: direct children only
                    RelationTypeGroup.COMMON,   // relationTypeGroup
                    false                       // fetchLastLevelOnly
            );
            EntityRelationsQuery query = new EntityRelationsQuery();
            query.setParameters(parameters);

            List<EntityRelationInfo> relationInfos = client.findInfoByQuery(query);
            if (relationInfos == null || relationInfos.isEmpty()) {
                log.debug("网关 {} 没有子设备", gatewayDeviceId);
                return childDeviceIds;
            }

            // Collect the id of each relation's "to" entity.
            for (EntityRelationInfo relationInfo : relationInfos) {
                EntityId childEntityId = relationInfo.getTo();
                childDeviceIds.add(childEntityId.getId().toString());
            }
            log.debug("网关 {} 有 {} 个子设备", gatewayDeviceId, childDeviceIds.size());
            return childDeviceIds;
        } catch (Exception e) {
            log.error("获取网关子设备失败: gatewayDeviceId={}, error={}", gatewayDeviceId, e.getMessage(), e);
            return childDeviceIds;
        }
    }

    /**
     * Parses one attribute entry and stores it in the given map.
     *
     * <p>The key is obtained through {@code AttributeKey.getOrCreate}, so keys
     * that were not registered beforehand are handled as well. A failure is
     * logged and the entry is skipped.
     *
     * @param attributeMap destination map
     * @param entry        attribute entry returned by ThingsBoard
     */
    @SuppressWarnings("unchecked")
    private void parseAndPutAttribute(AttributeMap attributeMap, AttributeKvEntry entry) {
        String keyName = entry.getKey();
        Object value = entry.getValue();
        try {
            // Lazy registration: look the key up, creating it when missing.
            AttributeKey<?> key = AttributeKey.getOrCreate(keyName, value);

            // Let the key's own parser convert the raw value.
            Object parsedValue = ((AttributeKey<Object>) key).parse(value);
            attributeMap.put((AttributeKey<Object>) key, parsedValue);

            // Guarded so the type name is only resolved when it will be logged.
            if (log.isDebugEnabled()) {
                log.debug("成功解析属性: {} = {} (type: {})", keyName, parsedValue, key.getType().getSimpleName());
            }
        } catch (Exception e) {
            // Trailing throwable keeps the stack trace, like the other handlers here.
            log.warn("解析属性失败: key={}, value={}, error={}", keyName, value, e.getMessage(), e);
        }
    }

    /**
     * Parses one telemetry entry and stores it, with its timestamp, in the
     * given map.
     *
     * <p>The key is obtained through {@code TelemetryKey.getOrCreate}, so keys
     * that were not registered beforehand are handled as well. A failure is
     * logged and the entry is skipped.
     *
     * @param telemetryMap destination map
     * @param entry        time-series entry returned by ThingsBoard
     */
    @SuppressWarnings("unchecked")
    private void parseAndPutTelemetry(TelemetryMap telemetryMap, TsKvEntry entry) {
        String keyName = entry.getKey();
        Object value = entry.getValue();
        try {
            // Lazy registration: look the key up, creating it when missing.
            TelemetryKey<?> key = TelemetryKey.getOrCreate(keyName, value);

            // Let the key's own parser convert the raw value.
            Object parsedValue = ((TelemetryKey<Object>) key).parse(value);
            telemetryMap.put((TelemetryKey<Object>) key, parsedValue, entry.getTs());

            // Guarded so the type name is only resolved when it will be logged.
            if (log.isDebugEnabled()) {
                log.debug("成功解析遥测数据: {} = {} (timestamp: {}, type: {})",
                        keyName, parsedValue, entry.getTs(), key.getType().getSimpleName());
            }
        } catch (Exception e) {
            // Trailing throwable keeps the stack trace, like the other handlers here.
            log.warn("解析遥测数据失败: key={}, value={}, error={}", keyName, value, e.getMessage(), e);
        }
    }
}