This commit is contained in:
parent
6a94e799d6
commit
2b5d02691f
6
pom.xml
6
pom.xml
|
|
@ -111,6 +111,12 @@
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Caffeine Cache -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||||
|
<artifactId>caffeine</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Spring Integration MQTT -->
|
<!-- Spring Integration MQTT -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.integration</groupId>
|
<groupId>org.springframework.integration</groupId>
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,9 @@
|
||||||
package com.ruoyi.device.domain.impl;
|
package com.ruoyi.device.domain.impl;
|
||||||
|
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache;
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
import com.github.benmanes.caffeine.cache.Expiry;
|
||||||
import com.ruoyi.device.domain.api.IThingsBoardDomain;
|
import com.ruoyi.device.domain.api.IThingsBoardDomain;
|
||||||
import com.ruoyi.device.domain.model.thingsboard.*;
|
import com.ruoyi.device.domain.model.thingsboard.*;
|
||||||
import com.ruoyi.device.domain.model.thingsboard.constants.DeviceAttributes;
|
import com.ruoyi.device.domain.model.thingsboard.constants.DeviceAttributes;
|
||||||
|
|
@ -27,7 +30,9 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ThingsBoard设备服务实现类
|
* ThingsBoard设备服务实现类
|
||||||
|
|
@ -41,6 +46,10 @@ public class ThingsBoardDomainImpl implements IThingsBoardDomain {
|
||||||
|
|
||||||
private final RestClient client;
|
private final RestClient client;
|
||||||
private final int pageSize;
|
private final int pageSize;
|
||||||
|
private final Random random = new Random();
|
||||||
|
|
||||||
|
private final Cache<String, AttributeMap> attributesCache;
|
||||||
|
private final Cache<String, TelemetryMap> telemetryCache;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 构造函数 - Spring 会自动装配
|
* 构造函数 - Spring 会自动装配
|
||||||
|
|
@ -51,6 +60,48 @@ public class ThingsBoardDomainImpl implements IThingsBoardDomain {
|
||||||
@Value("${thingsboard.page-size:10}") int pageSize) {
|
@Value("${thingsboard.page-size:10}") int pageSize) {
|
||||||
this.client = clientManager.getClient();
|
this.client = clientManager.getClient();
|
||||||
this.pageSize = pageSize;
|
this.pageSize = pageSize;
|
||||||
|
|
||||||
|
this.attributesCache = Caffeine.newBuilder()
|
||||||
|
.expireAfter(new Expiry<String, AttributeMap>() {
|
||||||
|
@Override
|
||||||
|
public long expireAfterCreate(String key, AttributeMap value, long currentTime) {
|
||||||
|
long randomSeconds = 60 + random.nextInt(61);
|
||||||
|
return TimeUnit.SECONDS.toNanos(randomSeconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long expireAfterUpdate(String key, AttributeMap value, long currentTime, long currentDuration) {
|
||||||
|
long randomSeconds = 60 + random.nextInt(61);
|
||||||
|
return TimeUnit.SECONDS.toNanos(randomSeconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long expireAfterRead(String key, AttributeMap value, long currentTime, long currentDuration) {
|
||||||
|
return currentDuration;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.build();
|
||||||
|
|
||||||
|
this.telemetryCache = Caffeine.newBuilder()
|
||||||
|
.expireAfter(new Expiry<String, TelemetryMap>() {
|
||||||
|
@Override
|
||||||
|
public long expireAfterCreate(String key, TelemetryMap value, long currentTime) {
|
||||||
|
long randomSeconds = 12 + random.nextInt(7);
|
||||||
|
return TimeUnit.SECONDS.toNanos(randomSeconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long expireAfterUpdate(String key, TelemetryMap value, long currentTime, long currentDuration) {
|
||||||
|
long randomSeconds = 12 + random.nextInt(7);
|
||||||
|
return TimeUnit.SECONDS.toNanos(randomSeconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long expireAfterRead(String key, TelemetryMap value, long currentTime, long currentDuration) {
|
||||||
|
return currentDuration;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -88,68 +139,66 @@ public class ThingsBoardDomainImpl implements IThingsBoardDomain {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AttributeMap getDeviceAttributes(String deviceId) {
|
public AttributeMap getDeviceAttributes(String deviceId) {
|
||||||
AttributeMap attributeMap = new AttributeMap();
|
return attributesCache.get(deviceId, id -> {
|
||||||
|
AttributeMap attributeMap = new AttributeMap();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
DeviceId id = new DeviceId(UUID.fromString(deviceId));
|
DeviceId deviceIdObj = new DeviceId(UUID.fromString(id));
|
||||||
|
|
||||||
// 获取所有属性键
|
List<String> attributeKeys = client.getAttributeKeys(deviceIdObj);
|
||||||
List<String> attributeKeys = client.getAttributeKeys(id);
|
if (attributeKeys == null || attributeKeys.isEmpty()) {
|
||||||
if (attributeKeys == null || attributeKeys.isEmpty()) {
|
log.debug("设备 {} 没有属性", id);
|
||||||
log.debug("设备 {} 没有属性", deviceId);
|
return attributeMap;
|
||||||
return attributeMap;
|
}
|
||||||
|
|
||||||
|
List<AttributeKvEntry> attributeKvEntries = client.getAttributeKvEntries(deviceIdObj, attributeKeys);
|
||||||
|
if (attributeKvEntries == null || attributeKvEntries.isEmpty()) {
|
||||||
|
log.debug("设备 {} 的属性值为空", id);
|
||||||
|
return attributeMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (AttributeKvEntry entry : attributeKvEntries) {
|
||||||
|
parseAndPutAttribute(attributeMap, entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("获取设备属性失败: deviceId={}, error={}", id, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取属性值
|
return attributeMap;
|
||||||
List<AttributeKvEntry> attributeKvEntries = client.getAttributeKvEntries(id, attributeKeys);
|
});
|
||||||
if (attributeKvEntries == null || attributeKvEntries.isEmpty()) {
|
|
||||||
log.debug("设备 {} 的属性值为空", deviceId);
|
|
||||||
return attributeMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解析并填充到AttributeMap
|
|
||||||
for (AttributeKvEntry entry : attributeKvEntries) {
|
|
||||||
parseAndPutAttribute(attributeMap, entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("获取设备属性失败: deviceId={}, error={}", deviceId, e.getMessage(), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return attributeMap;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TelemetryMap getDeviceTelemetry(String deviceId) {
|
public TelemetryMap getDeviceTelemetry(String deviceId) {
|
||||||
TelemetryMap telemetryMap = new TelemetryMap();
|
return telemetryCache.get(deviceId, id -> {
|
||||||
|
TelemetryMap telemetryMap = new TelemetryMap();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
DeviceId id = new DeviceId(UUID.fromString(deviceId));
|
DeviceId deviceIdObj = new DeviceId(UUID.fromString(id));
|
||||||
|
|
||||||
// 获取所有遥测键
|
List<String> timeseriesKeys = client.getTimeseriesKeys(deviceIdObj);
|
||||||
List<String> timeseriesKeys = client.getTimeseriesKeys(id);
|
if (timeseriesKeys == null || timeseriesKeys.isEmpty()) {
|
||||||
if (timeseriesKeys == null || timeseriesKeys.isEmpty()) {
|
log.debug("设备 {} 没有遥测数据", id);
|
||||||
log.debug("设备 {} 没有遥测数据", deviceId);
|
return telemetryMap;
|
||||||
return telemetryMap;
|
}
|
||||||
|
|
||||||
|
List<TsKvEntry> latestTimeseries = client.getLatestTimeseries(deviceIdObj, timeseriesKeys);
|
||||||
|
if (latestTimeseries == null || latestTimeseries.isEmpty()) {
|
||||||
|
log.debug("设备 {} 的遥测数据为空", id);
|
||||||
|
return telemetryMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (TsKvEntry entry : latestTimeseries) {
|
||||||
|
parseAndPutTelemetry(telemetryMap, entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("获取设备遥测数据失败: deviceId={}, error={}", id, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取最新的遥测数据
|
return telemetryMap;
|
||||||
List<TsKvEntry> latestTimeseries = client.getLatestTimeseries(id, timeseriesKeys);
|
});
|
||||||
if (latestTimeseries == null || latestTimeseries.isEmpty()) {
|
|
||||||
log.debug("设备 {} 的遥测数据为空", deviceId);
|
|
||||||
return telemetryMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解析并填充到TelemetryMap
|
|
||||||
for (TsKvEntry entry : latestTimeseries) {
|
|
||||||
parseAndPutTelemetry(telemetryMap, entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("获取设备遥测数据失败: deviceId={}, error={}", deviceId, e.getMessage(), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return telemetryMap;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue