Skip to content

Commit 2883df3

Browse files
committed
Add long timers to Python
1 parent efa6321 commit 2883df3

File tree

4 files changed

+149
-9
lines changed

4 files changed

+149
-9
lines changed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from collections import defaultdict
22
from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction
33
from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction
4-
from azure.durable_functions.models.Task import TaskBase, TimerTask
4+
from azure.durable_functions.models.Task import LongTimerTask, TaskBase, TimerTask
55
from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction
66
from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest
77
from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \
@@ -26,6 +26,8 @@
2626
from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID
2727
from datetime import timezone
2828

29+
from azure.durable_functions.models.utils.json_utils import parse_datetime_attrib_timespan
30+
2931
from .RetryOptions import RetryOptions
3032
from .FunctionContext import FunctionContext
3133
from .history import HistoryEvent, HistoryEventType
@@ -48,11 +50,19 @@ class DurableOrchestrationContext:
4850
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
4951
def __init__(self,
5052
history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
51-
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs):
53+
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0,
54+
maximumShortTimerDuration=None, longRunningTimerIntervalDuration=None,
55+
upperSchemaVersionNew = None, **kwargs):
5256
self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
5357
self._instance_id: str = instanceId
5458
self._is_replaying: bool = isReplaying
5559
self._parent_instance_id: str = parentInstanceId
60+
self._maximum_short_timer_duration: datetime.timedelta
61+
if maximumShortTimerDuration is not None:
62+
self._maximum_short_timer_duration = parse_datetime_attrib_timespan(maximumShortTimerDuration)
63+
self._long_running_timer_interval_duration: datetime.timedelta
64+
if longRunningTimerIntervalDuration is not None:
65+
self._long_running_timer_interval_duration = parse_datetime_attrib_timespan(longRunningTimerIntervalDuration)
5666
self._custom_status: Any = None
5767
self._new_uuid_counter: int = 0
5868
self._sub_orchestrator_counter: int = 0
@@ -66,6 +76,8 @@ def __init__(self,
6676
self._function_context: FunctionContext = FunctionContext(**kwargs)
6777
self._sequence_number = 0
6878
self._replay_schema = ReplaySchema(upperSchemaVersion)
79+
if upperSchemaVersionNew is not None and upperSchemaVersionNew > self._replay_schema.value:
80+
self._replay_schema = ReplaySchema(upperSchemaVersionNew)
6981

7082
self._action_payload_v1: List[List[Action]] = []
7183
self._action_payload_v2: List[Action] = []
@@ -471,6 +483,37 @@ def parent_instance_id(self) -> str:
471483
"""
472484
return self._parent_instance_id
473485

486+
@property
487+
def maximum_short_timer_duration(self) -> datetime.timedelta:
488+
"""Get the maximum duration for a short timer
489+
490+
The maximum length of a "short timer" is defined by the storage backend.
491+
Some storage backends have a maximum future date for scheduled tasks, and
492+
so for timers longer than this duration, we must simulate a long timer by
493+
waiting in chunks.
494+
495+
Returns
496+
-------
497+
str
498+
Maximum allowable duration for a short timer in Durable
499+
"""
500+
return self._maximum_short_timer_duration
501+
502+
@property
503+
def long_running_timer_interval_duration(self) -> datetime.timedelta:
504+
"""Get the interval for long timers.
505+
506+
When a timer is scheduled for a duration longer than the maximum short timer
507+
duration, the timer is set to run in chunks of time. The long running timer
508+
interval duration defines how long these chunks of time should be.
509+
510+
Returns
511+
-------
512+
str
513+
Duration for intervals of a long-running timer
514+
"""
515+
return self._long_running_timer_interval_duration
516+
474517
@property
475518
def current_utc_datetime(self) -> datetime.datetime:
476519
"""Get the current date/time.
@@ -532,10 +575,10 @@ def _record_fire_and_forget_action(self, action: Action):
532575
The action to append
533576
"""
534577
new_action: Union[List[Action], Action]
535-
if self._replay_schema is ReplaySchema.V2:
536-
new_action = action
537-
else:
578+
if self._replay_schema is ReplaySchema.V1:
538579
new_action = [action]
580+
else:
581+
new_action = action
539582
self._add_to_actions(new_action)
540583
self._sequence_number += 1
541584

@@ -580,6 +623,20 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase:
580623
TaskBase
581624
A Durable Timer Task that schedules the timer to wake up the activity
582625
"""
626+
if self._replay_schema.value >= ReplaySchema.V3.value:
627+
if not self.maximum_short_timer_duration or not self.long_running_timer_interval_duration:
628+
raise Exception(
629+
"A framework-internal error was detected: replay schema version >= V3 is being used, " +
630+
"but one or more of the properties `maximumShortTimerDuration` and `longRunningTimerIntervalDuration` are not defined. " +
631+
"This is likely an issue with the Durable Functions Extension. " +
632+
"Please report this bug here: https://github.com/Azure/azure-functions-durable-js/issues\n" +
633+
f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n" +
634+
f"longRunningTimerIntervalDuration: {self.long_running_timer_interval_duration}"
635+
)
636+
if fire_at > self.current_utc_datetime + self.maximum_short_timer_duration:
637+
action = CreateTimerAction(fire_at)
638+
return LongTimerTask(None, action, self, None, self.maximum_short_timer_duration, self.long_running_timer_interval_duration)
639+
583640
action = CreateTimerAction(fire_at)
584641
task = self._generate_task(action, task_constructor=TimerTask)
585642
return task
@@ -656,7 +713,7 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]):
656713

657714
if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list):
658715
self._action_payload_v1.append(action_repr)
659-
elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action):
716+
elif self._replay_schema.value >= ReplaySchema.V2.value and isinstance(action_repr, Action):
660717
self._action_payload_v2.append(action_repr)
661718
else:
662719
raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}"

azure/durable_functions/models/ReplaySchema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ class ReplaySchema(Enum):
66

77
V1 = 0
88
V2 = 1
9+
V3 = 2

azure/durable_functions/models/Task.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime
12
from azure.durable_functions.models.actions.NoOpAction import NoOpAction
23
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
34
from azure.durable_functions.models.RetryOptions import RetryOptions
@@ -170,7 +171,7 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None):
170171
child_actions.append(action_repr)
171172
if compound_action_constructor is None:
172173
self.action_repr = child_actions
173-
else: # replay_schema is ReplaySchema.V2
174+
else: # replay_schema >= ReplaySchema.V2
174175
self.action_repr = compound_action_constructor(child_actions)
175176
self._first_error: Optional[Exception] = None
176177
self.pending_tasks: Set[TaskBase] = set(tasks)
@@ -292,7 +293,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
292293
The ReplaySchema, which determines the inner action payload representation
293294
"""
294295
compound_action_constructor = None
295-
if replay_schema is ReplaySchema.V2:
296+
if replay_schema.value >= ReplaySchema.V2.value:
296297
compound_action_constructor = WhenAllAction
297298
super().__init__(task, compound_action_constructor)
298299

@@ -317,6 +318,59 @@ def try_set_value(self, child: TaskBase):
317318
self.set_value(is_error=True, value=self._first_error)
318319

319320

321+
class LongTimerTask(WhenAllTask):
322+
def __init__(self, id, action: CreateTimerAction, orchestration_context, executor, maximum_timer_length, long_running_timer_duration):
323+
current_time = orchestration_context.current_utc_datetime
324+
final_fire_time = action.fire_at
325+
duration_until_fire = final_fire_time - current_time
326+
327+
if duration_until_fire > maximum_timer_length:
328+
next_fire_time = current_time + long_running_timer_duration
329+
else:
330+
next_fire_time = final_fire_time
331+
332+
next_timer_action = CreateTimerAction(next_fire_time)
333+
next_timer_task = TimerTask(None, next_timer_action)
334+
super().__init__([next_timer_task], orchestration_context._replay_schema)
335+
336+
self.id = id
337+
self.action = action
338+
self.orchestration_context = orchestration_context
339+
self.executor = executor
340+
self.maximum_timer_length = maximum_timer_length
341+
self.long_running_timer_duration = long_running_timer_duration
342+
343+
def is_canceled(self) -> bool:
344+
return self.action.is_cancelled
345+
346+
def cancel(self):
347+
if (self.result):
348+
raise Exception("Cannot cancel a completed task.")
349+
self.action.is_cancelled = True
350+
351+
def try_set_value(self, child: TimerTask):
352+
current_time = self.orchestration_context.current_utc_datetime
353+
final_fire_time = self.action.fire_at
354+
if final_fire_time > current_time:
355+
next_timer = self.get_next_timer_task(final_fire_time, current_time)
356+
self.add_new_child(next_timer)
357+
return super().try_set_value(child)
358+
359+
def get_next_timer_task(self, final_fire_time:datetime, current_time:datetime):
360+
duration_until_fire = final_fire_time - current_time
361+
if duration_until_fire > self.maximum_timer_length:
362+
next_fire_time = current_time + self.long_running_timer_duration
363+
else:
364+
next_fire_time = final_fire_time
365+
return TimerTask(None, CreateTimerAction(next_fire_time))
366+
367+
def add_new_child(self, child_timer: TimerTask):
368+
child_timer.parent = self
369+
self.pending_tasks.add(child_timer)
370+
self.orchestration_context._add_to_open_tasks(child_timer)
371+
self.orchestration_context._add_to_actions(child_timer.action_repr)
372+
child_timer._set_is_scheduled(True)
373+
320374
class WhenAnyTask(CompoundTask):
321375
"""A Task representing `when_any` scenarios."""
322376

@@ -331,7 +385,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
331385
The ReplaySchema, which determines the inner action payload representation
332386
"""
333387
compound_action_constructor = None
334-
if replay_schema is ReplaySchema.V2:
388+
if replay_schema.value >= ReplaySchema.V2.value:
335389
compound_action_constructor = WhenAnyAction
336390
super().__init__(task, compound_action_constructor)
337391

azure/durable_functions/models/utils/json_utils.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import datetime
2+
import re
13
from typing import Dict, Any
24

35
from ...constants import DATETIME_STRING_FORMAT
@@ -36,6 +38,32 @@ def add_datetime_attrib(json_dict: Dict[str, Any], object_,
3638
json_dict[alt_name or attribute_name] = \
3739
getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT)
3840

41+
# When we recieve properties from WebJobs extension originally parsed as TimeSpan objects through Newtonsoft,
42+
# the format complies with the constant format specifier for TimeSpan in .NET.
43+
# See https://learn.microsoft.com/en-us/dotnet/standard/base-types/standard-timespan-format-strings#the-constant-c-format-specifier
44+
# Python offers no convenient way to parse these back into timedeltas, so we use this regex method instead
45+
def parse_datetime_attrib_timespan(from_str: str) -> datetime.timedelta:
46+
"""Converts a string originally produced by TimeSpan.ToString("c") in .NET into python's timespan.timedelta
47+
48+
Parameters
49+
----------
50+
from_str: The string format of the TimeSpan to convert
51+
52+
Returns
53+
-------
54+
timespan.timedelta
55+
The TimeSpan expressed as a Python datetime.timedelta
56+
57+
"""
58+
match = re.match(r"^(-)?(?:([0-9]*)\.)?([0-9]{2}):([0-9]{2}):([0-9]{2})(?:\.([0-9]{7}))?$", from_str)
59+
if match:
60+
span = datetime.timedelta(days=int(match.group(2) or "0"), hours=int(match.group(3)), minutes=int(match.group(4)), seconds=int(match.group(5)), microseconds=int(match.group(6) or "0") // 10)
61+
if match.group(1):
62+
span = -span
63+
return span
64+
else:
65+
raise Exception(f"Format of TimeSpan failed attempted conversion to timedelta: {from_str}")
66+
3967

4068
def add_json_attrib(json_dict: Dict[str, Any], object_,
4169
attribute_name: str, alt_name: str = None):

0 commit comments

Comments
 (0)