
Commit 9240283

updates, handle async activities definitions
Signed-off-by: Filinto Duran <[email protected]>
1 parent 44397e0 commit 9240283

26 files changed: +1749 -414 lines

examples/workflow-async/README.md

Lines changed: 36 additions & 2 deletions
@@ -1,7 +1,7 @@
 # Dapr Workflow Async Examples (Python)
 
 These examples mirror `examples/workflow/` but author orchestrators with `async def` using the
-async workflow APIs. Activities remain regular functions unless noted.
+async workflow APIs. Activities can be either sync or async functions.
 
 ## Prerequisites
 
@@ -34,6 +34,7 @@ How to run:
 - `dapr run --app-id wf_async_gather -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python fan_out_fan_in_with_gather.py`
 - `dapr run --app-id wf_async_approval -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python human_approval.py`
 - `dapr run --app-id wf_ctx_interceptors -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python context_interceptors_example.py`
+- `dapr run --app-id wf_async_http -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python async_http_activity.py`
 
 ## Examples
 
@@ -44,9 +45,42 @@ How to run:
 - **fan_out_fan_in_with_gather.py**: Parallel execution using `ctx.when_all()`
 - **human_approval.py**: Workflow waiting for external event to proceed
 - **context_interceptors_example.py**: Context propagation using interceptors (tenant, request ID, etc.)
+- **async_http_activity.py**: Async activities performing I/O-bound operations (HTTP requests with aiohttp)
 
 Notes:
-- Orchestrators use `await ctx.activity(...)`, `await ctx.sleep(...)`, `await ctx.when_all/when_any(...)`, etc.
+- Orchestrators use `await ctx.activity(...)`, `await ctx.create_timer(...)`, `await ctx.when_all/when_any(...)`, etc.
 - No event loop is started manually; the Durable Task worker drives the async orchestrators.
 - You can also launch instances using `DaprWorkflowClient` as in the non-async examples.
 - The interceptors example demonstrates how to propagate context (tenant, request ID) across workflow and activity boundaries using the wrapper pattern to avoid contextvar loss.
+
+## Async Activities
+
+Activities can be either synchronous or asynchronous functions. Async activities are useful for I/O-bound operations such as HTTP requests, database queries, or file operations:
+
+```python
+import aiohttp
+
+from dapr.ext.workflow import WorkflowActivityContext
+
+# Synchronous activity
+@wfr.activity
+def sync_activity(ctx: WorkflowActivityContext, data: str) -> str:
+    return data.upper()
+
+# Asynchronous activity
+@wfr.activity
+async def async_activity(ctx: WorkflowActivityContext, data: str) -> str:
+    # Perform async I/O operations
+    async with aiohttp.ClientSession() as session:
+        async with session.get(f"https://api.example.com/{data}") as response:
+            result = await response.json()
+    return result
+```
+
+Both sync and async activities are registered the same way using the `@wfr.activity` decorator, and orchestrators call them identically regardless of whether they are sync or async; the SDK handles the execution automatically.
+
+**When to use async activities:**
+- HTTP requests or API calls
+- Database queries
+- File I/O operations
+- Any I/O-bound work that benefits from async/await
+
+See `async_http_activity.py` for a complete example.
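
The README snippet above only shows how the two activity kinds are defined. As a quick illustration of the orchestrator side, here is a minimal, self-contained sketch; the `sync_upper`, `async_reverse`, and `mixed_activities` names are invented for this example, while the APIs used (`wfr.async_workflow`, `ctx.call_activity`, `ctx.when_all`) are the ones exercised by the `async_http_activity.py` file added in this commit.

```python
from dapr.ext.workflow import (
    AsyncWorkflowContext,
    WorkflowActivityContext,
    WorkflowRuntime,
)

wfr = WorkflowRuntime()


@wfr.activity
def sync_upper(ctx: WorkflowActivityContext, data: str) -> str:
    # Ordinary function: runs as a regular (sync) activity.
    return data.upper()


@wfr.activity
async def async_reverse(ctx: WorkflowActivityContext, data: str) -> str:
    # Coroutine function: runs as an async activity.
    return data[::-1]


@wfr.async_workflow(name='mixed_activities')
async def mixed_activities(ctx: AsyncWorkflowContext, text: str) -> dict:
    # The call site is the same whether the activity is sync or async.
    upper = await ctx.call_activity(sync_upper, input=text)
    reversed_text = await ctx.call_activity(async_reverse, input=text)

    # Fan-out/fan-in also works the same way for either kind.
    parallel = await ctx.when_all(
        [ctx.call_activity(sync_upper, input=text), ctx.call_activity(async_reverse, input=text)]
    )
    return {'upper': upper, 'reversed': reversed_text, 'parallel': parallel}
```

The point of the sketch is that the `await` site does not change with the activity kind; the runtime decides how to execute each registered function.
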
examples/workflow-async/async_http_activity.py

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.ext.workflow import (  # noqa: E402
    AsyncWorkflowContext,
    DaprWorkflowClient,
    WorkflowActivityContext,
    WorkflowRuntime,
    WorkflowStatus,
)

"""Example demonstrating async activities with HTTP requests.

This example shows how to use async activities to perform I/O-bound operations
like HTTP requests without blocking the worker thread pool.
"""


wfr = WorkflowRuntime()


@wfr.activity(name='fetch_url')
async def fetch_url(ctx: WorkflowActivityContext, url: str) -> dict:
    """Async activity that fetches data from a URL.

    This demonstrates using aiohttp for non-blocking HTTP requests.
    In production, you would handle errors, timeouts, and retries.
    """
    try:
        import aiohttp
    except ImportError:
        # Fallback if aiohttp is not installed
        return {
            'url': url,
            'status': 'error',
            'message': 'aiohttp not installed. Install with: pip install aiohttp',
        }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                status = response.status
                if status == 200:
                    # For JSON responses
                    try:
                        data = await response.json()
                        return {'url': url, 'status': status, 'data': data}
                    except Exception:
                        # For text responses
                        text = await response.text()
                        return {
                            'url': url,
                            'status': status,
                            'length': len(text),
                            'preview': text[:100],
                        }
                else:
                    return {'url': url, 'status': status, 'error': 'HTTP error'}
    except Exception as e:
        return {'url': url, 'status': 'error', 'message': str(e)}


@wfr.activity(name='process_data')
def process_data(ctx: WorkflowActivityContext, data: dict) -> dict:
    """Sync activity that processes fetched data.

    This shows that sync and async activities can coexist in the same workflow.
    """
    return {
        'processed': True,
        'url_count': len([k for k in data if k.startswith('url_')]),
        'summary': f'Processed {len(data)} items',
    }


@wfr.async_workflow(name='fetch_multiple_urls_async')
async def fetch_multiple_urls(ctx: AsyncWorkflowContext, urls: list[str]) -> dict:
    """Orchestrator that fetches multiple URLs in parallel using async activities.

    This demonstrates:
    - Calling async activities from async workflows
    - Fan-out/fan-in pattern with async activities
    - Mixing async and sync activities
    """
    # Fan-out: Schedule all URL fetches in parallel
    fetch_tasks = [ctx.call_activity(fetch_url, input=url) for url in urls]

    # Fan-in: Wait for all to complete
    results = await ctx.when_all(fetch_tasks)

    # Create a dictionary of results
    data = {f'url_{i}': result for i, result in enumerate(results)}

    # Process the aggregated data with a sync activity
    summary = await ctx.call_activity(process_data, input=data)

    return {'results': data, 'summary': summary}


def main():
    """Run the example workflow."""
    # Example URLs to fetch (using httpbin.org for testing)
    test_urls = [
        'https://httpbin.org/json',
        'https://httpbin.org/uuid',
        'https://httpbin.org/user-agent',
    ]

    wfr.start()
    client = DaprWorkflowClient()

    try:
        instance_id = 'async_http_activity_example'
        print(f'Starting workflow {instance_id}...')

        # Schedule the workflow
        client.schedule_new_workflow(
            workflow=fetch_multiple_urls, instance_id=instance_id, input=test_urls
        )

        # Wait for completion
        wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)

        print(f'\nWorkflow status: {wf_state.runtime_status}')

        if wf_state.runtime_status == WorkflowStatus.COMPLETED:
            print(f'Workflow output: {wf_state.serialized_output}')
            print('\n✓ Workflow completed successfully!')
        else:
            print('✗ Workflow did not complete successfully')
            return 1

    finally:
        wfr.shutdown()

    return 0


if __name__ == '__main__':
    import sys

    sys.exit(main())
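
The `fetch_url` docstring notes that production code would add error handling and retries. A brief, hedged sketch of what that could look like, assuming the `RetryPolicy` type exported by `dapr.ext.workflow`; the keyword arguments below are assumptions and should be checked against the installed SDK version.

```python
# Hypothetical retry configuration for the fetch_url activity. RetryPolicy is
# exported by dapr.ext.workflow; the exact keyword arguments are assumptions
# for this sketch and should be verified against the SDK version in use.
from datetime import timedelta

from dapr.ext.workflow import RetryPolicy

fetch_retry = RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2,
    max_retry_interval=timedelta(seconds=10),
)

# Inside the orchestrator, the policy would be passed per activity call, e.g.:
#     result = await ctx.call_activity(fetch_url, input=url, retry_policy=fetch_retry)
```
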

examples/workflow-async/context_interceptors_example.py

Lines changed: 4 additions & 27 deletions
@@ -52,6 +52,7 @@ def wrapper():
 from __future__ import annotations
 
 import contextvars
+from dataclasses import replace
 from typing import Any, Callable
 
 from dapr.ext.workflow import (
@@ -102,15 +103,7 @@ def schedule_new_workflow(
         print('[Client] Scheduling workflow with metadata:', metadata)
 
         # Set metadata on the request (runtime will wrap in envelope)
-        modified_request = ScheduleWorkflowRequest(
-            workflow_name=request.workflow_name,
-            input=request.input,
-            instance_id=request.instance_id,
-            start_at=request.start_at,
-            reuse_id_policy=request.reuse_id_policy,
-            metadata=metadata,
-        )
-        return nxt(modified_request)
+        return nxt(replace(request, metadata=metadata))
 
 
 class ContextWorkflowOutboundInterceptor(BaseWorkflowOutboundInterceptor):
@@ -126,15 +119,7 @@ def call_activity(
         ctx = get_request_context()
         metadata = ctx.copy() if ctx else {}
 
-        return nxt(
-            CallActivityRequest(
-                activity_name=request.activity_name,
-                input=request.input,
-                retry_policy=request.retry_policy,
-                workflow_ctx=request.workflow_ctx,
-                metadata=metadata,
-            )
-        )
+        return nxt(replace(request, metadata=metadata))
 
     def call_child_workflow(
         self, request: CallChildWorkflowRequest, nxt: Callable[[CallChildWorkflowRequest], Any]
@@ -143,15 +128,7 @@ def call_child_workflow(
         ctx = get_request_context()
         metadata = ctx.copy() if ctx else {}
 
-        return nxt(
-            CallChildWorkflowRequest(
-                workflow_name=request.workflow_name,
-                input=request.input,
-                instance_id=request.instance_id,
-                workflow_ctx=request.workflow_ctx,
-                metadata=metadata,
-            )
-        )
+        return nxt(replace(request, metadata=metadata))
 
 
 class ContextRuntimeInterceptor(BaseRuntimeInterceptor):
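
Both interceptor examples now build the modified request with `dataclasses.replace()` instead of re-constructing it field by field. A small self-contained illustration of that pattern follows; `ToyCallActivityRequest` is a stand-in type defined only for this sketch, while the real `CallActivityRequest` comes from `dapr.ext.workflow`.

```python
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class ToyCallActivityRequest:
    # Stand-in for dapr.ext.workflow's CallActivityRequest, for illustration only.
    activity_name: str
    input: Any = None
    retry_policy: Any = None
    metadata: dict = field(default_factory=dict)


original = ToyCallActivityRequest(activity_name='fetch_url', input='https://httpbin.org/json')

# replace() returns a new instance; every field not named keeps its old value.
modified = replace(original, metadata={'tenant': 'acme', 'request_id': 'r-123'})

assert modified.activity_name == original.activity_name
assert modified.input == original.input
assert modified.metadata == {'tenant': 'acme', 'request_id': 'r-123'}
```

The advantage over listing every field by hand is that fields the interceptor does not touch keep their current values, so the interceptor continues to work unchanged if new fields are added to the request type.
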

examples/workflow/context_interceptors_example.py

Lines changed: 8 additions & 29 deletions
@@ -16,8 +16,8 @@ def wrapper():
         setup_context()
         try:
             gen = nxt(request)
-                result = yield from gen  # Keep context alive
-                return result  # MUST return to propagate workflow output
+            result = yield from gen  # Keep context alive
+            return result  # MUST return to propagate workflow output
         finally:
             cleanup_context()
         return wrapper()
@@ -52,6 +52,7 @@ def wrapper():
 
 import contextvars
 import json
+from dataclasses import replace
 from typing import Any, Callable
 
 from dapr.ext.workflow import (
@@ -100,15 +101,7 @@ def schedule_new_workflow(
         print('[Client] Scheduling workflow with metadata:', metadata)
 
         # Set metadata on the request (runtime will wrap in envelope)
-        modified_request = ScheduleWorkflowRequest(
-            workflow_name=request.workflow_name,
-            input=request.input,  # Input unchanged - metadata stored separately
-            instance_id=request.instance_id,
-            start_at=request.start_at,
-            reuse_id_policy=request.reuse_id_policy,
-            metadata=metadata,
-        )
-        return nxt(modified_request)
+        return nxt(replace(request, metadata=metadata))
 
 
 class ContextWorkflowOutboundInterceptor(BaseWorkflowOutboundInterceptor):
@@ -126,15 +119,8 @@ def call_child_workflow(
 
         print('[Outbound] Calling child workflow with metadata:', metadata)
 
-        return nxt(
-            CallChildWorkflowRequest(
-                workflow_name=request.workflow_name,
-                input=request.input,  # Input unchanged - metadata stored separately
-                instance_id=request.instance_id,
-                workflow_ctx=request.workflow_ctx,
-                metadata=metadata,
-            )
-        )
+        # Use dataclasses.replace() to create a modified copy
+        return nxt(replace(request, metadata=metadata))
 
     def call_activity(
         self, request: CallActivityRequest, nxt: Callable[[CallActivityRequest], Any]
@@ -147,15 +133,8 @@ def call_activity(
         print(f'  -- input: {request.input}')
         print(f'  -- metadata: {metadata}')
 
-        return nxt(
-            CallActivityRequest(
-                activity_name=request.activity_name,
-                input=request.input,  # Input unchanged - metadata stored separately
-                retry_policy=request.retry_policy,
-                workflow_ctx=request.workflow_ctx,
-                metadata=metadata,
-            )
-        )
+        # Use dataclasses.replace() to create a modified copy
+        return nxt(replace(request, metadata=metadata))
 
 
 class ContextRuntimeInterceptor(BaseRuntimeInterceptor):
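
The whitespace-only hunk at the top of this file touches the docstring's generator-wrapper example, which is the "avoid contextvar loss" pattern mentioned in the README notes. Below is a framework-independent sketch of why the wrapper has to be written that way; `wrap_workflow`, `sample_workflow`, and `tenant_var` are names made up for this sketch, not SDK APIs, and `setup_context()`/`cleanup_context()` from the docstring are modeled with a plain `ContextVar`.

```python
import contextvars
from typing import Any, Callable, Generator

tenant_var: contextvars.ContextVar[str] = contextvars.ContextVar('tenant', default='')


def wrap_workflow(request: Any, nxt: Callable[[Any], Generator]) -> Generator:
    """Stand-in for an interceptor hook that wraps a generator-based workflow."""

    def wrapper():
        token = tenant_var.set('acme')  # setup_context()
        try:
            gen = nxt(request)  # the real workflow generator
            result = yield from gen  # keep context alive across every yield
            return result  # MUST return to propagate the workflow output
        finally:
            tenant_var.reset(token)  # cleanup_context()

    return wrapper()


def sample_workflow(_request):
    first = yield 'activity-1'
    # The contextvar set by the wrapper is still visible while the workflow runs.
    assert tenant_var.get() == 'acme'
    return f'done:{first}'


gen = wrap_workflow({'input': 1}, sample_workflow)
print(next(gen))  # -> 'activity-1'
try:
    gen.send('result-1')
except StopIteration as stop:
    print(stop.value)  # -> 'done:result-1' (lost if the wrapper forgets to return)
```

If the interceptor set up the context, called `nxt(request)`, and returned the generator directly, the cleanup in `finally` would run before the workflow ever resumed; wrapping with `yield from` keeps setup and cleanup spanning the generator's whole lifetime, and the trailing `return` forwards the workflow output through `StopIteration.value`.
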
