24 changes: 22 additions & 2 deletions services/dask-sidecar/requirements/_base.txt
@@ -36,6 +36,8 @@ aiormq==6.8.0
# via aio-pika
aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
# via pydantic
anyio==4.3.0
# via
# fast-depends
@@ -257,7 +259,7 @@ psutil==6.0.0
# via
# -r requirements/../../../packages/service-library/requirements/_base.in
# distributed
pydantic==1.10.15
pydantic==2.9.2
# via
# -c requirements/../../../packages/dask-task-models-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../packages/dask-task-models-library/requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
@@ -278,6 +280,23 @@ pydantic==1.10.15
# -r requirements/../../../packages/settings-library/requirements/_base.in
# -r requirements/_base.in
# fast-depends
# pydantic-extra-types
# pydantic-settings
pydantic-core==2.23.4
# via pydantic
pydantic-extra-types==2.9.0
# via
# -r requirements/../../../packages/dask-task-models-library/requirements/../../../packages/models-library/requirements/_base.in
# -r requirements/../../../packages/models-library/requirements/_base.in
# -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in
pydantic-settings==2.5.2
# via
# -r requirements/../../../packages/dask-task-models-library/requirements/../../../packages/models-library/requirements/_base.in
# -r requirements/../../../packages/dask-task-models-library/requirements/../../../packages/settings-library/requirements/_base.in
# -r requirements/../../../packages/models-library/requirements/_base.in
# -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in
# -r requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/_base.in
# -r requirements/../../../packages/settings-library/requirements/_base.in
pygments==2.18.0
# via rich
pyinstrument==4.6.2
@@ -288,7 +307,7 @@ python-dateutil==2.9.0.post0
# botocore
# pandas
python-dotenv==1.0.1
# via pydantic
# via pydantic-settings
pytz==2024.1
# via pandas
pyyaml==6.0.1
@@ -382,6 +401,7 @@ typing-extensions==4.11.0
# faststream
# opentelemetry-sdk
# pydantic
# pydantic-core
# typer
tzdata==2024.1
# via pandas
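This pin bump from `pydantic==1.10.15` to `pydantic==2.9.2` pulls in the packages that Pydantic v2 splits out: `pydantic-core` (the compiled validation engine), `pydantic-extra-types`, and `pydantic-settings`, which now owns `BaseSettings` and the `.env` integration; that is why `python-dotenv` is now listed "via pydantic-settings" instead of "via pydantic". A minimal sketch of settings code against the new package layout (class and variable names here are hypothetical, not from this repository):

```python
# Minimal sketch of settings code under the Pydantic v2 package split.
# ExampleSettings and MY_SERVICE_LOGLEVEL are hypothetical names.
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict  # BaseSettings moved here in v2


class ExampleSettings(BaseSettings):
    # .env loading is handled by pydantic-settings, which depends on python-dotenv
    model_config = SettingsConfigDict(env_file=".env")

    MY_SERVICE_LOGLEVEL: str = Field(default="INFO")


settings = ExampleSettings()  # values resolved from the environment and .env
```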
11 changes: 10 additions & 1 deletion services/dask-sidecar/requirements/_test.txt
@@ -1,3 +1,7 @@
annotated-types==0.7.0
# via
# -c requirements/_base.txt
# pydantic
antlr4-python3-runtime==4.13.2
# via moto
attrs==23.2.0
@@ -141,11 +145,15 @@ py-partiql-parser==0.5.6
# via moto
pycparser==2.22
# via cffi
pydantic==1.10.15
pydantic==2.9.2
# via
# -c requirements/../../../requirements/constraints.txt
# -c requirements/_base.txt
# aws-sam-translator
pydantic-core==2.23.4
# via
# -c requirements/_base.txt
# pydantic
pyftpdlib==2.0.0
# via pytest-localftpserver
pyopenssl==24.2.1
@@ -244,6 +252,7 @@ typing-extensions==4.11.0
# aws-sam-translator
# cfn-lint
# pydantic
# pydantic-core
urllib3==2.2.1
# via
# -c requirements/../../../requirements/constraints.txt
(next file in the diff; filename not captured in this view)
@@ -3,7 +3,7 @@
from models_library.basic_regex import SIMPLE_VERSION_RE
from models_library.services import ServiceMetaDataPublished
from packaging import version
from pydantic import BaseModel, ByteSize, Extra, Field, validator
from pydantic import BaseModel, ByteSize, ConfigDict, Field, ValidationInfo, field_validator

LEGACY_INTEGRATION_VERSION = version.Version("0")
PROGRESS_REGEXP: re.Pattern[str] = re.compile(
@@ -36,23 +36,24 @@ class ContainerHostConfig(BaseModel):
default=None,
alias="MemorySwap",
description="Total memory limit (memory + swap). Set as -1 to enable unlimited swap.",
validate_default=True,
)
nano_cpus: int = Field(
..., alias="NanoCPUs", description="CPU quota in units of 10-9 CPUs"
)

@validator("memory_swap", pre=True, always=True)
@field_validator("memory_swap", mode="before")
@classmethod
def ensure_no_memory_swap_means_no_swap(cls, v, values):
def ensure_no_memory_swap_means_no_swap(cls, v, info: ValidationInfo):
if v is None:
# if not set it will be the same value as memory to ensure swap is disabled
return values["memory"]
return info.data["memory"]
return v

@validator("memory_swap")
@field_validator("memory_swap")
@classmethod
def ensure_memory_swap_cannot_be_unlimited_nor_smaller_than_memory(cls, v, values):
if v < values["memory"]:
def ensure_memory_swap_cannot_be_unlimited_nor_smaller_than_memory(cls, v, info: ValidationInfo):
if v < info.data["memory"]:
msg = "Memory swap cannot be set to a smaller value than memory"
raise ValueError(msg)
return v
@@ -71,26 +72,24 @@ class ImageLabels(BaseModel):
default=str(LEGACY_INTEGRATION_VERSION),
alias="integration-version",
description="integration version number",
regex=SIMPLE_VERSION_RE,
pattern=SIMPLE_VERSION_RE,
examples=["1.0.0"],
)
progress_regexp: str = Field(
default=PROGRESS_REGEXP.pattern,
alias="progress_regexp",
description="regexp pattern for detecting computational service's progress",
)
model_config = ConfigDict(extra="ignore")

class Config:
extra = Extra.ignore

@validator("integration_version", pre=True)
@field_validator("integration_version", mode="before")
@classmethod
def default_integration_version(cls, v):
if v is None:
return ImageLabels().integration_version
return v

@validator("progress_regexp", pre=True)
@field_validator("progress_regexp", mode="before")
@classmethod
def default_progress_regexp(cls, v):
if v is None:
@@ -104,6 +103,6 @@ def get_progress_regexp(self) -> re.Pattern[str]:
return re.compile(self.progress_regexp)


assert set(ImageLabels.__fields__).issubset(
ServiceMetaDataPublished.__fields__
assert set(ImageLabels.model_fields).issubset(
ServiceMetaDataPublished.model_fields
), "ImageLabels must be compatible with ServiceDockerData"
(next file in the diff; filename not captured in this view)
@@ -2,7 +2,7 @@
from typing import Any

from models_library.basic_types import LogLevel
from pydantic import Field, validator
from pydantic import AliasChoices, Field, field_validator
from settings_library.base import BaseCustomSettings
from settings_library.utils_logging import MixinLoggingSettings

@@ -14,7 +14,9 @@ class Settings(BaseCustomSettings, MixinLoggingSettings):
SC_BOOT_MODE: str | None = None
LOG_LEVEL: LogLevel = Field(
LogLevel.INFO.value,
env=["DASK_SIDECAR_LOGLEVEL", "SIDECAR_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"],
validation_alias=AliasChoices(
"DASK_SIDECAR_LOGLEVEL", "SIDECAR_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"
),
)

# sidecar config ---
@@ -37,7 +39,9 @@ class Settings(BaseCustomSettings, MixinLoggingSettings):

DASK_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field(
default=False,
env=["DASK_LOG_FORMAT_LOCAL_DEV_ENABLED", "LOG_FORMAT_LOCAL_DEV_ENABLED"],
validation_alias=AliasChoices(
"DASK_LOG_FORMAT_LOCAL_DEV_ENABLED", "LOG_FORMAT_LOCAL_DEV_ENABLED"
),
description="Enables local development log format. WARNING: make sure it is disabled if you want to have structured logs!",
)

@@ -50,7 +54,7 @@ def as_worker(self) -> bool:
assert self.DASK_SCHEDULER_HOST is not None # nosec
return as_worker

@validator("LOG_LEVEL", pre=True)
@field_validator("LOG_LEVEL", mode="before")
@classmethod
def _validate_loglevel(cls, value: Any) -> str:
return cls.validate_log_level(f"{value}")
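The `env=[...]` keyword on `Field` is a Pydantic v1 `BaseSettings` feature; in `pydantic-settings` 2.x the equivalent way to accept several environment-variable names is `validation_alias=AliasChoices(...)`, matched in the order given. A minimal sketch with hypothetical variable names, not the service's:

```python
# Minimal sketch of multi-name environment variables in pydantic-settings 2.x.
# MY_APP_LOGLEVEL and LOG_LEVEL are hypothetical names.
from pydantic import AliasChoices, Field
from pydantic_settings import BaseSettings


class ExampleSettings(BaseSettings):
    # v1: LOG_LEVEL: str = Field("INFO", env=["MY_APP_LOGLEVEL", "LOG_LEVEL"])
    LOG_LEVEL: str = Field(
        default="INFO",
        validation_alias=AliasChoices("MY_APP_LOGLEVEL", "LOG_LEVEL"),  # first match wins
    )
```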
28 changes: 14 additions & 14 deletions services/dask-sidecar/tests/unit/test_tasks.py
@@ -41,7 +41,7 @@
from models_library.services import ServiceMetaDataPublished
from models_library.services_resources import BootMode
from packaging import version
from pydantic import AnyUrl, SecretStr, parse_obj_as
from pydantic import AnyUrl, SecretStr, TypeAdapter
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from settings_library.s3 import S3Settings
@@ -178,7 +178,7 @@ def integration_version(request: pytest.FixtureRequest) -> version.Version:

@pytest.fixture
def additional_envs(faker: Faker) -> dict[EnvVarKey, str]:
return parse_obj_as(dict[EnvVarKey, str], faker.pydict(allowed_types=(str,)))
return TypeAdapter(dict[EnvVarKey, str]).validate_python(faker.pydict(allowed_types=(str,)))


@pytest.fixture
@@ -198,7 +198,7 @@ def sleeper_task(
list_of_files = [file_on_s3_server() for _ in range(NUM_FILES)]

# defines the inputs of the task
input_data = TaskInputData.parse_obj(
input_data = TaskInputData.model_validate(
{
"input_1": 23,
"input_23": "a string input",
@@ -276,7 +276,7 @@ def sleeper_task(
"pytest_bool": False,
}
output_file_url = s3_remote_file_url(file_path="output_file")
expected_output_keys = TaskOutputDataSchema.parse_obj(
expected_output_keys = TaskOutputDataSchema.model_validate(
{
**(
{k: {"required": True} for k in jsonable_outputs}
@@ -295,7 +295,7 @@ def sleeper_task(
),
}
)
expected_output_data = TaskOutputData.parse_obj(
expected_output_data = TaskOutputData.model_validate(
{
**(
jsonable_outputs
@@ -395,10 +395,10 @@ def _creator(command: list[str] | None = None) -> ServiceExampleParam:
service_version="latest",
command=command
or ["/bin/bash", "-c", "echo 'hello I'm an empty ubuntu task!"],
input_data=TaskInputData.parse_obj({}),
output_data_keys=TaskOutputDataSchema.parse_obj({}),
input_data=TaskInputData.model_validate({}),
output_data_keys=TaskOutputDataSchema.model_validate({}),
log_file_url=s3_remote_file_url(file_path="log.dat"),
expected_output_data=TaskOutputData.parse_obj({}),
expected_output_data=TaskOutputData.model_validate({}),
expected_logs=[],
integration_version=integration_version,
task_envs={},
@@ -437,8 +437,8 @@ def caplog_info_level(
def mocked_get_image_labels(
integration_version: version.Version, mocker: MockerFixture
) -> mock.Mock:
labels: ImageLabels = parse_obj_as(
ImageLabels, ServiceMetaDataPublished.Config.schema_extra["examples"][0]
labels: ImageLabels = TypeAdapter(ImageLabels).validate_python(
ServiceMetaDataPublished.model_config["json_schema_extra"]["examples"][0],
)
labels.integration_version = f"{integration_version}"
return mocker.patch(
@@ -580,15 +580,15 @@ async def test_run_computational_sidecar_dask(

# check that the task produces expected logs
worker_progresses = [
TaskProgressEvent.parse_raw(msg).progress for msg in progress_sub.buffer
TaskProgressEvent.model_validate_json(msg).progress for msg in progress_sub.buffer
]
# check ordering
assert worker_progresses == sorted(
set(worker_progresses)
), "ordering of progress values incorrectly sorted!"
assert worker_progresses[0] == 0, "missing/incorrect initial progress value"
assert worker_progresses[-1] == 1, "missing/incorrect final progress value"
worker_logs = [TaskLogEvent.parse_raw(msg).log for msg in log_sub.buffer]
worker_logs = [TaskLogEvent.model_validate_json(msg).log for msg in log_sub.buffer]
print(f"<-- we got {len(worker_logs)} lines of logs")

for log in sleeper_task.expected_logs:
@@ -649,7 +649,7 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub

# check that the task produces expected logs
worker_progresses = [
TaskProgressEvent.parse_raw(msg).progress for msg in progress_sub.buffer
TaskProgressEvent.model_validate_json(msg).progress for msg in progress_sub.buffer
]
# check length
assert len(worker_progresses) == len(
Expand All @@ -659,7 +659,7 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub
assert worker_progresses[0] == 0, "missing/incorrect initial progress value"
assert worker_progresses[-1] == 1, "missing/incorrect final progress value"

worker_logs = [TaskLogEvent.parse_raw(msg).log for msg in log_sub.buffer]
worker_logs = [TaskLogEvent.model_validate_json(msg).log for msg in log_sub.buffer]
# check all the awaited logs are in there
filtered_worker_logs = filter(lambda log: "This is iteration" in log, worker_logs)
assert len(list(filtered_worker_logs)) == NUMBER_OF_LOGS
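The test changes are the corresponding call-site renames: `parse_obj_as(T, x)` becomes `TypeAdapter(T).validate_python(x)`, `Model.parse_obj(...)` becomes `Model.model_validate(...)`, and `Model.parse_raw(...)` becomes `Model.model_validate_json(...)`. A toy illustration (the `Event` model below is hypothetical, not the repository's `TaskProgressEvent`):

```python
# Toy illustration of the v1 -> v2 parsing calls swapped in the tests above.
from pydantic import BaseModel, TypeAdapter


class Event(BaseModel):  # hypothetical stand-in for the real event models
    progress: float


# v1: parse_obj_as(dict[str, str], raw)      -> v2: TypeAdapter(...).validate_python(raw)
env_vars = TypeAdapter(dict[str, str]).validate_python({"KEY": "value"})

# v1: Event.parse_obj({...})                 -> v2: Event.model_validate({...})
event = Event.model_validate({"progress": 0.5})

# v1: Event.parse_raw('{"progress": 1}')     -> v2: Event.model_validate_json('{"progress": 1}')
last = Event.model_validate_json('{"progress": 1}')
```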