Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/durabletask-azuremanaged.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ jobs:
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Install durabletask-azuremanaged locally
working-directory: durabletask-azuremanaged
run: |
pip install . --no-deps --force-reinstall
- name: Install durabletask locally
run: |
pip install . --no-deps --force-reinstall
- name: Run the tests
working-directory: tests/durabletask-azuremanaged
run: |
Expand Down
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@
"jacoco.xml",
"coverage.cobertura.xml"
],
"makefile.configureOnOpen": false
"makefile.configureOnOpen": false,
"debugpy.debugJustMyCode": false
}
6 changes: 4 additions & 2 deletions durabletask-azuremanaged/durabletask/azuremanaged/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ def __init__(self, *,
host_address: str,
taskhub: str,
token_credential: Optional[TokenCredential],
secure_channel: bool = True):
secure_channel: bool = True,
default_version: Optional[str] = None):

if not taskhub:
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
Expand All @@ -30,4 +31,5 @@ def __init__(self, *,
host_address=host_address,
secure_channel=secure_channel,
metadata=None,
interceptors=interceptors)
interceptors=interceptors,
default_version=default_version)
4 changes: 2 additions & 2 deletions durabletask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

"""Durable Task SDK for Python"""

from durabletask.worker import ConcurrencyOptions
from durabletask.worker import ConcurrencyOptions, VersioningOptions

__all__ = ["ConcurrencyOptions"]
__all__ = ["ConcurrencyOptions", "VersioningOptions"]

PACKAGE_NAME = "durabletask"
11 changes: 7 additions & 4 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ def __init__(self, *,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False,
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None):
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
default_version: Optional[str] = None):

# If the caller provided metadata, we need to create a new interceptor for it and
# add it to the list of interceptors.
Expand All @@ -118,13 +119,15 @@ def __init__(self, *,
)
self._stub = stubs.TaskHubSidecarServiceStub(channel)
self._logger = shared.get_logger("client", log_handler, log_formatter)
self.default_version = default_version

def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
tags: Optional[dict[str, str]] = None) -> str:
tags: Optional[dict[str, str]] = None,
version: Optional[str] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -133,9 +136,9 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
instanceId=instance_id if instance_id else uuid.uuid4().hex,
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=wrappers_pb2.StringValue(value=""),
version=helpers.get_string_value(version if version else self.default_version),
orchestrationIdReusePolicy=reuse_id_policy,
tags=tags,
tags=tags
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
Expand Down
7 changes: 7 additions & 0 deletions durabletask/internal/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class VersionFailureException(Exception):
pass


class AbandonOrchestrationError(Exception):
def __init__(self, *args: object) -> None:
super().__init__(*args)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super happy with this small file or propagating information using exceptions like I'm doing. Open to better solutions

6 changes: 4 additions & 2 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,13 @@ def new_create_sub_orchestration_action(
id: int,
name: str,
instance_id: Optional[str],
encoded_input: Optional[str]) -> pb.OrchestratorAction:
encoded_input: Optional[str],
version: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
name=name,
instanceId=instance_id,
input=get_string_value(encoded_input)
input=get_string_value(encoded_input),
version=get_string_value(version)
))


Expand Down
18 changes: 17 additions & 1 deletion durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ def instance_id(self) -> str:
"""
pass

@property
@abstractmethod
def version(self) -> Optional[str]:
"""Get the version of the orchestration instance.
This version is set when the orchestration is scheduled and can be used
to determine which version of the orchestrator function is being executed.
Returns
-------
Optional[str]
The version of the orchestration instance, or None if not set.
"""
pass

@property
@abstractmethod
def current_utc_datetime(self) -> datetime:
Expand Down Expand Up @@ -126,7 +141,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
def call_sub_orchestrator(self, orchestrator: Orchestrator[TInput, TOutput], *,
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None) -> Task[TOutput]:
retry_policy: Optional[RetryPolicy] = None,
version: Optional[str] = None) -> Task[TOutput]:
"""Schedule sub-orchestrator function for execution.
Parameters
Expand Down
Loading
Loading