Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,37 @@ The following features are currently supported:

### Orchestrations

Orchestrations are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable, meaning that they can automatically recover from failures and rebuild their local execution state. Orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input.
Orchestrators are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable, meaning that they can automatically recover from failures and rebuild their local execution state. Orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input.

#### Orchestration versioning

Orchestrations may be assigned a version when they are first created. If an orchestration is given a version, it will continually be checked during its lifecycle to ensure that it remains compatible with the underlying orchestrator code. If the orchestrator code is updated while an orchestration is running, rules can be set that will define the behavior - whether the orchestration should fail, abandon for reprocessing at a later time, or attempt to run anyway. For more information, see [The provided examples](./supported-patterns.md). For more information about versioning in the context of Durable Functions, see [Orchestration versioning in Durable Functions](https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-orchestration-versioning) (Note that concepts specific to Azure Functions, such as host.json settings, do not apply to this SDK).

##### Orchestration versioning options

Both the Durable worker and durable client have versioning configuration available. Because versioning checks are handled by the worker, the only information the client needs is a default_version, taken in its constructor, to use as the version for new orchestrations unless otherwise specified. The worker takes a VersioningOptions object with a `default_version` for new sub-orchestrations, a `version` used by the worker for orchestration version comparisons, and two more options giving control over versioning behavior in case of match failures, a `VersionMatchStrategy` and `VersionFailureStrategy`.

**VersionMatchStrategy**

| VersionMatchStrategy.NONE | VersionMatchStrategy.STRICT | VersionMatchStrategy.CURRENT_OR_OLDER |
|-|-|-|
| Do not compare orchestration versions | Only allow orchestrations with the same version as the worker | Allow orchestrations with the same or older version as the worker |

**VersionFailureStrategy**

| VersionFailureStrategy.REJECT | VersionFailureStrategy.FAIL |
|-|-|
| Abandon execution of the orchestrator, but allow it to be reprocessed later | Fail the orchestration |

**Strategy examples**

Scenario 1: You are implementing versioning for the first time in your worker. You want to have a default version for new orchestrations, but do not care about comparing versions with currently running ones. Choose VersionMatchStrategy.NONE, and VersionFailureStrategy does not matter.

Scenario 2: You are updating an orchestrator's code, and you do not want old orchestrations to continue to be processed on the new code. Bump the default version and the worker version, set VersionMatchStrategy.STRICT and VersionFailureStrategy.FAIL.

Scenario 3: You are updating an orchestrator's code, and you have ensured the code is version-aware so that it remains backward-compatible with existing orchestrations. Bump the default version and the worker version, and set VersionMatchStrategy.CURRENT_OR_OLDER and VersionFailureStrategy.FAIL.

Scenario 4: You are performing a high-availability deployment, and your orchestrator code contains breaking changes making it not backward-compatible. Bump the default version and the worker version, and set VersionFailureStrategy.REJECT and VersionMatchStrategy.STRICT. Ensure that at least a few of the previous version of workers remain available to continue processing the older orchestrations - eventually, all older orchestrations _should_ land on the correct workers for processing. Once all remaining old orchestrations have been processed, shut down the remaining old workers.

### Activities

Expand All @@ -16,7 +46,7 @@ Orchestrations can schedule durable timers using the `create_timer` API. These t

### Sub-orchestrations

Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces.
Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces. Sub-orchestrations can also be versioned in a similar manner to their parent orchestrations, however, they do not inherit the parent orchestrator's version. Instead, they will use the default_version defined in the current worker's VersioningOptions unless otherwise specified during `call_sub_orchestrator`.

### External events

Expand Down
45 changes: 42 additions & 3 deletions docs/supported-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def sequence(ctx: task.OrchestrationContext, _):
return [result1, result2, result3]
```

You can find the full sample [here](../examples/activity_sequence.py).
See the full [function chaining example](../examples/activity_sequence.py).

### Fan-out/fan-in

Expand Down Expand Up @@ -48,7 +48,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
return {'work_items': work_items, 'results': results, 'total': sum(results)}
```

You can find the full sample [here](../examples/fanout_fanin.py).
See the full [fan-out sample](../examples/fanout_fanin.py).

### Human interaction and durable timers

Expand Down Expand Up @@ -79,4 +79,43 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):

As an aside, you'll also notice that the example orchestration above works with custom business objects. Support for custom business objects includes support for custom classes, custom data classes, and named tuples. Serialization and deserialization of these objects is handled automatically by the SDK.

You can find the full sample [here](../examples/human_interaction.py).
See the full [human interaction sample](../examples/human_interaction.py).

### Version-aware orchestrator

When utilizing orchestration versioning, it is possible for an orchestrator to remain backwards-compatible with orchestrations created using the previously defined version. For instance, consider an orchestration defined with the following signature:

```python
def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
"""Dummy orchestrator function illustrating old logic"""
yield ctx.call_activity(activity_one)
yield ctx.call_activity(activity_two)
return "Success"
```

Assume that any orchestrations created using this orchestrator were versioned 1.0.0. If the signature of this method needs to be updated to call activity_three between the calls to activity_one and activity_two, ordinarily this would break any running orchestrations at the time of deployment. However, the following orchestrator will be able to process both orchestraions versioned 1.0.0 and 2.0.0 after the change:

```python
def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
"""Version-aware dummy orchestrator capable of processing both old and new orchestrations"""
yield ctx.call_activity(activity_one)
if ctx.version > '1.0.0':
yield ctx.call_activity(activity_three)
yield ctx.call_activity(activity_two)
```

Alternatively, if the orchestrator changes completely, the following syntax might be preferred:

```python
def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
if ctx.version == '1.0.0':
yield ctx.call_activity(activity_one)
yield ctx.call_activity(activity_two)
return "Success
yield ctx.call_activity(activity_one)
yield ctx.call_activity(activity_three)
yield ctx.call_activity(activity_two)
return "Success"
```

See the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)
2 changes: 1 addition & 1 deletion durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def __init__(self, version: Optional[str] = None,

Args:
version: The version of orchestrations that the worker can work on.
default_version: The default version that will be used for starting new orchestrations.
default_version: The default version that will be used for starting new sub-orchestrations.
match_strategy: The versioning strategy for the Durable Task worker.
failure_strategy: The versioning failure strategy for the Durable Task worker.
"""
Expand Down
92 changes: 92 additions & 0 deletions examples/version_aware_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""End-to-end sample that demonstrates how to configure an orchestrator
that a dynamic number activity functions in parallel, waits for them all
to complete, and prints an aggregate summary of the outputs."""
import os

from azure.identity import DefaultAzureCredential

from durabletask import client, task, worker
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


def activity_v1(ctx: task.ActivityContext, input: str) -> str:
"""Activity function that returns a result for a given work item"""
print("processing input:", input)
return "Success from activity v1"


def activity_v2(ctx: task.ActivityContext, input: str) -> str:
"""Activity function that returns a result for a given work item"""
print("processing input:", input)
return "Success from activity v2"


def orchestrator(ctx: task.OrchestrationContext, _):
"""Orchestrator function that checks the orchestration version and has version-aware behavior
Use case: Updating an orchestrator with new logic while maintaining compatibility with previously
started orchestrations"""
if ctx.version == "1.0.0":
# For version 1.0.0, we use the original logic
result: int = yield ctx.call_activity(activity_v1, input="input for v1")
elif ctx.version == "2.0.0":
# For version 2.0.0, we use the updated logic
result: int = yield ctx.call_activity(activity_v2, input="input for v2")
else:
raise ValueError(f"Unsupported version: {ctx.version}")
return {
'result': result,
}


# Use environment variables if provided, otherwise use default emulator values
taskhub_name = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")

print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")

# Set credential to None for emulator, or DefaultAzureCredential for Azure
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()

# configure and start the worker - use secure_channel=False for emulator
secure_channel = endpoint != "http://localhost:8080"
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential) as w:
# This worker is versioned for v2, as the orchestrator code has already been updated
# CURRENT_OR_OLDER allows this worker to process orchestrations versioned below 2.0.0 - e.g. 1.0.0
w.use_versioning(worker.VersioningOptions(
version="2.0.0",
default_version="2.0.0",
match_strategy=worker.VersionMatchStrategy.CURRENT_OR_OLDER,
failure_strategy=worker.VersionFailureStrategy.FAIL
))
w.add_orchestrator(orchestrator)
w.add_activity(activity_v1)
w.add_activity(activity_v2)
w.start()

# create a client, start an orchestration, and wait for it to finish
# The client's version matches the worker's version
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential,
default_version="2.0.0")
# Schedule a new orchestration manually versioned to 1.0.0
# Normally, this would have been scheduled before the worker started from a worker also versioned v1.0.0,
# Here we are doing it manually to avoid creating two workers
instance_id_v1 = c.schedule_new_orchestration(orchestrator, version="1.0.0")
state_v1 = c.wait_for_orchestration_completion(instance_id_v1, timeout=30)
if state_v1 and state_v1.runtime_status == client.OrchestrationStatus.COMPLETED:
print(f'Orchestration v1 completed! Result: {state_v1.serialized_output}')
elif state_v1:
print(f'Orchestration v1 failed: {state_v1.failure_details}')

# Also check that the orchestrator can be run with the current version
instance_id = c.schedule_new_orchestration(orchestrator)
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
print(f'Orchestration completed! Result: {state.serialized_output}')
elif state:
print(f'Orchestration failed: {state.failure_details}')

exit()
Loading