Skip to content

Commit 27068dc

Browse files
committed
updates, merge main, rebase from part-1
Signed-off-by: Filinto Duran <[email protected]>
1 parent d9ed06e commit 27068dc

31 files changed

+12626
-5
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,6 @@ dmypy.json
130130

131131
# IDEs
132132
.idea
133+
.vscode
133134

134135
coverage.lcov

README.md

Lines changed: 203 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`
118118

119119
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
120120

121-
### Continue-as-new (TODO)
121+
### Continue-as-new
122122

123123
Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.
124124

@@ -214,6 +214,9 @@ The following is more information about how to develop this project. Note that d
214214
### Generating protobufs
215215

216216
```sh
217+
# install dev dependencies for generating protobufs and running tests
218+
pip3 install '.[dev]'
219+
217220
make gen-proto
218221
```
219222

@@ -252,9 +255,207 @@ dapr run --app-id test-app --dapr-grpc-port 4001 --resources-path ./examples/co
252255
To run the E2E tests on a specific python version (eg: 3.11), run the following command from the project root:
253256

254257
```sh
255-
tox -e py311 -- e2e
258+
tox -e py311-e2e
259+
```
260+
261+
### Configuration
262+
263+
#### Connection Configuration
264+
265+
The SDK connects to a Durable Task sidecar. By default it uses `localhost:4001`. You can override via environment variables (checked in order):
266+
267+
- `DAPR_GRPC_ENDPOINT` - Full endpoint (e.g., `localhost:4001`, `grpcs://host:443`)
268+
- `DAPR_GRPC_HOST` (or `DAPR_RUNTIME_HOST`) and `DAPR_GRPC_PORT` - Host and port separately
269+
270+
Example (common ports: 4001 for DurableTask-Go emulator, 50001 for Dapr sidecar):
271+
272+
```sh
273+
export DAPR_GRPC_ENDPOINT=localhost:4001
274+
# or
275+
export DAPR_GRPC_HOST=localhost
276+
export DAPR_GRPC_PORT=50001
277+
```
278+
279+
280+
#### Async Workflow Configuration
281+
282+
Configure async workflow behavior and debugging:
283+
284+
- `DAPR_WF_DISABLE_DETECTION` - Disable non-determinism detection (set to `true`)
285+
286+
Example:
287+
288+
```sh
289+
export DAPR_WF_DISABLE_DETECTION=false
290+
```
291+
292+
### Async workflow authoring
293+
294+
For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md). The developer-facing migration notes are in [DEVELOPER_TRANSITION_GUIDE.md](./DEVELOPER_TRANSITION_GUIDE.md).
295+
296+
You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:
297+
298+
```python
299+
from durabletask.worker import TaskHubGrpcWorker
300+
from durabletask.aio import AsyncWorkflowContext
301+
302+
async def my_orch(ctx: AsyncWorkflowContext, input) -> str:
303+
r1 = await ctx.call_activity(act1, input=input)
304+
await ctx.sleep(1.0)
305+
r2 = await ctx.call_activity(act2, input=r1)
306+
return r2
307+
308+
with TaskHubGrpcWorker() as worker:
309+
worker.add_orchestrator(my_orch)
310+
```
311+
312+
Optional sandbox mode (`best_effort` or `strict`) patches `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` within the workflow step to deterministic equivalents. This is best-effort and not a correctness guarantee.
313+
314+
In `strict` mode, `asyncio.create_task` is blocked inside workflows to preserve determinism and will raise a `SandboxViolationError` if used.
315+
316+
> **Enhanced Sandbox Features**: The enhanced version includes comprehensive non-determinism detection, timeout support, enhanced concurrency primitives, and debugging tools. See [ASYNC_ENHANCEMENTS.md](./durabletask/aio/ASYNCIO_ENHANCEMENTS.md) for complete documentation.
317+
318+
#### Async patterns
319+
320+
- Activities and sub-orchestrations can be referenced by function object or by their registered string name. Both forms are supported:
321+
- Function reference (preferred for IDE/type support) or string name (useful across modules/languages).
322+
323+
- Activities:
324+
```python
325+
result = await ctx.call_activity("process", input={"x": 1})
326+
# or: result = await ctx.call_activity(process, input={"x": 1})
327+
```
328+
329+
- Timers:
330+
```python
331+
await ctx.sleep(1.5) # seconds or timedelta
332+
```
333+
334+
- External events:
335+
```python
336+
val = await ctx.wait_for_external_event("approval")
337+
```
338+
339+
- Concurrency:
340+
```python
341+
t1 = ctx.call_activity("a"); t2 = ctx.call_activity("b")
342+
await ctx.when_all([t1, t2])
343+
winner = await ctx.when_any([ctx.wait_for_external_event("x"), ctx.sleep(5)])
344+
345+
# gather combines awaitables and preserves order
346+
results = await ctx.gather(t1, t2)
347+
# gather with exception capture
348+
results_or_errors = await ctx.gather(t1, t2, return_exceptions=True)
256349
```
257350

351+
#### Async vs. generator API differences
352+
353+
- Async authoring (`durabletask.aio`): awaiting returns the operation's value. Exceptions are raised on `await` (no `is_failed`).
354+
- Generator authoring (`durabletask.task`): yielding returns `Task` objects. Use `get_result()` to read values; failures surface via `is_failed()` or by raising on `get_result()`.
355+
356+
Examples:
357+
358+
```python
359+
# Async authoring (await returns value)
360+
# when_any returns a proxy that compares equal to the original awaitable
361+
# and exposes get_result() for the completed item.
362+
approval = ctx.wait_for_external_event("approval")
363+
winner = await ctx.when_any([approval, ctx.sleep(60)])
364+
if winner == approval:
365+
details = winner.get_result()
366+
```
367+
368+
```python
369+
# Async authoring (index + result)
370+
idx, result = await ctx.when_any_with_result([approval, ctx.sleep(60)])
371+
if idx == 0: # approval won
372+
details = result
373+
```
374+
375+
```python
376+
# Generator authoring (yield returns Task)
377+
approval = ctx.wait_for_external_event("approval")
378+
winner = yield task.when_any([approval, ctx.create_timer(timedelta(seconds=60))])
379+
if winner == approval:
380+
details = approval.get_result()
381+
```
382+
383+
Failure handling in async:
384+
385+
```python
386+
try:
387+
val = await ctx.call_activity("might_fail")
388+
except Exception as e:
389+
# handle failure branch
390+
...
391+
```
392+
393+
Or capture with gather:
394+
395+
```python
396+
res = await ctx.gather(ctx.call_activity("a"), return_exceptions=True)
397+
if isinstance(res[0], Exception):
398+
...
399+
```
400+
401+
402+
- Sub-orchestrations (function reference or registered name):
403+
```python
404+
out = await ctx.call_sub_orchestrator(child_fn, input=payload)
405+
# or: out = await ctx.call_sub_orchestrator("child", input=payload)
406+
```
407+
408+
- Deterministic utilities:
409+
```python
410+
now = ctx.now(); rid = ctx.random().random(); uid = ctx.uuid4()
411+
```
412+
413+
- Workflow metadata/headers (async only for now):
414+
```python
415+
# Attach contextual metadata (e.g., tracing, tenant, app info)
416+
ctx.set_metadata({"x-trace": trace_id, "tenant": "acme"})
417+
md = ctx.get_metadata()
418+
419+
# Header aliases (same data)
420+
ctx.set_headers({"region": "us-east"})
421+
headers = ctx.get_headers()
422+
```
423+
Notes:
424+
- Useful for routing, observability, and cross-cutting concerns passed along activity/sub-orchestrator calls via the sidecar.
425+
- In python-sdk, available for both async and generator orchestrators. In this repo, currently implemented on `durabletask.aio`; generator parity is planned.
426+
427+
- Cross-app activity/sub-orchestrator routing (async only for now):
428+
```python
429+
# Route activity to a different app via app_id
430+
result = await ctx.call_activity("process", input=data, app_id="worker-app-2")
431+
432+
# Route sub-orchestrator to a different app
433+
child_result = await ctx.call_sub_orchestrator("child_workflow", input=data, app_id="orchestrator-app-2")
434+
```
435+
Notes:
436+
- The `app_id` parameter enables multi-app orchestrations where activities or child workflows run in different application instances.
437+
- Requires sidecar support for cross-app invocation.
438+
439+
#### Worker readiness
440+
441+
When starting a worker and scheduling immediately, wait for the connection to the sidecar to be established:
442+
443+
```python
444+
with TaskHubGrpcWorker() as worker:
445+
worker.add_orchestrator(my_orch)
446+
worker.start()
447+
worker.wait_for_ready(timeout=5)
448+
# Now safe to schedule
449+
```
450+
451+
#### Suspension & termination
452+
453+
- `ctx.is_suspended` reflects suspension state during replay/processing.
454+
- Suspend pauses progress without raising inside async orchestrators.
455+
- Terminate completes with `TERMINATED` status; use client APIs to terminate/resume.
456+
- Only new events are buffered while suspended; replay events continue to apply to rebuild local state deterministically.
457+
458+
258459
## Contributing
259460

260461
This project welcomes contributions and suggestions. Most contributions require you to agree to a

durabletask/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
# Public async exports (import directly from durabletask.aio)
5+
from durabletask.aio import AsyncWorkflowContext, CoroutineOrchestratorRunner # noqa: F401
6+
47
"""Durable Task SDK for Python"""
58

69
PACKAGE_NAME = "durabletask"

0 commit comments

Comments
 (0)