Skip to content

Commit 32f37a0

Browse files
authored
Added Workflow.await (#116)
1 parent b205f20 commit 32f37a0

File tree

9 files changed

+114
-51
lines changed

9 files changed

+114
-51
lines changed

src/main/java/com/uber/cadence/internal/sync/CompletablePromiseImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public boolean isCompleted() {
6969
@Override
7070
public V get() {
7171
if (!completed) {
72-
WorkflowThread.yield("Feature.get", () -> completed);
72+
WorkflowThread.await("Feature.get", () -> completed);
7373
}
7474
if (failure != null) {
7575
unregisterWithRunner();
@@ -81,7 +81,7 @@ public V get() {
8181
@Override
8282
public V get(V defaultValue) {
8383
if (!completed) {
84-
WorkflowThread.yield("Feature.get", () -> completed);
84+
WorkflowThread.await("Feature.get", () -> completed);
8585
}
8686
if (failure != null) {
8787
unregisterWithRunner();
@@ -93,7 +93,7 @@ public V get(V defaultValue) {
9393
@Override
9494
public V get(long timeout, TimeUnit unit) throws TimeoutException {
9595
if (!completed) {
96-
WorkflowThread.yield(unit.toMillis(timeout), "Feature.get", () -> completed);
96+
WorkflowThread.await(unit.toMillis(timeout), "Feature.get", () -> completed);
9797
}
9898
if (!completed) {
9999
throw new TimeoutException();
@@ -116,7 +116,7 @@ private V throwFailure() {
116116
@Override
117117
public V get(long timeout, TimeUnit unit, V defaultValue) {
118118
if (!completed) {
119-
WorkflowThread.yield(unit.toMillis(timeout), "Feature.get", () -> completed);
119+
WorkflowThread.await(unit.toMillis(timeout), "Feature.get", () -> completed);
120120
}
121121
if (!completed) {
122122
return defaultValue;
@@ -131,7 +131,7 @@ public V get(long timeout, TimeUnit unit, V defaultValue) {
131131
@Override
132132
public RuntimeException getFailure() {
133133
if (!completed) {
134-
WorkflowThread.yield("Feature.get", () -> completed);
134+
WorkflowThread.await("Feature.get", () -> completed);
135135
}
136136
if (failure != null) {
137137
unregisterWithRunner();

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,12 @@ private static SyncDecisionContext getDecisionContext() {
164164
return DeterministicRunnerImpl.currentThreadInternal().getDecisionContext();
165165
}
166166

167-
public static void yield(String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
168-
WorkflowThread.yield(reason, unblockCondition);
167+
public static void await(String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
168+
WorkflowThread.await(reason, unblockCondition);
169169
}
170170

171-
public static boolean yield(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
172-
return WorkflowThread.yield(timeoutMillis, reason, unblockCondition);
171+
public static boolean await(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
172+
return WorkflowThread.await(timeoutMillis, reason, unblockCondition);
173173
}
174174

175175
public static <U> Promise<List<U>> promiseAllOf(Collection<Promise<U>> promises) {

src/main/java/com/uber/cadence/internal/sync/WorkflowQueueImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ public WorkflowQueueImpl(int capacity) {
3939

4040
@Override
4141
public E take() throws InterruptedException {
42-
WorkflowThread.yield("WorkflowQueue.take", () -> !queue.isEmpty());
42+
WorkflowThread.await("WorkflowQueue.take", () -> !queue.isEmpty());
4343
return queue.remove();
4444
}
4545

4646
@Override
4747
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
48-
WorkflowThread.yield(unit.toMillis(timeout), "WorkflowQueue.poll", () -> !queue.isEmpty());
48+
WorkflowThread.await(unit.toMillis(timeout), "WorkflowQueue.poll", () -> !queue.isEmpty());
4949
if (queue.isEmpty()) {
5050
return null;
5151
}
@@ -63,19 +63,19 @@ public boolean offer(E e) {
6363

6464
@Override
6565
public void put(E e) throws InterruptedException {
66-
// This condition is excessive as yield already checks it.
67-
// But yield can be called only from the sync owned thread.
66+
// This condition is excessive as await already checks it.
67+
// But await can be called only from the sync owned thread.
6868
// This condition allows puts outside the sync thread which
6969
// is used by signal handling logic.
7070
if (queue.size() >= capacity) {
71-
WorkflowThread.yield("WorkflowQueue.put", () -> queue.size() < capacity);
71+
WorkflowThread.await("WorkflowQueue.put", () -> queue.size() < capacity);
7272
}
7373
queue.add(e);
7474
}
7575

7676
@Override
7777
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
78-
boolean timedOut = WorkflowThread.yield(unit.toMillis(timeout), "WorkflowQueue.offer", () -> queue.size() < capacity);
78+
boolean timedOut = WorkflowThread.await(unit.toMillis(timeout), "WorkflowQueue.offer", () -> queue.size() < capacity);
7979
if (timedOut) {
8080
return false;
8181
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThread.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,17 @@ interface WorkflowThread extends CancellationScope {
3838
* @throws CancellationException if thread (or current cancellation scope was cancelled).
3939
* @throws DestroyWorkflowThreadError if thread was asked to be destroyed.
4040
*/
41-
static void yield(String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
42-
currentThreadInternal().yieldImpl(reason, unblockCondition);
41+
static void await(String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
42+
currentThreadInternal().yield(reason, unblockCondition);
4343
}
4444

4545
/**
4646
* Block current thread until unblockCondition is evaluated to true or timeoutMillis passes.
4747
*
4848
* @return false if timed out.
4949
*/
50-
static boolean yield(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
51-
return currentThreadInternal().yieldImpl(timeoutMillis, reason, unblockCondition);
50+
static boolean await(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
51+
return currentThreadInternal().yield(timeoutMillis, reason, unblockCondition);
5252
}
5353

5454
/**
@@ -91,9 +91,9 @@ static WorkflowThread newThread(Runnable runnable, boolean detached, String name
9191

9292
void addStackTrace(StringBuilder result);
9393

94-
void yieldImpl(String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError;
94+
void yield(String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError;
9595

96-
boolean yieldImpl(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError;
96+
boolean yield(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError;
9797

9898
/**
9999
* Stop executing all workflow threads and puts {@link DeterministicRunner} into closed state.

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class WorkflowThreadContext {
2626

2727
// Shared runner lock
2828
private final Lock lock;
29-
// Used to block yield call
29+
// Used to block await call
3030
private final Condition yieldCondition;
3131
// Used to block runUntilBlocked call
3232
private final Condition runCondition;
@@ -85,7 +85,7 @@ public void yield(String reason, Supplier<Boolean> unblockFunction) {
8585
* Execute evaluation function by the thread that owns this context if
8686
* {@link #evaluateInCoroutineContext(Consumer)} was called.
8787
*
88-
* @param reason human readable reason for current thread blockage passed to yield call.
88+
* @param reason human readable reason for current thread blockage passed to await call.
8989
*/
9090
private void mayBeEvaluate(String reason) {
9191
if (status == Status.EVALUATING) {
@@ -101,7 +101,7 @@ private void mayBeEvaluate(String reason) {
101101
}
102102

103103
/**
104-
* Call function by the thread that owns this context and is currently blocked in a yield.
104+
* Call function by the thread that owns this context and is currently blocked in a await.
105105
* Used to get information about current state of the thread like current stack trace.
106106
*
107107
* @param function to evaluate. Consumes reason for yielding parameter.
@@ -189,7 +189,7 @@ public String getYieldReason() {
189189
}
190190

191191
/**
192-
* @return true if thread made some progress. Which is yield was unblocked and some code after it was executed.
192+
* @return true if thread made some progress. Which is await was unblocked and some code after it was executed.
193193
*/
194194
public boolean runUntilBlocked() {
195195
lock.lock();

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ public void evaluateInCoroutineContext(Consumer<String> function) {
275275
}
276276

277277
/**
278-
* Interrupt coroutine by throwing DestroyWorkflowThreadError from a yield method
278+
* Interrupt coroutine by throwing DestroyWorkflowThreadError from a await method
279279
* it is blocked on and wait for coroutine thread to finish execution.
280280
*/
281281
public void stop() {
@@ -317,26 +317,26 @@ public void addStackTrace(StringBuilder result) {
317317
StackTraceElement[] stackTrace = thread.getStackTrace();
318318
for (int i = omitTop; i < stackTrace.length - omitBottom; i++) {
319319
StackTraceElement e = stackTrace[i];
320-
if (i == omitTop && "yield".equals(e.getMethodName())) continue;
320+
if (i == omitTop && "await".equals(e.getMethodName())) continue;
321321
result.append(e);
322322
result.append("\n");
323323
}
324324
}
325325

326326
@Override
327-
public void yieldImpl(String reason, Supplier<Boolean> unblockCondition) {
327+
public void yield(String reason, Supplier<Boolean> unblockCondition) {
328328
context.yield(reason, unblockCondition);
329329
}
330330

331331
@Override
332-
public boolean yieldImpl(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
332+
public boolean yield(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition) throws DestroyWorkflowThreadError {
333333
if (timeoutMillis == 0) {
334334
return unblockCondition.get();
335335
}
336336
long blockedUntil = WorkflowInternal.currentTimeMillis() + timeoutMillis;
337337
setBlockedUntil(blockedUntil);
338338
YieldWithTimeoutCondition condition = new YieldWithTimeoutCondition(unblockCondition, blockedUntil);
339-
WorkflowThread.yield(reason, condition);
339+
WorkflowThread.await(reason, condition);
340340
return !condition.isTimedOut();
341341
}
342342

src/main/java/com/uber/cadence/workflow/Workflow.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
import java.time.Duration;
2727
import java.util.Objects;
28+
import java.util.concurrent.CancellationException;
29+
import java.util.function.Supplier;
2830

2931
public final class Workflow {
3032

@@ -138,12 +140,38 @@ public static void sleep(Duration duration) {
138140
}
139141

140142
public static void sleep(long millis) {
141-
WorkflowInternal.yield(millis, "sleep", () -> {
143+
WorkflowInternal.await(millis, "sleep", () -> {
142144
CancellationScope.throwCancelled();
143145
return false;
144146
});
145147
}
146148

149+
/**
150+
* Block current thread until unblockCondition is evaluated to true.
151+
*
152+
* @param unblockCondition condition that should return true to indicate that thread should unblock.
153+
* @throws CancellationException if thread (or current {@link CancellationScope} was cancelled).
154+
*/
155+
public static void await(Supplier<Boolean> unblockCondition) {
156+
WorkflowInternal.await("await", () -> {
157+
CancellationScope.throwCancelled();
158+
return unblockCondition.get();
159+
});
160+
}
161+
162+
/**
163+
* Block current workflow thread until unblockCondition is evaluated to true or timeoutMillis passes.
164+
*
165+
* @return false if timed out.
166+
* @throws CancellationException if thread (or current {@link CancellationScope} was cancelled).
167+
*/
168+
public static boolean await(Duration timeout, Supplier<Boolean> unblockCondition) {
169+
return WorkflowInternal.await(timeout.toMillis(), "await", () -> {
170+
CancellationScope.throwCancelled();
171+
return unblockCondition.get();
172+
});
173+
}
174+
147175
/**
148176
* Invokes function retrying in case of failures according to retry options.
149177
* Synchronous variant. Use {@link Async#retry(RetryOptions, Functions.Func)} for asynchronous functions.
@@ -210,14 +238,16 @@ public static <R> R executeActivity(String name, ActivityOptions options, Class<
210238
* throw CheckedExceptionWrapper.wrap(e);
211239
* }
212240
* </pre>*
241+
*
213242
* @return CheckedExceptionWrapper if e is checked or original exception if e extends RuntimeException.
214243
*/
215244
public static RuntimeException wrap(Exception e) {
216245
return WorkflowInternal.wrap(e);
217246
}
218247

219248
/**
220-
* Removes {@link com.uber.cadence.internal.sync.CheckedExceptionWrapper} from causal exception chain.
249+
* Removes {@link com.uber.cadence.internal.common.CheckedExceptionWrapper} from causal exception chain.
250+
*
221251
* @param e exception with causality chain that might contain wrapped exceptions.
222252
* @return exception causality chain with CheckedExceptionWrapper removed.
223253
*/

0 commit comments

Comments
 (0)