Skip to content

Commit 42d7d5a

Browse files
Added support for rewinding a failed instance
1 parent 56ca926 commit 42d7d5a

File tree

7 files changed

+164
-4
lines changed

7 files changed

+164
-4
lines changed

azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ private static class HttpCreateCheckStatusResponse {
139139
public final String sendEventPostUri;
140140
public final String statusQueryGetUri;
141141
public final String terminatePostUri;
142+
public final String rewindPostUri;
142143

143144
public HttpCreateCheckStatusResponse(
144145
String instanceId,
@@ -149,6 +150,7 @@ public HttpCreateCheckStatusResponse(
149150
this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters;
150151
this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters;
151152
this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters;
153+
this.rewindPostUri = instanceStatusURL + "/rewind?reason={text}&" + requiredQueryStringParameters;
152154
}
153155
}
154156
}

client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,4 +311,19 @@ public void resumeInstance(String instanceId) {
311311
* @param reason the reason for resuming the orchestration instance
312312
*/
313313
public abstract void resumeInstance(String instanceId, @Nullable String reason);
314+
315+
/**
316+
* Rewinds a failed orchestration instance.
317+
* @param instanceId the ID of the orchestration instance to rewind
318+
* @param reason the reason for rewinding the orchestration instance
319+
*/
320+
public abstract void rewindInstance(String instanceId, @Nullable String reason);
321+
322+
/**
323+
* Rewinds a failed orchestration instance.
324+
* @param instanceId the ID of the orchestration instance to rewind
325+
*/
326+
public void rewindInstance(String instanceId) {
327+
this.rewindInstance(instanceId, null);
328+
}
314329
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,16 @@ public void resumeInstance(String instanceId, @Nullable String reason) {
303303
this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
304304
}
305305

306+
@Override
307+
public void rewindInstance(String instanceId, @Nullable String reason) {
308+
RewindInstanceRequest.Builder rewindInstanceRequestBuilder = RewindInstanceRequest.newBuilder();
309+
rewindInstanceRequestBuilder.setInstanceId(instanceId);
310+
if (reason != null) {
311+
rewindInstanceRequestBuilder.setReason(StringValue.of(reason));
312+
}
313+
this.sidecarClient.rewindInstance(rewindInstanceRequestBuilder.build());
314+
}
315+
306316
private PurgeResult toPurgeResult(PurgeInstancesResponse response){
307317
return new PurgeResult(response.getDeletedInstanceCount());
308318
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net6.0</TargetFramework>
4+
<WarningsAsErrors></WarningsAsErrors>
5+
<DefaultItemExcludes>**</DefaultItemExcludes>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.9.2" />
9+
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="5.1.0" />
10+
<PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="4.0.1" />
11+
</ItemGroup>
12+
</Project>

samples-azure-functions/host.json

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,5 @@
1515
"durableTask": {
1616
"hubName": "DFJavaSmokeTest"
1717
}
18-
},
19-
"extensionBundle": {
20-
"id": "Microsoft.Azure.Functions.ExtensionBundle",
21-
"version": "[4.*, 5.0.0)"
2218
}
2319
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.functions;
2+
3+
import com.microsoft.azure.functions.ExecutionContext;
4+
import com.microsoft.azure.functions.HttpMethod;
5+
import com.microsoft.azure.functions.HttpRequestMessage;
6+
import com.microsoft.azure.functions.HttpResponseMessage;
7+
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
8+
import com.microsoft.azure.functions.annotation.FunctionName;
9+
import com.microsoft.azure.functions.annotation.HttpTrigger;
10+
import com.microsoft.durabletask.DurableTaskClient;
11+
import com.microsoft.durabletask.TaskOrchestrationContext;
12+
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
13+
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
14+
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
15+
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
16+
17+
import java.util.Optional;
18+
19+
/**
20+
* Azure Durable Functions with HTTP trigger - Rewind instance sample.
21+
*/
22+
public class RewindInstance {
23+
private static int approvalFlag = 0;
24+
25+
/**
26+
* This HTTP-triggered function starts the approval orchestration.
27+
*/
28+
@FunctionName("ApprovalWorkflowOrchestration")
29+
public HttpResponseMessage approvalWorkflowOrchestration(
30+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
31+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
32+
final ExecutionContext context) throws InterruptedException {
33+
context.getLogger().info("Java HTTP trigger processed a request.");
34+
35+
DurableTaskClient client = durableContext.getClient();
36+
String instanceId = client.scheduleNewOrchestrationInstance("ApprovalWorkflow");
37+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
38+
return durableContext.createCheckStatusResponse(request, instanceId);
39+
}
40+
41+
@FunctionName("ApprovalWorkflow")
42+
public int approvalWorkflow(
43+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
44+
int result = 0;
45+
result += ctx.callActivity("RequestPrimaryApproval", 1, Integer.class).await();
46+
result += ctx.callActivity("RequestSecondaryApproval", 1, Integer.class).await();
47+
return result;
48+
}
49+
50+
/**
51+
* This is the activity function that gets invoked by the approval orchestration.
52+
*/
53+
@FunctionName("RequestPrimaryApproval")
54+
public int requestPrimaryApproval(
55+
@DurableActivityTrigger(name = "name") int number,
56+
final ExecutionContext context) {
57+
return 1;
58+
}
59+
60+
/**
61+
* This is the activity function that fails the first try and is then revived.
62+
*/
63+
@FunctionName("RequestSecondaryApproval")
64+
public int requestSecondaryApproval(
65+
@DurableActivityTrigger(name = "name") int number,
66+
final ExecutionContext context) throws InterruptedException {
67+
System.out.println("Test RequestSecondaryApproval " + approvalFlag);
68+
return number / approvalFlag++;
69+
}
70+
71+
/**
72+
* This HTTP-triggered function rewinds the orchestration using instanceId.
73+
*/
74+
@FunctionName("RewindInstance")
75+
public String rewindInstance(
76+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
77+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
78+
final ExecutionContext context) {
79+
String instanceId = request.getQueryParameters().getOrDefault("instanceId", "");
80+
String reason = "Orchestrator failed and needs to be revived.";
81+
82+
DurableTaskClient client = durableContext.getClient();
83+
84+
try {
85+
client.rewindInstance(instanceId, reason);
86+
return "Failed orchestration instance is revived.";
87+
}catch (Exception e){
88+
return "Exception when rewinding orchestration instance";
89+
}
90+
}
91+
}

samples-azure-functions/src/test/java/com/functions/EndToEndTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
public class EndToEndTests {
1515
private static final String hostHealthPingPath = "/admin/host/ping";
1616
private static final String startOrchestrationPath = "/api/StartOrchestration";
17+
private static final String approvalWorkFlow = "/api/ApprovalWorkflowOrchestration";
18+
private static JsonPath rewindTestJsonPath = null;
1719

1820
@Order(1)
1921
@Test
@@ -36,4 +38,36 @@ public void basicChain() throws InterruptedException {
3638
}
3739
assertEquals("Completed", runTimeStatus);
3840
}
41+
42+
@Order(2)
43+
@Test
44+
public void approvalWorkFlow() throws InterruptedException {
45+
Response response = post(approvalWorkFlow);
46+
rewindTestJsonPath = response.jsonPath();
47+
Thread.sleep(3000);
48+
String statusQueryGetUri = rewindTestJsonPath.get("statusQueryGetUri");
49+
Response statusResponse = get(statusQueryGetUri);
50+
String runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
51+
assertEquals("Failed", runTimeStatus);
52+
}
53+
54+
@Order(3)
55+
@Test
56+
public void rewindInstance() throws InterruptedException {
57+
String rewindPostUri = rewindTestJsonPath.get("rewindPostUri");
58+
Response response = post(rewindPostUri);
59+
60+
Thread.sleep(3000);
61+
62+
String statusQueryGetUri = rewindTestJsonPath.get("statusQueryGetUri");
63+
String runTimeStatus = null;
64+
for (int i = 0; i < 5; i++) {
65+
Response statusResponse = get(statusQueryGetUri);
66+
runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
67+
if (!"Completed".equals(runTimeStatus)) {
68+
Thread.sleep(1000);
69+
} else break;
70+
}
71+
assertEquals("Completed", runTimeStatus);
72+
}
3973
}

0 commit comments

Comments
 (0)