diff --git a/daprdocs/content/en/python-sdk-docs/python-client.md b/daprdocs/content/en/python-sdk-docs/python-client.md index fc6ef496..46358437 100644 --- a/daprdocs/content/en/python-sdk-docs/python-client.md +++ b/daprdocs/content/en/python-sdk-docs/python-client.md @@ -515,89 +515,5 @@ def main(): - For a full list of state operations visit [How-To: Use the cryptography APIs]({{< ref howto-cryptography.md >}}). - Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples/crypto) for code samples and instructions to try out cryptography -### Workflow - -```python -from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext -from dapr.clients import DaprClient - -instanceId = "exampleInstanceID" -workflowComponent = "dapr" -workflowName = "hello_world_wf" -eventName = "event1" -eventData = "eventData" - -def main(): - with DaprClient() as d: - host = settings.DAPR_RUNTIME_HOST - port = settings.DAPR_GRPC_PORT - workflowRuntime = WorkflowRuntime(host, port) - workflowRuntime = WorkflowRuntime() - workflowRuntime.register_workflow(hello_world_wf) - workflowRuntime.register_activity(hello_act) - workflowRuntime.start() - - # Start the workflow - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") - - # ... - - # Pause Test - d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}") - - # Resume Test - d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}") - - sleep(1) - # Raise event - d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent, - event_name=eventName, event_data=eventData) - - sleep(5) - # Purge Test - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") - - - # Kick off another workflow for termination purposes - # This will also test using the same instance ID on a new workflow after - # the old instance was purged - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") - - # Terminate Test - d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent) - sleep(1) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}") - - # Purge Test - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") - - workflowRuntime.shutdown() -``` - -- Learn more about authoring and managing workflows: - - [How-To: Author a workflow]({{< ref howto-author-workflow.md >}}). - - [How-To: Manage a workflow]({{< ref howto-manage-workflow.md >}}). -- Visit [Python SDK examples](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py) for code samples and instructions to try out Dapr Workflow. - - ## Related links [Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples) diff --git a/daprdocs/content/en/python-sdk-docs/python-sdk-extensions/python-workflow-ext/_index.md b/daprdocs/content/en/python-sdk-docs/python-sdk-extensions/python-workflow-ext/_index.md index e5fe5ce8..ed75a1ee 100644 --- a/daprdocs/content/en/python-sdk-docs/python-sdk-extensions/python-workflow-ext/_index.md +++ b/daprdocs/content/en/python-sdk-docs/python-sdk-extensions/python-workflow-ext/_index.md @@ -7,11 +7,7 @@ description: How to get up and running with the Dapr Workflow extension no_list: true --- -{{% alert title="Note" color="primary" %}} -Dapr Workflow is currently in alpha. -{{% /alert %}} - -The Dapr Python SDK provides a built in Dapr Workflow extension, `dapr.ext.workflow`, for creating Dapr services. +The Dapr Python SDK provides a built-in Dapr Workflow extension, `dapr.ext.workflow`, for creating Dapr services. ## Installation @@ -31,12 +27,79 @@ The development package will contain features and behavior that will be compatib {{% /alert %}} ```bash -pip3 install dapr-ext-workflow-dev +pip install dapr-ext-workflow-dev ``` {{% /codetab %}} {{< /tabs >}} +## Example + +```python +from time import sleep + +import dapr.ext.workflow as wf + + +wfr = wf.WorkflowRuntime() + + +@wfr.workflow(name='random_workflow') +def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + try: + result1 = yield ctx.call_activity(step1, input=wf_input) + result2 = yield ctx.call_activity(step2, input=result1) + except Exception as e: + yield ctx.call_activity(error_handler, input=str(e)) + raise + return [result1, result2] + + +@wfr.activity(name='step1') +def step1(ctx, activity_input): + print(f'Step 1: Received input: {activity_input}.') + # Do some work + return activity_input + 1 + + +@wfr.activity +def step2(ctx, activity_input): + print(f'Step 2: Received input: {activity_input}.') + # Do some work + return activity_input * 2 + +@wfr.activity +def error_handler(ctx, error): + print(f'Executing error handler: {error}.') + # Do some compensating work + + +if __name__ == '__main__': + wfr.start() + sleep(10) # wait for workflow runtime to start + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42) + print(f'Workflow started. Instance ID: {instance_id}') + state = wf_client.wait_for_workflow_completion(instance_id) + print(f'Workflow completed! Status: {state.runtime_status}') + + wfr.shutdown() +``` + +- Learn more about authoring and managing workflows: + - [How-To: Author a workflow]({{< ref howto-author-workflow.md >}}). + - [How-To: Manage a workflow]({{< ref howto-manage-workflow.md >}}). + - +- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/main/examples/workflow) for code samples and instructions to try out Dapr Workflow: + - [Simple workflow example]({{< ref python-workflow.md >}}) + - [Task chaining example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py) + - [Fan-out/Fan-in example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/fan_out_fan_in.py) + - [Child workflow example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/child_workflow.py) + - [Human approval example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/human_approval.py) + - [Monitor example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/monitor.py) + + ## Next steps {{< button text="Getting started with the Dapr Workflow Python SDK" page="python-workflow.md" >}} diff --git a/daprdocs/content/en/python-sdk-docs/python-sdk-extensions/python-workflow-ext/python-workflow.md b/daprdocs/content/en/python-sdk-docs/python-sdk-extensions/python-workflow-ext/python-workflow.md index b241c9c1..6e8011f3 100644 --- a/daprdocs/content/en/python-sdk-docs/python-sdk-extensions/python-workflow-ext/python-workflow.md +++ b/daprdocs/content/en/python-sdk-docs/python-sdk-extensions/python-workflow-ext/python-workflow.md @@ -6,14 +6,15 @@ weight: 30000 description: How to get up and running with workflows using the Dapr Python SDK --- -Let’s create a Dapr workflow and invoke it using the console. With the [provided hello world workflow example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow), you will: +Let’s create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/python-sdk/tree/main/examples/workflow/simple.py), you will: -- Run a [Python console application using `DaprClient`](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py) -- Utilize the Python workflow SDK and API calls to start, pause, resume, terminate, and purge workflow instances +- Run a [Python console application](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py) that demonstrates workflow orchestration with activities, child workflows, and external events +- Learn how to handle retries, timeouts, and workflow state management +- Use the Python workflow SDK to start, pause, resume, and purge workflow instances This example uses the default configuration from `dapr init` in [self-hosted mode](https://github.com/dapr/cli#install-dapr-on-your-local-machine-self-hosted). -In the Python example project, the `app.py` file contains the setup of the app, including: +In the Python example project, the `simple.py` file contains the setup of the app, including: - The workflow definition - The workflow activity definitions - The registration of the workflow and workflow activities @@ -27,22 +28,22 @@ In the Python example project, the `app.py` file contains the setup of the app, ## Set up the environment -Run the following command to install the requirements for running this workflow sample with the Dapr Python SDK. +Start by cloning the [Python SDK repo]. ```bash -pip3 install -r demo_workflow/requirements.txt +git clone https://github.com/dapr/python-sdk.git ``` -Clone the [Python SDK repo]. +From the Python SDK root directory, navigate to the Dapr Workflow example. ```bash -git clone https://github.com/dapr/python-sdk.git +cd examples/workflow ``` -From the Python SDK root directory, navigate to the Dapr Workflow example. +Run the following command to install the requirements for running this workflow sample with the Dapr Python SDK. ```bash -cd examples/demo_workflow +pip3 install -r workflow/requirements.txt ``` ## Run the application locally @@ -50,133 +51,116 @@ cd examples/demo_workflow To run the Dapr application, you need to start the Python program and a Dapr sidecar. In the terminal, run: ```bash -dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py +dapr run --app-id wf-simple-example --dapr-grpc-port 50001 --resources-path components -- python3 simple.py ``` -> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. +> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python simple.py` instead of `python3 simple.py`. **Expected output** ``` -== APP == ==========Start Counter Increase as per Input:========== - -== APP == start_resp exampleInstanceID - -== APP == Hi Counter! -== APP == New counter value is: 1! - -== APP == Hi Counter! -== APP == New counter value is: 11! - -== APP == Hi Counter! -== APP == Hi Counter! -== APP == Get response from hello_world_wf after pause call: Suspended - -== APP == Hi Counter! -== APP == Get response from hello_world_wf after resume call: Running - -== APP == Hi Counter! -== APP == New counter value is: 111! - -== APP == Hi Counter! -== APP == Instance Successfully Purged - -== APP == start_resp exampleInstanceID - -== APP == Hi Counter! -== APP == New counter value is: 1112! - -== APP == Hi Counter! -== APP == New counter value is: 1122! - -== APP == Get response from hello_world_wf after terminate call: Terminated -== APP == Get response from child_wf after terminate call: Terminated -== APP == Instance Successfully Purged +- "== APP == Hi Counter!" +- "== APP == New counter value is: 1!" +- "== APP == New counter value is: 11!" +- "== APP == Retry count value is: 0!" +- "== APP == Retry count value is: 1! This print statement verifies retry" +- "== APP == Appending 1 to child_orchestrator_string!" +- "== APP == Appending a to child_orchestrator_string!" +- "== APP == Appending a to child_orchestrator_string!" +- "== APP == Appending 2 to child_orchestrator_string!" +- "== APP == Appending b to child_orchestrator_string!" +- "== APP == Appending b to child_orchestrator_string!" +- "== APP == Appending 3 to child_orchestrator_string!" +- "== APP == Appending c to child_orchestrator_string!" +- "== APP == Appending c to child_orchestrator_string!" +- "== APP == Get response from hello_world_wf after pause call: Suspended" +- "== APP == Get response from hello_world_wf after resume call: Running" +- "== APP == New counter value is: 111!" +- "== APP == New counter value is: 1111!" +- "== APP == Workflow completed! Result: "Completed" ``` ## What happened? -When you ran `dapr run`, the Dapr client: -1. Registered the workflow (`hello_world_wf`) and its actvity (`hello_act`) -1. Started the workflow engine - -```python -def main(): - with DaprClient() as d: - host = settings.DAPR_RUNTIME_HOST - port = settings.DAPR_GRPC_PORT - workflowRuntime = WorkflowRuntime(host, port) - workflowRuntime = WorkflowRuntime() - workflowRuntime.register_workflow(hello_world_wf) - workflowRuntime.register_activity(hello_act) - workflowRuntime.start() - - print("==========Start Counter Increase as per Input:==========") - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") -``` - -Dapr then paused and resumed the workflow: - -```python - # Pause - d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}") - - # Resume - d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}") -``` - -Once the workflow resumed, Dapr raised a workflow event and printed the new counter value: - -```python - # Raise event - d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent, - event_name=eventName, event_data=eventData) -``` - -To clear out the workflow state from your state store, Dapr purged the workflow: - -```python - # Purge - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") -``` - -The sample then demonstrated terminating a workflow by: -- Starting a new workflow using the same `instanceId` as the purged workflow. -- Terminating the workflow and purging before shutting down the workflow. - -```python - # Kick off another workflow - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") - - # Terminate - d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent) - sleep(1) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}") - - # Purge - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") -``` - +When you run the application, several key workflow features are shown: + +1. **Workflow and Activity Registration**: The application uses Python decorators to automatically register workflows and activities with the runtime. This decorator-based approach provides a clean, declarative way to define your workflow components: + ```python + @wfr.workflow(name='hello_world_wf') + def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + # Workflow definition... + + @wfr.activity(name='hello_act') + def hello_act(ctx: WorkflowActivityContext, wf_input): + # Activity definition... + ``` + +2. **Runtime Setup**: The application initializes the workflow runtime and client: + ```python + wfr = WorkflowRuntime() + wfr.start() + wf_client = DaprWorkflowClient() + ``` + +2. **Activity Execution**: The workflow executes a series of activities that increment a counter: + ```python + @wfr.workflow(name='hello_world_wf') + def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + yield ctx.call_activity(hello_act, input=1) + yield ctx.call_activity(hello_act, input=10) + ``` + +3. **Retry Logic**: The workflow demonstrates error handling with a retry policy: + ```python + retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + backoff_coefficient=2, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=100), + ) + yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + ``` + +4. **Child Workflow**: A child workflow is executed with its own retry policy: + ```python + yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy) + ``` + +5. **External Event Handling**: The workflow waits for an external event with a timeout: + ```python + event = ctx.wait_for_external_event(event_name) + timeout = ctx.create_timer(timedelta(seconds=30)) + winner = yield when_any([event, timeout]) + ``` + +6. **Workflow Lifecycle Management**: The example demonstrates how to pause and resume the workflow: + ```python + wf_client.pause_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + # ... check status ... + wf_client.resume_workflow(instance_id=instance_id) + ``` + +7. **Event Raising**: After resuming, the workflow raises an event: + ```python + wf_client.raise_workflow_event( + instance_id=instance_id, + event_name=event_name, + data=event_data + ) + ``` + +8. **Completion and Cleanup**: Finally, the workflow waits for completion and cleans up: + ```python + state = wf_client.wait_for_workflow_completion( + instance_id, + timeout_in_seconds=30 + ) + wf_client.purge_workflow(instance_id=instance_id) + ``` ## Next steps - [Learn more about Dapr workflow]({{< ref workflow-overview.md >}}) - [Workflow API reference]({{< ref workflow_api.md >}}) +- [Try implementing more complex workflow patterns](https://github.com/dapr/python-sdk/tree/main/examples/workflow)