Skip to content

Commit 12adb29

Browse files
committed
Add Async.await() for non-blocking condition waiting
Introduces Async.await() methods that return a Promise instead of blocking, allowing workflows to wait for conditions asynchronously. This enables concurrent condition waiting and Promise composition with anyOf/allOf. Key changes: - Add Async.await(Supplier<Boolean>) returning Promise<Void> - Add Async.await(Duration, Supplier<Boolean>) returning Promise<Boolean> - Implement condition watchers in SyncWorkflowContext evaluated via beforeThreadsWakeUp callback in DeterministicRunner - Support cancellation through CancellationScope - Add comprehensive tests including chaining, anyOf/allOf, and cancellation
1 parent 2322bd0 commit 12adb29

File tree

12 files changed

+743
-5
lines changed

12 files changed

+743
-5
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,26 @@ public DynamicUpdateHandler getHandler() {
742742

743743
void await(String reason, Supplier<Boolean> unblockCondition);
744744

745+
/**
746+
* Asynchronously wait until unblockCondition evaluates to true.
747+
*
748+
* @param unblockCondition condition that should return true to indicate completion
749+
* @return Promise that completes when the condition becomes true, or completes exceptionally with
750+
* CanceledFailure if the enclosing CancellationScope is canceled
751+
*/
752+
Promise<Void> awaitAsync(Supplier<Boolean> unblockCondition);
753+
754+
/**
755+
* Asynchronously wait until unblockCondition evaluates to true or timeout expires.
756+
*
757+
* @param timeout maximum time to wait for the condition
758+
* @param unblockCondition condition that should return true to indicate completion
759+
* @return Promise that completes with true if the condition was satisfied, false if the timeout
760+
* expired, or exceptionally with CanceledFailure if the enclosing CancellationScope is
761+
* canceled
762+
*/
763+
Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCondition);
764+
745765
Promise<Void> newTimer(Duration duration);
746766

747767
Promise<Void> newTimer(Duration duration, TimerOptions options);

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ public void await(String reason, Supplier<Boolean> unblockCondition) {
7575
next.await(reason, unblockCondition);
7676
}
7777

78+
@Override
79+
public Promise<Void> awaitAsync(Supplier<Boolean> unblockCondition) {
80+
return next.awaitAsync(unblockCondition);
81+
}
82+
83+
@Override
84+
public Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCondition) {
85+
return next.awaitAsync(timeout, unblockCondition);
86+
}
87+
7888
@Override
7989
public Promise<Void> newTimer(Duration duration) {
8090
return next.newTimer(duration);

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.temporal.internal.worker.WorkflowExecutorCache;
44
import io.temporal.workflow.CancellationScope;
55
import java.util.Optional;
6+
import java.util.function.Supplier;
67
import javax.annotation.Nonnull;
78
import javax.annotation.Nullable;
89

@@ -29,7 +30,7 @@ static DeterministicRunner newRunner(
2930
SyncWorkflowContext workflowContext,
3031
Runnable root,
3132
WorkflowExecutorCache cache) {
32-
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache);
33+
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache, null);
3334
}
3435

3536
/**
@@ -44,7 +45,29 @@ static DeterministicRunner newRunner(
4445
WorkflowThreadExecutor workflowThreadExecutor,
4546
SyncWorkflowContext workflowContext,
4647
Runnable root) {
47-
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null);
48+
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null, null);
49+
}
50+
51+
/**
52+
* Create new instance of DeterministicRunner with a callback invoked before threads wake up.
53+
*
54+
* @param workflowThreadExecutor executor for workflow thread Runnables
55+
* @param workflowContext workflow context to use
56+
* @param root function that root thread of the runner executes.
57+
* @param cache WorkflowExecutorCache used cache inflight workflows
58+
* @param beforeThreadsWakeUp callback invoked once per loop iteration before threads run. Returns
59+
* true if progress was made (e.g., a condition watcher fired), which causes the loop to
60+
* continue even if all threads are blocked. Returns false if no progress was made.
61+
* @return instance of the DeterministicRunner.
62+
*/
63+
static DeterministicRunner newRunner(
64+
WorkflowThreadExecutor workflowThreadExecutor,
65+
SyncWorkflowContext workflowContext,
66+
Runnable root,
67+
WorkflowExecutorCache cache,
68+
@Nullable Supplier<Boolean> beforeThreadsWakeUp) {
69+
return new DeterministicRunnerImpl(
70+
workflowThreadExecutor, workflowContext, root, cache, beforeThreadsWakeUp);
4871
}
4972

5073
/**

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ class DeterministicRunnerImpl implements DeterministicRunner {
6969
// always accessed under the runner lock
7070
private final List<NamedRunnable> toExecuteInWorkflowThread = new ArrayList<>();
7171

72+
// Callback invoked before threads wake up in each event loop iteration
73+
@Nullable private final Supplier<Boolean> beforeThreadsWakeUp;
74+
7275
// Access to workflowThreadsToAdd, callbackThreadsToAdd, addedThreads doesn't have to be
7376
// synchronized.
7477
// Inside DeterministicRunner the access to these variables is under the runner lock.
@@ -144,20 +147,30 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
144147
WorkflowThreadExecutor workflowThreadExecutor,
145148
@Nonnull SyncWorkflowContext workflowContext,
146149
Runnable root) {
147-
this(workflowThreadExecutor, workflowContext, root, null);
150+
this(workflowThreadExecutor, workflowContext, root, null, null);
148151
}
149152

150153
DeterministicRunnerImpl(
151154
WorkflowThreadExecutor workflowThreadExecutor,
152155
@Nonnull SyncWorkflowContext workflowContext,
153156
Runnable root,
154157
WorkflowExecutorCache cache) {
158+
this(workflowThreadExecutor, workflowContext, root, cache, null);
159+
}
160+
161+
DeterministicRunnerImpl(
162+
WorkflowThreadExecutor workflowThreadExecutor,
163+
@Nonnull SyncWorkflowContext workflowContext,
164+
Runnable root,
165+
WorkflowExecutorCache cache,
166+
@Nullable Supplier<Boolean> beforeThreadsWakeUp) {
155167
this.workflowThreadExecutor = workflowThreadExecutor;
156168
this.workflowContext = Preconditions.checkNotNull(workflowContext, "workflowContext");
157169
// TODO this should be refactored, publishing of this in an constructor into external objects is
158170
// a bad practice
159171
this.workflowContext.setRunner(this);
160172
this.cache = cache;
173+
this.beforeThreadsWakeUp = beforeThreadsWakeUp;
161174
boolean deterministicCancellationScopeOrder =
162175
workflowContext
163176
.getReplayContext()
@@ -208,7 +221,16 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
208221
appendCallbackThreadsLocked();
209222
}
210223
toExecuteInWorkflowThread.clear();
211-
progress = false;
224+
225+
// Invoke beforeThreadsWakeUp callback BEFORE running threads.
226+
// This allows the callback to evaluate conditions and complete promises,
227+
// ensuring threads see updated state when they wake up.
228+
if (beforeThreadsWakeUp != null) {
229+
progress = beforeThreadsWakeUp.get();
230+
} else {
231+
progress = false;
232+
}
233+
212234
Iterator<WorkflowThread> ci = threads.iterator();
213235
while (ci.hasNext()) {
214236
WorkflowThread c = ci.next();

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) {
124124
context.getWorkflowExecution()))
125125
.start();
126126
},
127-
cache);
127+
cache,
128+
workflowContext.getBeforeThreadsWakeUpCallback());
128129
}
129130

130131
@Override

0 commit comments

Comments
 (0)