Skip to content

Commit 34472ac

Browse files
authored
Make -WithRetry APIs wait for TimerFired after final task failure (#339)
1 parent 2f679e9 commit 34472ac

File tree

2 files changed

+68
-18
lines changed

2 files changed

+68
-18
lines changed

azure/durable_functions/models/Task.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ def __init__(self, child: TaskBase, retry_options: RetryOptions, context):
347347
self.context = context
348348
self.actions = child.action_repr
349349
self.is_waiting_on_timer = False
350+
self.error = None
350351

351352
@property
352353
def id_(self):
@@ -373,10 +374,21 @@ def try_set_value(self, child: TaskBase):
373374
if self.is_waiting_on_timer:
374375
# timer fired, re-scheduling original task
375376
self.is_waiting_on_timer = False
376-
rescheduled_task = self.context._generate_task(
377-
action=NoOpAction("rescheduled task"), parent=self)
378-
self.pending_tasks.add(rescheduled_task)
379-
self.context._add_to_open_tasks(rescheduled_task)
377+
# As per DTFx semantics: we need to check the number of retries only after the final
378+
# timer has fired. This means we essentially have to wait for one "extra" timer after
379+
# the maximum number of attempts has been reached. Removing this extra timer will cause
380+
# stuck orchestrators as we need to be "in sync" with the replay logic of DTFx.
381+
if self.num_attempts >= self.retry_options.max_number_of_attempts:
382+
self.is_waiting_on_timer = True
383+
# we have reached the maximum number of attempts, set error
384+
self.set_value(is_error=True, value=self.error)
385+
else:
386+
rescheduled_task = self.context._generate_task(
387+
action=NoOpAction("rescheduled task"), parent=self)
388+
self.pending_tasks.add(rescheduled_task)
389+
self.context._add_to_open_tasks(rescheduled_task)
390+
self.num_attempts += 1
391+
380392
return
381393
if child.state is TaskState.SUCCEEDED:
382394
if len(self.pending_tasks) == 0:
@@ -386,17 +398,11 @@ def try_set_value(self, child: TaskBase):
386398
self.set_value(is_error=False, value=child.result)
387399

388400
else: # child.state is TaskState.FAILED:
389-
if self.num_attempts >= self.retry_options.max_number_of_attempts:
390-
# we have reached the maximum number of attempts, set error
391-
self.set_value(is_error=True, value=child.result)
392-
else:
393-
# still have some retries left.
394-
# increase size of pending tasks by adding a timer task
395-
# when it completes, we'll retry the original task
396-
timer_task = self.context._generate_task(
397-
action=NoOpAction("-WithRetry timer"), parent=self)
398-
self.pending_tasks.add(timer_task)
399-
self.context._add_to_open_tasks(timer_task)
400-
self.is_waiting_on_timer = True
401-
402-
self.num_attempts += 1
401+
# increase size of pending tasks by adding a timer task
402+
# when it completes, we'll retry the original task
403+
timer_task = self.context._generate_task(
404+
action=NoOpAction("-WithRetry timer"), parent=self)
405+
self.pending_tasks.add(timer_task)
406+
self.context._add_to_open_tasks(timer_task)
407+
self.is_waiting_on_timer = True
408+
self.error = child.result

tests/orchestrator/test_sequential_orchestrator_with_retry.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@ def generator_function(context):
2929

3030
return outputs
3131

32+
def generator_function_try_catch(context):
33+
outputs = []
34+
35+
retry_options = RETRY_OPTIONS
36+
result = None
37+
try:
38+
result = yield context.call_activity_with_retry(
39+
"Hello", retry_options, "Tokyo")
40+
except:
41+
result = yield context.call_activity_with_retry(
42+
"Hello", retry_options, "Seattle")
43+
return result
44+
3245
def generator_function_concurrent_retries(context):
3346
outputs = []
3447

@@ -305,6 +318,37 @@ def test_failed_tokyo_hit_max_attempts():
305318
expected_error_str = f"{error_msg}{error_label}{state_str}"
306319
assert expected_error_str == error_str
307320

321+
def test_failed_tokyo_hit_max_attempts_in_try_catch():
322+
# This test ensures that APIs can still be invoked after a failed CallActivityWithRetry invocation
323+
failed_reason = 'Reasons'
324+
failed_details = 'Stuff and Things'
325+
context_builder = ContextBuilder('test_simple_function')
326+
327+
# events for first task: "Hello Tokyo"
328+
add_hello_failed_events(context_builder, 0, failed_reason, failed_details)
329+
add_retry_timer_events(context_builder, 1)
330+
add_hello_failed_events(context_builder, 2, failed_reason, failed_details)
331+
add_retry_timer_events(context_builder, 3)
332+
add_hello_failed_events(context_builder, 4, failed_reason, failed_details)
333+
# we have an "extra" timer to wait for, due to legacy behavior in DTFx.
334+
add_retry_timer_events(context_builder, 5)
335+
336+
# events for the task in the except block
337+
add_hello_completed_events(context_builder, 6, "\"Hello Seattle!\"")
338+
339+
result = get_orchestration_state_result(
340+
context_builder, generator_function_try_catch)
341+
342+
expected_state = base_expected_state()
343+
add_hello_action(expected_state, 'Tokyo')
344+
add_hello_action(expected_state, 'Seattle')
345+
expected_state._output = "Hello Seattle!"
346+
expected_state._is_done = True
347+
expected = expected_state.to_json()
348+
349+
assert_valid_schema(result)
350+
assert_orchestration_state_equals(expected, result)
351+
308352
def test_concurrent_retriable_results():
309353
failed_reason = 'Reasons'
310354
failed_details = 'Stuff and Things'

0 commit comments

Comments
 (0)