Skip to content

Commit 863cfef

Browse files
authored
Merge pull request #326 from Azure/dev
Promote dev to main for 1.1.2
2 parents 3b5d5e3 + 44f6a95 commit 863cfef

File tree

9 files changed

+173
-18
lines changed

9 files changed

+173
-18
lines changed

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from collections import defaultdict
22
from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction
33
from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction
4-
from azure.durable_functions.models.Task import TaskBase
4+
from azure.durable_functions.models.Task import TaskBase, TimerTask
55
from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction
66
from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest
77
from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \
@@ -100,7 +100,8 @@ def from_json(cls, json_string: str):
100100
def _generate_task(self, action: Action,
101101
retry_options: Optional[RetryOptions] = None,
102102
id_: Optional[Union[int, str]] = None,
103-
parent: Optional[TaskBase] = None) -> Union[AtomicTask, RetryAbleTask]:
103+
parent: Optional[TaskBase] = None,
104+
task_constructor=AtomicTask) -> Union[AtomicTask, RetryAbleTask, TimerTask]:
104105
"""Generate an atomic or retryable Task based on an input.
105106
106107
Parameters
@@ -124,7 +125,7 @@ def _generate_task(self, action: Action,
124125
action_payload = [action]
125126
else:
126127
action_payload = action
127-
task = AtomicTask(id_, action_payload)
128+
task = task_constructor(id_, action_payload)
128129
task.parent = parent
129130

130131
# if task is retryable, provide the retryable wrapper class
@@ -517,7 +518,7 @@ def create_timer(self, fire_at: datetime.datetime) -> TaskBase:
517518
A Durable Timer Task that schedules the timer to wake up the activity
518519
"""
519520
action = CreateTimerAction(fire_at)
520-
task = self._generate_task(action)
521+
task = self._generate_task(action, task_constructor=TimerTask)
521522
return task
522523

523524
def wait_for_external_event(self, name: str) -> TaskBase:

azure/durable_functions/models/Task.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from azure.durable_functions.models.actions.Action import Action
66
from azure.durable_functions.models.actions.WhenAnyAction import WhenAnyAction
77
from azure.durable_functions.models.actions.WhenAllAction import WhenAllAction
8+
from azure.durable_functions.models.actions.CreateTimerAction import CreateTimerAction
89

910
import enum
1011
from typing import Any, List, Optional, Set, Type, Union
@@ -56,6 +57,14 @@ def __init__(self, id_: Union[int, str], actions: Union[List[Action], Action]):
5657
self.action_repr: Union[List[Action], Action] = actions
5758
self.is_played = False
5859

60+
@property
61+
def is_completed(self) -> bool:
62+
"""Get indicator of whether the task completed.
63+
64+
Note that completion is not equivalent to success.
65+
"""
66+
return not(self.state is TaskState.RUNNING)
67+
5968
def set_is_played(self, is_played: bool):
6069
"""Set the is_played flag for the Task.
6170
@@ -208,7 +217,47 @@ def try_set_value(self, child: TaskBase):
208217
class AtomicTask(TaskBase):
209218
"""A Task with no subtasks."""
210219

211-
pass
220+
def _get_action(self) -> Action:
221+
action: Action
222+
if isinstance(self.action_repr, list):
223+
action = self.action_repr[0]
224+
else:
225+
action = self.action_repr
226+
return action
227+
228+
229+
class TimerTask(AtomicTask):
230+
"""A Timer Task."""
231+
232+
def __init__(self, id_: Union[int, str], action: CreateTimerAction):
233+
super().__init__(id_, action)
234+
self.action_repr: Union[List[CreateTimerAction], CreateTimerAction]
235+
236+
@property
237+
def is_cancelled(self) -> bool:
238+
"""Check if the Timer is cancelled.
239+
240+
Returns
241+
-------
242+
bool
243+
Returns whether a timer has been cancelled or not
244+
"""
245+
action: CreateTimerAction = self._get_action()
246+
return action.is_cancelled
247+
248+
def cancel(self):
249+
"""Cancel a timer.
250+
251+
Raises
252+
------
253+
ValueError
254+
Raises an error if the task is already completed and an attempt is made to cancel it
255+
"""
256+
if not self.is_completed:
257+
action: CreateTimerAction = self._get_action()
258+
action.is_cancelled = True
259+
else:
260+
raise ValueError("Cannot cancel a completed task.")
212261

213262

214263
class WhenAllTask(CompoundTask):

azure/durable_functions/models/TaskOrchestrationExecutor.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,12 @@ def parse_history_event(directive_result):
180180
# retrieve result
181181
new_value = parse_history_event(event)
182182
if task._api_name == "CallEntityAction":
183-
new_value = ResponseMessage.from_dict(new_value)
184-
new_value = json.loads(new_value.result)
183+
event_payload = ResponseMessage.from_dict(new_value)
184+
new_value = json.loads(event_payload.result)
185+
186+
if event_payload.is_exception:
187+
new_value = Exception(new_value)
188+
is_success = False
185189
else:
186190
# generate exception
187191
new_value = Exception(f"{event.Reason} \n {event.Details}")

azure/durable_functions/models/entities/ResponseMessage.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ class ResponseMessage:
77
Specifies the response of an entity, as processed by the durable-extension.
88
"""
99

10-
def __init__(self, result: str):
10+
def __init__(self, result: str, is_exception: bool = False):
1111
"""Instantiate a ResponseMessage.
1212
1313
Specifies the response of an entity, as processed by the durable-extension.
@@ -18,6 +18,7 @@ def __init__(self, result: str):
1818
The result provided by the entity
1919
"""
2020
self.result = result
21+
self.is_exception = is_exception
2122
# TODO: JS has an additional exceptionType field, but does not use it
2223

2324
@classmethod
@@ -34,5 +35,6 @@ def from_dict(cls, d: Dict[str, Any]) -> 'ResponseMessage':
3435
ResponseMessage:
3536
The ResponseMessage built from the provided dictionary
3637
"""
37-
result = cls(d["result"])
38+
is_error = "exceptionType" in d.keys()
39+
result = cls(d["result"], is_error)
3840
return result

tests/orchestrator/orchestrator_test_utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ def assert_entity_state_equals(expected, result):
3030
assert_attribute_equal(expected, result, "signals")
3131

3232
def assert_results_are_equal(expected: Dict[str, Any], result: Dict[str, Any]) -> bool:
33-
assert_attribute_equal(expected, result, "result")
34-
assert_attribute_equal(expected, result, "isError")
33+
for (payload_expected, payload_result) in zip(expected, result):
34+
assert_attribute_equal(payload_expected, payload_result, "result")
35+
assert_attribute_equal(payload_expected, payload_result, "isError")
3536

3637
def assert_attribute_equal(expected, result, attribute):
3738
if attribute in expected:

tests/orchestrator/test_create_timer.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,20 @@ def generator_function(context):
2828
yield context.create_timer(fire_at)
2929
return "Done!"
3030

31+
def generator_function_timer_can_be_cancelled(context):
32+
time_limit1 = context.current_utc_datetime + timedelta(minutes=5)
33+
timer_task1 = context.create_timer(time_limit1)
34+
35+
time_limit2 = context.current_utc_datetime + timedelta(minutes=10)
36+
timer_task2 = context.create_timer(time_limit2)
37+
38+
winner = yield context.task_any([timer_task1, timer_task2])
39+
if winner == timer_task1:
40+
timer_task2.cancel()
41+
return "Done!"
42+
else:
43+
raise Exception("timer task 1 should complete before timer task 2")
44+
3145
def add_timer_action(state: OrchestratorState, fire_at: datetime):
3246
action = CreateTimerAction(fire_at=fire_at)
3347
state._actions.append([action])
@@ -64,4 +78,25 @@ def test_timers_comparison_with_relaxed_precision():
6478
#assert_valid_schema(result)
6579
# TODO: getting the following error when validating the schema
6680
# "Additional properties are not allowed ('fireAt', 'isCanceled' were unexpected)">
67-
assert_orchestration_state_equals(expected, result)
81+
assert_orchestration_state_equals(expected, result)
82+
83+
def test_timers_can_be_cancelled():
84+
85+
context_builder = ContextBuilder("test_timers_can_be_cancelled")
86+
fire_at1 = context_builder.current_datetime + timedelta(minutes=5)
87+
fire_at2 = context_builder.current_datetime + timedelta(minutes=10)
88+
add_timer_fired_events(context_builder, 0, str(fire_at1))
89+
add_timer_fired_events(context_builder, 1, str(fire_at2))
90+
91+
result = get_orchestration_state_result(
92+
context_builder, generator_function_timer_can_be_cancelled)
93+
94+
expected_state = base_expected_state(output='Done!')
95+
expected_state._actions.append(
96+
[CreateTimerAction(fire_at=fire_at1), CreateTimerAction(fire_at=fire_at2, is_cancelled=True)])
97+
98+
expected_state._is_done = True
99+
expected = expected_state.to_json()
100+
101+
assert_orchestration_state_equals(expected, result)
102+
assert result["actions"][0][1]["isCanceled"]

tests/orchestrator/test_entity.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from azure.durable_functions.models.ReplaySchema import ReplaySchema
22
from .orchestrator_test_utils \
3-
import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema, \
3+
import assert_orchestration_state_equals, assert_results_are_equal, get_orchestration_state_result, assert_valid_schema, \
44
get_entity_state_result, assert_entity_state_equals
55
from tests.test_utils.ContextBuilder import ContextBuilder
66
from tests.test_utils.EntityContextBuilder import EntityContextBuilder
@@ -23,6 +23,14 @@ def generator_function_call_entity(context):
2323
outputs.append(x)
2424
return outputs
2525

26+
def generator_function_catch_entity_exception(context):
27+
entityId = df.EntityId("Counter", "myCounter")
28+
try:
29+
yield context.call_entity(entityId, "add", 3)
30+
return "No exception thrown"
31+
except:
32+
return "Exception thrown"
33+
2634
def generator_function_signal_entity(context):
2735
outputs = []
2836
entityId = df.EntityId("Counter", "myCounter")
@@ -53,6 +61,29 @@ def counter_entity_function(context):
5361
context.set_state(current_value)
5462
context.set_result(result)
5563

64+
def counter_entity_function_raises_exception(context):
65+
raise Exception("boom!")
66+
67+
def test_entity_raises_exception():
68+
# Create input batch
69+
batch = []
70+
add_to_batch(batch, name="get")
71+
context_builder = EntityContextBuilder(batch=batch)
72+
73+
# Run the entity, get observed result
74+
result = get_entity_state_result(
75+
context_builder,
76+
counter_entity_function_raises_exception,
77+
)
78+
79+
# Construct expected result
80+
expected_state = entity_base_expected_state()
81+
apply_operation(expected_state, result="boom!", state=None, is_error=True)
82+
expected = expected_state.to_json()
83+
84+
# Ensure expectation matches observed behavior
85+
#assert_valid_schema(result)
86+
assert_entity_state_equals(expected, result)
5687

5788
def test_entity_signal_then_call():
5889
"""Tests that a simple counter entity outputs the correct value
@@ -161,11 +192,11 @@ def add_signal_entity_action(state: OrchestratorState, id_: df.EntityId, op: str
161192
state.actions.append([action])
162193

163194
def add_call_entity_completed_events(
164-
context_builder: ContextBuilder, op: str, instance_id=str, input_=None, event_id=0):
195+
context_builder: ContextBuilder, op: str, instance_id=str, input_=None, event_id=0, is_error=False):
165196
context_builder.add_event_sent_event(instance_id, event_id)
166197
context_builder.add_orchestrator_completed_event()
167198
context_builder.add_orchestrator_started_event()
168-
context_builder.add_event_raised_event(name="0000", id_=0, input_=input_, is_entity=True)
199+
context_builder.add_event_raised_event(name="0000", id_=0, input_=input_, is_entity=True, is_error=is_error)
169200

170201
def test_call_entity_sent():
171202
context_builder = ContextBuilder('test_simple_function')
@@ -233,4 +264,29 @@ def test_call_entity_raised():
233264

234265
#assert_valid_schema(result)
235266

267+
assert_orchestration_state_equals(expected, result)
268+
269+
def test_call_entity_catch_exception():
270+
entityId = df.EntityId("Counter", "myCounter")
271+
context_builder = ContextBuilder('catch exceptions')
272+
add_call_entity_completed_events(
273+
context_builder,
274+
"add",
275+
df.EntityId.get_scheduler_id(entityId),
276+
input_="I am an error!",
277+
event_id=0,
278+
is_error=True
279+
)
280+
281+
result = get_orchestration_state_result(
282+
context_builder, generator_function_catch_entity_exception)
283+
284+
expected_state = base_expected_state(
285+
"Exception thrown"
286+
)
287+
288+
add_call_entity_action(expected_state, entityId, "add", 3)
289+
expected_state._is_done = True
290+
expected = expected_state.to_json()
291+
236292
assert_orchestration_state_equals(expected, result)

tests/orchestrator/test_sequential_orchestrator.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,14 @@ def generator_function_new_guid(context):
136136
outputs.append(str(output3))
137137
return outputs
138138

139-
140139
def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState:
141140
return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema)
142141

142+
def add_timer_fired_events(context_builder: ContextBuilder, id_: int, timestamp: str):
143+
fire_at: str = context_builder.add_timer_created_event(id_, timestamp)
144+
context_builder.add_orchestrator_completed_event()
145+
context_builder.add_orchestrator_started_event()
146+
context_builder.add_timer_fired_event(id_=id_, fire_at=fire_at)
143147

144148
def add_hello_action(state: OrchestratorState, input_: str):
145149
action = CallActivityAction(function_name='Hello', input_=input_)

tests/test_utils/ContextBuilder.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,14 @@ def add_execution_started_event(
125125
event.Input = input_
126126
self.history_events.append(event)
127127

128-
def add_event_raised_event(self, name:str, id_: int, input_=None, timestamp=None, is_entity=False):
128+
def add_event_raised_event(self, name:str, id_: int, input_=None, timestamp=None, is_entity=False, is_error = False):
129129
event = self.get_base_event(HistoryEventType.EVENT_RAISED, id_=id_, timestamp=timestamp)
130130
event.Name = name
131131
if is_entity:
132-
event.Input = json.dumps({ "result": json.dumps(input_) })
132+
if is_error:
133+
event.Input = json.dumps({ "result": json.dumps(input_), "exceptionType": "True" })
134+
else:
135+
event.Input = json.dumps({ "result": json.dumps(input_) })
133136
else:
134137
event.Input = input_
135138
# event.timestamp = timestamp

0 commit comments

Comments
 (0)