3030 Ec2TooManyInstancesError ,
3131)
3232from ..core .settings import ApplicationSettings , get_application_settings
33- from ..models import AssociatedInstance , Cluster
33+ from ..models import (
34+ AssignedTasksToInstance ,
35+ AssignedTasksToInstanceType ,
36+ AssociatedInstance ,
37+ Cluster ,
38+ )
3439from ..utils import utils_docker , utils_ec2
3540from ..utils .auto_scaling_core import (
3641 associate_ec2_instances_with_nodes ,
@@ -268,74 +273,108 @@ async def _activate_drained_nodes(
268273 )
269274
270275
async def _try_assign_tasks_to_instances(
    app: FastAPI,
    task,
    auto_scaling_mode: BaseAutoscaling,
    task_defined_ec2_type: InstanceTypeType | None,
    active_instances_to_tasks: list[AssignedTasksToInstance],
    pending_instances_to_tasks: list[AssignedTasksToInstance],
    drained_instances_to_tasks: list[AssignedTasksToInstance],
    needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType],
) -> bool:
    """Try to place a single task on an existing or already-planned instance.

    The candidate collections are first narrowed down with
    ``filter_by_task_defined_instance`` using ``task_defined_ec2_type``.
    The task is then offered, in this order, to:

    1. the active instances (``notify_progress=False``),
    2. the pending instances (``notify_progress=True``),
    3. the drained instances (``notify_progress=False``),
    4. the instance types already planned for creation.

    The first successful attempt stops the search; later candidates are not
    tried.

    Returns:
        ``True`` if any of the attempts reported success, ``False`` otherwise.
    """
    (
        usable_active,
        usable_pending,
        usable_drained,
        usable_new_instance_types,
    ) = filter_by_task_defined_instance(
        task_defined_ec2_type,
        active_instances_to_tasks,
        pending_instances_to_tasks,
        drained_instances_to_tasks,
        needed_new_instance_types_for_tasks,
    )

    # try to assign the task to one of the active, pending or drained
    # instances first; the order matters and evaluation stops at first success
    attempts = (
        (usable_active, False),
        (usable_pending, True),
        (usable_drained, False),
    )
    for candidate_instances, notify in attempts:
        if await auto_scaling_mode.try_assigning_task_to_instances(
            app,
            task,
            candidate_instances,
            notify_progress=notify,
        ):
            return True

    # last resort: one of the newly created (not yet started) instance types
    return bool(
        auto_scaling_mode.try_assigning_task_to_instance_types(
            task, usable_new_instance_types
        )
    )
324+
325+
271326async def _find_needed_instances (
272327 app : FastAPI ,
273328 pending_tasks : list ,
274329 available_ec2_types : list [EC2InstanceType ],
275330 cluster : Cluster ,
276331 auto_scaling_mode : BaseAutoscaling ,
277332) -> dict [EC2InstanceType , int ]:
278- type_to_instance_map = {t .name : t for t in available_ec2_types }
279-
280333 # 1. check first the pending task needs
281- active_instances_to_tasks : list [tuple [EC2InstanceData , list ]] = [
282- (i .ec2_instance , []) for i in cluster .active_nodes
334+ active_instances_to_tasks : list [AssignedTasksToInstance ] = [
335+ AssignedTasksToInstance (
336+ instance = i .ec2_instance ,
337+ assigned_tasks = [],
338+ available_resources = i .ec2_instance .resources
339+ - await auto_scaling_mode .compute_node_used_resources (app , i ),
340+ )
341+ for i in cluster .active_nodes
283342 ]
284- pending_instances_to_tasks : list [tuple [EC2InstanceData , list ]] = [
285- (i , []) for i in cluster .pending_ec2s
343+ pending_instances_to_tasks : list [AssignedTasksToInstance ] = [
344+ AssignedTasksToInstance (
345+ instance = i , assigned_tasks = [], available_resources = i .resources
346+ )
347+ for i in cluster .pending_ec2s
286348 ]
287- drained_instances_to_tasks : list [tuple [EC2InstanceData , list ]] = [
288- (i .ec2_instance , []) for i in cluster .drained_nodes
349+ drained_instances_to_tasks : list [AssignedTasksToInstance ] = [
350+ AssignedTasksToInstance (
351+ instance = i .ec2_instance ,
352+ assigned_tasks = [],
353+ available_resources = i .ec2_instance .resources ,
354+ )
355+ for i in cluster .drained_nodes
289356 ]
290- needed_new_instance_types_for_tasks : list [tuple [ EC2InstanceType , list ] ] = []
357+ needed_new_instance_types_for_tasks : list [AssignedTasksToInstanceType ] = []
291358 for task in pending_tasks :
292359 task_defined_ec2_type = await auto_scaling_mode .get_task_defined_instance (
293360 app , task
294361 )
295- (
296- filtered_active_instance_to_task ,
297- filtered_pending_instance_to_task ,
298- filtered_drained_instances_to_task ,
299- filtered_needed_new_instance_types_to_task ,
300- ) = filter_by_task_defined_instance (
362+ _logger .info (
363+ "task %s %s" ,
364+ task ,
365+ f"defines ec2 type as { task_defined_ec2_type } "
366+ if task_defined_ec2_type
367+ else "does NOT define ec2 type" ,
368+ )
369+ if await _try_assign_tasks_to_instances (
370+ app ,
371+ task ,
372+ auto_scaling_mode ,
301373 task_defined_ec2_type ,
302374 active_instances_to_tasks ,
303375 pending_instances_to_tasks ,
304376 drained_instances_to_tasks ,
305377 needed_new_instance_types_for_tasks ,
306- )
307-
308- # try to assign the task to one of the active, pending or net created instances
309- _logger .debug (
310- "Try to assign %s to any active/pending/created instance in the %s" ,
311- f"{ task } " ,
312- f"{ cluster = } " ,
313- )
314- if (
315- await auto_scaling_mode .try_assigning_task_to_instances (
316- app ,
317- task ,
318- filtered_active_instance_to_task ,
319- type_to_instance_map ,
320- notify_progress = False ,
321- )
322- or await auto_scaling_mode .try_assigning_task_to_instances (
323- app ,
324- task ,
325- filtered_pending_instance_to_task ,
326- type_to_instance_map ,
327- notify_progress = True ,
328- )
329- or await auto_scaling_mode .try_assigning_task_to_instances (
330- app ,
331- task ,
332- filtered_drained_instances_to_task ,
333- type_to_instance_map ,
334- notify_progress = False ,
335- )
336- or auto_scaling_mode .try_assigning_task_to_instance_types (
337- task , filtered_needed_new_instance_types_to_task
338- )
339378 ):
340379 continue
341380
@@ -346,15 +385,23 @@ async def _find_needed_instances(
346385 defined_ec2 = find_selected_instance_type_for_task (
347386 task_defined_ec2_type , available_ec2_types , auto_scaling_mode , task
348387 )
349- needed_new_instance_types_for_tasks .append ((defined_ec2 , [task ]))
388+ needed_new_instance_types_for_tasks .append (
389+ AssignedTasksToInstanceType (
390+ instance_type = defined_ec2 , assigned_tasks = [task ]
391+ )
392+ )
350393 else :
351394 # we go for best fitting type
352395 best_ec2_instance = utils_ec2 .find_best_fitting_ec2_instance (
353396 available_ec2_types ,
354397 auto_scaling_mode .get_max_resources_from_task (task ),
355398 score_type = utils_ec2 .closest_instance_policy ,
356399 )
357- needed_new_instance_types_for_tasks .append ((best_ec2_instance , [task ]))
400+ needed_new_instance_types_for_tasks .append (
401+ AssignedTasksToInstanceType (
402+ instance_type = best_ec2_instance , assigned_tasks = [task ]
403+ )
404+ )
358405 except Ec2InstanceNotFoundError :
359406 _logger .exception (
360407 "Task %s needs more resources than any EC2 instance "
@@ -365,7 +412,10 @@ async def _find_needed_instances(
365412 _logger .exception ("Unexpected error:" )
366413
367414 num_instances_per_type = collections .defaultdict (
368- int , collections .Counter (t for t , _ in needed_new_instance_types_for_tasks )
415+ int ,
416+ collections .Counter (
417+ t .instance_type for t in needed_new_instance_types_for_tasks
418+ ),
369419 )
370420
371421 # 2. check the buffer needs
@@ -379,9 +429,7 @@ async def _find_needed_instances(
379429 ) > 0 :
380430 # check if some are already pending
381431 remaining_pending_instances = [
382- instance
383- for instance , assigned_tasks in pending_instances_to_tasks
384- if not assigned_tasks
432+ i .instance for i in pending_instances_to_tasks if not i .assigned_tasks
385433 ]
386434 if len (remaining_pending_instances ) < (
387435 app_settings .AUTOSCALING_EC2_INSTANCES .EC2_INSTANCES_MACHINES_BUFFER
@@ -622,10 +670,14 @@ async def _autoscale_cluster(
622670) -> Cluster :
623671 # 1. check if we have pending tasks and resolve them by activating some drained nodes
624672 unrunnable_tasks = await auto_scaling_mode .list_unrunnable_tasks (app )
673+ _logger .info ("found %s unrunnable tasks" , len (unrunnable_tasks ))
625674 # 2. try to activate drained nodes to cover some of the tasks
626675 still_unrunnable_tasks , cluster = await _activate_drained_nodes (
627676 app , cluster , unrunnable_tasks , auto_scaling_mode
628677 )
678+ _logger .info (
679+ "still %s unrunnable tasks after node activation" , len (still_unrunnable_tasks )
680+ )
629681
630682 # let's check if there are still pending tasks or if the reserve was used
631683 app_settings = get_application_settings (app )
0 commit comments