Skip to content

Commit 28b4f35

Browse files
authored
Enable replay V2 (#291)
1 parent b03ca64 commit 28b4f35

27 files changed

+240
-48
lines changed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from azure.durable_functions.models.ReplaySchema import ReplaySchema
12
import json
23
import datetime
34
import inspect
@@ -30,7 +31,7 @@ class DurableOrchestrationContext:
3031
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
3132
def __init__(self,
3233
history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
33-
parentInstanceId: str, input: Any = None, **kwargs):
34+
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs):
3435
self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
3536
self._instance_id: str = instanceId
3637
self._is_replaying: bool = isReplaying
@@ -45,8 +46,11 @@ def __init__(self,
4546
self._current_utc_datetime: datetime.datetime = \
4647
self.decision_started_event.timestamp
4748
self._new_uuid_counter = 0
48-
self.actions: List[List[Action]] = []
4949
self._function_context: FunctionContext = FunctionContext(**kwargs)
50+
self._replay_schema = ReplaySchema(upperSchemaVersion)
51+
self.actions: List[List[Action]] = []
52+
if self._replay_schema == ReplaySchema.V2:
53+
self.actions.append([])
5054

5155
# make _input always a string
5256
# (consistent with Python Functions generic trigger/input bindings)
@@ -240,7 +244,7 @@ def task_all(self, activities: List[Task]) -> TaskSet:
240244
TaskSet
241245
The results of all activities.
242246
"""
243-
return task_all(tasks=activities)
247+
return task_all(tasks=activities, replay_schema=self._replay_schema)
244248

245249
def task_any(self, activities: List[Task]) -> TaskSet:
246250
"""Schedule the execution of all activities.
@@ -260,7 +264,7 @@ def task_any(self, activities: List[Task]) -> TaskSet:
260264
TaskSet
261265
The first [[Task]] instance to complete.
262266
"""
263-
return task_any(tasks=activities)
267+
return task_any(tasks=activities, replay_schema=self._replay_schema)
264268

265269
def set_custom_status(self, status: Any):
266270
"""Set the customized orchestration status for your orchestrator function.

azure/durable_functions/models/OrchestratorState.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import json
22
from typing import List, Any, Dict, Optional, Union
33

4+
from azure.durable_functions.models.ReplaySchema import ReplaySchema
5+
46
from .utils.json_utils import add_attrib
57
from azure.durable_functions.models.actions.Action import Action
68

@@ -16,13 +18,15 @@ def __init__(self,
1618
is_done: bool,
1719
actions: List[List[Action]],
1820
output: Any,
21+
replay_schema: ReplaySchema,
1922
error: str = None,
2023
custom_status: Any = None):
2124
self._is_done: bool = is_done
2225
self._actions: List[List[Action]] = actions
2326
self._output: Any = output
2427
self._error: Optional[str] = error
2528
self._custom_status: Any = custom_status
29+
self._replay_schema: ReplaySchema = replay_schema
2630

2731
@property
2832
def actions(self) -> List[List[Action]]:
@@ -66,6 +70,11 @@ def custom_status(self):
6670
"""Get the JSON-serializable value used by DurableOrchestrationContext.SetCustomStatus."""
6771
return self._custom_status
6872

73+
@property
74+
def schema_version(self):
75+
"""Get the Replay Schema represented in this OrchestratorState payload."""
76+
return self._replay_schema.value
77+
6978
def to_json(self) -> Dict[str, Union[str, int]]:
7079
"""Convert object into a json dictionary.
7180
@@ -76,6 +85,8 @@ def to_json(self) -> Dict[str, Union[str, int]]:
7685
"""
7786
json_dict: Dict[str, Union[str, int]] = {}
7887
add_attrib(json_dict, self, '_is_done', 'isDone')
88+
if self._replay_schema != ReplaySchema.V1:
89+
add_attrib(json_dict, self, 'schema_version', 'schemaVersion')
7990
self._add_actions(json_dict)
8091
if not (self._output is None):
8192
json_dict['output'] = self._output
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from enum import Enum
2+
3+
4+
class ReplaySchema(Enum):
5+
"""Enum representing the ReplaySchemas supported by this SDK version."""
6+
7+
V1 = 0
8+
V2 = 1

azure/durable_functions/models/actions/ActionType.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ class ActionType(IntEnum):
1414
CALL_ENTITY = 7
1515
CALL_HTTP: int = 8
1616
SIGNAL_ENTITY: int = 9
17+
WHEN_ANY = 11
18+
WHEN_ALL = 12
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from typing import Dict, Union
2+
3+
from .Action import Action
4+
from ..utils.json_utils import add_attrib
5+
from typing import List
6+
from abc import abstractmethod
7+
8+
9+
class CompoundAction(Action):
10+
"""Defines the structure of the WhenAll Action object.
11+
12+
Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
13+
"""
14+
15+
def __init__(self, compoundTasks: List[Action]):
16+
self.compound_actions = list(map(lambda x: x.to_json(), compoundTasks))
17+
18+
@property
19+
@abstractmethod
20+
def action_type(self) -> int:
21+
"""Get this object's action type as an integer."""
22+
...
23+
24+
def to_json(self) -> Dict[str, Union[str, int]]:
25+
"""Convert object into a json dictionary.
26+
27+
Returns
28+
-------
29+
Dict[str, Union[str, int]]
30+
The instance of the class converted into a json dictionary
31+
"""
32+
json_dict: Dict[str, Union[str, int]] = {}
33+
add_attrib(json_dict, self, 'action_type', 'actionType')
34+
add_attrib(json_dict, self, 'compound_actions', 'compoundActions')
35+
return json_dict
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from .ActionType import ActionType
2+
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
3+
4+
5+
class WhenAllAction(CompoundAction):
6+
"""Defines the structure of the WhenAll Action object.
7+
8+
Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
9+
"""
10+
11+
@property
12+
def action_type(self) -> int:
13+
"""Get the type of action this class represents."""
14+
return ActionType.WHEN_ALL
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
2+
from .ActionType import ActionType
3+
4+
5+
class WhenAnyAction(CompoundAction):
6+
"""Defines the structure of the WhenAll Action object.
7+
8+
Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
9+
"""
10+
11+
@property
12+
def action_type(self) -> int:
13+
"""Get the type of action this class represents."""
14+
return ActionType.WHEN_ANY

azure/durable_functions/orchestrator.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
"""
66
from typing import Callable, Iterator, Any, Generator
77

8+
from azure.durable_functions.models.ReplaySchema import ReplaySchema
9+
810
from .models import (
911
DurableOrchestrationContext,
1012
Task,
@@ -55,6 +57,7 @@ def handle(self, context: DurableOrchestrationContext):
5557
# `fn_output` is the return value instead of a generator
5658
if not isinstance(fn_output, Iterator):
5759
orchestration_state = OrchestratorState(
60+
replay_schema=self.durable_context._replay_schema,
5861
is_done=True,
5962
output=fn_output,
6063
actions=self.durable_context.actions,
@@ -75,6 +78,7 @@ def handle(self, context: DurableOrchestrationContext):
7578
# `will_continue_as_new` essentially "tracks"
7679
# whether or not the orchestration is done.
7780
orchestration_state = OrchestratorState(
81+
replay_schema=self.durable_context._replay_schema,
7882
is_done=self.durable_context.will_continue_as_new,
7983
output=None,
8084
actions=self.durable_context.actions,
@@ -95,13 +99,15 @@ def handle(self, context: DurableOrchestrationContext):
9599

96100
except StopIteration as sie:
97101
orchestration_state = OrchestratorState(
102+
replay_schema=self.durable_context._replay_schema,
98103
is_done=True,
99104
output=sie.value,
100105
actions=self.durable_context.actions,
101106
custom_status=self.durable_context.custom_status)
102107
except Exception as e:
103108
exception_str = str(e)
104109
orchestration_state = OrchestratorState(
110+
replay_schema=self.durable_context._replay_schema,
105111
is_done=False,
106112
output=None, # Should have no output, after generation range
107113
actions=self.durable_context.actions,
@@ -135,12 +141,17 @@ def _add_to_actions(self, generation_state):
135141
if self.durable_context.will_continue_as_new:
136142
return
137143
if not generation_state._is_yielded:
138-
if (isinstance(generation_state, Task)
139-
and hasattr(generation_state, "action")):
140-
self.durable_context.actions.append([generation_state.action])
141-
elif (isinstance(generation_state, TaskSet)
142-
and hasattr(generation_state, "actions")):
143-
self.durable_context.actions.append(generation_state.actions)
144+
if isinstance(generation_state, Task):
145+
if self.durable_context._replay_schema == ReplaySchema.V1:
146+
self.durable_context.actions.append([generation_state.action])
147+
else:
148+
self.durable_context.actions[0].append(generation_state.action)
149+
150+
elif isinstance(generation_state, TaskSet):
151+
if self.durable_context._replay_schema == ReplaySchema.V1:
152+
self.durable_context.actions.append(generation_state.actions)
153+
else:
154+
self.durable_context.actions[0].append(generation_state.actions)
144155
generation_state._is_yielded = True
145156

146157
def _update_timestamp(self):

azure/durable_functions/tasks/task_all.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from azure.durable_functions.models.actions.WhenAllAction import WhenAllAction
2+
from azure.durable_functions.models.ReplaySchema import ReplaySchema
13
from datetime import datetime
24
from typing import List, Optional, Any
35

@@ -6,7 +8,7 @@
68
from ..models.actions import Action
79

810

9-
def task_all(tasks: List[Task]):
11+
def task_all(tasks: List[Task], replay_schema: ReplaySchema):
1012
"""Determine the state of scheduling the activities for execution with retry options.
1113
1214
Parameters
@@ -33,7 +35,10 @@ def task_all(tasks: List[Task]):
3335
for task in tasks:
3436
# Add actions and results
3537
if isinstance(task, TaskSet):
36-
actions.extend(task.actions)
38+
if replay_schema == ReplaySchema.V1:
39+
actions.extend(task.actions)
40+
else:
41+
actions.append(task.actions)
3742
else:
3843
# We know it's an atomic Task
3944
actions.append(task.action)
@@ -62,6 +67,9 @@ def task_all(tasks: List[Task]):
6267
results = []
6368
end_time = None
6469

70+
if replay_schema == ReplaySchema.V2:
71+
actions = WhenAllAction(actions)
72+
6573
# Construct TaskSet
6674
taskset = TaskSet(
6775
is_completed=is_completed,

azure/durable_functions/tasks/task_any.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from azure.durable_functions.models.actions.WhenAnyAction import WhenAnyAction
2+
from azure.durable_functions.models.ReplaySchema import ReplaySchema
13
from ..models.TaskSet import TaskSet
24

35

4-
def task_any(tasks):
6+
def task_any(tasks, replay_schema: ReplaySchema):
57
"""Determine whether any of the given tasks is completed.
68
79
Parameters
@@ -22,8 +24,10 @@ def task_any(tasks):
2224
error_message = []
2325
for task in tasks:
2426
if isinstance(task, TaskSet):
25-
for action in task.actions:
26-
all_actions.append(action)
27+
if replay_schema == ReplaySchema.V1:
28+
all_actions.extend(task.actions)
29+
else:
30+
all_actions.append(task.actions)
2731
else:
2832
all_actions.append(task.action)
2933

@@ -35,6 +39,9 @@ def task_any(tasks):
3539

3640
completed_tasks.sort(key=lambda t: t.timestamp)
3741

42+
if replay_schema == ReplaySchema.V2:
43+
all_actions = WhenAnyAction(all_actions)
44+
3845
if len(faulted_tasks) == len(tasks):
3946
return TaskSet(True, all_actions, None, is_faulted=True, exception=Exception(
4047
f"All tasks have failed, errors messages in all tasks:{error_message}"))

0 commit comments

Comments
 (0)