diff --git a/packages/aws-library/src/aws_library/ec2/__init__.py b/packages/aws-library/src/aws_library/ec2/__init__.py index 127a6dd076db..0acff01ff0d6 100644 --- a/packages/aws-library/src/aws_library/ec2/__init__.py +++ b/packages/aws-library/src/aws_library/ec2/__init__.py @@ -17,7 +17,6 @@ EC2InstanceData, EC2InstanceType, EC2Tags, - GenericResourceValueType, Resources, ) @@ -37,7 +36,6 @@ "EC2NotConnectedError", "EC2RuntimeError", "EC2Tags", - "GenericResourceValueType", "Resources", "SimcoreEC2API", ) diff --git a/packages/aws-library/src/aws_library/ec2/_models.py b/packages/aws-library/src/aws_library/ec2/_models.py index 3e5a2b00691d..e08e207b0b0e 100644 --- a/packages/aws-library/src/aws_library/ec2/_models.py +++ b/packages/aws-library/src/aws_library/ec2/_models.py @@ -1,4 +1,3 @@ -import contextlib import datetime import re import tempfile @@ -15,171 +14,45 @@ Field, NonNegativeFloat, NonNegativeInt, - StrictFloat, - StrictInt, StringConstraints, - TypeAdapter, - ValidationError, field_validator, ) from pydantic.config import JsonDict from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType -GenericResourceValueType: TypeAlias = StrictInt | StrictFloat | str - class Resources(BaseModel, frozen=True): cpus: NonNegativeFloat ram: ByteSize - generic_resources: Annotated[ - dict[str, GenericResourceValueType], - Field( - default_factory=dict, - description=( - "Arbitrary additional resources (e.g. {'threads': 8}). " - "Numeric values are treated as quantities and participate in add/sub/compare." - ), - ), - ] = DEFAULT_FACTORY @classmethod def create_as_empty(cls) -> "Resources": return cls(cpus=0, ram=ByteSize(0)) def __ge__(self, other: "Resources") -> bool: - """operator for >= comparison - if self has greater or equal resources than other, returns True - This will return True only if all of the resources in self are greater or equal to other - - Note that generic_resources are compared only if they are numeric - Non-numeric generic resources must be equal in both or only defined in self - to be considered greater or equal - """ - if self == other: - return True - return self > other + return self.cpus >= other.cpus and self.ram >= other.ram def __gt__(self, other: "Resources") -> bool: - """operator for > comparison - if self has resources greater than other, returns True - This will return True only if all of the resources in self are greater than other - - Note that generic_resources are compared only if they are numeric - Non-numeric generic resources must only be defined in self - to be considered greater - """ - if (self.cpus < other.cpus) or (self.ram < other.ram): - return False - - keys = set(self.generic_resources) | set(other.generic_resources) - for k in keys: - a = self.generic_resources.get(k) - b = other.generic_resources.get(k) - if a is None: - return False - if b is None: - # a is greater as b is not defined - continue - if isinstance(a, int | float) and isinstance(b, int | float): - if a < b: - return False - else: - # remaining options is a is str and b is str or mixed types - # NOTE: we cannot compare strings unless they are equal or some kind of boolean (e.g. 
"true", "false", "yes", "no", "1", "0") - assert isinstance(a, str) # nosec - assert isinstance(b, int | float | str) # nosec - # let's try to get a boolean out of the values to compare them - with contextlib.suppress(ValidationError): - a_as_boolean = TypeAdapter(bool).validate_python(a) - b_as_boolean = TypeAdapter(bool).validate_python(b) - if not a_as_boolean and b_as_boolean: - return False - - # here we have either everything greater or equal or non-comparable strings - - return self != other + return self.cpus > other.cpus or self.ram > other.ram def __add__(self, other: "Resources") -> "Resources": - """operator for adding two Resources - Note that only numeric generic resources are added - Non-numeric generic resources are ignored - """ - merged: dict[str, GenericResourceValueType] = {} - keys = set(self.generic_resources) | set(other.generic_resources) - for k in keys: - a = self.generic_resources.get(k) - b = other.generic_resources.get(k) - # adding non numeric values does not make sense, so we skip those for the resulting resource - if isinstance(a, int | float) and isinstance(b, int | float): - merged[k] = a + b - elif a is None and isinstance(b, int | float): - merged[k] = b - elif b is None and isinstance(a, int | float): - merged[k] = a - return Resources.model_construct( - cpus=self.cpus + other.cpus, - ram=self.ram + other.ram, - generic_resources=merged, + **{ + key: a + b + for (key, a), b in zip( + self.model_dump().items(), other.model_dump().values(), strict=True + ) + } ) def __sub__(self, other: "Resources") -> "Resources": - """operator for subtracting two Resources - Note that only numeric generic resources are subtracted - Non-numeric generic resources are ignored - """ - merged: dict[str, GenericResourceValueType] = {} - keys = set(self.generic_resources) | set(other.generic_resources) - for k in keys: - a = self.generic_resources.get(k) - b = other.generic_resources.get(k) - # subtracting non numeric values does not make sense, so we skip those for the resulting resource - if isinstance(a, int | float) and isinstance(b, int | float): - merged[k] = a - b - elif a is None and isinstance(b, int | float): - merged[k] = -b - elif b is None and isinstance(a, int | float): - merged[k] = a - return Resources.model_construct( - cpus=self.cpus - other.cpus, - ram=self.ram - other.ram, - generic_resources=merged, - ) - - def __hash__(self) -> int: - """Deterministic hash including cpus, ram (in bytes) and generic_resources.""" - # sort generic_resources items to ensure order-independent hashing - generic_items: tuple[tuple[str, GenericResourceValueType], ...] 
= tuple( - sorted(self.generic_resources.items()) - ) - return hash((self.cpus, self.ram, generic_items)) - - def as_flat_dict(self) -> dict[str, int | float | str]: - """Like model_dump, but flattens generic_resources to top level keys""" - base = self.model_dump() - base.update(base.pop("generic_resources")) - return base - - @classmethod - def from_flat_dict( - cls, - data: dict[str, int | float | str], - *, - mapping: dict[str, str] | None = None, - ) -> "Resources": - """Inverse of as_flat_dict with optional key mapping""" - mapped_data = data - if mapping: - mapped_data = {mapping.get(k, k): v for k, v in data.items()} - generic_resources = { - k: v for k, v in mapped_data.items() if k not in {"cpus", "ram"} - } - - return cls( - cpus=float(mapped_data.get("cpus", 0)), - ram=ByteSize(mapped_data.get("ram", 0)), - generic_resources=generic_resources, + **{ + key: a - b + for (key, a), b in zip( + self.model_dump().items(), other.model_dump().values(), strict=True + ) + } ) @field_validator("cpus", mode="before") @@ -301,9 +174,8 @@ def validate_bash_calls(cls, v): temp_file.flush() # NOTE: this will not capture runtime errors, but at least some syntax errors such as invalid quotes sh.bash( - "-n", - temp_file.name, # pyright: ignore[reportCallIssue] - sh is untyped but safe for bash syntax checking - ) + "-n", temp_file.name + ) # pyright: ignore[reportCallIssue] # sh is untyped, but this call is safe for bash syntax checking except sh.ErrorReturnCode as exc: msg = f"Invalid bash call in custom_boot_scripts: {v}, Error: {exc.stderr}" raise ValueError(msg) from exc diff --git a/packages/aws-library/tests/test_ec2_models.py b/packages/aws-library/tests/test_ec2_models.py index 22f03a0bd102..ed232ad0043d 100644 --- a/packages/aws-library/tests/test_ec2_models.py +++ b/packages/aws-library/tests/test_ec2_models.py @@ -4,13 +4,7 @@ import pytest -from aws_library.ec2._models import ( - AWSTagKey, - AWSTagValue, - EC2InstanceBootSpecific, - EC2InstanceData, - Resources, -) +from aws_library.ec2._models import AWSTagKey, AWSTagValue, EC2InstanceData, Resources from faker import Faker from pydantic import ByteSize, TypeAdapter, ValidationError @@ -36,66 +30,11 @@ ( Resources(cpus=0.05, ram=ByteSize(1)), Resources(cpus=0.1, ram=ByteSize(0)), - False, # CPU is smaller - ), - ( - Resources(cpus=0.1, ram=ByteSize(0)), - Resources(cpus=0.1, ram=ByteSize(1)), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(0), generic_resources={"GPU": 1}), - Resources(cpus=0.1, ram=ByteSize(1)), - False, # RAM is smaller - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=0.1, ram=ByteSize(1)), - True, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - True, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1)), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": "2"}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), - True, # string resources are not comparable so "2" is considered larger - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), - Resources(cpus=0.1, ram=ByteSize(1)), - True, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": 
"yes"}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), - True, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "no"}), - True, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "no"}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), False, ), ( + Resources(cpus=0.1, ram=ByteSize(0)), Resources(cpus=0.1, ram=ByteSize(1)), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), False, ), ], @@ -127,71 +66,11 @@ def test_resources_ge_operator( ( Resources(cpus=0.05, ram=ByteSize(1)), Resources(cpus=0.1, ram=ByteSize(0)), - False, # CPU is smaller - ), - ( - Resources(cpus=0.1, ram=ByteSize(0)), - Resources(cpus=0.1, ram=ByteSize(1)), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(0), generic_resources={"GPU": 1}), - Resources(cpus=0.1, ram=ByteSize(1)), - False, # ram is not enough - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=0.1, ram=ByteSize(1)), - True, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), True, ), ( + Resources(cpus=0.1, ram=ByteSize(0)), Resources(cpus=0.1, ram=ByteSize(1)), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": "2"}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), - True, # string resources are not comparable, so a > b - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), - Resources(cpus=0.1, ram=ByteSize(1)), - True, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1)), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), - False, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "no"}), - True, - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "no"}), - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), False, ), ], @@ -213,36 +92,6 @@ def test_resources_gt_operator(a: Resources, b: Resources, a_greater_than_b: boo Resources(cpus=1, ram=ByteSize(34)), Resources(cpus=1.1, ram=ByteSize(35)), ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=1, ram=ByteSize(34)), - Resources(cpus=1.1, ram=ByteSize(35), generic_resources={"GPU": 1}), - ), - ( - Resources(cpus=0.1, ram=ByteSize(1)), - Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), - Resources(cpus=1.1, ram=ByteSize(35), generic_resources={"GPU": 1}), - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), - Resources(cpus=1.1, ram=ByteSize(35), generic_resources={"GPU": 2}), - ), - ( - Resources( - cpus=0.1, ram=ByteSize(1), 
generic_resources={"GPU": 1, "SSE": "yes"} - ), - Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), - Resources(cpus=1.1, ram=ByteSize(35), generic_resources={"GPU": 2}), - ), # string resources are not summed - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": "1"}), - Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), - Resources( - cpus=1.1, - ram=ByteSize(35), - ), - ), # string resources are ignored in summation ], ) def test_resources_add(a: Resources, b: Resources, result: Resources): @@ -252,9 +101,7 @@ def test_resources_add(a: Resources, b: Resources, result: Resources): def test_resources_create_as_empty(): - assert Resources.create_as_empty() == Resources( - cpus=0, ram=ByteSize(0), generic_resources={} - ) + assert Resources.create_as_empty() == Resources(cpus=0, ram=ByteSize(0)) @pytest.mark.parametrize( @@ -270,41 +117,6 @@ def test_resources_create_as_empty(): Resources(cpus=1, ram=ByteSize(1)), Resources.model_construct(cpus=-0.9, ram=ByteSize(33)), ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=1, ram=ByteSize(34)), - Resources.model_construct( - cpus=-0.9, ram=ByteSize(-33), generic_resources={"GPU": 1} - ), - ), - ( - Resources(cpus=0.1, ram=ByteSize(1)), - Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), - Resources.model_construct( - cpus=-0.9, ram=ByteSize(-33), generic_resources={"GPU": -1} - ), - ), - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), - Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), - Resources.model_construct( - cpus=-0.9, ram=ByteSize(-33), generic_resources={"GPU": 0} - ), - ), - ( - Resources( - cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1, "SSE": "yes"} - ), - Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), - Resources.model_construct( - cpus=-0.9, ram=ByteSize(-33), generic_resources={"GPU": 0} - ), - ), # string resources are not summed - ( - Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": "1"}), - Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), - Resources.model_construct(cpus=-0.9, ram=ByteSize(-33)), - ), # string resources are ignored in summation ], ) def test_resources_sub(a: Resources, b: Resources, result: Resources): @@ -313,24 +125,6 @@ def test_resources_sub(a: Resources, b: Resources, result: Resources): assert a == result -def test_resources_flat_dict(): - r = Resources( - cpus=0.1, ram=ByteSize(1024), generic_resources={"GPU": 2, "SSE": "yes"} - ) - flat = r.as_flat_dict() - assert flat == {"cpus": 0.1, "ram": 1024, "GPU": 2, "SSE": "yes"} - - reconstructed = Resources.from_flat_dict(flat) - assert reconstructed == r - - # test with mapping - flat_with_oter_names = {"CPU": 0.1, "RAM": 1024, "GPU": 2, "SSE": "yes"} - reconstructed2 = Resources.from_flat_dict( - flat_with_oter_names, mapping={"CPU": "cpus", "RAM": "ram"} - ) - assert reconstructed2 == r - - @pytest.mark.parametrize("ec2_tag_key", ["", "/", " ", ".", "..", "_index"]) def test_aws_tag_key_invalid(ec2_tag_key: str): # for a key it raises @@ -354,11 +148,7 @@ def test_ec2_instance_data_hashable(faker: Faker): cpus=faker.pyfloat(min_value=0.1), ram=ByteSize(faker.pyint(min_value=123)), ), - { - TypeAdapter(AWSTagKey) - .validate_python("mytagkey"): TypeAdapter(AWSTagValue) - .validate_python("mytagvalue") - }, + {AWSTagKey("mytagkey"): AWSTagValue("mytagvalue")}, ) } second_set_of_ec2s = { @@ -373,22 +163,10 @@ def test_ec2_instance_data_hashable(faker: 
Faker): cpus=faker.pyfloat(min_value=0.1), ram=ByteSize(faker.pyint(min_value=123)), ), - { - TypeAdapter(AWSTagKey) - .validate_python("mytagkey"): TypeAdapter(AWSTagValue) - .validate_python("mytagvalue") - }, + {AWSTagKey("mytagkey"): AWSTagValue("mytagvalue")}, ) } union_of_sets = first_set_of_ec2s.union(second_set_of_ec2s) assert next(iter(first_set_of_ec2s)) in union_of_sets assert next(iter(second_set_of_ec2s)) in union_of_sets - - -def test_ec2_instance_boot_specific_with_invalid_custom_script(faker: Faker): - valid_model = EC2InstanceBootSpecific.model_json_schema()["examples"][0] - invalid_model = {**valid_model, "custom_boot_scripts": ["echo 'missing end quote"]} - - with pytest.raises(ValueError, match="Invalid bash call"): - EC2InstanceBootSpecific(**invalid_model) diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/utils.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/utils.py index 97cfb440f45b..d97b0c896c36 100644 --- a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/utils.py +++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/utils.py @@ -34,7 +34,7 @@ def parse_dask_job_id( job_id: str, ) -> tuple[ServiceKey, ServiceVersion, UserID, ProjectID, NodeID]: parts = job_id.split(":") - assert len(parts) == _JOB_ID_PARTS, f"unexpected job id {parts=}" # nosec + assert len(parts) == _JOB_ID_PARTS # nosec return ( parts[0], parts[1], diff --git a/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py b/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py index 7770ba74050a..3a81114ef878 100644 --- a/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py +++ b/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py @@ -1,18 +1,8 @@ -from typing import Final, Literal, Required, TypedDict +from typing import Any, TypeAlias from .constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY -DASK_WORKER_THREAD_RESOURCE_NAME: Final[str] = "threads" - - -class DaskTaskResources(TypedDict, total=False): - CPU: Required[float] - RAM: Required[int] # in bytes - # threads is a constant of 1 (enforced by static type checkers via Literal) - # NOTE: a dask worker can take a task if it has a free thread, - # regardless of its resources so we need to be careful when interpreting - # the resources, adding the thread here will mimick this - threads: Required[Literal[1]] +DaskTaskResources: TypeAlias = dict[str, Any] def create_ec2_resource_constraint_key(ec2_instance_type: str) -> str: @@ -26,24 +16,3 @@ def get_ec2_instance_type_from_resources( if resource_name.startswith(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY): return resource_name.split(":")[-1] return None - - -_RAM_SAFE_MARGIN_RATIO: Final[float] = ( - 0.1 # NOTE: machines always have less available RAM than advertised -) -_CPUS_SAFE_MARGIN: Final[float] = 0.1 - - -def estimate_dask_worker_resources_from_ec2_instance( - cpus: float, ram: int -) -> tuple[float, int]: - """Estimates the resources available to a dask worker running in an EC2 instance, - taking into account safe margins for CPU and RAM. - - Returns: - tuple: Estimated resources for the dask worker (cpus, ram). 
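Aside on the `resource_constraints` change above: with the `TypedDict` replaced by a plain `dict[str, Any]` alias, the EC2 instance-type restriction is just another string key inside the task resources. A minimal sketch of the round-trip, assuming `DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY` is a plain string prefix ("EC2-INSTANCE-TYPE" below is a placeholder; the real value lives in `dask_task_models_library.constants`):

```python
from typing import Any

# placeholder value; the real constant is defined in dask_task_models_library.constants
DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY = "EC2-INSTANCE-TYPE"

def create_ec2_resource_constraint_key(ec2_instance_type: str) -> str:
    # prefix:instance-type, matching the split(":") in the getter below
    return f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{ec2_instance_type}"

def get_ec2_instance_type_from_resources(task_resources: dict[str, Any]) -> str | None:
    # mirrors the context lines kept in the diff
    for resource_name in task_resources:
        if resource_name.startswith(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY):
            return resource_name.split(":")[-1]
    return None

task_resources: dict[str, Any] = {"CPU": 2.0, "RAM": 2 * 1024**3}
task_resources[create_ec2_resource_constraint_key("r5n.4xlarge")] = 1
assert get_ec2_instance_type_from_resources(task_resources) == "r5n.4xlarge"
```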
- """ - worker_cpus = max(0.1, cpus - _CPUS_SAFE_MARGIN) # ensure at least 0.1 CPU - worker_ram = int(ram * (1 - _RAM_SAFE_MARGIN_RATIO)) # apply safe margin - - return (worker_cpus, worker_ram) diff --git a/packages/dask-task-models-library/tests/test_resource_constraints.py b/packages/dask-task-models-library/tests/test_resource_constraints.py index 121d2b740d23..9a2c1e59e26b 100644 --- a/packages/dask-task-models-library/tests/test_resource_constraints.py +++ b/packages/dask-task-models-library/tests/test_resource_constraints.py @@ -1,23 +1,11 @@ from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY from dask_task_models_library.resource_constraints import ( - DaskTaskResources, create_ec2_resource_constraint_key, get_ec2_instance_type_from_resources, ) from faker import Faker -def test_dask_task_resource(faker: Faker): - task_resources = DaskTaskResources( - CPU=faker.pyfloat(min_value=0.1, max_value=100), - RAM=faker.pyint(min_value=1024, max_value=1024**3), - threads=1, - ) - assert task_resources["threads"] == 1 - assert task_resources["CPU"] > 0 - assert task_resources["RAM"] >= 1024 - - def test_create_ec2_resource_constraint_key(faker: Faker): faker_instance_type = faker.pystr() assert ( diff --git a/packages/service-library/src/servicelib/docker_utils.py b/packages/service-library/src/servicelib/docker_utils.py index 374c05595beb..a919cb9487d7 100644 --- a/packages/service-library/src/servicelib/docker_utils.py +++ b/packages/service-library/src/servicelib/docker_utils.py @@ -326,33 +326,3 @@ async def _pull_image_with_retry() -> None: ) await _pull_image_with_retry() - - -_CPUS_SAFE_MARGIN: Final[float] = ( - 1.4 # accounts for machine overhead (ops + sidecar itself) -) -_MACHINE_TOTAL_RAM_SAFE_MARGIN_RATIO: Final[float] = ( - 0.1 # NOTE: machines always have less available RAM than advertised -) -_SIDECARS_OPS_SAFE_RAM_MARGIN: Final[ByteSize] = TypeAdapter(ByteSize).validate_python( - "1GiB" -) -DYNAMIC_SIDECAR_MIN_CPUS: Final[float] = 0.5 - - -def estimate_dynamic_sidecar_resources_from_ec2_instance( - cpus: float, ram: int -) -> tuple[float, int]: - """Estimates the resources available to a dynamic-sidecar running in an EC2 instance, - taking into account safe margins for CPU and RAM, as the EC2 full resources are not completely visible - - Returns: - tuple: Estimated resources for the dynamic-sidecar (cpus, ram). - """ - # dynamic-sidecar usually needs less CPU - sidecar_cpus = max(DYNAMIC_SIDECAR_MIN_CPUS, cpus - _CPUS_SAFE_MARGIN) - sidecar_ram = int( - ram - _MACHINE_TOTAL_RAM_SAFE_MARGIN_RATIO * ram - _SIDECARS_OPS_SAFE_RAM_MARGIN - ) - - return (sidecar_cpus, sidecar_ram) diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py index d1020d382f76..e4294631224a 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py @@ -18,7 +18,7 @@ class TaskRequiresUnauthorizedEC2InstanceTypeError(AutoscalingRuntimeError): class TaskRequirementsAboveRequiredEC2InstanceTypeError(AutoscalingRuntimeError): msg_template: str = ( - "Task {task} specifies instance type {instance_type} but requests {resources}. {resources_diff} are missing! " + "Task {task} requires {instance_type} but requires {resources}. " "TIP: Ensure task resources requirements fit required instance type available resources." 
) @@ -43,6 +43,4 @@ class DaskNoWorkersError(AutoscalingRuntimeError): class DaskWorkerNotFoundError(AutoscalingRuntimeError): - msg_template: str = ( - "Dask worker running on {worker_host} is not registered to scheduler in {url}, it is not found!" - ) + msg_template: str = "Dask worker running on {worker_host} is not registered to scheduler in {url}, it is not found!" diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index 38f994bcea6e..0ae53b943954 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -14,7 +14,6 @@ AnyUrl, Field, NonNegativeInt, - PositiveInt, TypeAdapter, field_validator, model_validator, @@ -242,18 +241,6 @@ class DaskMonitoringSettings(BaseCustomSettings): description="defines the authentication of the clusters created via clusters-keeper (can be None or TLS)", ), ] - DASK_NTHREADS: Annotated[ - NonNegativeInt, - Field( - description="if >0, it overrides the default number of threads per process in the dask-sidecars, (see description in dask-sidecar)", - ), - ] - DASK_NTHREADS_MULTIPLIER: Annotated[ - PositiveInt, - Field( - description="if >1, it overrides the default number of threads per process in the dask-sidecars, by multiplying the number of vCPUs with this factor (see description in dask-sidecar)", - ), - ] class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index ca697a13fa06..7645b300e8de 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -23,9 +23,6 @@ def assign_task(self, task, task_resources: Resources) -> None: def has_resources_for_task(self, task_resources: Resources) -> bool: return bool(self.available_resources >= task_resources) - def has_assigned_tasks(self) -> bool: - return len(self.assigned_tasks) > 0 - @dataclass(frozen=True, kw_only=True, slots=True) class AssignedTasksToInstanceType(_TaskAssignmentMixin): @@ -40,6 +37,9 @@ def __post_init__(self) -> None: if self.available_resources == Resources.create_as_empty(): object.__setattr__(self, "available_resources", self.ec2_instance.resources) + def has_assigned_tasks(self) -> bool: + return bool(self.available_resources < self.ec2_instance.resources) + @dataclass(frozen=True, kw_only=True, slots=True) class AssociatedInstance(_BaseInstance): diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py index 535df02d3cf2..ff4b0ad4f5b6 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py @@ -94,33 +94,24 @@ async def _analyze_current_cluster( docker_nodes: list[Node] = await auto_scaling_mode.get_monitored_nodes(app) # get the EC2 instances we have - existing_ec2_instances: list[EC2InstanceData] = await get_ec2_client( - app - ).get_instances( + existing_ec2_instances = await get_ec2_client(app).get_instances( key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], 
tags=auto_scaling_mode.get_ec2_tags(app), state_names=["pending", "running"], ) - terminated_ec2_instances: list[EC2InstanceData] = await get_ec2_client( - app - ).get_instances( + terminated_ec2_instances = await get_ec2_client(app).get_instances( key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], tags=auto_scaling_mode.get_ec2_tags(app), state_names=["terminated"], ) - warm_buffer_ec2_instances: list[EC2InstanceData] = await get_ec2_client( - app - ).get_instances( + warm_buffer_ec2_instances = await get_ec2_client(app).get_instances( key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], tags=get_deactivated_warm_buffer_ec2_tags(auto_scaling_mode.get_ec2_tags(app)), state_names=["stopped"], ) - for i in itertools.chain(existing_ec2_instances, warm_buffer_ec2_instances): - auto_scaling_mode.add_instance_generic_resources(app, i) - attached_ec2s, pending_ec2s = associate_ec2_instances_with_nodes( docker_nodes, existing_ec2_instances ) @@ -352,9 +343,7 @@ async def _try_attach_pending_ec2s( ) -async def _sorted_allowed_instance_types( - app: FastAPI, auto_scaling_mode: AutoscalingProvider -) -> list[EC2InstanceType]: +async def _sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]: app_settings: ApplicationSettings = app.state.settings assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec ec2_client = get_ec2_client(app) @@ -377,10 +366,8 @@ def _as_selection(instance_type: EC2InstanceType) -> int: # NOTE: will raise ValueError if allowed_instance_types not in allowed_instance_type_names return allowed_instance_type_names.index(f"{instance_type.name}") - return [ - auto_scaling_mode.adjust_instance_type_resources(app, instance_type) - for instance_type in sorted(allowed_instance_types, key=_as_selection) - ] + allowed_instance_types.sort(key=_as_selection) + return allowed_instance_types async def _activate_and_notify( @@ -741,15 +728,15 @@ async def _find_needed_instances( task_required_resources = auto_scaling_mode.get_task_required_resources( task ) - task_required_ec2 = await auto_scaling_mode.get_task_defined_instance( - app, task + task_required_ec2_instance = ( + await auto_scaling_mode.get_task_defined_instance(app, task) ) # first check if we can assign the task to one of the newly tobe created instances if _try_assign_task_to_ec2_instance_type( task, instances=needed_new_instance_types_for_tasks, - task_required_ec2_instance=task_required_ec2, + task_required_ec2_instance=task_required_ec2_instance, task_required_resources=task_required_resources, ): continue @@ -757,12 +744,12 @@ async def _find_needed_instances( # so we need to find what we can create now try: # check if exact instance type is needed first - if task_required_ec2: + if task_required_ec2_instance: defined_ec2 = find_selected_instance_type_for_task( - task_required_ec2, + task_required_ec2_instance, available_ec2_types, task, - task_required_resources, + auto_scaling_mode.get_task_required_resources(task), ) needed_new_instance_types_for_tasks.append( AssignedTasksToInstanceType( @@ -776,7 +763,7 @@ async def _find_needed_instances( # we go for best fitting type best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance( available_ec2_types, - task_required_resources, + auto_scaling_mode.get_task_required_resources(task), score_type=utils_ec2.closest_instance_policy, ) needed_new_instance_types_for_tasks.append( @@ -1588,10 +1575,7 @@ async def auto_scale_cluster( the additional load. 
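For context on the `_sorted_allowed_instance_types` change here: with the provider-specific resource adjustment removed, the function only orders the EC2 types by their position in the user-configured allowed list. A standalone illustration of that sort key (the instance-type names are arbitrary examples):

```python
# list.index gives each name its rank in the allowed list; sorting by it
# reproduces the configured order. A name missing from the list raises
# ValueError, matching the NOTE in the diff.
allowed_instance_type_names = ["t2.micro", "r5n.4xlarge", "g4dn.2xlarge"]
discovered = ["g4dn.2xlarge", "t2.micro", "r5n.4xlarge"]
discovered.sort(key=allowed_instance_type_names.index)
assert discovered == ["t2.micro", "r5n.4xlarge", "g4dn.2xlarge"]
```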
""" # current state - allowed_instance_types = await _sorted_allowed_instance_types( - app, auto_scaling_mode - ) - + allowed_instance_types = await _sorted_allowed_instance_types(app) cluster = await _analyze_current_cluster( app, auto_scaling_mode, allowed_instance_types ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py index d580868627f2..c9b2d498fd66 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py @@ -1,13 +1,8 @@ import collections -import dataclasses import logging -from typing import Any, cast +from typing import cast from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources -from aws_library.ec2._models import EC2InstanceType -from dask_task_models_library.resource_constraints import ( - estimate_dask_worker_resources_from_ec2_instance, -) from fastapi import FastAPI from models_library.clusters import ClusterAuthentication from models_library.docker import DockerLabelKey @@ -93,7 +88,13 @@ async def list_unrunnable_tasks(self, app: FastAPI) -> list[DaskTask]: def get_task_required_resources(self, task) -> Resources: assert self # nosec - return utils.resources_from_dask_task(task) + task_required_resources = utils.resources_from_dask_task(task) + # ensure cpu is set at least to 1 as dask-workers use 1 thread per CPU + if task_required_resources.cpus < 1.0: + task_required_resources = task_required_resources.model_copy( + update={"cpus": 1.0} + ) + return task_required_resources async def get_task_defined_instance( self, app: FastAPI, task @@ -140,14 +141,10 @@ async def compute_cluster_used_resources( list_of_used_resources: list[Resources] = await logged_gather( *(self.compute_node_used_resources(app, i) for i in instances) ) - counter: collections.Counter = collections.Counter() + counter = collections.Counter(dict.fromkeys(Resources.model_fields, 0)) for result in list_of_used_resources: - counter.update(result.as_flat_dict()) - - flat_counter: dict[str, Any] = dict(counter) - flat_counter.setdefault("cpus", 0) - flat_counter.setdefault("ram", 0) - return Resources.from_flat_dict(flat_counter) + counter.update(result.model_dump()) + return Resources.model_validate(dict(counter)) async def compute_cluster_total_resources( self, app: FastAPI, instances: list[AssociatedInstance] @@ -155,9 +152,7 @@ async def compute_cluster_total_resources( assert self # nosec try: return await dask.compute_cluster_total_resources( - _scheduler_url(app), - _scheduler_auth(app), - [i.ec2_instance for i in instances], + _scheduler_url(app), _scheduler_auth(app), instances ) except DaskNoWorkersError: return Resources.create_as_empty() @@ -187,33 +182,3 @@ async def is_instance_retired( async def try_retire_nodes(self, app: FastAPI) -> None: assert self # nosec await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app)) - - def add_instance_generic_resources( - self, app: FastAPI, instance: EC2InstanceData - ) -> None: - assert self # nosec - assert app # nosec - app_settings = get_application_settings(app) - assert app_settings.AUTOSCALING_DASK # nosec - dask.add_instance_generic_resources(app_settings.AUTOSCALING_DASK, instance) - - def adjust_instance_type_resources( - self, app: FastAPI, instance_type: EC2InstanceType - ) -> 
EC2InstanceType: - assert self # nosec - assert app # nosec - app_settings = get_application_settings(app) - assert app_settings.AUTOSCALING_DASK # nosec - adjusted_cpus, adjusted_ram = estimate_dask_worker_resources_from_ec2_instance( - instance_type.resources.cpus, instance_type.resources.ram - ) - replaced_instance_type = dataclasses.replace( - instance_type, - resources=instance_type.resources.model_copy( - update={"cpus": adjusted_cpus, "ram": ByteSize(adjusted_ram)} - ), - ) - dask.add_instance_type_generic_resource( - app_settings.AUTOSCALING_DASK, replaced_instance_type - ) - return replaced_instance_type diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py index d7499dc92e1a..e6dbca840e37 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py @@ -1,12 +1,7 @@ -import dataclasses - from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources -from aws_library.ec2._models import EC2InstanceType from fastapi import FastAPI from models_library.docker import DockerLabelKey from models_library.generated_models.docker_rest_api import Node, Task -from pydantic import ByteSize -from servicelib.docker_utils import estimate_dynamic_sidecar_resources_from_ec2_instance from types_aiobotocore_ec2.literals import InstanceTypeType from ...core.settings import get_application_settings @@ -109,29 +104,3 @@ async def try_retire_nodes(self, app: FastAPI) -> None: assert self # nosec assert app # nosec # nothing to do here - - def add_instance_generic_resources( - self, app: FastAPI, instance: EC2InstanceData - ) -> None: - assert self # nosec - assert app # nosec - assert instance # nosec - # nothing to do at the moment - - def adjust_instance_type_resources( - self, app: FastAPI, instance_type: EC2InstanceType - ) -> EC2InstanceType: - assert self # nosec - assert app # nosec - adjusted_cpus, adjusted_ram = ( - estimate_dynamic_sidecar_resources_from_ec2_instance( - instance_type.resources.cpus, instance_type.resources.ram - ) - ) - - return dataclasses.replace( - instance_type, - resources=instance_type.resources.model_copy( - update={"cpus": adjusted_cpus, "ram": ByteSize(adjusted_ram)} - ), - ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py index d2f711229c4f..355394b9f1d3 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py @@ -1,7 +1,6 @@ from typing import Protocol from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources -from aws_library.ec2._models import EC2InstanceType from fastapi import FastAPI from models_library.docker import DockerLabelKey from models_library.generated_models.docker_rest_api import Node as DockerNode @@ -48,11 +47,3 @@ async def is_instance_retired( ) -> bool: ... async def try_retire_nodes(self, app: FastAPI) -> None: ... - - def add_instance_generic_resources( - self, app: FastAPI, instance: EC2InstanceData - ) -> None: ... 
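Aside on the `_provider_protocol` trim here: `AutoscalingProvider` is a `typing.Protocol`, so the dynamic and computational providers satisfy it structurally, without inheriting from it; dropping a method from the protocol silently stops requiring it from implementers. A minimal sketch of that mechanism (the names are illustrative, not the real provider API):

```python
from typing import Protocol

class Provider(Protocol):
    # illustrative single-method subset of AutoscalingProvider
    async def try_retire_nodes(self, app: object) -> None: ...

class DynamicProvider:  # note: no inheritance from Provider
    async def try_retire_nodes(self, app: object) -> None:
        return None  # nothing to do, as in the dynamic provider

def use(provider: Provider) -> None:
    # passes static type checking purely by matching the method signature
    _ = provider

use(DynamicProvider())
```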
- - def adjust_instance_type_resources( - self, app: FastAPI, instance_type: EC2InstanceType - ) -> EC2InstanceType: ... diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py index 1b5225966809..4fb76ee5e129 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py @@ -1,12 +1,10 @@ import logging -from typing import Final, cast +from typing import Final -from aws_library.ec2 import GenericResourceValueType, Resources +from aws_library.ec2 import Resources from dask_task_models_library.resource_constraints import ( - DaskTaskResources, get_ec2_instance_type_from_resources, ) -from pydantic import ByteSize from ...models import DaskTask @@ -15,23 +13,11 @@ _DEFAULT_MAX_CPU: Final[float] = 1 _DEFAULT_MAX_RAM: Final[int] = 1024 -DASK_TO_RESOURCE_NAME_MAPPING: Final[dict[str, str]] = { - "CPU": "cpus", - "RAM": "ram", -} -_DEFAULT_DASK_RESOURCES: Final[DaskTaskResources] = DaskTaskResources( - CPU=_DEFAULT_MAX_CPU, RAM=ByteSize(_DEFAULT_MAX_RAM), threads=1 -) - def resources_from_dask_task(task: DaskTask) -> Resources: - task_resources = ( - _DEFAULT_DASK_RESOURCES | task.required_resources - ) # merge defaults with task resources (task resources override defaults) - - return Resources.from_flat_dict( - cast(dict[str, GenericResourceValueType], task_resources), - mapping=DASK_TO_RESOURCE_NAME_MAPPING, + return Resources( + cpus=task.required_resources.get("CPU", _DEFAULT_MAX_CPU), + ram=task.required_resources.get("RAM", _DEFAULT_MAX_RAM), ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index 65572f52c43f..966593295e87 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -4,35 +4,27 @@ import re from collections import defaultdict from collections.abc import AsyncIterator, Coroutine -from typing import Any, Final, TypeAlias, TypedDict +from typing import Any, Final, TypeAlias import dask.typing import distributed +import distributed.scheduler from aws_library.ec2 import EC2InstanceData, Resources -from aws_library.ec2._models import EC2InstanceType -from dask_task_models_library.resource_constraints import ( - DASK_WORKER_THREAD_RESOURCE_NAME, - DaskTaskResources, - create_ec2_resource_constraint_key, -) +from dask_task_models_library.resource_constraints import DaskTaskResources from distributed.core import Status from models_library.clusters import ClusterAuthentication, TLSAuthentication -from pydantic import AnyUrl +from pydantic import AnyUrl, ByteSize, TypeAdapter from ..core.errors import ( DaskNoWorkersError, DaskSchedulerNotFoundError, DaskWorkerNotFoundError, ) -from ..core.settings import DaskMonitoringSettings -from ..models import DaskTask, DaskTaskId +from ..models import AssociatedInstance, DaskTask, DaskTaskId from ..utils.utils_ec2 import ( node_host_name_from_ec2_private_dns, node_ip_from_ec2_private_dns, ) -from .cluster_scaling._utils_computational import ( - DASK_TO_RESOURCE_NAME_MAPPING, -) _logger = logging.getLogger(__name__) @@ -109,10 +101,6 @@ def _find_by_worker_host( _, details = dask_worker if match := re.match(DASK_NAME_PATTERN, 
details["name"]): return bool(match.group("private_ip") == node_hostname) - _logger.error( - "Unexpected worker name format: %s. TIP: this should be investigated as this is unexpected", - details["name"], - ) return False filtered_workers = dict(filter(_find_by_worker_host, workers.items())) @@ -126,45 +114,6 @@ def _find_by_worker_host( return next(iter(filtered_workers.items())) -class _DaskClusterTasks(TypedDict): - processing: dict[DaskWorkerUrl, list[tuple[dask.typing.Key, DaskTaskResources]]] - unrunnable: dict[dask.typing.Key, DaskTaskResources] - - -async def _list_cluster_known_tasks( - client: distributed.Client, -) -> _DaskClusterTasks: - def _list_on_scheduler( - dask_scheduler: distributed.Scheduler, - ) -> dict[str, Any]: - # NOTE: _DaskClusterTasks uses cannot be used here because of serialization issues - worker_to_processing_tasks = defaultdict(list) - unrunnable_tasks = {} - for task_key, task_state in dask_scheduler.tasks.items(): - if task_state.processing_on: - worker_to_processing_tasks[task_state.processing_on.address].append( - ( - task_key, - (task_state.resource_restrictions or {}) - | {DASK_WORKER_THREAD_RESOURCE_NAME: 1}, - ) - ) - elif task_state in dask_scheduler.unrunnable: - unrunnable_tasks[task_key] = ( - task_state.resource_restrictions or {} - ) | {DASK_WORKER_THREAD_RESOURCE_NAME: 1} - - return { - "processing": worker_to_processing_tasks, - "unrunnable": unrunnable_tasks, - } - - list_of_tasks: _DaskClusterTasks = await client.run_on_scheduler(_list_on_scheduler) - _logger.debug("found tasks: %s", list_of_tasks) - - return list_of_tasks - - async def is_worker_connected( scheduler_url: AnyUrl, authentication: ClusterAuthentication, @@ -214,10 +163,20 @@ async def list_unrunnable_tasks( DaskSchedulerNotFoundError """ - async with _scheduler_client(scheduler_url, authentication) as client: - known_tasks = await _list_cluster_known_tasks(client) - list_of_tasks = known_tasks["unrunnable"] + def _list_tasks( + dask_scheduler: distributed.Scheduler, + ) -> dict[dask.typing.Key, dict[str, float]]: + # NOTE: task.key can be a byte, str, or a tuple + return { + task.key: task.resource_restrictions or {} + for task in dask_scheduler.unrunnable + } + async with _scheduler_client(scheduler_url, authentication) as client: + list_of_tasks: dict[dask.typing.Key, DaskTaskResources] = ( + await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) + ) + _logger.debug("found unrunnable tasks: %s", list_of_tasks) return [ DaskTask( task_id=_dask_key_to_dask_task_id(task_id), @@ -236,11 +195,26 @@ async def list_processing_tasks_per_worker( DaskSchedulerNotFoundError """ - async with _scheduler_client(scheduler_url, authentication) as client: - worker_to_tasks = await _list_cluster_known_tasks(client) + def _list_processing_tasks( + dask_scheduler: distributed.Scheduler, + ) -> dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]]: + worker_to_processing_tasks = defaultdict(list) + for task_key, task_state in dask_scheduler.tasks.items(): + if task_state.processing_on: + worker_to_processing_tasks[task_state.processing_on.address].append( + (task_key, task_state.resource_restrictions or {}) + ) + return worker_to_processing_tasks + async with _scheduler_client(scheduler_url, authentication) as client: + worker_to_tasks: dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]] = ( + await _wrap_client_async_routine( + client.run_on_scheduler(_list_processing_tasks) + ) + ) + _logger.debug("found processing tasks: %s", worker_to_tasks) tasks_per_worker = 
defaultdict(list) - for worker, tasks in worker_to_tasks["processing"].items(): + for worker, tasks in worker_to_tasks.items(): for task_id, required_resources in tasks: tasks_per_worker[worker].append( DaskTask( @@ -283,54 +257,66 @@ async def get_worker_used_resources( DaskNoWorkersError """ + def _list_processing_tasks_on_worker( + dask_scheduler: distributed.Scheduler, *, worker_url: str + ) -> list[tuple[dask.typing.Key, DaskTaskResources]]: + processing_tasks = [] + for task_key, task_state in dask_scheduler.tasks.items(): + if task_state.processing_on and ( + task_state.processing_on.address == worker_url + ): + processing_tasks.append( + (task_key, task_state.resource_restrictions or {}) + ) + return processing_tasks + async with _scheduler_client(scheduler_url, authentication) as client: worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance) - known_tasks = await _list_cluster_known_tasks(client) - worker_processing_tasks = known_tasks["processing"].get(worker_url, []) - if not worker_processing_tasks: - return Resources.create_as_empty() - total_resources_used: collections.Counter = collections.Counter() + _logger.debug("looking for processing tasks for %s", f"{worker_url=}") + + # now get the used resources + worker_processing_tasks: list[tuple[dask.typing.Key, DaskTaskResources]] = ( + await _wrap_client_async_routine( + client.run_on_scheduler( + _list_processing_tasks_on_worker, worker_url=worker_url + ), + ) + ) + + total_resources_used: collections.Counter[str] = collections.Counter() for _, task_resources in worker_processing_tasks: total_resources_used.update(task_resources) _logger.debug("found %s for %s", f"{total_resources_used=}", f"{worker_url=}") - return Resources.from_flat_dict( - dict(total_resources_used), mapping=DASK_TO_RESOURCE_NAME_MAPPING + return Resources( + cpus=total_resources_used.get("CPU", 0), + ram=TypeAdapter(ByteSize).validate_python( + total_resources_used.get("RAM", 0) + ), ) async def compute_cluster_total_resources( scheduler_url: AnyUrl, authentication: ClusterAuthentication, - instances: list[EC2InstanceData], + instances: list[AssociatedInstance], ) -> Resources: if not instances: return Resources.create_as_empty() async with _scheduler_client(scheduler_url, authentication) as client: - ec2_instance_resources_map = { - node_ip_from_ec2_private_dns(i): i.resources for i in instances - } + instance_hosts = ( + node_ip_from_ec2_private_dns(i.ec2_instance) for i in instances + ) scheduler_info = client.scheduler_info() if "workers" not in scheduler_info or not scheduler_info["workers"]: raise DaskNoWorkersError(url=scheduler_url) workers: dict[str, Any] = scheduler_info["workers"] - cluster_resources = Resources.create_as_empty() for worker_details in workers.values(): - if worker_details["host"] not in ec2_instance_resources_map: + if worker_details["host"] not in instance_hosts: continue - # get dask information about resources - worker_dask_resources = worker_details["resources"] - worker_dask_nthreads = worker_details["nthreads"] - cluster_resources += Resources.from_flat_dict( - { - **worker_dask_resources, - DASK_WORKER_THREAD_RESOURCE_NAME: worker_dask_nthreads, - }, - mapping=DASK_TO_RESOURCE_NAME_MAPPING, - ) - return cluster_resources + return Resources.create_as_empty() async def try_retire_nodes( @@ -340,43 +326,3 @@ async def try_retire_nodes( await _wrap_client_async_routine( client.retire_workers(close_workers=False, remove=False) ) - - -_LARGE_RESOURCE: Final[int] = 99999 - - -def add_instance_generic_resources( - 
settings: DaskMonitoringSettings, instance: EC2InstanceData -) -> None: - instance_threads = max(1, round(instance.resources.cpus)) - if settings.DASK_NTHREADS > 0: - # this overrides everything - instance_threads = settings.DASK_NTHREADS - if settings.DASK_NTHREADS_MULTIPLIER > 1: - instance_threads = instance_threads * settings.DASK_NTHREADS_MULTIPLIER - instance.resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME] = ( - instance_threads - ) - - instance.resources.generic_resources[ - create_ec2_resource_constraint_key(instance.type) - ] = _LARGE_RESOURCE - - -def add_instance_type_generic_resource( - settings: DaskMonitoringSettings, instance_type: EC2InstanceType -) -> None: - instance_threads = max(1, round(instance_type.resources.cpus)) - if settings.DASK_NTHREADS > 0: - # this overrides everything - instance_threads = settings.DASK_NTHREADS - if settings.DASK_NTHREADS_MULTIPLIER > 1: - instance_threads = instance_threads * settings.DASK_NTHREADS_MULTIPLIER - - instance_type.resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME] = ( - instance_threads - ) - - instance_type.resources.generic_resources[ - create_ec2_resource_constraint_key(instance_type.name) - ] = _LARGE_RESOURCE diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py index af84e97bc01b..9de65aac078f 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py @@ -35,7 +35,7 @@ async def on_shutdown() -> None: ... def get_instrumentation(app: FastAPI) -> AutoscalingInstrumentation: - if not hasattr(app.state, "instrumentation"): + if not app.state.instrumentation: raise ConfigurationError( msg="Instrumentation not setup. Please check the configuration." 
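One behavioural note on the `get_instrumentation` change here: Starlette's `app.state` raises `AttributeError` for attributes that were never assigned, so the restored truthiness check relies on the setup hook always setting `app.state.instrumentation` before the getter is called. A small sketch of the difference, assuming standard Starlette `State` semantics:

```python
from starlette.datastructures import State

state = State()
assert not hasattr(state, "instrumentation")  # hasattr probe never raises
try:
    _ = state.instrumentation  # truthiness-check path, before setup ran
except AttributeError:
    pass  # unset attributes raise instead of returning None

state.instrumentation = None  # setup assigns the attribute before use
assert not state.instrumentation  # truthiness check is now safe
```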
) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py index cc2c1ad3ee0c..13c25dcd2112 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py @@ -109,12 +109,11 @@ def find_selected_instance_type_for_task( selected_instance = filtered_instances[0] # check that the assigned resources and the machine resource fit - if not (task_required_resources <= selected_instance.resources): + if task_required_resources > selected_instance.resources: raise TaskRequirementsAboveRequiredEC2InstanceTypeError( task=task, instance_type=selected_instance, resources=task_required_resources, - resources_diff=task_required_resources - selected_instance.resources, ) return selected_instance diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index f4feea61cfde..a48951986763 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -395,16 +395,14 @@ async def compute_cluster_used_resources( docker_client: AutoscalingDocker, nodes: list[Node] ) -> Resources: """Returns the total amount of resources (reservations) used on each of the given nodes""" - list_of_used_resources: list[Resources] = await logged_gather( + list_of_used_resources = await logged_gather( *(compute_node_used_resources(docker_client, node) for node in nodes) ) - flat_counter: collections.Counter = collections.Counter() + counter = collections.Counter(dict.fromkeys(list(Resources.model_fields), 0)) for result in list_of_used_resources: - flat_counter.update(result.as_flat_dict()) - flat_counter.setdefault("cpus", 0) - flat_counter.setdefault("ram", 0) + counter.update(result.model_dump()) - return Resources.from_flat_dict(dict(flat_counter)) + return Resources.model_validate(dict(counter)) _COMMAND_TIMEOUT_S = 10 diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 3c77b01be372..57c9b381fc2d 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -30,7 +30,6 @@ Resources, ) from common_library.json_serialization import json_dumps -from dask_task_models_library.container_tasks.utils import generate_dask_job_id from deepdiff import DeepDiff from faker import Faker from fakeredis.aioredis import FakeRedis @@ -53,11 +52,7 @@ Service, TaskSpec, ) -from models_library.projects import ProjectID -from models_library.projects_nodes_io import NodeID from models_library.services_metadata_runtime import SimcoreContainerLabels -from models_library.services_types import ServiceKey, ServiceVersion -from models_library.users import UserID from pydantic import ByteSize, NonNegativeInt, PositiveInt, TypeAdapter from pytest_mock import MockType from pytest_mock.plugin import MockerFixture @@ -385,8 +380,8 @@ def enabled_computational_mode( "AUTOSCALING_DASK": "{}", "DASK_MONITORING_URL": faker.url(), "DASK_SCHEDULER_AUTH": "{}", - "DASK_NTHREADS": f"{faker.pyint(min_value=0, max_value=10)}", - "DASK_NTHREADS_MULTIPLIER": f"{faker.pyint(min_value=1, max_value=4)}", + "DASK_MONITORING_USER_NAME": faker.user_name(), + "DASK_MONITORING_PASSWORD": faker.password(), }, ) @@ -862,55 +857,9 @@ def 
_creator(**cluter_overrides) -> Cluster: return _creator -@pytest.fixture -def service_version() -> ServiceVersion: - return "1.0.234" - - -@pytest.fixture -def service_key() -> ServiceKey: - return "simcore/services/dynamic/test" - - -@pytest.fixture -def node_id(faker: Faker) -> NodeID: - return faker.uuid4(cast_to=None) - - -@pytest.fixture -def project_id(faker: Faker) -> ProjectID: - return faker.uuid4(cast_to=None) - - -@pytest.fixture -def user_id(faker: Faker) -> UserID: - return faker.pyint(min_value=1) - - -@pytest.fixture -def fake_dask_job_id( - service_key: ServiceKey, - service_version: ServiceVersion, - user_id: UserID, - project_id: ProjectID, - faker: Faker, -) -> Callable[[], str]: - def _() -> str: - return generate_dask_job_id( - service_key=service_key, - service_version=service_version, - user_id=user_id, - project_id=project_id, - node_id=faker.uuid4(cast_to=None), - ) - - return _ - - @pytest.fixture async def create_dask_task( dask_spec_cluster_client: distributed.Client, - fake_dask_job_id: Callable[[], str], ) -> Callable[..., distributed.Future]: def _remote_pytest_fct(x: int, y: int) -> int: return x + y @@ -925,7 +874,6 @@ def _creator( 43, resources=required_resources, pure=False, - key=fake_dask_job_id(), **overrides, ) assert future diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py index bba8531e0032..f83eaac9ea8b 100644 --- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py @@ -14,7 +14,7 @@ from collections.abc import Awaitable, Callable, Iterator from copy import deepcopy from dataclasses import dataclass -from typing import Any, cast +from typing import Any, Final, cast from unittest import mock import arrow @@ -22,9 +22,7 @@ import pytest from aws_library.ec2 import Resources from dask_task_models_library.resource_constraints import ( - DASK_WORKER_THREAD_RESOURCE_NAME, create_ec2_resource_constraint_key, - estimate_dask_worker_resources_from_ec2_instance, ) from faker import Faker from fastapi import FastAPI @@ -128,14 +126,10 @@ def _assert_rabbit_autoscaling_message_sent( instances_running=0, ) expected_message = default_message.model_copy(update=message_update_kwargs) - # in this mock we get all kind of messages, we just want to assert one of them is the expected one and there is only one - autoscaling_status_messages = [ - call_args.args[1] - for call_args in mock_rabbitmq_post_message.call_args_list - if isinstance(call_args.args[1], RabbitAutoscalingStatusMessage) - ] - assert len(autoscaling_status_messages) == 1, "too many messages sent" - assert autoscaling_status_messages[0] == expected_message + mock_rabbitmq_post_message.assert_called_once_with( + app, + expected_message, + ) @pytest.fixture @@ -261,25 +255,16 @@ async def _create_task_with_resources( instance_types = await ec2_client.describe_instance_types( InstanceTypes=[dask_task_imposed_ec2_type] ) - assert instance_types assert "InstanceTypes" in instance_types assert instance_types["InstanceTypes"] assert "MemoryInfo" in instance_types["InstanceTypes"][0] assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"] - ec2_ram = TypeAdapter(ByteSize).validate_python( - f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB", - ) - assert "VCpuInfo" in instance_types["InstanceTypes"][0] - assert "DefaultVCpus" in 
diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py
index bba8531e0032..f83eaac9ea8b 100644
--- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py
@@ -14,7 +14,7 @@
 from collections.abc import Awaitable, Callable, Iterator
 from copy import deepcopy
 from dataclasses import dataclass
-from typing import Any, cast
+from typing import Any, Final, cast
 from unittest import mock
 
 import arrow
@@ -22,9 +22,7 @@
 import pytest
 from aws_library.ec2 import Resources
 from dask_task_models_library.resource_constraints import (
-    DASK_WORKER_THREAD_RESOURCE_NAME,
     create_ec2_resource_constraint_key,
-    estimate_dask_worker_resources_from_ec2_instance,
 )
 from faker import Faker
 from fastapi import FastAPI
@@ -128,14 +126,10 @@ def _assert_rabbit_autoscaling_message_sent(
         instances_running=0,
     )
     expected_message = default_message.model_copy(update=message_update_kwargs)
-    # in this mock we get all kind of messages, we just want to assert one of them is the expected one and there is only one
-    autoscaling_status_messages = [
-        call_args.args[1]
-        for call_args in mock_rabbitmq_post_message.call_args_list
-        if isinstance(call_args.args[1], RabbitAutoscalingStatusMessage)
-    ]
-    assert len(autoscaling_status_messages) == 1, "too many messages sent"
-    assert autoscaling_status_messages[0] == expected_message
+    mock_rabbitmq_post_message.assert_called_once_with(
+        app,
+        expected_message,
+    )
 
 
 @pytest.fixture
@@ -261,25 +255,16 @@ async def _create_task_with_resources(
         instance_types = await ec2_client.describe_instance_types(
             InstanceTypes=[dask_task_imposed_ec2_type]
         )
-        assert instance_types
         assert "InstanceTypes" in instance_types
         assert instance_types["InstanceTypes"]
         assert "MemoryInfo" in instance_types["InstanceTypes"][0]
         assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"]
-        ec2_ram = TypeAdapter(ByteSize).validate_python(
-            f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB",
-        )
-        assert "VCpuInfo" in instance_types["InstanceTypes"][0]
-        assert "DefaultVCpus" in instance_types["InstanceTypes"][0]["VCpuInfo"]
-        ec2_cpus = instance_types["InstanceTypes"][0]["VCpuInfo"]["DefaultVCpus"]
-        required_cpus, required_ram = estimate_dask_worker_resources_from_ec2_instance(
-            ec2_cpus, ec2_ram
-        )
         task_resources = Resources(
-            cpus=required_cpus,
-            ram=ByteSize(required_ram),
-            generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
+            cpus=1,
+            ram=TypeAdapter(ByteSize).validate_python(
+                f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB",
+            ),
         )
         assert task_resources
@@ -300,11 +285,13 @@ class _ScaleUpParams:
     expected_num_instances: int
 
 
+_RESOURCE_TO_DASK_RESOURCE_MAP: Final[dict[str, str]] = {"CPUS": "CPU", "RAM": "RAM"}
+
+
 def _dask_task_resources_from_resources(resources: Resources) -> DaskTaskResources:
     return {
-        "CPU": resources.cpus,
-        "RAM": resources.ram,
-        **dict(resources.generic_resources.items()),
+        _RESOURCE_TO_DASK_RESOURCE_MAP[res_key.upper()]: res_value
+        for res_key, res_value in resources.model_dump().items()
     }
@@ -454,7 +441,7 @@ async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing(
         _ScaleUpParams(
             imposed_instance_type=None,
             task_resources=Resources(
-                cpus=1, ram=TypeAdapter(ByteSize).validate_python("115Gib")
+                cpus=1, ram=TypeAdapter(ByteSize).validate_python("128Gib")
             ),
             num_tasks=1,
             expected_instance_type="r5n.4xlarge",
@@ -476,7 +463,7 @@ async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing(
         _ScaleUpParams(
             imposed_instance_type="r5n.8xlarge",
             task_resources=Resources(
-                cpus=1, ram=TypeAdapter(ByteSize).validate_python("115Gib")
+                cpus=1, ram=TypeAdapter(ByteSize).validate_python("116Gib")
             ),
             num_tasks=1,
             expected_instance_type="r5n.8xlarge",
@@ -649,7 +636,7 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     )
     mock_docker_tag_node.reset_mock()
     mock_docker_set_node_availability.assert_not_called()
-    assert mock_rabbitmq_post_message.call_count == 3
+    mock_rabbitmq_post_message.assert_called_once()
     mock_rabbitmq_post_message.reset_mock()
 
     # now we have 1 monitored node that needs to be mocked
@@ -945,6 +932,7 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_reso
         [InstanceTypeType | None, Resources], DaskTaskResources
     ],
     ec2_client: EC2Client,
+    faker: Faker,
     caplog: pytest.LogCaptureFixture,
 ):
     # we have nothing running now
@@ -1292,7 +1280,7 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star
         _ScaleUpParams(
             imposed_instance_type=None,
             task_resources=Resources(
-                cpus=1, ram=TypeAdapter(ByteSize).validate_python("115Gib")
+                cpus=1, ram=TypeAdapter(ByteSize).validate_python("128Gib")
             ),
             num_tasks=1,
             expected_instance_type="r5n.4xlarge",
@@ -1467,7 +1455,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(
         _ScaleUpParams(
             imposed_instance_type="g4dn.2xlarge",  # 1 GPU, 8 CPUs, 32GiB
             task_resources=Resources(
-                cpus=7.9, ram=TypeAdapter(ByteSize).validate_python("15Gib")
+                cpus=8, ram=TypeAdapter(ByteSize).validate_python("15Gib")
             ),
             num_tasks=12,
             expected_instance_type="g4dn.2xlarge",  # 1 GPU, 8 CPUs, 32GiB
@@ -1476,7 +1464,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted(
         _ScaleUpParams(
             imposed_instance_type="g4dn.8xlarge",  # 32CPUs, 128GiB
             task_resources=Resources(
-                cpus=31.9, ram=TypeAdapter(ByteSize).validate_python("20480MB")
+                cpus=32, ram=TypeAdapter(ByteSize).validate_python("20480MB")
             ),
             num_tasks=7,
             expected_instance_type="g4dn.8xlarge",  # 32CPUs, 128GiB
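_dask_task_resources_from_resources now derives the Dask resource names from the model's own fields through _RESOURCE_TO_DASK_RESOURCE_MAP, so adding a field to Resources without extending the map fails loudly with a KeyError instead of being silently dropped. A sketch of the same idea, again with a hypothetical stand-in model:

from typing import Final

from pydantic import BaseModel, ByteSize

_RESOURCE_TO_DASK_RESOURCE_MAP: Final[dict[str, str]] = {"CPUS": "CPU", "RAM": "RAM"}


class FakeResources(BaseModel):  # stand-in for aws_library.ec2.Resources
    cpus: float
    ram: ByteSize


def to_dask_task_resources(resources: FakeResources) -> dict[str, float]:
    # a KeyError here means a new model field has no Dask counterpart yet
    return {
        _RESOURCE_TO_DASK_RESOURCE_MAP[key.upper()]: value
        for key, value in resources.model_dump().items()
    }


assert to_dask_task_resources(FakeResources(cpus=2, ram=ByteSize(1024))) == {
    "CPU": 2.0,
    "RAM": 1024,
}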
diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
index b16ac41233f0..8ba17f3f34ff 100644
--- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
@@ -437,7 +437,6 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect
             / 1e9,
             "ram": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
             * fake_node.description.resources.memory_bytes,
-            "generic_resources": {},
         },
     )
@@ -713,9 +712,11 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
         cluster_total_resources={
             "cpus": fake_attached_node.description.resources.nano_cp_us / 1e9,
             "ram": fake_attached_node.description.resources.memory_bytes,
-            "generic_resources": {},
         },
-        cluster_used_resources={"cpus": float(0), "ram": 0, "generic_resources": {}},
+        cluster_used_resources={
+            "cpus": float(0),
+            "ram": 0,
+        },
         instances_running=scale_up_params.expected_num_instances,
     )
     mock_rabbitmq_post_message.reset_mock()
@@ -978,7 +979,7 @@ async def _assert_wait_for_ec2_instances_terminated() -> None:
         _ScaleUpParams(
             imposed_instance_type=None,
             service_resources=Resources(
-                cpus=4, ram=TypeAdapter(ByteSize).validate_python("114Gib")
+                cpus=4, ram=TypeAdapter(ByteSize).validate_python("128Gib")
             ),
             num_services=1,
             expected_instance_type="r5n.4xlarge",
@@ -990,7 +991,7 @@ async def _assert_wait_for_ec2_instances_terminated() -> None:
         _ScaleUpParams(
             imposed_instance_type="t2.xlarge",
             service_resources=Resources(
-                cpus=2.6, ram=TypeAdapter(ByteSize).validate_python("4Gib")
+                cpus=4, ram=TypeAdapter(ByteSize).validate_python("4Gib")
             ),
             num_services=1,
             expected_instance_type="t2.xlarge",
@@ -1002,7 +1003,7 @@ async def _assert_wait_for_ec2_instances_terminated() -> None:
         _ScaleUpParams(
             imposed_instance_type="r5n.8xlarge",
             service_resources=Resources(
-                cpus=4, ram=TypeAdapter(ByteSize).validate_python("114Gib")
+                cpus=4, ram=TypeAdapter(ByteSize).validate_python("128Gib")
             ),
             num_services=1,
             expected_instance_type="r5n.8xlarge",
@@ -1165,7 +1166,7 @@ async def test_cluster_scaling_up_and_down_against_aws(
             ),
             num_services=10,
             expected_instance_type="r5n.4xlarge",  # 1 GPU, 16 CPUs, 128GiB
-            expected_num_instances=5,
+            expected_num_instances=4,
         ),
         id="sim4life-light",
     ),
@@ -1254,7 +1255,7 @@ async def test_cluster_scaling_up_starts_multiple_instances(
         _ScaleUpParams(
             imposed_instance_type="g4dn.2xlarge",  # 1 GPU, 8 CPUs, 32GiB
             service_resources=Resources(
-                cpus=6.6, ram=TypeAdapter(ByteSize).validate_python("15Gib")
+                cpus=8, ram=TypeAdapter(ByteSize).validate_python("15Gib")
             ),
             num_services=12,
             expected_instance_type="g4dn.2xlarge",  # 1 GPU, 8 CPUs, 32GiB
@@ -1263,7 +1264,7 @@ async def test_cluster_scaling_up_starts_multiple_instances(
         _ScaleUpParams(
             imposed_instance_type="g4dn.8xlarge",  # 32CPUs, 128GiB
             service_resources=Resources(
-                cpus=30.6, ram=TypeAdapter(ByteSize).validate_python("20480MB")
+                cpus=32, ram=TypeAdapter(ByteSize).validate_python("20480MB")
             ),
             num_services=7,
             expected_instance_type="g4dn.8xlarge",  # 32CPUs, 128GiB
@@ -1556,7 +1557,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915
         _ScaleUpParams(
             imposed_instance_type=None,
             service_resources=Resources(
-                cpus=4, ram=TypeAdapter(ByteSize).validate_python("114Gib")
+                cpus=4, ram=TypeAdapter(ByteSize).validate_python("128Gib")
             ),
             num_services=1,
             expected_instance_type="r5n.4xlarge",
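Several parametrizations above replace fractional CPU counts and "115Gib"/"114Gib" strings with whole-instance values such as "128Gib". Note that pydantic's ByteSize parses its unit suffix case-insensitively, so "Gib" and "GiB" name the same binary unit (a quick check, assuming pydantic v2):

from pydantic import ByteSize, TypeAdapter

assert TypeAdapter(ByteSize).validate_python("128Gib") == 128 * 2**30
assert TypeAdapter(ByteSize).validate_python("128GiB") == 128 * 2**30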
diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
index e051766dae31..e412487f4ea6 100644
--- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
@@ -6,9 +6,6 @@
 
 import pytest
 from aws_library.ec2 import Resources
-from dask_task_models_library.resource_constraints import (
-    DASK_WORKER_THREAD_RESOURCE_NAME,
-)
 from pydantic import ByteSize, TypeAdapter
 from simcore_service_autoscaling.models import DaskTask, DaskTaskResources
 from simcore_service_autoscaling.modules.cluster_scaling._utils_computational import (
@@ -26,16 +23,13 @@
         Resources(
             cpus=_DEFAULT_MAX_CPU,
             ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM),
-            generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
         ),
         id="missing resources returns defaults",
     ),
     pytest.param(
         DaskTask(task_id="fake", required_resources={"CPU": 2.5}),
         Resources(
-            cpus=2.5,
-            ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM),
-            generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
+            cpus=2.5, ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM)
         ),
         id="only cpus defined",
     ),
@@ -44,25 +38,16 @@
             task_id="fake",
             required_resources={"CPU": 2.5, "RAM": 2 * 1024 * 1024 * 1024},
         ),
-        Resources(
-            cpus=2.5,
-            ram=TypeAdapter(ByteSize).validate_python("2GiB"),
-            generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
-        ),
+        Resources(cpus=2.5, ram=TypeAdapter(ByteSize).validate_python("2GiB")),
         id="cpu and ram defined",
     ),
     pytest.param(
         DaskTask(
             task_id="fake",
-            required_resources={"CPU": 2.5, "xram": 2 * 1024 * 1024 * 1024},  # type: ignore
+            required_resources={"CPU": 2.5, "ram": 2 * 1024 * 1024 * 1024},
         ),
         Resources(
-            cpus=2.5,
-            ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM),
-            generic_resources={
-                DASK_WORKER_THREAD_RESOURCE_NAME: 1,
-                "xram": 2 * 1024 * 1024 * 1024,
-            },
+            cpus=2.5, ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM)
         ),
         id="invalid naming",
     ),
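The reworked "invalid naming" case documents that only the upper-case "CPU"/"RAM" keys are honoured: a misspelled key such as lowercase "ram" now falls back to the defaults instead of travelling along as a generic resource. A behavioural sketch, not the service's actual implementation, with made-up default values:

from pydantic import BaseModel, ByteSize

_DEFAULT_MAX_CPU = 1  # hypothetical defaults, for illustration only
_DEFAULT_MAX_RAM = ByteSize(2 * 2**30)


class FakeResources(BaseModel):
    cpus: float
    ram: ByteSize


def resources_from_required(required: dict[str, float]) -> FakeResources:
    # unknown keys are simply ignored; only "CPU" and "RAM" are looked up
    return FakeResources(
        cpus=required.get("CPU", _DEFAULT_MAX_CPU),
        ram=ByteSize(required.get("RAM", _DEFAULT_MAX_RAM)),
    )


assert resources_from_required({"CPU": 2.5, "ram": 2 * 2**30}).ram == _DEFAULT_MAX_RAM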
diff --git a/services/autoscaling/tests/unit/test_modules_dask.py b/services/autoscaling/tests/unit/test_modules_dask.py
index d99c0f2f0869..9c53865cfa30 100644
--- a/services/autoscaling/tests/unit/test_modules_dask.py
+++ b/services/autoscaling/tests/unit/test_modules_dask.py
@@ -11,9 +11,6 @@
 import pytest
 from arrow import utcnow
 from aws_library.ec2 import Resources
-from dask_task_models_library.resource_constraints import (
-    DASK_WORKER_THREAD_RESOURCE_NAME,
-)
 from faker import Faker
 from models_library.clusters import (
     ClusterAuthentication,
@@ -34,20 +31,14 @@
     EC2InstanceData,
 )
 from simcore_service_autoscaling.modules.dask import (
-    DaskMonitoringSettings,
     DaskTask,
     _scheduler_client,
-    add_instance_generic_resources,
-    compute_cluster_total_resources,
     get_worker_still_has_results_in_memory,
     get_worker_used_resources,
-    is_worker_connected,
-    is_worker_retired,
     list_processing_tasks_per_worker,
     list_unrunnable_tasks,
-    try_retire_nodes,
 )
-from tenacity import AsyncRetrying, retry, stop_after_delay, wait_fixed
+from tenacity import retry, stop_after_delay, wait_fixed
 
 _authentication_types = [
     NoAuthentication(),
@@ -124,14 +115,11 @@ async def test_list_unrunnable_tasks(
     # we have nothing running now
     assert await list_unrunnable_tasks(scheduler_url, scheduler_authentication) == []
     # start a task that cannot run
-    dask_task_impossible_resources = DaskTaskResources(XRAM=213, threads=1)  # type: ignore
+    dask_task_impossible_resources = {"XRAM": 213}
     future = create_dask_task(dask_task_impossible_resources)
     assert future
     assert await list_unrunnable_tasks(scheduler_url, scheduler_authentication) == [
-        DaskTask(
-            task_id=future.key,
-            required_resources=(dask_task_impossible_resources),
-        )
+        DaskTask(task_id=future.key, required_resources=dask_task_impossible_resources)
     ]
     # remove that future, will remove the task
     del future
@@ -166,10 +154,7 @@ def _add_fct(x: int, y: int) -> int:
         scheduler_url, scheduler_authentication
     ) == {
         next(iter(dask_spec_cluster_client.scheduler_info()["workers"])): [
-            DaskTask(
-                task_id=DaskTaskId(future_queued_task.key),
-                required_resources=DaskTaskResources(threads=1),  # type: ignore
-            )
+            DaskTask(task_id=DaskTaskId(future_queued_task.key), required_resources={})
         ]
     }
@@ -373,11 +358,7 @@ def _add_fct(x: int, y: int) -> int:
     await _wait_for_dask_scheduler_to_change_state()
     assert await get_worker_used_resources(
         scheduler_url, scheduler_authentication, fake_localhost_ec2_instance_data
-    ) == Resources(
-        cpus=num_cpus,
-        ram=ByteSize(0),
-        generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
-    )
+    ) == Resources(cpus=num_cpus, ram=ByteSize(0))
 
     result = await future_queued_task.result(timeout=_DASK_SCHEDULER_REACTION_TIME_S)  # type: ignore
     assert result == 7
@@ -389,139 +370,3 @@ def _add_fct(x: int, y: int) -> int:
         )
         == Resources.create_as_empty()
     )
-
-
-async def test_compute_cluster_total_resources(
-    dask_spec_local_cluster: distributed.SpecCluster,
-    scheduler_url: AnyUrl,
-    scheduler_authentication: ClusterAuthentication,
-    fake_ec2_instance_data: Callable[..., EC2InstanceData],
-    fake_localhost_ec2_instance_data: EC2InstanceData,
-):
-    # asking for resources of empty cluster returns empty resources
-    assert (
-        await compute_cluster_total_resources(
-            scheduler_url, scheduler_authentication, []
-        )
-        == Resources.create_as_empty()
-    )
-    ec2_instance_data = fake_ec2_instance_data()
-    assert ec2_instance_data.resources.cpus > 0
-    assert ec2_instance_data.resources.ram > 0
-    assert ec2_instance_data.resources.generic_resources == {}
-    assert (
-        await compute_cluster_total_resources(
-            scheduler_url, scheduler_authentication, [ec2_instance_data]
-        )
-        == Resources.create_as_empty()
-    ), "this instance is not connected and should not be accounted for"
-
-    cluster_total_resources = await compute_cluster_total_resources(
-        scheduler_url, scheduler_authentication, [fake_localhost_ec2_instance_data]
-    )
-    assert cluster_total_resources.cpus > 0
-    assert cluster_total_resources.ram > 0
-    assert DASK_WORKER_THREAD_RESOURCE_NAME in cluster_total_resources.generic_resources
-    assert (
-        cluster_total_resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME] == 2
-    )
-
-
-@pytest.mark.parametrize(
-    "dask_nthreads, dask_nthreads_multiplier, expected_threads_resource",
-    [(4, 1, 4), (4, 2, 8), (0, 2, -1)],
-)
-def test_add_instance_generic_resources(
-    scheduler_url: AnyUrl,
-    scheduler_authentication: ClusterAuthentication,
-    fake_ec2_instance_data: Callable[..., EC2InstanceData],
-    dask_nthreads: int,
-    dask_nthreads_multiplier: int,
-    expected_threads_resource: int,
-):
-    settings = DaskMonitoringSettings(
-        DASK_MONITORING_URL=scheduler_url,
-        DASK_SCHEDULER_AUTH=scheduler_authentication,
-        DASK_NTHREADS=dask_nthreads,
-        DASK_NTHREADS_MULTIPLIER=dask_nthreads_multiplier,
-    )
-    ec2_instance_data = fake_ec2_instance_data()
-    assert ec2_instance_data.resources.cpus > 0
-    assert ec2_instance_data.resources.ram > 0
-    assert ec2_instance_data.resources.generic_resources == {}
-
-    add_instance_generic_resources(settings, ec2_instance_data)
-    assert ec2_instance_data.resources.generic_resources != {}
-    assert (
-        DASK_WORKER_THREAD_RESOURCE_NAME
-        in ec2_instance_data.resources.generic_resources
-    )
-    if expected_threads_resource < 0:
-        expected_threads_resource = int(
-            ec2_instance_data.resources.cpus * dask_nthreads_multiplier
-        )
-    assert (
-        ec2_instance_data.resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME]
-        == expected_threads_resource
-    )
-
-
-async def test_is_worker_connected(
-    scheduler_url: AnyUrl,
-    scheduler_authentication: ClusterAuthentication,
-    fake_ec2_instance_data: Callable[..., EC2InstanceData],
-    fake_localhost_ec2_instance_data: EC2InstanceData,
-):
-    ec2_instance_data = fake_ec2_instance_data()
-    assert (
-        await is_worker_connected(
-            scheduler_url, scheduler_authentication, ec2_instance_data
-        )
-        is False
-    )
-
-    assert (
-        await is_worker_connected(
-            scheduler_url, scheduler_authentication, fake_localhost_ec2_instance_data
-        )
-        is True
-    )
-
-
-async def test_is_worker_retired(
-    scheduler_url: AnyUrl,
-    scheduler_authentication: ClusterAuthentication,
-    fake_ec2_instance_data: Callable[..., EC2InstanceData],
-    fake_localhost_ec2_instance_data: EC2InstanceData,
-):
-    ec2_instance_data = fake_ec2_instance_data()
-    # fake instance is not connected, so it cannot be retired
-    assert (
-        await is_worker_retired(
-            scheduler_url, scheduler_authentication, ec2_instance_data
-        )
-        is False
-    )
-
-    # localhost is connected, but not retired
-    assert (
-        await is_worker_retired(
-            scheduler_url, scheduler_authentication, fake_localhost_ec2_instance_data
-        )
-        is False
-    )
-
-    # retire localhost worker
-    await try_retire_nodes(scheduler_url, scheduler_authentication)
-    async for attempt in AsyncRetrying(
-        stop=stop_after_delay(10), wait=wait_fixed(1), reraise=True
-    ):
-        with attempt:
-            assert (
-                await is_worker_retired(
-                    scheduler_url,
-                    scheduler_authentication,
-                    fake_localhost_ec2_instance_data,
-                )
-                is True
-            )
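The impossible-resources test leans on Dask's resource restrictions: a task requesting a resource no worker advertises (here "XRAM") is accepted by the scheduler but never becomes runnable. A sketch, assuming a distributed.Client connected to a cluster whose workers were not started with an XRAM resource:

import distributed


def _add(x: int, y: int) -> int:
    return x + y


def submit_unrunnable_task(client: distributed.Client) -> distributed.Future:
    # no worker was launched with `--resources "XRAM=..."`, so this stays pending
    return client.submit(_add, 1, 2, resources={"XRAM": 213}, pure=False)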
diff --git a/services/autoscaling/tests/unit/test_modules_instrumentation_core.py b/services/autoscaling/tests/unit/test_modules_instrumentation_core.py
deleted file mode 100644
index ffc8d87bcb9d..000000000000
--- a/services/autoscaling/tests/unit/test_modules_instrumentation_core.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# pylint: disable=no-value-for-parameter
-# pylint: disable=redefined-outer-name
-# pylint: disable=too-many-arguments
-# pylint: disable=too-many-positional-arguments
-# pylint: disable=too-many-statements
-# pylint: disable=unused-argument
-# pylint: disable=unused-variable
-
-import pytest
-from fastapi import FastAPI
-from pytest_simcore.helpers.typing_env import EnvVarsDict
-from simcore_service_autoscaling.core.errors import ConfigurationError
-from simcore_service_autoscaling.modules.instrumentation._core import (
-    get_instrumentation,
-    has_instrumentation,
-)
-
-
-@pytest.fixture
-def disabled_instrumentation(
-    app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch
-) -> None:
-    assert app_environment
-    monkeypatch.setenv("AUTOSCALING_PROMETHEUS_INSTRUMENTATION_ENABLED", "false")
-
-
-async def test_disabled_instrumentation(
-    disabled_rabbitmq: None,
-    disabled_ec2: None,
-    disabled_ssm: None,
-    disabled_instrumentation: None,
-    mocked_redis_server: None,
-    initialized_app: FastAPI,
-):
-    # instrumentation disabled by default
-    assert not has_instrumentation(initialized_app)
-
-    with pytest.raises(ConfigurationError):
-        get_instrumentation(initialized_app)
diff --git a/services/autoscaling/tests/unit/test_utils_cluster_scaling.py b/services/autoscaling/tests/unit/test_utils_cluster_scaling.py
index 5a15d63a6ed3..1c325c1f6234 100644
--- a/services/autoscaling/tests/unit/test_utils_cluster_scaling.py
+++ b/services/autoscaling/tests/unit/test_utils_cluster_scaling.py
@@ -75,24 +75,6 @@ async def test_associate_ec2_instances_with_nodes_with_no_correspondence(
     assert len(non_associated_instances) == len(ec2_instances)
 
 
-async def test_associate_ec2_instances_with_nodes_with_invalid_dns(
-    fake_ec2_instance_data: Callable[..., EC2InstanceData],
-    node: Callable[..., DockerNode],
-):
-    nodes = [node() for _ in range(10)]
-    ec2_instances = [
-        fake_ec2_instance_data(aws_private_dns="invalid-dns-name") for _ in range(10)
-    ]
-
-    (
-        associated_instances,
-        non_associated_instances,
-    ) = associate_ec2_instances_with_nodes(nodes, ec2_instances)
-
-    assert not associated_instances
-    assert non_associated_instances
-
-
 async def test_associate_ec2_instances_with_corresponding_nodes(
     fake_ec2_instance_data: Callable[..., EC2InstanceData],
     node: Callable[..., DockerNode],
diff --git a/services/autoscaling/tests/unit/test_utils_rabbitmq.py b/services/autoscaling/tests/unit/test_utils_rabbitmq.py
index 8741949e76a7..006155b1e0fa 100644
--- a/services/autoscaling/tests/unit/test_utils_rabbitmq.py
+++ b/services/autoscaling/tests/unit/test_utils_rabbitmq.py
@@ -122,6 +122,31 @@ async def _(labels: dict[DockerLabelKey, str]) -> list[Task]:
     return _
 
 
+@pytest.fixture
+def service_version() -> ServiceVersion:
+    return "1.0.0"
+
+
+@pytest.fixture
+def service_key() -> ServiceKey:
+    return "simcore/services/dynamic/test"
+
+
+@pytest.fixture
+def node_id(faker: Faker) -> NodeID:
+    return faker.uuid4(cast_to=None)
+
+
+@pytest.fixture
+def project_id(faker: Faker) -> ProjectID:
+    return faker.uuid4(cast_to=None)
+
+
+@pytest.fixture
+def user_id(faker: Faker) -> UserID:
+    return faker.pyint(min_value=1)
+
+
 @pytest.fixture
 def dask_task(
     service_key: ServiceKey,
diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml
index d3ba68cb76a8..dc44dd9ece75 100644
--- a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml
+++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml
@@ -104,8 +104,6 @@ services:
       AUTOSCALING_RABBITMQ: ${AUTOSCALING_RABBITMQ}
       DASK_MONITORING_URL: tls://dask-scheduler:8786
       DASK_SCHEDULER_AUTH: '{"type":"tls","tls_ca_file":"${DASK_TLS_CA_FILE}","tls_client_cert":"${DASK_TLS_CERT}","tls_client_key":"${DASK_TLS_KEY}"}'
-      DASK_NTHREADS: ${DASK_NTHREADS}
-      DASK_NTHREADS_MULTIPLIER: ${DASK_NTHREADS_MULTIPLIER}
       EC2_INSTANCES_ALLOWED_TYPES: ${WORKERS_EC2_INSTANCES_ALLOWED_TYPES}
       EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING: ${WORKERS_EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING}
       EC2_INSTANCES_CUSTOM_TAGS: ${WORKERS_EC2_INSTANCES_CUSTOM_TAGS}
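The fixtures relocated into test_utils_rabbitmq.py rely on Faker's cast_to argument: faker.uuid4(cast_to=None) returns a uuid.UUID object rather than its string form, which is what the NodeID/ProjectID types expect here. A quick check:

import uuid

from faker import Faker

faker = Faker()
assert isinstance(faker.uuid4(cast_to=None), uuid.UUID)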
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
index 3446cb2f1497..10103909a631 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
@@ -5,9 +5,6 @@
 import arrow
 from dask_task_models_library.container_tasks.protocol import ContainerEnvsDict
-from dask_task_models_library.resource_constraints import (
-    estimate_dask_worker_resources_from_ec2_instance,
-)
 from models_library.api_schemas_catalog.services import ServiceGet
 from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceTypeGet
 from models_library.api_schemas_directorv2.services import (
@@ -295,14 +292,15 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
         image_resources: ImageResources = node_resources[
             DEFAULT_SINGLE_SERVICE_NAME
         ]
-        adjusted_cpus, adjusted_ram = (
-            estimate_dask_worker_resources_from_ec2_instance(
-                float(selected_ec2_instance_type.cpus),
-                selected_ec2_instance_type.ram,
+        image_resources.resources["CPU"].set_value(
+            float(selected_ec2_instance_type.cpus) - _CPUS_SAFE_MARGIN
+        )
+        image_resources.resources["RAM"].set_value(
+            int(
+                selected_ec2_instance_type.ram
+                - _RAM_SAFE_MARGIN_RATIO * selected_ec2_instance_type.ram
             )
         )
-        image_resources.resources["CPU"].set_value(adjusted_cpus)
-        image_resources.resources["RAM"].set_value(adjusted_ram)
 
         await project_nodes_repo.update(
             connection,
diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py
index 4d244d2c1f35..9bd620d3d23b 100644
--- a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py
+++ b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py
@@ -88,10 +88,6 @@
     X_FORWARDED_PROTO,
     X_SIMCORE_USER_AGENT,
 )
-from servicelib.docker_utils import (
-    DYNAMIC_SIDECAR_MIN_CPUS,
-    estimate_dynamic_sidecar_resources_from_ec2_instance,
-)
 from servicelib.logging_utils import log_context
 from servicelib.rabbitmq import RemoteMethodNotRegisteredError, RPCServerError
 from servicelib.rabbitmq.rpc_interfaces.catalog import services as catalog_rpc
@@ -656,12 +652,12 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
         app, user_id, project_id, node_id, service_key, service_version
     )
     scalable_service_name = DEFAULT_SINGLE_SERVICE_NAME
-    new_cpus_value, new_ram_value = (
-        estimate_dynamic_sidecar_resources_from_ec2_instance(
-            selected_ec2_instance_type.cpus, selected_ec2_instance_type.ram
-        )
+    new_cpus_value = float(selected_ec2_instance_type.cpus) - _CPUS_SAFE_MARGIN
+    new_ram_value = int(
+        selected_ec2_instance_type.ram
+        - _MACHINE_TOTAL_RAM_SAFE_MARGIN_RATIO * selected_ec2_instance_type.ram
+        - _SIDECARS_OPS_SAFE_RAM_MARGIN
     )
-
     if DEFAULT_SINGLE_SERVICE_NAME not in node_resources:
         # NOTE: we go for the largest sub-service and scale it up/down
         scalable_service_name, hungry_service_resources = max(
@@ -684,14 +680,17 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
             }
         )
         new_cpus_value = max(
-            new_cpus_value - other_services_resources["CPU"],
-            DYNAMIC_SIDECAR_MIN_CPUS,
+            float(selected_ec2_instance_type.cpus)
+            - _CPUS_SAFE_MARGIN
+            - other_services_resources["CPU"],
+            _MIN_NUM_CPUS,
         )
-
-        new_ram_value = max(
-            int(new_ram_value - other_services_resources["RAM"]), 128 * 1024 * 1024
+        new_ram_value = int(
+            selected_ec2_instance_type.ram
+            - _MACHINE_TOTAL_RAM_SAFE_MARGIN_RATIO * selected_ec2_instance_type.ram
+            - other_services_resources["RAM"]
+            - _SIDECARS_OPS_SAFE_RAM_MARGIN
        )
-
     # scale the service
     node_resources[scalable_service_name].resources["CPU"].set_value(new_cpus_value)
     node_resources[scalable_service_name].resources["RAM"].set_value(new_ram_value)
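Both director-v2 and the webserver now inline the safe-margin arithmetic that the removed estimate_* helpers used to encapsulate: shave a fixed CPU margin and a RAM ratio (plus, on the webserver side, a fixed sidecar-ops RAM margin) off the raw EC2 capacity. A sketch of the shape of that computation; the margin constants below are illustrative only, not the services' actual values:

_CPUS_SAFE_MARGIN = 0.8  # hypothetical
_RAM_SAFE_MARGIN_RATIO = 0.1  # hypothetical


def usable_instance_resources(ec2_cpus: float, ec2_ram: int) -> tuple[float, int]:
    # leave some CPU and RAM headroom for the host OS, docker and sidecars
    cpus = float(ec2_cpus) - _CPUS_SAFE_MARGIN
    ram = int(ec2_ram - _RAM_SAFE_MARGIN_RATIO * ec2_ram)
    return cpus, ram


# e.g. a 16 vCPU / 128 GiB machine advertises 15.2 CPUs and ~115 GiB
print(usable_instance_resources(16, 128 * 2**30))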