@@ -1198,139 +1198,3 @@ def shutdown(self) -> None:
11981198 if self .next_exception_task :
11991199 self .next_exception_task .cancel ()
12001200 setattr (self .worker ._bridge_worker , self .attr , self .orig_poll_call )
1201-
1202-
1203- async def test_nexus_slot_supplier_integration (
1204- client : Client , env : WorkflowEnvironment
1205- ):
1206- """Test that nexus operations can be properly tuned with slot suppliers."""
1207-
1208- class NexusAwareSlotSupplier (CustomSlotSupplier ):
1209- seen_nexus_slots = 0
1210- seen_workflow_slots = 0
1211- seen_activity_slots = 0
1212- seen_local_activity_slots = 0
1213-
1214- async def reserve_slot (self , ctx : SlotReserveContext ) -> SlotPermit :
1215- if ctx .slot_type == "nexus" :
1216- self .seen_nexus_slots += 1
1217- elif ctx .slot_type == "workflow" :
1218- self .seen_workflow_slots += 1
1219- elif ctx .slot_type == "activity" :
1220- self .seen_activity_slots += 1
1221- elif ctx .slot_type == "local-activity" :
1222- self .seen_local_activity_slots += 1
1223- return SlotPermit ()
1224-
1225- def try_reserve_slot (self , ctx : SlotReserveContext ) -> Optional [SlotPermit ]:
1226- return None
1227-
1228- def mark_slot_used (self , ctx : SlotMarkUsedContext ) -> None :
1229- pass
1230-
1231- def release_slot (self , ctx : SlotReleaseContext ) -> None :
1232- pass
1233-
1234- ss = NexusAwareSlotSupplier ()
1235-
1236- # Create a tuner that includes nexus slot supplier
1237- tuner = WorkerTuner .create_composite (
1238- workflow_supplier = ss ,
1239- activity_supplier = ss ,
1240- local_activity_supplier = ss ,
1241- nexus_supplier = ss , # This would fail without your changes
1242- )
1243-
1244- # Test that the tuner can be converted to bridge format without errors
1245- bridge_tuner = tuner ._to_bridge_tuner ()
1246-
1247- # Verify that all slot types are properly handled
1248- assert hasattr (bridge_tuner , "nexus_slot_supplier" )
1249- assert hasattr (bridge_tuner , "workflow_slot_supplier" )
1250- assert hasattr (bridge_tuner , "activity_slot_supplier" )
1251- assert hasattr (bridge_tuner , "local_activity_slot_supplier" )
1252-
1253- # Test that the tuner can be used to create a worker
1254- async with new_worker (
1255- client ,
1256- WaitOnSignalWorkflow ,
1257- activities = [say_hello ],
1258- tuner = tuner ,
1259- ) as w :
1260- wf1 = await client .start_workflow (
1261- WaitOnSignalWorkflow .run ,
1262- id = f"nexus-slot-supplier-{ uuid .uuid4 ()} " ,
1263- task_queue = w .task_queue ,
1264- )
1265- await wf1 .signal (WaitOnSignalWorkflow .my_signal , "finish" )
1266- await wf1 .result ()
1267-
1268- # Verify that we saw the expected slot types
1269- assert ss .seen_workflow_slots > 0 , "Should have seen workflow slots"
1270- assert ss .seen_activity_slots > 0 , "Should have seen activity slots"
1271- # Note: We don't assert nexus_slots > 0 because this test doesn't actually
1272- # trigger nexus operations, but the important thing is that the infrastructure
1273- # is in place to handle them when they do occur.
1274-
1275-
1276- async def test_nexus_slot_supplier_defaults (client : Client , env : WorkflowEnvironment ):
1277- """Test that nexus slot suppliers get proper defaults in resource-based tuners."""
1278-
1279- tuner = WorkerTuner .create_resource_based (
1280- target_memory_usage = 0.5 ,
1281- target_cpu_usage = 0.5 ,
1282- # Don't specify nexus_config to test defaults
1283- )
1284-
1285- # Test that the tuner can be converted to bridge format without errors
1286- bridge_tuner = tuner ._to_bridge_tuner ()
1287-
1288- # Verify that nexus slot supplier is present and properly configured
1289- assert hasattr (bridge_tuner , "nexus_slot_supplier" )
1290-
1291- # Test that the tuner can be used to create a worker
1292- async with new_worker (
1293- client ,
1294- WaitOnSignalWorkflow ,
1295- activities = [say_hello ],
1296- tuner = tuner ,
1297- ) as w :
1298- wf1 = await client .start_workflow (
1299- WaitOnSignalWorkflow .run ,
1300- id = f"nexus-defaults-{ uuid .uuid4 ()} " ,
1301- task_queue = w .task_queue ,
1302- )
1303- await wf1 .signal (WaitOnSignalWorkflow .my_signal , "finish" )
1304- await wf1 .result ()
1305-
1306-
1307- async def test_nexus_slot_supplier_fixed_size (client : Client , env : WorkflowEnvironment ):
1308- """Test that nexus slot suppliers work with fixed-size tuners."""
1309-
1310- tuner = WorkerTuner .create_fixed (
1311- workflow_slots = 10 ,
1312- activity_slots = 20 ,
1313- local_activity_slots = 5 ,
1314- nexus_slots = 15 , # This would fail without your changes
1315- )
1316-
1317- # Test that the tuner can be converted to bridge format without errors
1318- bridge_tuner = tuner ._to_bridge_tuner ()
1319-
1320- # Verify that nexus slot supplier is present
1321- assert hasattr (bridge_tuner , "nexus_slot_supplier" )
1322-
1323- # Test that the tuner can be used to create a worker
1324- async with new_worker (
1325- client ,
1326- WaitOnSignalWorkflow ,
1327- activities = [say_hello ],
1328- tuner = tuner ,
1329- ) as w :
1330- wf1 = await client .start_workflow (
1331- WaitOnSignalWorkflow .run ,
1332- id = f"nexus-fixed-{ uuid .uuid4 ()} " ,
1333- task_queue = w .task_queue ,
1334- )
1335- await wf1 .signal (WaitOnSignalWorkflow .my_signal , "finish" )
1336- await wf1 .result ()
0 commit comments