Skip to content

Commit 7a64939

Browse files
committed
AI
1 parent 189effe commit 7a64939

File tree

2 files changed

+46
-15
lines changed

2 files changed

+46
-15
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pub struct TunerHolder {
147147
workflow_slot_supplier: SlotSupplier,
148148
activity_slot_supplier: SlotSupplier,
149149
local_activity_slot_supplier: SlotSupplier,
150+
nexus_slot_supplier: SlotSupplier,
150151
}
151152

152153
#[derive(FromPyObject)]
@@ -745,10 +746,17 @@ fn convert_tuner_holder(
745746
} else {
746747
None
747748
};
749+
let maybe_nexus_resource_opts =
750+
if let SlotSupplier::ResourceBased(ref ss) = holder.nexus_slot_supplier {
751+
Some(&ss.tuner_config)
752+
} else {
753+
None
754+
};
748755
let all_resource_opts = [
749756
maybe_wf_resource_opts,
750757
maybe_act_resource_opts,
751758
maybe_local_act_resource_opts,
759+
maybe_nexus_resource_opts,
752760
];
753761
let mut set_resource_opts = all_resource_opts.iter().flatten();
754762
let first = set_resource_opts.next();
@@ -784,6 +792,10 @@ fn convert_tuner_holder(
784792
)?)
785793
.local_activity_slot_options(convert_slot_supplier(
786794
holder.local_activity_slot_supplier,
795+
task_locals.clone(),
796+
)?)
797+
.nexus_slot_options(convert_slot_supplier(
798+
holder.nexus_slot_supplier,
787799
task_locals,
788800
)?);
789801
Ok(options

tests/worker/test_worker.py

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@
5757
new_worker,
5858
worker_versioning_enabled,
5959
)
60-
from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name
60+
from tests.helpers.nexus import (
61+
ServiceClient,
62+
create_nexus_endpoint,
63+
dataclass_as_dict,
64+
)
6165

6266
# Passing through because Python 3.9 has an import bug at
6367
# https://github.com/python/cpython/issues/91351
@@ -406,13 +410,21 @@ async def test_warns_when_workers_too_low(client: Client, env: WorkflowEnvironme
406410
pass
407411

408412

413+
from dataclasses import dataclass
414+
415+
416+
@dataclass
417+
class NexusInput:
418+
name: str
419+
420+
409421
@nexusrpc.handler.service_handler
410422
class SayHelloService:
411423
@nexusrpc.handler.sync_operation
412424
async def say_hello(
413-
self, _ctx: nexusrpc.handler.StartOperationContext, name: str
425+
self, _ctx: nexusrpc.handler.StartOperationContext, input: NexusInput
414426
) -> str:
415-
return f"Hello, {name}!"
427+
return f"Hello, {input.name}!"
416428

417429

418430
@workflow.defn
@@ -429,14 +441,6 @@ async def run(self) -> None:
429441
versioning_intent=VersioningIntent.DEFAULT,
430442
start_to_close_timeout=timedelta(seconds=5),
431443
)
432-
nexus_client = workflow.create_nexus_client(
433-
endpoint=make_nexus_endpoint_name(workflow.info().task_queue),
434-
service=SayHelloService,
435-
)
436-
await nexus_client.execute_operation(
437-
SayHelloService.say_hello,
438-
"hi",
439-
)
440444

441445
@workflow.signal
442446
def my_signal(self, value: str) -> None:
@@ -524,7 +528,9 @@ def reserve_asserts(self, ctx: SlotReserveContext) -> None:
524528
tuner=tuner,
525529
identity="myworker",
526530
) as w:
527-
await create_nexus_endpoint(w.task_queue, client)
531+
endpoint_resp = await create_nexus_endpoint(w.task_queue, client)
532+
533+
# Start workflow and let it run through activity
528534
wf1 = await client.start_workflow(
529535
CustomSlotSupplierWorkflow.run,
530536
id=f"custom-slot-supplier-{uuid.uuid4()}",
@@ -533,16 +539,29 @@ def reserve_asserts(self, ctx: SlotReserveContext) -> None:
533539
await wf1.signal(CustomSlotSupplierWorkflow.my_signal, "finish")
534540
await wf1.result()
535541

542+
# Now make a direct nexus call to trigger nexus slot usage
543+
service_client = ServiceClient(
544+
server_address=ServiceClient.default_server_address(env),
545+
endpoint=endpoint_resp.endpoint.id,
546+
service=SayHelloService.__name__,
547+
)
548+
nexus_response = await service_client.start_operation(
549+
"say_hello",
550+
dataclass_as_dict(NexusInput(name="test")),
551+
)
552+
assert nexus_response.is_success
553+
536554
# We can't use reserve number directly because there is a technically possible race
537555
# where the python reserve function appears to complete, but Rust doesn't see that.
538556
# This isn't solvable without redoing a chunk of pyo3-asyncio. So we only check
539557
# that the permits passed to release line up.
540558
assert ss.highest_seen_reserve_on_release == ss.releases
541-
# Two workflow tasks, one activity
559+
# Two workflow tasks that use slots, one activity, one nexus task
560+
# (The first WFT might be cached/sticky and not need a new slot)
542561
assert ss.used == 4
543562
assert ss.seen_sticky_kinds == {True, False}
544-
assert ss.seen_slot_kinds == {"workflow", "activity", "local-activity"}
545-
assert ss.seen_used_slot_kinds == {"wf", "a"}
563+
assert ss.seen_slot_kinds == {"workflow", "activity", "local-activity", "nexus"}
564+
assert ss.seen_used_slot_kinds == {"wf", "a", "nx"}
546565
assert ss.seen_release_info_empty
547566
assert ss.seen_release_info_nonempty
548567

0 commit comments

Comments
 (0)