Skip to content

Commit 3942688

Browse files
authored
Re-implement retriable task (#157)
* re-implement retriable task update CHANGELOG update continueAsNew e2e test * complete outer task accordingly * fix code style * fix style * refactor design * rename createChildTask method * refactor childTask and parentTask * remove unnecessary check * add e2e test for failure path of retry * refactor handle methods * refactor generics - add more e2e tests * fix anyOf - add more integration/e2e tests * fix typo * fix e2e test case * fix issue for anyOf
1 parent c3e5f5a commit 3942688

File tree

8 files changed

+483
-79
lines changed

8 files changed

+483
-79
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## v1.3.0
2+
* Refactor `RetriableTask` and add new `CompoundTask`, fixing Fan-out/Fan-in stuck when using `RetriableTask` ([#157](https://github.com/microsoft/durabletask-java/pull/157))
3+
14
## v1.2.0
25

36
### Updates

client/src/main/java/com/microsoft/durabletask/Task.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import java.util.concurrent.CompletableFuture;
88
import java.util.function.Consumer;
99
import java.util.function.Function;
10-
import java.util.function.Supplier;
1110

1211
/**
1312
* Represents an asynchronous operation in a durable orchestration.
@@ -32,7 +31,6 @@
3231
*/
3332
public abstract class Task<V> {
3433
final CompletableFuture<V> future;
35-
3634
Task(CompletableFuture<V> future) {
3735
this.future = future;
3836
}

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,19 @@ public interface TaskOrchestrationContext {
5858
*/
5959
boolean getIsReplaying();
6060

61+
/**
62+
* Returns a new {@code Task} that is completed when all tasks in {@code tasks} completes.
63+
* See {@link #allOf(Task[])} for more detailed information.
64+
*
65+
* @param tasks the list of {@code Task} objects
66+
* @param <V> the return type of the {@code Task} objects
67+
* @return a new {@code Task} that is completed when any of the given {@code Task}s complete
68+
* @see #allOf(Task[])
69+
*/
70+
<V> Task<List<V>> allOf(List<Task<V>> tasks);
71+
6172
// TODO: Update the description of allOf to be more specific about the exception behavior.
73+
6274
// https://github.com/microsoft/durabletask-java/issues/54
6375
/**
6476
* Returns a new {@code Task} that is completed when all the given {@code Task}s complete. If any of the given
@@ -74,24 +86,26 @@ public interface TaskOrchestrationContext {
7486
* Task<String> t2 = ctx.callActivity("MyActivity", String.class);
7587
* Task<String> t3 = ctx.callActivity("MyActivity", String.class);
7688
*
77-
* List<String> orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
89+
* List<String> orderedResults = ctx.allOf(t1, t2, t3).await();
7890
* }</pre>
7991
*
8092
* Exceptions in any of the given tasks results in an unchecked {@link CompositeTaskFailedException}.
8193
* This exception can be inspected to obtain failure details of individual {@link Task}s.
8294
* <pre>{@code
8395
* try {
84-
* List<String> orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
96+
* List<String> orderedResults = ctx.allOf(t1, t2, t3).await();
8597
* } catch (CompositeTaskFailedException e) {
8698
* List<Exception> exceptions = e.getExceptions()
8799
* }
88100
* }</pre>
89101
*
90-
* @param tasks the list of {@code Task} objects
102+
* @param tasks the {@code Task}s
91103
* @param <V> the return type of the {@code Task} objects
92104
* @return the values of the completed {@code Task} objects in the same order as the source list
93105
*/
94-
<V> Task<List<V>> allOf(List<Task<V>> tasks);
106+
default <V> Task<List<V>> allOf(Task<V>... tasks) {
107+
return this.allOf(Arrays.asList(tasks));
108+
}
95109

96110
/**
97111
* Returns a new {@code Task} that is completed when any of the tasks in {@code tasks} completes.

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 158 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -185,39 +185,43 @@ public <V> Task<List<V>> allOf(List<Task<V>> tasks) {
185185
.map(t -> t.future)
186186
.toArray((IntFunction<CompletableFuture<V>[]>) CompletableFuture[]::new);
187187

188-
return new CompletableTask<>(CompletableFuture.allOf(futures)
189-
.thenApply(x -> {
190-
List<V> results = new ArrayList<>(futures.length);
191-
192-
// All futures are expected to be completed at this point
193-
for (CompletableFuture<V> cf : futures) {
194-
try {
195-
results.add(cf.get());
196-
} catch (Exception ex) {
197-
results.add(null);
198-
}
199-
}
200-
return results;
201-
})
202-
.exceptionally(throwable -> {
203-
ArrayList<Exception> exceptions = new ArrayList<>(futures.length);
204-
for (CompletableFuture<V> cf : futures) {
205-
try {
206-
cf.get();
207-
} catch (ExecutionException ex) {
208-
exceptions.add((Exception) ex.getCause());
209-
} catch (Exception ex){
210-
exceptions.add(ex);
211-
}
212-
}
213-
throw new CompositeTaskFailedException(
214-
String.format(
215-
"%d out of %d tasks failed with an exception. See the exceptions list for details.",
216-
exceptions.size(),
217-
futures.length),
218-
exceptions);
219-
})
220-
);
188+
Function<Void, List<V>> resultPath = x -> {
189+
List<V> results = new ArrayList<>(futures.length);
190+
191+
// All futures are expected to be completed at this point
192+
for (CompletableFuture<V> cf : futures) {
193+
try {
194+
results.add(cf.get());
195+
} catch (Exception ex) {
196+
results.add(null);
197+
}
198+
}
199+
return results;
200+
};
201+
202+
Function<Throwable, ? extends List<V>> exceptionPath = throwable -> {
203+
ArrayList<Exception> exceptions = new ArrayList<>(futures.length);
204+
for (CompletableFuture<V> cf : futures) {
205+
try {
206+
cf.get();
207+
} catch (ExecutionException ex) {
208+
exceptions.add((Exception) ex.getCause());
209+
} catch (Exception ex) {
210+
exceptions.add(ex);
211+
}
212+
}
213+
throw new CompositeTaskFailedException(
214+
String.format(
215+
"%d out of %d tasks failed with an exception. See the exceptions list for details.",
216+
exceptions.size(),
217+
futures.length),
218+
exceptions);
219+
};
220+
CompletableFuture<List<V>> future = CompletableFuture.allOf(futures)
221+
.thenApply(resultPath)
222+
.exceptionally(exceptionPath);
223+
224+
return new CompoundTask<>(tasks, future);
221225
}
222226

223227
@Override
@@ -228,7 +232,7 @@ public Task<Task<?>> anyOf(List<Task<?>> tasks) {
228232
.map(t -> t.future)
229233
.toArray((IntFunction<CompletableFuture<?>[]>) CompletableFuture[]::new);
230234

231-
return new CompletableTask<>(CompletableFuture.anyOf(futures).thenApply(x -> {
235+
CompletableFuture<Task<?>> future = CompletableFuture.anyOf(futures).thenApply(x -> {
232236
// Return the first completed task in the list. Unlike the implementation in other languages,
233237
// this might not necessarily be the first task that completed, so calling code shouldn't make
234238
// assumptions about this. Note that changing this behavior later could be breaking.
@@ -240,7 +244,9 @@ public Task<Task<?>> anyOf(List<Task<?>> tasks) {
240244

241245
// Should never get here
242246
return completedTask(null);
243-
}));
247+
});
248+
249+
return new CompoundTask(tasks, future);
244250
}
245251

246252
@Override
@@ -971,9 +977,12 @@ private class RetriableTask<V> extends CompletableTask<V> {
971977
private final Instant firstAttempt;
972978
private final TaskFactory<V> taskFactory;
973979

974-
private int attemptNumber;
975980
private FailureDetails lastFailure;
976981
private Duration totalRetryTime;
982+
private Instant startTime;
983+
private int attemptNumber;
984+
private Task<V> childTask;
985+
977986

978987
public RetriableTask(TaskOrchestrationContext context, TaskFactory<V> taskFactory, RetryPolicy policy) {
979988
this(context, taskFactory, policy, null);
@@ -988,45 +997,88 @@ private RetriableTask(
988997
TaskFactory<V> taskFactory,
989998
@Nullable RetryPolicy retryPolicy,
990999
@Nullable RetryHandler retryHandler) {
991-
super(new CompletableFuture<>());
9921000
this.context = context;
9931001
this.taskFactory = taskFactory;
9941002
this.policy = retryPolicy;
9951003
this.handler = retryHandler;
9961004
this.firstAttempt = context.getCurrentInstant();
9971005
this.totalRetryTime = Duration.ZERO;
1006+
this.createChildTask(taskFactory);
9981007
}
9991008

1000-
@Override
1001-
public V await() {
1002-
Instant startTime = this.context.getCurrentInstant();
1003-
while (true) {
1004-
Task<V> currentTask = this.taskFactory.create();
1009+
// Every RetriableTask will have a CompletableTask as a child task.
1010+
private void createChildTask(TaskFactory<V> taskFactory) {
1011+
CompletableTask<V> childTask = (CompletableTask<V>) taskFactory.create();
1012+
this.setChildTask(childTask);
1013+
childTask.setParentTask(this);
1014+
}
10051015

1006-
this.attemptNumber++;
1016+
public void setChildTask(Task<V> childTask) {
1017+
this.childTask = childTask;
1018+
}
10071019

1008-
try {
1009-
return currentTask.await();
1010-
} catch (TaskFailedException ex) {
1011-
this.lastFailure = ex.getErrorDetails();
1012-
if (!this.shouldRetry()) {
1013-
throw ex;
1014-
}
1020+
public Task<V> getChildTask() {
1021+
return this.childTask;
1022+
}
10151023

1016-
// Overflow/runaway retry protection
1017-
if (this.attemptNumber == Integer.MAX_VALUE) {
1018-
throw ex;
1019-
}
1020-
}
1024+
void handleChildSuccess(V result) {
1025+
this.complete(result);
1026+
}
10211027

1022-
Duration delay = this.getNextDelay();
1023-
if (!delay.isZero() && !delay.isNegative()) {
1024-
// Use a durable timer to create the delay between retries
1025-
this.context.createTimer(delay).await();
1026-
}
1028+
void handleChildException(Throwable ex) {
1029+
tryRetry((TaskFailedException) ex);
1030+
}
10271031

1028-
this.totalRetryTime = Duration.between(startTime, this.context.getCurrentInstant());
1032+
void init() {
1033+
this.startTime = this.startTime == null ? this.context.getCurrentInstant() : this.startTime;
1034+
this.attemptNumber++;
1035+
}
1036+
1037+
public void tryRetry(TaskFailedException ex) {
1038+
this.lastFailure = ex.getErrorDetails();
1039+
if (!this.shouldRetry()) {
1040+
this.completeExceptionally(ex);
1041+
return;
1042+
}
1043+
1044+
// Overflow/runaway retry protection
1045+
if (this.attemptNumber == Integer.MAX_VALUE) {
1046+
this.completeExceptionally(ex);
1047+
return;
10291048
}
1049+
1050+
Duration delay = this.getNextDelay();
1051+
if (!delay.isZero() && !delay.isNegative()) {
1052+
// Use a durable timer to create the delay between retries
1053+
this.context.createTimer(delay).await();
1054+
}
1055+
1056+
this.totalRetryTime = Duration.between(this.startTime, this.context.getCurrentInstant());
1057+
this.createChildTask(this.taskFactory);
1058+
this.await();
1059+
}
1060+
1061+
@Override
1062+
public V await() {
1063+
this.init();
1064+
// when awaiting the first child task, we will continue iterating over the history until a result is found
1065+
// for that task. If the result is an exception, the child task will invoke "handleChildException" on this
1066+
// object, which awaits a timer, *re-sets the current child task to correspond to a retry of this task*,
1067+
// and then awaits that child.
1068+
// This logic continues until either the operation succeeds, or are our retry quota is met.
1069+
// At that point, we break the `await()` on the child task.
1070+
// Therefore, once we return from the following `await`,
1071+
// we just need to await again on the *current* child task to obtain the result of this task
1072+
try{
1073+
this.getChildTask().await();
1074+
} catch (OrchestratorBlockedException ex) {
1075+
throw ex;
1076+
} catch (Exception ignored) {
1077+
// ignore the exception from previous child tasks.
1078+
// Only needs to return result from the last child task, which is on next line.
1079+
}
1080+
// Always return the last child task result.
1081+
return this.getChildTask().await();
10301082
}
10311083

10321084
private boolean shouldRetry() {
@@ -1101,7 +1153,30 @@ private Duration getNextDelay() {
11011153
}
11021154
}
11031155

1156+
private class CompoundTask<V, U> extends CompletableTask<U> {
1157+
1158+
List<Task<V>> subTasks;
1159+
1160+
CompoundTask(List<Task<V>> subtasks, CompletableFuture<U> future) {
1161+
super(future);
1162+
this.subTasks = subtasks;
1163+
}
1164+
1165+
@Override
1166+
public U await() {
1167+
this.initSubTasks();
1168+
return super.await();
1169+
}
1170+
1171+
private void initSubTasks() {
1172+
for (Task<V> subTask : this.subTasks) {
1173+
if (subTask instanceof RetriableTask) ((RetriableTask<V>)subTask).init();
1174+
}
1175+
}
1176+
}
1177+
11041178
private class CompletableTask<V> extends Task<V> {
1179+
private Task<V> parentTask;
11051180

11061181
public CompletableTask() {
11071182
this(new CompletableFuture<>());
@@ -1111,6 +1186,14 @@ public CompletableTask() {
11111186
super(future);
11121187
}
11131188

1189+
public void setParentTask(Task<V> parentTask) {
1190+
this.parentTask = parentTask;
1191+
}
1192+
1193+
public Task<V> getParentTask() {
1194+
return this.parentTask;
1195+
}
1196+
11141197
@Override
11151198
public V await() {
11161199
do {
@@ -1168,15 +1251,27 @@ public boolean isDone() {
11681251
}
11691252

11701253
public boolean complete(V value) {
1171-
return this.future.complete(value);
1254+
Task<V> parentTask = this.getParentTask();
1255+
boolean result = this.future.complete(value);
1256+
if (parentTask instanceof RetriableTask) {
1257+
// notify parent task
1258+
((RetriableTask<V>) parentTask).handleChildSuccess(value);
1259+
}
1260+
return result;
11721261
}
11731262

11741263
private boolean cancel() {
11751264
return this.future.cancel(true);
11761265
}
11771266

11781267
public boolean completeExceptionally(Throwable ex) {
1179-
return this.future.completeExceptionally(ex);
1268+
Task<V> parentTask = this.getParentTask();
1269+
boolean result = this.future.completeExceptionally(ex);
1270+
if (parentTask instanceof RetriableTask) {
1271+
// notify parent task
1272+
((RetriableTask<V>) parentTask).handleChildException(ex);
1273+
}
1274+
return result;
11801275
}
11811276
}
11821277
}

0 commit comments

Comments
 (0)