Skip to content

Commit 6395485

Browse files
authored
Fix exception type when use RetriableTask in Fan in/out pattern (#174)
* fix exception type when use RetriableTask in Fan in/out pattern * update CHANGELOG * rethrow OrchestratorBlockedException and ContinueAsNewInterruption * minor updates
1 parent e938278 commit 6395485

File tree

5 files changed

+130
-2
lines changed

5 files changed

+130
-2
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## placeholder
2+
* Fix exception type issue when using `RetriableTask` in fan in/out pattern ([#174](https://github.com/microsoft/durabletask-java/pull/174))
3+
4+
15
## v1.4.0
26

37
### Updates

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,7 @@ public V await() {
12611261
this.handleException(e);
12621262
}
12631263
}
1264-
} while (ContextImplTask.this.processNextEvent());
1264+
} while (processNextEvent());
12651265

12661266
// There's no more history left to replay and the current task is still not completed. This is normal.
12671267
// The OrchestratorBlockedException exception allows us to yield the current thread back to the executor so
@@ -1271,6 +1271,22 @@ public V await() {
12711271
"The orchestrator is blocked and waiting for new inputs. This Throwable should never be caught by user code.");
12721272
}
12731273

1274+
private boolean processNextEvent() {
1275+
try {
1276+
return ContextImplTask.this.processNextEvent();
1277+
} catch (OrchestratorBlockedException | ContinueAsNewInterruption exception) {
1278+
throw exception;
1279+
} catch (Exception e) {
1280+
// ignore
1281+
/**
1282+
* We ignore the exception. Any Durable Task exceptions thrown here can be obtained when calling
1283+
* {code#future.get()} in the implementation of 'await'. We defer to that loop to handle the exception.
1284+
*/
1285+
}
1286+
// Any exception happen we return true so that we will enter to the do-while block for the last time.
1287+
return true;
1288+
}
1289+
12741290
@Override
12751291
public <U> CompletableTask<U> thenApply(Function<V, U> fn) {
12761292
CompletableFuture<U> newFuture = this.future.thenApply(fn);

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,68 @@ void activityAllOf() throws IOException, TimeoutException {
13741374
}
13751375
}
13761376

1377+
@Test
1378+
void activityAllOfException() throws IOException, TimeoutException {
1379+
final String orchestratorName = "ActivityAllOf";
1380+
final String activityName = "ToString";
1381+
final String retryActivityName = "RetryToStringException";
1382+
final String result = "test fail";
1383+
final int activityMiddle = 5;
1384+
final RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5));
1385+
final TaskOptions taskOptions = new TaskOptions(retryPolicy);
1386+
1387+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
1388+
.addOrchestrator(orchestratorName, ctx -> {
1389+
List<Task<String>> parallelTasks = IntStream.range(0, activityMiddle * 2)
1390+
.mapToObj(i -> {
1391+
if (i < activityMiddle) {
1392+
return ctx.callActivity(activityName, i, String.class);
1393+
} else {
1394+
return ctx.callActivity(retryActivityName, i, taskOptions, String.class);
1395+
}
1396+
})
1397+
.collect(Collectors.toList());
1398+
1399+
// Wait for all tasks to complete, then sort and reverse the results
1400+
try {
1401+
List<String> results = null;
1402+
results = ctx.allOf(parallelTasks).await();
1403+
Collections.sort(results);
1404+
Collections.reverse(results);
1405+
ctx.complete(results);
1406+
} catch (CompositeTaskFailedException e) {
1407+
// only catch this type of exception to ensure the expected type of exception is thrown out.
1408+
for (Exception exception : e.getExceptions()) {
1409+
if (exception instanceof TaskFailedException) {
1410+
TaskFailedException taskFailedException = (TaskFailedException) exception;
1411+
System.out.println("Task: " + taskFailedException.getTaskName() +
1412+
" Failed for cause: " + taskFailedException.getErrorDetails().getErrorMessage());
1413+
}
1414+
}
1415+
}
1416+
ctx.complete(result);
1417+
})
1418+
.addActivity(activityName, ctx -> ctx.getInput(Object.class).toString())
1419+
.addActivity(retryActivityName, ctx -> {
1420+
// only throw exception
1421+
throw new RuntimeException("test retry");
1422+
})
1423+
.buildAndStart();
1424+
1425+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
1426+
try (worker; client) {
1427+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
1428+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
1429+
assertNotNull(instance);
1430+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
1431+
1432+
String output = instance.readOutputAs(String.class);
1433+
assertNotNull(output);
1434+
assertEquals(String.class, output.getClass());
1435+
assertEquals(result, output);
1436+
}
1437+
}
1438+
13771439
@Test
13781440
void activityAnyOf() throws IOException, TimeoutException {
13791441
final String orchestratorName = "ActivityAnyOf";

samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,49 @@ public Object parallelAnyOf(
8989
tasks.add(ctx.callActivity("AppendHappy", 1, Integer.class));
9090
return ctx.anyOf(tasks).await().await();
9191
}
92+
93+
@FunctionName("StartParallelCatchException")
94+
public HttpResponseMessage startParallelCatchException(
95+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
96+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
97+
final ExecutionContext context) {
98+
context.getLogger().info("Java HTTP trigger processed a request.");
99+
100+
DurableTaskClient client = durableContext.getClient();
101+
String instanceId = client.scheduleNewOrchestrationInstance("ParallelCatchException");
102+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
103+
return durableContext.createCheckStatusResponse(request, instanceId);
104+
}
105+
106+
@FunctionName("ParallelCatchException")
107+
public List<String> parallelCatchException(
108+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx,
109+
ExecutionContext context) {
110+
try {
111+
List<Task<String>> tasks = new ArrayList<>();
112+
RetryPolicy policy = new RetryPolicy(2, Duration.ofSeconds(1));
113+
TaskOptions options = new TaskOptions(policy);
114+
tasks.add(ctx.callActivity("AlwaysException", "Input1", options, String.class));
115+
tasks.add(ctx.callActivity("AppendHappy", "Input2", options, String.class));
116+
return ctx.allOf(tasks).await();
117+
} catch (CompositeTaskFailedException e) {
118+
// only catch this type of exception to ensure the expected type of exception is thrown out.
119+
for (Exception exception : e.getExceptions()) {
120+
if (exception instanceof TaskFailedException) {
121+
TaskFailedException taskFailedException = (TaskFailedException) exception;
122+
context.getLogger().info("Task: " + taskFailedException.getTaskName() +
123+
" Failed for cause: " + taskFailedException.getErrorDetails().getErrorMessage());
124+
}
125+
}
126+
}
127+
return null;
128+
}
129+
130+
@FunctionName("AlwaysException")
131+
public String alwaysException(
132+
@DurableActivityTrigger(name = "name") String name,
133+
final ExecutionContext context) {
134+
context.getLogger().info("Throw Test AlwaysException: " + name);
135+
throw new RuntimeException("Test AlwaysException");
136+
}
92137
}

samples-azure-functions/src/test/java/com/functions/EndToEndTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public void setupHost() {
3333
@ValueSource(strings = {
3434
"StartOrchestration",
3535
"StartParallelOrchestration",
36-
"StartParallelAnyOf"
36+
"StartParallelAnyOf",
37+
"StartParallelCatchException"
3738
})
3839
public void generalFunctions(String functionName) throws InterruptedException {
3940
Set<String> continueStates = new HashSet<>();

0 commit comments

Comments
 (0)