|
5 | 5 |
|
6 | 6 | from temporalio import activity, workflow |
7 | 7 | from temporalio.client import Client, WorkflowUpdateFailedError |
8 | | -from temporalio.exceptions import ApplicationError |
| 8 | +from temporalio.exceptions import ApplicationError, NexusOperationError |
9 | 9 | from temporalio.testing import WorkflowEnvironment |
10 | 10 | from temporalio.worker import ( |
11 | 11 | ActivityInboundInterceptor, |
|
27 | 27 | WorkflowInterceptorClassInput, |
28 | 28 | WorkflowOutboundInterceptor, |
29 | 29 | ) |
| 30 | +from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name |
30 | 31 |
|
31 | 32 | # Passing through because Python 3.9 has an import bug at |
32 | 33 | # https://github.com/python/cpython/issues/91351 |
@@ -169,6 +170,24 @@ async def run(self, style: str) -> None: |
169 | 170 | ) |
170 | 171 | await child_handle |
171 | 172 |
|
| 173 | + nexus_client = workflow.create_nexus_client( |
| 174 | + endpoint=make_nexus_endpoint_name(workflow.info().task_queue), |
| 175 | + service="non-existent-nexus-service", |
| 176 | + ) |
| 177 | + try: |
| 178 | + await nexus_client.start_operation( |
| 179 | + operation="non-existent-nexus-operation", |
| 180 | + input={"test": "data"}, |
| 181 | + schedule_to_close_timeout=timedelta(microseconds=1), |
| 182 | + ) |
| 183 | + raise Exception("unreachable") |
| 184 | + except NexusOperationError: |
| 185 | + # The test requires only that the workflow attempts to schedule the nexus operation. |
| 186 | + # Instead of setting up a nexus service, we deliberately schedule a call to a |
| 187 | + # non-existent nexus operation with an insufficiently long timeout, and expect this |
| 188 | + # error. |
| 189 | + pass |
| 190 | + |
172 | 191 | await self.finish.wait() |
173 | 192 | workflow.continue_as_new("continue-as-new") |
174 | 193 |
|
@@ -200,7 +219,9 @@ async def test_worker_interceptor(client: Client, env: WorkflowEnvironment): |
200 | 219 | pytest.skip( |
201 | 220 | "Java test server: https://github.com/temporalio/sdk-java/issues/1424" |
202 | 221 | ) |
203 | | - task_queue = f"task_queue_{uuid.uuid4()}" |
| 222 | + task_queue = f"task-queue-{uuid.uuid4()}" |
| 223 | + await create_nexus_endpoint(task_queue, client) |
| 224 | + |
204 | 225 | async with Worker( |
205 | 226 | client, |
206 | 227 | task_queue=task_queue, |
@@ -276,6 +297,8 @@ def pop_trace(name: str, filter: Optional[Callable[[Any], bool]] = None) -> Any: |
276 | 297 | "workflow.signal_external_workflow", |
277 | 298 | lambda v: v.args[0] == "external-signal-val", |
278 | 299 | ) |
| 300 | + assert pop_trace("workflow.info") |
| 301 | + assert pop_trace("workflow.start_nexus_operation") |
279 | 302 | assert pop_trace( |
280 | 303 | "workflow.signal", lambda v: v.args[0] == "external-signal-val" |
281 | 304 | ) |
|
0 commit comments