Skip to content

Commit 22d1459

Browse files
Copilotberndverst
andcommitted
Add comprehensive tests for send_event functionality
Co-authored-by: berndverst <[email protected]>
1 parent c2684fd commit 22d1459

File tree

1 file changed

+206
-0
lines changed

1 file changed

+206
-0
lines changed
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
import json
5+
import logging
6+
7+
import durabletask.internal.helpers as helpers
8+
import durabletask.internal.orchestrator_service_pb2 as pb
9+
from durabletask import task, worker
10+
11+
logging.basicConfig(
12+
format='%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s',
13+
datefmt='%Y-%m-%d %H:%M:%S',
14+
level=logging.DEBUG)
15+
TEST_LOGGER = logging.getLogger("tests")
16+
17+
TEST_INSTANCE_ID = "abc123"
18+
19+
20+
def test_send_event_action():
21+
"""Test that send_event creates the correct action"""
22+
23+
def orchestrator(ctx: task.OrchestrationContext, _):
24+
yield ctx.send_event("target_instance", "my_event", data="test_data")
25+
return "completed"
26+
27+
registry = worker._Registry()
28+
name = registry.add_orchestrator(orchestrator)
29+
30+
new_events = [
31+
helpers.new_orchestrator_started_event(),
32+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
33+
]
34+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
35+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
36+
actions = result.actions
37+
38+
# Should have one action for send_event
39+
assert len(actions) == 1
40+
assert type(actions[0]) is pb.OrchestratorAction
41+
42+
action = actions[0]
43+
assert action.WhichOneof("orchestratorActionType") == "sendEvent"
44+
assert action.id == 1
45+
46+
send_action = action.sendEvent
47+
assert send_action.instance.instanceId == "target_instance"
48+
assert send_action.name == "my_event"
49+
assert send_action.data.value == json.dumps("test_data")
50+
51+
52+
def test_send_event_completion():
53+
"""Test that send_event can complete successfully"""
54+
55+
def orchestrator(ctx: task.OrchestrationContext, _):
56+
result = yield ctx.send_event("target_instance", "my_event", data="test_data")
57+
return result
58+
59+
registry = worker._Registry()
60+
name = registry.add_orchestrator(orchestrator)
61+
62+
# First execution - should schedule the send_event
63+
old_events = []
64+
new_events = [
65+
helpers.new_orchestrator_started_event(),
66+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)
67+
]
68+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
69+
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
70+
actions = result.actions
71+
72+
assert len(actions) == 1
73+
action = actions[0]
74+
assert action.WhichOneof("orchestratorActionType") == "sendEvent"
75+
76+
# Second execution - simulate event sent completion
77+
# The eventSent needs to have the same eventId as the action
78+
event_sent = helpers.new_event_sent_event("target_instance", "my_event", json.dumps("test_data"))
79+
event_sent.eventId = action.id # This is the key - the event ID must match the action ID
80+
81+
old_events = [
82+
helpers.new_orchestrator_started_event(),
83+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)
84+
]
85+
new_events = [event_sent]
86+
87+
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
88+
actions = result.actions
89+
90+
# Should have completion action
91+
assert len(actions) == 1
92+
complete_action = actions[0]
93+
assert complete_action.WhichOneof("orchestratorActionType") == "completeOrchestration"
94+
assert complete_action.completeOrchestration.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
95+
96+
97+
def test_send_event_with_no_data():
98+
"""Test send_event with no data parameter"""
99+
100+
def orchestrator(ctx: task.OrchestrationContext, _):
101+
yield ctx.send_event("target_instance", "my_event")
102+
return "completed"
103+
104+
registry = worker._Registry()
105+
name = registry.add_orchestrator(orchestrator)
106+
107+
new_events = [
108+
helpers.new_orchestrator_started_event(),
109+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
110+
]
111+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
112+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
113+
actions = result.actions
114+
115+
# Should have one action for send_event
116+
assert len(actions) == 1
117+
action = actions[0]
118+
send_action = action.sendEvent
119+
assert send_action.instance.instanceId == "target_instance"
120+
assert send_action.name == "my_event"
121+
# data should be None/empty when no data is provided
122+
assert not send_action.HasField("data") or send_action.data.value == ""
123+
124+
125+
def test_send_event_multiple():
126+
"""Test sending multiple events in sequence"""
127+
128+
def orchestrator(ctx: task.OrchestrationContext, _):
129+
yield ctx.send_event("target1", "event1", data="data1")
130+
yield ctx.send_event("target2", "event2", data="data2")
131+
return "completed"
132+
133+
registry = worker._Registry()
134+
name = registry.add_orchestrator(orchestrator)
135+
136+
new_events = [
137+
helpers.new_orchestrator_started_event(),
138+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
139+
]
140+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
141+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
142+
actions = result.actions
143+
144+
# Should have one action for the first send_event
145+
assert len(actions) == 1
146+
action = actions[0]
147+
assert action.WhichOneof("orchestratorActionType") == "sendEvent"
148+
assert action.sendEvent.instance.instanceId == "target1"
149+
assert action.sendEvent.name == "event1"
150+
assert action.sendEvent.data.value == json.dumps("data1")
151+
152+
# Complete the first send_event and continue
153+
event_sent = helpers.new_event_sent_event("target1", "event1", json.dumps("data1"))
154+
event_sent.eventId = action.id
155+
156+
old_events = [
157+
helpers.new_orchestrator_started_event(),
158+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)
159+
]
160+
new_events = [event_sent]
161+
162+
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
163+
actions = result.actions
164+
165+
# Should have one action for the second send_event
166+
assert len(actions) == 1
167+
action = actions[0]
168+
assert action.WhichOneof("orchestratorActionType") == "sendEvent"
169+
assert action.sendEvent.instance.instanceId == "target2"
170+
assert action.sendEvent.name == "event2"
171+
assert action.sendEvent.data.value == json.dumps("data2")
172+
173+
174+
def test_send_event_with_various_data_types():
175+
"""Test send_event with different data types"""
176+
177+
def orchestrator(ctx: task.OrchestrationContext, _):
178+
# Test with dict
179+
yield ctx.send_event("target1", "event1", data={"key": "value", "number": 42})
180+
# Test with list
181+
yield ctx.send_event("target2", "event2", data=[1, 2, 3])
182+
# Test with number
183+
yield ctx.send_event("target3", "event3", data=123)
184+
# Test with boolean
185+
yield ctx.send_event("target4", "event4", data=True)
186+
return "completed"
187+
188+
registry = worker._Registry()
189+
name = registry.add_orchestrator(orchestrator)
190+
191+
new_events = [
192+
helpers.new_orchestrator_started_event(),
193+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
194+
]
195+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
196+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
197+
actions = result.actions
198+
199+
# Should have one action for the first send_event
200+
assert len(actions) == 1
201+
action = actions[0]
202+
assert action.WhichOneof("orchestratorActionType") == "sendEvent"
203+
assert action.sendEvent.instance.instanceId == "target1"
204+
assert action.sendEvent.name == "event1"
205+
expected_data = json.dumps({"key": "value", "number": 42})
206+
assert action.sendEvent.data.value == expected_data

0 commit comments

Comments
 (0)