diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index f8ee9265372..faa92d946ae 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -28,7 +28,6 @@ Dapr Workflow solves these complexities by allowing you to implement the task ch {{< tabpane text=true >}} {{% tab "Python" %}} - ```python import dapr.ext.workflow as wf @@ -65,7 +64,7 @@ def step3(ctx, activity_input): def error_handler(ctx, error): print(f'Executing error handler: {error}.') - # Do some compensating work + # Apply some compensating work ``` > **Note** Workflow retry policies will be available in a future version of the Python SDK. @@ -73,7 +72,6 @@ def error_handler(ctx, error): {{% /tab %}} {{% tab "JavaScript" %}} - ```javascript import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr"; @@ -141,13 +139,13 @@ async function start() { start().catch((e) => { console.error(e); process.exit(1); + # Apply custom compensation logic }); ``` {{% /tab %}} {{% tab ".NET" %}} - ```csharp // Expotential backoff retry policy that survives long outages @@ -180,7 +178,6 @@ catch (TaskFailedException) // Task failures are surfaced as TaskFailedException {{% /tab %}} {{% tab "Java" %}} - ```java public class ChainWorkflow extends Workflow { @@ -235,7 +232,6 @@ public class ChainWorkflow extends Workflow { {{% /tab %}} {{% tab "Go" %}} - ```go func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) { @@ -314,7 +310,6 @@ Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple {{< tabpane text=true >}} {{% tab "Python" %}} - ```python import time @@ -354,7 +349,6 @@ def process_results(ctx, final_result: int): {{% /tab %}} {{% tab "JavaScript" %}} - ```javascript import { @@ -462,7 +456,6 @@ start().catch((e) => { {{% /tab %}} {{% tab ".NET" %}} - ```csharp // Get a list of N work items to process in parallel. @@ -487,7 +480,6 @@ await context.CallActivityAsync("PostResults", sum); {{% /tab %}} {{% tab "Java" %}} - ```java public class FaninoutWorkflow extends Workflow { @@ -513,7 +505,6 @@ public class FaninoutWorkflow extends Workflow { {{% /tab %}} {{% tab "Go" %}} - ```go func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) { @@ -593,7 +584,7 @@ It's possible to go further and limit the degree of concurrency using simple, la {{< tabpane text=true >}} {{% tab ".NET" %}} - + ```csharp //Revisiting the earlier example... @@ -630,7 +621,7 @@ concurrency by using the following extension methods on the `WorkflowContext`: {{< tabpane text=true >}} {{% tab header=".NET" %}} - + ```csharp //Revisiting the earlier example... // Get a list of work items to process @@ -742,7 +733,6 @@ Dapr Workflow supports this pattern natively by allowing you to implement _etern {{< tabpane text=true >}} {{% tab "Python" %}} - ```python from dataclasses import dataclass @@ -789,7 +779,6 @@ def send_alert(ctx, message: str): {{% /tab %}} {{% tab "JavaScript" %}} - ```javascript const statusMonitorWorkflow: TWorkflow = async function* (ctx: WorkflowContext): any { @@ -817,7 +806,6 @@ const statusMonitorWorkflow: TWorkflow = async function* (ctx: WorkflowContext): {{% /tab %}} {{% tab ".NET" %}} - ```csharp public override async Task RunAsync(WorkflowContext context, MyEntityState myEntityState) @@ -858,7 +846,6 @@ public override async Task RunAsync(WorkflowContext context, MyEntitySta {{% /tab %}} {{% tab "Java" %}} - ```java public class MonitorWorkflow extends Workflow { @@ -900,7 +887,6 @@ public class MonitorWorkflow extends Workflow { {{% /tab %}} {{% tab "Go" %}} - ```go type JobStatus struct { @@ -983,7 +969,6 @@ The following example code shows how this pattern can be implemented using Dapr {{< tabpane text=true >}} {{% tab "Python" %}} - ```python from dataclasses import dataclass @@ -1042,7 +1027,6 @@ def place_order(_, order: Order) -> None: {{% /tab %}} {{% tab "JavaScript" %}} - ```javascript import { @@ -1182,7 +1166,6 @@ start().catch((e) => { {{% /tab %}} {{% tab ".NET" %}} - ```csharp public override async Task RunAsync(WorkflowContext context, OrderPayload order) @@ -1226,7 +1209,6 @@ public override async Task RunAsync(WorkflowContext context, OrderP {{% /tab %}} {{% tab "Java" %}} - ```java public class ExternalSystemInteractionWorkflow extends Workflow { @@ -1263,7 +1245,6 @@ public class ExternalSystemInteractionWorkflow extends Workflow { {{% /tab %}} {{% tab "Go" %}} - ```go type Order struct { @@ -1326,7 +1307,6 @@ The code that delivers the event to resume the workflow execution is external to {{< tabpane text=true >}} {{% tab "Python" %}} - ```python from dapr.clients import DaprClient @@ -1343,7 +1323,6 @@ with DaprClient() as d: {{% /tab %}} {{% tab "JavaScript" %}} - ```javascript import { DaprClient } from "@dapr/dapr"; @@ -1356,7 +1335,6 @@ import { DaprClient } from "@dapr/dapr"; {{% /tab %}} {{% tab ".NET" %}} - ```csharp // Raise the workflow event to the waiting workflow @@ -1370,7 +1348,6 @@ await daprClient.RaiseWorkflowEventAsync( {{% /tab %}} {{% tab "Java" %}} - ```java System.out.println("**SendExternalMessage: RestartEvent**"); @@ -1380,7 +1357,6 @@ client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); {{% /tab %}} {{% tab "Go" %}} - ```go func raiseEvent() { @@ -1409,6 +1385,209 @@ func raiseEvent() { External events don't have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API. +## Compensation + +The compensation pattern (also known as the saga pattern) provides a mechanism for rolling back or undoing operations that have already been executed when a workflow fails partway through. This pattern is particularly important for long-running workflows that span multiple microservices where traditional database transactions are not feasible. + +In distributed microservice architectures, you often need to coordinate operations across multiple services. When these operations cannot be wrapped in a single transaction, the compensation pattern provides a way to maintain consistency by defining compensating actions for each step in the workflow. + +The compensation pattern addresses several critical challenges: + +- **Distributed Transaction Management**: When a workflow spans multiple microservices, each with their own data stores, traditional ACID transactions are not possible. The compensation pattern provides transactional consistency by ensuring operations are either all completed successfully or all undone through compensation. +- **Partial Failure Recovery**: If a workflow fails after some steps have completed successfully, the compensation pattern allows you to undo those completed steps gracefully. +- **Business Process Integrity**: Ensures that business processes can be properly rolled back in case of failures, maintaining the integrity of your business operations. +- **Long-Running Processes**: For workflows that may run for hours, days, or longer, traditional locking mechanisms are impractical. Compensation provides a way to handle failures in these scenarios. + +Common use cases for the compensation pattern include: + +- **E-commerce Order Processing**: Reserve inventory, charge payment, and ship orders. If shipping fails, you need to release the inventory and refund the payment. +- **Financial Transactions**: In a money transfer, if crediting the destination account fails, you need to rollback the debit from the source account. +- **Resource Provisioning**: When provisioning cloud resources across multiple providers, if one step fails, you need to clean up all previously provisioned resources. +- **Multi-Step Business Processes**: Any business process that involves multiple irreversible steps that may need to be undone in case of later failures. + +Dapr Workflow provides support for the compensation pattern, allowing you to register compensation activities for each step and execute them in reverse order when needed. + +Here's an example workflow for an e-commerce process: + +1. A workflow is triggered when an order is received. +1. A reservation is made for the order in the inventory. +1. The payment is processed. +1. The order is shipped. +1. If any of the above actions results in an error, the actions are compensated with another action: + - The shipment is cancelled. + - The payment is refunded. + - The inventory reservation is released. + +The following diagram illustrates this flow. + +Diagram showing how the compensation pattern. + +{{< tabpane text=true >}} + +{{% tab "Java" %}} + +```java +public class PaymentProcessingWorkflow implements Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + var orderId = ctx.getInput(String.class); + List compensations = new ArrayList<>(); + + try { + // Step 1: Reserve inventory + String reservationId = ctx.callActivity(ReserveInventoryActivity.class.getName(), orderId, String.class).await(); + ctx.getLogger().info("Inventory reserved: {}", reservationId); + compensations.add("ReleaseInventory"); + + // Step 2: Process payment + String paymentId = ctx.callActivity(ProcessPaymentActivity.class.getName(), orderId, String.class).await(); + ctx.getLogger().info("Payment processed: {}", paymentId); + compensations.add("RefundPayment"); + + // Step 3: Ship order + String shipmentId = ctx.callActivity(ShipOrderActivity.class.getName(), orderId, String.class).await(); + ctx.getLogger().info("Order shipped: {}", shipmentId); + compensations.add("CancelShipment"); + + } catch (TaskFailedException e) { + ctx.getLogger().error("Activity failed: {}", e.getMessage()); + + // Execute compensations in reverse order + Collections.reverse(compensations); + for (String compensation : compensations) { + try { + switch (compensation) { + case "CancelShipment": + String shipmentCancelResult = ctx.callActivity( + CancelShipmentActivity.class.getName(), + orderId, + String.class).await(); + ctx.getLogger().info("Shipment cancellation completed: {}", shipmentCancelResult); + break; + + case "RefundPayment": + String refundResult = ctx.callActivity( + RefundPaymentActivity.class.getName(), + orderId, + String.class).await(); + ctx.getLogger().info("Payment refund completed: {}", refundResult); + break; + + case "ReleaseInventory": + String releaseResult = ctx.callActivity( + ReleaseInventoryActivity.class.getName(), + orderId, + String.class).await(); + ctx.getLogger().info("Inventory release completed: {}", releaseResult); + break; + } + } catch (TaskFailedException ex) { + ctx.getLogger().error("Compensation activity failed: {}", ex.getMessage()); + } + } + ctx.complete("Order processing failed, compensation applied"); + } + + // Step 4: Send confirmation + ctx.callActivity(SendConfirmationActivity.class.getName(), orderId, Void.class).await(); + ctx.getLogger().info("Confirmation sent for order: {}", orderId); + + ctx.complete("Order processed successfully: " + orderId); + }; + } +} + +// Example activities +class ReserveInventoryActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String orderId = ctx.getInput(String.class); + // Logic to reserve inventory + String reservationId = "reservation_" + orderId; + System.out.println("Reserved inventory for order: " + orderId); + return reservationId; + } +} + +class ReleaseInventoryActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String reservationId = ctx.getInput(String.class); + // Logic to release inventory reservation + System.out.println("Released inventory reservation: " + reservationId); + return "Released: " + reservationId; + } +} + +class ProcessPaymentActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String orderId = ctx.getInput(String.class); + // Logic to process payment + String paymentId = "payment_" + orderId; + System.out.println("Processed payment for order: " + orderId); + return paymentId; + } +} + +class RefundPaymentActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String paymentId = ctx.getInput(String.class); + // Logic to refund payment + System.out.println("Refunded payment: " + paymentId); + return "Refunded: " + paymentId; + } +} + +class ShipOrderActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String orderId = ctx.getInput(String.class); + // Logic to ship order + String shipmentId = "shipment_" + orderId; + System.out.println("Shipped order: " + orderId); + return shipmentId; + } +} + +class CancelShipmentActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String shipmentId = ctx.getInput(String.class); + // Logic to cancel shipment + System.out.println("Canceled shipment: " + shipmentId); + return "Canceled: " + shipmentId; + } +} + +class SendConfirmationActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String orderId = ctx.getInput(String.class); + // Logic to send confirmation + System.out.println("Sent confirmation for order: " + orderId); + return null; + } +} +``` + +{{% /tab %}} + +{{< /tabpane >}} + +The key benefits of using Dapr Workflow's compensation pattern include: + +- **Compensation Control**: You have full control over when and how compensation activities are executed. +- **Flexible Configuration**: You can implement custom logic for determining which compensations to run. +- **Error Handling**: Handle compensation failures according to your specific business requirements. +- **Simple Implementation**: No additional framework dependencies - just standard workflow activities and exception handling. + +The compensation pattern ensures that your distributed workflows can maintain consistency and recover gracefully from failures, making it an essential tool for building reliable microservice architectures. + ## Next steps {{< button text="Workflow architecture >>" page="workflow-architecture.md" >}}