Skip to content

Commit 46f97cf

Browse files
authored
Update isReplaying flag at beginning of replay (#390)
1 parent 462fb69 commit 46f97cf

File tree

4 files changed

+56
-9
lines changed

4 files changed

+56
-9
lines changed

azure/durable_functions/models/TaskOrchestrationExecutor.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,22 @@ def execute(self, context: DurableOrchestrationContext,
6868
self.context = context
6969
evaluated_user_code = fn(context)
7070

71+
# The minimum History size is 2, in the shape: [OrchestratorStarted, ExecutionStarted].
72+
# At the start of replay, the `is_replaying` flag is determined from the
73+
# ExecutionStarted event.
74+
# For some reason, OrchestratorStarted does not update its `isPlayed` field.
75+
if len(history) < 2:
76+
err_message = "Internal Durable Functions error: "\
77+
+ f"received History array of size {len(history)} "\
78+
+ "when a minimum size of 2 is expected. "\
79+
+ "Please report this issue at "\
80+
+ "https://github.com/Azure/azure-functions-durable-python/issues."
81+
raise Exception(err_message)
82+
83+
# Set initial is_replaing state.
84+
execution_started_event = history[1]
85+
self.current_task.is_played = execution_started_event.is_played
86+
7187
# If user code is a generator, then it uses `yield` statements (the DF API)
7288
# and so we iterate through the DF history, generating tasks and populating
7389
# them with values when the history provides them

tests/orchestrator/test_is_replaying_flag.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def add_timer_action(state: OrchestratorState, fire_at: datetime):
4747

4848
def test_is_replaying_initial_value():
4949

50-
context_builder = ContextBuilder("")
50+
context_builder = ContextBuilder("", is_replaying=False)
5151
result = get_orchestration_property(
5252
context_builder, generator_function, "durable_context")
5353

tests/orchestrator/test_sequential_orchestrator.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,17 @@ def generator_function(context):
2424

2525
return outputs
2626

27+
def generator_function_is_replaying(context):
28+
outputs = []
29+
30+
outputs.append(context.is_replaying)
31+
yield context.call_activity("Hello", "Tokyo")
32+
outputs.append(context.is_replaying)
33+
yield context.call_activity("Hello", "Seattle")
34+
outputs.append(context.is_replaying)
35+
yield context.call_activity("Hello", "London")
36+
return outputs
37+
2738
def generator_function_no_yield(context):
2839
outputs = []
2940

@@ -150,11 +161,11 @@ def add_hello_action(state: OrchestratorState, input_: str):
150161
state.actions.append([action])
151162

152163
def add_hello_completed_events(
153-
context_builder: ContextBuilder, id_: int, result: str):
164+
context_builder: ContextBuilder, id_: int, result: str, is_played=False):
154165
context_builder.add_task_scheduled_event(name='Hello', id_=id_)
155166
context_builder.add_orchestrator_completed_event()
156167
context_builder.add_orchestrator_started_event()
157-
context_builder.add_task_completed_event(id_=id_, result=result)
168+
context_builder.add_task_completed_event(id_=id_, result=result, is_played=is_played)
158169

159170

160171
def add_hello_failed_events(
@@ -286,6 +297,26 @@ def test_tokyo_and_seattle_and_london_state():
286297
assert_valid_schema(result)
287298
assert_orchestration_state_equals(expected, result)
288299

300+
def test_sequential_is_replaying():
301+
context_builder = ContextBuilder('test_simple_function', is_replaying=True)
302+
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"", True)
303+
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"", True)
304+
add_hello_completed_events(context_builder, 2, "\"Hello London!\"", True)
305+
306+
result = get_orchestration_state_result(
307+
context_builder, generator_function_is_replaying)
308+
309+
expected_state = base_expected_state(
310+
[True, True, True])
311+
add_hello_action(expected_state, 'Tokyo')
312+
add_hello_action(expected_state, 'Seattle')
313+
add_hello_action(expected_state, 'London')
314+
expected_state._is_done = True
315+
expected = expected_state.to_json()
316+
317+
assert_valid_schema(result)
318+
assert_orchestration_state_equals(expected, result)
319+
289320
def test_sequential_orchestration_no_yield():
290321
context_builder = ContextBuilder('test_simple_function')
291322
add_hello_completed_events(context_builder, 0, "\"Hello London!\"")

tests/test_utils/ContextBuilder.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
class ContextBuilder:
17-
def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None, replay_schema: ReplaySchema = ReplaySchema.V1):
17+
def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None, is_replaying=False, replay_schema: ReplaySchema = ReplaySchema.V1):
1818
self.increase_time = increase_time
1919
self.instance_id = uuid.uuid4()
2020
self.is_replaying: bool = False
@@ -28,7 +28,7 @@ def __init__(self, name: str="", increase_time: bool = True, starting_time: Opti
2828
self.upperSchemaVersion = replay_schema.value
2929

3030
self.add_orchestrator_started_event()
31-
self.add_execution_started_event(name)
31+
self.add_execution_started_event(name, is_played=is_replaying)
3232

3333
def get_base_event(
3434
self, event_type: HistoryEventType, id_: int = -1,
@@ -87,8 +87,8 @@ def add_task_scheduled_event(
8787
event.Input_ = input_
8888
self.history_events.append(event)
8989

90-
def add_task_completed_event(self, id_: int, result):
91-
event = self.get_base_event(HistoryEventType.TASK_COMPLETED)
90+
def add_task_completed_event(self, id_: int, result, is_played=False):
91+
event = self.get_base_event(HistoryEventType.TASK_COMPLETED, is_played=is_played)
9292
event.Result = result
9393
event.TaskScheduledId = id_
9494
self.history_events.append(event)
@@ -116,8 +116,8 @@ def add_timer_fired_event(self, id_: int, fire_at: str, is_played: bool = True):
116116
self.history_events.append(event)
117117

118118
def add_execution_started_event(
119-
self, name: str, version: str = '', input_=None):
120-
event = self.get_base_event(HistoryEventType.EXECUTION_STARTED, is_played=True)
119+
self, name: str, version: str = '', input_=None, is_played=True):
120+
event = self.get_base_event(HistoryEventType.EXECUTION_STARTED, is_played=is_played)
121121
event.orchestration_instance = OrchestrationInstance()
122122
self.instance_id = event.orchestration_instance.instance_id
123123
event.Name = name

0 commit comments

Comments
 (0)