Skip to content

Commit 39c9957

Browse files
committed
init fix
init fix expose a way for customer to create their own data converter
1 parent 57133bd commit 39c9957

File tree

7 files changed

+125
-11
lines changed

7 files changed

+125
-11
lines changed

azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@
99
import com.microsoft.azure.functions.internal.spi.middleware.Middleware;
1010
import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareChain;
1111
import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareContext;
12+
import com.microsoft.durabletask.DataConverter;
1213
import com.microsoft.durabletask.OrchestrationRunner;
1314
import com.microsoft.durabletask.OrchestratorBlockedException;
1415

16+
import java.util.Iterator;
17+
import java.util.ServiceLoader;
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
1520
/**
1621
* Durable Function Orchestration Middleware
1722
*
@@ -21,14 +26,20 @@
2126
public class OrchestrationMiddleware implements Middleware {
2227

2328
private static final String ORCHESTRATION_TRIGGER = "DurableOrchestrationTrigger";
29+
private final Object dataConverterLock = new Object();
30+
private volatile DataConverter dataConverter;
31+
private final AtomicBoolean oneTimeLogicExecuted = new AtomicBoolean(false);
2432

2533
@Override
2634
public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exception {
2735
String parameterName = context.getParameterName(ORCHESTRATION_TRIGGER);
28-
if (parameterName == null){
36+
if (parameterName == null) {
2937
chain.doNext(context);
3038
return;
3139
}
40+
//invoked only for orchestrator function.
41+
System.out.println("from middleware --" + Thread.currentThread().getContextClassLoader());
42+
loadCustomizedDataConverterOnce();
3243
String orchestratorRequestEncodedProtoBytes = (String) context.getParameterValue(parameterName);
3344
String orchestratorOutputEncodedProtoBytes = OrchestrationRunner.loadAndRun(orchestratorRequestEncodedProtoBytes, taskOrchestrationContext -> {
3445
try {
@@ -39,12 +50,30 @@ public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exce
3950
// The OrchestratorBlockedEvent will be wrapped into InvocationTargetException by using reflection to
4051
// invoke method. Thus get the cause to check if it's OrchestratorBlockedEvent.
4152
Throwable cause = e.getCause();
42-
if (cause instanceof OrchestratorBlockedException){
53+
if (cause instanceof OrchestratorBlockedException) {
4354
throw (OrchestratorBlockedException) cause;
4455
}
4556
throw new RuntimeException("Unexpected failure in the task execution", e);
4657
}
47-
});
58+
}, dataConverter);
4859
context.updateReturnValue(orchestratorOutputEncodedProtoBytes);
4960
}
61+
62+
private void loadCustomizedDataConverterOnce() {
63+
if (!oneTimeLogicExecuted.get()) {
64+
synchronized (dataConverterLock) {
65+
if (!oneTimeLogicExecuted.get()) {
66+
Iterator<DataConverter> iterator = ServiceLoader.load(DataConverter.class).iterator();
67+
if (iterator.hasNext()) {
68+
this.dataConverter = iterator.next();
69+
System.out.println("assigned");
70+
if (iterator.hasNext()) {
71+
throw new IllegalStateException("Multiple implementations of DataConverter found on the classpath.");
72+
}
73+
}
74+
oneTimeLogicExecuted.compareAndSet(false,true);
75+
}
76+
}
77+
}
78+
}
5079
}

client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ private OrchestrationRunner() {
3636
*/
3737
public static <R> String loadAndRun(
3838
String base64EncodedOrchestratorRequest,
39-
OrchestratorFunction<R> orchestratorFunc) {
39+
OrchestratorFunction<R> orchestratorFunc,
40+
DataConverter dataConverter) {
4041
// Example string: CiBhOTMyYjdiYWM5MmI0MDM5YjRkMTYxMDIwNzlmYTM1YSIaCP///////////wESCwi254qRBhDk+rgocgAicgj///////////8BEgwIs+eKkQYQzMXjnQMaVwoLSGVsbG9DaXRpZXMSACJGCiBhOTMyYjdiYWM5MmI0MDM5YjRkMTYxMDIwNzlmYTM1YRIiCiA3ODEwOTA2N2Q4Y2Q0ODg1YWU4NjQ0OTNlMmRlMGQ3OA==
4142
byte[] decodedBytes = Base64.getDecoder().decode(base64EncodedOrchestratorRequest);
42-
byte[] resultBytes = loadAndRun(decodedBytes, orchestratorFunc);
43+
byte[] resultBytes = loadAndRun(decodedBytes, orchestratorFunc, dataConverter);
4344
return Base64.getEncoder().encodeToString(resultBytes);
4445
}
4546

@@ -55,7 +56,8 @@ public static <R> String loadAndRun(
5556
*/
5657
public static <R> byte[] loadAndRun(
5758
byte[] orchestratorRequestBytes,
58-
OrchestratorFunction<R> orchestratorFunc) {
59+
OrchestratorFunction<R> orchestratorFunc,
60+
DataConverter dataConverter) {
5961
if (orchestratorFunc == null) {
6062
throw new IllegalArgumentException("orchestratorFunc must not be null");
6163
}
@@ -66,7 +68,7 @@ public static <R> byte[] loadAndRun(
6668
ctx.complete(output);
6769
};
6870

69-
return loadAndRun(orchestratorRequestBytes, orchestration);
71+
return loadAndRun(orchestratorRequestBytes, orchestration, dataConverter);
7072
}
7173

7274
/**
@@ -82,7 +84,7 @@ public static String loadAndRun(
8284
String base64EncodedOrchestratorRequest,
8385
TaskOrchestration orchestration) {
8486
byte[] decodedBytes = Base64.getDecoder().decode(base64EncodedOrchestratorRequest);
85-
byte[] resultBytes = loadAndRun(decodedBytes, orchestration);
87+
byte[] resultBytes = loadAndRun(decodedBytes, orchestration, null);
8688
return Base64.getEncoder().encodeToString(resultBytes);
8789
}
8890

@@ -95,7 +97,7 @@ public static String loadAndRun(
9597
* @return a protobuf-encoded payload of orchestrator actions to be interpreted by the external orchestration engine
9698
* @throws IllegalArgumentException if either parameter is {@code null} or if {@code orchestratorRequestBytes} is not valid protobuf
9799
*/
98-
public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestration orchestration) {
100+
public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestration orchestration, DataConverter dataConverter) {
99101
if (orchestratorRequestBytes == null || orchestratorRequestBytes.length == 0) {
100102
throw new IllegalArgumentException("triggerStateProtoBytes must not be null or empty");
101103
}
@@ -127,7 +129,7 @@ public TaskOrchestration create() {
127129

128130
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
129131
orchestrationFactories,
130-
new JacksonDataConverter(),
132+
dataConverter != null ? dataConverter : new JacksonDataConverter(),
131133
DEFAULT_MAXIMUM_TIMER_INTERVAL,
132134
logger);
133135

samples-azure-functions/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ dependencies {
2020
implementation project(':azurefunctions')
2121

2222
implementation 'com.microsoft.azure.functions:azure-functions-java-library:3.0.0'
23+
implementation 'com.google.code.gson:gson:2.9.0'
2324
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
2425
testImplementation 'io.rest-assured:rest-assured:5.3.0'
2526
testImplementation 'io.rest-assured:json-path:5.3.0'

samples-azure-functions/src/main/java/com/functions/AzureFunctions.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ public HttpResponseMessage startOrchestration(
3636
*/
3737
@FunctionName("Cities")
3838
public String citiesOrchestrator(
39-
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
39+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx,
40+
final ExecutionContext context) {
4041
String result = "";
4142
result += ctx.callActivity("Capitalize", "Tokyo", String.class).await() + ", ";
4243
result += ctx.callActivity("Capitalize", "London", String.class).await() + ", ";
4344
result += ctx.callActivity("Capitalize", "Seattle", String.class).await() + ", ";
4445
result += ctx.callActivity("Capitalize", "Austin", String.class).await();
46+
context.getLogger().info("Orchestrator function completed!");
4547
return result;
4648
}
4749

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.functions;
2+
3+
import com.microsoft.azure.functions.ExecutionContext;
4+
import com.microsoft.azure.functions.HttpMethod;
5+
import com.microsoft.azure.functions.HttpRequestMessage;
6+
import com.microsoft.azure.functions.HttpResponseMessage;
7+
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
8+
import com.microsoft.azure.functions.annotation.FunctionName;
9+
import com.microsoft.azure.functions.annotation.HttpTrigger;
10+
import com.microsoft.durabletask.DurableTaskClient;
11+
import com.microsoft.durabletask.TaskOrchestrationContext;
12+
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
13+
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
14+
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
15+
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
16+
17+
import java.time.LocalDate;
18+
import java.util.Optional;
19+
20+
public class ExampleFunction {
21+
22+
@FunctionName("StartExampleProcess")
23+
public HttpResponseMessage startExampleProcess(
24+
@HttpTrigger(name = "req",
25+
methods = {HttpMethod.GET, HttpMethod.POST},
26+
authLevel = AuthorizationLevel.ANONYMOUS) final HttpRequestMessage<Optional<String>> request,
27+
@DurableClientInput(name = "durableContext") final DurableClientContext durableContext,
28+
final ExecutionContext context) {
29+
context.getLogger().info("Java HTTP trigger processed a request");
30+
31+
final DurableTaskClient client = durableContext.getClient();
32+
final String instanceId = client.scheduleNewOrchestrationInstance("ExampleProcess");
33+
return durableContext.createCheckStatusResponse(request, instanceId);
34+
}
35+
36+
@FunctionName("ExampleProcess")
37+
public ExampleResponse exampleOrchestrator(
38+
@DurableOrchestrationTrigger(name = "taskOrchestrationContext") final TaskOrchestrationContext context,
39+
final ExecutionContext functionContext) {
40+
return context.callActivity("ToLower", "Foo", ExampleResponse.class).await();
41+
// " " +
42+
// context.callActivity("ToLower", "Bar", ExampleResponse.class).await();
43+
}
44+
45+
@FunctionName("ToLower")
46+
public ExampleResponse toLower(
47+
@DurableActivityTrigger(name = "value") final String value,
48+
final ExecutionContext context) {
49+
return new ExampleResponse(LocalDate.now(), value.toLowerCase());
50+
}
51+
52+
static class ExampleResponse {
53+
private LocalDate date;
54+
private String value;
55+
56+
public ExampleResponse(LocalDate date, String value) {
57+
this.date = date;
58+
this.value = value;
59+
}
60+
}
61+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.functions.converter;
2+
3+
import com.google.gson.Gson;
4+
import com.microsoft.durabletask.DataConverter;
5+
6+
public class MyConverter implements DataConverter {
7+
8+
private static final Gson gson = new Gson();
9+
@Override
10+
public String serialize(Object value) {
11+
return gson.toJson(value);
12+
}
13+
14+
@Override
15+
public <T> T deserialize(String data, Class<T> target) {
16+
return gson.fromJson(data, target);
17+
}
18+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
com.functions.converter.MyConverter

0 commit comments

Comments
 (0)