Skip to content

Commit 921f5ff

Browse files
Throw CompositeTaskFailedException for context.allOf (#86)
* Throw CompositeTaskFailedException for context.allOf * Updated ctx.allOf() to throw CompositeTaskFailedException * Added detailed exception * Merge changes * Fixed Java doc * Catch TaskFailedException from ExecutionException * Updated exception message
1 parent 6f3ad53 commit 921f5ff

File tree

5 files changed

+164
-20
lines changed

5 files changed

+164
-20
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
### New
44

55
* Add CHANGELOG.md file to track changes across versions
6+
* context.allOf() throws CompositeTaskFailedException(RuntimeException) when one or more tasks fail ([#54](https://github.com/microsoft/durabletask-java/issues/54))
7+
68

79
### Updates
810

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
8+
/**
9+
* Exception that gets thrown when multiple {@link Task}s for an activity or sub-orchestration fails with an
10+
* unhandled exception.
11+
* <p>
12+
* Detailed information associated with each task failure can be retrieved using the {@link #getExceptions()}
13+
* method.
14+
*/
15+
public class CompositeTaskFailedException extends RuntimeException {
16+
private final List<Exception> exceptions;
17+
18+
CompositeTaskFailedException() {
19+
this.exceptions = new ArrayList<>();
20+
}
21+
22+
CompositeTaskFailedException(List<Exception> exceptions) {
23+
this.exceptions = exceptions;
24+
}
25+
26+
CompositeTaskFailedException(String message, List<Exception> exceptions) {
27+
super(message);
28+
this.exceptions = exceptions;
29+
}
30+
31+
CompositeTaskFailedException(String message, Throwable cause, List<Exception> exceptions) {
32+
super(message, cause);
33+
this.exceptions = exceptions;
34+
}
35+
36+
CompositeTaskFailedException(Throwable cause, List<Exception> exceptions) {
37+
super(cause);
38+
this.exceptions = exceptions;
39+
}
40+
41+
CompositeTaskFailedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace, List<Exception> exceptions) {
42+
super(message, cause, enableSuppression, writableStackTrace);
43+
this.exceptions = exceptions;
44+
}
45+
46+
/**
47+
* Gets a list of exceptions that occurred during execution of a group of {@link Task}
48+
* These exceptions include details of the task failure and exception information
49+
*
50+
* @return a list of exceptions
51+
*/
52+
public List<Exception> getExceptions() {
53+
return new ArrayList<>(this.exceptions);
54+
}
55+
56+
}

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public interface TaskOrchestrationContext {
6161
// https://github.com/microsoft/durabletask-java/issues/54
6262
/**
6363
* Returns a new {@code Task} that is completed when all the given {@code Task}s complete. If any of the given
64-
* {@code Task}s complete with an exception, the returned {@code Task} will also complete with an exception
64+
* {@code Task}s complete with an exception, the returned {@code Task} will also complete with an {@link CompositeTaskFailedException}
6565
* containing details of the first encountered failure. The value of the returned {@code Task} is an ordered list of
6666
* the return values of the given tasks. If no tasks are provided, returns a {@code Task} completed with value
6767
* {@code null}.
@@ -76,6 +76,16 @@ public interface TaskOrchestrationContext {
7676
* List<String> orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
7777
* }</pre>
7878
*
79+
* Exceptions in any of the given tasks results in an unchecked {@link CompositeTaskFailedException}.
80+
* This exception can be inspected to obtain failure details of individual {@link Task}s.
81+
* <pre>{@code
82+
* try {
83+
* List<String> orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
84+
* } catch (CompositeTaskFailedException e) {
85+
* List<Exception> exceptions = e.getExceptions()
86+
* }
87+
* }</pre>
88+
*
7989
* @param tasks the list of {@code Task} objects
8090
* @param <V> the return type of the {@code Task} objects
8191
* @return the values of the completed {@code Task} objects in the same order as the source list

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
import java.util.concurrent.CancellationException;
1515
import java.util.concurrent.CompletableFuture;
1616
import java.util.concurrent.ExecutionException;
17-
import java.util.function.Consumer;
18-
import java.util.function.Function;
1917
import java.util.function.IntFunction;
2018
import java.util.logging.Logger;
2119

@@ -173,23 +171,39 @@ public <V> Task<List<V>> allOf(List<Task<V>> tasks) {
173171
.map(t -> t.future)
174172
.toArray((IntFunction<CompletableFuture<V>[]>) CompletableFuture[]::new);
175173

176-
return new CompletableTask<>(CompletableFuture.allOf(futures).thenApply(x -> {
177-
ArrayList<V> results = new ArrayList<>(futures.length);
178-
179-
// All futures are expected to be completed at this point
180-
for (CompletableFuture<V> cf : futures) {
181-
try {
182-
results.add(cf.get());
183-
} catch (Exception ex) {
184-
// TODO: Better exception message than this
185-
// TODO: This needs to be a TaskFailedException or some other documented exception type.
186-
// https://github.com/microsoft/durabletask-java/issues/54
187-
throw new RuntimeException("One or more tasks failed.", ex);
188-
}
189-
}
190-
191-
return results;
192-
}));
174+
return new CompletableTask<>(CompletableFuture.allOf(futures)
175+
.thenApply(x -> {
176+
List<V> results = new ArrayList<>(futures.length);
177+
178+
// All futures are expected to be completed at this point
179+
for (CompletableFuture<V> cf : futures) {
180+
try {
181+
results.add(cf.get());
182+
} catch (Exception ex) {
183+
results.add(null);
184+
}
185+
}
186+
return results;
187+
})
188+
.exceptionally(throwable -> {
189+
ArrayList<Exception> exceptions = new ArrayList<>(futures.length);
190+
for (CompletableFuture<V> cf : futures) {
191+
try {
192+
cf.get();
193+
} catch (ExecutionException ex) {
194+
exceptions.add((Exception) ex.getCause());
195+
} catch (Exception ex){
196+
exceptions.add(ex);
197+
}
198+
}
199+
throw new CompositeTaskFailedException(
200+
String.format(
201+
"%d out of %d tasks failed with an exception. See the exceptions list for details.",
202+
exceptions.size(),
203+
futures.length),
204+
exceptions);
205+
})
206+
);
193207
}
194208

195209
@Override
@@ -1037,6 +1051,10 @@ protected void handleException(Throwable e) throws TaskFailedException {
10371051
throw (TaskFailedException)e;
10381052
}
10391053

1054+
if (e instanceof CompositeTaskFailedException) {
1055+
throw (CompositeTaskFailedException)e;
1056+
}
1057+
10401058
throw new RuntimeException("Unexpected failure in the task execution", e);
10411059
}
10421060

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.*;
99
import java.util.concurrent.TimeUnit;
1010
import java.util.concurrent.TimeoutException;
11+
import java.util.concurrent.ExecutionException;
1112
import java.util.stream.Collectors;
1213
import java.util.stream.IntStream;
1314
import java.util.stream.Stream;
@@ -840,4 +841,61 @@ void waitForInstanceCompletionThrowsException() {
840841
assertThrows(TimeoutException.class, () -> client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(2), false));
841842
}
842843
}
844+
845+
@Test
846+
void activityFanOutWithException() throws TimeoutException {
847+
final String orchestratorName = "ActivityFanOut";
848+
final String activityName = "Divide";
849+
final int count = 10;
850+
final String exceptionMessage = "2 out of 6 tasks failed with an exception. See the exceptions list for details.";
851+
852+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
853+
.addOrchestrator(orchestratorName, ctx -> {
854+
// Schedule each task to run in parallel
855+
List<Task<Integer>> parallelTasks = IntStream.of(1,2,0,4,0,6)
856+
.mapToObj(i -> ctx.callActivity(activityName, i, Integer.class))
857+
.collect(Collectors.toList());
858+
859+
// Wait for all tasks to complete
860+
try {
861+
List<Integer> results = ctx.allOf(parallelTasks).await();
862+
ctx.complete(results);
863+
}catch (CompositeTaskFailedException e){
864+
assertNotNull(e);
865+
assertEquals(2, e.getExceptions().size());
866+
assertEquals(TaskFailedException.class, e.getExceptions().get(0).getClass());
867+
assertEquals(TaskFailedException.class, e.getExceptions().get(1).getClass());
868+
// taskId in the exception below is based on parallelTasks input
869+
assertEquals(getExceptionMessage(activityName, 2, "/ by zero"), e.getExceptions().get(0).getMessage());
870+
assertEquals(getExceptionMessage(activityName, 4, "/ by zero"), e.getExceptions().get(1).getMessage());
871+
throw e;
872+
}
873+
})
874+
.addActivity(activityName, ctx -> count / ctx.getInput(Integer.class))
875+
.buildAndStart();
876+
877+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
878+
try (worker; client) {
879+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
880+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
881+
assertNotNull(instance);
882+
assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus());
883+
884+
List<?> output = instance.readOutputAs(List.class);
885+
assertNull(output);
886+
887+
FailureDetails details = instance.getFailureDetails();
888+
assertNotNull(details);
889+
assertEquals(exceptionMessage, details.getErrorMessage());
890+
assertEquals("com.microsoft.durabletask.CompositeTaskFailedException", details.getErrorType());
891+
assertNotNull(details.getStackTrace());
892+
}
893+
}
894+
private static String getExceptionMessage(String taskName, int expectedTaskId, String expectedExceptionMessage) {
895+
return String.format(
896+
"Task '%s' (#%d) failed with an unhandled exception: %s",
897+
taskName,
898+
expectedTaskId,
899+
expectedExceptionMessage);
900+
}
843901
}

0 commit comments

Comments
 (0)