Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 80 additions & 23 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@ def __init__(self, instance_id: str, registry: _Registry):
self._pending_tasks: dict[int, task.CompletableTask] = {}
# Maps entity ID to task ID
self._entity_task_id_map: dict[str, tuple[EntityInstanceId, int]] = {}
self._entity_lock_task_id_map: dict[str, tuple[EntityInstanceId, int]] = {}
# Maps criticalSectionId to task ID
self._entity_lock_id_map: dict[str, int] = {}
self._sequence_number = 0
Expand Down Expand Up @@ -1590,33 +1591,70 @@ def process_event(
else:
raise TypeError("Unexpected sub-orchestration task type")
elif event.HasField("eventRaised"):
# event names are case-insensitive
event_name = event.eventRaised.name.casefold()
if not ctx.is_replaying:
self._logger.info(f"{ctx.instance_id} Event raised: {event_name}")
task_list = ctx._pending_events.get(event_name, None)
decoded_result: Optional[Any] = None
if task_list:
event_task = task_list.pop(0)
if event.eventRaised.name in ctx._entity_task_id_map:
# This eventRaised represents the result of an entity operation after being translated to the old
# entity protocol by the Durable WebJobs extension
entity_id, task_id = ctx._entity_task_id_map.get(event.eventRaised.name, (None, None))
if entity_id is None:
raise RuntimeError(f"Could not retrieve entity ID for entity-related eventRaised with ID '{event.eventId}'")
if task_id is None:
raise RuntimeError(f"Could not retrieve task ID for entity-related eventRaised with ID '{event.eventId}'")
entity_task = ctx._pending_tasks.pop(task_id, None)
if not entity_task:
raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'")
result = None
if not ph.is_empty(event.eventRaised.input):
decoded_result = shared.from_json(event.eventRaised.input.value)
event_task.complete(decoded_result)
if not task_list:
del ctx._pending_events[event_name]
# TODO: Investigate why the event result is wrapped in a dict with "result" key
result = shared.from_json(event.eventRaised.input.value)["result"]
ctx._entity_context.recover_lock_after_call(entity_id)
entity_task.complete(result)
ctx.resume()
else:
# buffer the event
event_list = ctx._received_events.get(event_name, None)
if not event_list:
event_list = []
ctx._received_events[event_name] = event_list
elif event.eventRaised.name in ctx._entity_lock_task_id_map:
# This eventRaised represents the result of an entity operation after being translated to the old
# entity protocol by the Durable WebJobs extension
entity_id, task_id = ctx._entity_lock_task_id_map.get(event.eventRaised.name, (None, None))
if entity_id is None:
raise RuntimeError(f"Could not retrieve entity ID for entity-related eventRaised with ID '{event.eventId}'")
if task_id is None:
raise RuntimeError(f"Could not retrieve task ID for entity-related eventRaised with ID '{event.eventId}'")
entity_task = ctx._pending_tasks.pop(task_id, None)
if not entity_task:
raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'")
result = None
if not ph.is_empty(event.eventRaised.input):
decoded_result = shared.from_json(event.eventRaised.input.value)
event_list.append(decoded_result)
# TODO: Investigate why the event result is wrapped in a dict with "result" key
result = shared.from_json(event.eventRaised.input.value)["result"]
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable result is not used.

Suggested change
result = shared.from_json(event.eventRaised.input.value)["result"]
pass # result is not used in this block

Copilot uses AI. Check for mistakes.
ctx._entity_context.complete_acquire(event.eventRaised.name)
entity_task.complete(EntityLock(ctx))
ctx.resume()
else:
# event names are case-insensitive
event_name = event.eventRaised.name.casefold()
if not ctx.is_replaying:
self._logger.info(
f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it."
)
self._logger.info(f"{ctx.instance_id} Event raised: {event_name}")
task_list = ctx._pending_events.get(event_name, None)
decoded_result: Optional[Any] = None
if task_list:
event_task = task_list.pop(0)
if not ph.is_empty(event.eventRaised.input):
decoded_result = shared.from_json(event.eventRaised.input.value)
event_task.complete(decoded_result)
if not task_list:
del ctx._pending_events[event_name]
ctx.resume()
else:
# buffer the event
event_list = ctx._received_events.get(event_name, None)
if not event_list:
event_list = []
ctx._received_events[event_name] = event_list
if not ph.is_empty(event.eventRaised.input):
decoded_result = shared.from_json(event.eventRaised.input.value)
event_list.append(decoded_result)
if not ctx.is_replaying:
self._logger.info(
f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it."
)
elif event.HasField("executionSuspended"):
if not self._is_suspended and not ctx.is_replaying:
self._logger.info(f"{ctx.instance_id}: Execution suspended.")
Expand Down Expand Up @@ -1743,6 +1781,25 @@ def process_event(
self._logger.info(f"{ctx.instance_id}: Entity operation failed.")
self._logger.info(f"Data: {json.dumps(event.entityOperationFailed)}")
pass
elif event.HasField("orchestratorCompleted"):
# Added in Functions only (for some reason) and does not affect orchestrator flow
pass
elif event.HasField("eventSent"):
# Check if this eventSent corresponds to an entity operation call after being translated to the old
# entity protocol by the Durable WebJobs extension. If so, treat this message similarly to
# entityOperationCalled and remove the pending action. Also store the entity id and event id for later
action = ctx._pending_actions.pop(event.eventId, None)
if action and action.HasField("sendEntityMessage"):
if action.sendEntityMessage.HasField("entityOperationCalled"):
entity_id = EntityInstanceId.parse(event.eventSent.instanceId)
event_id = json.loads(event.eventSent.input.value)["id"]
ctx._entity_task_id_map[event_id] = (entity_id, event.eventId)
elif action.sendEntityMessage.HasField("entityLockRequested"):
entity_id = EntityInstanceId.parse(event.eventSent.instanceId)
event_id = json.loads(event.eventSent.input.value)["id"]
ctx._entity_lock_task_id_map[event_id] = (entity_id, event.eventId)
else:
return
else:
eventType = event.WhichOneof("eventType")
raise task.OrchestrationStateError(
Expand Down
Loading