1+ from datetime import datetime
12from azure .durable_functions .models .actions .NoOpAction import NoOpAction
23from azure .durable_functions .models .actions .CompoundAction import CompoundAction
34from azure .durable_functions .models .RetryOptions import RetryOptions
@@ -170,7 +171,7 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None):
170171 child_actions .append (action_repr )
171172 if compound_action_constructor is None :
172173 self .action_repr = child_actions
173- else : # replay_schema is ReplaySchema.V2
174+ else : # replay_schema >= ReplaySchema.V2
174175 self .action_repr = compound_action_constructor (child_actions )
175176 self ._first_error : Optional [Exception ] = None
176177 self .pending_tasks : Set [TaskBase ] = set (tasks )
@@ -292,7 +293,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
292293 The ReplaySchema, which determines the inner action payload representation
293294 """
294295 compound_action_constructor = None
295- if replay_schema is ReplaySchema .V2 :
296+ if replay_schema . value >= ReplaySchema .V2 . value :
296297 compound_action_constructor = WhenAllAction
297298 super ().__init__ (task , compound_action_constructor )
298299
@@ -317,6 +318,119 @@ def try_set_value(self, child: TaskBase):
317318 self .set_value (is_error = True , value = self ._first_error )
318319
319320
321+ class LongTimerTask (WhenAllTask ):
322+ """A Timer Task for intervals longer than supported by the storage backend."""
323+
324+ def __init__ (self , id_ , action : CreateTimerAction , orchestration_context ):
325+ """Initialize a LongTimerTask.
326+
327+ Parameters
328+ ----------
329+ id_ : int
330+ An ID for the task
331+ action : CreateTimerAction
332+ The action this task represents
333+ orchestration_context: DurableOrchestrationContext
334+ The orchestration context this task was created in
335+ """
336+ current_time = orchestration_context .current_utc_datetime
337+ final_fire_time = action .fire_at
338+ duration_until_fire = final_fire_time - current_time
339+
340+ if duration_until_fire > orchestration_context ._maximum_short_timer_duration :
341+ next_fire_time = current_time + orchestration_context ._long_timer_interval_duration
342+ else :
343+ next_fire_time = final_fire_time
344+
345+ next_timer_action = CreateTimerAction (next_fire_time )
346+ next_timer_task = TimerTask (None , next_timer_action )
347+ super ().__init__ ([next_timer_task ], orchestration_context ._replay_schema )
348+
349+ self .id = id_
350+ self .action = action
351+ self ._orchestration_context = orchestration_context
352+ self ._max_short_timer_duration = self ._orchestration_context ._maximum_short_timer_duration
353+ self ._long_timer_interval = self ._orchestration_context ._long_timer_interval_duration
354+
355+ def is_canceled (self ) -> bool :
356+ """Check if the LongTimer is cancelled.
357+
358+ Returns
359+ -------
360+ bool
361+ Returns whether the timer has been cancelled or not
362+ """
363+ return self .action .is_cancelled
364+
365+ def cancel (self ):
366+ """Cancel a timer.
367+
368+ Raises
369+ ------
370+ ValueError
371+ Raises an error if the task is already completed and an attempt is made to cancel it
372+ """
373+ if (self .result ):
374+ raise Exception ("Cannot cancel a completed task." )
375+ self .action .is_cancelled = True
376+
377+ def try_set_value (self , child : TimerTask ):
378+ """Transition this LongTimer Task to a terminal state and set its value.
379+
380+ If the LongTimer has not yet reached the designated completion time, starts a new
381+ TimerTask for the next interval and does not close.
382+
383+ Parameters
384+ ----------
385+ child : TimerTask
386+ A timer sub-task that just completed
387+ """
388+ current_time = self ._orchestration_context .current_utc_datetime
389+ final_fire_time = self .action .fire_at
390+ if final_fire_time > current_time :
391+ next_timer = self .get_next_timer_task (final_fire_time , current_time )
392+ self .add_new_child (next_timer )
393+ return super ().try_set_value (child )
394+
395+ def get_next_timer_task (self , final_fire_time : datetime , current_time : datetime ) -> TimerTask :
396+ """Create a TimerTask to represent the next interval of the LongTimer.
397+
398+ Parameters
399+ ----------
400+ final_fire_time : datetime.datetime
401+ The final firing time of the LongTimer
402+ current_time : datetime.datetime
403+ The current time
404+
405+ Returns
406+ -------
407+ TimerTask
408+ A TimerTask representing the next interval of the LongTimer
409+ """
410+ duration_until_fire = final_fire_time - current_time
411+ if duration_until_fire > self ._max_short_timer_duration :
412+ next_fire_time = current_time + self ._long_timer_interval
413+ else :
414+ next_fire_time = final_fire_time
415+ return TimerTask (None , CreateTimerAction (next_fire_time ))
416+
417+ def add_new_child (self , child_timer : TimerTask ):
418+ """Add the TimerTask to this task's children.
419+
420+ Also register the TimerTask with the orchestration context.
421+
422+ Parameters
423+ ----------
424+ child_timer : TimerTask
425+ The newly created TimerTask to add
426+ """
427+ child_timer .parent = self
428+ self .pending_tasks .add (child_timer )
429+ self ._orchestration_context ._add_to_open_tasks (child_timer )
430+ self ._orchestration_context ._add_to_actions (child_timer .action_repr )
431+ child_timer ._set_is_scheduled (True )
432+
433+
320434class WhenAnyTask (CompoundTask ):
321435 """A Task representing `when_any` scenarios."""
322436
@@ -331,7 +445,7 @@ def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
331445 The ReplaySchema, which determines the inner action payload representation
332446 """
333447 compound_action_constructor = None
334- if replay_schema is ReplaySchema .V2 :
448+ if replay_schema . value >= ReplaySchema .V2 . value :
335449 compound_action_constructor = WhenAnyAction
336450 super ().__init__ (task , compound_action_constructor )
337451
0 commit comments