diff --git a/src/main/java/com/tuoheng/machine/example/MachineFrameworkUsageExample.java b/src/main/java/com/tuoheng/machine/example/MachineFrameworkUsageExample.java deleted file mode 100644 index 9167062..0000000 --- a/src/main/java/com/tuoheng/machine/example/MachineFrameworkUsageExample.java +++ /dev/null @@ -1,261 +0,0 @@ -package com.tuoheng.machine.example; - -import com.tuoheng.machine.MachineCommandManager; -import com.tuoheng.machine.command.CommandResult; -import com.tuoheng.machine.command.CommandType; -import com.tuoheng.machine.mqtt.MqttCallbackRegistry; -import com.tuoheng.machine.state.*; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * 设备框架使用示例 - */ -@Slf4j -@Component -public class MachineFrameworkUsageExample { - - private final MachineCommandManager commandManager; - private final MqttCallbackRegistry mqttCallbackRegistry; - - public MachineFrameworkUsageExample(MachineCommandManager commandManager, - MqttCallbackRegistry mqttCallbackRegistry) { - this.commandManager = commandManager; - this.mqttCallbackRegistry = mqttCallbackRegistry; - } - - /** - * 示例1:绑定设备到厂家 - */ - public void example1_bindMachine() { - String sn = "DJI-12345678"; - String vendorType = "DJI"; - - // 绑定设备到大疆厂家 - commandManager.bindMachine(sn, vendorType); - log.info("设备已绑定: sn={}, vendorType={}", sn, vendorType); - } - - /** - * 示例2:更新设备状态(通常在心跳中调用) - */ - public void example2_updateStates() { - String sn = "DJI-12345678"; - - // 创建新状态 - MachineStates newStates = new MachineStates(); - newStates.setDroneState(DroneState.FLYING); - newStates.setAirportState(AirportState.ONLINE); - newStates.setCoverState(CoverState.OPENED); - newStates.setDrcState(DrcState.EXITED); - newStates.setDebugModeState(DebugModeState.UNKNOWN); - newStates.setStopState(StopState.EXITED); - - // 更新状态 - commandManager.updateMachineStates(sn, newStates); - log.info("设备状态已更新: sn={}, states={}", sn, newStates); - } - - /** - * 示例3:查询设备当前状态 - */ - public void example3_queryStates() { - String sn = "DJI-12345678"; - - // 获取当前状态 - MachineStates currentStates = commandManager.getMachineStates(sn); - log.info("设备当前状态: sn={}, droneState={}, airportState={}, coverState={}, drcState={}", - sn, - currentStates.getDroneState(), - currentStates.getAirportState(), - currentStates.getCoverState(), - currentStates.getDrcState()); - } - - /** - * 示例4:查询设备可执行的命令 - */ - public void example4_queryAvailableCommands() { - String sn = "DJI-12345678"; - - // 获取可执行的命令列表 - List availableCommands = commandManager.getAvailableCommands(sn); - log.info("设备可执行的命令: sn={}, commands={}", sn, availableCommands); - } - - /** - * 示例5:检查设备是否正在执行命令 - */ - public void example5_checkExecutingCommand() { - String sn = "DJI-12345678"; - - // 检查是否正在执行命令 - boolean isExecuting = commandManager.isExecutingCommand(sn); - CommandType executingCommand = commandManager.getExecutingCommandType(sn); - - log.info("设备命令执行状态: sn={}, isExecuting={}, executingCommand=", - sn, isExecuting, executingCommand); - } - - /** - * 示例6:执行简单命令(起飞) - */ - public void example6_executeSimpleCommand() { - String sn = "DJI-12345678"; - - // 执行起飞命令 - commandManager.executeCommand(sn, CommandType.TAKE_OFF) - .thenAccept(result -> { - if (result.isSuccess()) { - log.info("起飞命令执行成功: sn={}", sn); - } else { - log.error("起飞命令执行失败: sn={}, error={}", sn, result.getErrorMessage()); - } - }); - } - - /** - * 示例7:执行带参数的命令(指点飞行) - */ - public void example7_executeCommandWithParams() { - String sn = "DJI-12345678"; - - // 准备命令参数 - Map params = new HashMap<>(); - params.put("latitude", 39.9042); - params.put("longitude", 116.4074); - params.put("altitude", 100.0); - - // 执行指点飞行命令 - commandManager.executeCommand(sn, CommandType.POINT_FLY, params) - .thenAccept(result -> { - if (result.isSuccess()) { - log.info("指点飞行命令执行成功: sn={}", sn); - } else { - log.error("指点飞行命令执行失败: sn={}, error={}, failedInstruction={}", - sn, result.getErrorMessage(), result.getFailedInstructionName()); - } - }); - } - - /** - * 示例8:注册命令执行监听器 - */ - public void example8_registerCommandListener() { - String listenerId = "command-logger"; - - // 注册命令执行监听器 - commandManager.registerCommandListener(listenerId, (sn, result) -> { - log.info("命令执行完成回调: sn={}, commandType={}, success={}, error={}", - sn, result.getCommandType(), result.isSuccess(), result.getErrorMessage()); - - // 这里可以记录到数据库 - // commandResultRepository.save(new CommandResultEntity(sn, result)); - }); - } - - /** - * 示例9:注册状态变化监听器 - */ - public void example9_registerStateChangeListener() { - String listenerId = "state-logger"; - - // 注册状态变化监听器 - commandManager.registerStateChangeListener(listenerId, (sn, newStates) -> { - log.info("设备状态变化回调: sn={}, droneState={}, airportState={}, coverState={}, drcState={}", - sn, - newStates.getDroneState(), - newStates.getAirportState(), - newStates.getCoverState(), - newStates.getDrcState()); - - // 这里可以记录到数据库 - // stateChangeRepository.save(new StateChangeEntity(sn, newStates)); - }); - } - - /** - * 示例10:处理MQTT消息(在MQTT消费者中调用) - */ - public void example10_handleMqttMessage(String topic, Object messageBody) { - // 当收到MQTT消息时,调用回调注册中心处理 - mqttCallbackRegistry.handleMessage(topic, messageBody); - - // 如果消息包含状态信息,更新设备状态 - // 这里需要根据实际的消息格式解析 - // 示例: - // if (topic.endsWith("/state")) { - // String sn = extractSnFromTopic(topic); - // MachineStates newStates = parseStatesFromMessage(messageBody); - // commandManager.updateMachineStates(sn, newStates); - // } - } - - /** - * 示例11:完整的业务流程 - */ - public void example11_completeWorkflow() { - String sn = "DJI-12345678"; - - // 1. 绑定设备 - commandManager.bindMachine(sn, "DJI"); - - // 2. 设置初始状态 - MachineStates initialStates = new MachineStates(); - initialStates.setDroneState(DroneState.FLYING); - initialStates.setAirportState(AirportState.ONLINE); - initialStates.setCoverState(CoverState.OPENED); - initialStates.setDrcState(DrcState.EXITED); - initialStates.setDebugModeState(DebugModeState.UNKNOWN); - initialStates.setStopState(StopState.EXITED); - commandManager.updateMachineStates(sn, initialStates); - - // 3. 检查是否可以起飞 - if (!commandManager.isExecutingCommand(sn)) { - List availableCommands = commandManager.getAvailableCommands(sn); - if (availableCommands.contains(CommandType.TAKE_OFF)) { - // 4. 执行起飞命令 - commandManager.executeCommand(sn, CommandType.TAKE_OFF) - .thenAccept(result -> { - if (result.isSuccess()) { - log.info("起飞成功,开始执行任务"); - // 5. 起飞成功后,可以执行其他命令 - executeNextCommand(sn); - } else { - log.error("起飞失败: {}", result.getErrorMessage()); - } - }); - } else { - log.warn("当前状态不允许起飞"); - } - } else { - log.warn("设备正在执行其他命令"); - } - } - - /** - * 执行下一个命令 - */ - private void executeNextCommand(String sn) { - // 等待一段时间后执行返航 - try { - Thread.sleep(60000); // 等待60秒 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - // 执行返航命令 - commandManager.executeCommand(sn, CommandType.RETURN_HOME) - .thenAccept(result -> { - if (result.isSuccess()) { - log.info("返航成功"); - } else { - log.error("返航失败: {}", result.getErrorMessage()); - } - }); - } -} diff --git a/src/main/java/com/tuoheng/machine/readme.txt b/src/main/java/com/tuoheng/machine/readme.txt new file mode 100644 index 0000000..3a8e057 --- /dev/null +++ b/src/main/java/com/tuoheng/machine/readme.txt @@ -0,0 +1,206 @@ + 单节点部署(默认) + + # 使用内存存储(默认配置) + machine.state.store.type=memory + + 多节点部署 + # 切换到 Redis 存储 + machine.state.store.type=redis + # 配置节点ID(可选,不配置会自动生成) + machine.node.id=node-1 + + # Redis 配置 + spring.redis.host=localhost + spring.redis.port=6379 + spring.redis.password=your-password + + +┌─────────────────────────────────────────────────────────────────┐ + │ 步骤1: 节点A 执行命令并注册回调 │ + └─────────────────────────────────────────────────────────────────┘ + 节点A: executeCommand(TAKE_OFF) + ↓ + 节点A: registerCallback(topic="dji/SN9527/response", ...) + ↓ + 节点A: MqttCallbackStore.registerCallback() + ↓ + Redis: 存储回调信息(使用两个 Key 的原因:性能优化) + + 【Key 1】回调详细信息(Hash 结构) + - Key: mqtt:callback:{callbackId} + - Value: {callbackId, topic, nodeId="nodeA", timeoutMs, registerTime, ...} + - 作用: 存储单个回调的完整信息 + - 查询: O(1) 时间复杂度,通过 callbackId 直接获取 + + 【Key 2】Topic 索引(Set 结构) + - Key: mqtt:topic:dji/SN9527/response + - Value: Set // 例如: ["abc-123", "def-456", "ghi-789"] + - 作用: 快速查询等待某个 topic 的所有回调 + - 查询: O(1) 时间复杂度,直接获取 callbackId 列表 + + 【为什么需要两个 Key?】 + 如果只用一个 Key 存储所有回调,查询时需要遍历所有回调并过滤 topic, + 时间复杂度为 O(n)。使用 Topic 索引后,可以直接获取目标回调列表, + 时间复杂度降为 O(1),大幅提升性能。 + + 【示例】 + 假设有 3 个回调: + - callbackId="abc-123", topic="dji/SN9527/response", nodeId="nodeA" + - callbackId="def-456", topic="dji/SN9527/state", nodeId="nodeB" + - callbackId="ghi-789", topic="dji/SN9527/response", nodeId="nodeA" + + Redis 存储结构: + mqtt:callback:abc-123 → {callbackId:"abc-123", topic:"dji/SN9527/response", nodeId:"nodeA"} + mqtt:callback:def-456 → {callbackId:"def-456", topic:"dji/SN9527/state", nodeId:"nodeB"} + mqtt:callback:ghi-789 → {callbackId:"ghi-789", topic:"dji/SN9527/response", nodeId:"nodeA"} + mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"] + mqtt:topic:dji/SN9527/state → ["def-456"] + + 查询 topic="dji/SN9527/response" 的回调: + 1. 从索引获取: SMEMBERS mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"] + 2. 批量获取详情: MGET mqtt:callback:abc-123 mqtt:callback:ghi-789 + 3. 总耗时: O(1) + O(k),k 是该 topic 的回调数量(通常很小) + + 【Redis 数据清理时机】 + Redis 中的回调数据有两种清理机制: + + ┌─────────────────────────────────────────────────────────────┐ + │ 1️⃣ 主动清理(业务逻辑触发) │ + └─────────────────────────────────────────────────────────────┘ + 触发时机: + ✅ 回调成功执行后(TransactionExecutor 的 finally 块) + ✅ 回调超时后(TransactionExecutor 的 finally 块) + ✅ handleMessage 检测到超时(转发前检查) + + 清理操作: + unregisterCallback(callbackId) + ↓ + 1. 获取回调信息: GET mqtt:callback:{callbackId} + 2. 删除回调信息: DEL mqtt:callback:{callbackId} + 3. 从索引中移除: SREM mqtt:topic:{topic} {callbackId} + + 示例: + T0: 注册回调,超时时间 10 秒 + T5: 收到 MQTT 响应,回调执行成功 + T5: 立即清理 Redis 数据 ✅ + - DEL mqtt:callback:abc-123 + - SREM mqtt:topic:dji/SN9527/response abc-123 + + ┌─────────────────────────────────────────────────────────────┐ + │ 2️⃣ 被动清理(Redis TTL 自动过期) │ + └─────────────────────────────────────────────────────────────┘ + 作用:兜底机制,防止异常情况下的数据残留 + + 设置方式: + // 注册回调时设置 TTL + SET mqtt:callback:{callbackId} {json} EX 3600 // 1小时后自动过期 + EXPIRE mqtt:topic:{topic} 3600 // 1小时后自动过期 + + 触发时机: + ⚠️ 应用异常崩溃,主动清理未执行 + ⚠️ 网络分区,无法删除 Redis 数据 + ⚠️ 代码 Bug,主动清理失败 + + 示例: + T0: 注册回调,TTL=3600秒(1小时) + T5: 应用崩溃,主动清理未执行 ❌ + T3600: Redis 自动删除过期数据 ✅ + - mqtt:callback:abc-123 自动过期删除 + - mqtt:topic:dji/SN9527/response 自动过期删除 + + 【推荐配置】 + TTL 应该设置为回调超时时间的 2-3 倍,例如: + - 回调超时: 10 秒 + - Redis TTL: 30 秒(10秒 × 3) + + 这样可以确保: + ✅ 正常情况下,主动清理会在 10 秒内完成 + ✅ 异常情况下,Redis 会在 30 秒后自动清理 + ✅ 避免设置过长的 TTL 导致内存浪费 + + 【注意事项】 + ⚠️ Topic 索引的 TTL 问题: + 如果同一个 topic 有多个回调,每次添加新回调时都会刷新 TTL。 + 这可能导致索引的 TTL 比单个回调的 TTL 更长。 + + 解决方案: + 方案1: 不为 Topic 索引设置 TTL,只在删除最后一个 callbackId 时删除索引 + 方案2: 每次查询时过滤掉已过期的 callbackId(推荐) + ↓ + 节点A: 本地内存存储 Consumer + - localHandlers.put(callbackId, consumer) + ↓ + 节点A: 订阅 Redis Pub/Sub 频道 + - Channel: mqtt:node:nodeA + + ┌─────────────────────────────────────────────────────────────────┐ + │ 步骤2: MQTT Broker 将响应路由到节点B(不是节点A) │ + └─────────────────────────────────────────────────────────────────┘ + MQTT Broker: 收到设备响应 + ↓ + MQTT Broker: 将消息路由到节点B(随机/轮询) + ↓ + 节点B: MqttCallbackRegistry.handleMessage(topic, messageBody) + + ┌─────────────────────────────────────────────────────────────────┐ + │ 步骤3: 节点B 从 Redis 查询等待该 topic 的回调 │ + └─────────────────────────────────────────────────────────────────┘ + 节点B: callbackStore.getCallbacksByTopic("dji/SN9527/response") + ↓ + Redis: 查询 mqtt:topic:dji/SN9527/response + ↓ + Redis: 返回 Set + ↓ + Redis: 批量获取回调信息 + - mqtt:callback:{callbackId1} → {nodeId="nodeA", ...} + - mqtt:callback:{callbackId2} → {nodeId="nodeA", ...} + ↓ + 节点B: 获得回调列表 List + + ┌─────────────────────────────────────────────────────────────────┐ + │ 步骤4: 节点B 判断回调属于哪个节点 │ + └─────────────────────────────────────────────────────────────────┘ + 节点B: for (MqttCallbackInfo callback : callbacks) { + if (nodeId.equals(callback.getNodeId())) { + // 本节点的回调,直接执行 + executeLocalCallback(...) + } else { + // 其他节点的回调,转发到目标节点 + callbackStore.publishMessageToNode(...) + } + } + + ┌─────────────────────────────────────────────────────────────────┐ + │ 步骤5: 节点B 通过 Redis Pub/Sub 转发消息到节点A │ + └─────────────────────────────────┘ + 节点B: callbackStore.publishMessageToNode( + nodeId="nodeA", + callbackId="xxx", + messageBody="{...}" // JSON 字符串 + ) + ↓ + Redis Pub/Sub: PUBLISH mqtt:node:nodeA + { + "callbackId": "xxx", + "messageBody": "{...}" + } + + ┌─────────────────────────────────────────────────────────────────┐ + │ 步骤6: 节点A 收到 Redis Pub/Sub 消息 │ + └─────────────────────────────────────────────────────────────────┘ + 节点A: Redis Pub/Sub Listener 收到消息 + ↓ + 节点A: handleNodeMessage(callbackId, messageBodyJson) + ↓ + 节点A: 反序列化消息体 + - Object messageBody = objectMapper.readValue(messageBodyJson) + ↓ + 节点A: executeLocalCallback(callbackId, messageBody) + ↓ + 节点A: 从本地内存获取 Consumer + - Consumer handler = localHandlers.get(callbackId) + ↓ + 节点A: 执行回调 + - handler.accept(messageBody) + ↓ + ✅ 命令执行成功! \ No newline at end of file