diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 01ec9000..d52631e1 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -1,7 +1,7 @@ from collections import defaultdict from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction -from azure.durable_functions.models.Task import TaskBase, TimerTask +from azure.durable_functions.models.Task import LongTimerTask, TaskBase, TimerTask from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \ @@ -26,6 +26,8 @@ from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID from datetime import timezone +from azure.durable_functions.models.utils.json_utils import parse_timespan_attrib + from .RetryOptions import RetryOptions from .FunctionContext import FunctionContext from .history import HistoryEvent, HistoryEventType @@ -48,11 +50,22 @@ class DurableOrchestrationContext: # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, - parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs): + parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, + maximumShortTimerDuration: str = None, + longRunningTimerIntervalDuration: str = None, upperSchemaVersionNew: int = None, + **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId self._is_replaying: bool = isReplaying self._parent_instance_id: str = parentInstanceId + self._maximum_short_timer_duration: datetime.timedelta = None + if maximumShortTimerDuration is not None: + max_short_duration = parse_timespan_attrib(maximumShortTimerDuration) + self._maximum_short_timer_duration = max_short_duration + self._long_timer_interval_duration: datetime.timedelta = None + if longRunningTimerIntervalDuration is not None: + long_interval_duration = parse_timespan_attrib(longRunningTimerIntervalDuration) + self._long_timer_interval_duration = long_interval_duration self._custom_status: Any = None self._new_uuid_counter: int = 0 self._sub_orchestrator_counter: int = 0 @@ -66,6 +79,13 @@ def __init__(self, self._function_context: FunctionContext = FunctionContext(**kwargs) self._sequence_number = 0 self._replay_schema = ReplaySchema(upperSchemaVersion) + if (upperSchemaVersionNew is not None + and upperSchemaVersionNew > self._replay_schema.value): + valid_schema_values = [enum_member.value for enum_member in ReplaySchema] + if upperSchemaVersionNew in valid_schema_values: + self._replay_schema = ReplaySchema(upperSchemaVersionNew) + else: + self._replay_schema = ReplaySchema(max(valid_schema_values)) self._action_payload_v1: List[List[Action]] = [] self._action_payload_v2: List[Action] = [] @@ -532,10 +552,10 @@ def _record_fire_and_forget_action(self, action: Action): The action to append """ new_action: Union[List[Action], Action] - if self._replay_schema is ReplaySchema.V2: - new_action = action - else: + if self._replay_schema is ReplaySchema.V1: new_action = [action] + else: + new_action = action self._add_to_actions(new_action) self._sequence_number += 1 @@ -580,6 +600,23 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase: TaskBase A Durable Timer Task that schedules the timer to wake up the activity """ + if self._replay_schema.value >= ReplaySchema.V3.value: + if not self._maximum_short_timer_duration or not self._long_timer_interval_duration: + raise Exception( + "A framework-internal error was detected: " + "replay schema version >= V3 is being used, " + "but one or more of the properties `maximumShortTimerDuration`" + "and `longRunningTimerIntervalDuration` are not defined. " + "This is likely an issue with the Durable Functions Extension. " + "Please report this bug here: " + "https://github.com/Azure/azure-functions-durable-python/issues\n" + f"maximumShortTimerDuration: {self._maximum_short_timer_duration}\n" + f"longRunningTimerIntervalDuration: {self._long_timer_interval_duration}" + ) + if fire_at > self.current_utc_datetime + self._maximum_short_timer_duration: + action = CreateTimerAction(fire_at) + return LongTimerTask(None, action, self) + action = CreateTimerAction(fire_at) task = self._generate_task(action, task_constructor=TimerTask) return task @@ -656,7 +693,8 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]): if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list): self._action_payload_v1.append(action_repr) - elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action): + elif (self._replay_schema.value >= ReplaySchema.V2.value + and isinstance(action_repr, Action)): self._action_payload_v2.append(action_repr) else: raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}" diff --git a/azure/durable_functions/models/ReplaySchema.py b/azure/durable_functions/models/ReplaySchema.py index 1fb79b95..2c08e548 100644 --- a/azure/durable_functions/models/ReplaySchema.py +++ b/azure/durable_functions/models/ReplaySchema.py @@ -6,3 +6,4 @@ class ReplaySchema(Enum): V1 = 0 V2 = 1 + V3 = 2 diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 22faac45..7aa5b256 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -1,3 +1,4 @@ +from datetime import datetime from azure.durable_functions.models.actions.NoOpAction import NoOpAction from azure.durable_functions.models.actions.CompoundAction import CompoundAction from azure.durable_functions.models.RetryOptions import RetryOptions @@ -170,7 +171,7 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None): child_actions.append(action_repr) if compound_action_constructor is None: self.action_repr = child_actions - else: # replay_schema is ReplaySchema.V2 + else: # replay_schema >= ReplaySchema.V2 self.action_repr = compound_action_constructor(child_actions) self._first_error: Optional[Exception] = None self.pending_tasks: Set[TaskBase] = set(tasks) @@ -292,7 +293,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema): The ReplaySchema, which determines the inner action payload representation """ compound_action_constructor = None - if replay_schema is ReplaySchema.V2: + if replay_schema.value >= ReplaySchema.V2.value: compound_action_constructor = WhenAllAction super().__init__(task, compound_action_constructor) @@ -317,6 +318,119 @@ def try_set_value(self, child: TaskBase): self.set_value(is_error=True, value=self._first_error) +class LongTimerTask(WhenAllTask): + """A Timer Task for intervals longer than supported by the storage backend.""" + + def __init__(self, id_, action: CreateTimerAction, orchestration_context): + """Initialize a LongTimerTask. + + Parameters + ---------- + id_ : int + An ID for the task + action : CreateTimerAction + The action this task represents + orchestration_context: DurableOrchestrationContext + The orchestration context this task was created in + """ + current_time = orchestration_context.current_utc_datetime + final_fire_time = action.fire_at + duration_until_fire = final_fire_time - current_time + + if duration_until_fire > orchestration_context._maximum_short_timer_duration: + next_fire_time = current_time + orchestration_context._long_timer_interval_duration + else: + next_fire_time = final_fire_time + + next_timer_action = CreateTimerAction(next_fire_time) + next_timer_task = TimerTask(None, next_timer_action) + super().__init__([next_timer_task], orchestration_context._replay_schema) + + self.id = id_ + self.action = action + self._orchestration_context = orchestration_context + self._max_short_timer_duration = self._orchestration_context._maximum_short_timer_duration + self._long_timer_interval = self._orchestration_context._long_timer_interval_duration + + def is_canceled(self) -> bool: + """Check if the LongTimer is cancelled. + + Returns + ------- + bool + Returns whether the timer has been cancelled or not + """ + return self.action.is_cancelled + + def cancel(self): + """Cancel a timer. + + Raises + ------ + ValueError + Raises an error if the task is already completed and an attempt is made to cancel it + """ + if (self.result): + raise Exception("Cannot cancel a completed task.") + self.action.is_cancelled = True + + def try_set_value(self, child: TimerTask): + """Transition this LongTimer Task to a terminal state and set its value. + + If the LongTimer has not yet reached the designated completion time, starts a new + TimerTask for the next interval and does not close. + + Parameters + ---------- + child : TimerTask + A timer sub-task that just completed + """ + current_time = self._orchestration_context.current_utc_datetime + final_fire_time = self.action.fire_at + if final_fire_time > current_time: + next_timer = self.get_next_timer_task(final_fire_time, current_time) + self.add_new_child(next_timer) + return super().try_set_value(child) + + def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) -> TimerTask: + """Create a TimerTask to represent the next interval of the LongTimer. + + Parameters + ---------- + final_fire_time : datetime.datetime + The final firing time of the LongTimer + current_time : datetime.datetime + The current time + + Returns + ------- + TimerTask + A TimerTask representing the next interval of the LongTimer + """ + duration_until_fire = final_fire_time - current_time + if duration_until_fire > self._max_short_timer_duration: + next_fire_time = current_time + self._long_timer_interval + else: + next_fire_time = final_fire_time + return TimerTask(None, CreateTimerAction(next_fire_time)) + + def add_new_child(self, child_timer: TimerTask): + """Add the TimerTask to this task's children. + + Also register the TimerTask with the orchestration context. + + Parameters + ---------- + child_timer : TimerTask + The newly created TimerTask to add + """ + child_timer.parent = self + self.pending_tasks.add(child_timer) + self._orchestration_context._add_to_open_tasks(child_timer) + self._orchestration_context._add_to_actions(child_timer.action_repr) + child_timer._set_is_scheduled(True) + + class WhenAnyTask(CompoundTask): """A Task representing `when_any` scenarios.""" @@ -331,7 +445,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema): The ReplaySchema, which determines the inner action payload representation """ compound_action_constructor = None - if replay_schema is ReplaySchema.V2: + if replay_schema.value >= ReplaySchema.V2.value: compound_action_constructor = WhenAnyAction super().__init__(task, compound_action_constructor) diff --git a/azure/durable_functions/models/utils/json_utils.py b/azure/durable_functions/models/utils/json_utils.py index cdd6a711..50091da7 100644 --- a/azure/durable_functions/models/utils/json_utils.py +++ b/azure/durable_functions/models/utils/json_utils.py @@ -1,3 +1,5 @@ +import datetime +import re from typing import Dict, Any from ...constants import DATETIME_STRING_FORMAT @@ -37,6 +39,44 @@ def add_datetime_attrib(json_dict: Dict[str, Any], object_, getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT) +# When we recieve properties from WebJobs extension originally parsed as +# TimeSpan objects through Newtonsoft, the format complies with the constant +# format specifier for TimeSpan in .NET. +# Python offers no convenient way to parse these back into timedeltas, +# so we use this regex method instead +def parse_timespan_attrib(from_str: str) -> datetime.timedelta: + """Convert a string representing TimeSpan.ToString("c") in .NET to a python timedelta. + + Parameters + ---------- + from_str: The string format of the TimeSpan to convert + + Returns + ------- + timespan.timedelta + The TimeSpan expressed as a Python datetime.timedelta + + """ + match = re.match(r"^(?P-)?(?:(?P[0-9]*)\.)?" + r"(?P[0-9]{2}):(?P[0-9]{2})" + r":(?P[0-9]{2})(?:\.(?P[0-9]{7}))?$", + from_str) + if match: + groups = match.groupdict() + span = datetime.timedelta( + days=int(groups['days'] or "0"), + hours=int(groups['hours']), + minutes=int(groups['minutes']), + seconds=int(groups['seconds']), + microseconds=int(groups['ticks'] or "0") // 10) + + if groups['negative'] == '-': + span = -span + return span + else: + raise Exception(f"Format of TimeSpan failed attempted conversion to timedelta: {from_str}") + + def add_json_attrib(json_dict: Dict[str, Any], object_, attribute_name: str, alt_name: str = None): """Add the results of the to_json() function call of the attribute from the object to the dict. diff --git a/tests/tasks/test_long_timers.py b/tests/tasks/test_long_timers.py new file mode 100644 index 00000000..e3dceb23 --- /dev/null +++ b/tests/tasks/test_long_timers.py @@ -0,0 +1,70 @@ +import datetime + +import pytest +from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext +from azure.durable_functions.models.Task import LongTimerTask, TaskState, TimerTask +from azure.durable_functions.models.actions.CreateTimerAction import CreateTimerAction + + +@pytest.fixture +def starting_context_v3(): + context = DurableOrchestrationContext.from_json( + '{"history":[{"EventType":12,"EventId":-1,"IsPlayed":false,' + '"Timestamp":"' + f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}' + '"}, {"OrchestrationInstance":{' + '"InstanceId":"48d0f95957504c2fa579e810a390b938", ' + '"ExecutionId":"fd183ee02e4b4fd18c95b773cfb5452b"},"EventType":0,' + '"ParentInstance":null, ' + '"Name":"DurableOrchestratorTrigger","Version":"","Input":"null",' + '"Tags":null,"EventId":-1,"IsPlayed":false, ' + '"Timestamp":"' + f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}' + '"}],"input":null,' + '"instanceId":"48d0f95957504c2fa579e810a390b938", ' + '"upperSchemaVersion": 2, ' + '"upperSchemaVersionNew": 3, ' + '"isReplaying":false,"parentInstanceId":null, ' + '"maximumShortTimerDuration":"0.16:00:00", ' + '"longRunningTimerIntervalDuration":"0.08:00:00" } ') + return context + + +def test_durable_context_creates_correct_timer(starting_context_v3): + timer = starting_context_v3.create_timer(datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(minutes=30)) + assert isinstance(timer, TimerTask) + + timer2 = starting_context_v3.create_timer(datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(days=1)) + assert isinstance(timer2, LongTimerTask) + +def test_long_timer_fires_appropriately(starting_context_v3): + starting_time = starting_context_v3.current_utc_datetime + final_fire_time = starting_time + datetime.timedelta(hours=20) + long_timer_action = CreateTimerAction(final_fire_time) + long_timer = LongTimerTask(None, long_timer_action, starting_context_v3) + assert long_timer.action.fire_at == final_fire_time + assert long_timer.action == long_timer_action + + # Check the first "inner" timer and simulate firing it + short_timer = long_timer.pending_tasks.pop() + assert short_timer.action_repr.fire_at == starting_time + datetime.timedelta(hours=8) + # This happens when the task is reconstructed during replay, doing it manually for the test + long_timer._orchestration_context.current_utc_datetime = short_timer.action_repr.fire_at + short_timer.state = TaskState.SUCCEEDED + long_timer.try_set_value(short_timer) + + assert long_timer.state == TaskState.RUNNING + + # Check the scond "inner" timer and simulate firing it. This one should be set to the final + # fire time, the remaining time (12 hours) is less than the max long timer duration (16 hours) + short_timer = long_timer.pending_tasks.pop() + assert short_timer.action_repr.fire_at == final_fire_time + long_timer._orchestration_context.current_utc_datetime = short_timer.action_repr.fire_at + short_timer.state = TaskState.SUCCEEDED + long_timer.try_set_value(short_timer) + + # Ensure the LongTimerTask finished + assert len(long_timer.pending_tasks) == 0 + assert long_timer.state == TaskState.SUCCEEDED