Skip to content

Commit aae83c9

Browse files
authored
ContinueAsNew works, needed to mark the orchestrator as completed (#158)
1 parent 8d703a3 commit aae83c9

File tree

4 files changed

+69
-54
lines changed

4 files changed

+69
-54
lines changed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(self,
3232
self._custom_status: Any = None
3333
self._new_uuid_counter: int = 0
3434
self._sub_orchestrator_counter: int = 0
35+
self._continue_as_new_flag: bool = False
3536
self.call_activity = lambda n, i=None: call_activity_task(
3637
state=self.histories,
3738
name=n,
@@ -65,7 +66,7 @@ def __init__(self,
6566
state=self.histories,
6667
name=n)
6768
self.new_uuid = lambda: new_uuid(context=self)
68-
self.continue_as_new = lambda i: continue_as_new(input_=i)
69+
self.continue_as_new = lambda i: continue_as_new(context=self, input_=i)
6970
self.task_any = lambda t: task_any(tasks=t)
7071
self.task_all = lambda t: task_all(tasks=t)
7172
self.create_timer = lambda d: create_timer_task(state=self.histories, fire_at=d)
@@ -344,3 +345,8 @@ def function_context(self) -> FunctionContext:
344345
Object containing function level attributes not used by durable orchestrator.
345346
"""
346347
return self._function_context
348+
349+
@property
350+
def will_continue_as_new(self) -> bool:
351+
"""Return true if continue_as_new was called."""
352+
return self._continue_as_new_flag

azure/durable_functions/orchestrator.py

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -49,57 +49,66 @@ def handle(self, context: DurableOrchestrationContext):
4949
suspended = False
5050

5151
fn_output = self.fn(self.durable_context)
52+
5253
# If `fn_output` is not an Iterator, then the orchestrator
5354
# function does not make use of its context parameter. If so,
5455
# `fn_output` is the return value instead of a generator
55-
if isinstance(fn_output, Iterator):
56-
self.generator = fn_output
57-
58-
else:
56+
if not isinstance(fn_output, Iterator):
5957
orchestration_state = OrchestratorState(
6058
is_done=True,
6159
output=fn_output,
6260
actions=self.durable_context.actions,
6361
custom_status=self.durable_context.custom_status)
64-
return orchestration_state.to_json_string()
65-
try:
66-
generation_state = self._generate_next(None)
67-
68-
while not suspended:
69-
self._add_to_actions(generation_state)
70-
71-
if should_suspend(generation_state):
72-
orchestration_state = OrchestratorState(
73-
is_done=False,
74-
output=None,
75-
actions=self.durable_context.actions,
76-
custom_status=self.durable_context.custom_status)
77-
suspended = True
78-
continue
79-
80-
if (isinstance(generation_state, Task)
81-
or isinstance(generation_state, TaskSet)) and (
82-
generation_state.is_faulted):
83-
generation_state = self.generator.throw(
84-
generation_state.exception)
85-
continue
86-
87-
self._reset_timestamp()
88-
generation_state = self._generate_next(generation_state)
89-
90-
except StopIteration as sie:
91-
orchestration_state = OrchestratorState(
92-
is_done=True,
93-
output=sie.value,
94-
actions=self.durable_context.actions,
95-
custom_status=self.durable_context.custom_status)
96-
except Exception as e:
97-
orchestration_state = OrchestratorState(
98-
is_done=False,
99-
output=None, # Should have no output, after generation range
100-
actions=self.durable_context.actions,
101-
error=str(e),
102-
custom_status=self.durable_context.custom_status)
62+
63+
else:
64+
self.generator = fn_output
65+
try:
66+
generation_state = self._generate_next(None)
67+
68+
while not suspended:
69+
self._add_to_actions(generation_state)
70+
71+
if should_suspend(generation_state):
72+
73+
# The `is_done` field should be False here unless
74+
# `continue_as_new` was called. Therefore,
75+
# `will_continue_as_new` essentially "tracks"
76+
# whether or not the orchestration is done.
77+
orchestration_state = OrchestratorState(
78+
is_done=self.durable_context.will_continue_as_new,
79+
output=None,
80+
actions=self.durable_context.actions,
81+
custom_status=self.durable_context.custom_status)
82+
suspended = True
83+
continue
84+
85+
if (isinstance(generation_state, Task)
86+
or isinstance(generation_state, TaskSet)) and (
87+
generation_state.is_faulted):
88+
generation_state = self.generator.throw(
89+
generation_state.exception)
90+
continue
91+
92+
self._reset_timestamp()
93+
generation_state = self._generate_next(generation_state)
94+
95+
except StopIteration as sie:
96+
orchestration_state = OrchestratorState(
97+
is_done=True,
98+
output=sie.value,
99+
actions=self.durable_context.actions,
100+
custom_status=self.durable_context.custom_status)
101+
except Exception as e:
102+
orchestration_state = OrchestratorState(
103+
is_done=False,
104+
output=None, # Should have no output, after generation range
105+
actions=self.durable_context.actions,
106+
error=str(e),
107+
custom_status=self.durable_context.custom_status)
108+
109+
# No output if continue_as_new was called
110+
if self.durable_context.will_continue_as_new:
111+
orchestration_state._output = None
103112

104113
return orchestration_state.to_json_string()
105114

@@ -108,9 +117,13 @@ def _generate_next(self, partial_result):
108117
gen_result = self.generator.send(partial_result.result)
109118
else:
110119
gen_result = self.generator.send(None)
120+
111121
return gen_result
112122

113123
def _add_to_actions(self, generation_state):
124+
# Do not add new tasks to action if continue_as_new was called
125+
if self.durable_context.will_continue_as_new:
126+
return
114127
if (isinstance(generation_state, Task)
115128
and hasattr(generation_state, "action")):
116129
self.durable_context.actions.append([generation_state.action])
Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,19 @@
11
from typing import Any
22

3-
from ..models.Task import (
4-
Task)
53
from ..models.actions.ContinueAsNewAction import ContinueAsNewAction
64

75

86
def continue_as_new(
9-
input_: Any = None) -> Task:
7+
context,
8+
input_: Any = None):
109
"""Create a new continue as new action.
1110
1211
Parameters
1312
----------
1413
input_: Any
1514
The JSON-serializable input to pass to the activity function.
16-
17-
Returns
18-
-------
19-
Task
20-
A Durable Task that causes the orchestrator reset and start as a new orchestration.
2115
"""
2216
new_action = ContinueAsNewAction(input_)
2317

24-
return Task(is_completed=False, is_faulted=False, action=new_action)
18+
context.actions.append([new_action])
19+
context._continue_as_new_flag = True

tests/orchestrator/test_continue_as_new.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
def generator_function(context):
1212
yield context.call_activity("Hello", "Tokyo")
13-
yield context.continue_as_new("Cause I can")
13+
context.continue_as_new("Cause I can")
1414

1515

1616
def base_expected_state(output=None) -> OrchestratorState:
@@ -25,6 +25,7 @@ def add_hello_action(state: OrchestratorState, input_: str):
2525
def add_continue_as_new_action(state: OrchestratorState, input_: str):
2626
action = ContinueAsNewAction(input_=input_)
2727
state.actions.append([action])
28+
state._is_done = True
2829

2930

3031
def add_hello_completed_events(

0 commit comments

Comments
 (0)