|
42 | 42 | SlotReleaseContext, |
43 | 43 | SlotReserveContext, |
44 | 44 | Worker, |
45 | | - WorkerConfig, |
46 | 45 | WorkerDeploymentConfig, |
47 | 46 | WorkerDeploymentVersion, |
48 | 47 | WorkerTuner, |
@@ -317,6 +316,7 @@ async def test_can_run_composite_tuner_worker(client: Client, env: WorkflowEnvir |
317 | 316 | ), |
318 | 317 | resource_based_options, |
319 | 318 | ), |
| 319 | + nexus_supplier=FixedSizeSlotSupplier(10), |
320 | 320 | ) |
321 | 321 | async with new_worker( |
322 | 322 | client, |
@@ -439,7 +439,10 @@ def reserve_asserts(self, ctx: SlotReserveContext) -> None: |
439 | 439 | ss = MySlotSupplier() |
440 | 440 |
|
441 | 441 | tuner = WorkerTuner.create_composite( |
442 | | - workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss |
| 442 | + workflow_supplier=ss, |
| 443 | + activity_supplier=ss, |
| 444 | + local_activity_supplier=ss, |
| 445 | + nexus_supplier=ss, |
443 | 446 | ) |
444 | 447 | async with new_worker( |
445 | 448 | client, |
@@ -501,7 +504,10 @@ def release_slot(self, ctx: SlotReleaseContext) -> None: |
501 | 504 | ss = ThrowingSlotSupplier() |
502 | 505 |
|
503 | 506 | tuner = WorkerTuner.create_composite( |
504 | | - workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss |
| 507 | + workflow_supplier=ss, |
| 508 | + activity_supplier=ss, |
| 509 | + local_activity_supplier=ss, |
| 510 | + nexus_supplier=ss, |
505 | 511 | ) |
506 | 512 | async with new_worker( |
507 | 513 | client, |
@@ -537,7 +543,10 @@ def release_slot(self, ctx: SlotReleaseContext) -> None: |
537 | 543 | ss = BlockingSlotSupplier() |
538 | 544 |
|
539 | 545 | tuner = WorkerTuner.create_composite( |
540 | | - workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss |
| 546 | + workflow_supplier=ss, |
| 547 | + activity_supplier=ss, |
| 548 | + local_activity_supplier=ss, |
| 549 | + nexus_supplier=ss, |
541 | 550 | ) |
542 | 551 | async with new_worker( |
543 | 552 | client, |
@@ -1189,3 +1198,139 @@ def shutdown(self) -> None: |
1189 | 1198 | if self.next_exception_task: |
1190 | 1199 | self.next_exception_task.cancel() |
1191 | 1200 | setattr(self.worker._bridge_worker, self.attr, self.orig_poll_call) |
| 1201 | + |
| 1202 | + |
| 1203 | +async def test_nexus_slot_supplier_integration( |
| 1204 | + client: Client, env: WorkflowEnvironment |
| 1205 | +): |
| 1206 | + """Test that nexus operations can be properly tuned with slot suppliers.""" |
| 1207 | + |
| 1208 | + class NexusAwareSlotSupplier(CustomSlotSupplier): |
| 1209 | + seen_nexus_slots = 0 |
| 1210 | + seen_workflow_slots = 0 |
| 1211 | + seen_activity_slots = 0 |
| 1212 | + seen_local_activity_slots = 0 |
| 1213 | + |
| 1214 | + async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit: |
| 1215 | + if ctx.slot_type == "nexus": |
| 1216 | + self.seen_nexus_slots += 1 |
| 1217 | + elif ctx.slot_type == "workflow": |
| 1218 | + self.seen_workflow_slots += 1 |
| 1219 | + elif ctx.slot_type == "activity": |
| 1220 | + self.seen_activity_slots += 1 |
| 1221 | + elif ctx.slot_type == "local-activity": |
| 1222 | + self.seen_local_activity_slots += 1 |
| 1223 | + return SlotPermit() |
| 1224 | + |
| 1225 | + def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]: |
| 1226 | + return None |
| 1227 | + |
| 1228 | + def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None: |
| 1229 | + pass |
| 1230 | + |
| 1231 | + def release_slot(self, ctx: SlotReleaseContext) -> None: |
| 1232 | + pass |
| 1233 | + |
| 1234 | + ss = NexusAwareSlotSupplier() |
| 1235 | + |
| 1236 | + # Create a tuner that includes nexus slot supplier |
| 1237 | + tuner = WorkerTuner.create_composite( |
| 1238 | + workflow_supplier=ss, |
| 1239 | + activity_supplier=ss, |
| 1240 | + local_activity_supplier=ss, |
| 1241 | + nexus_supplier=ss, # This would fail without your changes |
| 1242 | + ) |
| 1243 | + |
| 1244 | + # Test that the tuner can be converted to bridge format without errors |
| 1245 | + bridge_tuner = tuner._to_bridge_tuner() |
| 1246 | + |
| 1247 | + # Verify that all slot types are properly handled |
| 1248 | + assert hasattr(bridge_tuner, "nexus_slot_supplier") |
| 1249 | + assert hasattr(bridge_tuner, "workflow_slot_supplier") |
| 1250 | + assert hasattr(bridge_tuner, "activity_slot_supplier") |
| 1251 | + assert hasattr(bridge_tuner, "local_activity_slot_supplier") |
| 1252 | + |
| 1253 | + # Test that the tuner can be used to create a worker |
| 1254 | + async with new_worker( |
| 1255 | + client, |
| 1256 | + WaitOnSignalWorkflow, |
| 1257 | + activities=[say_hello], |
| 1258 | + tuner=tuner, |
| 1259 | + ) as w: |
| 1260 | + wf1 = await client.start_workflow( |
| 1261 | + WaitOnSignalWorkflow.run, |
| 1262 | + id=f"nexus-slot-supplier-{uuid.uuid4()}", |
| 1263 | + task_queue=w.task_queue, |
| 1264 | + ) |
| 1265 | + await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish") |
| 1266 | + await wf1.result() |
| 1267 | + |
| 1268 | + # Verify that we saw the expected slot types |
| 1269 | + assert ss.seen_workflow_slots > 0, "Should have seen workflow slots" |
| 1270 | + assert ss.seen_activity_slots > 0, "Should have seen activity slots" |
| 1271 | + # Note: We don't assert nexus_slots > 0 because this test doesn't actually |
| 1272 | + # trigger nexus operations, but the important thing is that the infrastructure |
| 1273 | + # is in place to handle them when they do occur. |
| 1274 | + |
| 1275 | + |
| 1276 | +async def test_nexus_slot_supplier_defaults(client: Client, env: WorkflowEnvironment): |
| 1277 | + """Test that nexus slot suppliers get proper defaults in resource-based tuners.""" |
| 1278 | + |
| 1279 | + tuner = WorkerTuner.create_resource_based( |
| 1280 | + target_memory_usage=0.5, |
| 1281 | + target_cpu_usage=0.5, |
| 1282 | + # Don't specify nexus_config to test defaults |
| 1283 | + ) |
| 1284 | + |
| 1285 | + # Test that the tuner can be converted to bridge format without errors |
| 1286 | + bridge_tuner = tuner._to_bridge_tuner() |
| 1287 | + |
| 1288 | + # Verify that nexus slot supplier is present and properly configured |
| 1289 | + assert hasattr(bridge_tuner, "nexus_slot_supplier") |
| 1290 | + |
| 1291 | + # Test that the tuner can be used to create a worker |
| 1292 | + async with new_worker( |
| 1293 | + client, |
| 1294 | + WaitOnSignalWorkflow, |
| 1295 | + activities=[say_hello], |
| 1296 | + tuner=tuner, |
| 1297 | + ) as w: |
| 1298 | + wf1 = await client.start_workflow( |
| 1299 | + WaitOnSignalWorkflow.run, |
| 1300 | + id=f"nexus-defaults-{uuid.uuid4()}", |
| 1301 | + task_queue=w.task_queue, |
| 1302 | + ) |
| 1303 | + await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish") |
| 1304 | + await wf1.result() |
| 1305 | + |
| 1306 | + |
| 1307 | +async def test_nexus_slot_supplier_fixed_size(client: Client, env: WorkflowEnvironment): |
| 1308 | + """Test that nexus slot suppliers work with fixed-size tuners.""" |
| 1309 | + |
| 1310 | + tuner = WorkerTuner.create_fixed( |
| 1311 | + workflow_slots=10, |
| 1312 | + activity_slots=20, |
| 1313 | + local_activity_slots=5, |
| 1314 | + nexus_slots=15, # This would fail without your changes |
| 1315 | + ) |
| 1316 | + |
| 1317 | + # Test that the tuner can be converted to bridge format without errors |
| 1318 | + bridge_tuner = tuner._to_bridge_tuner() |
| 1319 | + |
| 1320 | + # Verify that nexus slot supplier is present |
| 1321 | + assert hasattr(bridge_tuner, "nexus_slot_supplier") |
| 1322 | + |
| 1323 | + # Test that the tuner can be used to create a worker |
| 1324 | + async with new_worker( |
| 1325 | + client, |
| 1326 | + WaitOnSignalWorkflow, |
| 1327 | + activities=[say_hello], |
| 1328 | + tuner=tuner, |
| 1329 | + ) as w: |
| 1330 | + wf1 = await client.start_workflow( |
| 1331 | + WaitOnSignalWorkflow.run, |
| 1332 | + id=f"nexus-fixed-{uuid.uuid4()}", |
| 1333 | + task_queue=w.task_queue, |
| 1334 | + ) |
| 1335 | + await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish") |
| 1336 | + await wf1.result() |
0 commit comments