Skip to content

Commit a377de7

Browse files
committed
Various
- Return pending actions when orchestrations complete - Ensure locked entities are unlocked when orchestration ends (success/fail/continue_as_new) - Provide default "delete" operation and document deleting entities
1 parent f66abf4 commit a377de7

File tree

5 files changed

+151
-10
lines changed

5 files changed

+151
-10
lines changed

docs/features.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ with (yield ctx.lock_entities([entity_id_1, entity_id_2]):
130130

131131
Note that locked entities may not be signalled, and every call to a locked entity must return a result before another call to the same entity may be made from within the critical section. For more details and advanced usage, see the examples and API documentation.
132132

133+
##### Deleting entities
134+
135+
Entites are represented as orchestration instances in your Task Hub, and their state is persisted in the Task Hub as well. When using the Durable Task Scheduler as your durability provider, the backend will automatically clean up entities when their state is empty, this is effectively the "delete" operation to save space in the Task Hub. In the DTS Dashboard, "delete entity" simply signals the entity with the "delete" operation. In this SDK, we provide a default implementation for the "delete" operation to clear the state when using class-based entities, which end users are free to override as needed. Users must implement "delete" manually for function-based entities.
136+
133137
### External events
134138

135139
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.

durabletask/entities/durable_entity.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,13 @@ def schedule_new_orchestration(self, orchestration_name: str, input: Optional[An
8181
The instance ID of the scheduled orchestration.
8282
"""
8383
return self.entity_context.schedule_new_orchestration(orchestration_name, input, instance_id=instance_id)
84+
85+
def delete(self, input: Any = None) -> None:
86+
"""Delete the entity instance.
87+
88+
Parameters
89+
----------
90+
input : Any, optional
91+
Unused: The input for the entity "delete" operation.
92+
"""
93+
self.set_state(None)

durabletask/entities/entity_lock.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,4 @@ def __enter__(self):
99
return self
1010

1111
def __exit__(self, exc_type, exc_val, exc_tb): # TODO: Handle exceptions?
12-
for entity_unlock_message in self._context._entity_context.emit_lock_release_messages():
13-
task_id = self._context.next_sequence_number()
14-
action = pb.OrchestratorAction(id=task_id, sendEntityMessage=entity_unlock_message)
15-
self._context._pending_actions[task_id] = action
12+
self._context._exit_critical_section()

durabletask/worker.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -842,9 +842,15 @@ def set_complete(
842842
if self._is_complete:
843843
return
844844

845+
# If the user code returned without yielding the entity unlock, do that now
846+
if self._entity_context.is_inside_critical_section:
847+
self._exit_critical_section()
848+
845849
self._is_complete = True
846850
self._completion_status = status
847-
self._pending_actions.clear() # Cancel any pending actions
851+
# This is probably a bug - an orchestrator may complete with some actions remaining that the user still
852+
# wants to execute - for example, signaling an entity. So we shouldn't clear the pending actions here.
853+
# self._pending_actions.clear() # Cancel any pending actions
848854

849855
self._result = result
850856
result_json: Optional[str] = None
@@ -859,8 +865,14 @@ def set_failed(self, ex: Union[Exception, pb.TaskFailureDetails]):
859865
if self._is_complete:
860866
return
861867

868+
# If the user code crashed inside a critical section, or did not exit it, do that now
869+
if self._entity_context.is_inside_critical_section:
870+
self._exit_critical_section()
871+
862872
self._is_complete = True
863-
self._pending_actions.clear() # Cancel any pending actions
873+
# We also cannot cancel the pending actions in the failure case - if the user code had released an entity
874+
# lock, we *must* send that action to the sidecar.
875+
# self._pending_actions.clear() # Cancel any pending actions
864876
self._completion_status = pb.ORCHESTRATION_STATUS_FAILED
865877

866878
action = ph.new_complete_orchestration_action(
@@ -875,13 +887,20 @@ def set_continued_as_new(self, new_input: Any, save_events: bool):
875887
if self._is_complete:
876888
return
877889

890+
# If the user code called continue_as_new while holding an entity lock, unlock it now
891+
if self._entity_context.is_inside_critical_section:
892+
self._exit_critical_section()
893+
878894
self._is_complete = True
879-
self._pending_actions.clear() # Cancel any pending actions
895+
# We also cannot cancel the pending actions in the continue as new case - if the user code had released an
896+
# entity lock, we *must* send that action to the sidecar.
897+
# self._pending_actions.clear() # Cancel any pending actions
880898
self._completion_status = pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
881899
self._new_input = new_input
882900
self._save_events = save_events
883901

884902
def get_actions(self) -> list[pb.OrchestratorAction]:
903+
current_actions = list(self._pending_actions.values())
885904
if self._completion_status == pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW:
886905
# When continuing-as-new, we only return a single completion action.
887906
carryover_events: Optional[list[pb.HistoryEvent]] = None
@@ -906,9 +925,9 @@ def get_actions(self) -> list[pb.OrchestratorAction]:
906925
failure_details=None,
907926
carryover_events=carryover_events,
908927
)
909-
return [action]
910-
else:
911-
return list(self._pending_actions.values())
928+
# We must return the existing tasks as well, to capture entity unlocks
929+
current_actions.append(action)
930+
return current_actions
912931

913932
def next_sequence_number(self) -> int:
914933
self._sequence_number += 1
@@ -1147,6 +1166,17 @@ def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId
11471166
fn_task = task.CompletableTask[EntityLock]()
11481167
self._pending_tasks[id] = fn_task
11491168

1169+
def _exit_critical_section(self) -> None:
1170+
if not self._entity_context.is_inside_critical_section:
1171+
# Possible if the user calls continue_as_new inside the lock - in the success case, we will call
1172+
# _exit_critical_section both from the EntityLock and the exit logic. We must keep both calls in
1173+
# case the user code crashes after calling continue_as_new but before the EntityLock object is exited.
1174+
return
1175+
for entity_unlock_message in self._entity_context.emit_lock_release_messages():
1176+
task_id = self.next_sequence_number()
1177+
action = pb.OrchestratorAction(id=task_id, sendEntityMessage=entity_unlock_message)
1178+
self._pending_actions[task_id] = action
1179+
11501180
def wait_for_external_event(self, name: str) -> task.Task:
11511181
# Check to see if this event has already been received, in which case we
11521182
# can return it immediately. Otherwise, record out intent to receive an

tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e copy.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from datetime import timedelta
12
import os
23
import time
34

45
import pytest
6+
from azure.identity import DefaultAzureCredential
57

68
from durabletask import client, entities, task
79
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
@@ -14,6 +16,8 @@
1416
# Read the environment variables
1517
taskhub_name = os.getenv("TASKHUB", "default")
1618
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
19+
# endpoint = os.getenv("ENDPOINT", "https://andy-dts-testin-byaje2c8.northcentralus.durabletask.io")
20+
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()
1721

1822

1923
def test_client_signal_entity():
@@ -256,3 +260,99 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
256260
assert state.serialized_input is None
257261
assert state.serialized_output is None
258262
assert state.serialized_custom_status is None
263+
264+
265+
def test_entity_unlocks_when_user_code_throws():
266+
invoke_count = 0
267+
268+
def empty_entity(ctx: entities.EntityContext, _):
269+
nonlocal invoke_count # don't do this in a real app!
270+
invoke_count += 1
271+
272+
def empty_orchestrator(ctx: task.OrchestrationContext, _):
273+
entity_id = entities.EntityInstanceId("empty_entity", "testEntity3")
274+
with (yield ctx.lock_entities([entity_id])):
275+
yield ctx.call_entity(entity_id, "do_nothing")
276+
raise Exception("Simulated exception")
277+
278+
# Start a worker, which will connect to the sidecar in a background thread
279+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
280+
taskhub=taskhub_name, token_credential=None) as w:
281+
w.add_orchestrator(empty_orchestrator)
282+
w.add_entity(empty_entity)
283+
w.start()
284+
285+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
286+
taskhub=taskhub_name, token_credential=None)
287+
time.sleep(2) # wait for the signal and orchestration to be processed
288+
id = c.schedule_new_orchestration(empty_orchestrator)
289+
c.wait_for_orchestration_completion(id, timeout=30)
290+
id = c.schedule_new_orchestration(empty_orchestrator)
291+
c.wait_for_orchestration_completion(id, timeout=30)
292+
293+
assert invoke_count == 2
294+
295+
296+
def test_entity_unlocks_when_user_mishandles_lock():
297+
invoke_count = 0
298+
299+
def empty_entity(ctx: entities.EntityContext, _):
300+
nonlocal invoke_count # don't do this in a real app!
301+
invoke_count += 1
302+
303+
def empty_orchestrator(ctx: task.OrchestrationContext, _):
304+
entity_id = entities.EntityInstanceId("empty_entity", "testEntity3")
305+
yield ctx.lock_entities([entity_id])
306+
yield ctx.call_entity(entity_id, "do_nothing")
307+
308+
# Start a worker, which will connect to the sidecar in a background thread
309+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
310+
taskhub=taskhub_name, token_credential=None) as w:
311+
w.add_orchestrator(empty_orchestrator)
312+
w.add_entity(empty_entity)
313+
w.start()
314+
315+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
316+
taskhub=taskhub_name, token_credential=None)
317+
time.sleep(2) # wait for the signal and orchestration to be processed
318+
id = c.schedule_new_orchestration(empty_orchestrator)
319+
c.wait_for_orchestration_completion(id, timeout=30)
320+
id = c.schedule_new_orchestration(empty_orchestrator)
321+
c.wait_for_orchestration_completion(id, timeout=30)
322+
323+
assert invoke_count == 2
324+
325+
326+
# TODO: Uncomment this test
327+
# Will not pass until https://msazure.visualstudio.com/One/_git/AAPT-DTMB/pullrequest/13610881 is merged and deployed to the docker image
328+
# def test_entity_unlocks_when_user_calls_continue_as_new():
329+
# invoke_count = 0
330+
331+
# def empty_entity(ctx: entities.EntityContext, _):
332+
# nonlocal invoke_count # don't do this in a real app!
333+
# invoke_count += 1
334+
335+
# def empty_orchestrator(ctx: task.OrchestrationContext, entity_call_count: int):
336+
# entity_id = entities.EntityInstanceId("empty_entity", "testEntity6")
337+
# nonlocal invoke_count
338+
# if not ctx.is_replaying:
339+
# invoke_count += 1
340+
# with (yield ctx.lock_entities([entity_id])):
341+
# yield ctx.call_entity(entity_id, "do_nothing")
342+
# if entity_call_count > 0:
343+
# ctx.continue_as_new(entity_call_count - 1, save_events=True)
344+
345+
# # Start a worker, which will connect to the sidecar in a background thread
346+
# with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
347+
# taskhub=taskhub_name, token_credential=credential) as w:
348+
# w.add_orchestrator(empty_orchestrator)
349+
# w.add_entity(empty_entity)
350+
# w.start()
351+
352+
# c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
353+
# taskhub=taskhub_name, token_credential=credential)
354+
# time.sleep(2) # wait for the signal and orchestration to be processed
355+
# id = c.schedule_new_orchestration(empty_orchestrator, input=2)
356+
# c.wait_for_orchestration_completion(id, timeout=500)
357+
358+
# assert invoke_count == 6

0 commit comments

Comments
 (0)