Skip to content

Commit 8fa988d

Browse files
committed
682 Fix potential race condition where the future is cancelled before 'awaitTestResult' completes, which could leave resources in an inconsistent state
1 parent ae0a8be commit 8fa988d

File tree

2 files changed

+20
-16
lines changed

2 files changed

+20
-16
lines changed

server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-rest/src/main/java/com/bytechef/platform/workflow/test/web/rest/WorkflowTestApiController.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.List;
3939
import java.util.Map;
4040
import java.util.Objects;
41+
import java.util.concurrent.CancellationException;
4142
import java.util.concurrent.CompletableFuture;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.regex.Matcher;
@@ -159,7 +160,9 @@ public SseEmitter attachWorkflowTest(@PathVariable Long jobId) {
159160

160161
future.whenComplete((result, throwable) -> {
161162
try {
162-
if (throwable != null) {
163+
if (throwable instanceof CancellationException) {
164+
sendToEmitter(key, createEvent("error", "Aborted"));
165+
} else if (throwable != null) {
163166
sendToEmitter(
164167
key, createEvent("error", Objects.toString(throwable.getMessage(), "An error occurred")));
165168
} else {
@@ -186,23 +189,21 @@ public SseEmitter attachWorkflowTest(@PathVariable Long jobId) {
186189
public ResponseEntity<Void> stopWorkflowTest(@PathVariable String jobId) {
187190
final String key = TenantCacheKeyUtils.getKey(jobId);
188191

189-
workflowTestFacade.stopTest(Long.parseLong(jobId));
190-
191192
CompletableFuture<WorkflowTestExecutionModel> future = runs.getIfPresent(key);
192-
if (future != null) {
193-
runs.invalidate(key);
194-
}
195-
196-
try {
197-
sendToEmitter(key, createEvent("error", "Aborted"));
198-
} finally {
199-
pendingEvents.invalidate(key);
200-
completeAndClearEmitter(key);
201-
unregisterListeners(key);
202-
}
203193

204194
if (future != null && !future.isDone()) {
205195
future.cancel(true);
196+
} else {
197+
workflowTestFacade.stopTest(Long.parseLong(jobId));
198+
199+
try {
200+
sendToEmitter(key, createEvent("error", "Aborted"));
201+
} finally {
202+
runs.invalidate(key);
203+
pendingEvents.invalidate(key);
204+
completeAndClearEmitter(key);
205+
unregisterListeners(key);
206+
}
206207
}
207208

208209
return ResponseEntity.ok()
@@ -261,7 +262,11 @@ public SseEmitter testWorkflow(
261262

262263
future.whenComplete((result, throwable) -> {
263264
try {
264-
if (throwable != null) {
265+
if (throwable instanceof CancellationException) {
266+
workflowTestFacade.stopTest(jobId);
267+
268+
sendToEmitter(key, createEvent("error", "Aborted"));
269+
} else if (throwable != null) {
265270
sendToEmitter(
266271
key, createEvent("error", Objects.toString(throwable.getMessage(), "An error occurred")));
267272
} else {

server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-rest/src/test/java/com/bytechef/platform/workflow/test/web/rest/WorkflowTestApiControllerTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.Set;
4949
import java.util.concurrent.CompletableFuture;
5050
import java.util.concurrent.ConcurrentMap;
51-
import java.util.concurrent.CopyOnWriteArrayList;
5251
import java.util.concurrent.CountDownLatch;
5352
import java.util.concurrent.TimeUnit;
5453
import java.util.concurrent.atomic.AtomicReference;

0 commit comments

Comments
 (0)