22import collections
33import dataclasses
44import datetime
5+ import functools
56import itertools
67import logging
78from typing import Final , cast
@@ -327,30 +328,30 @@ async def _try_attach_pending_ec2s(
327328 )
328329
329330
async def _sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
    """Return the allowed EC2 instance types, ordered as in the settings.

    The names configured in
    ``AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES`` are passed (as a
    set) to the EC2 client's ``get_ec2_instance_capabilities``; the returned
    list is then sorted in place so that it follows the order in which the
    names appear in that setting.

    Raises:
        AssertionError: if ``AUTOSCALING_EC2_INSTANCES`` is not set or the list
            of allowed type names is empty (only when assertions are enabled).
        ValueError: if the EC2 client returns an instance type whose name is
            not one of the configured allowed names.
    """
    app_settings: ApplicationSettings = app.state.settings
    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
    ec2_client = get_ec2_client(app)

    # Materialized once as a list: its positions define the sort order below.
    allowed_instance_type_names = list(
        app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
    )

    assert (  # nosec
        allowed_instance_type_names
    ), "EC2_INSTANCES_ALLOWED_TYPES cannot be empty!"

    allowed_instance_types: list[
        EC2InstanceType
    ] = await ec2_client.get_ec2_instance_capabilities(
        cast(set[InstanceTypeType], set(allowed_instance_type_names))
    )

    def _as_selection(instance_type: EC2InstanceType) -> int:
        # NOTE: will raise ValueError if instance_type.name is not in
        # allowed_instance_type_names
        return allowed_instance_type_names.index(f"{instance_type.name}")

    allowed_instance_types.sort(key=_as_selection)
    return allowed_instance_types
355356
356357
@@ -497,51 +498,44 @@ async def _assign_tasks_to_current_cluster(
497498 cluster : Cluster ,
498499 auto_scaling_mode : BaseAutoscaling ,
499500) -> tuple [list , Cluster ]:
501+ """
502+ Evaluates whether a task can be executed on any instance within the cluster. If the task's resource requirements are met, the task is *denoted* as assigned to the cluster.
503+ Note: This is an estimation only since actual scheduling is handled by Dask/Docker (depending on the mode).
504+
505+ Returns:
506+ A tuple containing:
507+ - A list of unassigned tasks (tasks whose resource requirements cannot be fulfilled by the available machines in the cluster).
508+ - The same cluster instance passed as input.
509+ """
500510 unassigned_tasks = []
511+ assignment_predicates = [
512+ functools .partial (_try_assign_task_to_ec2_instance , instances = instances )
513+ for instances in (
514+ cluster .active_nodes ,
515+ cluster .drained_nodes + cluster .buffer_drained_nodes ,
516+ cluster .pending_nodes ,
517+ cluster .pending_ec2s ,
518+ cluster .buffer_ec2s ,
519+ )
520+ ]
521+
501522 for task in tasks :
502523 task_required_resources = auto_scaling_mode .get_task_required_resources (task )
503524 task_required_ec2_instance = await auto_scaling_mode .get_task_defined_instance (
504525 app , task
505526 )
506527
507- assignment_functions = [
508- lambda task , required_ec2 , required_resources : _try_assign_task_to_ec2_instance (
509- task ,
510- instances = cluster .active_nodes ,
511- task_required_ec2_instance = required_ec2 ,
512- task_required_resources = required_resources ,
513- ),
514- lambda task , required_ec2 , required_resources : _try_assign_task_to_ec2_instance (
515- task ,
516- instances = cluster .drained_nodes + cluster .buffer_drained_nodes ,
517- task_required_ec2_instance = required_ec2 ,
518- task_required_resources = required_resources ,
519- ),
520- lambda task , required_ec2 , required_resources : _try_assign_task_to_ec2_instance (
521- task ,
522- instances = cluster .pending_nodes ,
523- task_required_ec2_instance = required_ec2 ,
524- task_required_resources = required_resources ,
525- ),
526- lambda task , required_ec2 , required_resources : _try_assign_task_to_ec2_instance (
527- task ,
528- instances = cluster .pending_ec2s ,
529- task_required_ec2_instance = required_ec2 ,
530- task_required_resources = required_resources ,
531- ),
532- lambda task , required_ec2 , required_resources : _try_assign_task_to_ec2_instance (
533- task ,
534- instances = cluster .buffer_ec2s ,
535- task_required_ec2_instance = required_ec2 ,
536- task_required_resources = required_resources ,
537- ),
538- ]
539-
540528 if any (
541- assignment (task , task_required_ec2_instance , task_required_resources )
542- for assignment in assignment_functions
529+ is_assigned (
530+ task ,
531+ task_required_ec2_instance = task_required_ec2_instance ,
532+ task_required_resources = task_required_resources ,
533+ )
534+ for is_assigned in assignment_predicates
543535 ):
544- _logger .debug ("assigned task to cluster" )
536+ _logger .debug (
537+ "task %s is assigned to one instance available in cluster" , task
538+ )
545539 else :
546540 unassigned_tasks .append (task )
547541
@@ -1131,7 +1125,7 @@ async def _autoscale_cluster(
11311125 # 1. check if we have pending tasks and resolve them by activating some drained nodes
11321126 unrunnable_tasks = await auto_scaling_mode .list_unrunnable_tasks (app )
11331127 _logger .info ("found %s unrunnable tasks" , len (unrunnable_tasks ))
1134-
1128+ # NOTE: this function predicts how dask will assign a task to a machine
11351129 queued_or_missing_instance_tasks , cluster = await _assign_tasks_to_current_cluster (
11361130 app , unrunnable_tasks , cluster , auto_scaling_mode
11371131 )
@@ -1217,11 +1211,13 @@ async def auto_scale_cluster(
12171211 If there are such tasks, this method will allocate new machines in AWS to cope with
12181212 the additional load.
12191213 """
1220-
1221- allowed_instance_types = await sorted_allowed_instance_types (app )
1214+ # current state
1215+ allowed_instance_types = await _sorted_allowed_instance_types (app )
12221216 cluster = await _analyze_current_cluster (
12231217 app , auto_scaling_mode , allowed_instance_types
12241218 )
1219+
1220+ # cleanup
12251221 cluster = await _cleanup_disconnected_nodes (app , cluster )
12261222 cluster = await _terminate_broken_ec2s (app , cluster )
12271223 cluster = await _make_pending_buffer_ec2s_join_cluster (app , cluster )
@@ -1230,8 +1226,11 @@ async def auto_scale_cluster(
12301226 )
12311227 cluster = await _drain_retired_nodes (app , cluster )
12321228
1229+ # desired state
12331230 cluster = await _autoscale_cluster (
12341231 app , cluster , auto_scaling_mode , allowed_instance_types
12351232 )
1233+
1234+ # notify
12361235 await _notify_machine_creation_progress (app , cluster , auto_scaling_mode )
12371236 await _notify_autoscaling_status (app , cluster , auto_scaling_mode )
0 commit comments