Skip to content

Commit b20de61

Browse files
committed
updates, merge main, rebase from part-1
Signed-off-by: Filinto Duran <[email protected]>
1 parent 366b2f6 commit b20de61

30 files changed

+12712
-6
lines changed

README.md

Lines changed: 226 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,15 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`
118118

119119
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
120120

121-
### Continue-as-new (TODO)
121+
### Continue-as-new
122122

123123
Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.
124124

125125
### Suspend, resume, and terminate
126126

127127
Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.
128128

129-
### Retry policies (TODO)
129+
### Retry policies
130130

131131
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
132132

@@ -162,6 +162,9 @@ The following is more information about how to develop this project. Note that d
162162
### Generating protobufs
163163

164164
```sh
165+
# install dev dependencies for generating protobufs and running tests
166+
pip3 install '.[dev]'
167+
165168
make gen-proto
166169
```
167170

@@ -200,9 +203,229 @@ dapr run --app-id test-app --dapr-grpc-port 4001 --resources-path ./examples/co
200203
To run the E2E tests on a specific python version (eg: 3.11), run the following command from the project root:
201204

202205
```sh
203-
tox -e py311 -- e2e
206+
tox -e py311-e2e
207+
```
208+
209+
### Configuration
210+
211+
#### Connection Configuration
212+
213+
The SDK connects to a Durable Task sidecar. By default it uses `localhost:4001`. You can override via environment variables (checked in order):
214+
215+
- `DAPR_GRPC_ENDPOINT` - Full endpoint (e.g., `localhost:4001`, `grpcs://host:443`)
216+
- `DAPR_GRPC_HOST` (or `DAPR_RUNTIME_HOST`) and `DAPR_GRPC_PORT` - Host and port separately
217+
218+
Example (common ports: 4001 for DurableTask-Go emulator, 50001 for Dapr sidecar):
219+
220+
```sh
221+
export DAPR_GRPC_ENDPOINT=localhost:4001
222+
# or
223+
export DAPR_GRPC_HOST=localhost
224+
export DAPR_GRPC_PORT=50001
225+
```
226+
227+
228+
#### Async Workflow Configuration
229+
230+
Configure async workflow behavior and debugging:
231+
232+
- `DAPR_WF_DISABLE_DETECTION` - Disable non-determinism detection (set to `true`)
233+
234+
Example:
235+
236+
```sh
237+
export DAPR_WF_DISABLE_DETECTION=false
238+
```
239+
240+
### Async workflow authoring
241+
242+
For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md). The developer-facing migration notes are in [DEVELOPER_TRANSITION_GUIDE.md](./DEVELOPER_TRANSITION_GUIDE.md).
243+
244+
You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:
245+
246+
```python
247+
from durabletask.worker import TaskHubGrpcWorker
248+
from durabletask.aio import AsyncWorkflowContext
249+
250+
async def my_orch(ctx: AsyncWorkflowContext, input) -> str:
251+
r1 = await ctx.call_activity(act1, input=input)
252+
await ctx.sleep(1.0)
253+
r2 = await ctx.call_activity(act2, input=r1)
254+
return r2
255+
256+
with TaskHubGrpcWorker() as worker:
257+
worker.add_orchestrator(my_orch)
258+
```
259+
260+
Optional sandbox mode (`best_effort` or `strict`) patches `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` within the workflow step to deterministic equivalents. This is best-effort and not a correctness guarantee.
261+
262+
In `strict` mode, `asyncio.create_task` is blocked inside workflows to preserve determinism and will raise a `SandboxViolationError` if used.
263+
264+
> **Enhanced Sandbox Features**: The enhanced version includes comprehensive non-determinism detection, timeout support, enhanced concurrency primitives, and debugging tools. See [ASYNC_ENHANCEMENTS.md](./durabletask/aio/ASYNCIO_ENHANCEMENTS.md) for complete documentation.
265+
266+
#### Async patterns
267+
268+
- Activities and sub-orchestrations can be referenced by function object or by their registered string name. Both forms are supported:
269+
- Function reference (preferred for IDE/type support) or string name (useful across modules/languages).
270+
271+
- Activities:
272+
```python
273+
result = await ctx.call_activity("process", input={"x": 1})
274+
# or: result = await ctx.call_activity(process, input={"x": 1})
204275
```
205276

277+
- Timers:
278+
```python
279+
await ctx.sleep(1.5) # seconds or timedelta
280+
```
281+
282+
- External events:
283+
```python
284+
val = await ctx.wait_for_external_event("approval")
285+
```
286+
287+
- Concurrency:
288+
```python
289+
t1 = ctx.call_activity("a"); t2 = ctx.call_activity("b")
290+
await ctx.when_all([t1, t2])
291+
winner = await ctx.when_any([ctx.wait_for_external_event("x"), ctx.sleep(5)])
292+
293+
# gather combines awaitables and preserves order
294+
results = await ctx.gather(t1, t2)
295+
# gather with exception capture
296+
results_or_errors = await ctx.gather(t1, t2, return_exceptions=True)
297+
```
298+
299+
#### Async vs. generator API differences
300+
301+
- Async authoring (`durabletask.aio`): awaiting returns the operation's value. Exceptions are raised on `await` (no `is_failed`).
302+
- Generator authoring (`durabletask.task`): yielding returns `Task` objects. Use `get_result()` to read values; failures surface via `is_failed()` or by raising on `get_result()`.
303+
304+
Examples:
305+
306+
```python
307+
# Async authoring (await returns value)
308+
# when_any returns a proxy that compares equal to the original awaitable
309+
# and exposes get_result() for the completed item.
310+
approval = ctx.wait_for_external_event("approval")
311+
winner = await ctx.when_any([approval, ctx.sleep(60)])
312+
if winner == approval:
313+
details = winner.get_result()
314+
```
315+
316+
```python
317+
# Async authoring (index + result)
318+
idx, result = await ctx.when_any_with_result([approval, ctx.sleep(60)])
319+
if idx == 0: # approval won
320+
details = result
321+
```
322+
323+
```python
324+
# Generator authoring (yield returns Task)
325+
approval = ctx.wait_for_external_event("approval")
326+
winner = yield task.when_any([approval, ctx.create_timer(timedelta(seconds=60))])
327+
if winner == approval:
328+
details = approval.get_result()
329+
```
330+
331+
Failure handling in async:
332+
333+
```python
334+
try:
335+
val = await ctx.call_activity("might_fail")
336+
except Exception as e:
337+
# handle failure branch
338+
...
339+
```
340+
341+
Or capture with gather:
342+
343+
```python
344+
res = await ctx.gather(ctx.call_activity("a"), return_exceptions=True)
345+
if isinstance(res[0], Exception):
346+
...
347+
```
348+
349+
350+
- Sub-orchestrations (function reference or registered name):
351+
```python
352+
out = await ctx.call_sub_orchestrator(child_fn, input=payload)
353+
# or: out = await ctx.call_sub_orchestrator("child", input=payload)
354+
```
355+
356+
- Deterministic utilities:
357+
```python
358+
now = ctx.now(); rid = ctx.random().random(); uid = ctx.uuid4()
359+
```
360+
361+
- Workflow metadata/headers (async only for now):
362+
```python
363+
# Attach contextual metadata (e.g., tracing, tenant, app info)
364+
ctx.set_metadata({"x-trace": trace_id, "tenant": "acme"})
365+
md = ctx.get_metadata()
366+
367+
# Header aliases (same data)
368+
ctx.set_headers({"region": "us-east"})
369+
headers = ctx.get_headers()
370+
```
371+
Notes:
372+
- Useful for routing, observability, and cross-cutting concerns passed along activity/sub-orchestrator calls via the sidecar.
373+
- In python-sdk, available for both async and generator orchestrators. In this repo, currently implemented on `durabletask.aio`; generator parity is planned.
374+
375+
- Cross-app activity/sub-orchestrator routing (async only for now):
376+
```python
377+
# Route activity to a different app via app_id
378+
result = await ctx.call_activity("process", input=data, app_id="worker-app-2")
379+
380+
# Route sub-orchestrator to a different app
381+
child_result = await ctx.call_sub_orchestrator("child_workflow", input=data, app_id="orchestrator-app-2")
382+
```
383+
Notes:
384+
- The `app_id` parameter enables multi-app orchestrations where activities or child workflows run in different application instances.
385+
- Requires sidecar support for cross-app invocation.
386+
387+
#### Worker readiness
388+
389+
When starting a worker and scheduling immediately, wait for the connection to the sidecar to be established:
390+
391+
```python
392+
with TaskHubGrpcWorker() as worker:
393+
worker.add_orchestrator(my_orch)
394+
worker.start()
395+
worker.wait_for_ready(timeout=5)
396+
# Now safe to schedule
397+
```
398+
399+
#### Suspension & termination
400+
401+
- `ctx.is_suspended` reflects suspension state during replay/processing.
402+
- Suspend pauses progress without raising inside async orchestrators.
403+
- Terminate completes with `TERMINATED` status; use client APIs to terminate/resume.
404+
- Only new events are buffered while suspended; replay events continue to apply to rebuild local state deterministically.
405+
406+
### Tracing and context propagation
407+
408+
The SDK surfaces W3C tracing context provided by the sidecar:
409+
410+
- Orchestrations: `ctx.trace_parent`, `ctx.trace_state`, and `ctx.orchestration_span_id` are available on `OrchestrationContext` (and on `AsyncWorkflowContext`).
411+
- Activities: `ctx.trace_parent` and `ctx.trace_state` are available on `ActivityContext`.
412+
413+
Propagate tracing to external systems (e.g., HTTP):
414+
415+
```python
416+
def activity(ctx, payload):
417+
headers = {
418+
"traceparent": ctx.trace_parent or "",
419+
"tracestate": ctx.trace_state or "",
420+
}
421+
# requests.post(url, headers=headers, json=payload)
422+
return "ok"
423+
```
424+
425+
Notes:
426+
- The sidecar controls inbound `traceparent`/`tracestate`. App code can append vendor entries to `tracestate` for outbound calls but cannot currently alter the sidecar’s propagation for downstream Durable operations.
427+
- Configure the sidecar endpoint with `DURABLETASK_GRPC_ENDPOINT` (e.g., `127.0.0.1:56178`).
428+
206429
## Contributing
207430

208431
This project welcomes contributions and suggestions. Most contributions require you to agree to a

durabletask/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
# Public async exports (import directly from durabletask.aio)
5+
from durabletask.aio import AsyncWorkflowContext, CoroutineOrchestratorRunner # noqa: F401
6+
47
"""Durable Task SDK for Python"""
58

69
PACKAGE_NAME = "durabletask"

0 commit comments

Comments
 (0)