Skip to content

Commit 5f81dbf

Browse files
committed
Added example for asyncio support and adjusted code accordingly.
- Created an async version of `WorkflowRuntime` that takes event_loop as a param. Signed-off-by: Patrick Assuied <[email protected]>
1 parent 6b8d7c8 commit 5f81dbf

File tree

9 files changed

+567
-3
lines changed

9 files changed

+567
-3
lines changed

examples/workflow/README.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,73 @@ The output of this example should look like this:
8585
- "== APP == Workflow completed! Result: Completed"
8686
```
8787

88+
### Simple Workflow Async
89+
This example represents a workflow with async activities and using the async WorkflowClient that manages counters through a series of activities and child workflows.
90+
It shows several Dapr Workflow features including:
91+
- Basic activity execution with counter increments
92+
- Retryable activities with configurable retry policies
93+
- Child workflow orchestration with retry logic
94+
- External event handling with timeouts
95+
- Workflow state management (pause/resume)
96+
- Activity error handling and retry backoff
97+
- Global state tracking across workflow components
98+
- Workflow lifecycle management (start, pause, resume, purge)
99+
100+
101+
<!--STEP
102+
name: Run the simple async workflow example
103+
expected_stdout_lines:
104+
- "== APP == Hi Counter!"
105+
- "== APP == New counter value is: 1!"
106+
- "== APP == New counter value is: 11!"
107+
- "== APP == Retry count value is: 0!"
108+
- "== APP == Retry count value is: 1! This print statement verifies retry"
109+
- "== APP == Appending 1 to child_orchestrator_string!"
110+
- "== APP == Appending a to child_orchestrator_string!"
111+
- "== APP == Appending a to child_orchestrator_string!"
112+
- "== APP == Appending 2 to child_orchestrator_string!"
113+
- "== APP == Appending b to child_orchestrator_string!"
114+
- "== APP == Appending b to child_orchestrator_string!"
115+
- "== APP == Appending 3 to child_orchestrator_string!"
116+
- "== APP == Appending c to child_orchestrator_string!"
117+
- "== APP == Appending c to child_orchestrator_string!"
118+
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
119+
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
120+
- "== APP == New counter value is: 111!"
121+
- "== APP == New counter value is: 1111!"
122+
- "== APP == Workflow completed! Result: Completed"
123+
timeout_seconds: 30
124+
-->
125+
126+
```sh
127+
dapr run --app-id wf-simple-aio-example -- python3 simple_aio.py
128+
```
129+
<!--END_STEP-->
130+
131+
The output of this example should look like this:
132+
133+
```
134+
- "== APP == Hi Counter!"
135+
- "== APP == New counter value is: 1!"
136+
- "== APP == New counter value is: 11!"
137+
- "== APP == Retry count value is: 0!"
138+
- "== APP == Retry count value is: 1! This print statement verifies retry"
139+
- "== APP == Appending 1 to child_orchestrator_string!"
140+
- "== APP == Appending a to child_orchestrator_string!"
141+
- "== APP == Appending a to child_orchestrator_string!"
142+
- "== APP == Appending 2 to child_orchestrator_string!"
143+
- "== APP == Appending b to child_orchestrator_string!"
144+
- "== APP == Appending b to child_orchestrator_string!"
145+
- "== APP == Appending 3 to child_orchestrator_string!"
146+
- "== APP == Appending c to child_orchestrator_string!"
147+
- "== APP == Appending c to child_orchestrator_string!"
148+
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
149+
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
150+
- "== APP == New counter value is: 111!"
151+
- "== APP == New counter value is: 1111!"
152+
- "== APP == Workflow completed! Result: Completed"
153+
```
154+
88155
### Task Chaining
89156

90157
This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:

examples/workflow/simple_aio.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2023 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import asyncio
14+
from datetime import timedelta
15+
16+
import dapr.ext.workflow as wf
17+
from dapr.ext.workflow.aio import DaprWorkflowClient, WorkflowRuntime
18+
19+
from dapr.clients.exceptions import DaprInternalError
20+
from dapr.conf import Settings
21+
22+
settings = Settings()
23+
24+
counter = 0
25+
retry_count = 0
26+
child_orchestrator_count = 0
27+
child_orchestrator_string = ''
28+
child_act_retry_count = 0
29+
instance_id = 'exampleInstanceID'
30+
child_instance_id = 'childInstanceID'
31+
workflow_name = 'hello_world_wf'
32+
child_workflow_name = 'child_wf'
33+
input_data = 'Hi Counter!'
34+
event_name = 'event1'
35+
event_data = 'eventData'
36+
non_existent_id_error = 'no such instance exists'
37+
38+
39+
retry_policy = wf.RetryPolicy(
40+
first_retry_interval=timedelta(seconds=1),
41+
max_number_of_attempts=3,
42+
backoff_coefficient=2,
43+
max_retry_interval=timedelta(seconds=10),
44+
retry_timeout=timedelta(seconds=100),
45+
)
46+
47+
asyncio.set_event_loop(asyncio.new_event_loop())
48+
loop = asyncio.get_event_loop()
49+
wfr = WorkflowRuntime(main_event_loop=loop)
50+
51+
52+
# workflow still synchronous. Not expected to run any async code.
53+
@wfr.workflow(name='hello_world_wf')
54+
def hello_world_wf(ctx: wf.DaprWorkflowContext, wf_input):
55+
print(f'{wf_input}')
56+
yield ctx.call_activity(hello_act, input=1)
57+
yield ctx.call_activity(hello_act, input=10)
58+
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
59+
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
60+
61+
# Change in event handling: Use when_any to handle both event and timeout
62+
event = ctx.wait_for_external_event(event_name)
63+
timeout = ctx.create_timer(timedelta(seconds=30))
64+
winner = yield wf.when_any([event, timeout])
65+
66+
if winner == timeout:
67+
print('Workflow timed out waiting for event')
68+
return 'Timeout'
69+
70+
yield ctx.call_activity(hello_act, input=100)
71+
yield ctx.call_activity(hello_act, input=1000)
72+
return 'Completed'
73+
74+
75+
# activity is async and will be executed on the main event loop.
76+
@wfr.activity(name='hello_act')
77+
async def hello_act(ctx: wf.WorkflowActivityContext, wf_input):
78+
global counter
79+
counter += wf_input
80+
print(f'New counter value is: {counter}!', flush=True)
81+
82+
83+
@wfr.activity(name='hello_retryable_act')
84+
async def hello_retryable_act(ctx: wf.WorkflowActivityContext):
85+
global retry_count
86+
if (retry_count % 2) == 0:
87+
print(f'Retry count value is: {retry_count}!', flush=True)
88+
retry_count += 1
89+
raise ValueError('Retryable Error')
90+
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
91+
retry_count += 1
92+
93+
94+
# workflow still synchronous. Not expected to run any async code.
95+
@wfr.workflow(name='child_retryable_wf')
96+
def child_retryable_wf(ctx: wf.DaprWorkflowContext):
97+
global child_orchestrator_string, child_orchestrator_count
98+
if not ctx.is_replaying:
99+
child_orchestrator_count += 1
100+
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
101+
child_orchestrator_string += str(child_orchestrator_count)
102+
yield ctx.call_activity(
103+
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
104+
)
105+
if child_orchestrator_count < 3:
106+
raise ValueError('Retryable Error')
107+
108+
109+
@wfr.activity(name='act_for_child_wf')
110+
async def act_for_child_wf(ctx: wf.WorkflowActivityContext, inp):
111+
global child_orchestrator_string, child_act_retry_count
112+
inp_char = chr(96 + inp)
113+
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
114+
child_orchestrator_string += inp_char
115+
if child_act_retry_count % 2 == 0:
116+
child_act_retry_count += 1
117+
raise ValueError('Retryable Error')
118+
child_act_retry_count += 1
119+
120+
121+
async def main():
122+
wfr.start()
123+
wf_client = DaprWorkflowClient()
124+
125+
print('==========Start Counter Increase as per Input:==========')
126+
await wf_client.schedule_new_workflow(
127+
workflow=hello_world_wf, input=input_data, instance_id=instance_id
128+
)
129+
130+
await wf_client.wait_for_workflow_start(instance_id)
131+
132+
# Sleep to let the workflow run initial activities
133+
await asyncio.sleep(12)
134+
135+
assert counter == 11
136+
assert retry_count == 2
137+
assert child_orchestrator_string == '1aa2bb3cc'
138+
139+
# Pause Test
140+
await wf_client.pause_workflow(instance_id=instance_id)
141+
metadata = await wf_client.get_workflow_state(instance_id=instance_id)
142+
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')
143+
144+
# Resume Test
145+
await wf_client.resume_workflow(instance_id=instance_id)
146+
metadata = await wf_client.get_workflow_state(instance_id=instance_id)
147+
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')
148+
149+
await asyncio.sleep(2) # Give the workflow time to reach the event wait state
150+
await wf_client.raise_workflow_event(
151+
instance_id=instance_id, event_name=event_name, data=event_data
152+
)
153+
154+
print('========= Waiting for Workflow completion', flush=True)
155+
try:
156+
state = await wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
157+
if state.runtime_status.name == 'COMPLETED':
158+
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
159+
else:
160+
print(f'Workflow failed! Status: {state.runtime_status.name}')
161+
except TimeoutError:
162+
print('*** Workflow timed out!')
163+
164+
await wf_client.purge_workflow(instance_id=instance_id)
165+
try:
166+
await wf_client.get_workflow_state(instance_id=instance_id)
167+
except DaprInternalError as err:
168+
if non_existent_id_error in err._message:
169+
print('Instance Successfully Purged')
170+
171+
wfr.shutdown()
172+
173+
174+
if __name__ == '__main__':
175+
loop.run_until_complete(main())
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# -*- coding: utf-8 -*-
22

33
from .dapr_workflow_client import DaprWorkflowClient
4+
from .workflow_runtime import WorkflowRuntime
45

56
__all__ = [
67
'DaprWorkflowClient',
8+
'WorkflowRuntime',
79
]
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2023 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
import asyncio
17+
import inspect
18+
from functools import wraps
19+
from typing import Optional, Sequence, TypeVar, Union
20+
21+
import grpc
22+
from dapr.ext.workflow.logger import Logger, LoggerOptions
23+
from dapr.ext.workflow.workflow_activity_context import Activity
24+
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime as WorkflowRuntimeSync
25+
26+
T = TypeVar('T')
27+
TInput = TypeVar('TInput')
28+
TOutput = TypeVar('TOutput')
29+
30+
ClientInterceptor = Union[
31+
grpc.aio.UnaryUnaryClientInterceptor,
32+
grpc.aio.UnaryStreamClientInterceptor,
33+
grpc.aio.StreamUnaryClientInterceptor,
34+
grpc.aio.StreamStreamClientInterceptor,
35+
]
36+
37+
38+
class WorkflowRuntime(WorkflowRuntimeSync):
39+
"""WorkflowRuntime is the entry point for registering workflows and async activities."""
40+
41+
def __init__(
42+
self,
43+
host: Optional[str] = None,
44+
port: Optional[str] = None,
45+
logger_options: Optional[LoggerOptions] = None,
46+
interceptors: Optional[Sequence[ClientInterceptor]] = None,
47+
maximum_concurrent_activity_work_items: Optional[int] = None,
48+
maximum_concurrent_orchestration_work_items: Optional[int] = None,
49+
maximum_thread_pool_workers: Optional[int] = None,
50+
main_event_loop: Optional[asyncio.AbstractEventLoop] = None,
51+
):
52+
super().__init__(
53+
host,
54+
port,
55+
logger_options,
56+
interceptors,
57+
maximum_concurrent_activity_work_items,
58+
maximum_concurrent_orchestration_work_items,
59+
maximum_thread_pool_workers,
60+
)
61+
self._main_event_loop = main_event_loop
62+
self._logger = Logger('WorkflowRuntime', logger_options)
63+
64+
def activity(self, __fn: Activity = None, *, name: Optional[str] = None):
65+
"""Decorator to register an async activity function.
66+
67+
This example shows how to register an activity function with an alternate name:
68+
69+
from dapr.ext.workflow.aio import WorkflowRuntime
70+
wfr = WorkflowRuntime()
71+
72+
@wfr.activity(name="add")
73+
async def add(ctx, x: int, y: int) -> int:
74+
return x + y
75+
76+
This example shows how to register an activity function without an alternate name:
77+
78+
from dapr.ext.workflow.aio import WorkflowRuntime
79+
wfr = WorkflowRuntime()
80+
81+
@wfr.activity
82+
async def add(ctx, x: int, y: int) -> int:
83+
return x + y
84+
85+
Args:
86+
name (Optional[str], optional): Name to identify the activity function as in
87+
the workflow runtime. Defaults to None.
88+
"""
89+
90+
def wrapper(fn: Activity):
91+
# If a main event loop is provided, wrap the function so that any awaitable
92+
# result is executed on that loop in a thread-safe way.
93+
if self._main_event_loop:
94+
95+
@wraps(fn)
96+
def sync_wrapper(*args, **kwargs):
97+
result = fn(*args, **kwargs)
98+
if inspect.isawaitable(result):
99+
future = asyncio.run_coroutine_threadsafe(result, self._main_event_loop)
100+
return future.result()
101+
return result
102+
103+
target_fn = sync_wrapper
104+
else:
105+
# No special handling needed; register the original function directly.
106+
@wraps(fn)
107+
def innerfn():
108+
return fn
109+
110+
target_fn = innerfn
111+
112+
self.register_activity(target_fn, name=name)
113+
114+
if hasattr(fn, '_dapr_alternate_name'):
115+
target_fn.__dict__['_dapr_alternate_name'] = fn.__dict__['_dapr_alternate_name']
116+
else:
117+
target_fn.__dict__['_dapr_alternate_name'] = name if name else fn.__name__
118+
target_fn.__signature__ = inspect.signature(fn)
119+
# Copy attributes to fn so it doesn't get registered again when calling `register_activity()` again.
120+
fn.__dict__['_activity_registered'] = target_fn.__dict__['_activity_registered']
121+
fn.__dict__['_dapr_alternate_name'] = target_fn.__dict__['_dapr_alternate_name']
122+
return target_fn
123+
124+
if __fn:
125+
# This case is true when the decorator is used without arguments
126+
# and the function to be decorated is passed as the first argument.
127+
return wrapper(__fn)
128+
129+
return wrapper

ext/dapr-ext-workflow/setup.cfg

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ packages = find_namespace:
2525
include_package_data = True
2626
install_requires =
2727
dapr >= 1.16.1rc1
28-
durabletask-dapr >= 0.2.0a11
28+
durabletask-dapr >= 0.2.0a12
29+
grpcio >= 1.62
2930

3031
[options.packages.find]
3132
include =

0 commit comments

Comments
 (0)