Skip to content

Commit 7f228d8

Browse files
authored
Test workflow outbound nexus interception (#1064)
1 parent 229fe37 commit 7f228d8

File tree

1 file changed

+32
-2
lines changed

1 file changed

+32
-2
lines changed

tests/worker/test_interceptor.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from temporalio import activity, workflow
77
from temporalio.client import Client, WorkflowUpdateFailedError
8-
from temporalio.exceptions import ApplicationError
8+
from temporalio.exceptions import ApplicationError, NexusOperationError
99
from temporalio.testing import WorkflowEnvironment
1010
from temporalio.worker import (
1111
ActivityInboundInterceptor,
@@ -27,6 +27,8 @@
2727
WorkflowInterceptorClassInput,
2828
WorkflowOutboundInterceptor,
2929
)
30+
from temporalio.worker._interceptor import StartNexusOperationInput
31+
from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name
3032

3133
# Passing through because Python 3.9 has an import bug at
3234
# https://github.com/python/cpython/issues/91351
@@ -127,6 +129,12 @@ def start_local_activity(
127129
interceptor_traces.append(("workflow.start_local_activity", input))
128130
return super().start_local_activity(input)
129131

132+
async def start_nexus_operation(
133+
self, input: StartNexusOperationInput
134+
) -> workflow.NexusOperationHandle:
135+
interceptor_traces.append(("workflow.start_nexus_operation", input))
136+
return await super().start_nexus_operation(input)
137+
130138

131139
@activity.defn
132140
async def intercepted_activity(param: str) -> str:
@@ -169,6 +177,24 @@ async def run(self, style: str) -> None:
169177
)
170178
await child_handle
171179

180+
nexus_client = workflow.create_nexus_client(
181+
endpoint=make_nexus_endpoint_name(workflow.info().task_queue),
182+
service="non-existent-nexus-service",
183+
)
184+
try:
185+
await nexus_client.start_operation(
186+
operation="non-existent-nexus-operation",
187+
input={"test": "data"},
188+
schedule_to_close_timeout=timedelta(microseconds=1),
189+
)
190+
raise Exception("unreachable")
191+
except NexusOperationError:
192+
# The test requires only that the workflow attempts to schedule the nexus operation.
193+
# Instead of setting up a nexus service, we deliberately schedule a call to a
194+
# non-existent nexus operation with an insufficiently long timeout, and expect this
195+
# error.
196+
pass
197+
172198
await self.finish.wait()
173199
workflow.continue_as_new("continue-as-new")
174200

@@ -200,7 +226,9 @@ async def test_worker_interceptor(client: Client, env: WorkflowEnvironment):
200226
pytest.skip(
201227
"Java test server: https://github.com/temporalio/sdk-java/issues/1424"
202228
)
203-
task_queue = f"task_queue_{uuid.uuid4()}"
229+
task_queue = f"task-queue-{uuid.uuid4()}"
230+
await create_nexus_endpoint(task_queue, client)
231+
204232
async with Worker(
205233
client,
206234
task_queue=task_queue,
@@ -276,6 +304,8 @@ def pop_trace(name: str, filter: Optional[Callable[[Any], bool]] = None) -> Any:
276304
"workflow.signal_external_workflow",
277305
lambda v: v.args[0] == "external-signal-val",
278306
)
307+
assert pop_trace("workflow.info")
308+
assert pop_trace("workflow.start_nexus_operation")
279309
assert pop_trace(
280310
"workflow.signal", lambda v: v.args[0] == "external-signal-val"
281311
)

0 commit comments

Comments
 (0)