From 2883df3357167f725745294dd980bd95c0699ed5 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 28 Feb 2025 10:14:48 -0800 Subject: [PATCH 01/10] Add long timers to Python --- .../models/DurableOrchestrationContext.py | 69 +++++++++++++++++-- .../durable_functions/models/ReplaySchema.py | 1 + azure/durable_functions/models/Task.py | 60 +++++++++++++++- .../models/utils/json_utils.py | 28 ++++++++ 4 files changed, 149 insertions(+), 9 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 01ec9000..f60a6443 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -1,7 +1,7 @@ from collections import defaultdict from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction -from azure.durable_functions.models.Task import TaskBase, TimerTask +from azure.durable_functions.models.Task import LongTimerTask, TaskBase, TimerTask from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \ @@ -26,6 +26,8 @@ from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID from datetime import timezone +from azure.durable_functions.models.utils.json_utils import parse_datetime_attrib_timespan + from .RetryOptions import RetryOptions from .FunctionContext import FunctionContext from .history import HistoryEvent, HistoryEventType @@ -48,11 +50,19 @@ class DurableOrchestrationContext: # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, - parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs): + parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, + maximumShortTimerDuration=None, longRunningTimerIntervalDuration=None, + upperSchemaVersionNew = None, **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId self._is_replaying: bool = isReplaying self._parent_instance_id: str = parentInstanceId + self._maximum_short_timer_duration: datetime.timedelta + if maximumShortTimerDuration is not None: + self._maximum_short_timer_duration = parse_datetime_attrib_timespan(maximumShortTimerDuration) + self._long_running_timer_interval_duration: datetime.timedelta + if longRunningTimerIntervalDuration is not None: + self._long_running_timer_interval_duration = parse_datetime_attrib_timespan(longRunningTimerIntervalDuration) self._custom_status: Any = None self._new_uuid_counter: int = 0 self._sub_orchestrator_counter: int = 0 @@ -66,6 +76,8 @@ def __init__(self, self._function_context: FunctionContext = FunctionContext(**kwargs) self._sequence_number = 0 self._replay_schema = ReplaySchema(upperSchemaVersion) + if upperSchemaVersionNew is not None and upperSchemaVersionNew > self._replay_schema.value: + self._replay_schema = ReplaySchema(upperSchemaVersionNew) self._action_payload_v1: List[List[Action]] = [] self._action_payload_v2: List[Action] = [] @@ -471,6 +483,37 @@ def parent_instance_id(self) -> str: """ return self._parent_instance_id + @property + def maximum_short_timer_duration(self) -> datetime.timedelta: + """Get the maximum duration for a short timer + + The maximum length of a "short timer" is defined by the storage backend. + Some storage backends have a maximum future date for scheduled tasks, and + so for timers longer than this duration, we must simulate a long timer by + waiting in chunks. + + Returns + ------- + str + Maximum allowable duration for a short timer in Durable + """ + return self._maximum_short_timer_duration + + @property + def long_running_timer_interval_duration(self) -> datetime.timedelta: + """Get the interval for long timers. + + When a timer is scheduled for a duration longer than the maximum short timer + duration, the timer is set to run in chunks of time. The long running timer + interval duration defines how long these chunks of time should be. + + Returns + ------- + str + Duration for intervals of a long-running timer + """ + return self._long_running_timer_interval_duration + @property def current_utc_datetime(self) -> datetime.datetime: """Get the current date/time. @@ -532,10 +575,10 @@ def _record_fire_and_forget_action(self, action: Action): The action to append """ new_action: Union[List[Action], Action] - if self._replay_schema is ReplaySchema.V2: - new_action = action - else: + if self._replay_schema is ReplaySchema.V1: new_action = [action] + else: + new_action = action self._add_to_actions(new_action) self._sequence_number += 1 @@ -580,6 +623,20 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase: TaskBase A Durable Timer Task that schedules the timer to wake up the activity """ + if self._replay_schema.value >= ReplaySchema.V3.value: + if not self.maximum_short_timer_duration or not self.long_running_timer_interval_duration: + raise Exception( + "A framework-internal error was detected: replay schema version >= V3 is being used, " + + "but one or more of the properties `maximumShortTimerDuration` and `longRunningTimerIntervalDuration` are not defined. " + + "This is likely an issue with the Durable Functions Extension. " + + "Please report this bug here: https://github.com/Azure/azure-functions-durable-js/issues\n" + + f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n" + + f"longRunningTimerIntervalDuration: {self.long_running_timer_interval_duration}" + ) + if fire_at > self.current_utc_datetime + self.maximum_short_timer_duration: + action = CreateTimerAction(fire_at) + return LongTimerTask(None, action, self, None, self.maximum_short_timer_duration, self.long_running_timer_interval_duration) + action = CreateTimerAction(fire_at) task = self._generate_task(action, task_constructor=TimerTask) return task @@ -656,7 +713,7 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]): if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list): self._action_payload_v1.append(action_repr) - elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action): + elif self._replay_schema.value >= ReplaySchema.V2.value and isinstance(action_repr, Action): self._action_payload_v2.append(action_repr) else: raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}" diff --git a/azure/durable_functions/models/ReplaySchema.py b/azure/durable_functions/models/ReplaySchema.py index 1fb79b95..2c08e548 100644 --- a/azure/durable_functions/models/ReplaySchema.py +++ b/azure/durable_functions/models/ReplaySchema.py @@ -6,3 +6,4 @@ class ReplaySchema(Enum): V1 = 0 V2 = 1 + V3 = 2 diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 22faac45..590f30e5 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -1,3 +1,4 @@ +from datetime import datetime from azure.durable_functions.models.actions.NoOpAction import NoOpAction from azure.durable_functions.models.actions.CompoundAction import CompoundAction from azure.durable_functions.models.RetryOptions import RetryOptions @@ -170,7 +171,7 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None): child_actions.append(action_repr) if compound_action_constructor is None: self.action_repr = child_actions - else: # replay_schema is ReplaySchema.V2 + else: # replay_schema >= ReplaySchema.V2 self.action_repr = compound_action_constructor(child_actions) self._first_error: Optional[Exception] = None self.pending_tasks: Set[TaskBase] = set(tasks) @@ -292,7 +293,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema): The ReplaySchema, which determines the inner action payload representation """ compound_action_constructor = None - if replay_schema is ReplaySchema.V2: + if replay_schema.value >= ReplaySchema.V2.value: compound_action_constructor = WhenAllAction super().__init__(task, compound_action_constructor) @@ -317,6 +318,59 @@ def try_set_value(self, child: TaskBase): self.set_value(is_error=True, value=self._first_error) +class LongTimerTask(WhenAllTask): + def __init__(self, id, action: CreateTimerAction, orchestration_context, executor, maximum_timer_length, long_running_timer_duration): + current_time = orchestration_context.current_utc_datetime + final_fire_time = action.fire_at + duration_until_fire = final_fire_time - current_time + + if duration_until_fire > maximum_timer_length: + next_fire_time = current_time + long_running_timer_duration + else: + next_fire_time = final_fire_time + + next_timer_action = CreateTimerAction(next_fire_time) + next_timer_task = TimerTask(None, next_timer_action) + super().__init__([next_timer_task], orchestration_context._replay_schema) + + self.id = id + self.action = action + self.orchestration_context = orchestration_context + self.executor = executor + self.maximum_timer_length = maximum_timer_length + self.long_running_timer_duration = long_running_timer_duration + + def is_canceled(self) -> bool: + return self.action.is_cancelled + + def cancel(self): + if (self.result): + raise Exception("Cannot cancel a completed task.") + self.action.is_cancelled = True + + def try_set_value(self, child: TimerTask): + current_time = self.orchestration_context.current_utc_datetime + final_fire_time = self.action.fire_at + if final_fire_time > current_time: + next_timer = self.get_next_timer_task(final_fire_time, current_time) + self.add_new_child(next_timer) + return super().try_set_value(child) + + def get_next_timer_task(self, final_fire_time:datetime, current_time:datetime): + duration_until_fire = final_fire_time - current_time + if duration_until_fire > self.maximum_timer_length: + next_fire_time = current_time + self.long_running_timer_duration + else: + next_fire_time = final_fire_time + return TimerTask(None, CreateTimerAction(next_fire_time)) + + def add_new_child(self, child_timer: TimerTask): + child_timer.parent = self + self.pending_tasks.add(child_timer) + self.orchestration_context._add_to_open_tasks(child_timer) + self.orchestration_context._add_to_actions(child_timer.action_repr) + child_timer._set_is_scheduled(True) + class WhenAnyTask(CompoundTask): """A Task representing `when_any` scenarios.""" @@ -331,7 +385,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema): The ReplaySchema, which determines the inner action payload representation """ compound_action_constructor = None - if replay_schema is ReplaySchema.V2: + if replay_schema.value >= ReplaySchema.V2.value: compound_action_constructor = WhenAnyAction super().__init__(task, compound_action_constructor) diff --git a/azure/durable_functions/models/utils/json_utils.py b/azure/durable_functions/models/utils/json_utils.py index cdd6a711..e0d29c13 100644 --- a/azure/durable_functions/models/utils/json_utils.py +++ b/azure/durable_functions/models/utils/json_utils.py @@ -1,3 +1,5 @@ +import datetime +import re from typing import Dict, Any from ...constants import DATETIME_STRING_FORMAT @@ -36,6 +38,32 @@ def add_datetime_attrib(json_dict: Dict[str, Any], object_, json_dict[alt_name or attribute_name] = \ getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT) +# When we recieve properties from WebJobs extension originally parsed as TimeSpan objects through Newtonsoft, +# the format complies with the constant format specifier for TimeSpan in .NET. +# See https://learn.microsoft.com/en-us/dotnet/standard/base-types/standard-timespan-format-strings#the-constant-c-format-specifier +# Python offers no convenient way to parse these back into timedeltas, so we use this regex method instead +def parse_datetime_attrib_timespan(from_str: str) -> datetime.timedelta: + """Converts a string originally produced by TimeSpan.ToString("c") in .NET into python's timespan.timedelta + + Parameters + ---------- + from_str: The string format of the TimeSpan to convert + + Returns + ------- + timespan.timedelta + The TimeSpan expressed as a Python datetime.timedelta + + """ + match = re.match(r"^(-)?(?:([0-9]*)\.)?([0-9]{2}):([0-9]{2}):([0-9]{2})(?:\.([0-9]{7}))?$", from_str) + if match: + span = datetime.timedelta(days=int(match.group(2) or "0"), hours=int(match.group(3)), minutes=int(match.group(4)), seconds=int(match.group(5)), microseconds=int(match.group(6) or "0") // 10) + if match.group(1): + span = -span + return span + else: + raise Exception(f"Format of TimeSpan failed attempted conversion to timedelta: {from_str}") + def add_json_attrib(json_dict: Dict[str, Any], object_, attribute_name: str, alt_name: str = None): From 84b9624ffa556cb98902bdbf51a726cbb5a05b28 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 28 Feb 2025 10:40:51 -0800 Subject: [PATCH 02/10] Linter fixes --- .../models/DurableOrchestrationContext.py | 38 +++++---- azure/durable_functions/models/Task.py | 77 ++++++++++++++++--- .../models/utils/json_utils.py | 24 ++++-- 3 files changed, 105 insertions(+), 34 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index f60a6443..5f99178c 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -26,7 +26,7 @@ from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID from datetime import timezone -from azure.durable_functions.models.utils.json_utils import parse_datetime_attrib_timespan +from azure.durable_functions.models.utils.json_utils import parse_timespan_attrib from .RetryOptions import RetryOptions from .FunctionContext import FunctionContext @@ -51,18 +51,20 @@ class DurableOrchestrationContext: def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, - maximumShortTimerDuration=None, longRunningTimerIntervalDuration=None, - upperSchemaVersionNew = None, **kwargs): + maximumShortTimerDuration:str = None, longRunningTimerIntervalDuration:str = None, + upperSchemaVersionNew:int = None, **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId self._is_replaying: bool = isReplaying self._parent_instance_id: str = parentInstanceId self._maximum_short_timer_duration: datetime.timedelta if maximumShortTimerDuration is not None: - self._maximum_short_timer_duration = parse_datetime_attrib_timespan(maximumShortTimerDuration) - self._long_running_timer_interval_duration: datetime.timedelta + max_short_duration = parse_timespan_attrib(maximumShortTimerDuration) + self._maximum_short_timer_duration = max_short_duration + self._long_timer_interval_duration: datetime.timedelta if longRunningTimerIntervalDuration is not None: - self._long_running_timer_interval_duration = parse_datetime_attrib_timespan(longRunningTimerIntervalDuration) + long_interval_duration = parse_timespan_attrib(longRunningTimerIntervalDuration) + self._long_timer_interval_duration = long_interval_duration self._custom_status: Any = None self._new_uuid_counter: int = 0 self._sub_orchestrator_counter: int = 0 @@ -485,7 +487,7 @@ def parent_instance_id(self) -> str: @property def maximum_short_timer_duration(self) -> datetime.timedelta: - """Get the maximum duration for a short timer + """Get the maximum duration for a short timer. The maximum length of a "short timer" is defined by the storage backend. Some storage backends have a maximum future date for scheduled tasks, and @@ -500,7 +502,7 @@ def maximum_short_timer_duration(self) -> datetime.timedelta: return self._maximum_short_timer_duration @property - def long_running_timer_interval_duration(self) -> datetime.timedelta: + def long_timer_interval_duration(self) -> datetime.timedelta: """Get the interval for long timers. When a timer is scheduled for a duration longer than the maximum short timer @@ -512,7 +514,7 @@ def long_running_timer_interval_duration(self) -> datetime.timedelta: str Duration for intervals of a long-running timer """ - return self._long_running_timer_interval_duration + return self._long_timer_interval_duration @property def current_utc_datetime(self) -> datetime.datetime: @@ -624,18 +626,21 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase: A Durable Timer Task that schedules the timer to wake up the activity """ if self._replay_schema.value >= ReplaySchema.V3.value: - if not self.maximum_short_timer_duration or not self.long_running_timer_interval_duration: + if not self.maximum_short_timer_duration or not self.long_timer_interval_duration: raise Exception( - "A framework-internal error was detected: replay schema version >= V3 is being used, " + - "but one or more of the properties `maximumShortTimerDuration` and `longRunningTimerIntervalDuration` are not defined. " + + "A framework-internal error was detected: " + + "replay schema version >= V3 is being used, " + + "but one or more of the properties `maximumShortTimerDuration`" + + "and `longRunningTimerIntervalDuration` are not defined. " + "This is likely an issue with the Durable Functions Extension. " + - "Please report this bug here: https://github.com/Azure/azure-functions-durable-js/issues\n" + + "Please report this bug here: " + + "https://github.com/Azure/azure-functions-durable-python/issues\n" + f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n" + - f"longRunningTimerIntervalDuration: {self.long_running_timer_interval_duration}" + f"longRunningTimerIntervalDuration: {self.long_timer_interval_duration}" ) if fire_at > self.current_utc_datetime + self.maximum_short_timer_duration: action = CreateTimerAction(fire_at) - return LongTimerTask(None, action, self, None, self.maximum_short_timer_duration, self.long_running_timer_interval_duration) + return LongTimerTask(None, action, self) action = CreateTimerAction(fire_at) task = self._generate_task(action, task_constructor=TimerTask) @@ -713,7 +718,8 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]): if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list): self._action_payload_v1.append(action_repr) - elif self._replay_schema.value >= ReplaySchema.V2.value and isinstance(action_repr, Action): + elif (self._replay_schema.value >= ReplaySchema.V2.value + and isinstance(action_repr, Action)): self._action_payload_v2.append(action_repr) else: raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}" diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 590f30e5..5793795b 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -319,13 +319,26 @@ def try_set_value(self, child: TaskBase): class LongTimerTask(WhenAllTask): - def __init__(self, id, action: CreateTimerAction, orchestration_context, executor, maximum_timer_length, long_running_timer_duration): + """A Timer Task for intervals longer than supported by the storage backend.""" + + def __init__(self, id, action: CreateTimerAction, orchestration_context): + """Initialize a LongTimerTask. + + Parameters + ---------- + id_ : int + An ID for the task + action : CreateTimerAction + The action this task represents + orchestration_context: DurableOrchestrationContext + The orchestration context this task was created in + """ current_time = orchestration_context.current_utc_datetime final_fire_time = action.fire_at duration_until_fire = final_fire_time - current_time - if duration_until_fire > maximum_timer_length: - next_fire_time = current_time + long_running_timer_duration + if duration_until_fire > self.orchestration_context.maximum_timer_length: + next_fire_time = current_time + self.orchestration_context.long_running_timer_duration else: next_fire_time = final_fire_time @@ -336,35 +349,79 @@ def __init__(self, id, action: CreateTimerAction, orchestration_context, executo self.id = id self.action = action self.orchestration_context = orchestration_context - self.executor = executor - self.maximum_timer_length = maximum_timer_length - self.long_running_timer_duration = long_running_timer_duration + self.maximum_timer_length = self.orchestration_context.maximum_timer_length + self.long_running_timer_duration = self.orchestration_context.long_running_timer_duration def is_canceled(self) -> bool: + """Check if the LongTimer is cancelled. + + Returns + ------- + bool + Returns whether the timer has been cancelled or not + """ return self.action.is_cancelled - + def cancel(self): + """Cancel a timer. + + Raises + ------ + ValueError + Raises an error if the task is already completed and an attempt is made to cancel it + """ if (self.result): raise Exception("Cannot cancel a completed task.") self.action.is_cancelled = True def try_set_value(self, child: TimerTask): + """Transition this LongTimer Task to a terminal state and set its value. + + If the LongTimer has not yet reached the designated completion time, starts a new + TimerTask for the next interval and does not close. + + Parameters + ---------- + child : TimerTask + A timer sub-task that just completed + """ current_time = self.orchestration_context.current_utc_datetime final_fire_time = self.action.fire_at if final_fire_time > current_time: next_timer = self.get_next_timer_task(final_fire_time, current_time) self.add_new_child(next_timer) return super().try_set_value(child) - - def get_next_timer_task(self, final_fire_time:datetime, current_time:datetime): + + def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) -> TimerTask: + """Creates a TimerTask that represents the next interval of the LongTimer + + Parameters + ---------- + final_fire_time : datetime.datetime + The final firing time of the LongTimer + current_time : datetime.datetime + The current time + + Returns + ------- + TimerTask + A TimerTask representing the next interval of the LongTimer + """ duration_until_fire = final_fire_time - current_time if duration_until_fire > self.maximum_timer_length: next_fire_time = current_time + self.long_running_timer_duration else: next_fire_time = final_fire_time return TimerTask(None, CreateTimerAction(next_fire_time)) - + def add_new_child(self, child_timer: TimerTask): + """Adds the TimerTask to this Task's children and schedules it in the orchestrationcontext + + Parameters + ---------- + child_timer : TimerTask + The newly created TimerTask to add + """ child_timer.parent = self self.pending_tasks.add(child_timer) self.orchestration_context._add_to_open_tasks(child_timer) diff --git a/azure/durable_functions/models/utils/json_utils.py b/azure/durable_functions/models/utils/json_utils.py index e0d29c13..eec82bf6 100644 --- a/azure/durable_functions/models/utils/json_utils.py +++ b/azure/durable_functions/models/utils/json_utils.py @@ -38,12 +38,13 @@ def add_datetime_attrib(json_dict: Dict[str, Any], object_, json_dict[alt_name or attribute_name] = \ getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT) -# When we recieve properties from WebJobs extension originally parsed as TimeSpan objects through Newtonsoft, -# the format complies with the constant format specifier for TimeSpan in .NET. -# See https://learn.microsoft.com/en-us/dotnet/standard/base-types/standard-timespan-format-strings#the-constant-c-format-specifier -# Python offers no convenient way to parse these back into timedeltas, so we use this regex method instead -def parse_datetime_attrib_timespan(from_str: str) -> datetime.timedelta: - """Converts a string originally produced by TimeSpan.ToString("c") in .NET into python's timespan.timedelta +# When we recieve properties from WebJobs extension originally parsed as +# TimeSpan objects through Newtonsoft, the format complies with the constant +# format specifier for TimeSpan in .NET. +# Python offers no convenient way to parse these back into timedeltas, +# so we use this regex method instead +def parse_timespan_attrib(from_str: str) -> datetime.timedelta: + """Converts a string representing TimeSpan.ToString("c") in .NET to python timespan.timedelta Parameters ---------- @@ -55,9 +56,16 @@ def parse_datetime_attrib_timespan(from_str: str) -> datetime.timedelta: The TimeSpan expressed as a Python datetime.timedelta """ - match = re.match(r"^(-)?(?:([0-9]*)\.)?([0-9]{2}):([0-9]{2}):([0-9]{2})(?:\.([0-9]{7}))?$", from_str) + expr = r"^(-)?(?:([0-9]*)\.)?([0-9]{2}):([0-9]{2}):([0-9]{2})(?:\.([0-9]{7}))?$" + match = re.match(expr, from_str) if match: - span = datetime.timedelta(days=int(match.group(2) or "0"), hours=int(match.group(3)), minutes=int(match.group(4)), seconds=int(match.group(5)), microseconds=int(match.group(6) or "0") // 10) + span = datetime.timedelta( + days=int(match.group(2) or "0"), + hours=int(match.group(3)), + minutes=int(match.group(4)), + seconds=int(match.group(5)), + microseconds=int(match.group(6) or "0") // 10) + if match.group(1): span = -span return span From 95055ced85328a7afc85f4ca4059fd2f7ce9623d Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 28 Feb 2025 10:51:34 -0800 Subject: [PATCH 03/10] More linter fixes --- .../models/DurableOrchestrationContext.py | 23 ++++++++++--------- azure/durable_functions/models/Task.py | 13 +++++++---- .../models/utils/json_utils.py | 19 +++++++-------- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 5f99178c..446f9b18 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -51,8 +51,9 @@ class DurableOrchestrationContext: def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, - maximumShortTimerDuration:str = None, longRunningTimerIntervalDuration:str = None, - upperSchemaVersionNew:int = None, **kwargs): + maximumShortTimerDuration: str = None, + longRunningTimerIntervalDuration: str = None, upperSchemaVersionNew: int = None, + **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId self._is_replaying: bool = isReplaying @@ -628,14 +629,14 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase: if self._replay_schema.value >= ReplaySchema.V3.value: if not self.maximum_short_timer_duration or not self.long_timer_interval_duration: raise Exception( - "A framework-internal error was detected: " + - "replay schema version >= V3 is being used, " + - "but one or more of the properties `maximumShortTimerDuration`" + - "and `longRunningTimerIntervalDuration` are not defined. " + - "This is likely an issue with the Durable Functions Extension. " + - "Please report this bug here: " + - "https://github.com/Azure/azure-functions-durable-python/issues\n" + - f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n" + + "A framework-internal error was detected: "\ + "replay schema version >= V3 is being used, "\ + "but one or more of the properties `maximumShortTimerDuration`"\ + "and `longRunningTimerIntervalDuration` are not defined. "\ + "This is likely an issue with the Durable Functions Extension. "\ + "Please report this bug here: "\ + "https://github.com/Azure/azure-functions-durable-python/issues\n"\ + f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n"\ f"longRunningTimerIntervalDuration: {self.long_timer_interval_duration}" ) if fire_at > self.current_utc_datetime + self.maximum_short_timer_duration: @@ -718,7 +719,7 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]): if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list): self._action_payload_v1.append(action_repr) - elif (self._replay_schema.value >= ReplaySchema.V2.value + elif (self._replay_schema.value >= ReplaySchema.V2.value and isinstance(action_repr, Action)): self._action_payload_v2.append(action_repr) else: diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 5793795b..e7fe0f5d 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -377,8 +377,8 @@ def cancel(self): def try_set_value(self, child: TimerTask): """Transition this LongTimer Task to a terminal state and set its value. - If the LongTimer has not yet reached the designated completion time, starts a new - TimerTask for the next interval and does not close. + If the LongTimer has not yet reached the designated completion time, starts a new + TimerTask for the next interval and does not close. Parameters ---------- @@ -392,8 +392,8 @@ def try_set_value(self, child: TimerTask): self.add_new_child(next_timer) return super().try_set_value(child) - def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) -> TimerTask: - """Creates a TimerTask that represents the next interval of the LongTimer + def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) -> TimerTask: + """Creates a TimerTask that represents the next interval of the LongTimer. Parameters ---------- @@ -415,7 +415,9 @@ def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) return TimerTask(None, CreateTimerAction(next_fire_time)) def add_new_child(self, child_timer: TimerTask): - """Adds the TimerTask to this Task's children and schedules it in the orchestrationcontext + """Add the TimerTask to this task's children. + + Also register the TimerTask with the orchestration context. Parameters ---------- @@ -428,6 +430,7 @@ def add_new_child(self, child_timer: TimerTask): self.orchestration_context._add_to_actions(child_timer.action_repr) child_timer._set_is_scheduled(True) + class WhenAnyTask(CompoundTask): """A Task representing `when_any` scenarios.""" diff --git a/azure/durable_functions/models/utils/json_utils.py b/azure/durable_functions/models/utils/json_utils.py index eec82bf6..98a12292 100644 --- a/azure/durable_functions/models/utils/json_utils.py +++ b/azure/durable_functions/models/utils/json_utils.py @@ -38,13 +38,14 @@ def add_datetime_attrib(json_dict: Dict[str, Any], object_, json_dict[alt_name or attribute_name] = \ getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT) -# When we recieve properties from WebJobs extension originally parsed as -# TimeSpan objects through Newtonsoft, the format complies with the constant + +# When we recieve properties from WebJobs extension originally parsed as +# TimeSpan objects through Newtonsoft, the format complies with the constant # format specifier for TimeSpan in .NET. -# Python offers no convenient way to parse these back into timedeltas, +# Python offers no convenient way to parse these back into timedeltas, # so we use this regex method instead def parse_timespan_attrib(from_str: str) -> datetime.timedelta: - """Converts a string representing TimeSpan.ToString("c") in .NET to python timespan.timedelta + """Convert a string representing TimeSpan.ToString("c") in .NET to a python timedelta. Parameters ---------- @@ -60,12 +61,12 @@ def parse_timespan_attrib(from_str: str) -> datetime.timedelta: match = re.match(expr, from_str) if match: span = datetime.timedelta( - days=int(match.group(2) or "0"), - hours=int(match.group(3)), - minutes=int(match.group(4)), - seconds=int(match.group(5)), + days=int(match.group(2) or "0"), + hours=int(match.group(3)), + minutes=int(match.group(4)), + seconds=int(match.group(5)), microseconds=int(match.group(6) or "0") // 10) - + if match.group(1): span = -span return span From 551ac1a2ef606c1b6e17ac0bc5bc5bbba67d8d63 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 28 Feb 2025 10:56:45 -0800 Subject: [PATCH 04/10] Linter --- .../models/DurableOrchestrationContext.py | 20 +++++++++---------- azure/durable_functions/models/Task.py | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 446f9b18..af77c0a4 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -51,7 +51,7 @@ class DurableOrchestrationContext: def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, - maximumShortTimerDuration: str = None, + maximumShortTimerDuration: str = None, longRunningTimerIntervalDuration: str = None, upperSchemaVersionNew: int = None, **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] @@ -629,15 +629,15 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase: if self._replay_schema.value >= ReplaySchema.V3.value: if not self.maximum_short_timer_duration or not self.long_timer_interval_duration: raise Exception( - "A framework-internal error was detected: "\ - "replay schema version >= V3 is being used, "\ - "but one or more of the properties `maximumShortTimerDuration`"\ - "and `longRunningTimerIntervalDuration` are not defined. "\ - "This is likely an issue with the Durable Functions Extension. "\ - "Please report this bug here: "\ - "https://github.com/Azure/azure-functions-durable-python/issues\n"\ - f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n"\ - f"longRunningTimerIntervalDuration: {self.long_timer_interval_duration}" + "A framework-internal error was detected: " + "replay schema version >= V3 is being used, " + "but one or more of the properties `maximumShortTimerDuration`" + "and `longRunningTimerIntervalDuration` are not defined. " + "This is likely an issue with the Durable Functions Extension. " + "Please report this bug here: " + "https://github.com/Azure/azure-functions-durable-python/issues\n" + f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n" + f"longRunningTimerIntervalDuration: {self.long_timer_interval_duration}" ) if fire_at > self.current_utc_datetime + self.maximum_short_timer_duration: action = CreateTimerAction(fire_at) diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index e7fe0f5d..874df742 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -393,7 +393,7 @@ def try_set_value(self, child: TimerTask): return super().try_set_value(child) def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) -> TimerTask: - """Creates a TimerTask that represents the next interval of the LongTimer. + """Create a TimerTask to represent the next interval of the LongTimer. Parameters ---------- @@ -416,7 +416,7 @@ def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) def add_new_child(self, child_timer: TimerTask): """Add the TimerTask to this task's children. - + Also register the TimerTask with the orchestration context. Parameters From c85717384a095460a559ca06431a4a36de67e76d Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 28 Feb 2025 11:04:32 -0800 Subject: [PATCH 05/10] Add check for upperSchemaVersionNew --- azure/durable_functions/models/DurableOrchestrationContext.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index af77c0a4..6db72a5c 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -79,7 +79,9 @@ def __init__(self, self._function_context: FunctionContext = FunctionContext(**kwargs) self._sequence_number = 0 self._replay_schema = ReplaySchema(upperSchemaVersion) - if upperSchemaVersionNew is not None and upperSchemaVersionNew > self._replay_schema.value: + if (upperSchemaVersionNew is not None + and upperSchemaVersionNew > self._replay_schema.value + and upperSchemaVersionNew in ReplaySchema._value2member_map_): self._replay_schema = ReplaySchema(upperSchemaVersionNew) self._action_payload_v1: List[List[Action]] = [] From 1ac7bafc52c6184f373882aa334edde39c38f64b Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 28 Feb 2025 11:25:25 -0800 Subject: [PATCH 06/10] PR suggestion improvements --- .../models/DurableOrchestrationContext.py | 11 +++++++---- azure/durable_functions/models/Task.py | 12 ++++++------ .../models/utils/json_utils.py | 19 +++++++++++-------- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 6db72a5c..78c64242 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -64,7 +64,7 @@ def __init__(self, self._maximum_short_timer_duration = max_short_duration self._long_timer_interval_duration: datetime.timedelta if longRunningTimerIntervalDuration is not None: - long_interval_duration = parse_timespan_attrib(longRunningTimerIntervalDuration) + long_interval_duration = datetime.timedelta(seconds=10) # parse_timespan_attrib(longRunningTimerIntervalDuration) self._long_timer_interval_duration = long_interval_duration self._custom_status: Any = None self._new_uuid_counter: int = 0 @@ -80,9 +80,12 @@ def __init__(self, self._sequence_number = 0 self._replay_schema = ReplaySchema(upperSchemaVersion) if (upperSchemaVersionNew is not None - and upperSchemaVersionNew > self._replay_schema.value - and upperSchemaVersionNew in ReplaySchema._value2member_map_): - self._replay_schema = ReplaySchema(upperSchemaVersionNew) + and upperSchemaVersionNew > self._replay_schema.value): + valid_schema_values = [enum_member.value for enum_member in ReplaySchema] + if upperSchemaVersionNew in valid_schema_values: + self._replay_schema = ReplaySchema(upperSchemaVersionNew) + else: + self._replay_schema = ReplaySchema(max(valid_schema_values)) self._action_payload_v1: List[List[Action]] = [] self._action_payload_v2: List[Action] = [] diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 874df742..ead86671 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -337,8 +337,8 @@ def __init__(self, id, action: CreateTimerAction, orchestration_context): final_fire_time = action.fire_at duration_until_fire = final_fire_time - current_time - if duration_until_fire > self.orchestration_context.maximum_timer_length: - next_fire_time = current_time + self.orchestration_context.long_running_timer_duration + if duration_until_fire > orchestration_context.maximum_short_timer_duration: + next_fire_time = current_time + orchestration_context.long_timer_interval_duration else: next_fire_time = final_fire_time @@ -349,8 +349,8 @@ def __init__(self, id, action: CreateTimerAction, orchestration_context): self.id = id self.action = action self.orchestration_context = orchestration_context - self.maximum_timer_length = self.orchestration_context.maximum_timer_length - self.long_running_timer_duration = self.orchestration_context.long_running_timer_duration + self.maximum_short_timer_duration = self.orchestration_context.maximum_short_timer_duration + self.long_timer_interval_duration = self.orchestration_context.long_timer_interval_duration def is_canceled(self) -> bool: """Check if the LongTimer is cancelled. @@ -408,8 +408,8 @@ def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) A TimerTask representing the next interval of the LongTimer """ duration_until_fire = final_fire_time - current_time - if duration_until_fire > self.maximum_timer_length: - next_fire_time = current_time + self.long_running_timer_duration + if duration_until_fire > self.maximum_short_timer_duration: + next_fire_time = current_time + self.long_timer_interval_duration else: next_fire_time = final_fire_time return TimerTask(None, CreateTimerAction(next_fire_time)) diff --git a/azure/durable_functions/models/utils/json_utils.py b/azure/durable_functions/models/utils/json_utils.py index 98a12292..50091da7 100644 --- a/azure/durable_functions/models/utils/json_utils.py +++ b/azure/durable_functions/models/utils/json_utils.py @@ -57,17 +57,20 @@ def parse_timespan_attrib(from_str: str) -> datetime.timedelta: The TimeSpan expressed as a Python datetime.timedelta """ - expr = r"^(-)?(?:([0-9]*)\.)?([0-9]{2}):([0-9]{2}):([0-9]{2})(?:\.([0-9]{7}))?$" - match = re.match(expr, from_str) + match = re.match(r"^(?P-)?(?:(?P[0-9]*)\.)?" + r"(?P[0-9]{2}):(?P[0-9]{2})" + r":(?P[0-9]{2})(?:\.(?P[0-9]{7}))?$", + from_str) if match: + groups = match.groupdict() span = datetime.timedelta( - days=int(match.group(2) or "0"), - hours=int(match.group(3)), - minutes=int(match.group(4)), - seconds=int(match.group(5)), - microseconds=int(match.group(6) or "0") // 10) + days=int(groups['days'] or "0"), + hours=int(groups['hours']), + minutes=int(groups['minutes']), + seconds=int(groups['seconds']), + microseconds=int(groups['ticks'] or "0") // 10) - if match.group(1): + if groups['negative'] == '-': span = -span return span else: From 4028afd198384b483d2687fb03bf81588645c00c Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 28 Feb 2025 11:27:13 -0800 Subject: [PATCH 07/10] Revert testing change --- azure/durable_functions/models/DurableOrchestrationContext.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 78c64242..15a1fe11 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -64,7 +64,7 @@ def __init__(self, self._maximum_short_timer_duration = max_short_duration self._long_timer_interval_duration: datetime.timedelta if longRunningTimerIntervalDuration is not None: - long_interval_duration = datetime.timedelta(seconds=10) # parse_timespan_attrib(longRunningTimerIntervalDuration) + long_interval_duration = parse_timespan_attrib(longRunningTimerIntervalDuration) self._long_timer_interval_duration = long_interval_duration self._custom_status: Any = None self._new_uuid_counter: int = 0 From d194b8b30aa9e2ce34026c6de7478def63721925 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 4 Apr 2025 16:23:05 -0600 Subject: [PATCH 08/10] Add basic tests for long timers --- .../models/DurableOrchestrationContext.py | 4 +- tests/tasks/test_long_timers.py | 70 +++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 tests/tasks/test_long_timers.py diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 15a1fe11..3964a4b6 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -58,11 +58,11 @@ def __init__(self, self._instance_id: str = instanceId self._is_replaying: bool = isReplaying self._parent_instance_id: str = parentInstanceId - self._maximum_short_timer_duration: datetime.timedelta + self._maximum_short_timer_duration: datetime.timedelta = None if maximumShortTimerDuration is not None: max_short_duration = parse_timespan_attrib(maximumShortTimerDuration) self._maximum_short_timer_duration = max_short_duration - self._long_timer_interval_duration: datetime.timedelta + self._long_timer_interval_duration: datetime.timedelta = None if longRunningTimerIntervalDuration is not None: long_interval_duration = parse_timespan_attrib(longRunningTimerIntervalDuration) self._long_timer_interval_duration = long_interval_duration diff --git a/tests/tasks/test_long_timers.py b/tests/tasks/test_long_timers.py new file mode 100644 index 00000000..fbc29e73 --- /dev/null +++ b/tests/tasks/test_long_timers.py @@ -0,0 +1,70 @@ +import datetime + +import pytest +from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext +from azure.durable_functions.models.Task import LongTimerTask, TaskState, TimerTask +from azure.durable_functions.models.actions.CreateTimerAction import CreateTimerAction + + +@pytest.fixture +def starting_context_v3(): + context = DurableOrchestrationContext.from_json( + '{"history":[{"EventType":12,"EventId":-1,"IsPlayed":false,' + '"Timestamp":"' + f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}' + '"}, {"OrchestrationInstance":{' + '"InstanceId":"48d0f95957504c2fa579e810a390b938", ' + '"ExecutionId":"fd183ee02e4b4fd18c95b773cfb5452b"},"EventType":0,' + '"ParentInstance":null, ' + '"Name":"DurableOrchestratorTrigger","Version":"","Input":"null",' + '"Tags":null,"EventId":-1,"IsPlayed":false, ' + '"Timestamp":"' + f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}' + '"}],"input":null,' + '"instanceId":"48d0f95957504c2fa579e810a390b938", ' + '"upperSchemaVersion": 2, ' + '"upperSchemaVersionNew": 3, ' + '"isReplaying":false,"parentInstanceId":null, ' + '"maximumShortTimerDuration":"0.16:00:00", ' + '"longRunningTimerIntervalDuration":"0.08:00:00" } ') + return context + + +def test_durable_context_creates_correct_timer(starting_context_v3): + timer = starting_context_v3.create_timer(datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(minutes=30)) + assert isinstance(timer, TimerTask) + + timer2 = starting_context_v3.create_timer(datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(days=1)) + assert isinstance(timer2, LongTimerTask) + +def test_long_timer_fires_appropriately(starting_context_v3): + starting_time = starting_context_v3.current_utc_datetime + final_fire_time = starting_time + datetime.timedelta(hours=20) + long_timer_action = CreateTimerAction(final_fire_time) + long_timer_task = LongTimerTask(None, long_timer_action, starting_context_v3) + assert long_timer_task.action.fire_at == final_fire_time + assert long_timer_task.action == long_timer_action + + # Check the first "inner" timer and simulate firing it + short_timer_task = long_timer_task.pending_tasks.pop() + assert short_timer_task.action_repr.fire_at == starting_time + datetime.timedelta(hours=8) + # This happens when the task is reconstructed during replay, doing it manually for the test + long_timer_task.orchestration_context.current_utc_datetime = short_timer_task.action_repr.fire_at + short_timer_task.state = TaskState.SUCCEEDED + long_timer_task.try_set_value(short_timer_task) + + assert long_timer_task.state == TaskState.RUNNING + + # Check the scond "inner" timer and simulate firing it. This one should be set to the final + # fire time, the remaining time (12 hours) is less than the max long timer duration (16 hours) + short_timer_task = long_timer_task.pending_tasks.pop() + assert short_timer_task.action_repr.fire_at == final_fire_time + long_timer_task.orchestration_context.current_utc_datetime = short_timer_task.action_repr.fire_at + short_timer_task.state = TaskState.SUCCEEDED + long_timer_task.try_set_value(short_timer_task) + + # Ensure the LongTimerTask finished + assert len(long_timer_task.pending_tasks) == 0 + assert long_timer_task.state == TaskState.SUCCEEDED From 8c359a11ffc020a4c61b513440f26bc64a461abc Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Tue, 8 Apr 2025 10:43:40 -0600 Subject: [PATCH 09/10] Naming consistency --- .../models/DurableOrchestrationContext.py | 12 +++---- azure/durable_functions/models/Task.py | 24 +++++++------- tests/tasks/test_long_timers.py | 32 +++++++++---------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 3964a4b6..a8a784fd 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -492,7 +492,7 @@ def parent_instance_id(self) -> str: return self._parent_instance_id @property - def maximum_short_timer_duration(self) -> datetime.timedelta: + def _maximum_short_timer_duration(self) -> datetime.timedelta: """Get the maximum duration for a short timer. The maximum length of a "short timer" is defined by the storage backend. @@ -508,7 +508,7 @@ def maximum_short_timer_duration(self) -> datetime.timedelta: return self._maximum_short_timer_duration @property - def long_timer_interval_duration(self) -> datetime.timedelta: + def _long_timer_interval_duration(self) -> datetime.timedelta: """Get the interval for long timers. When a timer is scheduled for a duration longer than the maximum short timer @@ -632,7 +632,7 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase: A Durable Timer Task that schedules the timer to wake up the activity """ if self._replay_schema.value >= ReplaySchema.V3.value: - if not self.maximum_short_timer_duration or not self.long_timer_interval_duration: + if not self._maximum_short_timer_duration or not self._long_timer_interval_duration: raise Exception( "A framework-internal error was detected: " "replay schema version >= V3 is being used, " @@ -641,10 +641,10 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase: "This is likely an issue with the Durable Functions Extension. " "Please report this bug here: " "https://github.com/Azure/azure-functions-durable-python/issues\n" - f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n" - f"longRunningTimerIntervalDuration: {self.long_timer_interval_duration}" + f"maximumShortTimerDuration: {self._maximum_short_timer_duration}\n" + f"longRunningTimerIntervalDuration: {self._long_timer_interval_duration}" ) - if fire_at > self.current_utc_datetime + self.maximum_short_timer_duration: + if fire_at > self.current_utc_datetime + self._maximum_short_timer_duration: action = CreateTimerAction(fire_at) return LongTimerTask(None, action, self) diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index ead86671..7aa5b256 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -321,7 +321,7 @@ def try_set_value(self, child: TaskBase): class LongTimerTask(WhenAllTask): """A Timer Task for intervals longer than supported by the storage backend.""" - def __init__(self, id, action: CreateTimerAction, orchestration_context): + def __init__(self, id_, action: CreateTimerAction, orchestration_context): """Initialize a LongTimerTask. Parameters @@ -337,8 +337,8 @@ def __init__(self, id, action: CreateTimerAction, orchestration_context): final_fire_time = action.fire_at duration_until_fire = final_fire_time - current_time - if duration_until_fire > orchestration_context.maximum_short_timer_duration: - next_fire_time = current_time + orchestration_context.long_timer_interval_duration + if duration_until_fire > orchestration_context._maximum_short_timer_duration: + next_fire_time = current_time + orchestration_context._long_timer_interval_duration else: next_fire_time = final_fire_time @@ -346,11 +346,11 @@ def __init__(self, id, action: CreateTimerAction, orchestration_context): next_timer_task = TimerTask(None, next_timer_action) super().__init__([next_timer_task], orchestration_context._replay_schema) - self.id = id + self.id = id_ self.action = action - self.orchestration_context = orchestration_context - self.maximum_short_timer_duration = self.orchestration_context.maximum_short_timer_duration - self.long_timer_interval_duration = self.orchestration_context.long_timer_interval_duration + self._orchestration_context = orchestration_context + self._max_short_timer_duration = self._orchestration_context._maximum_short_timer_duration + self._long_timer_interval = self._orchestration_context._long_timer_interval_duration def is_canceled(self) -> bool: """Check if the LongTimer is cancelled. @@ -385,7 +385,7 @@ def try_set_value(self, child: TimerTask): child : TimerTask A timer sub-task that just completed """ - current_time = self.orchestration_context.current_utc_datetime + current_time = self._orchestration_context.current_utc_datetime final_fire_time = self.action.fire_at if final_fire_time > current_time: next_timer = self.get_next_timer_task(final_fire_time, current_time) @@ -408,8 +408,8 @@ def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) A TimerTask representing the next interval of the LongTimer """ duration_until_fire = final_fire_time - current_time - if duration_until_fire > self.maximum_short_timer_duration: - next_fire_time = current_time + self.long_timer_interval_duration + if duration_until_fire > self._max_short_timer_duration: + next_fire_time = current_time + self._long_timer_interval else: next_fire_time = final_fire_time return TimerTask(None, CreateTimerAction(next_fire_time)) @@ -426,8 +426,8 @@ def add_new_child(self, child_timer: TimerTask): """ child_timer.parent = self self.pending_tasks.add(child_timer) - self.orchestration_context._add_to_open_tasks(child_timer) - self.orchestration_context._add_to_actions(child_timer.action_repr) + self._orchestration_context._add_to_open_tasks(child_timer) + self._orchestration_context._add_to_actions(child_timer.action_repr) child_timer._set_is_scheduled(True) diff --git a/tests/tasks/test_long_timers.py b/tests/tasks/test_long_timers.py index fbc29e73..e3dceb23 100644 --- a/tests/tasks/test_long_timers.py +++ b/tests/tasks/test_long_timers.py @@ -43,28 +43,28 @@ def test_long_timer_fires_appropriately(starting_context_v3): starting_time = starting_context_v3.current_utc_datetime final_fire_time = starting_time + datetime.timedelta(hours=20) long_timer_action = CreateTimerAction(final_fire_time) - long_timer_task = LongTimerTask(None, long_timer_action, starting_context_v3) - assert long_timer_task.action.fire_at == final_fire_time - assert long_timer_task.action == long_timer_action + long_timer = LongTimerTask(None, long_timer_action, starting_context_v3) + assert long_timer.action.fire_at == final_fire_time + assert long_timer.action == long_timer_action # Check the first "inner" timer and simulate firing it - short_timer_task = long_timer_task.pending_tasks.pop() - assert short_timer_task.action_repr.fire_at == starting_time + datetime.timedelta(hours=8) + short_timer = long_timer.pending_tasks.pop() + assert short_timer.action_repr.fire_at == starting_time + datetime.timedelta(hours=8) # This happens when the task is reconstructed during replay, doing it manually for the test - long_timer_task.orchestration_context.current_utc_datetime = short_timer_task.action_repr.fire_at - short_timer_task.state = TaskState.SUCCEEDED - long_timer_task.try_set_value(short_timer_task) + long_timer._orchestration_context.current_utc_datetime = short_timer.action_repr.fire_at + short_timer.state = TaskState.SUCCEEDED + long_timer.try_set_value(short_timer) - assert long_timer_task.state == TaskState.RUNNING + assert long_timer.state == TaskState.RUNNING # Check the scond "inner" timer and simulate firing it. This one should be set to the final # fire time, the remaining time (12 hours) is less than the max long timer duration (16 hours) - short_timer_task = long_timer_task.pending_tasks.pop() - assert short_timer_task.action_repr.fire_at == final_fire_time - long_timer_task.orchestration_context.current_utc_datetime = short_timer_task.action_repr.fire_at - short_timer_task.state = TaskState.SUCCEEDED - long_timer_task.try_set_value(short_timer_task) + short_timer = long_timer.pending_tasks.pop() + assert short_timer.action_repr.fire_at == final_fire_time + long_timer._orchestration_context.current_utc_datetime = short_timer.action_repr.fire_at + short_timer.state = TaskState.SUCCEEDED + long_timer.try_set_value(short_timer) # Ensure the LongTimerTask finished - assert len(long_timer_task.pending_tasks) == 0 - assert long_timer_task.state == TaskState.SUCCEEDED + assert len(long_timer.pending_tasks) == 0 + assert long_timer.state == TaskState.SUCCEEDED From 5be662946201f8ac1ebac83631e1232430ba9008 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Tue, 8 Apr 2025 11:03:11 -0600 Subject: [PATCH 10/10] Remove unnecessary property definitions --- .../models/DurableOrchestrationContext.py | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index a8a784fd..d52631e1 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -491,37 +491,6 @@ def parent_instance_id(self) -> str: """ return self._parent_instance_id - @property - def _maximum_short_timer_duration(self) -> datetime.timedelta: - """Get the maximum duration for a short timer. - - The maximum length of a "short timer" is defined by the storage backend. - Some storage backends have a maximum future date for scheduled tasks, and - so for timers longer than this duration, we must simulate a long timer by - waiting in chunks. - - Returns - ------- - str - Maximum allowable duration for a short timer in Durable - """ - return self._maximum_short_timer_duration - - @property - def _long_timer_interval_duration(self) -> datetime.timedelta: - """Get the interval for long timers. - - When a timer is scheduled for a duration longer than the maximum short timer - duration, the timer is set to run in chunks of time. The long running timer - interval duration defines how long these chunks of time should be. - - Returns - ------- - str - Duration for intervals of a long-running timer - """ - return self._long_timer_interval_duration - @property def current_utc_datetime(self) -> datetime.datetime: """Get the current date/time.