Skip to content

Commit a17025d

Browse files
committed
Add versioning samples/docs
1 parent c60e9bb commit a17025d

File tree

3 files changed

+140
-4
lines changed

3 files changed

+140
-4
lines changed

docs/features.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ The following features are currently supported:
44

55
### Orchestrations
66

7-
Orchestrations are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable, meaning that they can automatically recover from failures and rebuild their local execution state. Orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input.
7+
Orchestrators are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable, meaning that they can automatically recover from failures and rebuild their local execution state. Orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input.
8+
9+
#### Orchestration versioning
10+
11+
Orchestrations may be assigned a version when they are first created. If an orchestration is given a version, it will continually be checked during its lifecycle to ensure that it remains compatible with the underlying orchestrator code. If the orchestrator code is updated while an orchestration is running, rules can be set that will define the behavior - whether the orchestration should fail, abandon for reprocessing at a later time, or attempt to run anyway. For more information, see [Orchestration versioning in Durable Functions](https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-orchestration-versioning) (Note that concepts specific to Azure Functions, such as host.json settings, do not apply) and [The provided examples](./supported-patterns.md)
812

913
### Activities
1014

docs/supported-patterns.md

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def sequence(ctx: task.OrchestrationContext, _):
2020
return [result1, result2, result3]
2121
```
2222

23-
You can find the full sample [here](../examples/activity_sequence.py).
23+
Link to the full [function chaining example](../examples/activity_sequence.py).
2424

2525
### Fan-out/fan-in
2626

@@ -48,7 +48,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
4848
return {'work_items': work_items, 'results': results, 'total': sum(results)}
4949
```
5050

51-
You can find the full sample [here](../examples/fanout_fanin.py).
51+
Link to the full [fan-out sample](../examples/fanout_fanin.py).
5252

5353
### Human interaction and durable timers
5454

@@ -79,4 +79,43 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
7979

8080
As an aside, you'll also notice that the example orchestration above works with custom business objects. Support for custom business objects includes support for custom classes, custom data classes, and named tuples. Serialization and deserialization of these objects is handled automatically by the SDK.
8181

82-
You can find the full sample [here](../examples/human_interaction.py).
82+
Link to the full [human interaction sample](../examples/human_interaction.py).
83+
84+
### Version-aware orchestrator
85+
86+
When utilizing orchestration versioning, it is possible for an orchestrator to remain backwards-compatible with orchestrations created using the previously defined version. For instance, consider an orchestration defined with the following signature:
87+
88+
```python
89+
def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
90+
"""Dummy orchestrator function illustrating old logic"""
91+
yield ctx.call_activity(activity_one)
92+
yield ctx.call_activity(activity_two)
93+
return "Success"
94+
```
95+
96+
Assume that any orchestrations created using this orchestrator were versioned 1.0.0. If the signature of this method needs to be updated to call activity_three between the calls to activity_one and activity_two, ordinarily this would break any running orchestrations at the time of deployment. However, the following orchestrator will be able to process both orchestraions versioned 1.0.0 and 2.0.0 after the change:
97+
98+
```python
99+
def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
100+
"""Version-aware dummy orchestrator capable of processing both old and new orchestrations"""
101+
yield ctx.call_activity(activity_one)
102+
if ctx.version > '1.0.0':
103+
yield ctx.call_activity(activity_three)
104+
yield ctx.call_activity(activity_two)
105+
```
106+
107+
Alternatively, if the orchestrator changes completely, the following syntax might be preferred:
108+
109+
```python
110+
def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
111+
if ctx.version == '1.0.0':
112+
yield ctx.call_activity(activity_one)
113+
yield ctx.call_activity(activity_two)
114+
return "Success
115+
yield ctx.call_activity(activity_one)
116+
yield ctx.call_activity(activity_three)
117+
yield ctx.call_activity(activity_two)
118+
return "Success"
119+
```
120+
121+
Link to the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
"""End-to-end sample that demonstrates how to configure an orchestrator
2+
that a dynamic number activity functions in parallel, waits for them all
3+
to complete, and prints an aggregate summary of the outputs."""
4+
import os
5+
import random
6+
7+
from azure.identity import DefaultAzureCredential
8+
9+
from durabletask import client, task, worker
10+
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
11+
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
12+
13+
14+
def activity_v1(ctx: task.ActivityContext, input: str) -> str:
15+
"""Activity function that returns a result for a given work item"""
16+
print("processing input:", input)
17+
return "Success from activity v1"
18+
19+
20+
def activity_v2(ctx: task.ActivityContext, input: str) -> str:
21+
"""Activity function that returns a result for a given work item"""
22+
print("processing input:", input)
23+
return "Success from activity v2"
24+
25+
26+
def orchestrator(ctx: task.OrchestrationContext, _):
27+
"""Orchestrator function that checks the orchestration version and has version-aware behavior
28+
Use case: Updating an orchestrator with new logic while maintaining compatibility with previously
29+
started orchestrations"""
30+
if ctx.version == "1.0.0":
31+
# For version 1.0.0, we use the original logic
32+
result: int = yield ctx.call_activity(activity_v1, input="input for v1")
33+
elif ctx.version == "2.0.0":
34+
# For version 2.0.0, we use the updated logic
35+
result: int = yield ctx.call_activity(activity_v2, input="input for v2")
36+
else:
37+
raise ValueError(f"Unsupported version: {ctx.version}")
38+
return {
39+
'result': result,
40+
}
41+
42+
43+
# Use environment variables if provided, otherwise use default emulator values
44+
taskhub_name = os.getenv("TASKHUB", "default")
45+
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
46+
47+
print(f"Using taskhub: {taskhub_name}")
48+
print(f"Using endpoint: {endpoint}")
49+
50+
# Set credential to None for emulator, or DefaultAzureCredential for Azure
51+
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()
52+
53+
# configure and start the worker - use secure_channel=False for emulator
54+
secure_channel = endpoint != "http://localhost:8080"
55+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
56+
taskhub=taskhub_name, token_credential=credential) as w:
57+
# This worker is versioned for v2, as the orchestrator code has already been updated
58+
# CURRENT_OR_OLDER allows this worker to process orchestrations versioned below 2.0.0 - e.g. 1.0.0
59+
w.use_versioning(worker.VersioningOptions(
60+
version="2.0.0",
61+
default_version="2.0.0",
62+
match_strategy=worker.VersionMatchStrategy.CURRENT_OR_OLDER,
63+
failure_strategy=worker.VersionFailureStrategy.FAIL
64+
))
65+
w.add_orchestrator(orchestrator)
66+
w.add_activity(activity_v1)
67+
w.add_activity(activity_v2)
68+
w.start()
69+
70+
# create a client, start an orchestration, and wait for it to finish
71+
# The client's version matches the worker's version
72+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
73+
taskhub=taskhub_name, token_credential=credential,
74+
default_version="2.0.0")
75+
# Schedule a new orchestration manually versioned to 1.0.0
76+
# Normally, this would have been scheduled before the worker started from a worker also versioned v1.0.0,
77+
# Here we are doing it manually to avoid creating two workers
78+
instance_id_v1 = c.schedule_new_orchestration(orchestrator, version="1.0.0")
79+
state_v1 = c.wait_for_orchestration_completion(instance_id_v1, timeout=30)
80+
if state_v1 and state_v1.runtime_status == client.OrchestrationStatus.COMPLETED:
81+
print(f'Orchestration v1 completed! Result: {state_v1.serialized_output}')
82+
elif state_v1:
83+
print(f'Orchestration v1 failed: {state_v1.failure_details}')
84+
85+
# Also check that the orchestrator can be run with the current version
86+
instance_id = c.schedule_new_orchestration(orchestrator)
87+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
88+
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
89+
print(f'Orchestration completed! Result: {state.serialized_output}')
90+
elif state:
91+
print(f'Orchestration failed: {state.failure_details}')
92+
93+
exit()

0 commit comments

Comments
 (0)