Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import defaultdict
from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction
from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction
from azure.durable_functions.models.Task import TaskBase, TimerTask
from azure.durable_functions.models.Task import LongTimerTask, TaskBase, TimerTask
from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction
from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest
from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \
Expand All @@ -26,6 +26,8 @@
from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID
from datetime import timezone

from azure.durable_functions.models.utils.json_utils import parse_timespan_attrib

from .RetryOptions import RetryOptions
from .FunctionContext import FunctionContext
from .history import HistoryEvent, HistoryEventType
Expand All @@ -48,11 +50,22 @@ class DurableOrchestrationContext:
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
def __init__(self,
history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs):
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0,
maximumShortTimerDuration: str = None,
longRunningTimerIntervalDuration: str = None, upperSchemaVersionNew: int = None,
**kwargs):
self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
self._instance_id: str = instanceId
self._is_replaying: bool = isReplaying
self._parent_instance_id: str = parentInstanceId
self._maximum_short_timer_duration: datetime.timedelta = None
if maximumShortTimerDuration is not None:
max_short_duration = parse_timespan_attrib(maximumShortTimerDuration)
self._maximum_short_timer_duration = max_short_duration
self._long_timer_interval_duration: datetime.timedelta = None
if longRunningTimerIntervalDuration is not None:
long_interval_duration = parse_timespan_attrib(longRunningTimerIntervalDuration)
self._long_timer_interval_duration = long_interval_duration
self._custom_status: Any = None
self._new_uuid_counter: int = 0
self._sub_orchestrator_counter: int = 0
Expand All @@ -66,6 +79,13 @@ def __init__(self,
self._function_context: FunctionContext = FunctionContext(**kwargs)
self._sequence_number = 0
self._replay_schema = ReplaySchema(upperSchemaVersion)
if (upperSchemaVersionNew is not None
and upperSchemaVersionNew > self._replay_schema.value):
valid_schema_values = [enum_member.value for enum_member in ReplaySchema]
if upperSchemaVersionNew in valid_schema_values:
self._replay_schema = ReplaySchema(upperSchemaVersionNew)
else:
self._replay_schema = ReplaySchema(max(valid_schema_values))

self._action_payload_v1: List[List[Action]] = []
self._action_payload_v2: List[Action] = []
Expand Down Expand Up @@ -532,10 +552,10 @@ def _record_fire_and_forget_action(self, action: Action):
The action to append
"""
new_action: Union[List[Action], Action]
if self._replay_schema is ReplaySchema.V2:
new_action = action
else:
if self._replay_schema is ReplaySchema.V1:
new_action = [action]
else:
new_action = action
self._add_to_actions(new_action)
self._sequence_number += 1

Expand Down Expand Up @@ -580,6 +600,23 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase:
TaskBase
A Durable Timer Task that schedules the timer to wake up the activity
"""
if self._replay_schema.value >= ReplaySchema.V3.value:
if not self._maximum_short_timer_duration or not self._long_timer_interval_duration:
raise Exception(
"A framework-internal error was detected: "
"replay schema version >= V3 is being used, "
"but one or more of the properties `maximumShortTimerDuration`"
"and `longRunningTimerIntervalDuration` are not defined. "
"This is likely an issue with the Durable Functions Extension. "
"Please report this bug here: "
"https://github.com/Azure/azure-functions-durable-python/issues\n"
f"maximumShortTimerDuration: {self._maximum_short_timer_duration}\n"
f"longRunningTimerIntervalDuration: {self._long_timer_interval_duration}"
)
if fire_at > self.current_utc_datetime + self._maximum_short_timer_duration:
action = CreateTimerAction(fire_at)
return LongTimerTask(None, action, self)

action = CreateTimerAction(fire_at)
task = self._generate_task(action, task_constructor=TimerTask)
return task
Expand Down Expand Up @@ -656,7 +693,8 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]):

if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list):
self._action_payload_v1.append(action_repr)
elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action):
elif (self._replay_schema.value >= ReplaySchema.V2.value
and isinstance(action_repr, Action)):
self._action_payload_v2.append(action_repr)
else:
raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}"
Expand Down
1 change: 1 addition & 0 deletions azure/durable_functions/models/ReplaySchema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ class ReplaySchema(Enum):

V1 = 0
V2 = 1
V3 = 2
120 changes: 117 additions & 3 deletions azure/durable_functions/models/Task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from azure.durable_functions.models.actions.NoOpAction import NoOpAction
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
from azure.durable_functions.models.RetryOptions import RetryOptions
Expand Down Expand Up @@ -170,7 +171,7 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None):
child_actions.append(action_repr)
if compound_action_constructor is None:
self.action_repr = child_actions
else: # replay_schema is ReplaySchema.V2
else: # replay_schema >= ReplaySchema.V2
self.action_repr = compound_action_constructor(child_actions)
self._first_error: Optional[Exception] = None
self.pending_tasks: Set[TaskBase] = set(tasks)
Expand Down Expand Up @@ -292,7 +293,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
The ReplaySchema, which determines the inner action payload representation
"""
compound_action_constructor = None
if replay_schema is ReplaySchema.V2:
if replay_schema.value >= ReplaySchema.V2.value:
compound_action_constructor = WhenAllAction
super().__init__(task, compound_action_constructor)

Expand All @@ -317,6 +318,119 @@ def try_set_value(self, child: TaskBase):
self.set_value(is_error=True, value=self._first_error)


class LongTimerTask(WhenAllTask):
"""A Timer Task for intervals longer than supported by the storage backend."""

def __init__(self, id_, action: CreateTimerAction, orchestration_context):
"""Initialize a LongTimerTask.

Parameters
----------
id_ : int
An ID for the task
action : CreateTimerAction
The action this task represents
orchestration_context: DurableOrchestrationContext
The orchestration context this task was created in
"""
current_time = orchestration_context.current_utc_datetime
final_fire_time = action.fire_at
duration_until_fire = final_fire_time - current_time

if duration_until_fire > orchestration_context._maximum_short_timer_duration:
next_fire_time = current_time + orchestration_context._long_timer_interval_duration
else:
next_fire_time = final_fire_time

next_timer_action = CreateTimerAction(next_fire_time)
next_timer_task = TimerTask(None, next_timer_action)
super().__init__([next_timer_task], orchestration_context._replay_schema)

self.id = id_
self.action = action
self._orchestration_context = orchestration_context
self._max_short_timer_duration = self._orchestration_context._maximum_short_timer_duration
self._long_timer_interval = self._orchestration_context._long_timer_interval_duration

def is_canceled(self) -> bool:
"""Check if the LongTimer is cancelled.

Returns
-------
bool
Returns whether the timer has been cancelled or not
"""
return self.action.is_cancelled

def cancel(self):
"""Cancel a timer.

Raises
------
ValueError
Raises an error if the task is already completed and an attempt is made to cancel it
"""
if (self.result):
raise Exception("Cannot cancel a completed task.")
self.action.is_cancelled = True

def try_set_value(self, child: TimerTask):
"""Transition this LongTimer Task to a terminal state and set its value.

If the LongTimer has not yet reached the designated completion time, starts a new
TimerTask for the next interval and does not close.

Parameters
----------
child : TimerTask
A timer sub-task that just completed
"""
current_time = self._orchestration_context.current_utc_datetime
final_fire_time = self.action.fire_at
if final_fire_time > current_time:
next_timer = self.get_next_timer_task(final_fire_time, current_time)
self.add_new_child(next_timer)
return super().try_set_value(child)

def get_next_timer_task(self, final_fire_time: datetime, current_time: datetime) -> TimerTask:
"""Create a TimerTask to represent the next interval of the LongTimer.

Parameters
----------
final_fire_time : datetime.datetime
The final firing time of the LongTimer
current_time : datetime.datetime
The current time

Returns
-------
TimerTask
A TimerTask representing the next interval of the LongTimer
"""
duration_until_fire = final_fire_time - current_time
if duration_until_fire > self._max_short_timer_duration:
next_fire_time = current_time + self._long_timer_interval
else:
next_fire_time = final_fire_time
return TimerTask(None, CreateTimerAction(next_fire_time))

def add_new_child(self, child_timer: TimerTask):
"""Add the TimerTask to this task's children.

Also register the TimerTask with the orchestration context.

Parameters
----------
child_timer : TimerTask
The newly created TimerTask to add
"""
child_timer.parent = self
self.pending_tasks.add(child_timer)
self._orchestration_context._add_to_open_tasks(child_timer)
self._orchestration_context._add_to_actions(child_timer.action_repr)
child_timer._set_is_scheduled(True)


class WhenAnyTask(CompoundTask):
"""A Task representing `when_any` scenarios."""

Expand All @@ -331,7 +445,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
The ReplaySchema, which determines the inner action payload representation
"""
compound_action_constructor = None
if replay_schema is ReplaySchema.V2:
if replay_schema.value >= ReplaySchema.V2.value:
compound_action_constructor = WhenAnyAction
super().__init__(task, compound_action_constructor)

Expand Down
40 changes: 40 additions & 0 deletions azure/durable_functions/models/utils/json_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime
import re
from typing import Dict, Any

from ...constants import DATETIME_STRING_FORMAT
Expand Down Expand Up @@ -37,6 +39,44 @@ def add_datetime_attrib(json_dict: Dict[str, Any], object_,
getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT)


# When we recieve properties from WebJobs extension originally parsed as
# TimeSpan objects through Newtonsoft, the format complies with the constant
# format specifier for TimeSpan in .NET.
# Python offers no convenient way to parse these back into timedeltas,
# so we use this regex method instead
def parse_timespan_attrib(from_str: str) -> datetime.timedelta:
"""Convert a string representing TimeSpan.ToString("c") in .NET to a python timedelta.

Parameters
----------
from_str: The string format of the TimeSpan to convert

Returns
-------
timespan.timedelta
The TimeSpan expressed as a Python datetime.timedelta

"""
match = re.match(r"^(?P<negative>-)?(?:(?P<days>[0-9]*)\.)?"
r"(?P<hours>[0-9]{2}):(?P<minutes>[0-9]{2})"
r":(?P<seconds>[0-9]{2})(?:\.(?P<ticks>[0-9]{7}))?$",
from_str)
if match:
groups = match.groupdict()
span = datetime.timedelta(
days=int(groups['days'] or "0"),
hours=int(groups['hours']),
minutes=int(groups['minutes']),
seconds=int(groups['seconds']),
microseconds=int(groups['ticks'] or "0") // 10)

if groups['negative'] == '-':
span = -span
return span
else:
raise Exception(f"Format of TimeSpan failed attempted conversion to timedelta: {from_str}")


def add_json_attrib(json_dict: Dict[str, Any], object_,
attribute_name: str, alt_name: str = None):
"""Add the results of the to_json() function call of the attribute from the object to the dict.
Expand Down
70 changes: 70 additions & 0 deletions tests/tasks/test_long_timers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import datetime

import pytest
from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext
from azure.durable_functions.models.Task import LongTimerTask, TaskState, TimerTask
from azure.durable_functions.models.actions.CreateTimerAction import CreateTimerAction


@pytest.fixture
def starting_context_v3():
context = DurableOrchestrationContext.from_json(
'{"history":[{"EventType":12,"EventId":-1,"IsPlayed":false,'
'"Timestamp":"'
f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}'
'"}, {"OrchestrationInstance":{'
'"InstanceId":"48d0f95957504c2fa579e810a390b938", '
'"ExecutionId":"fd183ee02e4b4fd18c95b773cfb5452b"},"EventType":0,'
'"ParentInstance":null, '
'"Name":"DurableOrchestratorTrigger","Version":"","Input":"null",'
'"Tags":null,"EventId":-1,"IsPlayed":false, '
'"Timestamp":"'
f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}'
'"}],"input":null,'
'"instanceId":"48d0f95957504c2fa579e810a390b938", '
'"upperSchemaVersion": 2, '
'"upperSchemaVersionNew": 3, '
'"isReplaying":false,"parentInstanceId":null, '
'"maximumShortTimerDuration":"0.16:00:00", '
'"longRunningTimerIntervalDuration":"0.08:00:00" } ')
return context


def test_durable_context_creates_correct_timer(starting_context_v3):
timer = starting_context_v3.create_timer(datetime.datetime.now(datetime.timezone.utc) +
datetime.timedelta(minutes=30))
assert isinstance(timer, TimerTask)

timer2 = starting_context_v3.create_timer(datetime.datetime.now(datetime.timezone.utc) +
datetime.timedelta(days=1))
assert isinstance(timer2, LongTimerTask)

def test_long_timer_fires_appropriately(starting_context_v3):
starting_time = starting_context_v3.current_utc_datetime
final_fire_time = starting_time + datetime.timedelta(hours=20)
long_timer_action = CreateTimerAction(final_fire_time)
long_timer = LongTimerTask(None, long_timer_action, starting_context_v3)
assert long_timer.action.fire_at == final_fire_time
assert long_timer.action == long_timer_action

# Check the first "inner" timer and simulate firing it
short_timer = long_timer.pending_tasks.pop()
assert short_timer.action_repr.fire_at == starting_time + datetime.timedelta(hours=8)
# This happens when the task is reconstructed during replay, doing it manually for the test
long_timer._orchestration_context.current_utc_datetime = short_timer.action_repr.fire_at
short_timer.state = TaskState.SUCCEEDED
long_timer.try_set_value(short_timer)

assert long_timer.state == TaskState.RUNNING

# Check the scond "inner" timer and simulate firing it. This one should be set to the final
# fire time, the remaining time (12 hours) is less than the max long timer duration (16 hours)
short_timer = long_timer.pending_tasks.pop()
assert short_timer.action_repr.fire_at == final_fire_time
long_timer._orchestration_context.current_utc_datetime = short_timer.action_repr.fire_at
short_timer.state = TaskState.SUCCEEDED
long_timer.try_set_value(short_timer)

# Ensure the LongTimerTask finished
assert len(long_timer.pending_tasks) == 0
assert long_timer.state == TaskState.SUCCEEDED