diff --git a/src/main/java/com/tuoheng/machine/MachineCommandManager.java b/src/main/java/com/tuoheng/machine/MachineCommandManager.java index 401253d..dee7be8 100644 --- a/src/main/java/com/tuoheng/machine/MachineCommandManager.java +++ b/src/main/java/com/tuoheng/machine/MachineCommandManager.java @@ -2,6 +2,7 @@ package com.tuoheng.machine; import com.tuoheng.machine.command.*; import com.tuoheng.machine.instruction.InstructionContext; +import com.tuoheng.machine.mqtt.MqttClient; import com.tuoheng.machine.state.MachineStates; import com.tuoheng.machine.statemachine.MachineStateManager; import com.tuoheng.machine.statemachine.StateChangeListener; @@ -25,6 +26,7 @@ public class MachineCommandManager { private final VendorRegistry vendorRegistry; private final MachineStateManager stateManager; private final TransactionExecutor transactionExecutor; + private final MqttClient mqttClient; /** * SN -> 当前正在执行的命令 @@ -38,10 +40,12 @@ public class MachineCommandManager { public MachineCommandManager(VendorRegistry vendorRegistry, MachineStateManager stateManager, - TransactionExecutor transactionExecutor) { + TransactionExecutor transactionExecutor, + MqttClient mqttClient) { this.vendorRegistry = vendorRegistry; this.stateManager = stateManager; this.transactionExecutor = transactionExecutor; + this.mqttClient = mqttClient; } /** @@ -183,7 +187,7 @@ public class MachineCommandManager { } // 5. 创建指令上下文 - InstructionContext context = new InstructionContext(sn, vendorConfig.getVendorType()); + InstructionContext context = new InstructionContext(sn, vendorConfig.getVendorType(), mqttClient); params.forEach(context::putCommandParam); // 6. 执行事务 diff --git a/src/main/java/com/tuoheng/machine/README.md b/src/main/java/com/tuoheng/machine/README.md deleted file mode 100644 index 7cfc8e7..0000000 --- a/src/main/java/com/tuoheng/machine/README.md +++ /dev/null @@ -1,414 +0,0 @@ -# 设备命令执行框架 - -## 概述 - -这是一个通用的设备命令执行框架,用于管理无人机、机巢等设备的状态和命令执行。框架采用状态机模式,支持多厂家接入,提供了完整的指令执行、回调处理和状态管理功能。 - -## 核心概念 - -### 1. 四套大状态 - -框架管理四套独立的状态系统: - -- **DroneState(无人机状态)**:准备中、飞行中、返航、急停、指点飞行等 -- **AirportState(机巢状态)**:在线、待机、调试模式、重启中等 -- **CoverState(舱门状态)**:关闭、打开中、已打开、关闭中等 -- **DrcState(飞行控制模式)**:未知、退出、进入中、已进入、退出中 - -### 2. 命令(Command) - -命令是用户发起的操作请求,如起飞、降落、返航等。每个命令由一个或多个指令组成。 - -### 3. 事务(Transaction) - -事务是命令的具体实现,包含一系列按顺序执行的指令。事务定义了: -- 指令列表(按顺序执行) -- 超时时间 -- 命令类型 - -### 4. 指令(Instruction) - -指令是事务的最小执行单元,包含四个部分: - -**a. canExecute** - 判断是否可以执行该指令 -```java -boolean canExecute(InstructionContext context) -``` - -**b. executeRemoteCall** - 执行远程调用(如MQTT发送) -```java -void executeRemoteCall(InstructionContext context) -``` - -**c. getMethodCallbackConfig** - 方法回调配置(等待方法执行结果) -```java -CallbackConfig getMethodCallbackConfig(InstructionContext context) -``` - -**d. getStateCallbackConfig** - 状态回调配置(等待状态变化) -```java -CallbackConfig getStateCallbackConfig(InstructionContext context) -``` - -### 5. 回调机制 - -框架提供了基于MQTT的回调注册机制: -- 指令执行后自动注册回调 -- 支持字段匹配和自定义判断逻辑 -- 支持超时处理 -- 支持短路(跳过某些回调) -- 回调完成后自动取消注册 - -## 架构设计 - -``` -┌─────────────────────────────────────────────────────────────┐ -│ MachineCommandManager │ -│ (框架使用者入口) │ -└─────────────────────────────────────────────────────────────┘ - │ - ┌─────────────┼─────────────┐ - │ │ │ - ▼ ▼ ▼ - ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ - │ VendorRegistry│ │StateManager │ │Transaction │ - │ (厂家注册) │ │ (状态管理) │ │ Executor │ - └──────────────┘ └──────────────┘ └──────────────┘ - │ │ - ▼ ▼ - ┌──────────────┐ ┌──────────────┐ - │ VendorConfig │ │ Instruction │ - │ (厂家配置) │ │ (指令) │ - └──────────────┘ └──────────────┘ - │ │ - ▼ ▼ - ┌──────────────┐ ┌──────────────┐ - │ Transaction │ │ Callback │ - │ (事务) │ │ Registry │ - └──────────────┘ └──────────────┘ -``` - -## 目录结构 - -``` -com.tuoheng.machine/ -├── state/ # 状态定义 -│ ├── DroneState.java # 无人机状态 -│ ├── AirportState.java # 机巢状态 -│ ├── CoverState.java # 舱门状态 -│ ├── DrcState.java # DRC状态 -│ └── MachineStates.java # 状态容器 -│ -├── instruction/ # 指令框架 -│ ├── Instruction.java # 指令接口 -│ ├── AbstractInstruction.java # 抽象指令基类 -│ ├── InstructionContext.java # 指令上下文 -│ ├── InstructionResult.java # 指令执行结果 -│ └── CallbackConfig.java # 回调配置 -│ -├── command/ # 命令框架 -│ ├── CommandType.java # 命令类型枚举 -│ ├── CommandResult.java # 命令执行结果 -│ ├── Transaction.java # 事务定义 -│ ├── TransactionExecutor.java # 事务执行器 -│ └── CommandExecutionListener.java # 命令执行监听器 -│ -├── mqtt/ # MQTT回调系统 -│ ├── MqttCallbackHandler.java # 回调处理器 -│ └── MqttCallbackRegistry.java # 回调注册中心 -│ -├── statemachine/ # 状态机 -│ ├── MachineStateManager.java # 状态管理器 -│ └── StateChangeListener.java # 状态变化监听器 -│ -├── vendor/ # 厂家配置 -│ ├── VendorConfig.java # 厂家配置接口 -│ ├── VendorRegistry.java # 厂家注册中心 -│ └── dji/ # 大疆实现 -│ ├── DjiVendorConfig.java # 大疆厂家配置 -│ └── instruction/ # 大疆指令实现 -│ ├── DjiTakeOffInstruction.java -│ ├── DjiLandInstruction.java -│ ├── DjiReturnHomeInstruction.java -│ ├── DjiEmergencyStopInstruction.java -│ ├── DjiResumeFlightInstruction.java -│ ├── DjiPointFlyInstruction.java -│ ├── DjiCancelPointInstruction.java -│ ├── DjiStartMissionInstruction.java -│ ├── DjiOpenCoverInstruction.java -│ └── DjiCloseCoverInstruction.java -│ -├── config/ # 配置 -│ └── MachineFrameworkConfig.java -│ -├── example/ # 使用示例 -│ └── MachineFrameworkUsageExample.java -│ -├── MachineCommandManager.java # 主入口 -└── README.md # 本文档 -``` - -## 使用指南 - -### 1. 绑定设备到厂家 - -```java -@Autowired -private MachineCommandManager commandManager; - -// 绑定设备 -String sn = "DJI-12345678"; -commandManager.bindMachine(sn, "DJI"); -``` - -### 2. 更新设备状态(心跳中调用) - -```java -// 创建新状态 -MachineStates newStates = new MachineStates(); -newStates.setDroneState(DroneState.PREPARING); -newStates.setAirportState(AirportState.STANDBY); -newStates.setCoverState(CoverState.OPENED); -newStates.setDrcState(DrcState.EXITED); - -// 更新状态 -commandManager.updateMachineStates(sn, newStates); -``` - -### 3. 查询设备状态 - -```java -// 获取当前状态 -MachineStates currentStates = commandManager.getMachineStates(sn); - -// 检查是否正在执行命令 -boolean isExecuting = commandManager.isExecutingCommand(sn); - -// 获取可执行的命令列表 -List availableCommands = commandManager.getAvailableCommands(sn); -``` - -### 4. 执行命令 - -```java -// 执行简单命令 -commandManager.executeCommand(sn, CommandType.TAKE_OFF) - .thenAccept(result -> { - if (result.isSuccess()) { - log.info("起飞成功"); - } else { - log.error("起飞失败: {}", result.getErrorMessage()); - } - }); - -// 执行带参数的命令 -Map params = new HashMap<>(); -params.put("latitude", 39.9042); -params.put("longitude", 116.4074); -params.put("altitude", 100.0); - -commandManager.executeCommand(sn, CommandType.POINT_FLY, params) - .thenAccept(result -> { - // 处理结果 - }); -``` - -### 5. 注册监听器 - -```java -// 注册命令执行监听器 -commandManager.registerCommandListener("my-listener", (sn, result) -> { - log.info("命令执行完成: sn={}, commandType={}, success={}", - sn, result.getCommandType(), result.isSuccess()); - // 记录到数据库 -}); - -// 注册状态变化监听器 -commandManager.registerStateChangeListener("state-listener", (sn, newStates) -> { - log.info("状态变化: sn={}, droneState={}", - sn, newStates.getDroneState()); - // 记录到数据库 -}); -``` - -### 6. 处理MQTT消息 - -```java -@Autowired -private MqttCallbackRegistry mqttCallbackRegistry; - -// 在MQTT消费者中调用 -public void onMqttMessage(String topic, Object messageBody) { - // 分发给回调注册中心处理 - mqttCallbackRegistry.handleMessage(topic, messageBody); - - // 如果是状态消息,更新设备状态 - if (topic.endsWith("/state")) { - String sn = extractSnFromTopic(topic); - MachineStates newStates = parseStatesFromMessage(messageBody); - commandManager.updateMachineStates(sn, newStates); - } -} -``` - -## 接入新厂家 - -### 1. 创建厂家配置类 - -```java -@Component -public class MyVendorConfig implements VendorConfig { - - @Override - public String getVendorType() { - return "MY_VENDOR"; - } - - @Override - public String getVendorName() { - return "我的厂家"; - } - - @Override - public Transaction getTransaction(CommandType commandType) { - // 返回命令对应的事务定义 - return transactionMap.get(commandType); - } - - @Override - public boolean canExecuteCommand(MachineStates currentStates, CommandType commandType) { - // 判断当前状态是否可以执行该命令 - return true; - } - - @Override - public List getAvailableCommands(MachineStates currentStates) { - // 返回当前状态下可执行的命令列表 - return availableCommands; - } -} -``` - -### 2. 实现指令 - -```java -@Slf4j -public class MyTakeOffInstruction extends AbstractInstruction { - - @Override - public String getName() { - return "MY_TAKE_OFF"; - } - - @Override - public void executeRemoteCall(InstructionContext context) throws Exception { - String sn = context.getSn(); - // 发送MQTT消息 - mqttClient.publish("my/" + sn + "/command", "{\"cmd\":\"takeoff\"}"); - } - - @Override - public CallbackConfig getMethodCallbackConfig(InstructionContext context) { - return CallbackConfig.builder() - .topic("my/" + context.getSn() + "/response") - .fieldPath("cmd") - .expectedValue("takeoff") - .canShortCircuit(false) - .timeoutMs(10000) - .build(); - } - - @Override - public CallbackConfig getStateCallbackConfig(InstructionContext context) { - return CallbackConfig.builder() - .topic("my/" + context.getSn() + "/state") - .fieldPath("droneState") - .expectedValue("FLYING") - .canShortCircuit(false) - .timeoutMs(60000) - .build(); - } -} -``` - -### 3. 注册厂家配置 - -```java -@Configuration -public class MyVendorAutoConfig { - - @Bean - public CommandLineRunner registerMyVendor(VendorRegistry vendorRegistry, - MyVendorConfig myVendorConfig) { - return args -> { - vendorRegistry.registerVendor(myVendorConfig); - }; - } -} -``` - -## 核心特性 - -### 1. 状态驱动 - -- 命令执行前自动检查状态是否允许 -- 状态变化自动通知监听器 -- 支持四套独立的状态系统 - -### 2. 异步执行 - -- 所有命令异步执行,返回CompletableFuture -- 支持链式调用和组合 -- 不阻塞主线程 - -### 3. 回调机制 - -- 自动注册和取消MQTT回调 -- 支持超时处理 -- 支持自定义匹配逻辑 - -### 4. 厂家隔离 - -- 不同厂家的实现完全隔离 -- 框架使用者无需关心厂家差异 -- 通过SN自动路由到对应厂家 - -### 5. 可扩展性 - -- 易于添加新命令 -- 易于接入新厂家 -- 易于自定义指令逻辑 - -### 6. 监控和日志 - -- 完整的命令执行日志 -- 状态变化记录 -- 支持自定义监听器 - -## 注意事项 - -1. **MQTT发送逻辑**:当前DJI指令中的MQTT发送是示例代码,需要替换为实际的MQTT客户端调用 - -2. **消息格式**:回调配置中的字段路径需要根据实际的MQTT消息格式调整 - -3. **状态同步**:建议在心跳中定期调用`updateMachineStates`同步设备状态 - -4. **并发控制**:框架自动确保同一设备同时只能执行一个命令 - -5. **超时处理**:合理设置指令和事务的超时时间,避免长时间等待 - -6. **错误处理**:建议注册命令执行监听器,记录失败的命令以便排查问题 - -## 后续优化建议 - -1. 添加命令队列功能,支持命令排队执行 -2. 添加命令取消功能 -3. 添加命令重试机制 -4. 添加更详细的执行进度回调 -5. 支持命令优先级 -6. 添加命令执行历史查询 -7. 添加性能监控和统计 - -## 联系方式 - -如有问题或建议,请联系开发团队。 diff --git a/src/main/java/com/tuoheng/machine/instruction/InstructionContext.java b/src/main/java/com/tuoheng/machine/instruction/InstructionContext.java index fb16df4..46fd54f 100644 --- a/src/main/java/com/tuoheng/machine/instruction/InstructionContext.java +++ b/src/main/java/com/tuoheng/machine/instruction/InstructionContext.java @@ -1,5 +1,6 @@ package com.tuoheng.machine.instruction; +import com.tuoheng.machine.mqtt.MqttClient; import lombok.Data; import java.util.HashMap; @@ -20,6 +21,11 @@ public class InstructionContext { */ private String vendorType; + /** + * MQTT客户端(用于发送MQTT消息) + */ + private MqttClient mqttClient; + /** * 上下文数据(用于在指令间传递数据) */ @@ -35,6 +41,12 @@ public class InstructionContext { this.vendorType = vendorType; } + public InstructionContext(String sn, String vendorType, MqttClient mqttClient) { + this.sn = sn; + this.vendorType = vendorType; + this.mqttClient = mqttClient; + } + public void putContextData(String key, Object value) { contextData.put(key, value); } diff --git a/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java b/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java index fd0f8f6..6864157 100644 --- a/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java +++ b/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java @@ -11,7 +11,7 @@ import java.util.function.Consumer; /** * MQTT回调注册中心 - * 用于注册和管理MQTT消息的回调处理器 + * 用于注册和管理MQTT消息的回调处理器,他的 handleMessage 需要被真实的MQTT回调去调用 */ @Slf4j @Component diff --git a/src/main/java/com/tuoheng/machine/mqtt/MqttClient.java b/src/main/java/com/tuoheng/machine/mqtt/MqttClient.java new file mode 100644 index 0000000..a1c65a5 --- /dev/null +++ b/src/main/java/com/tuoheng/machine/mqtt/MqttClient.java @@ -0,0 +1,14 @@ +package com.tuoheng.machine.mqtt; + +import org.springframework.stereotype.Component; + +/** + * MQTT客户端 + */ +@Component +public class MqttClient { + + public void sendMessage(String topic, String message) { + + } +} diff --git a/src/main/java/com/tuoheng/machine/vendor/dji/instruction/DjiTakeOffInstruction.java b/src/main/java/com/tuoheng/machine/vendor/dji/instruction/DjiTakeOffInstruction.java index 69129c5..c519056 100644 --- a/src/main/java/com/tuoheng/machine/vendor/dji/instruction/DjiTakeOffInstruction.java +++ b/src/main/java/com/tuoheng/machine/vendor/dji/instruction/DjiTakeOffInstruction.java @@ -21,13 +21,12 @@ public class DjiTakeOffInstruction extends AbstractInstruction { String sn = context.getSn(); log.info("发送大疆起飞指令: sn={}", sn); - // TODO: 实际的MQTT发送逻辑 - // 示例:mqttClient.publish("dji/" + sn + "/command", "{\"cmd\":\"takeoff\"}"); - - // 这里是示例代码,实际使用时需要替换为真实的MQTT发送逻辑 + // 通过 context 获取 MqttClient 并发送消息 String topic = "dji/" + sn + "/command"; String payload = "{\"cmd\":\"takeoff\"}"; - log.debug("MQTT发送: topic={}, payload={}", topic, payload); + + context.getMqttClient().sendMessage(topic, payload); + log.debug("MQTT发送成功: topic={}, payload={}", topic, payload); } @Override