diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java index f78c00f9..b210e577 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java @@ -18,6 +18,7 @@ public class HttpManagementPayload { private final String terminatePostUri; private final String resumePostUri; private final String suspendPostUri; + public final String rewindPostUri; /** * Creates a {@link HttpManagementPayload} to manage orchestration instances @@ -38,6 +39,7 @@ public HttpManagementPayload( this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters; this.resumePostUri = instanceStatusURL + "/resume?reason={text}&" + requiredQueryStringParameters; this.suspendPostUri = instanceStatusURL + "/suspend?reason={text}&" + requiredQueryStringParameters; + this.rewindPostUri = instanceStatusURL + "/rewind?reason={text}&" + requiredQueryStringParameters; } /** diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 4590277f..3c8188f7 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -321,4 +321,19 @@ public void resumeInstance(String instanceId) { * @param reason the reason for resuming the orchestration instance */ public abstract void resumeInstance(String instanceId, @Nullable String reason); + + /** + * Rewinds a failed orchestration instance. + * @param instanceId the ID of the orchestration instance to rewind + * @param reason the reason for rewinding the orchestration instance + */ + public abstract void rewindInstance(String instanceId, @Nullable String reason); + + /** + * Rewinds a failed orchestration instance. + * @param instanceId the ID of the orchestration instance to rewind + */ + public void rewindInstance(String instanceId) { + this.rewindInstance(instanceId, null); + } } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 95bb984a..5a8be4ea 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -321,6 +321,16 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI } } + @Override + public void rewindInstance(String instanceId, @Nullable String reason) { + RewindInstanceRequest.Builder rewindInstanceRequestBuilder = RewindInstanceRequest.newBuilder(); + rewindInstanceRequestBuilder.setInstanceId(instanceId); + if (reason != null) { + rewindInstanceRequestBuilder.setReason(StringValue.of(reason)); + } + this.sidecarClient.rewindInstance(rewindInstanceRequestBuilder.build()); + } + private PurgeResult toPurgeResult(PurgeInstancesResponse response){ return new PurgeResult(response.getDeletedInstanceCount()); } diff --git a/samples-azure-functions/src/main/java/com/functions/RewindInstance.java b/samples-azure-functions/src/main/java/com/functions/RewindInstance.java new file mode 100644 index 00000000..8ac39bb7 --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/RewindInstance.java @@ -0,0 +1,95 @@ +package com.functions; + +import com.microsoft.azure.functions.*; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.util.Optional; + +/** + * Azure Durable Functions with HTTP trigger - Rewind instance sample. + */ +public class RewindInstance { + private static int approvalFlag = 0; + + /** + * This HTTP-triggered function starts the approval orchestration. + */ + @FunctionName("ApprovalWorkflowOrchestration") + public HttpResponseMessage approvalWorkflowOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) throws InterruptedException { + context.getLogger().info("Java HTTP trigger processed a request."); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("ApprovalWorkflow"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("ApprovalWorkflow") + public int approvalWorkflow( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + int result = 0; + result += ctx.callActivity("RequestPrimaryApproval", 1, Integer.class).await(); + result += ctx.callActivity("RequestSecondaryApproval", 1, Integer.class).await(); + return result; + } + + /** + * This is the activity function that gets invoked by the approval orchestration. + */ + @FunctionName("RequestPrimaryApproval") + public int requestPrimaryApproval( + @DurableActivityTrigger(name = "name") int number, + final ExecutionContext context) { + return 1; + } + + /** + * This is the activity function that fails the first try and is then revived. + */ + @FunctionName("RequestSecondaryApproval") + public int requestSecondaryApproval( + @DurableActivityTrigger(name = "name") int number, + final ExecutionContext context) throws InterruptedException { + return number / approvalFlag++; + } + + /** + * This HTTP-triggered function rewinds the orchestration using instanceId. + */ + @FunctionName("RewindInstance") + public String rewindInstance( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + String instanceId = request.getQueryParameters().getOrDefault("instanceId", ""); + String reason = "Orchestrator failed and needs to be revived."; + + DurableTaskClient client = durableContext.getClient(); + client.rewindInstance(instanceId, reason); + return "Failed orchestration instance is scheduled for rewind."; + } + + /** + * This HTTP-triggered function resets the approvalFlag variable for testing purposes. + */ + @FunctionName("ResetApproval") + public static HttpResponseMessage resetApproval( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) + HttpRequestMessage> request, + final ExecutionContext context) { + context.getLogger().info("ResetApproval function invoked."); + approvalFlag = 0; + return request.createResponseBuilder(HttpStatus.OK).body(approvalFlag).build(); + } +} \ No newline at end of file diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index 411dbbaa..549ef442 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -22,11 +22,16 @@ @Tag("e2e") public class EndToEndTests { + private static final String hostHealthPingUrl = "/admin/host/ping"; + private static final String startOrchestrationUrl = "/api/StartOrchestration"; + private static final String startApprovalWorkflowUrl = "/api/ApprovalWorkflowOrchestration"; + private static final String rewindInstanceFunctionUrl = "/api/RewindInstance"; + private static final String resetApprovalUrl = "/api/ResetApprovalFlag"; + @Order(1) @Test public void setupHost() { - String hostHealthPingPath = "/admin/host/ping"; - post(hostHealthPingPath).then().statusCode(200); + post(hostHealthPingUrl).then().statusCode(200); } @ParameterizedTest @@ -202,4 +207,74 @@ private boolean pollingCheck(String statusQueryGetUri, } return false; } + + @Order(2) + @Test + public void testRewindInstanceJavaAPI() throws InterruptedException { + Response response = post(startApprovalWorkflowUrl); + JsonPath startOrchestrationResponseJson = response.jsonPath(); + + // Wait for the ApprovalWorkflowOrchestration to fail + Thread.sleep(3000); + + String instanceId = startOrchestrationResponseJson.get("id"); + String statusQueryGetUri = startOrchestrationResponseJson.get("statusQueryGetUri"); + Response statusResponse = get(statusQueryGetUri); + String runtimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + assertEquals("Failed", runtimeStatus); + + // Rewind the instance using Java API + String rewindInstanceUrl = rewindInstanceFunctionUrl + "?instanceId=" + instanceId; + response = post(rewindInstanceUrl); + assertEquals("Failed orchestration instance is scheduled for rewind.", response.toString()); + + // Wait for orchestration to rewind and complete + Thread.sleep(3000); + + for (int i = 0; i < 5; i++) { + statusResponse = get(statusQueryGetUri); + runtimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + if (!"Completed".equals(runtimeStatus)) { + Thread.sleep(1000); + } else break; + } + assertEquals("Completed", runtimeStatus); + + // Reset approval for other test cases + post(resetApprovalUrl); + } + + @Order(3) + @Test + public void testRewindInstanceHttpAPI() throws InterruptedException { + Response response = post(startApprovalWorkflowUrl); + JsonPath startOrchestrationResponseJson = response.jsonPath(); + + // Wait for the ApprovalWorkflowOrchestration to fail + Thread.sleep(3000); + + String statusQueryGetUri = startOrchestrationResponseJson.get("statusQueryGetUri"); + Response statusResponse = get(statusQueryGetUri); + String runtimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + assertEquals("Failed", runtimeStatus); + + // Rewind the instance using Http API + String rewindPostUri = startOrchestrationResponseJson.get("rewindPostUri"); + post(rewindPostUri); + + // Wait for orchestration to rewind and complete + Thread.sleep(3000); + + for (int i = 0; i < 5; i++) { + statusResponse = get(statusQueryGetUri); + runtimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + if (!"Completed".equals(runtimeStatus)) { + Thread.sleep(1000); + } else break; + } + assertEquals("Completed", runtimeStatus); + + // Reset approval for other test cases + post(resetApprovalUrl); + } } \ No newline at end of file