Skip to content

Commit ee347c7

Browse files
authored
Add long timers to Python (#532)
* Add long timers to Python * Add basic tests for long timers
1 parent 225965d commit ee347c7

File tree

5 files changed

+272
-9
lines changed

5 files changed

+272
-9
lines changed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from collections import defaultdict
22
from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction
33
from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction
4-
from azure.durable_functions.models.Task import TaskBase, TimerTask
4+
from azure.durable_functions.models.Task import LongTimerTask, TaskBase, TimerTask
55
from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction
66
from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest
77
from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \
@@ -26,6 +26,8 @@
2626
from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID
2727
from datetime import timezone
2828

29+
from azure.durable_functions.models.utils.json_utils import parse_timespan_attrib
30+
2931
from .RetryOptions import RetryOptions
3032
from .FunctionContext import FunctionContext
3133
from .history import HistoryEvent, HistoryEventType
@@ -48,11 +50,22 @@ class DurableOrchestrationContext:
4850
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
4951
def __init__(self,
5052
history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
51-
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs):
53+
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0,
54+
maximumShortTimerDuration: str = None,
55+
longRunningTimerIntervalDuration: str = None, upperSchemaVersionNew: int = None,
56+
**kwargs):
5257
self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
5358
self._instance_id: str = instanceId
5459
self._is_replaying: bool = isReplaying
5560
self._parent_instance_id: str = parentInstanceId
61+
self._maximum_short_timer_duration: datetime.timedelta = None
62+
if maximumShortTimerDuration is not None:
63+
max_short_duration = parse_timespan_attrib(maximumShortTimerDuration)
64+
self._maximum_short_timer_duration = max_short_duration
65+
self._long_timer_interval_duration: datetime.timedelta = None
66+
if longRunningTimerIntervalDuration is not None:
67+
long_interval_duration = parse_timespan_attrib(longRunningTimerIntervalDuration)
68+
self._long_timer_interval_duration = long_interval_duration
5669
self._custom_status: Any = None
5770
self._new_uuid_counter: int = 0
5871
self._sub_orchestrator_counter: int = 0
@@ -66,6 +79,13 @@ def __init__(self,
6679
self._function_context: FunctionContext = FunctionContext(**kwargs)
6780
self._sequence_number = 0
6881
self._replay_schema = ReplaySchema(upperSchemaVersion)
82+
if (upperSchemaVersionNew is not None
83+
and upperSchemaVersionNew > self._replay_schema.value):
84+
valid_schema_values = [enum_member.value for enum_member in ReplaySchema]
85+
if upperSchemaVersionNew in valid_schema_values:
86+
self._replay_schema = ReplaySchema(upperSchemaVersionNew)
87+
else:
88+
self._replay_schema = ReplaySchema(max(valid_schema_values))
6989

7090
self._action_payload_v1: List[List[Action]] = []
7191
self._action_payload_v2: List[Action] = []
@@ -532,10 +552,10 @@ def _record_fire_and_forget_action(self, action: Action):
532552
The action to append
533553
"""
534554
new_action: Union[List[Action], Action]
535-
if self._replay_schema is ReplaySchema.V2:
536-
new_action = action
537-
else:
555+
if self._replay_schema is ReplaySchema.V1:
538556
new_action = [action]
557+
else:
558+
new_action = action
539559
self._add_to_actions(new_action)
540560
self._sequence_number += 1
541561

@@ -580,6 +600,23 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase:
580600
TaskBase
581601
A Durable Timer Task that schedules the timer to wake up the activity
582602
"""
603+
if self._replay_schema.value >= ReplaySchema.V3.value:
604+
if not self._maximum_short_timer_duration or not self._long_timer_interval_duration:
605+
raise Exception(
606+
"A framework-internal error was detected: "
607+
"replay schema version >= V3 is being used, "
608+
"but one or more of the properties `maximumShortTimerDuration`"
609+
"and `longRunningTimerIntervalDuration` are not defined. "
610+
"This is likely an issue with the Durable Functions Extension. "
611+
"Please report this bug here: "
612+
"https://github.com/Azure/azure-functions-durable-python/issues\n"
613+
f"maximumShortTimerDuration: {self._maximum_short_timer_duration}\n"
614+
f"longRunningTimerIntervalDuration: {self._long_timer_interval_duration}"
615+
)
616+
if fire_at > self.current_utc_datetime + self._maximum_short_timer_duration:
617+
action = CreateTimerAction(fire_at)
618+
return LongTimerTask(None, action, self)
619+
583620
action = CreateTimerAction(fire_at)
584621
task = self._generate_task(action, task_constructor=TimerTask)
585622
return task
@@ -656,7 +693,8 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]):
656693

657694
if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list):
658695
self._action_payload_v1.append(action_repr)
659-
elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action):
696+
elif (self._replay_schema.value >= ReplaySchema.V2.value
697+
and isinstance(action_repr, Action)):
660698
self._action_payload_v2.append(action_repr)
661699
else:
662700
raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}"

azure/durable_functions/models/ReplaySchema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ class ReplaySchema(Enum):
66

77
V1 = 0
88
V2 = 1
9+
V3 = 2

azure/durable_functions/models/Task.py

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime
12
from azure.durable_functions.models.actions.NoOpAction import NoOpAction
23
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
34
from azure.durable_functions.models.RetryOptions import RetryOptions
@@ -170,7 +171,7 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None):
170171
child_actions.append(action_repr)
171172
if compound_action_constructor is None:
172173
self.action_repr = child_actions
173-
else: # replay_schema is ReplaySchema.V2
174+
else: # replay_schema >= ReplaySchema.V2
174175
self.action_repr = compound_action_constructor(child_actions)
175176
self._first_error: Optional[Exception] = None
176177
self.pending_tasks: Set[TaskBase] = set(tasks)
@@ -292,7 +293,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
292293
The ReplaySchema, which determines the inner action payload representation
293294
"""
294295
compound_action_constructor = None
295-
if replay_schema is ReplaySchema.V2:
296+
if replay_schema.value >= ReplaySchema.V2.value:
296297
compound_action_constructor = WhenAllAction
297298
super().__init__(task, compound_action_constructor)
298299

@@ -317,6 +318,119 @@ def try_set_value(self, child: TaskBase):
317318
self.set_value(is_error=True, value=self._first_error)
318319

319320

321+
class LongTimerTask(WhenAllTask):
322+
"""A Timer Task for intervals longer than supported by the storage backend."""
323+
324+
def __init__(self, id_, action: CreateTimerAction, orchestration_context):
325+
"""Initialize a LongTimerTask.
326+
327+
Parameters
328+
----------
329+
id_ : int
330+
An ID for the task
331+
action : CreateTimerAction
332+
The action this task represents
333+
orchestration_context: DurableOrchestrationContext
334+
The orchestration context this task was created in
335+
"""
336+
current_time = orchestration_context.current_utc_datetime
337+
final_fire_time = action.fire_at
338+
duration_until_fire = final_fire_time - current_time
339+
340+
if duration_until_fire > orchestration_context._maximum_short_timer_duration:
341+
next_fire_time = current_time + orchestration_context._long_timer_interval_duration
342+
else:
343+
next_fire_time = final_fire_time
344+
345+
next_timer_action = CreateTimerAction(next_fire_time)
346+
next_timer_task = TimerTask(None, next_timer_action)
347+
super().__init__([next_timer_task], orchestration_context._replay_schema)
348+
349+
self.id = id_
350+
self.action = action
351+
self._orchestration_context = orchestration_context
352+
self._max_short_timer_duration = self._orchestration_context._maximum_short_timer_duration
353+
self._long_timer_interval = self._orchestration_context._long_timer_interval_duration
354+
355+
def is_canceled(self) -> bool:
356+
"""Check if the LongTimer is cancelled.
357+
358+
Returns
359+
-------
360+
bool
361+
Returns whether the timer has been cancelled or not
362+
"""
363+
return self.action.is_cancelled
364+
365+
def cancel(self):
366+
"""Cancel a timer.
367+
368+
Raises
369+
------
370+
ValueError
371+
Raises an error if the task is already completed and an attempt is made to cancel it
372+
"""
373+
if (self.result):
374+
raise Exception("Cannot cancel a completed task.")
375+
self.action.is_cancelled = True
376+
377+
def try_set_value(self, child: TimerTask):
378+
"""Transition this LongTimer Task to a terminal state and set its value.
379+
380+
If the LongTimer has not yet reached the designated completion time, starts a new
381+
TimerTask for the next interval and does not close.
382+
383+
Parameters
384+
----------
385+
child : TimerTask
386+
A timer sub-task that just completed
387+
"""
388+
current_time = self._orchestration_context.current_utc_datetime
389+
final_fire_time = self.action.fire_at
390+
if final_fire_time > current_time:
391+
next_timer = self.get_next_timer_task(final_fire_time, current_time)
392+
self.add_new_child(next_timer)
393+
return super().try_set_value(child)
394+
395+
def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) -> TimerTask:
396+
"""Create a TimerTask to represent the next interval of the LongTimer.
397+
398+
Parameters
399+
----------
400+
final_fire_time : datetime.datetime
401+
The final firing time of the LongTimer
402+
current_time : datetime.datetime
403+
The current time
404+
405+
Returns
406+
-------
407+
TimerTask
408+
A TimerTask representing the next interval of the LongTimer
409+
"""
410+
duration_until_fire = final_fire_time - current_time
411+
if duration_until_fire > self._max_short_timer_duration:
412+
next_fire_time = current_time + self._long_timer_interval
413+
else:
414+
next_fire_time = final_fire_time
415+
return TimerTask(None, CreateTimerAction(next_fire_time))
416+
417+
def add_new_child(self, child_timer: TimerTask):
418+
"""Add the TimerTask to this task's children.
419+
420+
Also register the TimerTask with the orchestration context.
421+
422+
Parameters
423+
----------
424+
child_timer : TimerTask
425+
The newly created TimerTask to add
426+
"""
427+
child_timer.parent = self
428+
self.pending_tasks.add(child_timer)
429+
self._orchestration_context._add_to_open_tasks(child_timer)
430+
self._orchestration_context._add_to_actions(child_timer.action_repr)
431+
child_timer._set_is_scheduled(True)
432+
433+
320434
class WhenAnyTask(CompoundTask):
321435
"""A Task representing `when_any` scenarios."""
322436

@@ -331,7 +445,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
331445
The ReplaySchema, which determines the inner action payload representation
332446
"""
333447
compound_action_constructor = None
334-
if replay_schema is ReplaySchema.V2:
448+
if replay_schema.value >= ReplaySchema.V2.value:
335449
compound_action_constructor = WhenAnyAction
336450
super().__init__(task, compound_action_constructor)
337451

azure/durable_functions/models/utils/json_utils.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import datetime
2+
import re
13
from typing import Dict, Any
24

35
from ...constants import DATETIME_STRING_FORMAT
@@ -37,6 +39,44 @@ def add_datetime_attrib(json_dict: Dict[str, Any], object_,
3739
getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT)
3840

3941

42+
# When we recieve properties from WebJobs extension originally parsed as
43+
# TimeSpan objects through Newtonsoft, the format complies with the constant
44+
# format specifier for TimeSpan in .NET.
45+
# Python offers no convenient way to parse these back into timedeltas,
46+
# so we use this regex method instead
47+
def parse_timespan_attrib(from_str: str) -> datetime.timedelta:
48+
"""Convert a string representing TimeSpan.ToString("c") in .NET to a python timedelta.
49+
50+
Parameters
51+
----------
52+
from_str: The string format of the TimeSpan to convert
53+
54+
Returns
55+
-------
56+
timespan.timedelta
57+
The TimeSpan expressed as a Python datetime.timedelta
58+
59+
"""
60+
match = re.match(r"^(?P<negative>-)?(?:(?P<days>[0-9]*)\.)?"
61+
r"(?P<hours>[0-9]{2}):(?P<minutes>[0-9]{2})"
62+
r":(?P<seconds>[0-9]{2})(?:\.(?P<ticks>[0-9]{7}))?$",
63+
from_str)
64+
if match:
65+
groups = match.groupdict()
66+
span = datetime.timedelta(
67+
days=int(groups['days'] or "0"),
68+
hours=int(groups['hours']),
69+
minutes=int(groups['minutes']),
70+
seconds=int(groups['seconds']),
71+
microseconds=int(groups['ticks'] or "0") // 10)
72+
73+
if groups['negative'] == '-':
74+
span = -span
75+
return span
76+
else:
77+
raise Exception(f"Format of TimeSpan failed attempted conversion to timedelta: {from_str}")
78+
79+
4080
def add_json_attrib(json_dict: Dict[str, Any], object_,
4181
attribute_name: str, alt_name: str = None):
4282
"""Add the results of the to_json() function call of the attribute from the object to the dict.

tests/tasks/test_long_timers.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import datetime
2+
3+
import pytest
4+
from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext
5+
from azure.durable_functions.models.Task import LongTimerTask, TaskState, TimerTask
6+
from azure.durable_functions.models.actions.CreateTimerAction import CreateTimerAction
7+
8+
9+
@pytest.fixture
10+
def starting_context_v3():
11+
context = DurableOrchestrationContext.from_json(
12+
'{"history":[{"EventType":12,"EventId":-1,"IsPlayed":false,'
13+
'"Timestamp":"'
14+
f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}'
15+
'"}, {"OrchestrationInstance":{'
16+
'"InstanceId":"48d0f95957504c2fa579e810a390b938", '
17+
'"ExecutionId":"fd183ee02e4b4fd18c95b773cfb5452b"},"EventType":0,'
18+
'"ParentInstance":null, '
19+
'"Name":"DurableOrchestratorTrigger","Version":"","Input":"null",'
20+
'"Tags":null,"EventId":-1,"IsPlayed":false, '
21+
'"Timestamp":"'
22+
f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}'
23+
'"}],"input":null,'
24+
'"instanceId":"48d0f95957504c2fa579e810a390b938", '
25+
'"upperSchemaVersion": 2, '
26+
'"upperSchemaVersionNew": 3, '
27+
'"isReplaying":false,"parentInstanceId":null, '
28+
'"maximumShortTimerDuration":"0.16:00:00", '
29+
'"longRunningTimerIntervalDuration":"0.08:00:00" } ')
30+
return context
31+
32+
33+
def test_durable_context_creates_correct_timer(starting_context_v3):
34+
timer = starting_context_v3.create_timer(datetime.datetime.now(datetime.timezone.utc) +
35+
datetime.timedelta(minutes=30))
36+
assert isinstance(timer, TimerTask)
37+
38+
timer2 = starting_context_v3.create_timer(datetime.datetime.now(datetime.timezone.utc) +
39+
datetime.timedelta(days=1))
40+
assert isinstance(timer2, LongTimerTask)
41+
42+
def test_long_timer_fires_appropriately(starting_context_v3):
43+
starting_time = starting_context_v3.current_utc_datetime
44+
final_fire_time = starting_time + datetime.timedelta(hours=20)
45+
long_timer_action = CreateTimerAction(final_fire_time)
46+
long_timer = LongTimerTask(None, long_timer_action, starting_context_v3)
47+
assert long_timer.action.fire_at == final_fire_time
48+
assert long_timer.action == long_timer_action
49+
50+
# Check the first "inner" timer and simulate firing it
51+
short_timer = long_timer.pending_tasks.pop()
52+
assert short_timer.action_repr.fire_at == starting_time + datetime.timedelta(hours=8)
53+
# This happens when the task is reconstructed during replay, doing it manually for the test
54+
long_timer._orchestration_context.current_utc_datetime = short_timer.action_repr.fire_at
55+
short_timer.state = TaskState.SUCCEEDED
56+
long_timer.try_set_value(short_timer)
57+
58+
assert long_timer.state == TaskState.RUNNING
59+
60+
# Check the scond "inner" timer and simulate firing it. This one should be set to the final
61+
# fire time, the remaining time (12 hours) is less than the max long timer duration (16 hours)
62+
short_timer = long_timer.pending_tasks.pop()
63+
assert short_timer.action_repr.fire_at == final_fire_time
64+
long_timer._orchestration_context.current_utc_datetime = short_timer.action_repr.fire_at
65+
short_timer.state = TaskState.SUCCEEDED
66+
long_timer.try_set_value(short_timer)
67+
68+
# Ensure the LongTimerTask finished
69+
assert len(long_timer.pending_tasks) == 0
70+
assert long_timer.state == TaskState.SUCCEEDED

0 commit comments

Comments
 (0)