diff --git a/dev-requirements.txt b/dev-requirements.txt index 769b1e1e..cec56fb2 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -15,4 +15,4 @@ Flask>=1.1 # needed for auto fix ruff===0.2.2 # needed for dapr-ext-workflow -durabletask-dapr >= 0.2.0a4 +durabletask-dapr >= 0.2.0a7 diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 6b936b31..f5b901d1 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -19,6 +19,72 @@ pip3 install -r requirements.txt Each of the examples in this directory can be run directly from the command line. +### Simple Workflow +This example represents a workflow that manages counters through a series of activities and child workflows. +It shows several Dapr Workflow features including: +- Basic activity execution with counter increments +- Retryable activities with configurable retry policies +- Child workflow orchestration with retry logic +- External event handling with timeouts +- Workflow state management (pause/resume) +- Activity error handling and retry backoff +- Global state tracking across workflow components +- Workflow lifecycle management (start, pause, resume, purge) + + + +```sh +dapr run --app-id wf-simple-example --dapr-grpc-port 50001 -- python3 simple.py +``` + + +The output of this example should look like this: + +``` + - "== APP == Hi Counter!" + - "== APP == New counter value is: 1!" + - "== APP == New counter value is: 11!" + - "== APP == Retry count value is: 0!" + - "== APP == Retry count value is: 1! This print statement verifies retry" + - "== APP == Appending 1 to child_orchestrator_string!" + - "== APP == Appending a to child_orchestrator_string!" + - "== APP == Appending a to child_orchestrator_string!" + - "== APP == Appending 2 to child_orchestrator_string!" + - "== APP == Appending b to child_orchestrator_string!" + - "== APP == Appending b to child_orchestrator_string!" + - "== APP == Appending 3 to child_orchestrator_string!" + - "== APP == Appending c to child_orchestrator_string!" + - "== APP == Appending c to child_orchestrator_string!" + - "== APP == Get response from hello_world_wf after pause call: SUSPENDED" + - "== APP == Get response from hello_world_wf after resume call: RUNNING" + - "== APP == New counter value is: 111!" + - "== APP == New counter value is: 1111!" + - "== APP == Workflow completed! Result: Completed" +``` + ### Task Chaining This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command: diff --git a/examples/workflow/simple.py b/examples/workflow/simple.py new file mode 100644 index 00000000..76f21eba --- /dev/null +++ b/examples/workflow/simple.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import timedelta +from time import sleep +from dapr.ext.workflow import ( + WorkflowRuntime, + DaprWorkflowContext, + WorkflowActivityContext, + RetryPolicy, + DaprWorkflowClient, + when_any, +) +from dapr.conf import Settings +from dapr.clients.exceptions import DaprInternalError + +settings = Settings() + +counter = 0 +retry_count = 0 +child_orchestrator_count = 0 +child_orchestrator_string = '' +child_act_retry_count = 0 +instance_id = 'exampleInstanceID' +child_instance_id = 'childInstanceID' +workflow_name = 'hello_world_wf' +child_workflow_name = 'child_wf' +input_data = 'Hi Counter!' +event_name = 'event1' +event_data = 'eventData' +non_existent_id_error = 'no such instance exists' + +retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + backoff_coefficient=2, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=100), +) + +wfr = WorkflowRuntime() + + +@wfr.workflow(name='hello_world_wf') +def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + print(f'{wf_input}') + yield ctx.call_activity(hello_act, input=1) + yield ctx.call_activity(hello_act, input=10) + yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy) + + # Change in event handling: Use when_any to handle both event and timeout + event = ctx.wait_for_external_event(event_name) + timeout = ctx.create_timer(timedelta(seconds=30)) + winner = yield when_any([event, timeout]) + + if winner == timeout: + print('Workflow timed out waiting for event') + return 'Timeout' + + yield ctx.call_activity(hello_act, input=100) + yield ctx.call_activity(hello_act, input=1000) + return 'Completed' + + +@wfr.activity(name='hello_act') +def hello_act(ctx: WorkflowActivityContext, wf_input): + global counter + counter += wf_input + print(f'New counter value is: {counter}!', flush=True) + + +@wfr.activity(name='hello_retryable_act') +def hello_retryable_act(ctx: WorkflowActivityContext): + global retry_count + if (retry_count % 2) == 0: + print(f'Retry count value is: {retry_count}!', flush=True) + retry_count += 1 + raise ValueError('Retryable Error') + print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True) + retry_count += 1 + + +@wfr.workflow(name='child_retryable_wf') +def child_retryable_wf(ctx: DaprWorkflowContext): + global child_orchestrator_string, child_orchestrator_count + if not ctx.is_replaying: + child_orchestrator_count += 1 + print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True) + child_orchestrator_string += str(child_orchestrator_count) + yield ctx.call_activity( + act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy + ) + if child_orchestrator_count < 3: + raise ValueError('Retryable Error') + + +@wfr.activity(name='act_for_child_wf') +def act_for_child_wf(ctx: WorkflowActivityContext, inp): + global child_orchestrator_string, child_act_retry_count + inp_char = chr(96 + inp) + print(f'Appending {inp_char} to child_orchestrator_string!', flush=True) + child_orchestrator_string += inp_char + if child_act_retry_count % 2 == 0: + child_act_retry_count += 1 + raise ValueError('Retryable Error') + child_act_retry_count += 1 + + +def main(): + wfr.start() + wf_client = DaprWorkflowClient() + + print('==========Start Counter Increase as per Input:==========') + wf_client.schedule_new_workflow( + workflow=hello_world_wf, input=input_data, instance_id=instance_id + ) + + wf_client.wait_for_workflow_start(instance_id) + + # Sleep to let the workflow run initial activities + sleep(12) + + assert counter == 11 + assert retry_count == 2 + assert child_orchestrator_string == '1aa2bb3cc' + + # Pause Test + wf_client.pause_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}') + + # Resume Test + wf_client.resume_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}') + + sleep(2) # Give the workflow time to reach the event wait state + wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data) + + print('========= Waiting for Workflow completion', flush=True) + try: + state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + if state.runtime_status.name == 'COMPLETED': + print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"'))) + else: + print(f'Workflow failed! Status: {state.runtime_status.name}') + except TimeoutError: + print('*** Workflow timed out!') + + wf_client.purge_workflow(instance_id=instance_id) + try: + wf_client.get_workflow_state(instance_id=instance_id) + except DaprInternalError as err: + if non_existent_id_error in err._message: + print('Instance Successfully Purged') + + wfr.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 6bb801dc..cc384503 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -122,7 +122,7 @@ def get_workflow_state( """Fetches runtime state for the specified workflow instance. Args: - instanceId: The unique ID of the workflow instance to fetch. + instance_id: The unique ID of the workflow instance to fetch. fetch_payloads: If true, fetches the input, output payloads and custom status for the workflow instance. Defaults to true. @@ -144,7 +144,7 @@ def get_workflow_state( raise def wait_for_workflow_start( - self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60 + self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 0 ) -> Optional[WorkflowState]: """Waits for a workflow to start running and returns a WorkflowState object that contains metadata about the started workflow. @@ -158,7 +158,7 @@ def wait_for_workflow_start( fetch_payloads: If true, fetches the input, output payloads and custom status for the workflow instance. Defaults to false. timeout_in_seconds: The maximum time to wait for the workflow instance to start running. - Defaults to 60 seconds. + Defaults to meaning no timeout. Returns: WorkflowState record that describes the workflow instance and its execution status. @@ -170,7 +170,7 @@ def wait_for_workflow_start( return WorkflowState(state) if state else None def wait_for_workflow_completion( - self, instance_id: str, *, fetch_payloads: bool = True, timeout_in_seconds: int = 60 + self, instance_id: str, *, fetch_payloads: bool = True, timeout_in_seconds: int = 0 ) -> Optional[WorkflowState]: """Waits for a workflow to complete and returns a WorkflowState object that contains metadata about the started instance. @@ -192,7 +192,7 @@ def wait_for_workflow_completion( fetch_payloads: If true, fetches the input, output payloads and custom status for the workflow instance. Defaults to true. timeout_in_seconds: The maximum time in seconds to wait for the workflow instance to - complete. Defaults to 60 seconds. + complete. Defaults to 0 seconds, meaning no timeout. Returns: WorkflowState record that describes the workflow instance and its execution status. @@ -222,8 +222,8 @@ def raise_workflow_event( discarded. Args: - instanceId: The ID of the workflow instance that will handle the event. - eventName: The name of the event. Event names are case-insensitive. + instance_id: The ID of the workflow instance that will handle the event. + event_name: The name of the event. Event names are case-insensitive. data: The serializable data payload to include with the event. """ return self.__obj.raise_orchestration_event(instance_id, event_name, data=data) diff --git a/ext/dapr-ext-workflow/setup.cfg b/ext/dapr-ext-workflow/setup.cfg index a7325511..b6bbc1a7 100644 --- a/ext/dapr-ext-workflow/setup.cfg +++ b/ext/dapr-ext-workflow/setup.cfg @@ -25,7 +25,7 @@ packages = find_namespace: include_package_data = True install_requires = dapr >= 1.15.0rc2 - durabletask-dapr >= 0.2.0a4 + durabletask-dapr >= 0.2.0a7 [options.packages.find] include =