thingsboard-client-demo/src/main/java/com/tuoheng/machine/MachineCommandManager.java

291 lines
9.7 KiB
Java
Raw Normal View History

2025-12-17 10:23:45 +08:00
package com.tuoheng.machine;
import com.tuoheng.machine.command.*;
import com.tuoheng.machine.instruction.InstructionContext;
import com.tuoheng.machine.state.MachineStates;
import com.tuoheng.machine.statemachine.MachineStateManager;
import com.tuoheng.machine.statemachine.StateChangeListener;
import com.tuoheng.machine.vendor.VendorConfig;
import com.tuoheng.machine.vendor.VendorRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Device command manager: the main entry point for users of the framework.
 *
 * <p>Delegates vendor binding to {@code VendorRegistry}, state tracking to
 * {@code MachineStateManager} and transaction execution to {@code TransactionExecutor},
 * and keeps a per-SN record of the command that is currently executing.
 */
@Slf4j
@Component
public class MachineCommandManager {

    private final VendorRegistry vendorRegistry;
    private final MachineStateManager stateManager;
    private final TransactionExecutor transactionExecutor;

    /**
     * SN -> command currently being executed.
     */
    private final Map<String, CommandExecution> executingCommands = new ConcurrentHashMap<>();

    /**
     * Listener ID -> command execution listener.
     */
    private final Map<String, CommandExecutionListener> commandListeners = new ConcurrentHashMap<>();

    /**
     * Creates the manager from its three collaborators.
     *
     * @param vendorRegistry      registry mapping device SNs to vendor configurations
     * @param stateManager        holder of per-device states and state-change listeners
     * @param transactionExecutor executor that runs a command's transaction
     */
    public MachineCommandManager(VendorRegistry vendorRegistry,
                                 MachineStateManager stateManager,
                                 TransactionExecutor transactionExecutor) {
        this.vendorRegistry = vendorRegistry;
        this.stateManager = stateManager;
        this.transactionExecutor = transactionExecutor;
    }

    /**
     * Binds a device to a vendor.
     *
     * @param sn         device serial number
     * @param vendorType vendor type
     */
    public void bindMachine(String sn, String vendorType) {
        vendorRegistry.bindSnToVendor(sn, vendorType);
        // Both placeholders are required; without the first one SLF4J would print
        // the SN in the vendorType slot and silently drop the vendor type.
        log.info("绑定设备到厂家: sn={}, vendorType={}", sn, vendorType);
    }

    /**
     * Unbinds a device: removes its vendor binding, its executing-command record
     * and its stored states.
     *
     * <p>The future of a command that is still running is not cancelled here; only
     * the bookkeeping entry is dropped.
     *
     * @param sn device serial number
     */
    public void unbindMachine(String sn) {
        vendorRegistry.unbindSn(sn);
        executingCommands.remove(sn);
        stateManager.removeStates(sn);
        log.info("解绑设备: sn={}", sn);
    }

    /**
     * Returns the current states of a device as held by the state manager.
     *
     * @param sn device serial number
     * @return the device states
     */
    public MachineStates getMachineStates(String sn) {
        return stateManager.getStates(sn);
    }

    /**
     * Updates the states of a device; typically called from heartbeat handling.
     *
     * @param sn        device serial number
     * @param newStates new states
     */
    public void updateMachineStates(String sn, MachineStates newStates) {
        stateManager.updateStates(sn, newStates);
    }

    /**
     * Tells whether a device has a command whose future is not yet done.
     *
     * @param sn device serial number
     * @return {@code true} if a command is recorded for the SN and has not completed
     */
    public boolean isExecutingCommand(String sn) {
        CommandExecution execution = executingCommands.get(sn);
        return execution != null && !execution.getFuture().isDone();
    }

    /**
     * Returns the type of the command a device is currently executing.
     *
     * @param sn device serial number
     * @return the command type, or {@code null} if no command is currently executing
     */
    public CommandType getExecutingCommandType(String sn) {
        CommandExecution execution = executingCommands.get(sn);
        if (execution != null && !execution.getFuture().isDone()) {
            return execution.getCommandType();
        }
        return null;
    }

    /**
     * Returns the commands the device may execute in its current state.
     *
     * @param sn device serial number
     * @return the executable commands; an empty list if the device is not bound to a vendor
     */
    public List<CommandType> getAvailableCommands(String sn) {
        VendorConfig vendorConfig = vendorRegistry.getVendorConfig(sn);
        if (vendorConfig == null) {
            log.warn("设备未绑定厂家: sn={}", sn);
            return List.of();
        }
        MachineStates currentStates = stateManager.getStates(sn);
        return vendorConfig.getAvailableCommands(currentStates);
    }

    /**
     * Executes a command without parameters.
     *
     * @param sn          device serial number
     * @param commandType command type
     * @return future of the command result
     */
    public CompletableFuture<CommandResult> executeCommand(String sn, CommandType commandType) {
        return executeCommand(sn, commandType, Map.of());
    }

    /**
     * Executes a command with parameters.
     *
     * <p>Validation failures (device not bound, another command running, state does
     * not allow the command, vendor has no transaction for it) are reported through an
     * already-completed future carrying a failure result rather than by throwing.
     *
     * @param sn          device serial number
     * @param commandType command type
     * @param params      command parameters; {@code null} is treated as no parameters
     * @return future of the command result
     */
    public CompletableFuture<CommandResult> executeCommand(String sn, CommandType commandType, Map<String, Object> params) {
        log.info("收到命令执行请求: sn={}, commandType={}, params={}", sn, commandType, params);

        // 1. The device must be bound to a vendor.
        VendorConfig vendorConfig = vendorRegistry.getVendorConfig(sn);
        if (vendorConfig == null) {
            String error = "设备未绑定厂家";
            log.error("{}: sn={}", error, sn);
            return CompletableFuture.completedFuture(CommandResult.failure(commandType, error));
        }

        // 2. Reject if another command is still running. A single lookup is used so the
        //    command type in the message always belongs to the entry that was checked.
        CommandExecution running = executingCommands.get(sn);
        if (running != null && !running.getFuture().isDone()) {
            String error = "设备正在执行其他命令: " + running.getCommandType();
            log.warn("{}: sn={}", error, sn);
            return CompletableFuture.completedFuture(CommandResult.failure(commandType, error));
        }

        // 3. The current state must allow this command.
        MachineStates currentStates = stateManager.getStates(sn);
        if (!vendorConfig.canExecuteCommand(currentStates, commandType)) {
            String error = "当前状态不允许执行该命令";
            log.warn("{}: sn={}, commandType={}, currentStates={}", error, sn, commandType, currentStates);
            return CompletableFuture.completedFuture(CommandResult.failure(commandType, error));
        }

        // 4. Look up the transaction definition for this command.
        Transaction transaction = vendorConfig.getTransaction(commandType);
        if (transaction == null) {
            String error = "厂家不支持该命令";
            log.error("{}: sn={}, commandType={}, vendorType={}", error, sn, commandType, vendorConfig.getVendorType());
            return CompletableFuture.completedFuture(CommandResult.failure(commandType, error));
        }

        // 5. Build the instruction context and copy the command parameters into it.
        InstructionContext context = new InstructionContext(sn, vendorConfig.getVendorType());
        if (params != null) {
            params.forEach(context::putCommandParam);
        }

        // 6. Run the transaction.
        CompletableFuture<CommandResult> future = transactionExecutor.executeTransaction(transaction, context);

        // 7. Record the executing command.
        // NOTE(review): steps 2 and 7 are not one atomic operation, so two concurrent
        // callers for the same SN could both pass the check; confirm whether callers
        // serialize requests per device.
        CommandExecution execution = new CommandExecution(commandType, future, System.currentTimeMillis());
        executingCommands.put(sn, execution);

        // 8. Completion callback.
        future.whenComplete((result, throwable) -> {
            // Remove only our own entry: after an unbind, a newer command may already be
            // registered under the same SN and must not be wiped by this late completion.
            executingCommands.remove(sn, execution);
            if (throwable != null) {
                log.error("命令执行异常: sn={}, commandType={}", sn, commandType, throwable);
                notifyCommandComplete(sn, CommandResult.failure(commandType, "命令执行异常: " + throwable.getMessage()));
            } else {
                log.info("命令执行完成: sn={}, commandType={}, success={}", sn, commandType, result.isSuccess());
                notifyCommandComplete(sn, result);
            }
        });

        return future;
    }

    /**
     * Registers a command execution listener, replacing any listener already stored
     * under the same ID.
     *
     * @param listenerId listener ID
     * @param listener   listener
     */
    public void registerCommandListener(String listenerId, CommandExecutionListener listener) {
        commandListeners.put(listenerId, listener);
        log.debug("注册命令执行监听器: listenerId={}", listenerId);
    }

    /**
     * Unregisters a command execution listener.
     *
     * @param listenerId listener ID
     */
    public void unregisterCommandListener(String listenerId) {
        commandListeners.remove(listenerId);
        log.debug("取消注册命令执行监听器: listenerId={}", listenerId);
    }

    /**
     * Registers a state change listener with the state manager.
     *
     * @param listenerId listener ID
     * @param listener   listener
     */
    public void registerStateChangeListener(String listenerId, StateChangeListener listener) {
        stateManager.registerStateChangeListener(listenerId, listener);
    }

    /**
     * Unregisters a state change listener from the state manager.
     *
     * @param listenerId listener ID
     */
    public void unregisterStateChangeListener(String listenerId) {
        stateManager.unregisterStateChangeListener(listenerId);
    }

    /**
     * Notifies every registered command listener that a command has completed.
     * An exception thrown by one listener is logged and does not stop the others.
     */
    private void notifyCommandComplete(String sn, CommandResult result) {
        for (CommandExecutionListener listener : commandListeners.values()) {
            try {
                listener.onCommandComplete(sn, result);
            } catch (Exception e) {
                log.error("命令执行监听器执行失败: sn={}, commandType={}", sn, result.getCommandType(), e);
            }
        }
    }

    /**
     * Bookkeeping record for one executing command. Equality is identity-based,
     * which the completion callback relies on when removing its own entry.
     */
    private static class CommandExecution {
        private final CommandType commandType;
        private final CompletableFuture<CommandResult> future;
        /** Start time in epoch milliseconds, as supplied by the creator. */
        private final long startTime;

        public CommandExecution(CommandType commandType, CompletableFuture<CommandResult> future, long startTime) {
            this.commandType = commandType;
            this.future = future;
            this.startTime = startTime;
        }

        public CommandType getCommandType() {
            return commandType;
        }

        public CompletableFuture<CommandResult> getFuture() {
            return future;
        }

        public long getStartTime() {
            return startTime;
        }
    }
}