|
2 | 2 | import collections |
3 | 3 | import dataclasses |
4 | 4 | import datetime |
| 5 | +import functools |
5 | 6 | import itertools |
6 | 7 | import logging |
7 | 8 | import typing |
@@ -505,51 +506,44 @@ async def _assign_tasks_to_current_cluster( |
505 | 506 | cluster: Cluster, |
506 | 507 | auto_scaling_mode: BaseAutoscaling, |
507 | 508 | ) -> tuple[list, Cluster]: |
| 509 | + """Estimates how tasks will be assigned to cluster's instances |
| 510 | + based on the resources required by each task |
| 511 | +
|
| 512 | + Returns: |
| 513 | + A tuple with |
| 514 | + - list of unassigned tasks (i.e. those not fitting available machines in cluster) |
| 515 | + - same cluster instance as in the input |
| 516 | +
|
| 517 | + """ |
508 | 518 | unassigned_tasks = [] |
| 519 | + assignment_functions = [ |
| 520 | + functools.partial(_try_assign_task_to_ec2_instance, instances=instances) |
| 521 | + for instances in ( |
| 522 | + cluster.active_nodes, |
| 523 | + cluster.drained_nodes + cluster.buffer_drained_nodes, |
| 524 | + cluster.pending_nodes, |
| 525 | + cluster.pending_ec2s, |
| 526 | + cluster.buffer_ec2s, |
| 527 | + ) |
| 528 | + ] |
| 529 | + |
509 | 530 | for task in tasks: |
510 | 531 | task_required_resources = auto_scaling_mode.get_task_required_resources(task) |
511 | 532 | task_required_ec2_instance = await auto_scaling_mode.get_task_defined_instance( |
512 | 533 | app, task |
513 | 534 | ) |
514 | 535 |
|
515 | | - assignment_functions = [ |
516 | | - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( |
517 | | - task, |
518 | | - instances=cluster.active_nodes, |
519 | | - task_required_ec2_instance=required_ec2, |
520 | | - task_required_resources=required_resources, |
521 | | - ), |
522 | | - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( |
523 | | - task, |
524 | | - instances=cluster.drained_nodes + cluster.buffer_drained_nodes, |
525 | | - task_required_ec2_instance=required_ec2, |
526 | | - task_required_resources=required_resources, |
527 | | - ), |
528 | | - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( |
529 | | - task, |
530 | | - instances=cluster.pending_nodes, |
531 | | - task_required_ec2_instance=required_ec2, |
532 | | - task_required_resources=required_resources, |
533 | | - ), |
534 | | - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( |
535 | | - task, |
536 | | - instances=cluster.pending_ec2s, |
537 | | - task_required_ec2_instance=required_ec2, |
538 | | - task_required_resources=required_resources, |
539 | | - ), |
540 | | - lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance( |
541 | | - task, |
542 | | - instances=cluster.buffer_ec2s, |
543 | | - task_required_ec2_instance=required_ec2, |
544 | | - task_required_resources=required_resources, |
545 | | - ), |
546 | | - ] |
547 | | - |
548 | 536 | if any( |
549 | | - assignment(task, task_required_ec2_instance, task_required_resources) |
550 | | - for assignment in assignment_functions |
| 537 | + is_assigned( |
| 538 | + task, |
| 539 | + task_required_ec2_instance=task_required_ec2_instance, |
| 540 | + task_required_resources=task_required_resources, |
| 541 | + ) |
| 542 | + for is_assigned in assignment_functions |
551 | 543 | ): |
552 | | - _logger.debug("assigned task to cluster") |
| 544 | + _logger.debug( |
| 545 | + "task %s is assigned to one instance available in cluster", task |
| 546 | + ) |
553 | 547 | else: |
554 | 548 | unassigned_tasks.append(task) |
555 | 549 |
|
|
0 commit comments