Skip to content

Commit 745ead9

Browse files
authored
Add orchestration versioning support (#58)
* Add orchestration versioning support - Known gap - VersionFailureStrategy.REJECT, no abandon strategy yet * Update durabletask tests to use the local code * Also use durabletask from local * Improvements - Add rejection - cleanup comments - suborch default versioning * Fix linting * Move other exception to new exceptions file * Remove unnecessary stub parameter * Update test comments, warnings * Revert partial vscode settings change * Add orchestration version to orchestration context
1 parent 45292b1 commit 745ead9

File tree

15 files changed

+645
-21
lines changed

15 files changed

+645
-21
lines changed

.github/workflows/durabletask-azuremanaged.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ jobs:
7272
python -m pip install --upgrade pip
7373
pip install -r requirements.txt
7474
75+
- name: Install durabletask-azuremanaged locally
76+
working-directory: durabletask-azuremanaged
77+
run: |
78+
pip install . --no-deps --force-reinstall
79+
80+
- name: Install durabletask locally
81+
run: |
82+
pip install . --no-deps --force-reinstall
83+
7584
- name: Run the tests
7685
working-directory: tests/durabletask-azuremanaged
7786
run: |

.vscode/settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,6 @@
3030
"jacoco.xml",
3131
"coverage.cobertura.xml"
3232
],
33-
"makefile.configureOnOpen": false
33+
"makefile.configureOnOpen": false,
34+
"debugpy.debugJustMyCode": false
3435
}

durabletask-azuremanaged/durabletask/azuremanaged/client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ def __init__(self, *,
1717
host_address: str,
1818
taskhub: str,
1919
token_credential: Optional[TokenCredential],
20-
secure_channel: bool = True):
20+
secure_channel: bool = True,
21+
default_version: Optional[str] = None):
2122

2223
if not taskhub:
2324
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
@@ -30,4 +31,5 @@ def __init__(self, *,
3031
host_address=host_address,
3132
secure_channel=secure_channel,
3233
metadata=None,
33-
interceptors=interceptors)
34+
interceptors=interceptors,
35+
default_version=default_version)

durabletask/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
"""Durable Task SDK for Python"""
55

6-
from durabletask.worker import ConcurrencyOptions
6+
from durabletask.worker import ConcurrencyOptions, VersioningOptions
77

8-
__all__ = ["ConcurrencyOptions"]
8+
__all__ = ["ConcurrencyOptions", "VersioningOptions"]
99

1010
PACKAGE_NAME = "durabletask"

durabletask/client.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ def __init__(self, *,
9898
log_handler: Optional[logging.Handler] = None,
9999
log_formatter: Optional[logging.Formatter] = None,
100100
secure_channel: bool = False,
101-
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None):
101+
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
102+
default_version: Optional[str] = None):
102103

103104
# If the caller provided metadata, we need to create a new interceptor for it and
104105
# add it to the list of interceptors.
@@ -118,13 +119,15 @@ def __init__(self, *,
118119
)
119120
self._stub = stubs.TaskHubSidecarServiceStub(channel)
120121
self._logger = shared.get_logger("client", log_handler, log_formatter)
122+
self.default_version = default_version
121123

122124
def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
123125
input: Optional[TInput] = None,
124126
instance_id: Optional[str] = None,
125127
start_at: Optional[datetime] = None,
126128
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
127-
tags: Optional[dict[str, str]] = None) -> str:
129+
tags: Optional[dict[str, str]] = None,
130+
version: Optional[str] = None) -> str:
128131

129132
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
130133

@@ -133,9 +136,9 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
133136
instanceId=instance_id if instance_id else uuid.uuid4().hex,
134137
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
135138
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
136-
version=wrappers_pb2.StringValue(value=""),
139+
version=helpers.get_string_value(version if version else self.default_version),
137140
orchestrationIdReusePolicy=reuse_id_policy,
138-
tags=tags,
141+
tags=tags
139142
)
140143

141144
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")

durabletask/internal/exceptions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
class VersionFailureException(Exception):
2+
pass
3+
4+
5+
class AbandonOrchestrationError(Exception):
6+
def __init__(self, *args: object) -> None:
7+
super().__init__(*args)

durabletask/internal/helpers.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,13 @@ def new_create_sub_orchestration_action(
199199
id: int,
200200
name: str,
201201
instance_id: Optional[str],
202-
encoded_input: Optional[str]) -> pb.OrchestratorAction:
202+
encoded_input: Optional[str],
203+
version: Optional[str]) -> pb.OrchestratorAction:
203204
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
204205
name=name,
205206
instanceId=instance_id,
206-
input=get_string_value(encoded_input)
207+
input=get_string_value(encoded_input),
208+
version=get_string_value(version)
207209
))
208210

209211

durabletask/task.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,21 @@ def instance_id(self) -> str:
3535
"""
3636
pass
3737

38+
@property
39+
@abstractmethod
40+
def version(self) -> Optional[str]:
41+
"""Get the version of the orchestration instance.
42+
43+
This version is set when the orchestration is scheduled and can be used
44+
to determine which version of the orchestrator function is being executed.
45+
46+
Returns
47+
-------
48+
Optional[str]
49+
The version of the orchestration instance, or None if not set.
50+
"""
51+
pass
52+
3853
@property
3954
@abstractmethod
4055
def current_utc_datetime(self) -> datetime:
@@ -126,7 +141,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
126141
def call_sub_orchestrator(self, orchestrator: Orchestrator[TInput, TOutput], *,
127142
input: Optional[TInput] = None,
128143
instance_id: Optional[str] = None,
129-
retry_policy: Optional[RetryPolicy] = None) -> Task[TOutput]:
144+
retry_policy: Optional[RetryPolicy] = None,
145+
version: Optional[str] = None) -> Task[TOutput]:
130146
"""Schedule sub-orchestrator function for execution.
131147
132148
Parameters

0 commit comments

Comments
 (0)