Skip to content

Commit 81e974c

Browse files
authored
DeterministicRunnerImpl::close: wait for tasks outside lock (#213)
* DeterministicRunnerImpl: wait for tasks outside lock * return completable future on null
1 parent b668b27 commit 81e974c

File tree

3 files changed

+30
-18
lines changed

3 files changed

+30
-18
lines changed

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ public void cancel(String reason) {
287287

288288
@Override
289289
public void close() {
290+
List<Future<?>> threadFutures = new ArrayList<>();
290291
lock.lock();
291292
if (closed) {
292293
return;
@@ -299,7 +300,7 @@ public void close() {
299300
}
300301
try {
301302
for (WorkflowThread c : threads) {
302-
c.stop();
303+
threadFutures.add(c.stopNow());
303304
}
304305
threads.clear();
305306

@@ -326,6 +327,18 @@ public void close() {
326327
closed = true;
327328
lock.unlock();
328329
}
330+
331+
// Context is destroyed in c.StopNow(). Wait on all tasks outside the lock since
332+
// these tasks use the same lock to execute.
333+
for (Future<?> future : threadFutures) {
334+
try {
335+
future.get();
336+
} catch (InterruptedException e) {
337+
throw new Error("Unexpected interrupt", e);
338+
} catch (ExecutionException e) {
339+
throw new Error("Unexpected failure stopping coroutine", e);
340+
}
341+
}
329342
}
330343

331344
@Override

src/main/java/com/uber/cadence/internal/sync/WorkflowThread.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.workflow.CancellationScope;
2323
import java.util.Optional;
2424
import java.util.concurrent.CancellationException;
25+
import java.util.concurrent.Future;
2526
import java.util.function.Supplier;
2627

2728
/** Thread that is scheduled deterministically by {@link DeterministicRunner}. */
@@ -89,7 +90,7 @@ static WorkflowThread newThread(Runnable runnable, boolean detached, String name
8990

9091
boolean isDone();
9192

92-
void stop();
93+
Future<?> stopNow();
9394

9495
void addStackTrace(StringBuilder result);
9596

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,15 @@
2727
import java.util.HashMap;
2828
import java.util.Map;
2929
import java.util.Optional;
30-
import java.util.concurrent.CancellationException;
31-
import java.util.concurrent.ExecutionException;
32-
import java.util.concurrent.ExecutorService;
33-
import java.util.concurrent.Future;
34-
import java.util.concurrent.RejectedExecutionException;
30+
import java.util.concurrent.*;
3531
import java.util.function.Consumer;
3632
import java.util.function.Supplier;
3733
import org.slf4j.Logger;
3834
import org.slf4j.LoggerFactory;
3935
import org.slf4j.MDC;
4036

37+
import static javafx.scene.input.KeyCode.T;
38+
4139
class WorkflowThreadImpl implements WorkflowThread {
4240

4341
/**
@@ -331,10 +329,10 @@ public void evaluateInCoroutineContext(Consumer<String> function) {
331329

332330
/**
333331
* Interrupt coroutine by throwing DestroyWorkflowThreadError from a await method it is blocked on
334-
* and wait for coroutine thread to finish execution.
332+
* and return underlying Future to be waited on.
335333
*/
336334
@Override
337-
public void stop() {
335+
public Future<?> stopNow() {
338336
// Cannot call destroy() on itself
339337
if (thread == Thread.currentThread()) {
340338
throw new Error("Cannot call destroy on itself: " + thread.getName());
@@ -344,16 +342,16 @@ public void stop() {
344342
throw new RuntimeException(
345343
"Couldn't destroy the thread. " + "The blocked thread stack trace: " + getStackTrace());
346344
}
347-
try {
348-
// Check if thread was started
349-
if (taskFuture != null) {
350-
taskFuture.get();
351-
}
352-
} catch (InterruptedException e) {
353-
throw new Error("Unexpected interrupt", e);
354-
} catch (ExecutionException e) {
355-
throw new Error("Unexpected failure stopping coroutine", e);
345+
if(taskFuture == null){
346+
return getCompletedFuture();
356347
}
348+
return taskFuture;
349+
}
350+
351+
private Future<?> getCompletedFuture(){
352+
CompletableFuture<String> f = new CompletableFuture<>();
353+
f.complete("done");
354+
return f;
357355
}
358356

359357
@Override

0 commit comments

Comments
 (0)