Commit d03cd0d

merge from main, more tests to increase coverage of sandbox and awaitable, some cleanup on duplicated files, refactor aio
Signed-off-by: Filinto Duran <[email protected]>
1 parent 200d1a1 commit d03cd0d

19 files changed, +1800 -381 lines changed

README.md

Lines changed: 124 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,32 @@
66

77
This repo contains a Python client SDK for use with the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go) and [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.
88

9+
> **🚀 Enhanced Async Features**: This fork includes comprehensive async workflow enhancements with advanced error handling, non-determinism detection, timeout support, and debugging tools. See [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md) for details.
10+
11+
## Quick Start - Async Workflows
12+
13+
For async workflow development, use the new `durabletask.aio` package:
14+
15+
```python
16+
from durabletask.aio import AsyncWorkflowContext
17+
from durabletask.worker import TaskHubGrpcWorker
18+
19+
async def my_workflow(ctx: AsyncWorkflowContext, name: str) -> str:
20+
    result = await ctx.call_activity(say_hello, input=name)
21+
    await ctx.sleep(1.0)
22+
    return f"Workflow completed: {result}"
23+
24+
def say_hello(ctx, name: str) -> str:
25+
    return f"Hello, {name}!"
26+
27+
# Register and run
28+
with TaskHubGrpcWorker() as worker:
29+
    worker.add_activity(say_hello)
30+
    worker.add_orchestrator(my_workflow)
31+
    worker.start()
32+
    # ... schedule workflows with client
33+
```
34+
935
⚠️ **This SDK is currently under active development and is not yet ready for production use.** ⚠️
1036

1137
> Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
@@ -155,6 +181,13 @@ python3 -m pip install .
155181

156182
See the [examples](./examples) directory for a list of sample orchestrations and instructions on how to run them.
157183

184+
**Enhanced Async Examples:**
185+
- `async_activity_sequence.py` - Updated to use new `durabletask.aio` package
186+
- `async_fanout_fanin.py` - Updated to use new `durabletask.aio` package
187+
- `async_enhanced_features.py` - Comprehensive demo of all enhanced features
188+
- `async_non_determinism_demo.py` - Non-determinism detection demonstration
189+
- See [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md) for detailed examples and usage patterns
190+
158191
## Development
159192

160193
The following is more information about how to develop this project. Note that development commands require that `make` is installed on your local machine. If you're using Windows, you can install `make` using [Chocolatey](https://chocolatey.org/) or use WSL.
@@ -207,32 +240,41 @@ export DURABLETASK_GRPC_ENDPOINT=localhost:4001
207240
export DURABLETASK_GRPC_ENDPOINT=localhost:50001
208241
```
209242

210-
### Async authoring compatibility
243+
### Async workflow authoring
244+
245+
For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md). The developer-facing migration notes are in [DEVELOPER_TRANSITION_GUIDE.md](./DEVELOPER_TRANSITION_GUIDE.md).
211246

212-
You can author orchestrators with `async def` using `add_async_orchestrator`, which provides awaitables for activities, timers, external events, and when_all/any:
247+
You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:
213248

214249
```python
215250
from durabletask.worker import TaskHubGrpcWorker
251+
from durabletask.aio import AsyncWorkflowContext
216252

217-
async def my_orch(ctx, input):
218-
    r1 = await ctx.activity("act1", input=input)
219-
    await ctx.sleep(1)
220-
    r2 = await ctx.activity("act2", input=r1)
253+
async def my_orch(ctx: AsyncWorkflowContext, input) -> str:
254+
    r1 = await ctx.call_activity(act1, input=input)
255+
    await ctx.sleep(1.0)
256+
    r2 = await ctx.call_activity(act2, input=r1)
221257
    return r2
222258

223259
with TaskHubGrpcWorker() as worker:
224-
    worker.add_async_orchestrator(my_orch, name="my_orch", sandbox_mode="off")
260+
    worker.add_orchestrator(my_orch)
225261
```
226262

227263
Optional sandbox mode (`best_effort` or `strict`) patches `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` within the workflow step to deterministic equivalents. This is best-effort and not a correctness guarantee.
228264

229-
In `strict` mode, `asyncio.create_task` is blocked inside workflows to preserve determinism and will raise a `RuntimeError` if used.
265+
In `strict` mode, `asyncio.create_task` is blocked inside workflows to preserve determinism and will raise a `SandboxViolationError` if used.
266+
267+
> **Enhanced Sandbox Features**: The enhanced version includes comprehensive non-determinism detection, timeout support, enhanced concurrency primitives, and debugging tools. See [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md) for complete documentation.
230268
231269
#### Async patterns
232270

271+
- Activities and sub-orchestrations can be referenced by function object or by their registered string name. Both forms are supported:
272+
- Function reference (preferred for IDE/type support) or string name (useful across modules/languages).
273+
233274
- Activities:
234275
```python
235-
result = await ctx.activity("process", input={"x": 1})
276+
result = await ctx.call_activity("process", input={"x": 1})
277+
# or: result = await ctx.call_activity(process, input={"x": 1})
236278
```
237279

238280
- Timers:
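Annotation on the sandbox paragraph in the hunk above: the README says `best_effort` and `strict` modes patch `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` to deterministic equivalents. The sketch below shows one way such patching could work for three of those calls. It is not the `durabletask.aio` implementation; `deterministic_sandbox`, `seed`, and `fixed_now` are names invented here for illustration.

```python
import contextlib
import random
import time
import uuid
from unittest import mock


@contextlib.contextmanager
def deterministic_sandbox(seed: int, fixed_now: float):
    """Hypothetical helper: patch common non-deterministic calls for one step."""
    rng = random.Random(seed)  # seeded generator, so a replay sees the same values

    def fake_uuid4() -> uuid.UUID:
        # Derive the UUID from the seeded generator instead of OS randomness.
        return uuid.UUID(int=rng.getrandbits(128), version=4)

    with mock.patch.object(random, "random", rng.random), \
            mock.patch.object(uuid, "uuid4", fake_uuid4), \
            mock.patch.object(time, "time", lambda: fixed_now):
        yield  # the originals are restored when the block exits


def step() -> tuple:
    # Stands in for workflow code that calls the patched functions.
    return random.random(), str(uuid.uuid4()), time.time()


with deterministic_sandbox(seed=42, fixed_now=1_700_000_000.0):
    first = step()
with deterministic_sandbox(seed=42, fixed_now=1_700_000_000.0):
    replay = step()
```

With the same seed and timestamp, `first` and `replay` are equal, which is the property a replaying orchestrator needs. As the README itself notes, patching of this kind is best-effort: code that captured a reference to `random.random` before the patch was applied would bypass it.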
@@ -247,29 +289,98 @@ val = await ctx.wait_for_external_event("approval")
247289

248290
- Concurrency:
249291
```python
250-
t1 = ctx.activity("a"); t2 = ctx.activity("b")
292+
t1 = ctx.call_activity("a"); t2 = ctx.call_activity("b")
251293
await ctx.when_all([t1, t2])
252294
winner = await ctx.when_any([ctx.wait_for_external_event("x"), ctx.sleep(5)])
295+
296+
# gather combines awaitables and preserves order
297+
results = await ctx.gather(t1, t2)
298+
# gather with exception capture
299+
results_or_errors = await ctx.gather(t1, t2, return_exceptions=True)
300+
```
301+
302+
#### Async vs. generator API differences
303+
304+
- Async authoring (`durabletask.aio`): awaiting returns the operation's value. Exceptions are raised on `await` (no `is_failed`).
305+
- Generator authoring (`durabletask.task`): yielding returns `Task` objects. Use `get_result()` to read values; failures surface via `is_failed()` or by raising on `get_result()`.
306+
307+
Examples:
308+
309+
```python
310+
# Async authoring (await returns value)
311+
# when_any returns a proxy that compares equal to the original awaitable
312+
# and exposes get_result() for the completed item.
313+
approval = ctx.wait_for_external_event("approval")
314+
winner = await ctx.when_any([approval, ctx.sleep(60)])
315+
if winner == approval:
316+
    details = winner.get_result()
317+
```
318+
319+
```python
320+
# Async authoring (index + result)
321+
idx, result = await ctx.when_any_with_result([approval, ctx.sleep(60)])
322+
if idx == 0: # approval won
323+
    details = result
324+
```
325+
326+
```python
327+
# Generator authoring (yield returns Task)
328+
approval = ctx.wait_for_external_event("approval")
329+
winner = yield task.when_any([approval, ctx.create_timer(timedelta(seconds=60))])
330+
if winner == approval:
331+
    details = approval.get_result()
332+
```
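Annotation on the `when_any_with_result` example above: the index-plus-result shape can be illustrated outside the SDK with plain `asyncio`. This is an analogy for reading purposes only, assuming first-completed semantics as the README comments describe; `first_completed`, `approval`, and `main` are hypothetical names, and nothing here calls `durabletask`. Real workflow code should use the `ctx` methods, since direct `asyncio` scheduling is what the sandbox restricts.

```python
import asyncio
from typing import Any, Awaitable, Sequence, Tuple


async def first_completed(aws: Sequence[Awaitable[Any]]) -> Tuple[int, Any]:
    """Hypothetical helper: return (index, result) of the first awaitable to finish."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for t in pending:
        t.cancel()  # cancel the losers so they do not linger
    await asyncio.gather(*pending, return_exceptions=True)
    # Pick the lowest index among finished tasks for a stable answer.
    idx = min(i for i, t in enumerate(tasks) if t in done)
    return idx, tasks[idx].result()


async def approval() -> str:
    await asyncio.sleep(0.01)  # stands in for an external event arriving
    return "approved"


async def main() -> Tuple[int, Any]:
    # Index 0 is the "event", index 1 is the "timeout".
    return await first_completed([approval(), asyncio.sleep(1.0, result="timeout")])


idx, result = asyncio.run(main())
```

Returning the index avoids comparing awaitable objects for identity, which is the trade-off between the two async forms shown in the README hunk.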
333+
334+
Failure handling in async:
335+
336+
```python
337+
try:
338+
    val = await ctx.call_activity("might_fail")
339+
except Exception as e:
340+
    # handle failure branch
341+
    ...
342+
```
343+
344+
Or capture with gather:
345+
346+
```python
347+
res = await ctx.gather(ctx.call_activity("a"), return_exceptions=True)
348+
if isinstance(res[0], Exception):
349+
    ...
253350
```
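Annotation on the `gather` example above: `ctx.gather(..., return_exceptions=True)` appears to follow the contract of `asyncio.gather`. Assuming that is the intent, the standalone sketch below shows the two properties the README relies on, namely that results keep argument order and that failures come back as exception objects instead of being raised. `ok`, `boom`, and `main` are illustrative names, not SDK functions.

```python
import asyncio


async def ok(value: int) -> int:
    await asyncio.sleep(0)  # yield once, like a real awaitable would
    return value


async def boom() -> int:
    raise ValueError("activity failed")


async def main() -> list:
    # Results keep argument order; failures are returned, not raised.
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)


results = asyncio.run(main())
values = [r for r in results if not isinstance(r, Exception)]
errors = [r for r in results if isinstance(r, Exception)]
```

Splitting the list by `isinstance(r, Exception)` is the same check the README snippet applies to `res[0]`.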
254351

255352
- Sub-orchestrations (function reference or registered name):
256353
```python
257-
out = await ctx.sub_orchestrator(child_fn, input=payload)
258-
# or: out = await ctx.sub_orchestrator("child", input=payload)
354+
out = await ctx.call_sub_orchestrator(child_fn, input=payload)
355+
# or: out = await ctx.call_sub_orchestrator("child", input=payload)
259356
```
260357

261358
- Deterministic utilities:
262359
```python
263360
now = ctx.now(); rid = ctx.random().random(); uid = ctx.uuid4()
264361
```
265362

363+
- Workflow metadata/headers (async only for now):
364+
```python
365+
# Attach contextual metadata (e.g., tracing, tenant, app info)
366+
ctx.set_metadata({"x-trace": trace_id, "tenant": "acme"})
367+
md = ctx.get_metadata()
368+
369+
# Header aliases (same data)
370+
ctx.set_headers({"region": "us-east"})
371+
headers = ctx.get_headers()
372+
```
373+
Notes:
374+
- Useful for routing, observability, and cross-cutting concerns passed along activity/sub-orchestrator calls via the sidecar.
375+
- In python-sdk, available for both async and generator orchestrators. In this repo, currently implemented on `durabletask.aio`; generator parity is planned.
376+
266377
#### Worker readiness
267378

268379
When starting a worker and scheduling immediately, wait for the connection to the sidecar to be established:
269380

270381
```python
271382
with TaskHubGrpcWorker() as worker:
272-
    worker.add_async_orchestrator(my_orch, name="my_orch")
383+
    worker.add_orchestrator(my_orch)
273384
    worker.start()
274385
    worker.wait_for_ready(timeout=5)
275386
    # Now safe to schedule

dev-requirements.txt

Lines changed: 6 additions & 1 deletion
@@ -1 +1,6 @@
1-
grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python
1+
pytest-asyncio>=0.23
2+
pytest
3+
pytest-cov
4+
autopep8
5+
grpcio>=1.74.0
6+
protobuf>=6.31.1

durabletask/aio/awaitables.py

Lines changed: 42 additions & 17 deletions
@@ -71,7 +71,7 @@ def __await__(self) -> Generator[Any, Any, TOutput]:
7171
class ActivityAwaitable(AwaitableBase[TOutput]):
7272
    """Awaitable for activity function calls."""
7373

74-
    __slots__ = ('_ctx', '_activity_fn', '_input', '_retry_policy', '_metadata')
74+
    __slots__ = ('_ctx', '_activity_fn', '_input', '_retry_policy', '_app_id', '_metadata')
7575

7676
    def __init__(
7777
        self,
@@ -80,6 +80,7 @@ def __init__(
8080
        *,
8181
        input: Any = None,
8282
        retry_policy: Any = None,
83+
        app_id: str | None = None,
8384
        metadata: dict[str, str] | None = None,
8485
    ):
8586
        """
@@ -90,13 +91,15 @@ def __init__(
9091
            activity_fn: The activity function to call
9192
            input: Input data for the activity
9293
            retry_policy: Optional retry policy
94+
            app_id: Optional target app ID for routing
9395
            metadata: Optional metadata for the activity call
9496
        """
9597
        super().__init__()
9698
        self._ctx = ctx
9799
        self._activity_fn = activity_fn
98100
        self._input = input
99101
        self._retry_policy = retry_policy
102+
        self._app_id = app_id
100103
        self._metadata = metadata
101104

102105
    def _to_task(self) -> task.Task[Any]:
@@ -105,21 +108,30 @@ def _to_task(self) -> task.Task[Any]:
105108
        import inspect
106109
        sig = inspect.signature(self._ctx.call_activity)
107110
        supports_metadata = 'metadata' in sig.parameters
111+
        supports_app_id = 'app_id' in sig.parameters
108112

109113
        if self._retry_policy is None:
110-
            if supports_metadata and self._metadata is not None:
114+
            if (supports_metadata and self._metadata is not None) or (supports_app_id and self._app_id is not None):
115+
                kwargs: dict[str, Any] = {"input": self._input}
116+
                if supports_metadata and self._metadata is not None:
117+
                    kwargs["metadata"] = self._metadata
118+
                if supports_app_id and self._app_id is not None:
119+
                    kwargs["app_id"] = self._app_id
111120
                return cast(task.Task[Any], self._ctx.call_activity(
112-
                    self._activity_fn, input=self._input, metadata=self._metadata
121+
                    self._activity_fn, **kwargs
113122
                ))
114123
            else:
115124
                return cast(task.Task[Any], self._ctx.call_activity(self._activity_fn, input=self._input))
116125
        else:
117-
            if supports_metadata and self._metadata is not None:
126+
            if (supports_metadata and self._metadata is not None) or (supports_app_id and self._app_id is not None):
127+
                kwargs2: dict[str, Any] = {"input": self._input, "retry_policy": self._retry_policy}
128+
                if supports_metadata and self._metadata is not None:
129+
                    kwargs2["metadata"] = self._metadata
130+
                if supports_app_id and self._app_id is not None:
131+
                    kwargs2["app_id"] = self._app_id
118132
                return cast(task.Task[Any], self._ctx.call_activity(
119133
                    self._activity_fn,
120-
                    input=self._input,
121-
                    retry_policy=self._retry_policy,
122-
                    metadata=self._metadata,
134+
                    **kwargs2,
123135
                ))
124136
            else:
125137
                return cast(task.Task[Any], self._ctx.call_activity(
@@ -132,7 +144,7 @@ def _to_task(self) -> task.Task[Any]:
132144
class SubOrchestratorAwaitable(AwaitableBase[TOutput]):
133145
    """Awaitable for sub-orchestrator calls."""
134146

135-
    __slots__ = ('_ctx', '_workflow_fn', '_input', '_instance_id', '_retry_policy', '_metadata')
147+
    __slots__ = ('_ctx', '_workflow_fn', '_input', '_instance_id', '_retry_policy', '_app_id', '_metadata')
136148

137149
    def __init__(
138150
        self,
@@ -142,6 +154,7 @@ def __init__(
142154
        input: Any = None,
143155
        instance_id: Optional[str] = None,
144156
        retry_policy: Any = None,
157+
        app_id: str | None = None,
145158
        metadata: dict[str, str] | None = None,
146159
    ):
147160
        """
@@ -153,6 +166,7 @@ def __init__(
153166
            input: Input data for the sub-orchestrator
154167
            instance_id: Optional instance ID for the sub-orchestrator
155168
            retry_policy: Optional retry policy
169+
            app_id: Optional target app ID for routing
156170
            metadata: Optional metadata for the sub-orchestrator call
157171
        """
158172
        super().__init__()
@@ -161,6 +175,7 @@ def __init__(
161175
        self._input = input
162176
        self._instance_id = instance_id
163177
        self._retry_policy = retry_policy
178+
        self._app_id = app_id
164179
        self._metadata = metadata
165180

166181
    def _to_task(self) -> task.Task[Any]:
@@ -170,14 +185,18 @@ def _to_task(self) -> task.Task[Any]:
170185
        import inspect
171186
        sig = inspect.signature(self._ctx.call_sub_orchestrator)
172187
        supports_metadata = 'metadata' in sig.parameters
188+
        supports_app_id = 'app_id' in sig.parameters
173189

174190
        if self._retry_policy is None:
175-
            if supports_metadata and self._metadata is not None:
191+
            if (supports_metadata and self._metadata is not None) or (supports_app_id and self._app_id is not None):
192+
                kwargs: dict[str, Any] = {"input": self._input, "instance_id": self._instance_id}
193+
                if supports_metadata and self._metadata is not None:
194+
                    kwargs["metadata"] = self._metadata
195+
                if supports_app_id and self._app_id is not None:
196+
                    kwargs["app_id"] = self._app_id
176197
                return cast(task.Task[Any], self._ctx.call_sub_orchestrator(
177198
                    self._workflow_fn,
178-
                    input=self._input,
179-
                    instance_id=self._instance_id,
180-
                    metadata=self._metadata,
199+
                    **kwargs,
181200
                ))
182201
            else:
183202
                return cast(task.Task[Any], self._ctx.call_sub_orchestrator(
@@ -186,13 +205,19 @@ def _to_task(self) -> task.Task[Any]:
186205
                    instance_id=self._instance_id,
187206
                ))
188207
        else:
189-
            if supports_metadata and self._metadata is not None:
208+
            if (supports_metadata and self._metadata is not None) or (supports_app_id and self._app_id is not None):
209+
                kwargs2: dict[str, Any] = {
210+
                    "input": self._input,
211+
                    "instance_id": self._instance_id,
212+
                    "retry_policy": self._retry_policy,
213+
                }
214+
                if supports_metadata and self._metadata is not None:
215+
                    kwargs2["metadata"] = self._metadata
216+
                if supports_app_id and self._app_id is not None:
217+
                    kwargs2["app_id"] = self._app_id
191219
                return cast(task.Task[Any], self._ctx.call_sub_orchestrator(
192220
                    self._workflow_fn,
193-
                    input=self._input,
194-
                    instance_id=self._instance_id,
195-
                    retry_policy=self._retry_policy,
196-
                    metadata=self._metadata,
220+
                    **kwargs2,
197222
                ))
198223
            else:
198223
return cast(task.Task[Any], self._ctx.call_sub_orchestrator(
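Annotation on the `_to_task` changes above: the branches in both awaitables apply the same rule, which is to pass an optional keyword only when it has a value and the target method accepts it. One possible way to express that rule in a single pass is sketched below. `build_call_kwargs` and the two stand-in `call_activity_*` functions are hypothetical and exist only to make the example runnable; the real context methods are not shown in this diff.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def build_call_kwargs(target: Callable[..., Any], **candidates: Any) -> Dict[str, Any]:
    """Hypothetical helper: keep kwargs that are set and that `target` accepts."""
    params = inspect.signature(target).parameters
    return {
        name: value
        for name, value in candidates.items()
        if value is not None and name in params
    }


# Stand-ins for an older and a newer context API (assumptions, not SDK code).
def call_activity_old(fn: str, *, input: Any = None, retry_policy: Any = None) -> dict:
    return {"fn": fn, "input": input, "retry_policy": retry_policy}


def call_activity_new(fn: str, *, input: Any = None, retry_policy: Any = None,
                      app_id: Optional[str] = None,
                      metadata: Optional[Dict[str, str]] = None) -> dict:
    return {"fn": fn, "input": input, "retry_policy": retry_policy,
            "app_id": app_id, "metadata": metadata}


optional = {"retry_policy": None, "app_id": "billing", "metadata": {"tenant": "acme"}}
old_kwargs = build_call_kwargs(call_activity_old, input=1, **optional)
new_kwargs = build_call_kwargs(call_activity_new, input=1, **optional)
old_result = call_activity_old("process", **old_kwargs)
new_result = call_activity_new("process", **new_kwargs)
```

One behavioral difference from the diff: this sketch drops `input` when it is `None`, whereas the committed code always passes `input`. The two are equivalent only if the target's default for `input` is `None`, which is an assumption here.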
