Skip to content

Commit af5335f

Browse files
committed
Add AwaitOptions with timerSummary for Async.await() and Workflow.await()
Add overloads for await methods that accept AwaitOptions, allowing users to specify a timer summary that appears in the workflow history UI/CLI. - Add AwaitOptions class with timerSummary field - Add Async.await(Duration, AwaitOptions, Supplier) for non-blocking waits - Add Workflow.await(Duration, AwaitOptions, Supplier) for blocking waits - Add test validating timer summary is set in workflow history
1 parent e8e5414 commit af5335f

File tree

10 files changed

+296
-0
lines changed

10 files changed

+296
-0
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,19 @@ public DynamicUpdateHandler getHandler() {
762762
*/
763763
Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCondition);
764764

765+
/**
766+
* Asynchronously wait until unblockCondition evaluates to true or timeout expires.
767+
*
768+
* @param timeout maximum time to wait for the condition
769+
* @param options options for the await operation, including timer summary
770+
* @param unblockCondition condition that should return true to indicate completion
771+
* @return Promise that completes with true if the condition was satisfied, false if the timeout
772+
* expired, or exceptionally with CanceledFailure if the enclosing CancellationScope is
773+
* canceled
774+
*/
775+
Promise<Boolean> awaitAsync(
776+
Duration timeout, AwaitOptions options, Supplier<Boolean> unblockCondition);
777+
765778
Promise<Void> newTimer(Duration duration);
766779

767780
Promise<Void> newTimer(Duration duration, TimerOptions options);

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.uber.m3.tally.Scope;
44
import io.temporal.common.SearchAttributeUpdate;
5+
import io.temporal.workflow.AwaitOptions;
56
import io.temporal.workflow.Functions.Func;
67
import io.temporal.workflow.MutableSideEffectOptions;
78
import io.temporal.workflow.Promise;
@@ -85,6 +86,12 @@ public Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCo
8586
return next.awaitAsync(timeout, unblockCondition);
8687
}
8788

89+
@Override
90+
public Promise<Boolean> awaitAsync(
91+
Duration timeout, AwaitOptions options, Supplier<Boolean> unblockCondition) {
92+
return next.awaitAsync(timeout, options, unblockCondition);
93+
}
94+
8895
@Override
8996
public Promise<Void> newTimer(Duration duration) {
9097
return next.newTimer(duration);

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,90 @@ public Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCo
14601460
return result;
14611461
}
14621462

1463+
@Override
1464+
public Promise<Boolean> awaitAsync(
1465+
Duration timeout, AwaitOptions options, Supplier<Boolean> unblockCondition) {
1466+
// Check if condition is already true
1467+
setReadOnly(true);
1468+
try {
1469+
if (unblockCondition.get()) {
1470+
return Workflow.newPromise(true);
1471+
}
1472+
} finally {
1473+
setReadOnly(false);
1474+
}
1475+
1476+
CompletablePromise<Boolean> result = Workflow.newPromise();
1477+
1478+
// Capture cancellation state - the condition will be evaluated from the runner thread
1479+
// where CancellationScope.current() is not available
1480+
AtomicBoolean cancelled = new AtomicBoolean(false);
1481+
1482+
// Create timer - need access to cancellation handle
1483+
AtomicReference<Functions.Proc1<RuntimeException>> timerCancellation = new AtomicReference<>();
1484+
AtomicBoolean timerCompleted = new AtomicBoolean(false);
1485+
1486+
@Nullable
1487+
UserMetadata userMetadata =
1488+
makeUserMetaData(options.getTimerSummary(), null, dataConverterWithCurrentWorkflowContext);
1489+
1490+
timerCancellation.set(
1491+
replayContext.newTimer(
1492+
timeout,
1493+
userMetadata,
1494+
(e) -> {
1495+
// Set timer flag directly so condition watcher sees it immediately
1496+
if (e == null) {
1497+
timerCompleted.set(true);
1498+
}
1499+
// Timer cancellation exceptions are ignored - we just care if it fired
1500+
}));
1501+
1502+
// Register with current CancellationScope for timer cancellation
1503+
CancellationScope.current()
1504+
.getCancellationRequest()
1505+
.thenApply(
1506+
(r) -> {
1507+
timerCancellation.get().apply(new CanceledFailure(r));
1508+
return r;
1509+
});
1510+
1511+
Functions.Proc cancelHandle =
1512+
registerConditionWatcher(
1513+
() -> {
1514+
if (cancelled.get()) {
1515+
throw new CanceledFailure("cancelled");
1516+
}
1517+
return unblockCondition.get() || timerCompleted.get();
1518+
},
1519+
(e) -> {
1520+
// Complete promise directly so blocked threads see it immediately
1521+
if (e != null) {
1522+
result.completeExceptionally(e);
1523+
} else {
1524+
boolean conditionMet = unblockCondition.get();
1525+
result.complete(conditionMet);
1526+
if (conditionMet && !timerCompleted.get()) {
1527+
// Cancel timer since condition was met first
1528+
timerCancellation.get().apply(new CanceledFailure("condition met"));
1529+
}
1530+
}
1531+
});
1532+
1533+
// Handle cancellation - complete result promise
1534+
CancellationScope.current()
1535+
.getCancellationRequest()
1536+
.thenApply(
1537+
(r) -> {
1538+
cancelled.set(true);
1539+
result.completeExceptionally(new CanceledFailure(r));
1540+
cancelHandle.apply(); // Remove the watcher
1541+
return r;
1542+
});
1543+
1544+
return result;
1545+
}
1546+
14631547
@SuppressWarnings("deprecation")
14641548
@Override
14651549
public void continueAsNew(ContinueAsNewInput input) {

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,15 @@ public static Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> un
536536
return getWorkflowOutboundInterceptor().awaitAsync(timeout, unblockCondition);
537537
}
538538

539+
public static Promise<Boolean> awaitAsync(
540+
Duration timeout, AwaitOptions options, Supplier<Boolean> unblockCondition) {
541+
assertNotReadOnly("awaitAsync");
542+
// Don't wrap the condition with setReadOnly here - the condition will be evaluated
543+
// from the runner thread (not a workflow thread), so getRootWorkflowContext() won't work.
544+
// SyncWorkflowContext.evaluateConditionWatchers handles setReadOnly directly.
545+
return getWorkflowOutboundInterceptor().awaitAsync(timeout, options, unblockCondition);
546+
}
547+
539548
public static <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
540549
assertNotReadOnly("side effect");
541550
return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func);

temporal-sdk/src/main/java/io/temporal/workflow/Async.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,30 @@ public static Promise<Boolean> await(
267267
return WorkflowInternal.awaitAsync(timeout, unblockCondition);
268268
}
269269

270+
/**
271+
* Asynchronously wait until unblockCondition evaluates to true or timeout expires.
272+
*
273+
* @param timeout maximum time to wait for the condition
274+
* @param options options for the await operation, including timer summary
275+
* @param unblockCondition condition that should return true to indicate completion. The condition
276+
* is called on every state transition, so it should never call any blocking operations or
277+
* contain code that mutates workflow state.
278+
* @return Promise that completes with:
279+
* <ul>
280+
* <li>true if the condition was satisfied
281+
* <li>false if the timeout expired before the condition was satisfied
282+
* <li>exceptionally with CanceledFailure if the enclosing CancellationScope is canceled
283+
* </ul>
284+
*
285+
* @see Workflow#await(Duration, AwaitOptions, java.util.function.Supplier) for a blocking version
286+
*/
287+
public static Promise<Boolean> await(
288+
Duration timeout,
289+
AwaitOptions options,
290+
java.util.function.Supplier<Boolean> unblockCondition) {
291+
return WorkflowInternal.awaitAsync(timeout, options, unblockCondition);
292+
}
293+
270294
/** Prohibits instantiation. */
271295
private Async() {}
272296
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package io.temporal.workflow;
2+
3+
import io.temporal.common.Experimental;
4+
import java.util.Objects;
5+
6+
/** Options for await operations that involve timers. */
7+
public final class AwaitOptions {
8+
9+
public static AwaitOptions.Builder newBuilder() {
10+
return new AwaitOptions.Builder();
11+
}
12+
13+
public static AwaitOptions.Builder newBuilder(AwaitOptions options) {
14+
return new AwaitOptions.Builder(options);
15+
}
16+
17+
public static AwaitOptions getDefaultInstance() {
18+
return DEFAULT_INSTANCE;
19+
}
20+
21+
private static final AwaitOptions DEFAULT_INSTANCE;
22+
23+
static {
24+
DEFAULT_INSTANCE = AwaitOptions.newBuilder().build();
25+
}
26+
27+
public static final class Builder {
28+
private String timerSummary;
29+
30+
private Builder() {}
31+
32+
private Builder(AwaitOptions options) {
33+
if (options == null) {
34+
return;
35+
}
36+
this.timerSummary = options.timerSummary;
37+
}
38+
39+
/**
40+
* Single-line fixed summary for the timer used by timed await operations. This will appear in
41+
* UI/CLI. Can be in single-line Temporal Markdown format.
42+
*
43+
* <p>Default is none/empty.
44+
*/
45+
@Experimental
46+
public Builder setTimerSummary(String timerSummary) {
47+
this.timerSummary = timerSummary;
48+
return this;
49+
}
50+
51+
public AwaitOptions build() {
52+
return new AwaitOptions(timerSummary);
53+
}
54+
}
55+
56+
private final String timerSummary;
57+
58+
private AwaitOptions(String timerSummary) {
59+
this.timerSummary = timerSummary;
60+
}
61+
62+
public String getTimerSummary() {
63+
return timerSummary;
64+
}
65+
66+
public Builder toBuilder() {
67+
return new Builder(this);
68+
}
69+
70+
@Override
71+
public String toString() {
72+
return "AwaitOptions{" + "timerSummary='" + timerSummary + '\'' + '}';
73+
}
74+
75+
@Override
76+
public boolean equals(Object o) {
77+
if (this == o) return true;
78+
if (o == null || getClass() != o.getClass()) return false;
79+
AwaitOptions that = (AwaitOptions) o;
80+
return Objects.equals(timerSummary, that.timerSummary);
81+
}
82+
83+
@Override
84+
public int hashCode() {
85+
return Objects.hash(timerSummary);
86+
}
87+
}

temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,32 @@ public static boolean await(Duration timeout, Supplier<Boolean> unblockCondition
630630
});
631631
}
632632

633+
/**
634+
* Block current workflow thread until unblockCondition is evaluated to true or timeout passes.
635+
*
636+
* @param timeout time to unblock even if unblockCondition is not satisfied.
637+
* @param options options for the await operation, including timer summary
638+
* @param unblockCondition condition that should return true to indicate that thread should
639+
* unblock. The condition is called on every state transition, so it should not contain any
640+
* code that mutates any workflow state. It should also not contain any time based conditions.
641+
* Use timeout parameter for those.
642+
* @return false if timed out.
643+
* @throws CanceledFailure if thread (or current {@link CancellationScope} was canceled).
644+
* @see Async#await(Duration, AwaitOptions, java.util.function.Supplier) for a non-blocking
645+
* version that returns a Promise
646+
*/
647+
public static boolean await(
648+
Duration timeout, AwaitOptions options, Supplier<Boolean> unblockCondition) {
649+
return WorkflowInternal.awaitAsync(
650+
timeout,
651+
options,
652+
() -> {
653+
CancellationScope.throwCanceled();
654+
return unblockCondition.get();
655+
})
656+
.get();
657+
}
658+
633659
/**
634660
* Invokes function retrying in case of failures according to retry options. Synchronous variant.
635661
* Use {@link Async#retry(RetryOptions, Optional, Functions.Func)} for asynchronous functions.

temporal-sdk/src/test/java/io/temporal/workflow/AsyncAwaitTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22

33
import static org.junit.Assert.*;
44

5+
import io.temporal.api.common.v1.WorkflowExecution;
6+
import io.temporal.api.enums.v1.EventType;
7+
import io.temporal.api.history.v1.HistoryEvent;
8+
import io.temporal.client.WorkflowStub;
59
import io.temporal.failure.CanceledFailure;
10+
import io.temporal.testUtils.HistoryUtils;
611
import io.temporal.testing.internal.SDKTestWorkflowRule;
712
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
813
import java.time.Duration;
@@ -106,6 +111,21 @@ public void testAsyncAwaitConditionThrows() {
106111
assertEquals("caught:simulated error", result);
107112
}
108113

114+
static final String awaitTimerSummary = "await-timer-summary";
115+
116+
@Test
117+
public void testAwaitWithOptionsSetsTimerSummary() {
118+
TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
119+
String result = workflow.execute("await-with-options");
120+
assertEquals("await-with-options:false", result);
121+
122+
// Verify the timer summary is set in the workflow history
123+
WorkflowExecution exec = WorkflowStub.fromTyped(workflow).getExecution();
124+
HistoryEvent timerStartedEvent =
125+
testWorkflowRule.getHistoryEvent(exec.getWorkflowId(), EventType.EVENT_TYPE_TIMER_STARTED);
126+
HistoryUtils.assertEventMetadata(timerStartedEvent, awaitTimerSummary, null);
127+
}
128+
109129
/** Combined workflow that handles all test scenarios. */
110130
public static class TestAsyncAwaitWorkflow implements TestWorkflow1 {
111131
private boolean condition1 = false;
@@ -140,6 +160,8 @@ public String execute(String testCase) {
140160
return testTimedCancellation();
141161
case "condition-throws":
142162
return testConditionThrows();
163+
case "await-with-options":
164+
return testAwaitWithOptions();
143165
default:
144166
return "unknown test case";
145167
}
@@ -359,5 +381,14 @@ private String testConditionThrows() {
359381
return "caught:" + e.getMessage();
360382
}
361383
}
384+
385+
private String testAwaitWithOptions() {
386+
// Use Async.await with AwaitOptions to set a timer summary
387+
AwaitOptions options = AwaitOptions.newBuilder().setTimerSummary(awaitTimerSummary).build();
388+
// Use a condition that will never be true, so it times out
389+
Promise<Boolean> promise = Async.await(Duration.ofMillis(100), options, () -> false);
390+
boolean result = promise.get();
391+
return "await-with-options:" + result;
392+
}
362393
}
363394
}

temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,12 @@ public Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCo
404404
throw new UnsupportedOperationException("not implemented");
405405
}
406406

407+
@Override
408+
public Promise<Boolean> awaitAsync(
409+
Duration timeout, AwaitOptions options, Supplier<Boolean> unblockCondition) {
410+
throw new UnsupportedOperationException("not implemented");
411+
}
412+
407413
@Override
408414
public Promise<Void> newTimer(Duration duration) {
409415
throw new UnsupportedOperationException("not implemented");

temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,15 @@ public Promise<Boolean> awaitAsync(Duration timeout, Supplier<Boolean> unblockCo
249249
return next.awaitAsync(timeout, unblockCondition);
250250
}
251251

252+
@Override
253+
public Promise<Boolean> awaitAsync(
254+
Duration timeout, AwaitOptions options, Supplier<Boolean> unblockCondition) {
255+
if (!WorkflowUnsafe.isReplaying()) {
256+
trace.add("awaitAsync " + timeout + " " + options);
257+
}
258+
return next.awaitAsync(timeout, options, unblockCondition);
259+
}
260+
252261
@Override
253262
public Promise<Void> newTimer(Duration duration) {
254263
if (!WorkflowUnsafe.isReplaying()) {

0 commit comments

Comments
 (0)