Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions azure/durable_functions/models/DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def __init__(self, context: str):
async def start_new(self,
orchestration_function_name: str,
instance_id: Optional[str] = None,
client_input: Optional[Any] = None) -> str:
client_input: Optional[Any] = None,
version: Optional[str] = None) -> str:
"""Start a new instance of the specified orchestrator function.

If an orchestration instance with the specified ID already exists, the
Expand All @@ -63,14 +64,19 @@ async def start_new(self,
the Durable Functions extension will generate a random GUID (recommended).
client_input : Optional[Any]
JSON-serializable input value for the orchestrator function.
version : Optional[str]
The version to assign to the orchestration instance. If not specified,
the defaultVersion from host.json will be used.

Returns
-------
str
The ID of the new orchestration instance if successful, None if not.
"""
request_url = self._get_start_new_url(
instance_id=instance_id, orchestration_function_name=orchestration_function_name)
instance_id=instance_id,
orchestration_function_name=orchestration_function_name,
version=version)

trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context()

Expand Down Expand Up @@ -639,10 +645,15 @@ def _parse_purge_instance_history_response(
raise Exception(result)

def _get_start_new_url(
self, instance_id: Optional[str], orchestration_function_name: str) -> str:
self, instance_id: Optional[str], orchestration_function_name: str,
version: Optional[str] = None) -> str:
instance_path = f'/{instance_id}' if instance_id is not None else ''
request_url = f'{self._orchestration_bindings.rpc_base_url}orchestrators/' \
f'{orchestration_function_name}{instance_path}'

if version is not None:
request_url += f'?version={quote(version)}'

return request_url

def _get_raise_event_url(
Expand Down
17 changes: 13 additions & 4 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ def call_http(self, method: str, uri: str, content: Optional[str] = None,

def call_sub_orchestrator(self,
name: Union[str, Callable], input_: Optional[Any] = None,
instance_id: Optional[str] = None) -> TaskBase:
instance_id: Optional[str] = None,
version: Optional[str] = None) -> TaskBase:
"""Schedule sub-orchestration function named `name` for execution.

Parameters
Expand All @@ -296,6 +297,9 @@ def call_sub_orchestrator(self,
The JSON-serializable input to pass to the orchestrator function.
instance_id: Optional[str]
A unique ID to use for the sub-orchestration instance.
version: Optional[str]
The version to assign to the sub-orchestration instance. If not specified,
the defaultVersion from host.json will be used.

Returns
-------
Expand All @@ -313,14 +317,15 @@ def call_sub_orchestrator(self,
if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, OrchestrationTrigger)

action = CallSubOrchestratorAction(name, input_, instance_id)
action = CallSubOrchestratorAction(name, input_, instance_id, version)
task = self._generate_task(action)
return task

def call_sub_orchestrator_with_retry(self,
name: Union[str, Callable], retry_options: RetryOptions,
input_: Optional[Any] = None,
instance_id: Optional[str] = None) -> TaskBase:
instance_id: Optional[str] = None,
version: Optional[str] = None) -> TaskBase:
"""Schedule sub-orchestration function named `name` for execution, with retry-options.

Parameters
Expand All @@ -333,6 +338,9 @@ def call_sub_orchestrator_with_retry(self,
The JSON-serializable input to pass to the activity function. Defaults to None.
instance_id: str
The instance ID of the sub-orchestrator to call.
version: Optional[str]
The version to assign to the sub-orchestration instance. If not specified,
the defaultVersion from host.json will be used.

Returns
-------
Expand All @@ -350,7 +358,8 @@ def call_sub_orchestrator_with_retry(self,
if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, OrchestrationTrigger)

action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id)
action = CallSubOrchestratorWithRetryAction(
name, retry_options, input_, instance_id, version)
task = self._generate_task(action, retry_options)
return task

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ class CallSubOrchestratorAction(Action):
"""Defines the structure of the Call SubOrchestrator object."""

def __init__(self, function_name: str, _input: Optional[Any] = None,
instance_id: Optional[str] = None):
instance_id: Optional[str] = None, version: Optional[str] = None):
self.function_name: str = function_name
self._input: str = dumps(_input, default=_serialize_custom_object)
self.instance_id: Optional[str] = instance_id
self.version: Optional[str] = version

if not self.function_name:
raise ValueError("function_name cannot be empty")
Expand All @@ -37,4 +38,5 @@ def to_json(self) -> Dict[str, Union[str, int]]:
add_attrib(json_dict, self, 'function_name', 'functionName')
add_attrib(json_dict, self, '_input', 'input')
add_attrib(json_dict, self, 'instance_id', 'instanceId')
add_attrib(json_dict, self, 'version', 'version')
return json_dict
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ class CallSubOrchestratorWithRetryAction(Action):

def __init__(self, function_name: str, retry_options: RetryOptions,
_input: Optional[Any] = None,
instance_id: Optional[str] = None):
instance_id: Optional[str] = None, version: Optional[str] = None):
self.function_name: str = function_name
self._input: str = dumps(_input, default=_serialize_custom_object)
self.retry_options: RetryOptions = retry_options
self.instance_id: Optional[str] = instance_id
self.version: Optional[str] = version

if not self.function_name:
raise ValueError("function_name cannot be empty")
Expand All @@ -41,4 +42,5 @@ def to_json(self) -> Dict[str, Union[str, int]]:
add_attrib(json_dict, self, '_input', 'input')
add_json_attrib(json_dict, self, 'retry_options', 'retryOptions')
add_attrib(json_dict, self, 'instance_id', 'instanceId')
add_attrib(json_dict, self, 'version', 'version')
return json_dict
32 changes: 31 additions & 1 deletion samples-v2/orchestration_versioning/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ What happens to *existing orchestration instances* that were started *before* th
6. Trigger the external event.
7. Observe that the orchestration output.

```
```text
Orchestration version: 1.0
Suborchestration version: 2.0
Hello from A!
Expand All @@ -42,3 +42,33 @@ Hello from A!
Note that the value returned by `context.version` is permanently associated with the orchestrator instance and is not impacted by the `defaultVersion` change. As a result, the orchestrator follows the old execution path to guarantee deterministic replay behavior.

However, the suborchestration version is `2.0` because this suborchestration was created *after* the `defaultVersion` change.

## Overriding Version Programmatically

In addition to using `defaultVersion` in `host.json`, you can also specify a version explicitly when starting an orchestration using the `version` parameter of `client.start_new()`:

```python
# Start with a specific version
instance_id = await client.start_new("my_orchestrator", version="1.5")
```

You can also specify a version when calling sub-orchestrators from within an orchestrator function:

```python
# Call sub-orchestrator with specific version
sub_result = yield context.call_sub_orchestrator('my_sub_orchestrator', version="1.5")

# Call sub-orchestrator with retry and specific version
sub_result = yield context.call_sub_orchestrator_with_retry(
'my_sub_orchestrator',
retry_options=retry_options,
version="2.0"
)
```

When a version is explicitly provided, it takes precedence over the `defaultVersion` in `host.json`. This allows you to:

- Test new orchestration versions without changing configuration
- Run multiple versions simultaneously
- Gradually roll out new versions by controlling which requests use which version
- Running A/B tests with different sub-orchestrator implementations
37 changes: 27 additions & 10 deletions samples-v2/orchestration_versioning/function_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,17 @@
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
function_name = req.route_params.get('functionName')
instance_id = await client.start_new(function_name)

logging.info(f"Started orchestration with ID = '{instance_id}'.")
# Get optional version from query parameter
version = req.params.get('version')

if version:
instance_id = await client.start_new(function_name, version=version)
logging.info(f"Started orchestration with ID = '{instance_id}' and version = '{version}'.")
else:
instance_id = await client.start_new(function_name)
logging.info(f"Started orchestration with ID = '{instance_id}'.")

return client.create_check_status_response(req, instance_id)

@myApp.orchestration_trigger(context_name="context")
Expand All @@ -29,18 +37,27 @@ def my_orchestrator(context: df.DurableOrchestrationContext):
yield context.wait_for_external_event("Continue")
context.set_custom_status("Continue event received")

# New sub-orchestrations will use the current defaultVersion specified in host.json
sub_result = yield context.call_sub_orchestrator('my_sub_orchestrator')
return [f'Orchestration version: {context.version}', f'Suborchestration version: {sub_result}', activity_result]
# You can explicitly pass a version to sub-orchestrators
sub_result_with_version = yield context.call_sub_orchestrator('my_sub_orchestrator', version="0.9")

# Without specifying version, the sub-orchestrator will use the current defaultVersion
sub_result_default = yield context.call_sub_orchestrator('my_sub_orchestrator')

return [
f'Orchestration version: {context.version}',
f'Suborchestration version (explicit): {sub_result_with_version}',
f'Suborchestration version (default): {sub_result_default}',
activity_result
]

@myApp.orchestration_trigger(context_name="context")
def my_sub_orchestrator(context: df.DurableOrchestrationContext):
return context.version

@myApp.activity_trigger()
def activity_a() -> str:
@myApp.activity_trigger(input_name="input")
def activity_a(input: str) -> str:
return f"Hello from A!"

@myApp.activity_trigger()
def activity_b() -> str:
return f"Hello from B!"
@myApp.activity_trigger(input_name="input")
def activity_b(input: str) -> str:
return f"Hello from B!"
4 changes: 4 additions & 0 deletions samples-v2/orchestration_versioning/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@
"durableTask": {
"defaultVersion": "1.0"
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.26.0, 5.0.0)"
}
}
11 changes: 11 additions & 0 deletions tests/models/test_DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ def test_get_start_new_url(binding_string):
assert expected_url == start_new_url


def test_get_start_new_url_with_version(binding_string):
client = DurableOrchestrationClient(binding_string)
instance_id = "2e2568e7-a906-43bd-8364-c81733c5891e"
function_name = "my_function"
version = "2.0"
start_new_url = client._get_start_new_url(instance_id, function_name, version)
expected_url = replace_stand_in_bits(
f"{RPC_BASE_URL}orchestrators/{function_name}/{instance_id}?version={version}")
assert expected_url == start_new_url


def test_get_input_returns_none_when_none_supplied():
result = DurableOrchestrationClient._get_json_input(None)
assert result is None
Expand Down