Skip to content

Commit b3e2d3a

Browse files
authored
Fix swallowed exception - Improve output exception message (microsoft#185)
* fix issue 184 - improve output exception message update changelog fix same issue for handleTaskCompleted, handleSubOrchestrationCompleted improve the example and e2e tests * add deserialize erro test - add POJO input sample * quick test * improve test method name * improve samples
1 parent ad8c959 commit b3e2d3a

File tree

10 files changed

+361
-27
lines changed

10 files changed

+361
-27
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## placeholder
2+
* Fix infinite loop when use continueasnew after wait external event ([#183](https://github.com/microsoft/durabletask-java/pull/183))
3+
* Fix the issue "Deserialize Exception got swallowed when use anyOf with external event." ([#185](https://github.com/microsoft/durabletask-java/pull/185))
4+
15
## v1.5.0
26
* Fix exception type issue when using `RetriableTask` in fan in/out pattern ([#174](https://github.com/microsoft/durabletask-java/pull/174))
37
* Add implementation to generate name-based deterministic UUID ([#176](https://github.com/microsoft/durabletask-java/pull/176))

azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
import com.microsoft.azure.functions.internal.spi.middleware.Middleware;
1010
import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareChain;
1111
import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareContext;
12+
import com.microsoft.durabletask.CompositeTaskFailedException;
13+
import com.microsoft.durabletask.DataConverter;
1214
import com.microsoft.durabletask.OrchestrationRunner;
15+
import com.microsoft.durabletask.TaskFailedException;
1316
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
1417
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
1518

@@ -48,6 +51,23 @@ public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exce
4851
if (cause instanceof ContinueAsNewInterruption) {
4952
throw (ContinueAsNewInterruption) cause;
5053
}
54+
// Below types of exception are raised by the client sdk, they data should be correctly pass back to
55+
// durable function host. We need to cast them to the correct type so later when build the FailureDetails
56+
// the correct exception data can be saved and pass back.
57+
if (cause instanceof TaskFailedException) {
58+
throw (TaskFailedException) cause;
59+
}
60+
61+
if (cause instanceof CompositeTaskFailedException) {
62+
throw (CompositeTaskFailedException) cause;
63+
}
64+
65+
if (cause instanceof DataConverter.DataConverterException) {
66+
throw (DataConverter.DataConverterException) cause;
67+
}
68+
// e will be InvocationTargetException as using reflection, so we wrap it into a RuntimeException, so it
69+
// won't change the current OrchestratorFunction API. We cannot throw the cause which is a Throwable, it
70+
// requires update on OrchestratorFunction API.
5171
throw new RuntimeException("Unexpected failure in the task execution", e);
5272
}
5373
});

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.function.Function;
2323
import java.util.function.IntFunction;
2424
import java.util.logging.Logger;
25-
import java.util.stream.Collectors;
2625

2726
final class TaskOrchestrationExecutor {
2827

@@ -511,10 +510,13 @@ private void handleTaskCompleted(HistoryEvent e) {
511510
rawResult != null ? rawResult : "(null)"));
512511

513512
}
514-
515-
Object result = this.dataConverter.deserialize(rawResult, record.getDataType());
516513
CompletableTask task = record.getTask();
517-
task.complete(result);
514+
try {
515+
Object result = this.dataConverter.deserialize(rawResult, record.getDataType());
516+
task.complete(result);
517+
} catch (Exception ex) {
518+
task.completeExceptionally(ex);
519+
}
518520
}
519521

520522
private void handleTaskFailed(HistoryEvent e) {
@@ -558,11 +560,15 @@ private void handleEventRaised(HistoryEvent e) {
558560
this.outstandingEvents.remove(eventName);
559561
}
560562
String rawResult = eventRaised.getInput().getValue();
561-
Object result = this.dataConverter.deserialize(
562-
rawResult,
563-
matchingTaskRecord.getDataType());
564563
CompletableTask task = matchingTaskRecord.getTask();
565-
task.complete(result);
564+
try {
565+
Object result = this.dataConverter.deserialize(
566+
rawResult,
567+
matchingTaskRecord.getDataType());
568+
task.complete(result);
569+
} catch (Exception ex) {
570+
task.completeExceptionally(ex);
571+
}
566572
}
567573

568574
private void handleEventWhileSuspended (HistoryEvent historyEvent){
@@ -694,10 +700,13 @@ private void handleSubOrchestrationCompleted(HistoryEvent e) {
694700
rawResult != null ? rawResult : "(null)"));
695701

696702
}
697-
698-
Object result = this.dataConverter.deserialize(rawResult, record.getDataType());
699703
CompletableTask task = record.getTask();
700-
task.complete(result);
704+
try {
705+
Object result = this.dataConverter.deserialize(rawResult, record.getDataType());
706+
task.complete(result);
707+
} catch (Exception ex) {
708+
task.completeExceptionally(ex);
709+
}
701710
}
702711

703712
private void handleSubOrchestrationFailed(HistoryEvent e){
@@ -1331,6 +1340,10 @@ protected void handleException(Throwable e) {
13311340
throw (CompositeTaskFailedException)e;
13321341
}
13331342

1343+
if (e instanceof DataConverter.DataConverterException) {
1344+
throw (DataConverter.DataConverterException)e;
1345+
}
1346+
13341347
throw new RuntimeException("Unexpected failure in the task execution", e);
13351348
}
13361349

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package com.functions;
2+
3+
import com.microsoft.azure.functions.ExecutionContext;
4+
import com.microsoft.azure.functions.HttpMethod;
5+
import com.microsoft.azure.functions.HttpRequestMessage;
6+
import com.microsoft.azure.functions.HttpResponseMessage;
7+
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
8+
import com.microsoft.azure.functions.annotation.FunctionName;
9+
import com.microsoft.azure.functions.annotation.HttpTrigger;
10+
import com.microsoft.durabletask.DurableTaskClient;
11+
import com.microsoft.durabletask.Task;
12+
import com.microsoft.durabletask.TaskOrchestrationContext;
13+
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
14+
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
15+
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
16+
17+
import java.util.Optional;
18+
19+
public class DeserializeErrorTest {
20+
@FunctionName("DeserializeErrorHttp")
21+
public HttpResponseMessage deserializeErrorHttp(
22+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
23+
HttpRequestMessage<Optional<String>> request,
24+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
25+
final ExecutionContext context) {
26+
context.getLogger().info("Java HTTP trigger processed a request.");
27+
28+
DurableTaskClient client = durableContext.getClient();
29+
String instanceId = client.scheduleNewOrchestrationInstance("DeserializeErrorOrchestrator");
30+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
31+
return durableContext.createCheckStatusResponse(request, instanceId);
32+
}
33+
34+
@FunctionName("DeserializeErrorOrchestrator")
35+
public String deserializeErrorOrchestrator(
36+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
37+
// cause deserialize error
38+
Person result = ctx.callActivity("Capitalize", "Austin", Person.class).await();
39+
return result.getName();
40+
}
41+
42+
@FunctionName("SubCompletedErrorHttp")
43+
public HttpResponseMessage subCompletedErrorHttp(
44+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
45+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
46+
final ExecutionContext context) {
47+
context.getLogger().info("Java HTTP trigger processed a request.");
48+
49+
DurableTaskClient client = durableContext.getClient();
50+
String instanceId = client.scheduleNewOrchestrationInstance("CompletedErrorOrchestrator");
51+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
52+
return durableContext.createCheckStatusResponse(request, instanceId);
53+
}
54+
55+
@FunctionName("CompletedErrorOrchestrator")
56+
public String completedErrorOrchestrator(
57+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
58+
// cause deserialize issue
59+
Person result = ctx.callSubOrchestrator("CompletedErrorSubOrchestrator", "Austin", Person.class).await();
60+
return result.getName();
61+
}
62+
63+
@FunctionName("CompletedErrorSubOrchestrator")
64+
public String completedErrorSubOrchestrator(
65+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
66+
return "test";
67+
}
68+
69+
@FunctionName("ExternalEventHttp")
70+
public HttpResponseMessage externalEventHttp(
71+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
72+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
73+
final ExecutionContext context) {
74+
context.getLogger().info("Java HTTP trigger processed a request.");
75+
76+
DurableTaskClient client = durableContext.getClient();
77+
String instanceId = client.scheduleNewOrchestrationInstance("ExternalEventActivity");
78+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
79+
return durableContext.createCheckStatusResponse(request, instanceId);
80+
}
81+
82+
@FunctionName("ExternalEventActivity")
83+
public void externalEventActivity(@DurableOrchestrationTrigger(name = "runtimeState") TaskOrchestrationContext ctx)
84+
{
85+
System.out.println("Waiting external event...");
86+
Task<String> event = ctx.waitForExternalEvent("event", String.class);
87+
Task<?> result = ctx.anyOf(event).await();
88+
Object input = result.await();
89+
System.out.println(input);
90+
}
91+
92+
static class Person {
93+
String name;
94+
95+
public String getName() {
96+
return name;
97+
}
98+
99+
public void setName(String name) {
100+
this.name = name;
101+
}
102+
}
103+
}

endtoendtests/src/test/java/com/functions/EndToEndTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,53 @@ public void suspendResume() throws InterruptedException {
215215
assertTrue(completed);
216216
}
217217

218+
@Test
219+
public void externalEventDeserializeFail() throws InterruptedException {
220+
String startOrchestrationPath = "api/ExternalEventHttp";
221+
Response response = post(startOrchestrationPath);
222+
JsonPath jsonPath = response.jsonPath();
223+
String sendEventPostUri = jsonPath.get("sendEventPostUri");
224+
sendEventPostUri = sendEventPostUri.replace("{eventName}", "event");
225+
226+
String requestBody = "{\"value\":\"Test\"}";
227+
RestAssured
228+
.given()
229+
.contentType(ContentType.JSON) // Set the request content type
230+
.body(requestBody) // Set the request body
231+
.post(sendEventPostUri)
232+
.then()
233+
.statusCode(202);
234+
// assert orchestration status
235+
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
236+
boolean completed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10));
237+
assertTrue(completed);
238+
239+
// assert exception message
240+
Response resp = get(statusQueryGetUri);
241+
String errorMessage = resp.jsonPath().get("output");
242+
assertTrue(errorMessage.contains("Failed to deserialize the JSON text to java.lang.String"));
243+
}
244+
245+
@ParameterizedTest
246+
@ValueSource(strings = {
247+
"DeserializeErrorHttp",
248+
"SubCompletedErrorHttp"
249+
})
250+
public void DeserializeFail(String functionName) throws InterruptedException {
251+
String startOrchestrationPath = "api/" + functionName;
252+
Response response = post(startOrchestrationPath);
253+
JsonPath jsonPath = response.jsonPath();
254+
// assert orchestration status
255+
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
256+
boolean completed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10));
257+
assertTrue(completed);
258+
259+
// assert exception message
260+
Response resp = get(statusQueryGetUri);
261+
String errorMessage = resp.jsonPath().get("output");
262+
assertTrue(errorMessage.contains("Failed to deserialize the JSON text"));
263+
}
264+
218265
private boolean pollingCheck(String statusQueryGetUri,
219266
String expectedState,
220267
Set<String> continueStates,

samples-azure-functions/src/main/java/com/functions/AzureFunctions.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.functions;
22

3+
import com.functions.model.Person;
34
import com.microsoft.azure.functions.annotation.*;
45
import com.microsoft.azure.functions.*;
56
import java.util.*;
@@ -55,4 +56,29 @@ public String capitalize(
5556
context.getLogger().info("Capitalizing: " + name);
5657
return name.toUpperCase();
5758
}
59+
60+
// Orchestration with POJO input
61+
@FunctionName("StartOrchestrationPOJO")
62+
public HttpResponseMessage startOrchestrationPOJO(
63+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
64+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
65+
final ExecutionContext context) {
66+
context.getLogger().info("Java HTTP trigger processed a request.");
67+
68+
Person person = new Person();
69+
person.setName("testname");
70+
71+
DurableTaskClient client = durableContext.getClient();
72+
String instanceId = client.scheduleNewOrchestrationInstance("Person", person);
73+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
74+
return durableContext.createCheckStatusResponse(request, instanceId);
75+
}
76+
77+
@FunctionName("Person")
78+
public Person personOrchestrator(
79+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
80+
Person person = ctx.getInput(Person.class);
81+
person.setName(ctx.callActivity("Capitalize", person.getName(), String.class).await());
82+
return person;
83+
}
5884
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.functions;
2+
3+
import com.functions.model.Person;
4+
import com.microsoft.azure.functions.ExecutionContext;
5+
import com.microsoft.azure.functions.HttpMethod;
6+
import com.microsoft.azure.functions.HttpRequestMessage;
7+
import com.microsoft.azure.functions.HttpResponseMessage;
8+
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
9+
import com.microsoft.azure.functions.annotation.FunctionName;
10+
import com.microsoft.azure.functions.annotation.HttpTrigger;
11+
import com.microsoft.durabletask.DurableTaskClient;
12+
import com.microsoft.durabletask.TaskOrchestrationContext;
13+
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
14+
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
15+
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
16+
17+
import java.util.Optional;
18+
19+
public class SubOrchestrator {
20+
21+
@FunctionName("SubOrchestratorHttp")
22+
public HttpResponseMessage subOrchestratorHttp(
23+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
24+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
25+
final ExecutionContext context) {
26+
context.getLogger().info("Java HTTP trigger processed a request.");
27+
28+
DurableTaskClient client = durableContext.getClient();
29+
String instanceId = client.scheduleNewOrchestrationInstance("RootOrchestrator");
30+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
31+
return durableContext.createCheckStatusResponse(request, instanceId);
32+
}
33+
34+
@FunctionName("RootOrchestrator")
35+
public String rootOrchestrator(
36+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
37+
return ctx.callSubOrchestrator("SubOrchestrator", "Austin", String.class).await();
38+
}
39+
40+
@FunctionName("SubOrchestrator")
41+
public String subOrchestrator(
42+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
43+
String input = ctx.getInput(String.class);
44+
return ctx.callSubOrchestrator("Capitalize", input, String.class).await();
45+
}
46+
}

0 commit comments

Comments
 (0)