From 8f85fc7a1ef80a91d9014279e5b03d323023cd64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=B0=8F=E4=BA=91?= Date: Wed, 11 Feb 2026 10:12:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=88=E6=B3=A8=E5=86=8C=E5=86=8D=E5=8F=91?= =?UTF-8?q?=E9=80=81=E5=91=BD=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../machine/command/TransactionExecutor.java | 95 +++++++++++-------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/ruoyi/device/domain/impl/machine/command/TransactionExecutor.java b/src/main/java/com/ruoyi/device/domain/impl/machine/command/TransactionExecutor.java index 5b5112c..d314fde 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/machine/command/TransactionExecutor.java +++ b/src/main/java/com/ruoyi/device/domain/impl/machine/command/TransactionExecutor.java @@ -162,7 +162,37 @@ public class TransactionExecutor { return CompletableFuture.completedFuture(result); } - // b. 在线程池中执行远程调用(避免阻塞当前线程) + // b. 预先获取回调配置(在发送命令前) + CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context); + CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); + + // 设置回调类型 + if (methodCallback != null) { + methodCallback.setCallbackType(CallbackConfig.CallbackType.METHOD); + } + if (stateCallback != null) { + stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE); + } + + // c. 预先注册回调(在发送命令前,避免竞态条件) + CompletableFuture methodFuture = null; + CompletableFuture stateFuture = null; + + if (methodCallback != null) { + log.info("【预注册方法回调】instruction={}, topic={}", + instruction.getName(), methodCallback.getTopic()); + methodFuture = waitForCallbackAsync(methodCallback, context); + } + if (stateCallback != null) { + log.info("【预注册状态回调】instruction={}, topic={}", + instruction.getName(), stateCallback.getTopic()); + stateFuture = waitForCallbackAsync(stateCallback, context); + } + + // d. 在线程池中执行远程调用(回调已经注册好了) + CompletableFuture finalMethodFuture = methodFuture; + CompletableFuture finalStateFuture = stateFuture; + return CompletableFuture.supplyAsync(() -> { try { instruction.executeRemoteCall(context); @@ -179,50 +209,35 @@ public class TransactionExecutor { return CompletableFuture.completedFuture(result); } - // c. 等待方法回调(异步) - CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context); - if (methodCallback != null) { - // 自动设置为方法回调类型 - methodCallback.setCallbackType(CallbackConfig.CallbackType.METHOD); + // e. 等待方法回调(已经预注册) + if (finalMethodFuture != null) { + return finalMethodFuture.thenCompose(methodResult -> { + if (!methodResult.isSuccess()) { + instruction.onComplete(context, methodResult); + return CompletableFuture.completedFuture(methodResult); + } - return waitForCallbackAsync(methodCallback, context) - .thenCompose(methodResult -> { - if (!methodResult.isSuccess()) { - instruction.onComplete(context, methodResult); - return CompletableFuture.completedFuture(methodResult); - } + // f. 等待状态回调(已经预注册) + if (finalStateFuture != null) { + return finalStateFuture.thenApply(stateResult -> { + instruction.onComplete(context, stateResult); + return stateResult; + }); + } - // d. 等待状态回调(异步) - CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); - if (stateCallback != null) { - // 自动设置为状态回调类型 - stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE); - - return waitForCallbackAsync(stateCallback, context) - .thenApply(stateResult -> { - instruction.onComplete(context, stateResult); - return stateResult; - }); - } - - // 没有状态回调,直接成功 - InstructionResult result = InstructionResult.success(); - instruction.onComplete(context, result); - return CompletableFuture.completedFuture(result); - }); + // 没有状态回调,直接成功 + InstructionResult result = InstructionResult.success(); + instruction.onComplete(context, result); + return CompletableFuture.completedFuture(result); + }); } // 没有方法回调,检查是否有状态回调 - CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); - if (stateCallback != null) { - // 自动设置为状态回调类型 - stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE); - - return waitForCallbackAsync(stateCallback, context) - .thenApply(stateResult -> { - instruction.onComplete(context, stateResult); - return stateResult; - }); + if (finalStateFuture != null) { + return finalStateFuture.thenApply(stateResult -> { + instruction.onComplete(context, stateResult); + return stateResult; + }); } // 没有任何回调,直接成功