diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 67aa5f71..cc746cd2 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -48,7 +48,8 @@ def __init__(self, context: str): async def start_new(self, orchestration_function_name: str, instance_id: Optional[str] = None, - client_input: Optional[Any] = None) -> str: + client_input: Optional[Any] = None, + version: Optional[str] = None) -> str: """Start a new instance of the specified orchestrator function. If an orchestration instance with the specified ID already exists, the @@ -63,6 +64,9 @@ async def start_new(self, the Durable Functions extension will generate a random GUID (recommended). client_input : Optional[Any] JSON-serializable input value for the orchestrator function. + version : Optional[str] + The version to assign to the orchestration instance. If not specified, + the defaultVersion from host.json will be used. Returns ------- @@ -70,7 +74,9 @@ async def start_new(self, The ID of the new orchestration instance if successful, None if not. """ request_url = self._get_start_new_url( - instance_id=instance_id, orchestration_function_name=orchestration_function_name) + instance_id=instance_id, + orchestration_function_name=orchestration_function_name, + version=version) trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context() @@ -639,10 +645,15 @@ def _parse_purge_instance_history_response( raise Exception(result) def _get_start_new_url( - self, instance_id: Optional[str], orchestration_function_name: str) -> str: + self, instance_id: Optional[str], orchestration_function_name: str, + version: Optional[str] = None) -> str: instance_path = f'/{instance_id}' if instance_id is not None else '' request_url = f'{self._orchestration_bindings.rpc_base_url}orchestrators/' \ f'{orchestration_function_name}{instance_path}' + + if version is not None: + request_url += f'?version={quote(version)}' + return request_url def _get_raise_event_url( diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 04fa4daf..8b7d2507 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -285,7 +285,8 @@ def call_http(self, method: str, uri: str, content: Optional[str] = None, def call_sub_orchestrator(self, name: Union[str, Callable], input_: Optional[Any] = None, - instance_id: Optional[str] = None) -> TaskBase: + instance_id: Optional[str] = None, + version: Optional[str] = None) -> TaskBase: """Schedule sub-orchestration function named `name` for execution. Parameters @@ -296,6 +297,9 @@ def call_sub_orchestrator(self, The JSON-serializable input to pass to the orchestrator function. instance_id: Optional[str] A unique ID to use for the sub-orchestration instance. + version: Optional[str] + The version to assign to the sub-orchestration instance. If not specified, + the defaultVersion from host.json will be used. Returns ------- @@ -313,14 +317,15 @@ def call_sub_orchestrator(self, if isinstance(name, FunctionBuilder): name = self._get_function_name(name, OrchestrationTrigger) - action = CallSubOrchestratorAction(name, input_, instance_id) + action = CallSubOrchestratorAction(name, input_, instance_id, version) task = self._generate_task(action) return task def call_sub_orchestrator_with_retry(self, name: Union[str, Callable], retry_options: RetryOptions, input_: Optional[Any] = None, - instance_id: Optional[str] = None) -> TaskBase: + instance_id: Optional[str] = None, + version: Optional[str] = None) -> TaskBase: """Schedule sub-orchestration function named `name` for execution, with retry-options. Parameters @@ -333,6 +338,9 @@ def call_sub_orchestrator_with_retry(self, The JSON-serializable input to pass to the activity function. Defaults to None. instance_id: str The instance ID of the sub-orchestrator to call. + version: Optional[str] + The version to assign to the sub-orchestration instance. If not specified, + the defaultVersion from host.json will be used. Returns ------- @@ -350,7 +358,8 @@ def call_sub_orchestrator_with_retry(self, if isinstance(name, FunctionBuilder): name = self._get_function_name(name, OrchestrationTrigger) - action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id) + action = CallSubOrchestratorWithRetryAction( + name, retry_options, input_, instance_id, version) task = self._generate_task(action, retry_options) return task diff --git a/azure/durable_functions/models/actions/CallSubOrchestratorAction.py b/azure/durable_functions/models/actions/CallSubOrchestratorAction.py index aebf26fe..03a22413 100644 --- a/azure/durable_functions/models/actions/CallSubOrchestratorAction.py +++ b/azure/durable_functions/models/actions/CallSubOrchestratorAction.py @@ -11,10 +11,11 @@ class CallSubOrchestratorAction(Action): """Defines the structure of the Call SubOrchestrator object.""" def __init__(self, function_name: str, _input: Optional[Any] = None, - instance_id: Optional[str] = None): + instance_id: Optional[str] = None, version: Optional[str] = None): self.function_name: str = function_name self._input: str = dumps(_input, default=_serialize_custom_object) self.instance_id: Optional[str] = instance_id + self.version: Optional[str] = version if not self.function_name: raise ValueError("function_name cannot be empty") @@ -37,4 +38,5 @@ def to_json(self) -> Dict[str, Union[str, int]]: add_attrib(json_dict, self, 'function_name', 'functionName') add_attrib(json_dict, self, '_input', 'input') add_attrib(json_dict, self, 'instance_id', 'instanceId') + add_attrib(json_dict, self, 'version', 'version') return json_dict diff --git a/azure/durable_functions/models/actions/CallSubOrchestratorWithRetryAction.py b/azure/durable_functions/models/actions/CallSubOrchestratorWithRetryAction.py index 9f529a0b..c72d7181 100644 --- a/azure/durable_functions/models/actions/CallSubOrchestratorWithRetryAction.py +++ b/azure/durable_functions/models/actions/CallSubOrchestratorWithRetryAction.py @@ -13,11 +13,12 @@ class CallSubOrchestratorWithRetryAction(Action): def __init__(self, function_name: str, retry_options: RetryOptions, _input: Optional[Any] = None, - instance_id: Optional[str] = None): + instance_id: Optional[str] = None, version: Optional[str] = None): self.function_name: str = function_name self._input: str = dumps(_input, default=_serialize_custom_object) self.retry_options: RetryOptions = retry_options self.instance_id: Optional[str] = instance_id + self.version: Optional[str] = version if not self.function_name: raise ValueError("function_name cannot be empty") @@ -41,4 +42,5 @@ def to_json(self) -> Dict[str, Union[str, int]]: add_attrib(json_dict, self, '_input', 'input') add_json_attrib(json_dict, self, 'retry_options', 'retryOptions') add_attrib(json_dict, self, 'instance_id', 'instanceId') + add_attrib(json_dict, self, 'version', 'version') return json_dict diff --git a/samples-v2/orchestration_versioning/README.md b/samples-v2/orchestration_versioning/README.md index 0a6fb97b..9e462ce3 100644 --- a/samples-v2/orchestration_versioning/README.md +++ b/samples-v2/orchestration_versioning/README.md @@ -33,7 +33,7 @@ What happens to *existing orchestration instances* that were started *before* th 6. Trigger the external event. 7. Observe that the orchestration output. -``` +```text Orchestration version: 1.0 Suborchestration version: 2.0 Hello from A! @@ -42,3 +42,33 @@ Hello from A! Note that the value returned by `context.version` is permanently associated with the orchestrator instance and is not impacted by the `defaultVersion` change. As a result, the orchestrator follows the old execution path to guarantee deterministic replay behavior. However, the suborchestration version is `2.0` because this suborchestration was created *after* the `defaultVersion` change. + +## Overriding Version Programmatically + +In addition to using `defaultVersion` in `host.json`, you can also specify a version explicitly when starting an orchestration using the `version` parameter of `client.start_new()`: + +```python +# Start with a specific version +instance_id = await client.start_new("my_orchestrator", version="1.5") +``` + +You can also specify a version when calling sub-orchestrators from within an orchestrator function: + +```python +# Call sub-orchestrator with specific version +sub_result = yield context.call_sub_orchestrator('my_sub_orchestrator', version="1.5") + +# Call sub-orchestrator with retry and specific version +sub_result = yield context.call_sub_orchestrator_with_retry( + 'my_sub_orchestrator', + retry_options=retry_options, + version="2.0" +) +``` + +When a version is explicitly provided, it takes precedence over the `defaultVersion` in `host.json`. This allows you to: + +- Test new orchestration versions without changing configuration +- Run multiple versions simultaneously +- Gradually roll out new versions by controlling which requests use which version +- Running A/B tests with different sub-orchestrator implementations diff --git a/samples-v2/orchestration_versioning/function_app.py b/samples-v2/orchestration_versioning/function_app.py index 01d1e82f..74bdcd97 100644 --- a/samples-v2/orchestration_versioning/function_app.py +++ b/samples-v2/orchestration_versioning/function_app.py @@ -8,9 +8,17 @@ @myApp.durable_client_input(client_name="client") async def http_start(req: func.HttpRequest, client): function_name = req.route_params.get('functionName') - instance_id = await client.start_new(function_name) - logging.info(f"Started orchestration with ID = '{instance_id}'.") + # Get optional version from query parameter + version = req.params.get('version') + + if version: + instance_id = await client.start_new(function_name, version=version) + logging.info(f"Started orchestration with ID = '{instance_id}' and version = '{version}'.") + else: + instance_id = await client.start_new(function_name) + logging.info(f"Started orchestration with ID = '{instance_id}'.") + return client.create_check_status_response(req, instance_id) @myApp.orchestration_trigger(context_name="context") @@ -29,18 +37,27 @@ def my_orchestrator(context: df.DurableOrchestrationContext): yield context.wait_for_external_event("Continue") context.set_custom_status("Continue event received") - # New sub-orchestrations will use the current defaultVersion specified in host.json - sub_result = yield context.call_sub_orchestrator('my_sub_orchestrator') - return [f'Orchestration version: {context.version}', f'Suborchestration version: {sub_result}', activity_result] + # You can explicitly pass a version to sub-orchestrators + sub_result_with_version = yield context.call_sub_orchestrator('my_sub_orchestrator', version="0.9") + + # Without specifying version, the sub-orchestrator will use the current defaultVersion + sub_result_default = yield context.call_sub_orchestrator('my_sub_orchestrator') + + return [ + f'Orchestration version: {context.version}', + f'Suborchestration version (explicit): {sub_result_with_version}', + f'Suborchestration version (default): {sub_result_default}', + activity_result + ] @myApp.orchestration_trigger(context_name="context") def my_sub_orchestrator(context: df.DurableOrchestrationContext): return context.version -@myApp.activity_trigger() -def activity_a() -> str: +@myApp.activity_trigger(input_name="input") +def activity_a(input: str) -> str: return f"Hello from A!" -@myApp.activity_trigger() -def activity_b() -> str: - return f"Hello from B!" \ No newline at end of file +@myApp.activity_trigger(input_name="input") +def activity_b(input: str) -> str: + return f"Hello from B!" diff --git a/samples-v2/orchestration_versioning/host.json b/samples-v2/orchestration_versioning/host.json index ca18cb00..81380755 100644 --- a/samples-v2/orchestration_versioning/host.json +++ b/samples-v2/orchestration_versioning/host.json @@ -12,5 +12,9 @@ "durableTask": { "defaultVersion": "1.0" } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.26.0, 5.0.0)" } } \ No newline at end of file diff --git a/tests/models/test_DurableOrchestrationClient.py b/tests/models/test_DurableOrchestrationClient.py index 6660a2e2..7707a63a 100644 --- a/tests/models/test_DurableOrchestrationClient.py +++ b/tests/models/test_DurableOrchestrationClient.py @@ -82,6 +82,17 @@ def test_get_start_new_url(binding_string): assert expected_url == start_new_url +def test_get_start_new_url_with_version(binding_string): + client = DurableOrchestrationClient(binding_string) + instance_id = "2e2568e7-a906-43bd-8364-c81733c5891e" + function_name = "my_function" + version = "2.0" + start_new_url = client._get_start_new_url(instance_id, function_name, version) + expected_url = replace_stand_in_bits( + f"{RPC_BASE_URL}orchestrators/{function_name}/{instance_id}?version={version}") + assert expected_url == start_new_url + + def test_get_input_returns_none_when_none_supplied(): result = DurableOrchestrationClient._get_json_input(None) assert result is None