# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from datetime import timedelta

from dapr.ext.workflow import (
    DaprWorkflowContext,
    RetryPolicy,
    WorkflowActivityContext,
    WorkflowRuntime,
    when_any,
)
from dapr.ext.workflow.aio import DaprWorkflowClient

from dapr.clients.exceptions import DaprInternalError
from dapr.conf import Settings

settings = Settings()

counter = 0
retry_count = 0
child_orchestrator_count = 0
child_orchestrator_string = ''
child_act_retry_count = 0
instance_id = 'exampleInstanceID'
child_instance_id = 'childInstanceID'
workflow_name = 'hello_world_wf'
child_workflow_name = 'child_wf'
input_data = 'Hi Counter!'
event_name = 'event1'
event_data = 'eventData'
non_existent_id_error = 'no such instance exists'

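# Retry policy shared by the retryable activity and the child workflow call:
# up to 3 attempts, starting 1 second apart with exponential backoff capped at
# 10 seconds, and an overall retry window of 100 seconds.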
retry_policy = RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2,
    max_retry_interval=timedelta(seconds=10),
    retry_timeout=timedelta(seconds=100),
)

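# The workflow runtime hosts and executes the workflows and activities registered below.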
wfr = WorkflowRuntime()


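# Parent workflow: chains plain activities, retries a flaky activity and a flaky
# child workflow, then waits for an external event (or a 30-second timeout)
# before running the final activities.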
@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
    print(f'{wf_input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
    yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

    # Wait for either the external event or a 30-second timeout, whichever fires first
    event = ctx.wait_for_external_event(event_name)
    timeout = ctx.create_timer(timedelta(seconds=30))
    winner = yield when_any([event, timeout])

    if winner == timeout:
        print('Workflow timed out waiting for event')
        return 'Timeout'

    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)
    return 'Completed'


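# Activity that adds its input to the module-level counter.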
@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
    global counter
    counter += wf_input
    print(f'New counter value is: {counter}!', flush=True)


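# Activity that fails whenever retry_count is even, so the first attempt raises
# and the retry policy drives a second, successful attempt.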
@wfr.activity(name='hello_retryable_act')
def hello_retryable_act(ctx: WorkflowActivityContext):
    global retry_count
    if (retry_count % 2) == 0:
        print(f'Retry count value is: {retry_count}!', flush=True)
        retry_count += 1
        raise ValueError('Retryable Error')
    print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
    retry_count += 1


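# Child workflow that raises until it has run three times; each non-replayed run
# appends the run number to child_orchestrator_string and calls a retried activity.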
@wfr.workflow(name='child_retryable_wf')
def child_retryable_wf(ctx: DaprWorkflowContext):
    global child_orchestrator_string, child_orchestrator_count
    if not ctx.is_replaying:
        child_orchestrator_count += 1
        print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
        child_orchestrator_string += str(child_orchestrator_count)
    yield ctx.call_activity(
        act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
    )
    if child_orchestrator_count < 3:
        raise ValueError('Retryable Error')


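# Child activity: appends the letter for the current run (1 -> 'a', 2 -> 'b', ...) and
# fails on every other call, so the retry policy makes each letter appear twice.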
@wfr.activity(name='act_for_child_wf')
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
    global child_orchestrator_string, child_act_retry_count
    inp_char = chr(96 + inp)
    print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
    child_orchestrator_string += inp_char
    if child_act_retry_count % 2 == 0:
        child_act_retry_count += 1
        raise ValueError('Retryable Error')
    child_act_retry_count += 1


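# Drives the example with the async workflow client: schedule the workflow, pause and
# resume it, raise the external event, wait for completion, and finally purge it.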
async def main():
    wfr.start()
    wf_client = DaprWorkflowClient()

    try:
        print('==========Start Counter Increase as per Input:==========')
        await wf_client.schedule_new_workflow(
            workflow=hello_world_wf, input=input_data, instance_id=instance_id
        )

        await wf_client.wait_for_workflow_start(instance_id)

        # Sleep to let the workflow run initial activities
        await asyncio.sleep(12)

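        # Expected state after the initial activities: counter = 1 + 10, the retryable
        # activity ran twice, and the child workflow built '1aa2bb3cc'.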
        assert counter == 11
        assert retry_count == 2
        assert child_orchestrator_string == '1aa2bb3cc'

        # Pause Test
        await wf_client.pause_workflow(instance_id=instance_id)
        metadata = await wf_client.get_workflow_state(instance_id=instance_id)
        print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')

        # Resume Test
        await wf_client.resume_workflow(instance_id=instance_id)
        metadata = await wf_client.get_workflow_state(instance_id=instance_id)
        print(
            f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}'
        )

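        # Event Test: raise the external event the workflow is waiting for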
        await asyncio.sleep(2)  # Give the workflow time to reach the event wait state
        await wf_client.raise_workflow_event(
            instance_id=instance_id, event_name=event_name, data=event_data
        )

        print('========= Waiting for Workflow completion', flush=True)
        try:
            state = await wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
            if state.runtime_status.name == 'COMPLETED':
                print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
            else:
                print(f'Workflow failed! Status: {state.runtime_status.name}')
        except TimeoutError:
            print('*** Workflow timed out!')

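        # Purge Test: delete the workflow's state and verify the instance is gone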
        await wf_client.purge_workflow(instance_id=instance_id)
        try:
            await wf_client.get_workflow_state(instance_id=instance_id)
        except DaprInternalError as err:
            if non_existent_id_error in err._message:
                print('Instance Successfully Purged')
    finally:
        wfr.shutdown()


if __name__ == '__main__':
    asyncio.run(main())