Skip to content

Commit 097bddd

Browse files
committed
updates, test examples, cleanup
Signed-off-by: Filinto Duran <[email protected]>
1 parent b01cdd7 commit 097bddd

13 files changed

+1305
-478
lines changed

examples/workflow-async/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,20 @@ How to run:
3333
- `dapr run --app-id wf_async_fafi -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python fan_out_fan_in.py`
3434
- `dapr run --app-id wf_async_gather -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python fan_out_fan_in_with_gather.py`
3535
- `dapr run --app-id wf_async_approval -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python human_approval.py`
36+
- `dapr run --app-id wf_ctx_interceptors -- /Users/filinto/diagrid/python-sdk/examples/workflow-async/.venv/bin/python context_interceptors_example.py`
37+
38+
## Examples
39+
40+
- **simple.py**: Comprehensive example showing activities, child workflows, retry policies, and external events
41+
- **task_chaining.py**: Sequential activity calls where each result feeds into the next
42+
- **child_workflow.py**: Parent workflow calling a child workflow
43+
- **fan_out_fan_in.py**: Parallel activity execution pattern
44+
- **fan_out_fan_in_with_gather.py**: Parallel execution using `ctx.when_all()`
45+
- **human_approval.py**: Workflow waiting for external event to proceed
46+
- **context_interceptors_example.py**: Context propagation using interceptors (tenant, request ID, etc.)
3647

3748
Notes:
3849
- Orchestrators use `await ctx.activity(...)`, `await ctx.sleep(...)`, `await ctx.when_all/when_any(...)`, etc.
3950
- No event loop is started manually; the Durable Task worker drives the async orchestrators.
4051
- You can also launch instances using `DaprWorkflowClient` as in the non-async examples.
52+
- The interceptors example demonstrates how to propagate context (tenant, request ID) across workflow and activity boundaries using the wrapper pattern to avoid contextvar loss.
Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2025 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
15+
Example: Interceptors for context propagation with async workflows using metadata envelope.
16+
17+
This example demonstrates the RECOMMENDED approach for context propagation:
18+
- Use metadata envelope for durable, transparent context propagation
19+
- ClientInterceptor sets metadata when scheduling workflows
20+
- RuntimeInterceptor restores context from metadata before execution
21+
- WorkflowOutboundInterceptor propagates metadata to activities/child workflows
22+
- Use wrapper pattern with 'yield from' to keep context alive during execution
23+
24+
CRITICAL: Workflow interceptors MUST use the wrapper pattern and return the result:
25+
def execute_workflow(self, request, nxt):
26+
def wrapper():
27+
setup_context()
28+
try:
29+
gen = nxt(request)
30+
result = yield from gen # Keep context alive during execution
31+
return result # MUST return to propagate workflow output
32+
finally:
33+
cleanup_context()
34+
return wrapper()
35+
36+
Without 'return result', the workflow output will be lost (serialized_output will be null).
37+
38+
Metadata envelope approach (RECOMMENDED):
39+
------------------------------------------
40+
Metadata is stored separately from user payload and transparently wrapped/unwrapped by runtime.
41+
42+
Benefits:
43+
- User code receives only the payload (never sees envelope)
44+
- Durably persisted (survives replays, retries, continue-as-new)
45+
- Automatic propagation across workflow → activity → child workflow boundaries
46+
- String-only metadata enforces simple, serializable key-value structure
47+
- Context accessible to interceptors via request.metadata
48+
49+
Note: This requires a running Dapr sidecar to execute.
50+
"""
51+
52+
from __future__ import annotations
53+
54+
import contextvars
55+
from typing import Any, Callable
56+
57+
from dapr.ext.workflow import (
58+
AsyncWorkflowContext,
59+
BaseClientInterceptor,
60+
BaseRuntimeInterceptor,
61+
BaseWorkflowOutboundInterceptor,
62+
CallActivityRequest,
63+
CallChildWorkflowRequest,
64+
DaprWorkflowClient,
65+
ExecuteActivityRequest,
66+
ExecuteWorkflowRequest,
67+
ScheduleWorkflowRequest,
68+
WorkflowActivityContext,
69+
WorkflowRuntime,
70+
)
71+
72+
# Context variable to carry request metadata across workflow/activity execution
73+
_request_context: contextvars.ContextVar[dict[str, str] | None] = contextvars.ContextVar(
74+
'request_context', default=None
75+
)
76+
77+
78+
def set_request_context(ctx: dict[str, str] | None) -> None:
79+
"""Set the current context (stored in contextvar)."""
80+
_request_context.set(ctx)
81+
82+
83+
def get_request_context() -> dict[str, str] | None:
84+
"""Get the current context from contextvar."""
85+
return _request_context.get()
86+
87+
88+
class ContextClientInterceptor(BaseClientInterceptor):
89+
"""Client interceptor that sets metadata when scheduling workflows.
90+
91+
The metadata is automatically wrapped in an envelope by the runtime and
92+
propagated durably across workflow boundaries.
93+
"""
94+
95+
def schedule_new_workflow(
96+
self, request: ScheduleWorkflowRequest, nxt: Callable[[ScheduleWorkflowRequest], Any]
97+
) -> Any:
98+
# Get current context and convert to string-only metadata
99+
ctx = get_request_context()
100+
metadata = ctx.copy() if ctx else {}
101+
102+
print('[Client] Scheduling workflow with metadata:', metadata)
103+
104+
# Set metadata on the request (runtime will wrap in envelope)
105+
modified_request = ScheduleWorkflowRequest(
106+
workflow_name=request.workflow_name,
107+
input=request.input,
108+
instance_id=request.instance_id,
109+
start_at=request.start_at,
110+
reuse_id_policy=request.reuse_id_policy,
111+
metadata=metadata,
112+
)
113+
return nxt(modified_request)
114+
115+
116+
class ContextWorkflowOutboundInterceptor(BaseWorkflowOutboundInterceptor):
117+
"""Workflow outbound interceptor that propagates metadata to activities and child workflows.
118+
119+
The metadata is automatically wrapped in an envelope by the runtime.
120+
"""
121+
122+
def call_activity(
123+
self, request: CallActivityRequest, nxt: Callable[[CallActivityRequest], Any]
124+
) -> Any:
125+
# Get current context and convert to string-only metadata
126+
ctx = get_request_context()
127+
metadata = ctx.copy() if ctx else {}
128+
129+
return nxt(
130+
CallActivityRequest(
131+
activity_name=request.activity_name,
132+
input=request.input,
133+
retry_policy=request.retry_policy,
134+
workflow_ctx=request.workflow_ctx,
135+
metadata=metadata,
136+
)
137+
)
138+
139+
def call_child_workflow(
140+
self, request: CallChildWorkflowRequest, nxt: Callable[[CallChildWorkflowRequest], Any]
141+
) -> Any:
142+
# Get current context and convert to string-only metadata
143+
ctx = get_request_context()
144+
metadata = ctx.copy() if ctx else {}
145+
146+
return nxt(
147+
CallChildWorkflowRequest(
148+
workflow_name=request.workflow_name,
149+
input=request.input,
150+
instance_id=request.instance_id,
151+
workflow_ctx=request.workflow_ctx,
152+
metadata=metadata,
153+
)
154+
)
155+
156+
157+
class ContextRuntimeInterceptor(BaseRuntimeInterceptor):
158+
"""Runtime interceptor that restores context from metadata before execution.
159+
160+
The runtime automatically unwraps the envelope and provides metadata via
161+
request.metadata. User code receives only the original payload via request.input.
162+
"""
163+
164+
def execute_workflow(
165+
self, request: ExecuteWorkflowRequest, nxt: Callable[[ExecuteWorkflowRequest], Any]
166+
) -> Any:
167+
"""
168+
IMPORTANT: Use wrapper pattern to keep context alive during generator execution.
169+
170+
Calling nxt(request) returns a generator immediately; context must stay set
171+
while that generator executes (including during activity calls and child workflows).
172+
"""
173+
174+
def wrapper():
175+
# Restore context from metadata (automatically unwrapped by runtime)
176+
if request.metadata:
177+
set_request_context(request.metadata)
178+
179+
try:
180+
gen = nxt(request)
181+
result = yield from gen # Keep context alive while generator executes
182+
return result # Must explicitly return the result from the inner generator
183+
finally:
184+
set_request_context(None)
185+
186+
return wrapper()
187+
188+
def execute_activity(
189+
self, request: ExecuteActivityRequest, nxt: Callable[[ExecuteActivityRequest], Any]
190+
) -> Any:
191+
"""
192+
Restore context from metadata before activity execution.
193+
194+
The runtime automatically unwraps the envelope and provides metadata via
195+
request.metadata. User code receives only the original payload.
196+
"""
197+
# Restore context from metadata (automatically unwrapped by runtime)
198+
if request.metadata:
199+
set_request_context(request.metadata)
200+
201+
try:
202+
return nxt(request)
203+
finally:
204+
set_request_context(None)
205+
206+
207+
# Create runtime with interceptors
208+
wfr = WorkflowRuntime(
209+
runtime_interceptors=[ContextRuntimeInterceptor()],
210+
workflow_outbound_interceptors=[ContextWorkflowOutboundInterceptor()],
211+
)
212+
213+
214+
@wfr.activity(name='process_data')
215+
def process_data(ctx: WorkflowActivityContext, data: dict) -> dict:
216+
"""
217+
Activity that accesses the restored context.
218+
219+
The context was set in the runtime interceptor from metadata.
220+
The activity receives only the user payload (data), not the envelope.
221+
"""
222+
request_ctx = get_request_context()
223+
224+
if request_ctx is None:
225+
return {'tenant': 'unknown', 'request_id': 'unknown', 'message': 'no message', 'data': data}
226+
227+
return {
228+
'tenant': request_ctx.get('tenant', 'unknown'),
229+
'request_id': request_ctx.get('request_id', 'unknown'),
230+
'message': data.get('message', 'no message'),
231+
}
232+
233+
234+
@wfr.activity(name='aggregate_results')
235+
def aggregate_results(ctx: WorkflowActivityContext, results: list) -> dict:
236+
"""Activity that aggregates results for the same tenant in context."""
237+
request_ctx = get_request_context()
238+
tenant = request_ctx.get('tenant', 'unknown') if request_ctx else 'unknown'
239+
request_id = request_ctx.get('request_id', 'unknown') if request_ctx else 'unknown'
240+
tenant_results = [
241+
r['message'] for r in results if r['tenant'] == tenant and r['request_id'] == request_id
242+
]
243+
244+
return {
245+
'tenant': tenant,
246+
'request_id': request_id,
247+
'count': len(tenant_results),
248+
'results': tenant_results,
249+
}
250+
251+
252+
@wfr.async_workflow(name='context_propagation_example')
253+
async def context_propagation_workflow(ctx: AsyncWorkflowContext, input_data: dict) -> dict:
254+
"""
255+
Workflow that demonstrates context propagation to activities.
256+
257+
The workflow receives only the user payload (input_data), not the envelope.
258+
The context is accessible via get_request_context() thanks to the runtime interceptor.
259+
260+
Activities are executed in parallel using when_all for better performance.
261+
"""
262+
request_ctx = get_request_context()
263+
264+
# map-reduce pattern
265+
266+
# Create activity tasks (don't await yet) - metadata will be propagated automatically
267+
# Execute all activities in parallel and get results
268+
results = await ctx.when_all(
269+
[
270+
ctx.call_activity(process_data, input={'message': 'first task'}),
271+
ctx.call_activity(process_data, input={'message': 'second task'}),
272+
ctx.call_activity(process_data, input={'message': 'third task'}),
273+
]
274+
)
275+
276+
# Aggregate/reduce results
277+
final = await ctx.call_activity(aggregate_results, input=results)
278+
279+
return {'final': final, 'context_was': request_ctx}
280+
281+
282+
def main():
283+
"""
284+
Demonstrates metadata envelope approach:
285+
1. Client sets context in contextvar
286+
2. Client interceptor converts context to metadata
287+
3. Runtime wraps metadata in envelope: {"__dapr_meta__": {...}, "__dapr_payload__": {...}}
288+
4. Envelope is persisted durably in workflow state
289+
5. Runtime unwraps envelope before execution
290+
6. Runtime interceptor restores context from metadata
291+
7. User code receives only the payload, not the envelope
292+
"""
293+
print('=' * 70)
294+
print('Metadata Envelope Context Propagation Example (Async)')
295+
print('=' * 70)
296+
297+
with wfr:
298+
# Create client with client interceptor
299+
client = DaprWorkflowClient(interceptors=[ContextClientInterceptor()])
300+
301+
# Set context - this will be converted to metadata by the client interceptor
302+
set_request_context({'tenant': 'acme-corp', 'request_id': 'req-12345'})
303+
304+
# Schedule workflow with user payload (metadata is added by interceptor)
305+
instance_id = 'context_example_async'
306+
client.schedule_new_workflow(
307+
workflow=context_propagation_workflow,
308+
input={'task': 'process_orders', 'order_id': 999},
309+
instance_id=instance_id,
310+
)
311+
312+
wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
313+
314+
print('\n' + '=' * 70)
315+
print('Workflow Result:')
316+
print('=' * 70)
317+
print(f'Status: {wf_state.runtime_status}')
318+
print(f'Output: {wf_state.serialized_output}')
319+
print('=' * 70)
320+
321+
322+
if __name__ == '__main__':
323+
main()

examples/workflow-async/human_approval.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ async def orchestrator(ctx: AsyncWorkflowContext, request_id: str):
3838
]
3939
)
4040
if decision == approve:
41-
print(f'Decision Approved')
41+
print('Decision Approved')
4242
return request_id
4343
if decision == reject:
44-
print(f'Decision Rejected')
44+
print('Decision Rejected')
4545
return 'REJECTED'
4646
return 'TIMEOUT'
4747

examples/workflow/aio/async_activity_sequence.py

Lines changed: 0 additions & 39 deletions
This file was deleted.

0 commit comments

Comments
 (0)