Skip to content

Commit 65ae95e

Browse files
Added support for rewinding a failed instance
1 parent 8a74001 commit 65ae95e

File tree

7 files changed

+199
-6
lines changed

7 files changed

+199
-6
lines changed

azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,38 @@ private String getInstanceStatusURL(HttpRequestMessage<?> request, String instan
146146
throw new IllegalArgumentException("Failed to encode the instance ID: " + instanceId, ex);
147147
}
148148

149-
return baseUrl + "/runtime/webhooks/durabletask/instances/" + encodedInstanceId;
149+
String instanceStatusURL = baseUrl + "/runtime/webhooks/durabletask/instances/" + encodedInstanceId;
150+
151+
// Construct the response as an HTTP 202 with a JSON object payload
152+
return request.createResponseBuilder(HttpStatus.ACCEPTED)
153+
.header("Location", instanceStatusURL + "?" + this.requiredQueryStringParameters)
154+
.header("Content-Type", "application/json")
155+
.body(new HttpCreateCheckStatusResponse(
156+
instanceId,
157+
instanceStatusURL,
158+
this.requiredQueryStringParameters))
159+
.build();
160+
}
161+
162+
private static class HttpCreateCheckStatusResponse {
163+
// These fields are serialized to JSON
164+
public final String id;
165+
public final String purgeHistoryDeleteUri;
166+
public final String sendEventPostUri;
167+
public final String statusQueryGetUri;
168+
public final String terminatePostUri;
169+
public final String rewindPostUri;
170+
171+
public HttpCreateCheckStatusResponse(
172+
String instanceId,
173+
String instanceStatusURL,
174+
String requiredQueryStringParameters) {
175+
this.id = instanceId;
176+
this.purgeHistoryDeleteUri = instanceStatusURL + "?" + requiredQueryStringParameters;
177+
this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters;
178+
this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters;
179+
this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters;
180+
this.rewindPostUri = instanceStatusURL + "/rewind?reason={text}&" + requiredQueryStringParameters;
181+
}
150182
}
151183
}

client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,4 +321,19 @@ public void resumeInstance(String instanceId) {
321321
* @param reason the reason for resuming the orchestration instance
322322
*/
323323
public abstract void resumeInstance(String instanceId, @Nullable String reason);
324+
325+
/**
326+
* Rewinds a failed orchestration instance.
327+
* @param instanceId the ID of the orchestration instance to rewind
328+
* @param reason the reason for rewinding the orchestration instance
329+
*/
330+
public abstract void rewindInstance(String instanceId, @Nullable String reason);
331+
332+
/**
333+
* Rewinds a failed orchestration instance.
334+
* @param instanceId the ID of the orchestration instance to rewind
335+
*/
336+
public void rewindInstance(String instanceId) {
337+
this.rewindInstance(instanceId, null);
338+
}
324339
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,16 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI
321321
}
322322
}
323323

324+
@Override
325+
public void rewindInstance(String instanceId, @Nullable String reason) {
326+
RewindInstanceRequest.Builder rewindInstanceRequestBuilder = RewindInstanceRequest.newBuilder();
327+
rewindInstanceRequestBuilder.setInstanceId(instanceId);
328+
if (reason != null) {
329+
rewindInstanceRequestBuilder.setReason(StringValue.of(reason));
330+
}
331+
this.sidecarClient.rewindInstance(rewindInstanceRequestBuilder.build());
332+
}
333+
324334
private PurgeResult toPurgeResult(PurgeInstancesResponse response){
325335
return new PurgeResult(response.getDeletedInstanceCount());
326336
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net6.0</TargetFramework>
4+
<WarningsAsErrors></WarningsAsErrors>
5+
<DefaultItemExcludes>**</DefaultItemExcludes>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.9.2" />
9+
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="5.1.0" />
10+
<PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="4.0.1" />
11+
</ItemGroup>
12+
</Project>

samples-azure-functions/host.json

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,5 @@
1515
"durableTask": {
1616
"hubName": "DFJavaSmokeTest"
1717
}
18-
},
19-
"extensionBundle": {
20-
"id": "Microsoft.Azure.Functions.ExtensionBundle",
21-
"version": "[4.*, 5.0.0)"
2218
}
2319
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.functions;
2+
3+
import com.microsoft.azure.functions.ExecutionContext;
4+
import com.microsoft.azure.functions.HttpMethod;
5+
import com.microsoft.azure.functions.HttpRequestMessage;
6+
import com.microsoft.azure.functions.HttpResponseMessage;
7+
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
8+
import com.microsoft.azure.functions.annotation.FunctionName;
9+
import com.microsoft.azure.functions.annotation.HttpTrigger;
10+
import com.microsoft.durabletask.DurableTaskClient;
11+
import com.microsoft.durabletask.TaskOrchestrationContext;
12+
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
13+
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
14+
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
15+
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
16+
17+
import java.util.Optional;
18+
19+
/**
20+
* Azure Durable Functions with HTTP trigger - Rewind instance sample.
21+
*/
22+
public class RewindInstance {
23+
private static int approvalFlag = 0;
24+
25+
/**
26+
* This HTTP-triggered function starts the approval orchestration.
27+
*/
28+
@FunctionName("ApprovalWorkflowOrchestration")
29+
public HttpResponseMessage approvalWorkflowOrchestration(
30+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
31+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
32+
final ExecutionContext context) throws InterruptedException {
33+
context.getLogger().info("Java HTTP trigger processed a request.");
34+
35+
DurableTaskClient client = durableContext.getClient();
36+
String instanceId = client.scheduleNewOrchestrationInstance("ApprovalWorkflow");
37+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
38+
return durableContext.createCheckStatusResponse(request, instanceId);
39+
}
40+
41+
@FunctionName("ApprovalWorkflow")
42+
public int approvalWorkflow(
43+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
44+
int result = 0;
45+
result += ctx.callActivity("RequestPrimaryApproval", 1, Integer.class).await();
46+
result += ctx.callActivity("RequestSecondaryApproval", 1, Integer.class).await();
47+
return result;
48+
}
49+
50+
/**
51+
* This is the activity function that gets invoked by the approval orchestration.
52+
*/
53+
@FunctionName("RequestPrimaryApproval")
54+
public int requestPrimaryApproval(
55+
@DurableActivityTrigger(name = "name") int number,
56+
final ExecutionContext context) {
57+
return 1;
58+
}
59+
60+
/**
61+
* This is the activity function that fails the first try and is then revived.
62+
*/
63+
@FunctionName("RequestSecondaryApproval")
64+
public int requestSecondaryApproval(
65+
@DurableActivityTrigger(name = "name") int number,
66+
final ExecutionContext context) throws InterruptedException {
67+
System.out.println("Test RequestSecondaryApproval " + approvalFlag);
68+
return number / approvalFlag++;
69+
}
70+
71+
/**
72+
* This HTTP-triggered function rewinds the orchestration using instanceId.
73+
*/
74+
@FunctionName("RewindInstance")
75+
public String rewindInstance(
76+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
77+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
78+
final ExecutionContext context) {
79+
String instanceId = request.getQueryParameters().getOrDefault("instanceId", "");
80+
String reason = "Orchestrator failed and needs to be revived.";
81+
82+
DurableTaskClient client = durableContext.getClient();
83+
84+
try {
85+
client.rewindInstance(instanceId, reason);
86+
return "Failed orchestration instance is revived.";
87+
}catch (Exception e){
88+
return "Exception when rewinding orchestration instance";
89+
}
90+
}
91+
}

samples-azure-functions/src/test/java/com/functions/EndToEndTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121

2222
@Tag("e2e")
2323
public class EndToEndTests {
24+
private static final String hostHealthPingPath = "/admin/host/ping";
25+
private static final String startOrchestrationPath = "/api/StartOrchestration";
26+
private static final String approvalWorkFlow = "/api/ApprovalWorkflowOrchestration";
27+
private static JsonPath rewindTestJsonPath = null;
2428

2529
@Order(1)
2630
@Test
@@ -202,4 +206,37 @@ private boolean pollingCheck(String statusQueryGetUri,
202206
}
203207
return false;
204208
}
205-
}
209+
210+
@Order(2)
211+
@Test
212+
public void approvalWorkFlow() throws InterruptedException {
213+
Response response = post(approvalWorkFlow);
214+
rewindTestJsonPath = response.jsonPath();
215+
Thread.sleep(3000);
216+
String statusQueryGetUri = rewindTestJsonPath.get("statusQueryGetUri");
217+
Response statusResponse = get(statusQueryGetUri);
218+
String runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
219+
assertEquals("Failed", runTimeStatus);
220+
}
221+
222+
@Order(3)
223+
@Test
224+
public void rewindInstance() throws InterruptedException {
225+
String rewindPostUri = rewindTestJsonPath.get("rewindPostUri");
226+
Response response = post(rewindPostUri);
227+
228+
Thread.sleep(3000);
229+
230+
String statusQueryGetUri = rewindTestJsonPath.get("statusQueryGetUri");
231+
String runTimeStatus = null;
232+
for (int i = 0; i < 5; i++) {
233+
Response statusResponse = get(statusQueryGetUri);
234+
runTimeStatus = statusResponse.jsonPath().get("runtimeStatus");
235+
if (!"Completed".equals(runTimeStatus)) {
236+
Thread.sleep(1000);
237+
} else break;
238+
}
239+
assertEquals("Completed", runTimeStatus);
240+
}
241+
}
242+

0 commit comments

Comments
 (0)