Skip to content

Commit 5a5ba25

Browse files
authored
Merge pull request #313 from Azure/dev
Promote dev to main for 1.1.0 release
2 parents f3e17e4 + d2786e0 commit 5a5ba25

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1090
-1872
lines changed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 204 additions & 55 deletions
Large diffs are not rendered by default.
Lines changed: 308 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,328 @@
1-
from datetime import datetime
1+
from azure.durable_functions.models.actions.NoOpAction import NoOpAction
2+
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
3+
from azure.durable_functions.models.RetryOptions import RetryOptions
4+
from azure.durable_functions.models.ReplaySchema import ReplaySchema
5+
from azure.durable_functions.models.actions.Action import Action
6+
from azure.durable_functions.models.actions.WhenAnyAction import WhenAnyAction
7+
from azure.durable_functions.models.actions.WhenAllAction import WhenAllAction
28

3-
from .actions.Action import Action
9+
import enum
10+
from typing import Any, List, Optional, Set, Type, Union
411

512

6-
class Task:
7-
"""Represents some pending action.
13+
class TaskState(enum.Enum):
14+
"""The possible states that a Task can be in."""
815

9-
Similar to a native JavaScript promise in
10-
that it acts as a placeholder for outstanding asynchronous work, but has
11-
a synchronous implementation and is specific to Durable Functions.
16+
RUNNING = 0
17+
SUCCEEDED = 1
18+
FAILED = 2
1219

13-
Tasks are only returned to an orchestration function when a
14-
[[DurableOrchestrationContext]] operation is not called with `yield`. They
15-
are useful for parallelization and timeout operations in conjunction with
16-
Task.all and Task.any.
20+
21+
class TaskBase:
22+
"""The base class of all Tasks.
23+
24+
Contains shared logic that drives all of its sub-classes. Should never be
25+
instantiated on its own.
1726
"""
1827

19-
def __init__(self, is_completed, is_faulted, action,
20-
result=None, timestamp=None, id_=None, exc=None, is_played=False):
21-
self._is_completed: bool = is_completed
22-
self._is_faulted: bool = is_faulted
23-
self._action: Action = action
24-
self._result = result
25-
self._timestamp: datetime = timestamp
26-
self._id = id_
27-
self._exception = exc
28-
self._is_played = is_played
29-
self._is_yielded: bool = False
30-
31-
@property
32-
def is_completed(self) -> bool:
33-
"""Get indicator whether the task has completed.
34-
35-
Note that completion is not equivalent to success.
28+
def __init__(self, id_: Union[int, str], actions: Union[List[Action], Action]):
29+
"""Initialize the TaskBase.
30+
31+
Parameters
32+
----------
33+
id_ : int
34+
An ID for the task
35+
actions : List[Any]
36+
The list of DF actions representing this Task.
37+
Needed for reconstruction in the extension.
3638
"""
37-
return self._is_completed
39+
self.id: Union[int, str] = id_
40+
self.state = TaskState.RUNNING
41+
self.parent: Optional[CompoundTask] = None
42+
self._api_name: str
43+
44+
api_action: Union[Action, Type[CompoundAction]]
45+
if isinstance(actions, list):
46+
if len(actions) == 1:
47+
api_action = actions[0]
48+
else:
49+
api_action = CompoundAction
50+
else:
51+
api_action = actions
3852

39-
@property
40-
def is_faulted(self) -> bool:
41-
"""Get indicator whether the task faulted in some way due to error."""
42-
return self._is_faulted
53+
self._api_name = api_action.__class__.__name__
4354

44-
@property
45-
def action(self) -> Action:
46-
"""Get the scheduled action represented by the task.
55+
self.result: Any = None
56+
self.action_repr: Union[List[Action], Action] = actions
57+
self.is_played = False
4758

48-
_Internal use only._
59+
def set_is_played(self, is_played: bool):
60+
"""Set the is_played flag for the Task.
61+
62+
Needed for updating the orchestrator's is_replaying flag.
63+
64+
Parameters
65+
----------
66+
is_played : bool
67+
Whether the latest event for this Task has been played before.
4968
"""
50-
return self._action
69+
self.is_played = is_played
70+
71+
def change_state(self, state: TaskState):
72+
"""Transition a running Task to a terminal state: success or failure.
5173
52-
@property
53-
def result(self) -> object:
54-
"""Get the result of the task, if completed. Otherwise `None`."""
55-
return self._result
74+
Parameters
75+
----------
76+
state : TaskState
77+
The terminal state to assign to this Task
5678
57-
@property
58-
def timestamp(self) -> datetime:
59-
"""Get the timestamp of the task."""
60-
return self._timestamp
79+
Raises
80+
------
81+
Exception
82+
When the input state is RUNNING
83+
"""
84+
if state is TaskState.RUNNING:
85+
raise Exception("Cannot change Task to the RUNNING state.")
86+
self.state = state
87+
88+
def set_value(self, is_error: bool, value: Any):
89+
"""Set the value of this Task: either an exception of a result.
6190
62-
@property
63-
def id(self):
64-
"""Get the ID number of the task.
91+
Parameters
92+
----------
93+
is_error : bool
94+
Whether the value represents an exception of a result.
95+
value : Any
96+
The value of this Task
6597
66-
_Internal use only._
98+
Raises
99+
------
100+
Exception
101+
When the Task failed but its value was not an Exception
67102
"""
68-
return self._id
103+
new_state = self.state
104+
if is_error:
105+
if not isinstance(value, Exception):
106+
if not (isinstance(value, TaskBase) and isinstance(value.result, Exception)):
107+
err_message = f"Task ID {self.id} failed but it's value was not an Exception"
108+
raise Exception(err_message)
109+
new_state = TaskState.FAILED
110+
else:
111+
new_state = TaskState.SUCCEEDED
112+
self.change_state(new_state)
113+
self.result = value
114+
self.propagate()
115+
116+
def propagate(self):
117+
"""Notify parent Task of this Task's state change."""
118+
has_completed = not (self.state is TaskState.RUNNING)
119+
has_parent = not (self.parent is None)
120+
if has_completed and has_parent:
121+
self.parent.handle_completion(self)
122+
123+
124+
class CompoundTask(TaskBase):
125+
"""A Task of Tasks.
126+
127+
Contains shared logic that drives all of its sub-classes.
128+
Should never be instantiated on its own.
129+
"""
69130

70-
@property
71-
def exception(self):
72-
"""Get the error thrown when attempting to perform the task's action.
131+
def __init__(self, tasks: List[TaskBase], compound_action_constructor=None):
132+
"""Instantiate CompoundTask attributes.
73133
74-
If the Task has not yet completed or has completed successfully, `None`
134+
Parameters
135+
----------
136+
tasks : List[Task]
137+
The children/sub-tasks of this Task
138+
compound_action_constructor : Union[WhenAllAction, WhenAnyAction, None]
139+
Either None or, a WhenAllAction or WhenAnyAction constructor.
140+
It is None when using the V1 replay protocol, where no Compound Action
141+
objects size and compound actions are represented as arrays of actions.
142+
It is not None when using the V2 replay protocol.
75143
"""
76-
return self._exception
144+
super().__init__(-1, [])
145+
child_actions = []
146+
for task in tasks:
147+
task.parent = self
148+
action_repr = task.action_repr
149+
if isinstance(action_repr, list):
150+
child_actions.extend(action_repr)
151+
else:
152+
child_actions.append(action_repr)
153+
if compound_action_constructor is None:
154+
self.action_repr = child_actions
155+
else: # replay_schema is ReplaySchema.V2
156+
self.action_repr = compound_action_constructor(child_actions)
157+
self._first_error: Optional[Exception] = None
158+
self.pending_tasks: Set[TaskBase] = set(tasks)
159+
self.completed_tasks: List[TaskBase] = []
160+
self.children = tasks
161+
162+
def handle_completion(self, child: TaskBase):
163+
"""Manage sub-task completion events.
164+
165+
Parameters
166+
----------
167+
child : TaskBase
168+
The sub-task that completed
169+
170+
Raises
171+
------
172+
Exception
173+
When the calling sub-task was not registered
174+
with this Task's pending sub-tasks.
175+
"""
176+
try:
177+
self.pending_tasks.remove(child)
178+
except KeyError:
179+
raise Exception(
180+
f"Parent Task {self.id} does not have pending sub-task with ID {child.id}."
181+
f"This most likely means that Task {child.id} completed twice.")
182+
183+
self.completed_tasks.append(child)
184+
self.set_is_played(child.is_played)
185+
self.try_set_value(child)
186+
187+
def try_set_value(self, child: TaskBase):
188+
"""Transition a CompoundTask to a terminal state and set its value.
189+
190+
Should be implemented by sub-classes.
191+
192+
Parameters
193+
----------
194+
child : TaskBase
195+
A sub-task that just completed
196+
197+
Raises
198+
------
199+
NotImplementedError
200+
This method needs to be implemented by each subclass.
201+
"""
202+
raise NotImplementedError
203+
204+
205+
class AtomicTask(TaskBase):
206+
"""A Task with no subtasks."""
207+
208+
pass
209+
210+
211+
class WhenAllTask(CompoundTask):
212+
"""A Task representing `when_all` scenarios."""
213+
214+
def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
215+
"""Initialize a WhenAllTask.
216+
217+
Parameters
218+
----------
219+
task : List[Task]
220+
The list of child tasks
221+
replay_schema : ReplaySchema
222+
The ReplaySchema, which determines the inner action payload representation
223+
"""
224+
compound_action_constructor = None
225+
if replay_schema is ReplaySchema.V2:
226+
compound_action_constructor = WhenAllAction
227+
super().__init__(task, compound_action_constructor)
228+
229+
def try_set_value(self, child: TaskBase):
230+
"""Transition a WhenAll Task to a terminal state and set its value.
231+
232+
Parameters
233+
----------
234+
child : TaskBase
235+
A sub-task that just completed
236+
"""
237+
if child.state is TaskState.SUCCEEDED:
238+
# A WhenAll Task only completes when it has no pending tasks
239+
# i.e _when all_ of its children have completed
240+
if len(self.pending_tasks) == 0:
241+
results = list(map(lambda x: x.result, self.completed_tasks))
242+
self.set_value(is_error=False, value=results)
243+
else: # child.state is TaskState.FAILED:
244+
# a single error is sufficient to fail this task
245+
if self._first_error is None:
246+
self._first_error = child.result
247+
self.set_value(is_error=True, value=self._first_error)
248+
249+
250+
class WhenAnyTask(CompoundTask):
251+
"""A Task representing `when_any` scenarios."""
252+
253+
def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema):
254+
"""Initialize a WhenAnyTask.
255+
256+
Parameters
257+
----------
258+
task : List[Task]
259+
The list of child tasks
260+
replay_schema : ReplaySchema
261+
The ReplaySchema, which determines the inner action payload representation
262+
"""
263+
compound_action_constructor = None
264+
if replay_schema is ReplaySchema.V2:
265+
compound_action_constructor = WhenAnyAction
266+
super().__init__(task, compound_action_constructor)
267+
268+
def try_set_value(self, child: TaskBase):
269+
"""Transition a WhenAny Task to a terminal state and set its value.
270+
271+
Parameters
272+
----------
273+
child : TaskBase
274+
A sub-task that just completed
275+
"""
276+
if self.state is TaskState.RUNNING:
277+
self.set_value(is_error=False, value=child)
278+
279+
280+
class RetryAbleTask(WhenAllTask):
281+
"""A Task representing `with_retry` scenarios.
282+
283+
It inherits from WhenAllTask because retryable scenarios are Tasks
284+
with equivalent to WhenAll Tasks with dynamically increasing lists
285+
of children. At every failure, we add a Timer child and a Task child
286+
to the list of pending tasks.
287+
"""
288+
289+
def __init__(self, child: TaskBase, retry_options: RetryOptions, context):
290+
self.id_ = str(child.id) + "_retryable_proxy"
291+
tasks = [child]
292+
super().__init__(tasks, context._replay_schema)
293+
294+
self.retry_options = retry_options
295+
self.num_attempts = 1
296+
self.context = context
297+
self.actions = child.action_repr
298+
299+
def try_set_value(self, child: TaskBase):
300+
"""Transition a Retryable Task to a terminal state and set its value.
301+
302+
Parameters
303+
----------
304+
child : TaskBase
305+
A sub-task that just completed
306+
"""
307+
if child.state is TaskState.SUCCEEDED:
308+
if len(self.pending_tasks) == 0:
309+
# if all pending tasks have completed,
310+
# and we have a successful child, then
311+
# we can set the Task's event
312+
self.set_value(is_error=False, value=child.result)
313+
314+
else: # child.state is TaskState.FAILED:
315+
if self.num_attempts >= self.retry_options.max_number_of_attempts:
316+
# we have reached the maximum number of attempts, set error
317+
self.set_value(is_error=True, value=child.result)
318+
else:
319+
# still have some retries left.
320+
# increase size of pending tasks by adding a timer task
321+
# and then re-scheduling the current task after that
322+
timer_task = self.context._generate_task(action=NoOpAction(), parent=self)
323+
self.pending_tasks.add(timer_task)
324+
self.context._add_to_open_tasks(timer_task)
325+
rescheduled_task = self.context._generate_task(action=NoOpAction(), parent=self)
326+
self.pending_tasks.add(rescheduled_task)
327+
self.context._add_to_open_tasks(rescheduled_task)
328+
self.num_attempts += 1

0 commit comments

Comments
 (0)