Skip to content

Commit c99c472

Browse files
authored
Fix: isReplaying flag (#185)
1 parent 63b1d23 commit c99c472

16 files changed

+155
-27
lines changed

azure/durable_functions/models/Task.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@ class Task:
1717
"""
1818

1919
def __init__(self, is_completed, is_faulted, action,
20-
result=None, timestamp=None, id_=None, exc=None):
20+
result=None, timestamp=None, id_=None, exc=None, is_played=False):
2121
self._is_completed: bool = is_completed
2222
self._is_faulted: bool = is_faulted
2323
self._action: Action = action
2424
self._result = result
2525
self._timestamp: datetime = timestamp
2626
self._id = id_
2727
self._exception = exc
28+
self._is_played = is_played
2829

2930
@property
3031
def is_completed(self) -> bool:

azure/durable_functions/models/TaskSet.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ class TaskSet:
1717
"""
1818

1919
def __init__(self, is_completed, actions, result, is_faulted=False,
20-
timestamp=None, exception=None):
20+
timestamp=None, exception=None, is_played=False):
2121
self._is_completed: bool = is_completed
2222
self._actions: List[Action] = actions
2323
self._result = result
2424
self._is_faulted: bool = is_faulted
2525
self._timestamp: datetime = timestamp
2626
self._exception = exception
27+
self._is_played = is_played
2728

2829
@property
2930
def is_completed(self) -> bool:

azure/durable_functions/orchestrator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def handle(self, context: DurableOrchestrationContext):
9090
continue
9191

9292
self._reset_timestamp()
93+
self.durable_context._is_replaying = generation_state._is_played
9394
generation_state = self._generate_next(generation_state)
9495

9596
except StopIteration as sie:

azure/durable_functions/tasks/call_activity.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def call_activity_task(
4040
is_completed=True,
4141
is_faulted=False,
4242
action=new_action,
43+
is_played=task_completed._is_played,
4344
result=parse_history_event(task_completed),
4445
timestamp=task_completed.timestamp,
4546
id_=task_completed.TaskScheduledId)
@@ -49,6 +50,7 @@ def call_activity_task(
4950
is_completed=True,
5051
is_faulted=True,
5152
action=new_action,
53+
is_played=task_failed._is_played,
5254
result=task_failed.Reason,
5355
timestamp=task_failed.timestamp,
5456
id_=task_failed.TaskScheduledId,

azure/durable_functions/tasks/call_activity_with_retry.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def call_activity_with_retry_task(
5555
is_completed=True,
5656
is_faulted=False,
5757
action=new_action,
58+
is_played=task_completed._is_played,
5859
result=parse_history_event(task_completed),
5960
timestamp=task_completed.timestamp,
6061
id_=task_completed.TaskScheduledId)
@@ -65,6 +66,7 @@ def call_activity_with_retry_task(
6566
is_completed=True,
6667
is_faulted=True,
6768
action=new_action,
69+
is_played=task_failed._is_played,
6870
timestamp=task_failed.timestamp,
6971
id_=task_failed.TaskScheduledId,
7072
exc=Exception(

azure/durable_functions/tasks/call_http.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def call_http(state: List[HistoryEvent], method: str, uri: str, content: Optiona
5757
is_completed=True,
5858
is_faulted=False,
5959
action=new_action,
60+
is_played=task_completed._is_played,
6061
result=parse_history_event(task_completed),
6162
timestamp=task_completed.timestamp,
6263
id_=task_completed.TaskScheduledId)
@@ -66,6 +67,7 @@ def call_http(state: List[HistoryEvent], method: str, uri: str, content: Optiona
6667
is_completed=True,
6768
is_faulted=True,
6869
action=new_action,
70+
is_played=task_failed._is_played,
6971
result=task_failed.Reason,
7072
timestamp=task_failed.timestamp,
7173
id_=task_failed.TaskScheduledId,

azure/durable_functions/tasks/call_suborchestrator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def call_sub_orchestrator_task(
4848
is_completed=True,
4949
is_faulted=False,
5050
action=new_action,
51+
is_played=task_completed._is_played,
5152
result=parse_history_event(task_completed),
5253
timestamp=task_completed.timestamp,
5354
id_=task_completed.TaskScheduledId)
@@ -57,6 +58,7 @@ def call_sub_orchestrator_task(
5758
is_completed=True,
5859
is_faulted=True,
5960
action=new_action,
61+
is_played=task_failed._is_played,
6062
result=task_failed.Reason,
6163
timestamp=task_failed.timestamp,
6264
id_=task_failed.TaskScheduledId,

azure/durable_functions/tasks/call_suborchestrator_with_retry.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def call_sub_orchestrator_with_retry_task(
5959
is_completed=True,
6060
is_faulted=False,
6161
action=new_action,
62+
is_played=task_completed._is_played,
6263
result=parse_history_event(task_completed),
6364
timestamp=task_completed.timestamp,
6465
id_=task_completed.TaskScheduledId)
@@ -69,6 +70,7 @@ def call_sub_orchestrator_with_retry_task(
6970
is_completed=True,
7071
is_faulted=True,
7172
action=new_action,
73+
is_played=task_failed._is_played,
7274
result=task_failed.Reason,
7375
timestamp=task_failed.timestamp,
7476
id_=task_failed.TaskScheduledId,

azure/durable_functions/tasks/create_timer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ def create_timer_task(state: List[HistoryEvent],
3333
return TimerTask(
3434
is_completed=True, action=new_action,
3535
timestamp=timer_fired.timestamp,
36-
id_=timer_fired.event_id)
36+
id_=timer_fired.event_id,
37+
is_played=timer_fired.is_played)
3738
else:
3839
return TimerTask(
3940
is_completed=False, action=new_action,
Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
from typing import List
1+
from datetime import datetime
2+
from typing import List, Optional, Any
3+
24
from ..models.Task import Task
35
from ..models.TaskSet import TaskSet
6+
from ..models.actions import Action
47

58

69
def task_all(tasks: List[Task]):
@@ -16,31 +19,57 @@ def task_all(tasks: List[Task]):
1619
TaskSet
1720
A Durable Task Set that reports the state of running all of the tasks within it.
1821
"""
19-
all_actions = []
20-
results = []
22+
# Args for constructing the output TaskSet
23+
is_played = True
24+
is_faulted = False
2125
is_completed = True
22-
complete_time = None
23-
faulted = []
26+
27+
actions: List[Action] = []
28+
results: List[Any] = []
29+
30+
exception: Optional[str] = None
31+
end_time: Optional[datetime] = None
32+
2433
for task in tasks:
34+
# Add actions and results
2535
if isinstance(task, TaskSet):
26-
for action in task.actions:
27-
all_actions.append(action)
36+
actions.extend(task.actions)
2837
else:
29-
all_actions.append(task.action)
38+
# We know it's an atomic Task
39+
actions.append(task.action)
3040
results.append(task.result)
3141

32-
if task.is_faulted:
33-
faulted.append(task.exception)
42+
# Record first exception, if it exists
43+
if task.is_faulted and not is_faulted:
44+
is_faulted = True
45+
exception = task.exception
3446

47+
# If any task is not played, TaskSet is not played
48+
if not task._is_played:
49+
is_played = False
50+
51+
# If any task is incomplete, TaskSet is incomplete
52+
# If the task is complete, we can update the end_time
3553
if not task.is_completed:
3654
is_completed = False
55+
elif end_time is None:
56+
end_time = task.timestamp
3757
else:
38-
complete_time = task.timestamp if complete_time is None \
39-
else max([task.timestamp, complete_time])
40-
41-
if len(faulted) > 0:
42-
return TaskSet(is_completed, all_actions, results, is_faulted=True, exception=faulted[0])
43-
if is_completed:
44-
return TaskSet(is_completed, all_actions, results, False, complete_time)
45-
else:
46-
return TaskSet(is_completed, all_actions, None)
58+
end_time = max([task.timestamp, end_time])
59+
60+
# Incomplete TaskSets do not have results or end-time
61+
if not is_completed:
62+
results = None
63+
end_time = None
64+
65+
# Construct TaskSet
66+
taskset = TaskSet(
67+
is_completed=is_completed,
68+
actions=actions,
69+
result=results,
70+
is_faulted=is_faulted,
71+
timestamp=end_time,
72+
exception=exception,
73+
is_played=is_played
74+
)
75+
return taskset

0 commit comments

Comments
 (0)