| 1 | +# Durable Task AsyncIO Internals |
| 2 | + |
| 3 | +This document explains how the AsyncIO implementation in this repository integrates with the existing generator‑based Durable Task runtime. It covers the coroutine→generator bridge, awaitable design, sandboxing and non‑determinism detection, error/cancellation semantics, debugging, and guidance for extending the system. |
| 4 | + |
| 5 | +## Scope and Goals |
| 6 | + |
| 7 | +- Async authoring model for orchestrators while preserving Durable Task's generator runtime contract |
| 8 | +- Deterministic execution and replay correctness first |
| 9 | +- Optional, scoped compatibility sandbox for common stdlib calls during development/test |
| 10 | +- Minimal surface area changes to core non‑async code paths |
| 11 | + |
| 12 | +Key modules: |
| 13 | +- `durabletask/aio/context.py` — Async workflow context and deterministic utilities |
| 14 | +- `durabletask/aio/driver.py` — Coroutine→generator bridge |
| 15 | +- `durabletask/aio/sandbox.py` — Scoped patching and non‑determinism detection |
| 16 | + |
| 17 | +## Architecture Overview |
| 18 | + |
| 19 | +### Coroutine→Generator Bridge |
| 20 | + |
| 21 | +Async orchestrators are authored as `async def` but executed by Durable Task as generators that yield `durabletask.task.Task` (or composite) instances. The bridge implements a driver that manually steps a coroutine and converts each `await` into a yielded Durable Task operation. |
| 22 | + |
| 23 | +High‑level flow: |
| 24 | +1. `TaskHubGrpcWorker.add_async_orchestrator(async_fn, sandbox_mode=...)` wraps `async_fn` with a `CoroutineOrchestratorRunner` and registers a generator orchestrator with the worker. |
| 25 | +2. At execution time, the runtime calls the registered generator orchestrator with a base `OrchestrationContext` and input. |
| 26 | +3. The generator orchestrator constructs `AsyncWorkflowContext` and then calls `runner.to_generator(async_fn_ctx, input)` to obtain a generator. |
| 27 | +4. The driver loop yields Durable Task operations to the engine and sends results back into the coroutine upon resume, until the coroutine completes. |
| 28 | + |
| 29 | +Driver responsibilities: |
| 30 | +- Prime the coroutine (`coro.send(None)`) and handle immediate completion |
| 31 | +- Recognize awaitables whose `__await__` yield driver‑recognized operation descriptors |
| 32 | +- Yield the underlying Durable Task `task.Task` (or composite) to the engine |
| 33 | +- Translate successful completions to `.send(value)` and failures to `.throw(exc)` on the coroutine |
| 34 | +- Normalize `StopIteration` completions (PEP 479) so that orchestrations complete with a value rather than raising into the worker |
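
The responsibilities above can be illustrated with a minimal, self-contained sketch. It is not the code in `durabletask/aio/driver.py`: `Op` and `drive` are hypothetical names, and `Op` stands in for a real Durable Task `task.Task`. The sketch only shows the stepping technique: prime the coroutine, re-yield each awaited operation, and translate results to `send` and failures to `throw`.

```python
from typing import Any, Generator


class Op:
    """Hypothetical operation descriptor; a stand-in for a Durable Task task."""

    __slots__ = ("name", "payload")

    def __init__(self, name: str, payload: Any = None) -> None:
        self.name = name
        self.payload = payload

    def __await__(self):
        # Hand the descriptor to the driver; whatever the driver sends back
        # becomes the value of the `await` expression.
        result = yield self
        return result


def drive(coro) -> Generator[Op, Any, Any]:
    """Step `coro` manually, re-yielding every awaited Op to the caller."""
    try:
        op = coro.send(None)  # prime; the coroutine may finish immediately
    except StopIteration as stop:
        return stop.value
    while True:
        try:
            result = yield op  # the engine resumes us with a result...
        except Exception as exc:  # ...or injects a failure
            try:
                op = coro.throw(exc)
            except StopIteration as stop:
                return stop.value
            continue
        try:
            op = coro.send(result)
        except StopIteration as stop:
            # Return the value instead of letting StopIteration escape (PEP 479).
            return stop.value


async def orchestrator(x):
    total = await Op("activity", x)
    try:
        await Op("activity", "boom")
    except ValueError:
        total += 100  # the failure thrown by the driver is catchable here
    return total


gen = drive(orchestrator(1))
first = next(gen)        # first awaited operation
second = gen.send(41)    # resume with a result; the next operation is yielded
try:
    gen.throw(ValueError("activity failed"))
except StopIteration as stop:
    final = stop.value   # the coroutine's return value
```

Catching `StopIteration` around every `send` and `throw` is what keeps completion from surfacing as `RuntimeError("generator raised StopIteration")` inside the generator.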
| 35 | + |
| 36 | +### Awaitables and Operation Descriptors |
| 37 | + |
| 38 | +Awaitables in `durabletask.aio` implement `__await__` to expose a small operation descriptor that the driver understands. Each descriptor maps deterministically to a Durable Task operation: |
| 39 | + |
| 40 | +- Activity: `ctx.activity(name, *, input)` → `task.call_activity(name, input)` |
| 41 | +- Sub‑orchestrator: `ctx.sub_orchestrator(fn_or_name, *, input)` → `task.call_sub_orchestrator(...)` |
| 42 | +- Timer: `ctx.sleep(duration)` → `task.create_timer(fire_at)` |
| 43 | +- External event: `ctx.wait_for_external_event(name)` → `task.wait_for_external_event(name)` |
| 44 | +- Concurrency: `ctx.when_all([...])` / `ctx.when_any([...])` → `task.when_all([...])` / `task.when_any([...])` |
| 45 | + |
| 46 | +Design rules: |
| 47 | +- Awaitables are single‑use. Each call creates a fresh awaitable whose `__await__` returns a fresh iterator. This avoids "cannot reuse already awaited coroutine" during replay. |
| 48 | +- All awaitables use `__slots__` for memory efficiency and replay stability. |
| 49 | +- Composite awaitables convert their children to Durable Task tasks before yielding. |
| 50 | + |
| 51 | +### AsyncWorkflowContext |
| 52 | + |
| 53 | +`AsyncWorkflowContext` wraps the base generator `OrchestrationContext` and exposes deterministic utilities and async awaitables. |
| 54 | + |
| 55 | +Provided utilities (deterministic): |
| 56 | +- `now()` — orchestration time based on history |
| 57 | +- `random()` — PRNG seeded deterministically (e.g., instance/run ID); used by `uuid4()` |
| 58 | +- `uuid4()` — derived from deterministic PRNG |
| 59 | +- `is_replaying`, `is_suspended`, `workflow_name`, `instance_id`, etc. — passthrough metadata |
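
As a rough illustration of how a deterministic `random()` and `uuid4()` can be derived, the sketch below seeds a `random.Random` from stable inputs and builds a version-4 UUID from it. The helper names and the choice of seed material (instance ID plus orchestration time) are assumptions made for this example; the actual seeding in `context.py` may differ.

```python
import hashlib
import random
import uuid


def deterministic_random(instance_id: str, orchestration_time_iso: str) -> random.Random:
    """Build a PRNG whose stream depends only on replay-stable inputs (assumed seed recipe)."""
    material = f"{instance_id}:{orchestration_time_iso}".encode("utf-8")
    seed = int.from_bytes(hashlib.sha256(material).digest()[:8], "big")
    return random.Random(seed)


def deterministic_uuid4(rng: random.Random) -> uuid.UUID:
    """Derive a version-4 UUID from the PRNG instead of from OS entropy."""
    return uuid.UUID(int=rng.getrandbits(128), version=4)


# Two replays of the same instance see the same values.
first_run = deterministic_uuid4(deterministic_random("order-42", "2024-01-01T00:00:00"))
replay = deterministic_uuid4(deterministic_random("order-42", "2024-01-01T00:00:00"))
other = deterministic_uuid4(deterministic_random("order-43", "2024-01-01T00:00:00"))
```

Because the seed is computed from values that are identical on every replay, the generated IDs are stable across activations, which is the property orchestrator code depends on.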
| 60 | + |
| 61 | +Concurrency: |
| 62 | +- `when_all([...])` returns an awaitable that completes with a list of results |
| 63 | +- `when_any([...])` returns an awaitable that completes with the first completed child |
| 64 | +- `when_any_with_result([...])` returns `(index, result)` |
| 65 | +- `with_timeout(awaitable, seconds|timedelta)` wraps any awaitable with a deterministic timer |
| 66 | + |
| 67 | +Debugging helpers (dev‑only): |
| 68 | +- Operation history when debug is enabled (`DAPR_WF_DEBUG=true` or `DT_DEBUG=true`) |
| 69 | +- `get_debug_info()` to inspect state for diagnostics |
| 70 | + |
| 71 | +### Error and Cancellation Semantics |
| 72 | + |
| 73 | +- Activity/sub‑orchestrator completion values are sent back into the coroutine. Final failures are injected via `coro.throw(...)`. |
| 74 | +- Cancellations are mapped to `asyncio.CancelledError` where appropriate and thrown into the coroutine. |
| 75 | +- Termination completes orchestrations with TERMINATED status (matching generator behavior); exceptions are surfaced as failureDetails in the runtime completion action. |
| 76 | +- The driver consumes `StopIteration` from awaited iterators and returns the value to avoid leaking `RuntimeError("generator raised StopIteration")`. |
| 77 | + |
| 78 | +## Sequence Diagram |
| 79 | + |
| 80 | +### Mermaid (rendered in compatible viewers) |
| 81 | + |
| 82 | +```mermaid |
| 83 | +sequenceDiagram |
| 84 | + autonumber |
| 85 | + participant W as TaskHubGrpcWorker |
| 86 | + participant E as Durable Task Engine |
| 87 | + participant G as Generator Orchestrator Wrapper |
| 88 | + participant R as CoroutineOrchestratorRunner |
| 89 | + participant C as Async Orchestrator (coroutine) |
| 90 | + participant A as Awaitable (__await__) |
| 91 | + participant S as Sandbox (optional) |
| 92 | +
|
| 93 | + E->>G: invoke(name, ctx, input) |
| 94 | + G->>R: to_generator(AsyncWorkflowContext(ctx), input) |
| 95 | + R->>C: start coroutine (send None) |
| 96 | +
|
| 97 | + opt sandbox_mode != "off" |
| 98 | + G->>S: enter sandbox scope (patch) |
| 99 | + S-->>G: patch asyncio.sleep/random/uuid/time |
| 100 | + end |
| 101 | +
|
| 102 | + Note right of C: await ctx.activity(...), ctx.sleep(...), ctx.when_any(...) |
| 103 | + C-->>A: create awaitable |
| 104 | + A-->>R: __await__ yields Durable Task op |
| 105 | + R-->>E: yield task/composite |
| 106 | + E-->>R: resume with result/failure |
| 107 | + R->>C: send(result) / throw(error) |
| 108 | + C-->>R: next awaitable or StopIteration |
| 109 | +
|
| 110 | + alt next awaitable |
| 111 | + R-->>E: yield next operation |
| 112 | + else completed |
| 113 | + R-->>G: return result (StopIteration.value) |
| 114 | + G-->>E: completeOrchestration(result) |
| 115 | + end |
| 116 | +
|
| 117 | + opt sandbox_mode != "off" |
| 118 | + G->>S: exit sandbox scope (restore) |
| 119 | + end |
| 120 | +``` |
| 121 | + |
| 122 | +### ASCII Flow (fallback) |
| 123 | + |
| 124 | +```text |
| 125 | +Engine → Wrapper → Runner → Coroutine |
| 126 | + │ │ │ ├─ await ctx.activity(...) |
| 127 | + │ │ │ ├─ await ctx.sleep(...) |
| 128 | + │ │ │ └─ await ctx.when_any([...]) |
| 129 | + │ │ │ |
| 130 | + │ │ └─ Awaitable.__await__ → yields Durable Task op |
| 131 | + │ └─ yield op → Engine schedules/waits |
| 132 | + └─ resume with result → Runner.send/throw → Coroutine step |
| 133 | +
|
| 134 | +Loop until coroutine returns → Runner captures StopIteration.value → |
| 135 | +Wrapper returns value → Engine emits completeOrchestration |
| 136 | +
|
| 137 | +Optional Sandbox (per activation): |
| 138 | + enter → patch asyncio.sleep/random/uuid/time → run step → restore |
| 139 | +``` |
| 140 | + |
| 141 | +## Sandboxing and Non‑Determinism Detection |
| 142 | + |
| 143 | +The sandbox provides optional, scoped compatibility and detection for common non‑deterministic stdlib calls. It is opt‑in per orchestrator via `sandbox_mode`: |
| 144 | + |
| 145 | +- `off` (default): No patching or detection; zero overhead. Use deterministic APIs only. |
| 146 | +- `best_effort`: Patch common functions within a scope and emit warnings on detected non‑determinism. |
| 147 | +- `strict`: As above, but raise `SandboxViolationError` on detected calls. |
| 148 | + |
| 149 | +Patched targets (best‑effort): |
| 150 | +- `asyncio.sleep` → deterministic timer awaitable |
| 151 | +- `random` module functions (via a deterministic `Random` instance) |
| 152 | +- `uuid.uuid4` → derived from deterministic PRNG |
| 153 | +- `time.time/time_ns` → orchestration time |
| 154 | + |
| 155 | +Important limitations: |
| 156 | +- `datetime.datetime.now()` is not patched: `datetime.datetime` is an immutable built‑in type, so its attributes cannot be reassigned. Use `ctx.now()` or `ctx.current_utc_datetime`. |
| 157 | +- `from x import y` may bypass patches due to direct binding. |
| 158 | +- Modules that cache callables at import time won’t see patch updates. |
| 159 | +- This does not make I/O deterministic; all external I/O must be in activities. |
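
The scoping and the `from x import y` limitation can be seen in a small sketch. `patched_stdlib` is a hypothetical context manager written for this example, not the API in `sandbox.py`; it patches `time.time` and `random.random` for the duration of a `with` block and restores them afterwards.

```python
import contextlib
import random
import time
from time import time as bound_time  # bound at import time; patching cannot reach it


@contextlib.contextmanager
def patched_stdlib(fixed_now: float, seed: int):
    """Temporarily replace time.time and random.random with deterministic versions."""
    original_time = time.time
    original_random = random.random
    rng = random.Random(seed)
    time.time = lambda: fixed_now
    random.random = rng.random
    try:
        yield
    finally:
        # Always restore, even if the body raises.
        time.time = original_time
        random.random = original_random


with patched_stdlib(1_700_000_000.0, seed=42):
    inside = time.time()          # patched: returns the fixed orchestration time
    bypassed = bound_time()       # direct binding: still the real wall clock
    first_draw = random.random()  # patched: drawn from the seeded PRNG

after = time.time()               # restored: real wall clock again
```

The `bypassed` value shows why orchestrator code that relies on patching should call `time.time()` through the module rather than through a name imported with `from time import time`.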
| 160 | + |
| 161 | +Detection engine: |
| 162 | +- `_NonDeterminismDetector` tracks suspicious call sites using Python frame inspection |
| 163 | +- Deduplicates warnings per call signature and location |
| 164 | +- In strict mode, raises `SandboxViolationError` with actionable suggestions; in best‑effort, issues `NonDeterminismWarning` |
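
A minimal sketch of per‑callsite deduplication follows. The classes here are local stand‑ins defined for the example (they are not imported from `durabletask`), and the real `_NonDeterminismDetector` may key and format its reports differently.

```python
import sys
import warnings


class NonDeterminismWarning(UserWarning):
    """Local stand-in for the warning category named in this document."""


class SandboxViolationError(RuntimeError):
    """Local stand-in for the strict-mode exception named in this document."""


class CallsiteDetector:
    """Report each (function, file, line) combination at most once."""

    def __init__(self, strict: bool = False) -> None:
        self._strict = strict
        self._seen = set()

    def report(self, func_name: str, suggestion: str) -> bool:
        frame = sys._getframe(1)  # frame of the code that called report()
        key = (func_name, frame.f_code.co_filename, frame.f_lineno)
        if key in self._seen:
            return False  # already reported for this callsite
        self._seen.add(key)
        message = (
            f"{func_name} called at {frame.f_code.co_filename}:{frame.f_lineno} "
            f"in {frame.f_code.co_name}; {suggestion}"
        )
        if self._strict:
            raise SandboxViolationError(message)
        warnings.warn(message, NonDeterminismWarning, stacklevel=2)
        return True


detector = CallsiteDetector()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    for _ in range(3):
        detector.report("time.time", "use ctx.now() instead")  # same line each time
```

Keying on the caller's file and line means a call inside a loop warns once, while the same function called from a different line is reported separately.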
| 165 | + |
| 166 | +### Detector: What, When, and Why |
| 167 | + |
| 168 | +What it checks: |
| 169 | +- Calls to common non‑deterministic functions (e.g., `time.time`, `random.random`, `uuid.uuid4`, `os.urandom`, `secrets.*`, `datetime.utcnow`) in user code |
| 170 | +- Uses a lightweight global trace function (installed only in `best_effort` or `strict`) to inspect call frames and identify risky callsites |
| 171 | +- Skips internal `durabletask` frames and built‑ins to reduce noise |
| 172 | + |
| 173 | +Modes and behavior: |
| 174 | +- `SandboxMode.OFF`: |
| 175 | + - No tracing, no patching, zero overhead |
| 176 | + - Detector is not active |
| 177 | +- `SandboxMode.BEST_EFFORT`: |
| 178 | + - Patches selected stdlib functions |
| 179 | + - Installs tracer only when `ctx._debug_mode` is true; otherwise a no‑op tracer is used to keep overhead minimal |
| 180 | + - Emits `NonDeterminismWarning` once per unique callsite with a suggested deterministic alternative |
| 181 | +- `SandboxMode.STRICT`: |
| 182 | + - Patches selected stdlib functions and blocks dangerous operations (e.g., `open`, `os.urandom`, `secrets.*`) |
| 183 | + - Installs full tracer regardless of debug flag |
| 184 | + - Raises `SandboxViolationError` on first detection with details and suggestions |
| 185 | + |
| 186 | +When to use it (recommended): |
| 187 | +- During development to quickly surface accidental non‑determinism in orchestrator code |
| 188 | +- When integrating third‑party libraries that might call time/random/uuid internally |
| 189 | +- In CI for a dedicated “determinism” job (short test matrix), using `BEST_EFFORT` for warnings or `STRICT` for enforcement |
| 190 | + |
| 191 | +When not to use it: |
| 192 | +- Production environments (prefer `OFF` for zero overhead) |
| 193 | +- Performance‑sensitive local loops (e.g., microbenchmarks) unless you are specifically testing detection overhead |
| 194 | + |
| 195 | +Enabling and controlling the detector: |
| 196 | +- Per‑orchestrator registration: |
| 197 | +```python |
| 198 | +from durabletask.aio import SandboxMode |
| 199 | + |
| 200 | +worker.add_async_orchestrator(my_async_orch, sandbox_mode=SandboxMode.BEST_EFFORT) |
| 201 | +``` |
| 202 | +- Scoped usage in advanced scenarios: |
| 203 | +```python |
| 204 | +from durabletask.aio import sandbox_best_effort |
| 205 | + |
| 206 | +async def my_async_orch(ctx, _): |
| 207 | +    with sandbox_best_effort(ctx): |
| 208 | +        # code here benefits from patches + detection |
| 209 | +        ... |
| 210 | +``` |
| 211 | +- Debug gating (best_effort only): set `DAPR_WF_DEBUG=true` (or `DT_DEBUG=true`) to enable full detection; otherwise a no‑op tracer is used to minimize overhead. |
| 212 | +- Global disable (regardless of mode): set `DAPR_WF_DISABLE_DETECTION=true` to force `OFF` behavior without changing code. |
| 213 | + |
| 214 | +What warnings/errors look like: |
| 215 | +- Warning (`BEST_EFFORT`): |
| 216 | + - Category: `NonDeterminismWarning` |
| 217 | + - Message includes the function name, `filename:line`, the current function, and a deterministic alternative (e.g., “Use `ctx.now()` instead of `datetime.utcnow()`”). |
| 218 | +- Error (`STRICT`): |
| 219 | + - Exception: `SandboxViolationError` |
| 220 | + - Includes violation type, suggested alternative, `workflow_name`, and `instance_id` when available |
| 221 | + |
| 222 | +Overhead and performance: |
| 223 | +- `OFF`: zero overhead |
| 224 | +- `BEST_EFFORT`: minimal overhead by default; full detection overhead only when debug is enabled |
| 225 | +- `STRICT`: tracing overhead present; recommended only for testing/enforcement, not for production |
| 226 | + |
| 227 | +Limitations and caveats: |
| 228 | +- Direct imports like `from random import random` bind the function and may bypass patching |
| 229 | +- Libraries that cache function references at import time will not see patch changes |
| 230 | +- `datetime.datetime.now()` cannot be patched; use `ctx.now()` instead |
| 231 | +- The detector is advisory; it cannot prove determinism for arbitrary code. Treat it as a power tool for finding common pitfalls, not a formal verifier |
| 232 | + |
| 233 | +Quick mapping of alternatives: |
| 234 | +- `datetime.now/utcnow` → `ctx.now()` (async) or `ctx.current_utc_datetime` |
| 235 | +- `time.time/time_ns` → `ctx.now().timestamp()` / `int(ctx.now().timestamp() * 1e9)` |
| 236 | +- `random.*` → `ctx.random().*` |
| 237 | +- `uuid.uuid4` → `ctx.uuid4()` |
| 238 | +- `os.urandom` / `secrets.*` → `ctx.random().randbytes()` (or move to an activity) |
| 239 | + |
| 240 | +Troubleshooting tips: |
| 241 | +- Seeing repeated warnings? Warnings are deduplicated per callsite, so repeats mean the same call occurs at several distinct files or lines, each of which warns once |
| 242 | +- Unexpected strict errors during replay? Confirm you are not creating background tasks (`asyncio.create_task`) or performing I/O in the orchestrator |
| 243 | +- Need to quiet a test temporarily? Use `sandbox_mode=SandboxMode.OFF` for that orchestrator or `DAPR_WF_DISABLE_DETECTION=true` during the run |
| 244 | + |
| 245 | +## Integration with Generator Runtime |
| 246 | + |
| 247 | +- Registration: `TaskHubGrpcWorker.add_async_orchestrator(async_fn, ...)` registers a generator wrapper that delegates to the driver. Generator orchestrators and async orchestrators can coexist. |
| 248 | +- Execution loop remains owned by Durable Task; the driver only yields operations and processes resumes. |
| 249 | +- Replay: The driver and awaitables are designed to be idempotent and to avoid reusing awaited iterators; orchestration state is reconstructed deterministically from history. |
| 250 | + |
| 251 | +## Debugging Guide |
| 252 | + |
| 253 | +Enable developer diagnostics: |
| 254 | +- Set `DAPR_WF_DEBUG=true` (or `DT_DEBUG=true`) to enable operation logging and non‑determinism warnings. |
| 255 | +- Use `ctx.get_debug_info()` to export state, operations, and instance metadata. |
| 256 | + |
| 257 | +Common issues: |
| 258 | +- "coroutine was never awaited": Ensure all workflow operations are awaited and that no background tasks are spawned (`asyncio.create_task` is blocked in strict mode). |
| 259 | +- "cannot reuse already awaited coroutine": Do not cache awaitables across activations; create them inline. All awaitables in this package are single‑use by design. |
| 260 | +- Orchestration hangs: Inspect last yielded operation in logs; verify that the corresponding history event occurs (activity completion, timer fired, external event received). For external events, ensure the event name matches exactly. |
| 261 | +- Sandbox leakage: Verify patches are scoped by context manager and restored after activation. Avoid `from x import y` forms in orchestrator code when relying on patching. |
| 262 | + |
| 263 | +Runtime tracing tips: |
| 264 | +- Log each yielded operation and each resume result in the driver (behind debug flag) to correlate with sidecar logs. |
| 265 | +- Capture `instance_id` and `history_event_sequence` from `AsyncWorkflowContext` when logging. |
| 266 | + |
| 267 | +## Performance Characteristics |
| 268 | + |
| 269 | +- `sandbox_mode="off"`: zero overhead vs generator orchestrators |
| 270 | +- `best_effort` / `strict`: additional overhead from Python tracing and patching; use during development and testing |
| 271 | +- Awaitables use `__slots__` and avoid per‑step allocations in hot paths where feasible |
| 272 | + |
| 273 | +## Extending the System |
| 274 | + |
| 275 | +Adding a new awaitable: |
| 276 | +1. Define a class with `__slots__` and a constructor capturing required arguments. |
| 277 | +2. Implement `_to_task(self) -> durabletask.task.Task` that builds the deterministic operation. |
| 278 | +3. Implement `__await__` to yield the driver‑recognized descriptor (or directly the task, depending on driver design). |
| 279 | +4. Add unit tests for replay stability and error propagation. |
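
The four steps can be sketched as follows. Everything here is hypothetical: `FakeTask` stands in for `durabletask.task.Task` so the example runs on its own, and `SignalAwaitable` is an invented operation used only to show the shape of a new awaitable. This version yields the task directly; a driver that expects a separate descriptor type would need a different `__await__`.

```python
class FakeTask:
    """Stand-in for durabletask.task.Task, used only to keep the sketch runnable."""

    __slots__ = ("kind", "args")

    def __init__(self, kind: str, *args) -> None:
        self.kind = kind
        self.args = args


class SignalAwaitable:
    """Hypothetical awaitable following the steps above."""

    __slots__ = ("_target", "_payload")  # step 1: fixed attributes, no __dict__

    def __init__(self, target: str, payload: dict) -> None:
        self._target = target
        self._payload = payload

    def _to_task(self) -> FakeTask:
        # Step 2: build the operation from constructor arguments only,
        # so the same inputs always produce the same operation.
        return FakeTask("signal", self._target, self._payload)

    def __await__(self):
        # Step 3: a generator function, so every call returns a fresh iterator.
        result = yield self._to_task()
        return result


# Step 4 in miniature: drive the awaitable by hand, as a unit test might.
awaitable = SignalAwaitable("order-1", {"ok": True})
iterator = awaitable.__await__()
task = next(iterator)
try:
    iterator.send("done")
except StopIteration as stop:
    awaited_value = stop.value
```

Writing `__await__` as a generator function is one simple way to get a new iterator on every call.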
| 280 | + |
| 281 | +Adding sandbox coverage: |
| 282 | +1. Add patch/unpatch logic inside `sandbox.py` with correct scoping and restoration. |
| 283 | +2. Update `_NonDeterminismDetector` patterns and suggestions. |
| 284 | +3. Document limitations and add tests for best‑effort and strict modes. |
| 285 | + |
| 286 | +## Interop Checklist (Async ↔ Generator) |
| 287 | + |
| 288 | +- Activities: identical behavior; only authoring differs (`yield` vs `await`). |
| 289 | +- Timers: map to the same `createTimer` actions. |
| 290 | +- External events: same semantics for buffering and completion. |
| 291 | +- Sub‑orchestrators: same create/complete/fail events. |
| 292 | +- Suspension/Termination: same runtime events; async path observes `is_suspended` and maps termination to completion with TERMINATED. |
| 293 | + |
| 294 | +## References |
| 295 | + |
| 296 | +- `durabletask/aio/context.py` |
| 297 | +- `durabletask/aio/driver.py` |
| 298 | +- `durabletask/aio/sandbox.py` |
| 299 | +- Tests under `tests/durabletask/` and `tests/aio/` |
| 300 | + |
| 301 | + |