Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 63 additions & 6 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import defaultdict
from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction
from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction
from azure.durable_functions.models.Task import TaskBase, TimerTask
from azure.durable_functions.models.Task import LongTimerTask, TaskBase, TimerTask
from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction
from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest
from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \
Expand All @@ -26,6 +26,8 @@
from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID
from datetime import timezone

from azure.durable_functions.models.utils.json_utils import parse_datetime_attrib_timespan

from .RetryOptions import RetryOptions
from .FunctionContext import FunctionContext
from .history import HistoryEvent, HistoryEventType
Expand All @@ -48,11 +50,19 @@ class DurableOrchestrationContext:
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
def __init__(self,
history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs):
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0,
maximumShortTimerDuration=None, longRunningTimerIntervalDuration=None,
upperSchemaVersionNew = None, **kwargs):
self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
self._instance_id: str = instanceId
self._is_replaying: bool = isReplaying
self._parent_instance_id: str = parentInstanceId
self._maximum_short_timer_duration: datetime.timedelta
if maximumShortTimerDuration is not None:
self._maximum_short_timer_duration = parse_datetime_attrib_timespan(maximumShortTimerDuration)
self._long_running_timer_interval_duration: datetime.timedelta
if longRunningTimerIntervalDuration is not None:
self._long_running_timer_interval_duration = parse_datetime_attrib_timespan(longRunningTimerIntervalDuration)
self._custom_status: Any = None
self._new_uuid_counter: int = 0
self._sub_orchestrator_counter: int = 0
Expand All @@ -66,6 +76,8 @@ def __init__(self,
self._function_context: FunctionContext = FunctionContext(**kwargs)
self._sequence_number = 0
self._replay_schema = ReplaySchema(upperSchemaVersion)
if upperSchemaVersionNew is not None and upperSchemaVersionNew > self._replay_schema.value:
self._replay_schema = ReplaySchema(upperSchemaVersionNew)

self._action_payload_v1: List[List[Action]] = []
self._action_payload_v2: List[Action] = []
Expand Down Expand Up @@ -471,6 +483,37 @@ def parent_instance_id(self) -> str:
"""
return self._parent_instance_id

@property
def maximum_short_timer_duration(self) -> datetime.timedelta:
"""Get the maximum duration for a short timer

The maximum length of a "short timer" is defined by the storage backend.
Some storage backends have a maximum future date for scheduled tasks, and
so for timers longer than this duration, we must simulate a long timer by
waiting in chunks.

Returns
-------
str
Maximum allowable duration for a short timer in Durable
"""
return self._maximum_short_timer_duration

@property
def long_running_timer_interval_duration(self) -> datetime.timedelta:
"""Get the interval for long timers.

When a timer is scheduled for a duration longer than the maximum short timer
duration, the timer is set to run in chunks of time. The long running timer
interval duration defines how long these chunks of time should be.

Returns
-------
str
Duration for intervals of a long-running timer
"""
return self._long_running_timer_interval_duration

@property
def current_utc_datetime(self) -> datetime.datetime:
"""Get the current date/time.
Expand Down Expand Up @@ -532,10 +575,10 @@ def _record_fire_and_forget_action(self, action: Action):
The action to append
"""
new_action: Union[List[Action], Action]
if self._replay_schema is ReplaySchema.V2:
new_action = action
else:
if self._replay_schema is ReplaySchema.V1:
new_action = [action]
else:
new_action = action
self._add_to_actions(new_action)
self._sequence_number += 1

Expand Down Expand Up @@ -580,6 +623,20 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase:
TaskBase
A Durable Timer Task that schedules the timer to wake up the activity
"""
if self._replay_schema.value >= ReplaySchema.V3.value:
if not self.maximum_short_timer_duration or not self.long_running_timer_interval_duration:
raise Exception(
"A framework-internal error was detected: replay schema version >= V3 is being used, " +
"but one or more of the properties `maximumShortTimerDuration` and `longRunningTimerIntervalDuration` are not defined. " +
"This is likely an issue with the Durable Functions Extension. " +
"Please report this bug here: https://github.com/Azure/azure-functions-durable-js/issues\n" +
f"maximumShortTimerDuration: {self.maximum_short_timer_duration}\n" +
f"longRunningTimerIntervalDuration: {self.long_running_timer_interval_duration}"
)
if fire_at > self.current_utc_datetime + self.maximum_short_timer_duration:
action = CreateTimerAction(fire_at)
return LongTimerTask(None, action, self, None, self.maximum_short_timer_duration, self.long_running_timer_interval_duration)

action = CreateTimerAction(fire_at)
task = self._generate_task(action, task_constructor=TimerTask)
return task
Expand Down Expand Up @@ -656,7 +713,7 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]):

if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list):
self._action_payload_v1.append(action_repr)
elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action):
elif self._replay_schema.value >= ReplaySchema.V2.value and isinstance(action_repr, Action):
self._action_payload_v2.append(action_repr)
else:
raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}"
Expand Down
1 change: 1 addition & 0 deletions azure/durable_functions/models/ReplaySchema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ class ReplaySchema(Enum):

V1 = 0
V2 = 1
V3 = 2
60 changes: 57 additions & 3 deletions azure/durable_functions/models/Task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from azure.durable_functions.models.actions.NoOpAction import NoOpAction
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
from azure.durable_functions.models.RetryOptions import RetryOptions
Expand Down Expand Up @@ -170,7 +171,7 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None):
child_actions.append(action_repr)
if compound_action_constructor is None:
self.action_repr = child_actions
else: # replay_schema is ReplaySchema.V2
else: # replay_schema >= ReplaySchema.V2
self.action_repr = compound_action_constructor(child_actions)
self._first_error: Optional[Exception] = None
self.pending_tasks: Set[TaskBase] = set(tasks)
Expand Down Expand Up @@ -292,7 +293,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
The ReplaySchema, which determines the inner action payload representation
"""
compound_action_constructor = None
if replay_schema is ReplaySchema.V2:
if replay_schema.value >= ReplaySchema.V2.value:
compound_action_constructor = WhenAllAction
super().__init__(task, compound_action_constructor)

Expand All @@ -317,6 +318,59 @@ def try_set_value(self, child: TaskBase):
self.set_value(is_error=True, value=self._first_error)


class LongTimerTask(WhenAllTask):
def __init__(self, id, action: CreateTimerAction, orchestration_context, executor, maximum_timer_length, long_running_timer_duration):
current_time = orchestration_context.current_utc_datetime
final_fire_time = action.fire_at
duration_until_fire = final_fire_time - current_time

if duration_until_fire > maximum_timer_length:
next_fire_time = current_time + long_running_timer_duration
else:
next_fire_time = final_fire_time

next_timer_action = CreateTimerAction(next_fire_time)
next_timer_task = TimerTask(None, next_timer_action)
super().__init__([next_timer_task], orchestration_context._replay_schema)

self.id = id
self.action = action
self.orchestration_context = orchestration_context
self.executor = executor
self.maximum_timer_length = maximum_timer_length
self.long_running_timer_duration = long_running_timer_duration

def is_canceled(self) -> bool:
return self.action.is_cancelled

def cancel(self):
if (self.result):
raise Exception("Cannot cancel a completed task.")
self.action.is_cancelled = True

def try_set_value(self, child: TimerTask):
current_time = self.orchestration_context.current_utc_datetime
final_fire_time = self.action.fire_at
if final_fire_time > current_time:
next_timer = self.get_next_timer_task(final_fire_time, current_time)
self.add_new_child(next_timer)
return super().try_set_value(child)

def get_next_timer_task(self, final_fire_time:datetime, current_time:datetime):
duration_until_fire = final_fire_time - current_time
if duration_until_fire > self.maximum_timer_length:
next_fire_time = current_time + self.long_running_timer_duration
else:
next_fire_time = final_fire_time
return TimerTask(None, CreateTimerAction(next_fire_time))

def add_new_child(self, child_timer: TimerTask):
child_timer.parent = self
self.pending_tasks.add(child_timer)
self.orchestration_context._add_to_open_tasks(child_timer)
self.orchestration_context._add_to_actions(child_timer.action_repr)
child_timer._set_is_scheduled(True)

class WhenAnyTask(CompoundTask):
"""A Task representing `when_any` scenarios."""

Expand All @@ -331,7 +385,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
The ReplaySchema, which determines the inner action payload representation
"""
compound_action_constructor = None
if replay_schema is ReplaySchema.V2:
if replay_schema.value >= ReplaySchema.V2.value:
compound_action_constructor = WhenAnyAction
super().__init__(task, compound_action_constructor)

Expand Down
28 changes: 28 additions & 0 deletions azure/durable_functions/models/utils/json_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime
import re
from typing import Dict, Any

from ...constants import DATETIME_STRING_FORMAT
Expand Down Expand Up @@ -36,6 +38,32 @@ def add_datetime_attrib(json_dict: Dict[str, Any], object_,
json_dict[alt_name or attribute_name] = \
getattr(object_, attribute_name).strftime(DATETIME_STRING_FORMAT)

# When we recieve properties from WebJobs extension originally parsed as TimeSpan objects through Newtonsoft,
# the format complies with the constant format specifier for TimeSpan in .NET.
# See https://learn.microsoft.com/en-us/dotnet/standard/base-types/standard-timespan-format-strings#the-constant-c-format-specifier
# Python offers no convenient way to parse these back into timedeltas, so we use this regex method instead
def parse_datetime_attrib_timespan(from_str: str) -> datetime.timedelta:
"""Converts a string originally produced by TimeSpan.ToString("c") in .NET into python's timespan.timedelta

Parameters
----------
from_str: The string format of the TimeSpan to convert

Returns
-------
timespan.timedelta
The TimeSpan expressed as a Python datetime.timedelta

"""
match = re.match(r"^(-)?(?:([0-9]*)\.)?([0-9]{2}):([0-9]{2}):([0-9]{2})(?:\.([0-9]{7}))?$", from_str)
if match:
span = datetime.timedelta(days=int(match.group(2) or "0"), hours=int(match.group(3)), minutes=int(match.group(4)), seconds=int(match.group(5)), microseconds=int(match.group(6) or "0") // 10)
if match.group(1):
span = -span
return span
else:
raise Exception(f"Format of TimeSpan failed attempted conversion to timedelta: {from_str}")


def add_json_attrib(json_dict: Dict[str, Any], object_,
attribute_name: str, alt_name: str = None):
Expand Down
Loading