Skip to content

Commit 0faf559

Browse files
authored
Fixing Retrying logic (#186)
1 parent a640ad1 commit 0faf559

File tree

6 files changed

+379
-84
lines changed

6 files changed

+379
-84
lines changed
Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
from typing import List, Any
22

3-
from .task_utilities import find_task_scheduled, \
4-
find_task_retry_timer_created, set_processed, parse_history_event, \
5-
find_task_completed, find_task_failed, find_task_retry_timer_fired
3+
from .task_utilities import get_retried_task
64
from ..models.RetryOptions import RetryOptions
75
from ..models.Task import (
86
Task)
97
from ..models.actions.CallActivityWithRetryAction import \
108
CallActivityWithRetryAction
11-
from ..models.history import HistoryEvent
9+
from ..models.history import HistoryEvent, HistoryEventType
1210

1311

1412
def call_activity_with_retry_task(
@@ -37,40 +35,12 @@ def call_activity_with_retry_task(
3735
"""
3836
new_action = CallActivityWithRetryAction(
3937
function_name=name, retry_options=retry_options, input_=input_)
40-
for attempt in range(retry_options.max_number_of_attempts):
41-
task_scheduled = find_task_scheduled(state, name)
42-
task_completed = find_task_completed(state, task_scheduled)
43-
task_failed = find_task_failed(state, task_scheduled)
44-
task_retry_timer = find_task_retry_timer_created(state, task_failed)
45-
task_retry_timer_fired = find_task_retry_timer_fired(
46-
state, task_retry_timer)
47-
set_processed([task_scheduled, task_completed,
48-
task_failed, task_retry_timer, task_retry_timer_fired])
4938

50-
if not task_scheduled:
51-
break
52-
53-
if task_completed:
54-
return Task(
55-
is_completed=True,
56-
is_faulted=False,
57-
action=new_action,
58-
is_played=task_completed._is_played,
59-
result=parse_history_event(task_completed),
60-
timestamp=task_completed.timestamp,
61-
id_=task_completed.TaskScheduledId)
62-
63-
if task_failed and task_retry_timer and attempt + 1 >= \
64-
retry_options.max_number_of_attempts:
65-
return Task(
66-
is_completed=True,
67-
is_faulted=True,
68-
action=new_action,
69-
is_played=task_failed._is_played,
70-
timestamp=task_failed.timestamp,
71-
id_=task_failed.TaskScheduledId,
72-
exc=Exception(
73-
f"{task_failed.Reason} \n {task_failed.Details}")
74-
)
75-
76-
return Task(is_completed=False, is_faulted=False, action=new_action)
39+
return get_retried_task(
40+
state=state,
41+
max_number_of_attempts=retry_options.max_number_of_attempts,
42+
scheduled_type=HistoryEventType.TASK_SCHEDULED,
43+
completed_type=HistoryEventType.TASK_COMPLETED,
44+
failed_type=HistoryEventType.TASK_FAILED,
45+
action=new_action
46+
)

azure/durable_functions/tasks/call_suborchestrator_with_retry.py

Lines changed: 10 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44
Task)
55
from ..models.actions.CallSubOrchestratorWithRetryAction import CallSubOrchestratorWithRetryAction
66
from ..models.RetryOptions import RetryOptions
7-
from ..models.history import HistoryEvent
8-
from .task_utilities import set_processed, parse_history_event, \
9-
find_sub_orchestration_created, find_sub_orchestration_completed, \
10-
find_sub_orchestration_failed, find_task_retry_timer_fired, find_task_retry_timer_created
7+
from ..models.history import HistoryEvent, HistoryEventType
8+
from .task_utilities import get_retried_task
119

1210

1311
def call_sub_orchestrator_with_retry_task(
@@ -40,42 +38,11 @@ def call_sub_orchestrator_with_retry_task(
4038
A Durable Task that completes when the called sub-orchestrator completes or fails.
4139
"""
4240
new_action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id)
43-
for attempt in range(retry_options.max_number_of_attempts):
44-
task_scheduled = find_sub_orchestration_created(
45-
state, name, context=context, instance_id=instance_id)
46-
task_completed = find_sub_orchestration_completed(state, task_scheduled)
47-
task_failed = find_sub_orchestration_failed(state, task_scheduled)
48-
task_retry_timer = find_task_retry_timer_created(state, task_failed)
49-
task_retry_timer_fired = find_task_retry_timer_fired(
50-
state, task_retry_timer)
51-
set_processed([task_scheduled, task_completed,
52-
task_failed, task_retry_timer, task_retry_timer_fired])
53-
54-
if not task_scheduled:
55-
break
56-
57-
if task_completed is not None:
58-
return Task(
59-
is_completed=True,
60-
is_faulted=False,
61-
action=new_action,
62-
is_played=task_completed._is_played,
63-
result=parse_history_event(task_completed),
64-
timestamp=task_completed.timestamp,
65-
id_=task_completed.TaskScheduledId)
66-
67-
if task_failed and task_retry_timer and attempt + 1 >= \
68-
retry_options.max_number_of_attempts:
69-
return Task(
70-
is_completed=True,
71-
is_faulted=True,
72-
action=new_action,
73-
is_played=task_failed._is_played,
74-
result=task_failed.Reason,
75-
timestamp=task_failed.timestamp,
76-
id_=task_failed.TaskScheduledId,
77-
exc=Exception(
78-
f"{task_failed.Reason} \n {task_failed.Details}")
79-
)
80-
81-
return Task(is_completed=False, is_faulted=False, action=new_action)
41+
return get_retried_task(
42+
state=state,
43+
max_number_of_attempts=retry_options.max_number_of_attempts,
44+
scheduled_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_CREATED,
45+
completed_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED,
46+
failed_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_FAILED,
47+
action=new_action
48+
)

azure/durable_functions/tasks/task_utilities.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from azure.functions._durable_functions import _deserialize_custom_object
55
from datetime import datetime
66
from typing import List, Optional
7+
from ..models.actions.Action import Action
8+
from ..models.Task import Task
79

810

911
def should_suspend(partial_result) -> bool:
@@ -410,3 +412,95 @@ def should_preserve(event: HistoryEvent) -> bool:
410412
# We should try to refactor this logic at some point
411413
event = matches[0]
412414
return event
415+
416+
417+
def get_retried_task(
418+
state: List[HistoryEvent], max_number_of_attempts: int, scheduled_type: HistoryEventType,
419+
completed_type: HistoryEventType, failed_type: HistoryEventType, action: Action) -> Task:
420+
"""Determine the state of scheduling some task for execution with retry options.
421+
422+
Parameters
423+
----------
424+
state: List[HistoryEvent]
425+
The list of history events
426+
max_number_of_ints: int
427+
The maximum number of retrying attempts
428+
scheduled_type: HistoryEventType
429+
The event type corresponding to scheduling the searched-for task
430+
completed_type: HistoryEventType
431+
The event type corresponding to a completion of the searched-for task
432+
failed_type: HistoryEventType
433+
The event type coresponding to the failure of the searched-for task
434+
action: Action
435+
The action corresponding to the searched-for task
436+
437+
Returns
438+
-------
439+
Task
440+
A Task encompassing the state of the scheduled work item, that is,
441+
either completed, failed, or incomplete.
442+
"""
443+
# tasks to look for in the state array
444+
scheduled_task, completed_task = None, None
445+
failed_task, scheduled_timer_task = None, None
446+
attempt = 1
447+
448+
# Note each case below is exclusive, and the order matters
449+
for event in state:
450+
event_type = HistoryEventType(event.event_type)
451+
452+
# Skip processed events
453+
if event.is_processed:
454+
continue
455+
456+
# first we find the scheduled_task
457+
elif scheduled_task is None:
458+
if event_type is scheduled_type:
459+
scheduled_task = event
460+
461+
# if the task has a correponding completion, we process the events
462+
# and return a completed task
463+
elif event_type == completed_type and \
464+
event.TaskScheduledId == scheduled_task.event_id:
465+
completed_task = event
466+
set_processed([scheduled_task, completed_task])
467+
return Task(
468+
is_completed=True,
469+
is_faulted=False,
470+
action=action,
471+
result=parse_history_event(completed_task),
472+
timestamp=completed_task.timestamp,
473+
id_=completed_task.TaskScheduledId
474+
)
475+
476+
# if its failed, we'll have to wait for an upcoming timer scheduled
477+
elif failed_task is None:
478+
if event_type is failed_type:
479+
if event.TaskScheduledId == scheduled_task.event_id:
480+
failed_task = event
481+
482+
# if we have a timer scheduled, we'll have to find a timer fired
483+
elif scheduled_timer_task is None:
484+
if event_type is HistoryEventType.TIMER_CREATED:
485+
scheduled_timer_task = event
486+
487+
# if we have a timer fired, we check if we still have more attempts for retries.
488+
# If so, we retry again and clear our found events so far.
489+
# If not, we process the events and return a completed task
490+
elif event_type is HistoryEventType.TIMER_FIRED:
491+
if event.TimerId == scheduled_timer_task.event_id:
492+
set_processed([scheduled_task, completed_task, failed_task, scheduled_timer_task])
493+
if attempt >= max_number_of_attempts:
494+
return Task(
495+
is_completed=True,
496+
is_faulted=True,
497+
action=action,
498+
timestamp=failed_task.timestamp,
499+
id_=failed_task.TaskScheduledId,
500+
exc=Exception(
501+
f"{failed_task.Reason} \n {failed_task.Details}")
502+
)
503+
else:
504+
scheduled_task, failed_task, scheduled_timer_task = None, None, None
505+
attempt += 1
506+
return Task(is_completed=False, is_faulted=False, action=action)

0 commit comments

Comments
 (0)