修改了MQTT的配置

This commit is contained in:
孙小云 2025-12-18 13:31:46 +08:00
parent e44347eec9
commit c8099d90bd
6 changed files with 37 additions and 422 deletions

View File

@ -2,6 +2,7 @@ package com.tuoheng.machine;
import com.tuoheng.machine.command.*; import com.tuoheng.machine.command.*;
import com.tuoheng.machine.instruction.InstructionContext; import com.tuoheng.machine.instruction.InstructionContext;
import com.tuoheng.machine.mqtt.MqttClient;
import com.tuoheng.machine.state.MachineStates; import com.tuoheng.machine.state.MachineStates;
import com.tuoheng.machine.statemachine.MachineStateManager; import com.tuoheng.machine.statemachine.MachineStateManager;
import com.tuoheng.machine.statemachine.StateChangeListener; import com.tuoheng.machine.statemachine.StateChangeListener;
@ -25,6 +26,7 @@ public class MachineCommandManager {
private final VendorRegistry vendorRegistry; private final VendorRegistry vendorRegistry;
private final MachineStateManager stateManager; private final MachineStateManager stateManager;
private final TransactionExecutor transactionExecutor; private final TransactionExecutor transactionExecutor;
private final MqttClient mqttClient;
/** /**
* SN -> 当前正在执行的命令 * SN -> 当前正在执行的命令
@ -38,10 +40,12 @@ public class MachineCommandManager {
public MachineCommandManager(VendorRegistry vendorRegistry, public MachineCommandManager(VendorRegistry vendorRegistry,
MachineStateManager stateManager, MachineStateManager stateManager,
TransactionExecutor transactionExecutor) { TransactionExecutor transactionExecutor,
MqttClient mqttClient) {
this.vendorRegistry = vendorRegistry; this.vendorRegistry = vendorRegistry;
this.stateManager = stateManager; this.stateManager = stateManager;
this.transactionExecutor = transactionExecutor; this.transactionExecutor = transactionExecutor;
this.mqttClient = mqttClient;
} }
/** /**
@ -183,7 +187,7 @@ public class MachineCommandManager {
} }
// 5. 创建指令上下文 // 5. 创建指令上下文
InstructionContext context = new InstructionContext(sn, vendorConfig.getVendorType()); InstructionContext context = new InstructionContext(sn, vendorConfig.getVendorType(), mqttClient);
params.forEach(context::putCommandParam); params.forEach(context::putCommandParam);
// 6. 执行事务 // 6. 执行事务

View File

@ -1,414 +0,0 @@
# 设备命令执行框架
## 概述
这是一个通用的设备命令执行框架,用于管理无人机、机巢等设备的状态和命令执行。框架采用状态机模式,支持多厂家接入,提供了完整的指令执行、回调处理和状态管理功能。
## 核心概念
### 1. 四套大状态
框架管理四套独立的状态系统:
- **DroneState无人机状态**:准备中、飞行中、返航、急停、指点飞行等
- **AirportState机巢状态**:在线、待机、调试模式、重启中等
- **CoverState舱门状态**:关闭、打开中、已打开、关闭中等
- **DrcState飞行控制模式**:未知、退出、进入中、已进入、退出中
### 2. 命令Command
命令是用户发起的操作请求,如起飞、降落、返航等。每个命令由一个或多个指令组成。
### 3. 事务Transaction
事务是命令的具体实现,包含一系列按顺序执行的指令。事务定义了:
- 指令列表(按顺序执行)
- 超时时间
- 命令类型
### 4. 指令Instruction
指令是事务的最小执行单元,包含四个部分:
**a. canExecute** - 判断是否可以执行该指令
```java
boolean canExecute(InstructionContext context)
```
**b. executeRemoteCall** - 执行远程调用如MQTT发送
```java
void executeRemoteCall(InstructionContext context)
```
**c. getMethodCallbackConfig** - 方法回调配置(等待方法执行结果)
```java
CallbackConfig getMethodCallbackConfig(InstructionContext context)
```
**d. getStateCallbackConfig** - 状态回调配置(等待状态变化)
```java
CallbackConfig getStateCallbackConfig(InstructionContext context)
```
### 5. 回调机制
框架提供了基于MQTT的回调注册机制
- 指令执行后自动注册回调
- 支持字段匹配和自定义判断逻辑
- 支持超时处理
- 支持短路(跳过某些回调)
- 回调完成后自动取消注册
## 架构设计
```
┌─────────────────────────────────────────────────────────────┐
│ MachineCommandManager │
│ (框架使用者入口) │
└─────────────────────────────────────────────────────────────┘
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ VendorRegistry│ │StateManager │ │Transaction │
│ (厂家注册) │ │ (状态管理) │ │ Executor │
└──────────────┘ └──────────────┘ └──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ VendorConfig │ │ Instruction │
│ (厂家配置) │ │ (指令) │
└──────────────┘ └──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Transaction │ │ Callback │
│ (事务) │ │ Registry │
└──────────────┘ └──────────────┘
```
## 目录结构
```
com.tuoheng.machine/
├── state/ # 状态定义
│ ├── DroneState.java # 无人机状态
│ ├── AirportState.java # 机巢状态
│ ├── CoverState.java # 舱门状态
│ ├── DrcState.java # DRC状态
│ └── MachineStates.java # 状态容器
├── instruction/ # 指令框架
│ ├── Instruction.java # 指令接口
│ ├── AbstractInstruction.java # 抽象指令基类
│ ├── InstructionContext.java # 指令上下文
│ ├── InstructionResult.java # 指令执行结果
│ └── CallbackConfig.java # 回调配置
├── command/ # 命令框架
│ ├── CommandType.java # 命令类型枚举
│ ├── CommandResult.java # 命令执行结果
│ ├── Transaction.java # 事务定义
│ ├── TransactionExecutor.java # 事务执行器
│ └── CommandExecutionListener.java # 命令执行监听器
├── mqtt/ # MQTT回调系统
│ ├── MqttCallbackHandler.java # 回调处理器
│ └── MqttCallbackRegistry.java # 回调注册中心
├── statemachine/ # 状态机
│ ├── MachineStateManager.java # 状态管理器
│ └── StateChangeListener.java # 状态变化监听器
├── vendor/ # 厂家配置
│ ├── VendorConfig.java # 厂家配置接口
│ ├── VendorRegistry.java # 厂家注册中心
│ └── dji/ # 大疆实现
│ ├── DjiVendorConfig.java # 大疆厂家配置
│ └── instruction/ # 大疆指令实现
│ ├── DjiTakeOffInstruction.java
│ ├── DjiLandInstruction.java
│ ├── DjiReturnHomeInstruction.java
│ ├── DjiEmergencyStopInstruction.java
│ ├── DjiResumeFlightInstruction.java
│ ├── DjiPointFlyInstruction.java
│ ├── DjiCancelPointInstruction.java
│ ├── DjiStartMissionInstruction.java
│ ├── DjiOpenCoverInstruction.java
│ └── DjiCloseCoverInstruction.java
├── config/ # 配置
│ └── MachineFrameworkConfig.java
├── example/ # 使用示例
│ └── MachineFrameworkUsageExample.java
├── MachineCommandManager.java # 主入口
└── README.md # 本文档
```
## 使用指南
### 1. 绑定设备到厂家
```java
@Autowired
private MachineCommandManager commandManager;
// 绑定设备
String sn = "DJI-12345678";
commandManager.bindMachine(sn, "DJI");
```
### 2. 更新设备状态(心跳中调用)
```java
// 创建新状态
MachineStates newStates = new MachineStates();
newStates.setDroneState(DroneState.PREPARING);
newStates.setAirportState(AirportState.STANDBY);
newStates.setCoverState(CoverState.OPENED);
newStates.setDrcState(DrcState.EXITED);
// 更新状态
commandManager.updateMachineStates(sn, newStates);
```
### 3. 查询设备状态
```java
// 获取当前状态
MachineStates currentStates = commandManager.getMachineStates(sn);
// 检查是否正在执行命令
boolean isExecuting = commandManager.isExecutingCommand(sn);
// 获取可执行的命令列表
List<CommandType> availableCommands = commandManager.getAvailableCommands(sn);
```
### 4. 执行命令
```java
// 执行简单命令
commandManager.executeCommand(sn, CommandType.TAKE_OFF)
.thenAccept(result -> {
if (result.isSuccess()) {
log.info("起飞成功");
} else {
log.error("起飞失败: {}", result.getErrorMessage());
}
});
// 执行带参数的命令
Map<String, Object> params = new HashMap<>();
params.put("latitude", 39.9042);
params.put("longitude", 116.4074);
params.put("altitude", 100.0);
commandManager.executeCommand(sn, CommandType.POINT_FLY, params)
.thenAccept(result -> {
// 处理结果
});
```
### 5. 注册监听器
```java
// 注册命令执行监听器
commandManager.registerCommandListener("my-listener", (sn, result) -> {
log.info("命令执行完成: sn={}, commandType={}, success={}",
sn, result.getCommandType(), result.isSuccess());
// 记录到数据库
});
// 注册状态变化监听器
commandManager.registerStateChangeListener("state-listener", (sn, newStates) -> {
log.info("状态变化: sn={}, droneState={}",
sn, newStates.getDroneState());
// 记录到数据库
});
```
### 6. 处理MQTT消息
```java
@Autowired
private MqttCallbackRegistry mqttCallbackRegistry;
// 在MQTT消费者中调用
public void onMqttMessage(String topic, Object messageBody) {
// 分发给回调注册中心处理
mqttCallbackRegistry.handleMessage(topic, messageBody);
// 如果是状态消息,更新设备状态
if (topic.endsWith("/state")) {
String sn = extractSnFromTopic(topic);
MachineStates newStates = parseStatesFromMessage(messageBody);
commandManager.updateMachineStates(sn, newStates);
}
}
```
## 接入新厂家
### 1. 创建厂家配置类
```java
@Component
public class MyVendorConfig implements VendorConfig {
@Override
public String getVendorType() {
return "MY_VENDOR";
}
@Override
public String getVendorName() {
return "我的厂家";
}
@Override
public Transaction getTransaction(CommandType commandType) {
// 返回命令对应的事务定义
return transactionMap.get(commandType);
}
@Override
public boolean canExecuteCommand(MachineStates currentStates, CommandType commandType) {
// 判断当前状态是否可以执行该命令
return true;
}
@Override
public List<CommandType> getAvailableCommands(MachineStates currentStates) {
// 返回当前状态下可执行的命令列表
return availableCommands;
}
}
```
### 2. 实现指令
```java
@Slf4j
public class MyTakeOffInstruction extends AbstractInstruction {
@Override
public String getName() {
return "MY_TAKE_OFF";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
String sn = context.getSn();
// 发送MQTT消息
mqttClient.publish("my/" + sn + "/command", "{\"cmd\":\"takeoff\"}");
}
@Override
public CallbackConfig getMethodCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("my/" + context.getSn() + "/response")
.fieldPath("cmd")
.expectedValue("takeoff")
.canShortCircuit(false)
.timeoutMs(10000)
.build();
}
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("my/" + context.getSn() + "/state")
.fieldPath("droneState")
.expectedValue("FLYING")
.canShortCircuit(false)
.timeoutMs(60000)
.build();
}
}
```
### 3. 注册厂家配置
```java
@Configuration
public class MyVendorAutoConfig {
@Bean
public CommandLineRunner registerMyVendor(VendorRegistry vendorRegistry,
MyVendorConfig myVendorConfig) {
return args -> {
vendorRegistry.registerVendor(myVendorConfig);
};
}
}
```
## 核心特性
### 1. 状态驱动
- 命令执行前自动检查状态是否允许
- 状态变化自动通知监听器
- 支持四套独立的状态系统
### 2. 异步执行
- 所有命令异步执行返回CompletableFuture
- 支持链式调用和组合
- 不阻塞主线程
### 3. 回调机制
- 自动注册和取消MQTT回调
- 支持超时处理
- 支持自定义匹配逻辑
### 4. 厂家隔离
- 不同厂家的实现完全隔离
- 框架使用者无需关心厂家差异
- 通过SN自动路由到对应厂家
### 5. 可扩展性
- 易于添加新命令
- 易于接入新厂家
- 易于自定义指令逻辑
### 6. 监控和日志
- 完整的命令执行日志
- 状态变化记录
- 支持自定义监听器
## 注意事项
1. **MQTT发送逻辑**当前DJI指令中的MQTT发送是示例代码需要替换为实际的MQTT客户端调用
2. **消息格式**回调配置中的字段路径需要根据实际的MQTT消息格式调整
3. **状态同步**:建议在心跳中定期调用`updateMachineStates`同步设备状态
4. **并发控制**:框架自动确保同一设备同时只能执行一个命令
5. **超时处理**:合理设置指令和事务的超时时间,避免长时间等待
6. **错误处理**:建议注册命令执行监听器,记录失败的命令以便排查问题
## 后续优化建议
1. 添加命令队列功能,支持命令排队执行
2. 添加命令取消功能
3. 添加命令重试机制
4. 添加更详细的执行进度回调
5. 支持命令优先级
6. 添加命令执行历史查询
7. 添加性能监控和统计
## 联系方式
如有问题或建议,请联系开发团队。

View File

@ -1,5 +1,6 @@
package com.tuoheng.machine.instruction; package com.tuoheng.machine.instruction;
import com.tuoheng.machine.mqtt.MqttClient;
import lombok.Data; import lombok.Data;
import java.util.HashMap; import java.util.HashMap;
@ -20,6 +21,11 @@ public class InstructionContext {
*/ */
private String vendorType; private String vendorType;
/**
* MQTT客户端用于发送MQTT消息
*/
private MqttClient mqttClient;
/** /**
* 上下文数据用于在指令间传递数据 * 上下文数据用于在指令间传递数据
*/ */
@ -35,6 +41,12 @@ public class InstructionContext {
this.vendorType = vendorType; this.vendorType = vendorType;
} }
public InstructionContext(String sn, String vendorType, MqttClient mqttClient) {
this.sn = sn;
this.vendorType = vendorType;
this.mqttClient = mqttClient;
}
public void putContextData(String key, Object value) { public void putContextData(String key, Object value) {
contextData.put(key, value); contextData.put(key, value);
} }

View File

@ -11,7 +11,7 @@ import java.util.function.Consumer;
/** /**
* MQTT回调注册中心 * MQTT回调注册中心
* 用于注册和管理MQTT消息的回调处理器 * 用于注册和管理MQTT消息的回调处理器,他的 handleMessage 需要被真实的MQTT回调去调用
*/ */
@Slf4j @Slf4j
@Component @Component

View File

@ -0,0 +1,14 @@
package com.tuoheng.machine.mqtt;
import org.springframework.stereotype.Component;
/**
* MQTT客户端
*/
@Component
public class MqttClient {
public void sendMessage(String topic, String message) {
}
}

View File

@ -21,13 +21,12 @@ public class DjiTakeOffInstruction extends AbstractInstruction {
String sn = context.getSn(); String sn = context.getSn();
log.info("发送大疆起飞指令: sn={}", sn); log.info("发送大疆起飞指令: sn={}", sn);
// TODO: 实际的MQTT发送逻辑 // 通过 context 获取 MqttClient 并发送消息
// 示例mqttClient.publish("dji/" + sn + "/command", "{\"cmd\":\"takeoff\"}");
// 这里是示例代码实际使用时需要替换为真实的MQTT发送逻辑
String topic = "dji/" + sn + "/command"; String topic = "dji/" + sn + "/command";
String payload = "{\"cmd\":\"takeoff\"}"; String payload = "{\"cmd\":\"takeoff\"}";
log.debug("MQTT发送: topic={}, payload={}", topic, payload);
context.getMqttClient().sendMessage(topic, payload);
log.debug("MQTT发送成功: topic={}, payload={}", topic, payload);
} }
@Override @Override