Skip to content

Commit b01cdd7

Browse files
committed
updates, test examples, cleanup
Signed-off-by: Filinto Duran <[email protected]>
1 parent 76473a9 commit b01cdd7

22 files changed

+316
-915
lines changed

examples/workflow-async/README.md

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,36 @@
33
These examples mirror `examples/workflow/` but author orchestrators with `async def` using the
44
async workflow APIs. Activities remain regular functions unless noted.
55

6+
## Prerequisites
7+
8+
- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
9+
- [Install Python 3.10+](https://www.python.org/downloads/)
10+
11+
612
How to run:
7-
- Ensure a Dapr sidecar is running locally. If needed, set `DURABLETASK_GRPC_ENDPOINT`, or
8-
`DURABLETASK_GRPC_HOST/PORT`.
9-
- Install requirements: `pip install -r requirements.txt`
10-
- Run any example: `python simple.py`
13+
- Install Dapr CLI: `brew install dapr/tap/dapr-cli` or `choco install dapr-cli`
14+
- Initialize Dapr: `dapr init`
15+
- Install requirements:
16+
```bash
17+
cd examples/workflow-async
18+
python -m venv .venv
19+
source .venv/bin/activate
20+
pip install -r requirements.txt
21+
```
22+
23+
or better yet with faster `uv`:
24+
```bash
25+
uv venv .venv
26+
source .venv/bin/activate
27+
uv pip install -r requirements.txt
28+
```
29+
- Run any example with dapr:
30+
- `dapr run --app-id wf_async_symple -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python simple.py`
31+
- `dapr run --app-id wf_task_chain -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python task_chaining.py`
32+
- `dapr run --app-id wf_async_child -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python child_workflow.py`
33+
- `dapr run --app-id wf_async_fafi -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python fan_out_fan_in.py`
34+
- `dapr run --app-id wf_async_gather -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python fan_out_fan_in_with_gather.py`
35+
- `dapr run --app-id wf_async_approval -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python human_approval.py`
1136

1237
Notes:
1338
- Orchestrators use `await ctx.activity(...)`, `await ctx.sleep(...)`, `await ctx.when_all/when_any(...)`, etc.

examples/workflow-async/child_workflow.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
AsyncWorkflowContext,
1818
DaprWorkflowClient,
1919
WorkflowRuntime,
20+
WorkflowStatus,
2021
)
2122

2223
wfr = WorkflowRuntime()
@@ -35,12 +36,20 @@ async def parent(ctx: AsyncWorkflowContext, n: int) -> int:
3536

3637

3738
def main():
38-
wfr.start()
39-
client = DaprWorkflowClient()
40-
instance_id = 'parent_async_instance'
41-
client.schedule_new_workflow(workflow=parent, input=5, instance_id=instance_id)
42-
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
43-
wfr.shutdown()
39+
with wfr:
40+
# the context manager starts the workflow runtime on __enter__ and shutdown on __exit__
41+
client = DaprWorkflowClient()
42+
instance_id = 'parent_async_instance'
43+
client.schedule_new_workflow(workflow=parent, input=5, instance_id=instance_id)
44+
wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
45+
46+
# simple test
47+
if wf_state.runtime_status != WorkflowStatus.COMPLETED:
48+
print('Workflow failed with status ', wf_state.runtime_status)
49+
exit(1)
50+
if wf_state.serialized_output != '11':
51+
print('Workflow result is incorrect!')
52+
exit(1)
4453

4554

4655
if __name__ == '__main__':

examples/workflow-async/fan_out_fan_in.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
DaprWorkflowClient,
1818
WorkflowActivityContext,
1919
WorkflowRuntime,
20+
WorkflowStatus,
2021
)
2122

2223
wfr = WorkflowRuntime()
@@ -30,6 +31,7 @@ def square(ctx: WorkflowActivityContext, x: int) -> int:
3031
@wfr.async_workflow(name='fan_out_fan_in_async')
3132
async def orchestrator(ctx: AsyncWorkflowContext):
3233
tasks = [ctx.call_activity(square, input=i) for i in range(1, 6)]
34+
# 1 + 4 + 9 + 16 + 25 = 55
3335
results = await ctx.when_all(tasks)
3436
total = sum(results)
3537
return total
@@ -44,6 +46,14 @@ def main():
4446
print(f'Workflow state: {wf_state}')
4547
wfr.shutdown()
4648

49+
# simple test
50+
if wf_state.runtime_status != WorkflowStatus.COMPLETED:
51+
print('Workflow failed with status ', wf_state.runtime_status)
52+
exit(1)
53+
if wf_state.serialized_output != '55':
54+
print('Workflow result is incorrect!')
55+
exit(1)
56+
4757

4858
if __name__ == '__main__':
4959
main()
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Copyright 2025 The Dapr Authors
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the specific language governing permissions and
12+
limitations under the License.
13+
"""
14+
15+
import asyncio
16+
17+
from dapr.ext.workflow import (
18+
AsyncWorkflowContext,
19+
DaprWorkflowClient,
20+
WorkflowActivityContext,
21+
WorkflowRuntime,
22+
WorkflowStatus,
23+
)
24+
25+
# test using sandbox to convert asyncio methods into deterministic ones
26+
27+
wfr = WorkflowRuntime()
28+
29+
30+
@wfr.activity(name='square')
31+
def square(ctx: WorkflowActivityContext, x: int) -> int:
32+
return x * x
33+
34+
35+
# workflow function auto-recognize coroutine function and converts this into wfr.async_workflow
36+
@wfr.workflow(name='fan_out_fan_in_async')
37+
async def orchestrator(ctx: AsyncWorkflowContext):
38+
tasks = [ctx.call_activity(square, input=i) for i in range(1, 6)]
39+
# 1 + 4 + 9 + 16 + 25 = 55
40+
results = await asyncio.gather(*tasks)
41+
total = sum(results)
42+
return total
43+
44+
45+
def main():
46+
wfr.start()
47+
client = DaprWorkflowClient()
48+
instance_id = 'fofi_async'
49+
client.schedule_new_workflow(workflow=orchestrator, instance_id=instance_id)
50+
wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
51+
print(f'Workflow state: {wf_state}')
52+
wfr.shutdown()
53+
54+
# simple test
55+
if wf_state.runtime_status != WorkflowStatus.COMPLETED:
56+
print('Workflow failed with status ', wf_state.runtime_status)
57+
exit(1)
58+
if wf_state.serialized_output != '55':
59+
print('Workflow result is incorrect!')
60+
exit(1)
61+
62+
63+
if __name__ == '__main__':
64+
main()

examples/workflow-async/human_approval.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,35 @@
1313
limitations under the License.
1414
"""
1515

16-
from dapr.ext.workflow import AsyncWorkflowContext, DaprWorkflowClient, WorkflowRuntime
16+
import time
17+
from datetime import timedelta
18+
19+
from dapr.ext.workflow import (
20+
AsyncWorkflowContext,
21+
DaprWorkflowClient,
22+
WorkflowRuntime,
23+
WorkflowStatus,
24+
)
1725

1826
wfr = WorkflowRuntime()
1927

2028

2129
@wfr.async_workflow(name='human_approval_async')
2230
async def orchestrator(ctx: AsyncWorkflowContext, request_id: str):
31+
approve = ctx.wait_for_external_event(f'approve:{request_id}')
32+
reject = ctx.wait_for_external_event(f'reject:{request_id}')
2333
decision = await ctx.when_any(
2434
[
25-
ctx.wait_for_external_event(f'approve:{request_id}'),
26-
ctx.wait_for_external_event(f'reject:{request_id}'),
27-
ctx.create_timer(300.0),
35+
approve,
36+
reject,
37+
ctx.create_timer(timedelta(seconds=5)),
2838
]
2939
)
30-
if isinstance(decision, dict) and decision.get('approved'):
31-
return 'APPROVED'
32-
if isinstance(decision, dict) and decision.get('rejected'):
40+
if decision == approve:
41+
print(f'Decision Approved')
42+
return request_id
43+
if decision == reject:
44+
print(f'Decision Rejected')
3345
return 'REJECTED'
3446
return 'TIMEOUT'
3547

@@ -38,10 +50,29 @@ def main():
3850
wfr.start()
3951
client = DaprWorkflowClient()
4052
instance_id = 'human_approval_async_1'
41-
client.schedule_new_workflow(workflow=orchestrator, input='REQ-1', instance_id=instance_id)
53+
try:
54+
# clean up previous workflow with this ID
55+
client.terminate_workflow(instance_id)
56+
client.purge_workflow(instance_id)
57+
except Exception:
58+
pass
59+
client.schedule_new_workflow(workflow=orchestrator, input='req-1', instance_id=instance_id)
60+
time.sleep(1)
61+
client.raise_workflow_event(instance_id, 'approve:req-1')
4262
# In a real scenario, raise approve/reject event from another service.
63+
wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=20)
64+
print(f'Workflow state: {wf_state}')
65+
4366
wfr.shutdown()
4467

68+
# simple test
69+
if wf_state.runtime_status != WorkflowStatus.COMPLETED:
70+
print('Workflow failed with status ', wf_state.runtime_status)
71+
exit(1)
72+
if wf_state.serialized_output != '"req-1"':
73+
print('Workflow result is incorrect!')
74+
exit(1)
75+
4576

4677
if __name__ == '__main__':
4778
main()
Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,13 @@
1-
dapr-ext-workflow-dev>=1.15.0.dev
2-
dapr-dev>=1.15.0.dev
1+
#dapr-ext-workflow-dev>=1.15.0.dev
2+
#dapr-dev>=1.15.0.dev
3+
4+
5+
## --- local development: install local packages in editable mode --
6+
7+
## -- if using dev version of durabletask-python
8+
-e ../../../durabletask-python
9+
10+
## -- if using dev version of dapr-ext-workflow
11+
-e ../..
12+
-e ../../ext/dapr-ext-workflow
13+

examples/workflow-async/simple.py

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
See the specific language governing permissions and
1212
limitations under the License.
1313
"""
14+
import json
1415

1516
from datetime import timedelta
1617
from time import sleep
@@ -21,6 +22,7 @@
2122
RetryPolicy,
2223
WorkflowActivityContext,
2324
WorkflowRuntime,
25+
WorkflowStatus,
2426
)
2527

2628
counter = 0
@@ -58,18 +60,19 @@ async def hello_world_wf(ctx: AsyncWorkflowContext, wf_input):
5860
print(f'Child workflow returned {result_4}')
5961

6062
# Event vs timeout using when_any
63+
event_1 = ctx.wait_for_external_event(event_name)
6164
first = await ctx.when_any(
6265
[
63-
ctx.wait_for_external_event(event_name),
66+
event_1,
6467
ctx.create_timer(timedelta(seconds=30)),
6568
]
6669
)
6770

6871
# Proceed only if event won
69-
if isinstance(first, dict) and 'event' in first:
70-
await ctx.call_activity(hello_act, input=100)
71-
await ctx.call_activity(hello_act, input=1000)
72-
return 'Completed'
72+
if first == event_1:
73+
result_5 = await ctx.call_activity(hello_act, input=100)
74+
result_6 = await ctx.call_activity(hello_act, input=1000)
75+
return dict(result_1=result_1, result_2=result_2, result_3=result_3, result_4=result_4, result_5=result_5, result_6=result_6)
7376
return 'Timeout'
7477

7578

@@ -108,28 +111,37 @@ def act_for_child_wf(ctx: WorkflowActivityContext, inp):
108111

109112

110113
def main():
111-
wfr.start()
112-
wf_client = DaprWorkflowClient()
113-
114-
wf_client.schedule_new_workflow(
115-
workflow=hello_world_wf, input=input_data, instance_id=instance_id
116-
)
117-
118-
wf_client.wait_for_workflow_start(instance_id)
119-
120-
# Let initial activities run
121-
sleep(5)
122-
123-
# Raise event to continue
124-
wf_client.raise_workflow_event(
125-
instance_id=instance_id, event_name=event_name, data={'ok': True}
126-
)
127-
128-
# Wait for completion
129-
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
130-
print(f'Workflow status: {state.runtime_status.name}')
131-
132-
wfr.shutdown()
114+
wf_state = {}
115+
with wfr:
116+
wf_client = DaprWorkflowClient()
117+
118+
wf_client.schedule_new_workflow(
119+
workflow=hello_world_wf, input=input_data, instance_id=instance_id
120+
)
121+
122+
wf_client.wait_for_workflow_start(instance_id)
123+
124+
# Let initial activities run
125+
sleep(5)
126+
127+
# Raise event to continue
128+
wf_client.raise_workflow_event(
129+
instance_id=instance_id, event_name=event_name, data={'ok': True}
130+
)
131+
132+
# Wait for completion
133+
wf_state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
134+
135+
# simple test
136+
if wf_state.runtime_status != WorkflowStatus.COMPLETED:
137+
print('Workflow failed with status ', wf_state.runtime_status)
138+
exit(1)
139+
output = json.loads(wf_state.serialized_output)
140+
if (output["result_1"] != 'Activity returned 1' or output["result_2"] != 'Activity returned 10' or
141+
output["result_3"] != 'Activity returned 2' or output["result_4"] != 'ok' or
142+
output["result_5"] != 'Activity returned 100' or output["result_6"] != 'Activity returned 1000'):
143+
print('Workflow result is incorrect!')
144+
exit(1)
133145

134146

135147
if __name__ == '__main__':

examples/workflow-async/task_chaining.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
DaprWorkflowClient,
1818
WorkflowActivityContext,
1919
WorkflowRuntime,
20+
WorkflowStatus,
2021
)
2122

2223
wfr = WorkflowRuntime()
@@ -40,9 +41,18 @@ def main():
4041
client = DaprWorkflowClient()
4142
instance_id = 'task_chain_async'
4243
client.schedule_new_workflow(workflow=orchestrator, instance_id=instance_id)
43-
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
44+
wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
4445
wfr.shutdown()
4546

47+
# simple test
48+
if wf_state.runtime_status != WorkflowStatus.COMPLETED:
49+
print('Workflow failed with status ', wf_state.runtime_status)
50+
exit(1)
51+
# 1 + 2 + 3 + 4 = 10
52+
if wf_state.serialized_output != '10':
53+
print('Workflow result is incorrect!')
54+
exit(1)
55+
4656

4757
if __name__ == '__main__':
4858
main()

0 commit comments

Comments
 (0)