1515 EC2Tags ,
1616 Resources ,
1717)
18- from aws_library .ec2 ._errors import EC2AccessError , EC2TooManyInstancesError
18+ from aws_library .ec2 ._errors import (
19+ EC2AccessError ,
20+ EC2InsufficientCapacityError ,
21+ EC2TooManyInstancesError ,
22+ )
1923from fastapi import FastAPI
2024from models_library .generated_models .docker_rest_api import Node
2125from models_library .rabbitmq_messages import ProgressType
@@ -421,10 +425,46 @@ async def _activate_drained_nodes(
421425 )
422426
423427
def _de_assign_tasks_from_warm_buffer_ec2s(
    cluster: Cluster, instances_to_start: list[EC2InstanceData]
) -> tuple[Cluster, list]:
    """De-assign the tasks from the warm buffer instances that could not be started.

    Arguments:
        cluster: the cluster whose ``warm_buffer_ec2s`` are inspected.
        instances_to_start: the EC2 instances that were requested to start;
            every warm buffer whose ``ec2_instance`` is in this list gets
            its assigned tasks removed.

    Returns:
        A tuple containing:
            - A copy of the cluster (built with ``dataclasses.replace``) in which
              the matching warm buffers have an empty ``assigned_tasks`` list.
              Warm buffers keep their original order, non-matching ones are
              passed through untouched.
            - The de-assigned tasks, in warm-buffer order.
    """
    deassigned_tasks: list = []
    updated_warm_buffers = []
    # single pass: the membership test is evaluated once per warm buffer, so the
    # collected tasks and the cleared buffers can never get out of sync
    for warm_buffer in cluster.warm_buffer_ec2s:
        if warm_buffer.ec2_instance in instances_to_start:
            deassigned_tasks.extend(warm_buffer.assigned_tasks)
            # replace instead of mutating, the input cluster is left as is
            updated_warm_buffers.append(
                dataclasses.replace(warm_buffer, assigned_tasks=[])
            )
        else:
            updated_warm_buffers.append(warm_buffer)

    # update the cluster
    return (
        dataclasses.replace(cluster, warm_buffer_ec2s=updated_warm_buffers),
        deassigned_tasks,
    )
454+
455+
424456async def _try_start_warm_buffer_instances (
425457 app : FastAPI , cluster : Cluster , auto_scaling_mode : AutoscalingProvider
426- ) -> Cluster :
427- """starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""
458+ ) -> tuple [Cluster , list ]:
459+ """
460+ starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed
461+
462+ Returns:
463+ A tuple containing:
464+ - The updated cluster instance after attempting to start warm buffer instances.
465+ - In case warm buffer could not be started, a list of de-assigned tasks (tasks whose resource requirements cannot be fulfilled by warm buffers anymore).
466+
467+ """
428468
429469 app_settings = get_application_settings (app )
430470 assert app_settings .AUTOSCALING_EC2_INSTANCES # nosec
@@ -466,26 +506,34 @@ async def _try_start_warm_buffer_instances(
466506 ]
467507
468508 if not instances_to_start :
469- return cluster
509+ return cluster , []
470510
471511 with log_context (
472- _logger , logging .INFO , f"start { len (instances_to_start )} warm buffer machines"
512+ _logger ,
513+ logging .INFO ,
514+ f"start { len (instances_to_start )} warm buffer machines '{ [i .id for i in instances_to_start ]} '" ,
473515 ):
474516 try :
475517 started_instances = await get_ec2_client (app ).start_instances (
476518 instances_to_start
477519 )
478- except EC2AccessError :
520+ except EC2InsufficientCapacityError :
521+ # NOTE: this warning is only raised if none of the instances could be started due to InsufficientCapacity
479522 _logger .warning (
480- "Could not start warm buffer instances! "
481- "TIP: This can happen in case of Insufficient "
482- "Capacity on AWS AvailabilityZone(s) where the warm buffers were originally created. "
483- "Until https://github.com/ITISFoundation/osparc-simcore/issues/8273 is fixed this "
484- "will prevent fulfilling this instance type need." ,
485- exc_info = True ,
523+ "Could not start warm buffer instances: %s due to Insufficient Capacity in the current AWS Availability Zone! "
524+ "The warm buffer assigned tasks will be moved to new instances if possible." ,
525+ [i .id for i in instances_to_start ],
486526 )
487- # we need to re-assign the tasks assigned to the warm buffer instances
488- return cluster
527+ return _de_assign_tasks_from_warm_buffer_ec2s (cluster , instances_to_start )
528+
529+ except EC2AccessError :
530+ _logger .exception (
531+ "Could not start warm buffer instances %s! TIP: This needs to be analysed!"
532+ "The warm buffer assigned tasks will be moved to new instances if possible." ,
533+ [i .id for i in instances_to_start ],
534+ )
535+ return _de_assign_tasks_from_warm_buffer_ec2s (cluster , instances_to_start )
536+
489537 # NOTE: first start the instance and then set the tags in case the instance cannot start (e.g. InsufficientInstanceCapacity)
490538 await get_ec2_client (app ).set_instances_tags (
491539 started_instances ,
@@ -495,15 +543,18 @@ async def _try_start_warm_buffer_instances(
495543 )
496544 started_instance_ids = [i .id for i in started_instances ]
497545
498- return dataclasses .replace (
499- cluster ,
500- warm_buffer_ec2s = [
501- i
502- for i in cluster .warm_buffer_ec2s
503- if i .ec2_instance .id not in started_instance_ids
504- ],
505- pending_ec2s = cluster .pending_ec2s
506- + [NonAssociatedInstance (ec2_instance = i ) for i in started_instances ],
546+ return (
547+ dataclasses .replace (
548+ cluster ,
549+ warm_buffer_ec2s = [
550+ i
551+ for i in cluster .warm_buffer_ec2s
552+ if i .ec2_instance .id not in started_instance_ids
553+ ],
554+ pending_ec2s = cluster .pending_ec2s
555+ + [NonAssociatedInstance (ec2_instance = i ) for i in started_instances ],
556+ ),
557+ [],
507558 )
508559
509560
@@ -1243,7 +1294,11 @@ async def _autoscale_cluster(
12431294 cluster = await _activate_drained_nodes (app , cluster )
12441295
12451296 # 3. start warm buffer instances to cover the remaining tasks
1246- cluster = await _try_start_warm_buffer_instances (app , cluster , auto_scaling_mode )
1297+ cluster , de_assigned_tasks = await _try_start_warm_buffer_instances (
1298+ app , cluster , auto_scaling_mode
1299+ )
1300+ # 3.1 if some tasks were de-assigned, we need to add them to the still pending tasks
1301+ still_pending_tasks .extend (de_assigned_tasks )
12471302
12481303 # 4. scale down unused instances
12491304 cluster = await _scale_down_unused_cluster_instances (
0 commit comments