Skip to content

Commit e70c357

Browse files
authored
Sub-orchestrators now available (#157)
1 parent 351dd93 commit e70c357

25 files changed

+984
-3
lines changed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from ..models.Task import Task
1010
from ..models.TokenSource import TokenSource
1111
from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \
12-
wait_for_external_event_task, continue_as_new, new_uuid, call_http, create_timer_task
12+
wait_for_external_event_task, continue_as_new, new_uuid, call_http, create_timer_task, \
13+
call_sub_orchestrator_task, call_sub_orchestrator_with_retry_task
1314
from azure.functions._durable_functions import _deserialize_custom_object
1415

1516

@@ -30,6 +31,7 @@ def __init__(self,
3031
self._parent_instance_id: str = parentInstanceId
3132
self._custom_status: Any = None
3233
self._new_uuid_counter: int = 0
34+
self._sub_orchestrator_counter: int = 0
3335
self.call_activity = lambda n, i=None: call_activity_task(
3436
state=self.histories,
3537
name=n,
@@ -40,6 +42,21 @@ def __init__(self,
4042
retry_options=o,
4143
name=n,
4244
input_=i)
45+
self.call_sub_orchestrator = \
46+
lambda n, i=None, _id=None: call_sub_orchestrator_task(
47+
context=self,
48+
state=self.histories,
49+
name=n,
50+
input_=i,
51+
instance_id=_id)
52+
self.call_sub_orchestrator_with_retry = \
53+
lambda n, o, i=None, _id=None: call_sub_orchestrator_with_retry_task(
54+
context=self,
55+
state=self.histories,
56+
retry_options=o,
57+
name=n,
58+
input_=i,
59+
instance_id=_id)
4360
self.call_http = lambda method, uri, content=None, headers=None, token_source=None: \
4461
call_http(
4562
state=self.histories, method=method, uri=uri, content=content, headers=headers,
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from typing import Any, Dict, Optional, Union
2+
3+
from .Action import Action
4+
from .ActionType import ActionType
5+
from ..utils.json_utils import add_attrib
6+
from json import dumps
7+
from azure.functions._durable_functions import _serialize_custom_object
8+
9+
10+
class CallSubOrchestratorAction(Action):
11+
"""Defines the structure of the Call SubOrchestrator object."""
12+
13+
def __init__(self, function_name: str, _input: Optional[Any] = None, instance_id: str = ""):
14+
self.function_name: str = function_name
15+
self._input: str = dumps(_input, default=_serialize_custom_object)
16+
self.instance_id: str = instance_id
17+
18+
if not self.function_name:
19+
raise ValueError("function_name cannot be empty")
20+
21+
@property
22+
def action_type(self) -> int:
23+
"""Get the type of action this class represents."""
24+
return ActionType.CALL_SUB_ORCHESTRATOR
25+
26+
def to_json(self) -> Dict[str, Union[str, int]]:
27+
"""Convert object into a json dictionary.
28+
29+
Returns
30+
-------
31+
Dict[str, Union(str, int)]
32+
The instance of the class converted into a json dictionary
33+
"""
34+
json_dict: Dict[str, Union[str, int]] = {}
35+
add_attrib(json_dict, self, 'action_type', 'actionType')
36+
add_attrib(json_dict, self, 'function_name', 'functionName')
37+
add_attrib(json_dict, self, '_input', 'input')
38+
add_attrib(json_dict, self, 'instance_id', 'instance_id')
39+
return json_dict
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from typing import Any, Dict, Union, Optional
2+
3+
from .Action import Action
4+
from .ActionType import ActionType
5+
from ..utils.json_utils import add_attrib, add_json_attrib
6+
from json import dumps
7+
from ..RetryOptions import RetryOptions
8+
from azure.functions._durable_functions import _serialize_custom_object
9+
10+
11+
class CallSubOrchestratorWithRetryAction(Action):
12+
"""Defines the structure of the Call SubOrchestrator object."""
13+
14+
def __init__(self, function_name: str, retry_options: RetryOptions,
15+
_input: Optional[Any] = None,
16+
instance_id: str = ""):
17+
self.function_name: str = function_name
18+
self._input: str = dumps(_input, default=_serialize_custom_object)
19+
self.retry_options: RetryOptions = retry_options
20+
self.instance_id: str = instance_id
21+
22+
if not self.function_name:
23+
raise ValueError("function_name cannot be empty")
24+
25+
@property
26+
def action_type(self) -> int:
27+
"""Get the type of action this class represents."""
28+
return ActionType.CALL_SUB_ORCHESTRATOR_WITH_RETRY
29+
30+
def to_json(self) -> Dict[str, Union[str, int]]:
31+
"""Convert object into a json dictionary.
32+
33+
Returns
34+
-------
35+
Dict[str, Union(str, int)]
36+
The instance of the class converted into a json dictionary
37+
"""
38+
json_dict: Dict[str, Union[str, int]] = {}
39+
add_attrib(json_dict, self, 'action_type', 'actionType')
40+
add_attrib(json_dict, self, 'function_name', 'functionName')
41+
add_attrib(json_dict, self, '_input', 'input')
42+
add_json_attrib(json_dict, self, 'retry_options', 'retryOptions')
43+
add_attrib(json_dict, self, 'instance_id', 'instance_id')
44+
return json_dict

azure/durable_functions/models/actions/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .ActionType import ActionType
44
from .CallActivityAction import CallActivityAction
55
from .CallActivityWithRetryAction import CallActivityWithRetryAction
6+
from .CallSubOrchestratorAction import CallSubOrchestratorAction
67
from .WaitForExternalEventAction import WaitForExternalEventAction
78
from .CallHttpAction import CallHttpAction
89
from .CreateTimerAction import CreateTimerAction
@@ -12,6 +13,7 @@
1213
'ActionType',
1314
'CallActivityAction',
1415
'CallActivityWithRetryAction',
16+
'CallSubOrchestratorAction',
1517
'CallHttpAction',
1618
'WaitForExternalEventAction',
1719
'CreateTimerAction'

azure/durable_functions/tasks/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Contains the definitions for the functions that enable scheduling of activities."""
22
from .call_activity import call_activity_task
33
from .call_activity_with_retry import call_activity_with_retry_task
4+
from .call_suborchestrator import call_sub_orchestrator_task
5+
from .call_suborchestrator_with_retry import call_sub_orchestrator_with_retry_task
46
from .task_all import task_all
57
from .task_any import task_any
68
from .task_utilities import should_suspend
@@ -13,6 +15,8 @@
1315
__all__ = [
1416
'call_activity_task',
1517
'call_activity_with_retry_task',
18+
'call_sub_orchestrator_task',
19+
'call_sub_orchestrator_with_retry_task',
1620
'call_http',
1721
'continue_as_new',
1822
'new_uuid',
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from typing import List, Any, Optional
2+
3+
from ..models.Task import (
4+
Task)
5+
from ..models.actions.CallSubOrchestratorAction import CallSubOrchestratorAction
6+
from ..models.history import HistoryEvent
7+
from .task_utilities import set_processed, parse_history_event, \
8+
find_sub_orchestration_created, find_sub_orchestration_completed, \
9+
find_sub_orchestration_failed
10+
11+
12+
def call_sub_orchestrator_task(
13+
context,
14+
state: List[HistoryEvent],
15+
name: str,
16+
input_: Optional[Any] = None,
17+
instance_id: str = "") -> Task:
18+
"""Determine the state of Scheduling a sub-orchestrator for execution.
19+
20+
Parameters
21+
----------
22+
context: 'DurableOrchestrationContext':
23+
A reference to the orchestration context.
24+
state: List[HistoryEvent]
25+
The list of history events to search to determine the current state of the activity.
26+
name: str
27+
The name of the activity function to schedule.
28+
input_: Optional[Any]
29+
The JSON-serializable input to pass to the activity function. Defaults to None.
30+
instance_id: str
31+
The instance ID of the sub-orchestrator to call. Defaults to "".
32+
33+
Returns
34+
-------
35+
Task
36+
A Durable Task that completes when the called sub-orchestrator completes or fails.
37+
"""
38+
new_action = CallSubOrchestratorAction(name, input_, instance_id)
39+
40+
task_scheduled = find_sub_orchestration_created(
41+
state, name, context=context, instance_id=instance_id)
42+
task_completed = find_sub_orchestration_completed(state, task_scheduled)
43+
task_failed = find_sub_orchestration_failed(state, task_scheduled)
44+
set_processed([task_scheduled, task_completed, task_failed])
45+
46+
if task_completed is not None:
47+
return Task(
48+
is_completed=True,
49+
is_faulted=False,
50+
action=new_action,
51+
result=parse_history_event(task_completed),
52+
timestamp=task_completed.timestamp,
53+
id_=task_completed.TaskScheduledId)
54+
55+
if task_failed is not None:
56+
return Task(
57+
is_completed=True,
58+
is_faulted=True,
59+
action=new_action,
60+
result=task_failed.Reason,
61+
timestamp=task_failed.timestamp,
62+
id_=task_failed.TaskScheduledId,
63+
exc=Exception(
64+
f"{task_failed.Reason} \n {task_failed.Details}")
65+
)
66+
67+
return Task(is_completed=False, is_faulted=False, action=new_action)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from typing import List, Any, Optional
2+
3+
from ..models.Task import (
4+
Task)
5+
from ..models.actions.CallSubOrchestratorWithRetryAction import CallSubOrchestratorWithRetryAction
6+
from ..models.RetryOptions import RetryOptions
7+
from ..models.history import HistoryEvent
8+
from .task_utilities import set_processed, parse_history_event, \
9+
find_sub_orchestration_created, find_sub_orchestration_completed, \
10+
find_sub_orchestration_failed, find_task_retry_timer_fired, find_task_retry_timer_created
11+
12+
13+
def call_sub_orchestrator_with_retry_task(
14+
context,
15+
state: List[HistoryEvent],
16+
retry_options: RetryOptions,
17+
name: str,
18+
input_: Optional[Any] = None,
19+
instance_id: str = "") -> Task:
20+
"""Determine the state of Scheduling a sub-orchestrator for execution, with retry options.
21+
22+
Parameters
23+
----------
24+
context: 'DurableOrchestrationContext':
25+
A reference to the orchestration context.
26+
state: List[HistoryEvent]
27+
The list of history events to search to determine the current state of the activity.
28+
retry_options: RetryOptions
29+
The settings for retrying this sub-orchestrator in case of a failure.
30+
name: str
31+
The name of the activity function to schedule.
32+
input_: Optional[Any]
33+
The JSON-serializable input to pass to the activity function. Defaults to None.
34+
instance_id: str
35+
The instance ID of the sub-orchestrator to call. Defaults to "".
36+
37+
Returns
38+
-------
39+
Task
40+
A Durable Task that completes when the called sub-orchestrator completes or fails.
41+
"""
42+
new_action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id)
43+
for attempt in range(retry_options.max_number_of_attempts):
44+
task_scheduled = find_sub_orchestration_created(
45+
state, name, context=context, instance_id=instance_id)
46+
task_completed = find_sub_orchestration_completed(state, task_scheduled)
47+
task_failed = find_sub_orchestration_failed(state, task_scheduled)
48+
task_retry_timer = find_task_retry_timer_created(state, task_failed)
49+
task_retry_timer_fired = find_task_retry_timer_fired(
50+
state, task_retry_timer)
51+
set_processed([task_scheduled, task_completed,
52+
task_failed, task_retry_timer, task_retry_timer_fired])
53+
54+
if not task_scheduled:
55+
break
56+
57+
if task_completed is not None:
58+
return Task(
59+
is_completed=True,
60+
is_faulted=False,
61+
action=new_action,
62+
result=parse_history_event(task_completed),
63+
timestamp=task_completed.timestamp,
64+
id_=task_completed.TaskScheduledId)
65+
66+
if task_failed and task_retry_timer and attempt + 1 >= \
67+
retry_options.max_number_of_attempts:
68+
return Task(
69+
is_completed=True,
70+
is_faulted=True,
71+
action=new_action,
72+
result=task_failed.Reason,
73+
timestamp=task_failed.timestamp,
74+
id_=task_failed.TaskScheduledId,
75+
exc=Exception(
76+
f"{task_failed.Reason} \n {task_failed.Details}")
77+
)
78+
79+
return Task(is_completed=False, is_faulted=False, action=new_action)

0 commit comments

Comments
 (0)