Skip to content
16 changes: 16 additions & 0 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb
)


def new_event_sent_event(instance_id: str, name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
eventSent=pb.EventSentEvent(instanceId=instance_id, name=name, input=get_string_value(encoded_input))
)


def new_suspend_event() -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
Expand Down Expand Up @@ -203,6 +211,14 @@ def new_create_sub_orchestration_action(
))


def new_send_event_action(id: int, instance_id: str, event_name: str, encoded_data: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, sendEvent=pb.SendEventAction(
instance=pb.OrchestrationInstance(instanceId=instance_id),
name=event_name,
data=get_string_value(encoded_data)
))


def is_empty(v: wrappers_pb2.StringValue):
return v is None or v.value == ''

Expand Down
16 changes: 16 additions & 0 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,22 @@ def wait_for_external_event(self, name: str) -> Task:
"""
pass

@abstractmethod
def send_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> None:
"""Send an event to another orchestration instance.

Parameters
----------
instance_id : str
The ID of the orchestration instance to send the event to.
event_name : str
The name of the event to send.
data : Optional[Any]
The optional JSON-serializable data to include with the event.
"""
pass

@abstractmethod
def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
"""Continue the orchestration execution as a new instance.
Expand Down
26 changes: 21 additions & 5 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,13 +629,16 @@ def set_complete(
result: Any,
status: pb.OrchestrationStatus,
is_result_encoded: bool = False,
preserve_actions: bool = False,
):
if self._is_complete:
return

self._is_complete = True
self._completion_status = status
self._pending_actions.clear() # Cancel any pending actions

if not preserve_actions:
self._pending_actions.clear() # Cancel any pending actions

self._result = result
result_json: Optional[str] = None
Expand Down Expand Up @@ -852,6 +855,18 @@ def wait_for_external_event(self, name: str) -> task.Task:
task_list.append(external_event_task)
return external_event_task

def send_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> None:
if not instance_id:
raise ValueError("instance_id cannot be None or empty")
if not event_name:
raise ValueError("event_name cannot be None or empty")

id = self.next_sequence_number()
encoded_data = shared.to_json(data) if data is not None else None
action = ph.new_send_event_action(id, instance_id, event_name, encoded_data)
self._pending_actions[id] = action

def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
if self._is_complete:
return
Expand Down Expand Up @@ -973,8 +988,9 @@ def process_event(
# Start the orchestrator's generator function
ctx.run(result)
else:
# This is an orchestrator that doesn't schedule any tasks
ctx.set_complete(result, pb.ORCHESTRATION_STATUS_COMPLETED)
# This is an orchestrator that doesn't use generators (async tasks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this comment

# but it may have scheduled actions like send_event
ctx.set_complete(result, pb.ORCHESTRATION_STATUS_COMPLETED, preserve_actions=True)
elif event.HasField("timerCreated"):
# This history event confirms that the timer was successfully scheduled.
# Remove the timerCreated event from the pending action list so we don't schedule it again.
Expand Down Expand Up @@ -1304,8 +1320,8 @@ def _get_method_name_for_action(action: pb.OrchestratorAction) -> str:
return task.get_name(task.OrchestrationContext.create_timer)
elif action_type == "createSubOrchestration":
return task.get_name(task.OrchestrationContext.call_sub_orchestrator)
# elif action_type == "sendEvent":
# return task.get_name(task.OrchestrationContext.send_event)
elif action_type == "sendEvent":
return task.get_name(task.OrchestrationContext.send_event)
else:
raise NotImplementedError(f"Action type '{action_type}' not supported!")

Expand Down
Loading