5757 new_worker ,
5858 worker_versioning_enabled ,
5959)
60- from tests .helpers .nexus import (
61- ServiceClient ,
62- create_nexus_endpoint ,
63- dataclass_as_dict ,
64- )
60+ from tests .helpers .nexus import create_nexus_endpoint , make_nexus_endpoint_name
6561
6662# Passing through because Python 3.9 has an import bug at
6763# https://github.com/python/cpython/issues/91351
@@ -410,21 +406,13 @@ async def test_warns_when_workers_too_low(client: Client, env: WorkflowEnvironme
410406 pass
411407
412408
413- from dataclasses import dataclass
414-
415-
416- @dataclass
417- class NexusInput :
418- name : str
419-
420-
421409@nexusrpc .handler .service_handler
422410class SayHelloService :
423411 @nexusrpc .handler .sync_operation
424412 async def say_hello (
425- self , _ctx : nexusrpc .handler .StartOperationContext , input : NexusInput
413+ self , _ctx : nexusrpc .handler .StartOperationContext , name : str
426414 ) -> str :
427- return f"Hello, { input . name } !"
415+ return f"Hello, { name } !"
428416
429417
430418@workflow .defn
@@ -441,6 +429,14 @@ async def run(self) -> None:
441429 versioning_intent = VersioningIntent .DEFAULT ,
442430 start_to_close_timeout = timedelta (seconds = 5 ),
443431 )
432+ nexus_client = workflow .create_nexus_client (
433+ endpoint = make_nexus_endpoint_name (workflow .info ().task_queue ),
434+ service = SayHelloService ,
435+ )
436+ await nexus_client .execute_operation (
437+ SayHelloService .say_hello ,
438+ "hi" ,
439+ )
444440
445441 @workflow .signal
446442 def my_signal (self , value : str ) -> None :
@@ -528,9 +524,7 @@ def reserve_asserts(self, ctx: SlotReserveContext) -> None:
528524 tuner = tuner ,
529525 identity = "myworker" ,
530526 ) as w :
531- endpoint_resp = await create_nexus_endpoint (w .task_queue , client )
532-
533- # Start workflow and let it run through activity
527+ await create_nexus_endpoint (w .task_queue , client )
534528 wf1 = await client .start_workflow (
535529 CustomSlotSupplierWorkflow .run ,
536530 id = f"custom-slot-supplier-{ uuid .uuid4 ()} " ,
@@ -539,26 +533,14 @@ def reserve_asserts(self, ctx: SlotReserveContext) -> None:
539533 await wf1 .signal (CustomSlotSupplierWorkflow .my_signal , "finish" )
540534 await wf1 .result ()
541535
542- # Now make a direct nexus call to trigger nexus slot usage
543- service_client = ServiceClient (
544- server_address = ServiceClient .default_server_address (env ),
545- endpoint = endpoint_resp .endpoint .id ,
546- service = SayHelloService .__name__ ,
547- )
548- nexus_response = await service_client .start_operation (
549- "say_hello" ,
550- dataclass_as_dict (NexusInput (name = "test" )),
551- )
552- assert nexus_response .is_success
553-
554536 # We can't use reserve number directly because there is a technically possible race
555537 # where the python reserve function appears to complete, but Rust doesn't see that.
556538 # This isn't solvable without redoing a chunk of pyo3-asyncio. So we only check
557539 # that the permits passed to release line up.
558540 assert ss .highest_seen_reserve_on_release == ss .releases
559541 # Two workflow tasks that use slots, one activity, one nexus task
560542 # (The first WFT might be cached/sticky and not need a new slot)
561- assert ss .used == 4
543+ assert ss .used == 5
562544 assert ss .seen_sticky_kinds == {True , False }
563545 assert ss .seen_slot_kinds == {"workflow" , "activity" , "local-activity" , "nexus" }
564546 assert ss .seen_used_slot_kinds == {"wf" , "a" , "nx" }
0 commit comments