Skip to content

Commit 49e7dcb

Browse files
committed
Tests
1 parent b4c5d23 commit 49e7dcb

File tree

1 file changed

+149
-4
lines changed

1 file changed

+149
-4
lines changed

tests/worker/test_worker.py

Lines changed: 149 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
SlotReleaseContext,
4343
SlotReserveContext,
4444
Worker,
45-
WorkerConfig,
4645
WorkerDeploymentConfig,
4746
WorkerDeploymentVersion,
4847
WorkerTuner,
@@ -317,6 +316,7 @@ async def test_can_run_composite_tuner_worker(client: Client, env: WorkflowEnvir
317316
),
318317
resource_based_options,
319318
),
319+
nexus_supplier=FixedSizeSlotSupplier(10),
320320
)
321321
async with new_worker(
322322
client,
@@ -439,7 +439,10 @@ def reserve_asserts(self, ctx: SlotReserveContext) -> None:
439439
ss = MySlotSupplier()
440440

441441
tuner = WorkerTuner.create_composite(
442-
workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss
442+
workflow_supplier=ss,
443+
activity_supplier=ss,
444+
local_activity_supplier=ss,
445+
nexus_supplier=ss,
443446
)
444447
async with new_worker(
445448
client,
@@ -501,7 +504,10 @@ def release_slot(self, ctx: SlotReleaseContext) -> None:
501504
ss = ThrowingSlotSupplier()
502505

503506
tuner = WorkerTuner.create_composite(
504-
workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss
507+
workflow_supplier=ss,
508+
activity_supplier=ss,
509+
local_activity_supplier=ss,
510+
nexus_supplier=ss,
505511
)
506512
async with new_worker(
507513
client,
@@ -537,7 +543,10 @@ def release_slot(self, ctx: SlotReleaseContext) -> None:
537543
ss = BlockingSlotSupplier()
538544

539545
tuner = WorkerTuner.create_composite(
540-
workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss
546+
workflow_supplier=ss,
547+
activity_supplier=ss,
548+
local_activity_supplier=ss,
549+
nexus_supplier=ss,
541550
)
542551
async with new_worker(
543552
client,
@@ -1189,3 +1198,139 @@ def shutdown(self) -> None:
11891198
if self.next_exception_task:
11901199
self.next_exception_task.cancel()
11911200
setattr(self.worker._bridge_worker, self.attr, self.orig_poll_call)
1201+
1202+
1203+
async def test_nexus_slot_supplier_integration(
1204+
client: Client, env: WorkflowEnvironment
1205+
):
1206+
"""Test that nexus operations can be properly tuned with slot suppliers."""
1207+
1208+
class NexusAwareSlotSupplier(CustomSlotSupplier):
1209+
seen_nexus_slots = 0
1210+
seen_workflow_slots = 0
1211+
seen_activity_slots = 0
1212+
seen_local_activity_slots = 0
1213+
1214+
async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
1215+
if ctx.slot_type == "nexus":
1216+
self.seen_nexus_slots += 1
1217+
elif ctx.slot_type == "workflow":
1218+
self.seen_workflow_slots += 1
1219+
elif ctx.slot_type == "activity":
1220+
self.seen_activity_slots += 1
1221+
elif ctx.slot_type == "local-activity":
1222+
self.seen_local_activity_slots += 1
1223+
return SlotPermit()
1224+
1225+
def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
1226+
return None
1227+
1228+
def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
1229+
pass
1230+
1231+
def release_slot(self, ctx: SlotReleaseContext) -> None:
1232+
pass
1233+
1234+
ss = NexusAwareSlotSupplier()
1235+
1236+
# Create a tuner that includes nexus slot supplier
1237+
tuner = WorkerTuner.create_composite(
1238+
workflow_supplier=ss,
1239+
activity_supplier=ss,
1240+
local_activity_supplier=ss,
1241+
nexus_supplier=ss, # This would fail without your changes
1242+
)
1243+
1244+
# Test that the tuner can be converted to bridge format without errors
1245+
bridge_tuner = tuner._to_bridge_tuner()
1246+
1247+
# Verify that all slot types are properly handled
1248+
assert hasattr(bridge_tuner, "nexus_slot_supplier")
1249+
assert hasattr(bridge_tuner, "workflow_slot_supplier")
1250+
assert hasattr(bridge_tuner, "activity_slot_supplier")
1251+
assert hasattr(bridge_tuner, "local_activity_slot_supplier")
1252+
1253+
# Test that the tuner can be used to create a worker
1254+
async with new_worker(
1255+
client,
1256+
WaitOnSignalWorkflow,
1257+
activities=[say_hello],
1258+
tuner=tuner,
1259+
) as w:
1260+
wf1 = await client.start_workflow(
1261+
WaitOnSignalWorkflow.run,
1262+
id=f"nexus-slot-supplier-{uuid.uuid4()}",
1263+
task_queue=w.task_queue,
1264+
)
1265+
await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish")
1266+
await wf1.result()
1267+
1268+
# Verify that we saw the expected slot types
1269+
assert ss.seen_workflow_slots > 0, "Should have seen workflow slots"
1270+
assert ss.seen_activity_slots > 0, "Should have seen activity slots"
1271+
# Note: We don't assert nexus_slots > 0 because this test doesn't actually
1272+
# trigger nexus operations, but the important thing is that the infrastructure
1273+
# is in place to handle them when they do occur.
1274+
1275+
1276+
async def test_nexus_slot_supplier_defaults(client: Client, env: WorkflowEnvironment):
1277+
"""Test that nexus slot suppliers get proper defaults in resource-based tuners."""
1278+
1279+
tuner = WorkerTuner.create_resource_based(
1280+
target_memory_usage=0.5,
1281+
target_cpu_usage=0.5,
1282+
# Don't specify nexus_config to test defaults
1283+
)
1284+
1285+
# Test that the tuner can be converted to bridge format without errors
1286+
bridge_tuner = tuner._to_bridge_tuner()
1287+
1288+
# Verify that nexus slot supplier is present and properly configured
1289+
assert hasattr(bridge_tuner, "nexus_slot_supplier")
1290+
1291+
# Test that the tuner can be used to create a worker
1292+
async with new_worker(
1293+
client,
1294+
WaitOnSignalWorkflow,
1295+
activities=[say_hello],
1296+
tuner=tuner,
1297+
) as w:
1298+
wf1 = await client.start_workflow(
1299+
WaitOnSignalWorkflow.run,
1300+
id=f"nexus-defaults-{uuid.uuid4()}",
1301+
task_queue=w.task_queue,
1302+
)
1303+
await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish")
1304+
await wf1.result()
1305+
1306+
1307+
async def test_nexus_slot_supplier_fixed_size(client: Client, env: WorkflowEnvironment):
1308+
"""Test that nexus slot suppliers work with fixed-size tuners."""
1309+
1310+
tuner = WorkerTuner.create_fixed(
1311+
workflow_slots=10,
1312+
activity_slots=20,
1313+
local_activity_slots=5,
1314+
nexus_slots=15, # This would fail without your changes
1315+
)
1316+
1317+
# Test that the tuner can be converted to bridge format without errors
1318+
bridge_tuner = tuner._to_bridge_tuner()
1319+
1320+
# Verify that nexus slot supplier is present
1321+
assert hasattr(bridge_tuner, "nexus_slot_supplier")
1322+
1323+
# Test that the tuner can be used to create a worker
1324+
async with new_worker(
1325+
client,
1326+
WaitOnSignalWorkflow,
1327+
activities=[say_hello],
1328+
tuner=tuner,
1329+
) as w:
1330+
wf1 = await client.start_workflow(
1331+
WaitOnSignalWorkflow.run,
1332+
id=f"nexus-fixed-{uuid.uuid4()}",
1333+
task_queue=w.task_queue,
1334+
)
1335+
await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish")
1336+
await wf1.result()

0 commit comments

Comments
 (0)