Skip to content

Commit 00b4957

Browse files
Copilotberndverst
andcommitted
Add advanced orchestration-to-orchestration communication test
Co-authored-by: berndverst <[email protected]>
1 parent cd9d951 commit 00b4957

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed

tests/durabletask/test_send_event.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,3 +241,94 @@ def orchestrator_empty_instance(ctx: task.OrchestrationContext, _):
241241
assert False, "Expected failure completion action, got different action type"
242242
else:
243243
assert False, "Expected at least one action (failure completion)"
244+
245+
246+
def test_orchestration_to_orchestration_communication():
247+
"""Test advanced scenario: orchestration sends event to another waiting orchestration"""
248+
249+
# Define the waiting orchestration that waits for an approval event
250+
def waiting_orchestration(ctx: task.OrchestrationContext, _):
251+
approval_data = yield ctx.wait_for_external_event("approval")
252+
return f"Received approval: {approval_data}"
253+
254+
# Define the sender orchestration that sends an event to another orchestration
255+
def sender_orchestration(ctx: task.OrchestrationContext, target_instance_id: str):
256+
approval_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"}
257+
yield ctx.send_event(target_instance_id, "approval", data=approval_payload)
258+
return "Event sent successfully"
259+
260+
registry = worker._Registry()
261+
waiting_name = registry.add_orchestrator(waiting_orchestration)
262+
sender_name = registry.add_orchestrator(sender_orchestration)
263+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
264+
265+
# Instance IDs for our orchestrations
266+
waiting_instance_id = "waiting-instance-123"
267+
sender_instance_id = "sender-instance-456"
268+
269+
# Step 1: Start the waiting orchestration
270+
waiting_new_events = [
271+
helpers.new_orchestrator_started_event(),
272+
helpers.new_execution_started_event(waiting_name, waiting_instance_id, encoded_input=None),
273+
]
274+
waiting_result = executor.execute(waiting_instance_id, [], waiting_new_events)
275+
276+
# The waiting orchestration should produce no actions when waiting for an external event
277+
assert len(waiting_result.actions) == 0
278+
279+
# Step 2: Start the sender orchestration with the waiting instance ID as input
280+
sender_new_events = [
281+
helpers.new_orchestrator_started_event(),
282+
helpers.new_execution_started_event(sender_name, sender_instance_id,
283+
encoded_input=json.dumps(waiting_instance_id)),
284+
]
285+
sender_result = executor.execute(sender_instance_id, [], sender_new_events)
286+
287+
# The sender orchestration should yield a send_event action
288+
assert len(sender_result.actions) == 1
289+
send_action = sender_result.actions[0]
290+
assert send_action.WhichOneof("orchestratorActionType") == "sendEvent"
291+
assert send_action.sendEvent.instance.instanceId == waiting_instance_id
292+
assert send_action.sendEvent.name == "approval"
293+
294+
# Verify the data payload is correct
295+
expected_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"}
296+
assert send_action.sendEvent.data.value == json.dumps(expected_payload)
297+
298+
# Step 3: Complete the send_event action
299+
event_sent = helpers.new_event_sent_event(waiting_instance_id, "approval",
300+
json.dumps(expected_payload))
301+
event_sent.eventId = send_action.id
302+
303+
sender_old_events = [
304+
helpers.new_orchestrator_started_event(),
305+
helpers.new_execution_started_event(sender_name, sender_instance_id,
306+
encoded_input=json.dumps(waiting_instance_id))
307+
]
308+
sender_completion_result = executor.execute(sender_instance_id, sender_old_events, [event_sent])
309+
310+
# The sender should complete successfully
311+
assert len(sender_completion_result.actions) == 1
312+
sender_complete_action = sender_completion_result.actions[0]
313+
assert sender_complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration"
314+
assert sender_complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
315+
assert sender_complete_action.completeOrchestration.result.value == json.dumps("Event sent successfully")
316+
317+
# Step 4: Simulate the event being raised to the waiting orchestration
318+
event_raised = helpers.new_event_raised_event("approval", json.dumps(expected_payload))
319+
320+
waiting_old_events = [
321+
helpers.new_orchestrator_started_event(),
322+
helpers.new_execution_started_event(waiting_name, waiting_instance_id, encoded_input=None)
323+
]
324+
waiting_completion_result = executor.execute(waiting_instance_id, waiting_old_events, [event_raised])
325+
326+
# The waiting orchestration should complete with the received data
327+
assert len(waiting_completion_result.actions) == 1
328+
waiting_complete_action = waiting_completion_result.actions[0]
329+
assert waiting_complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration"
330+
assert waiting_complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
331+
332+
# Verify the data was passed correctly through the event
333+
expected_result = f"Received approval: {expected_payload}"
334+
assert waiting_complete_action.completeOrchestration.result.value == json.dumps(expected_result)

0 commit comments

Comments
 (0)