添加redis使用的说明

This commit is contained in:
孙小云 2025-12-18 15:09:51 +08:00
parent c218d2ae81
commit 9558a4ee30
2 changed files with 206 additions and 261 deletions

View File

@ -1,261 +0,0 @@
package com.tuoheng.machine.example;
import com.tuoheng.machine.MachineCommandManager;
import com.tuoheng.machine.command.CommandResult;
import com.tuoheng.machine.command.CommandType;
import com.tuoheng.machine.mqtt.MqttCallbackRegistry;
import com.tuoheng.machine.state.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 设备框架使用示例
*/
@Slf4j
@Component
public class MachineFrameworkUsageExample {
private final MachineCommandManager commandManager;
private final MqttCallbackRegistry mqttCallbackRegistry;
public MachineFrameworkUsageExample(MachineCommandManager commandManager,
MqttCallbackRegistry mqttCallbackRegistry) {
this.commandManager = commandManager;
this.mqttCallbackRegistry = mqttCallbackRegistry;
}
/**
* 示例1绑定设备到厂家
*/
public void example1_bindMachine() {
String sn = "DJI-12345678";
String vendorType = "DJI";
// 绑定设备到大疆厂家
commandManager.bindMachine(sn, vendorType);
log.info("设备已绑定: sn={}, vendorType={}", sn, vendorType);
}
/**
* 示例2更新设备状态通常在心跳中调用
*/
public void example2_updateStates() {
String sn = "DJI-12345678";
// 创建新状态
MachineStates newStates = new MachineStates();
newStates.setDroneState(DroneState.FLYING);
newStates.setAirportState(AirportState.ONLINE);
newStates.setCoverState(CoverState.OPENED);
newStates.setDrcState(DrcState.EXITED);
newStates.setDebugModeState(DebugModeState.UNKNOWN);
newStates.setStopState(StopState.EXITED);
// 更新状态
commandManager.updateMachineStates(sn, newStates);
log.info("设备状态已更新: sn={}, states={}", sn, newStates);
}
/**
* 示例3查询设备当前状态
*/
public void example3_queryStates() {
String sn = "DJI-12345678";
// 获取当前状态
MachineStates currentStates = commandManager.getMachineStates(sn);
log.info("设备当前状态: sn={}, droneState={}, airportState={}, coverState={}, drcState={}",
sn,
currentStates.getDroneState(),
currentStates.getAirportState(),
currentStates.getCoverState(),
currentStates.getDrcState());
}
/**
* 示例4查询设备可执行的命令
*/
public void example4_queryAvailableCommands() {
String sn = "DJI-12345678";
// 获取可执行的命令列表
List<CommandType> availableCommands = commandManager.getAvailableCommands(sn);
log.info("设备可执行的命令: sn={}, commands={}", sn, availableCommands);
}
/**
* 示例5检查设备是否正在执行命令
*/
public void example5_checkExecutingCommand() {
String sn = "DJI-12345678";
// 检查是否正在执行命令
boolean isExecuting = commandManager.isExecutingCommand(sn);
CommandType executingCommand = commandManager.getExecutingCommandType(sn);
log.info("设备命令执行状态: sn={}, isExecuting={}, executingCommand=",
sn, isExecuting, executingCommand);
}
/**
* 示例6执行简单命令起飞
*/
public void example6_executeSimpleCommand() {
String sn = "DJI-12345678";
// 执行起飞命令
commandManager.executeCommand(sn, CommandType.TAKE_OFF)
.thenAccept(result -> {
if (result.isSuccess()) {
log.info("起飞命令执行成功: sn={}", sn);
} else {
log.error("起飞命令执行失败: sn={}, error={}", sn, result.getErrorMessage());
}
});
}
/**
* 示例7执行带参数的命令指点飞行
*/
public void example7_executeCommandWithParams() {
String sn = "DJI-12345678";
// 准备命令参数
Map<String, Object> params = new HashMap<>();
params.put("latitude", 39.9042);
params.put("longitude", 116.4074);
params.put("altitude", 100.0);
// 执行指点飞行命令
commandManager.executeCommand(sn, CommandType.POINT_FLY, params)
.thenAccept(result -> {
if (result.isSuccess()) {
log.info("指点飞行命令执行成功: sn={}", sn);
} else {
log.error("指点飞行命令执行失败: sn={}, error={}, failedInstruction={}",
sn, result.getErrorMessage(), result.getFailedInstructionName());
}
});
}
/**
* 示例8注册命令执行监听器
*/
public void example8_registerCommandListener() {
String listenerId = "command-logger";
// 注册命令执行监听器
commandManager.registerCommandListener(listenerId, (sn, result) -> {
log.info("命令执行完成回调: sn={}, commandType={}, success={}, error={}",
sn, result.getCommandType(), result.isSuccess(), result.getErrorMessage());
// 这里可以记录到数据库
// commandResultRepository.save(new CommandResultEntity(sn, result));
});
}
/**
* 示例9注册状态变化监听器
*/
public void example9_registerStateChangeListener() {
String listenerId = "state-logger";
// 注册状态变化监听器
commandManager.registerStateChangeListener(listenerId, (sn, newStates) -> {
log.info("设备状态变化回调: sn={}, droneState={}, airportState={}, coverState={}, drcState={}",
sn,
newStates.getDroneState(),
newStates.getAirportState(),
newStates.getCoverState(),
newStates.getDrcState());
// 这里可以记录到数据库
// stateChangeRepository.save(new StateChangeEntity(sn, newStates));
});
}
/**
* 示例10处理MQTT消息在MQTT消费者中调用
*/
public void example10_handleMqttMessage(String topic, Object messageBody) {
// 当收到MQTT消息时调用回调注册中心处理
mqttCallbackRegistry.handleMessage(topic, messageBody);
// 如果消息包含状态信息更新设备状态
// 这里需要根据实际的消息格式解析
// 示例
// if (topic.endsWith("/state")) {
// String sn = extractSnFromTopic(topic);
// MachineStates newStates = parseStatesFromMessage(messageBody);
// commandManager.updateMachineStates(sn, newStates);
// }
}
/**
* 示例11完整的业务流程
*/
public void example11_completeWorkflow() {
String sn = "DJI-12345678";
// 1. 绑定设备
commandManager.bindMachine(sn, "DJI");
// 2. 设置初始状态
MachineStates initialStates = new MachineStates();
initialStates.setDroneState(DroneState.FLYING);
initialStates.setAirportState(AirportState.ONLINE);
initialStates.setCoverState(CoverState.OPENED);
initialStates.setDrcState(DrcState.EXITED);
initialStates.setDebugModeState(DebugModeState.UNKNOWN);
initialStates.setStopState(StopState.EXITED);
commandManager.updateMachineStates(sn, initialStates);
// 3. 检查是否可以起飞
if (!commandManager.isExecutingCommand(sn)) {
List<CommandType> availableCommands = commandManager.getAvailableCommands(sn);
if (availableCommands.contains(CommandType.TAKE_OFF)) {
// 4. 执行起飞命令
commandManager.executeCommand(sn, CommandType.TAKE_OFF)
.thenAccept(result -> {
if (result.isSuccess()) {
log.info("起飞成功,开始执行任务");
// 5. 起飞成功后可以执行其他命令
executeNextCommand(sn);
} else {
log.error("起飞失败: {}", result.getErrorMessage());
}
});
} else {
log.warn("当前状态不允许起飞");
}
} else {
log.warn("设备正在执行其他命令");
}
}
/**
* 执行下一个命令
*/
private void executeNextCommand(String sn) {
// 等待一段时间后执行返航
try {
Thread.sleep(60000); // 等待60秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 执行返航命令
commandManager.executeCommand(sn, CommandType.RETURN_HOME)
.thenAccept(result -> {
if (result.isSuccess()) {
log.info("返航成功");
} else {
log.error("返航失败: {}", result.getErrorMessage());
}
});
}
}

View File

@ -0,0 +1,206 @@
单节点部署(默认)
# 使用内存存储(默认配置)
machine.state.store.type=memory
多节点部署
# 切换到 Redis 存储
machine.state.store.type=redis
# 配置节点ID可选不配置会自动生成
machine.node.id=node-1
# Redis 配置
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=your-password
┌─────────────────────────────────────────────────────────────────┐
│ 步骤1: 节点A 执行命令并注册回调 │
└─────────────────────────────────────────────────────────────────┘
节点A: executeCommand(TAKE_OFF)
节点A: registerCallback(topic="dji/SN9527/response", ...)
节点A: MqttCallbackStore.registerCallback()
Redis: 存储回调信息(使用两个 Key 的原因:性能优化)
【Key 1】回调详细信息Hash 结构)
- Key: mqtt:callback:{callbackId}
- Value: {callbackId, topic, nodeId="nodeA", timeoutMs, registerTime, ...}
- 作用: 存储单个回调的完整信息
- 查询: O(1) 时间复杂度,通过 callbackId 直接获取
【Key 2】Topic 索引Set 结构)
- Key: mqtt:topic:dji/SN9527/response
- Value: Set<callbackId> // 例如: ["abc-123", "def-456", "ghi-789"]
- 作用: 快速查询等待某个 topic 的所有回调
- 查询: O(1) 时间复杂度,直接获取 callbackId 列表
【为什么需要两个 Key
如果只用一个 Key 存储所有回调,查询时需要遍历所有回调并过滤 topic
时间复杂度为 O(n)。使用 Topic 索引后,可以直接获取目标回调列表,
时间复杂度降为 O(1),大幅提升性能。
【示例】
假设有 3 个回调:
- callbackId="abc-123", topic="dji/SN9527/response", nodeId="nodeA"
- callbackId="def-456", topic="dji/SN9527/state", nodeId="nodeB"
- callbackId="ghi-789", topic="dji/SN9527/response", nodeId="nodeA"
Redis 存储结构:
mqtt:callback:abc-123 → {callbackId:"abc-123", topic:"dji/SN9527/response", nodeId:"nodeA"}
mqtt:callback:def-456 → {callbackId:"def-456", topic:"dji/SN9527/state", nodeId:"nodeB"}
mqtt:callback:ghi-789 → {callbackId:"ghi-789", topic:"dji/SN9527/response", nodeId:"nodeA"}
mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"]
mqtt:topic:dji/SN9527/state → ["def-456"]
查询 topic="dji/SN9527/response" 的回调:
1. 从索引获取: SMEMBERS mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"]
2. 批量获取详情: MGET mqtt:callback:abc-123 mqtt:callback:ghi-789
3. 总耗时: O(1) + O(k)k 是该 topic 的回调数量(通常很小)
【Redis 数据清理时机】
Redis 中的回调数据有两种清理机制:
┌─────────────────────────────────────────────────────────────┐
│ 1⃣ 主动清理(业务逻辑触发) │
└─────────────────────────────────────────────────────────────┘
触发时机:
✅ 回调成功执行后TransactionExecutor 的 finally 块)
✅ 回调超时后TransactionExecutor 的 finally 块)
✅ handleMessage 检测到超时(转发前检查)
清理操作:
unregisterCallback(callbackId)
1. 获取回调信息: GET mqtt:callback:{callbackId}
2. 删除回调信息: DEL mqtt:callback:{callbackId}
3. 从索引中移除: SREM mqtt:topic:{topic} {callbackId}
示例:
T0: 注册回调,超时时间 10 秒
T5: 收到 MQTT 响应,回调执行成功
T5: 立即清理 Redis 数据 ✅
- DEL mqtt:callback:abc-123
- SREM mqtt:topic:dji/SN9527/response abc-123
┌─────────────────────────────────────────────────────────────┐
│ 2⃣ 被动清理Redis TTL 自动过期) │
└─────────────────────────────────────────────────────────────┘
作用:兜底机制,防止异常情况下的数据残留
设置方式:
// 注册回调时设置 TTL
SET mqtt:callback:{callbackId} {json} EX 3600 // 1小时后自动过期
EXPIRE mqtt:topic:{topic} 3600 // 1小时后自动过期
触发时机:
⚠️ 应用异常崩溃,主动清理未执行
⚠️ 网络分区,无法删除 Redis 数据
⚠️ 代码 Bug主动清理失败
示例:
T0: 注册回调TTL=3600秒1小时
T5: 应用崩溃,主动清理未执行 ❌
T3600: Redis 自动删除过期数据 ✅
- mqtt:callback:abc-123 自动过期删除
- mqtt:topic:dji/SN9527/response 自动过期删除
【推荐配置】
TTL 应该设置为回调超时时间的 2-3 倍,例如:
- 回调超时: 10 秒
- Redis TTL: 30 秒10秒 × 3
这样可以确保:
✅ 正常情况下,主动清理会在 10 秒内完成
✅ 异常情况下Redis 会在 30 秒后自动清理
✅ 避免设置过长的 TTL 导致内存浪费
【注意事项】
⚠️ Topic 索引的 TTL 问题:
如果同一个 topic 有多个回调,每次添加新回调时都会刷新 TTL。
这可能导致索引的 TTL 比单个回调的 TTL 更长。
解决方案:
方案1: 不为 Topic 索引设置 TTL只在删除最后一个 callbackId 时删除索引
方案2: 每次查询时过滤掉已过期的 callbackId推荐
节点A: 本地内存存储 Consumer<Object>
- localHandlers.put(callbackId, consumer)
节点A: 订阅 Redis Pub/Sub 频道
- Channel: mqtt:node:nodeA
┌─────────────────────────────────────────────────────────────────┐
│ 步骤2: MQTT Broker 将响应路由到节点B不是节点A
└─────────────────────────────────────────────────────────────────┘
MQTT Broker: 收到设备响应
MQTT Broker: 将消息路由到节点B随机/轮询)
节点B: MqttCallbackRegistry.handleMessage(topic, messageBody)
┌─────────────────────────────────────────────────────────────────┐
│ 步骤3: 节点B 从 Redis 查询等待该 topic 的回调 │
└─────────────────────────────────────────────────────────────────┘
节点B: callbackStore.getCallbacksByTopic("dji/SN9527/response")
Redis: 查询 mqtt:topic:dji/SN9527/response
Redis: 返回 Set<callbackId>
Redis: 批量获取回调信息
- mqtt:callback:{callbackId1} → {nodeId="nodeA", ...}
- mqtt:callback:{callbackId2} → {nodeId="nodeA", ...}
节点B: 获得回调列表 List<MqttCallbackInfo>
┌─────────────────────────────────────────────────────────────────┐
│ 步骤4: 节点B 判断回调属于哪个节点 │
└─────────────────────────────────────────────────────────────────┘
节点B: for (MqttCallbackInfo callback : callbacks) {
if (nodeId.equals(callback.getNodeId())) {
// 本节点的回调,直接执行
executeLocalCallback(...)
} else {
// 其他节点的回调,转发到目标节点
callbackStore.publishMessageToNode(...)
}
}
┌─────────────────────────────────────────────────────────────────┐
│ 步骤5: 节点B 通过 Redis Pub/Sub 转发消息到节点A │
└─────────────────────────────────┘
节点B: callbackStore.publishMessageToNode(
nodeId="nodeA",
callbackId="xxx",
messageBody="{...}" // JSON 字符串
)
Redis Pub/Sub: PUBLISH mqtt:node:nodeA
{
"callbackId": "xxx",
"messageBody": "{...}"
}
┌─────────────────────────────────────────────────────────────────┐
│ 步骤6: 节点A 收到 Redis Pub/Sub 消息 │
└─────────────────────────────────────────────────────────────────┘
节点A: Redis Pub/Sub Listener 收到消息
节点A: handleNodeMessage(callbackId, messageBodyJson)
节点A: 反序列化消息体
- Object messageBody = objectMapper.readValue(messageBodyJson)
节点A: executeLocalCallback(callbackId, messageBody)
节点A: 从本地内存获取 Consumer
- Consumer<Object> handler = localHandlers.get(callbackId)
节点A: 执行回调
- handler.accept(messageBody)
✅ 命令执行成功!