Skip to content

Commit c2684fd

Browse files
Copilotberndverst
andcommitted
Implement send_event functionality with basic tests
Co-authored-by: berndverst <[email protected]>
1 parent 8f1445c commit c2684fd

File tree

3 files changed

+60
-2
lines changed

3 files changed

+60
-2
lines changed

durabletask/internal/helpers.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb
124124
)
125125

126126

127+
def new_event_sent_event(instance_id: str, name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
128+
return pb.HistoryEvent(
129+
eventId=-1,
130+
timestamp=timestamp_pb2.Timestamp(),
131+
eventSent=pb.EventSentEvent(instanceId=instance_id, name=name, input=get_string_value(encoded_input))
132+
)
133+
134+
127135
def new_suspend_event() -> pb.HistoryEvent:
128136
return pb.HistoryEvent(
129137
eventId=-1,
@@ -203,6 +211,14 @@ def new_create_sub_orchestration_action(
203211
))
204212

205213

214+
def new_send_event_action(id: int, instance_id: str, event_name: str, encoded_data: Optional[str]) -> pb.OrchestratorAction:
215+
return pb.OrchestratorAction(id=id, sendEvent=pb.SendEventAction(
216+
instance=pb.OrchestrationInstance(instanceId=instance_id),
217+
name=event_name,
218+
data=get_string_value(encoded_data)
219+
))
220+
221+
206222
def is_empty(v: wrappers_pb2.StringValue):
207223
return v is None or v.value == ''
208224

durabletask/task.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,27 @@ def wait_for_external_event(self, name: str) -> Task:
163163
"""
164164
pass
165165

166+
@abstractmethod
167+
def send_event(self, instance_id: str, event_name: str, *,
168+
data: Optional[Any] = None) -> Task:
169+
"""Send an event to another orchestration instance.
170+
171+
Parameters
172+
----------
173+
instance_id : str
174+
The ID of the orchestration instance to send the event to.
175+
event_name : str
176+
The name of the event to send.
177+
data : Optional[Any]
178+
The optional JSON-serializable data to include with the event.
179+
180+
Returns
181+
-------
182+
Task
183+
A Durable Task that completes when the event has been sent.
184+
"""
185+
pass
186+
166187
@abstractmethod
167188
def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
168189
"""Continue the orchestration execution as a new instance.

durabletask/worker.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,16 @@ def wait_for_external_event(self, name: str) -> task.Task:
852852
task_list.append(external_event_task)
853853
return external_event_task
854854

855+
def send_event(self, instance_id: str, event_name: str, *,
856+
data: Optional[Any] = None) -> task.Task:
857+
id = self.next_sequence_number()
858+
encoded_data = shared.to_json(data) if data is not None else None
859+
action = ph.new_send_event_action(id, instance_id, event_name, encoded_data)
860+
self._pending_actions[id] = action
861+
send_event_task = task.CompletableTask()
862+
self._pending_tasks[id] = send_event_task
863+
return send_event_task
864+
855865
def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
856866
if self._is_complete:
857867
return
@@ -1188,6 +1198,17 @@ def process_event(
11881198
self._logger.info(
11891199
f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it."
11901200
)
1201+
elif event.HasField("eventSent"):
1202+
# This history event confirms that the event was successfully sent.
1203+
# Complete the corresponding send_event task.
1204+
event_id = event.eventId
1205+
send_event_task = ctx._pending_tasks.pop(event_id, None)
1206+
if send_event_task:
1207+
# For send_event, we don't return any meaningful result, just completion
1208+
send_event_task.complete(None)
1209+
ctx.resume()
1210+
# Also remove the corresponding action from pending actions
1211+
ctx._pending_actions.pop(event_id, None)
11911212
elif event.HasField("executionSuspended"):
11921213
if not self._is_suspended and not ctx.is_replaying:
11931214
self._logger.info(f"{ctx.instance_id}: Execution suspended.")
@@ -1304,8 +1325,8 @@ def _get_method_name_for_action(action: pb.OrchestratorAction) -> str:
13041325
return task.get_name(task.OrchestrationContext.create_timer)
13051326
elif action_type == "createSubOrchestration":
13061327
return task.get_name(task.OrchestrationContext.call_sub_orchestrator)
1307-
# elif action_type == "sendEvent":
1308-
# return task.get_name(task.OrchestrationContext.send_event)
1328+
elif action_type == "sendEvent":
1329+
return task.get_name(task.OrchestrationContext.send_event)
13091330
else:
13101331
raise NotImplementedError(f"Action type '{action_type}' not supported!")
13111332

0 commit comments

Comments
 (0)