Skip to content

Commit c053172

Browse files
Workflow fixes and improvements (dapr#784)
* Converts demo_workflow example to DaprWorkflowClient and removes default timeout of 60seconds on `wait_for_workflow_start` and `wait_for_workflow_completion` Signed-off-by: Elena Kolevska <[email protected]> * Bumps durable task library. Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Elena Kolevska <[email protected]> # Conflicts: # ext/dapr-ext-workflow/setup.cfg
1 parent 38a6bbf commit c053172

File tree

5 files changed

+245
-9
lines changed

5 files changed

+245
-9
lines changed

dev-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ Flask>=1.1
1515
# needed for auto fix
1616
ruff===0.2.2
1717
# needed for dapr-ext-workflow
18-
durabletask-dapr >= 0.2.0a4
18+
durabletask-dapr >= 0.2.0a7

examples/workflow/README.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,72 @@ pip3 install -r requirements.txt
1919

2020
Each of the examples in this directory can be run directly from the command line.
2121

22+
### Simple Workflow
23+
This example represents a workflow that manages counters through a series of activities and child workflows.
24+
It shows several Dapr Workflow features including:
25+
- Basic activity execution with counter increments
26+
- Retryable activities with configurable retry policies
27+
- Child workflow orchestration with retry logic
28+
- External event handling with timeouts
29+
- Workflow state management (pause/resume)
30+
- Activity error handling and retry backoff
31+
- Global state tracking across workflow components
32+
- Workflow lifecycle management (start, pause, resume, purge)
33+
34+
<!--STEP
35+
name: Run the simple workflow example
36+
expected_stdout_lines:
37+
- "== APP == Hi Counter!"
38+
- "== APP == New counter value is: 1!"
39+
- "== APP == New counter value is: 11!"
40+
- "== APP == Retry count value is: 0!"
41+
- "== APP == Retry count value is: 1! This print statement verifies retry"
42+
- "== APP == Appending 1 to child_orchestrator_string!"
43+
- "== APP == Appending a to child_orchestrator_string!"
44+
- "== APP == Appending a to child_orchestrator_string!"
45+
- "== APP == Appending 2 to child_orchestrator_string!"
46+
- "== APP == Appending b to child_orchestrator_string!"
47+
- "== APP == Appending b to child_orchestrator_string!"
48+
- "== APP == Appending 3 to child_orchestrator_string!"
49+
- "== APP == Appending c to child_orchestrator_string!"
50+
- "== APP == Appending c to child_orchestrator_string!"
51+
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
52+
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
53+
- "== APP == New counter value is: 111!"
54+
- "== APP == New counter value is: 1111!"
55+
- "== APP == Workflow completed! Result: Completed"
56+
timeout_seconds: 30
57+
-->
58+
59+
```sh
60+
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 -- python3 simple.py
61+
```
62+
<!--END_STEP-->
63+
64+
The output of this example should look like this:
65+
66+
```
67+
- "== APP == Hi Counter!"
68+
- "== APP == New counter value is: 1!"
69+
- "== APP == New counter value is: 11!"
70+
- "== APP == Retry count value is: 0!"
71+
- "== APP == Retry count value is: 1! This print statement verifies retry"
72+
- "== APP == Appending 1 to child_orchestrator_string!"
73+
- "== APP == Appending a to child_orchestrator_string!"
74+
- "== APP == Appending a to child_orchestrator_string!"
75+
- "== APP == Appending 2 to child_orchestrator_string!"
76+
- "== APP == Appending b to child_orchestrator_string!"
77+
- "== APP == Appending b to child_orchestrator_string!"
78+
- "== APP == Appending 3 to child_orchestrator_string!"
79+
- "== APP == Appending c to child_orchestrator_string!"
80+
- "== APP == Appending c to child_orchestrator_string!"
81+
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
82+
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
83+
- "== APP == New counter value is: 111!"
84+
- "== APP == New counter value is: 1111!"
85+
- "== APP == Workflow completed! Result: Completed"
86+
```
87+
2288
### Task Chaining
2389

2490
This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:

examples/workflow/simple.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
from datetime import timedelta
13+
from time import sleep
14+
from dapr.ext.workflow import (
15+
WorkflowRuntime,
16+
DaprWorkflowContext,
17+
WorkflowActivityContext,
18+
RetryPolicy,
19+
DaprWorkflowClient,
20+
when_any,
21+
)
22+
from dapr.conf import Settings
23+
from dapr.clients.exceptions import DaprInternalError
24+
25+
settings = Settings()
26+
27+
counter = 0
28+
retry_count = 0
29+
child_orchestrator_count = 0
30+
child_orchestrator_string = ''
31+
child_act_retry_count = 0
32+
instance_id = 'exampleInstanceID'
33+
child_instance_id = 'childInstanceID'
34+
workflow_name = 'hello_world_wf'
35+
child_workflow_name = 'child_wf'
36+
input_data = 'Hi Counter!'
37+
event_name = 'event1'
38+
event_data = 'eventData'
39+
non_existent_id_error = 'no such instance exists'
40+
41+
retry_policy = RetryPolicy(
42+
first_retry_interval=timedelta(seconds=1),
43+
max_number_of_attempts=3,
44+
backoff_coefficient=2,
45+
max_retry_interval=timedelta(seconds=10),
46+
retry_timeout=timedelta(seconds=100),
47+
)
48+
49+
wfr = WorkflowRuntime()
50+
51+
52+
@wfr.workflow(name='hello_world_wf')
53+
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
54+
print(f'{wf_input}')
55+
yield ctx.call_activity(hello_act, input=1)
56+
yield ctx.call_activity(hello_act, input=10)
57+
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
58+
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
59+
60+
# Change in event handling: Use when_any to handle both event and timeout
61+
event = ctx.wait_for_external_event(event_name)
62+
timeout = ctx.create_timer(timedelta(seconds=30))
63+
winner = yield when_any([event, timeout])
64+
65+
if winner == timeout:
66+
print('Workflow timed out waiting for event')
67+
return 'Timeout'
68+
69+
yield ctx.call_activity(hello_act, input=100)
70+
yield ctx.call_activity(hello_act, input=1000)
71+
return 'Completed'
72+
73+
74+
@wfr.activity(name='hello_act')
75+
def hello_act(ctx: WorkflowActivityContext, wf_input):
76+
global counter
77+
counter += wf_input
78+
print(f'New counter value is: {counter}!', flush=True)
79+
80+
81+
@wfr.activity(name='hello_retryable_act')
82+
def hello_retryable_act(ctx: WorkflowActivityContext):
83+
global retry_count
84+
if (retry_count % 2) == 0:
85+
print(f'Retry count value is: {retry_count}!', flush=True)
86+
retry_count += 1
87+
raise ValueError('Retryable Error')
88+
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
89+
retry_count += 1
90+
91+
92+
@wfr.workflow(name='child_retryable_wf')
93+
def child_retryable_wf(ctx: DaprWorkflowContext):
94+
global child_orchestrator_string, child_orchestrator_count
95+
if not ctx.is_replaying:
96+
child_orchestrator_count += 1
97+
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
98+
child_orchestrator_string += str(child_orchestrator_count)
99+
yield ctx.call_activity(
100+
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
101+
)
102+
if child_orchestrator_count < 3:
103+
raise ValueError('Retryable Error')
104+
105+
106+
@wfr.activity(name='act_for_child_wf')
107+
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
108+
global child_orchestrator_string, child_act_retry_count
109+
inp_char = chr(96 + inp)
110+
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
111+
child_orchestrator_string += inp_char
112+
if child_act_retry_count % 2 == 0:
113+
child_act_retry_count += 1
114+
raise ValueError('Retryable Error')
115+
child_act_retry_count += 1
116+
117+
118+
def main():
119+
wfr.start()
120+
wf_client = DaprWorkflowClient()
121+
122+
print('==========Start Counter Increase as per Input:==========')
123+
wf_client.schedule_new_workflow(
124+
workflow=hello_world_wf, input=input_data, instance_id=instance_id
125+
)
126+
127+
wf_client.wait_for_workflow_start(instance_id)
128+
129+
# Sleep to let the workflow run initial activities
130+
sleep(12)
131+
132+
assert counter == 11
133+
assert retry_count == 2
134+
assert child_orchestrator_string == '1aa2bb3cc'
135+
136+
# Pause Test
137+
wf_client.pause_workflow(instance_id=instance_id)
138+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
139+
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')
140+
141+
# Resume Test
142+
wf_client.resume_workflow(instance_id=instance_id)
143+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
144+
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')
145+
146+
sleep(2) # Give the workflow time to reach the event wait state
147+
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
148+
149+
print('========= Waiting for Workflow completion', flush=True)
150+
try:
151+
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
152+
if state.runtime_status.name == 'COMPLETED':
153+
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
154+
else:
155+
print(f'Workflow failed! Status: {state.runtime_status.name}')
156+
except TimeoutError:
157+
print('*** Workflow timed out!')
158+
159+
wf_client.purge_workflow(instance_id=instance_id)
160+
try:
161+
wf_client.get_workflow_state(instance_id=instance_id)
162+
except DaprInternalError as err:
163+
if non_existent_id_error in err._message:
164+
print('Instance Successfully Purged')
165+
166+
wfr.shutdown()
167+
168+
169+
if __name__ == '__main__':
170+
main()

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def get_workflow_state(
122122
"""Fetches runtime state for the specified workflow instance.
123123
124124
Args:
125-
instanceId: The unique ID of the workflow instance to fetch.
125+
instance_id: The unique ID of the workflow instance to fetch.
126126
fetch_payloads: If true, fetches the input, output payloads and custom status
127127
for the workflow instance. Defaults to true.
128128
@@ -144,7 +144,7 @@ def get_workflow_state(
144144
raise
145145

146146
def wait_for_workflow_start(
147-
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60
147+
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 0
148148
) -> Optional[WorkflowState]:
149149
"""Waits for a workflow to start running and returns a WorkflowState object that contains
150150
metadata about the started workflow.
@@ -158,7 +158,7 @@ def wait_for_workflow_start(
158158
fetch_payloads: If true, fetches the input, output payloads and custom status for
159159
the workflow instance. Defaults to false.
160160
timeout_in_seconds: The maximum time to wait for the workflow instance to start running.
161-
Defaults to 60 seconds.
161+
Defaults to meaning no timeout.
162162
163163
Returns:
164164
WorkflowState record that describes the workflow instance and its execution status.
@@ -170,7 +170,7 @@ def wait_for_workflow_start(
170170
return WorkflowState(state) if state else None
171171

172172
def wait_for_workflow_completion(
173-
self, instance_id: str, *, fetch_payloads: bool = True, timeout_in_seconds: int = 60
173+
self, instance_id: str, *, fetch_payloads: bool = True, timeout_in_seconds: int = 0
174174
) -> Optional[WorkflowState]:
175175
"""Waits for a workflow to complete and returns a WorkflowState object that contains
176176
metadata about the started instance.
@@ -192,7 +192,7 @@ def wait_for_workflow_completion(
192192
fetch_payloads: If true, fetches the input, output payloads and custom status
193193
for the workflow instance. Defaults to true.
194194
timeout_in_seconds: The maximum time in seconds to wait for the workflow instance to
195-
complete. Defaults to 60 seconds.
195+
complete. Defaults to 0 seconds, meaning no timeout.
196196
197197
Returns:
198198
WorkflowState record that describes the workflow instance and its execution status.
@@ -222,8 +222,8 @@ def raise_workflow_event(
222222
discarded.
223223
224224
Args:
225-
instanceId: The ID of the workflow instance that will handle the event.
226-
eventName: The name of the event. Event names are case-insensitive.
225+
instance_id: The ID of the workflow instance that will handle the event.
226+
event_name: The name of the event. Event names are case-insensitive.
227227
data: The serializable data payload to include with the event.
228228
"""
229229
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

ext/dapr-ext-workflow/setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ packages = find_namespace:
2525
include_package_data = True
2626
install_requires =
2727
dapr-dev >= 1.13.0rc1.dev
28-
durabletask-dapr >= 0.2.0a4
28+
durabletask-dapr >= 0.2.0a7
2929

3030
[options.packages.find]
3131
include =

0 commit comments

Comments
 (0)