diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py index a27bb027e948..ea9292d483c4 100644 --- a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py +++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py @@ -1,6 +1,4 @@ -import logging from abc import ABC, abstractmethod -from typing import TypeAlias import dask.typing from distributed.worker import get_worker @@ -85,48 +83,3 @@ def ensure_between_0_1(cls, v): if 0 <= v <= 1: return v return min(max(0, v), 1) - - -LogMessageStr: TypeAlias = str -LogLevelInt: TypeAlias = int - - -class TaskLogEvent(BaseTaskEvent): - log: LogMessageStr - log_level: LogLevelInt - - @staticmethod - def topic_name() -> str: - return "task_logs" - - @classmethod - def from_dask_worker( - cls, log: str, log_level: LogLevelInt, *, task_owner: TaskOwner - ) -> "TaskLogEvent": - worker = get_worker() - job_id = worker.get_current_task() - return cls( - job_id=_dask_key_to_dask_task_id(job_id), - log=log, - log_level=log_level, - task_owner=task_owner, - ) - - model_config = ConfigDict( - json_schema_extra={ - "examples": [ - { - "job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580", - "log": "some logs", - "log_level": logging.INFO, - "task_owner": { - "user_id": 32, - "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8", - "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d", - "parent_project_id": None, - "parent_node_id": None, - }, - }, - ] - } - ) diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py index dc87c52b1210..71eecbbe362d 100644 --- a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py +++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py @@ -17,6 +17,7 @@ StrictInt, StrictStr, ) +from pydantic.config import JsonDict TaskCancelEventName = "cancel_event_{}" @@ -24,18 +25,24 @@ class PortSchema(BaseModel): required: bool + @staticmethod + def _update_json_schema_extra(schema: JsonDict) -> None: + schema.update( + { + "examples": [ + { + "required": True, + }, + { + "required": False, + }, + ] + } + ) + model_config = ConfigDict( extra="forbid", - json_schema_extra={ - "examples": [ - { - "required": True, - }, - { - "required": False, - }, - ] - }, + json_schema_extra=_update_json_schema_extra, ) @@ -43,20 +50,26 @@ class FilePortSchema(PortSchema): mapping: str | None = None url: AnyUrl + @staticmethod + def _update_json_schema_extra(schema: JsonDict) -> None: + schema.update( + { + "examples": [ + { + "mapping": "some_filename.txt", + "url": "sftp://some_file_url", + "required": True, + }, + { + "required": False, + "url": "s3://another_file_url", + }, + ] + } + ) + model_config = ConfigDict( - json_schema_extra={ - "examples": [ - { - "mapping": "some_filename.txt", - "url": "sftp://some_file_url", - "required": True, - }, - { - "required": False, - "url": "s3://another_file_url", - }, - ] - } + json_schema_extra=_update_json_schema_extra, ) @@ -70,18 +83,27 @@ class FileUrl(BaseModel): default=None, description="the file MIME type", pattern=MIME_TYPE_RE ) + @staticmethod + def _update_json_schema_extra(schema: 
JsonDict) -> None: + schema.update( + { + "examples": [ + { + "url": "https://some_file_url", + "file_mime_type": "application/json", + }, + { + "url": "https://some_file_url", + "file_mapping": "some_file_name.txt", + "file_mime_type": "application/json", + }, + ] + } + ) + model_config = ConfigDict( extra="forbid", - json_schema_extra={ - "examples": [ - {"url": "https://some_file_url", "file_mime_type": "application/json"}, - { - "url": "https://some_file_url", - "file_mapping": "some_file_name.txt", - "file_mime_type": "application/json", - }, - ] - }, + json_schema_extra=_update_json_schema_extra, ) @@ -99,18 +121,24 @@ class FileUrl(BaseModel): class TaskInputData(DictModel[ServicePortKey, PortValue]): + @staticmethod + def _update_json_schema_extra(schema: JsonDict) -> None: + schema.update( + { + "examples": [ + { + "boolean_input": False, + "int_input": -45, + "float_input": 4564.45, + "string_input": "nobody thinks like a string", + "file_input": {"url": "s3://thatis_file_url"}, + }, + ] + } + ) + model_config = ConfigDict( - json_schema_extra={ - "examples": [ - { - "boolean_input": False, - "int_input": -45, - "float_input": 4564.45, - "string_input": "nobody thinks like a string", - "file_input": {"url": "s3://thatis_file_url"}, - }, - ] - } + json_schema_extra=_update_json_schema_extra, ) @@ -126,26 +154,32 @@ class TaskOutputDataSchema(DictModel[ServicePortKey, PortSchemaValue]): # does not work well in that case. For that reason, the schema is # sent as a json-schema instead of with a dynamically-created model class # - model_config = ConfigDict( - json_schema_extra={ - "examples": [ - { - "boolean_output": {"required": False}, - "int_output": {"required": True}, - "float_output": {"required": True}, - "string_output": {"required": False}, - "file_output": { - "required": True, - "url": "https://some_file_url", - "mapping": "the_output_filename", - }, - "optional_file_output": { - "required": False, - "url": "s3://one_file_url", + @staticmethod + def _update_json_schema_extra(schema: JsonDict) -> None: + schema.update( + { + "examples": [ + { + "boolean_output": {"required": False}, + "int_output": {"required": True}, + "float_output": {"required": True}, + "string_output": {"required": False}, + "file_output": { + "required": True, + "url": "https://some_file_url", + "mapping": "the_output_filename", + }, + "optional_file_output": { + "required": False, + "url": "s3://one_file_url", + }, }, - }, - ] - } + ] + } + ) + + model_config = ConfigDict( + json_schema_extra=_update_json_schema_extra, ) @@ -181,16 +215,20 @@ def from_task_output( return cls.model_validate(data) - model_config = ConfigDict( - json_schema_extra={ - "examples": [ - { - "boolean_output": False, - "int_output": -45, - "float_output": 4564.45, - "string_output": "nobody thinks like a string", - "file_output": {"url": "s3://yet_another_file_url"}, - }, - ] - } - ) + @staticmethod + def _update_json_schema_extra(schema: JsonDict) -> None: + schema.update( + { + "examples": [ + { + "boolean_output": False, + "int_output": -45, + "float_output": 4564.45, + "string_output": "nobody thinks like a string", + "file_output": {"url": "s3://yet_another_file_url"}, + }, + ] + } + ) + + model_config = ConfigDict(json_schema_extra=_update_json_schema_extra) diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/protocol.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/protocol.py index fd6acf554e02..f7179be78c0a 100644 --- 
a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/protocol.py +++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/protocol.py @@ -7,6 +7,7 @@ from models_library.services_resources import BootMode from models_library.users import UserID from pydantic import AnyUrl, BaseModel, ConfigDict, model_validator +from pydantic.config import JsonDict from settings_library.s3 import S3Settings from .docker import DockerBasicAuth @@ -44,25 +45,31 @@ def check_parent_valid(cls, values: dict[str, Any]) -> dict[str, Any]: raise ValueError(msg) return values + @staticmethod + def _update_json_schema_extra(schema: JsonDict) -> None: + schema.update( + { + "examples": [ + { + "user_id": 32, + "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8", + "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d", + "parent_project_id": None, + "parent_node_id": None, + }, + { + "user_id": 32, + "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8", + "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d", + "parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8", + "parent_node_id": "aa467d89-b659-4914-9359-c40b1b6d1d6d", + }, + ] + } + ) + model_config = ConfigDict( - json_schema_extra={ - "examples": [ - { - "user_id": 32, - "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8", - "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d", - "parent_project_id": None, - "parent_node_id": None, - }, - { - "user_id": 32, - "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8", - "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d", - "parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8", - "parent_node_id": "aa467d89-b659-4914-9359-c40b1b6d1d6d", - }, - ] - } + json_schema_extra=_update_json_schema_extra, ) @@ -83,13 +90,15 @@ class ContainerTaskParameters(BaseModel): { "image": "ubuntu", "tag": "latest", - "input_data": TaskInputData.model_config["json_schema_extra"]["examples"][0], # type: ignore[index] - "output_data_keys": TaskOutputDataSchema.model_config["json_schema_extra"]["examples"][0], # type: ignore[index] + "input_data": TaskInputData.model_json_schema()["examples"][0], + "output_data_keys": TaskOutputDataSchema.model_json_schema()[ + "examples" + ][0], "command": ["sleep 10", "echo hello"], "envs": {"MYENV": "is an env"}, "labels": {"io.simcore.thelabel": "is amazing"}, "boot_mode": BootMode.CPU.value, - "task_owner": TaskOwner.model_config["json_schema_extra"]["examples"][0], # type: ignore[index] + "task_owner": TaskOwner.model_json_schema()["examples"][0], }, ] } @@ -104,5 +113,4 @@ def __call__( docker_auth: DockerBasicAuth, log_file_url: LogFileUploadURL, s3_settings: S3Settings | None, - ) -> TaskOutputData: - ... + ) -> TaskOutputData: ... 
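A minimal sketch of the pattern the hunks above migrate to, assuming pydantic v2 and using a hypothetical `ExamplePort` model (not part of this patch): `json_schema_extra` is passed as a callable that mutates the generated schema, so the examples live in the JSON schema itself and consumers read them back through `model_json_schema()["examples"]` instead of indexing into `model_config`.

```python
# Sketch only: `ExamplePort` is a made-up model illustrating the pattern used in
# the diff above (pydantic v2 accepts a callable for `json_schema_extra`).
from pydantic import BaseModel, ConfigDict
from pydantic.config import JsonDict


class ExamplePort(BaseModel):
    required: bool

    @staticmethod
    def _update_json_schema_extra(schema: JsonDict) -> None:
        # mutate the schema in place to attach the model's examples
        schema.update({"examples": [{"required": True}, {"required": False}]})

    model_config = ConfigDict(json_schema_extra=_update_json_schema_extra)


# Consumers (e.g. the updated tests) now read examples from the generated
# schema rather than from model_config:
first_example = ExamplePort.model_json_schema()["examples"][0]
assert ExamplePort.model_validate(first_example).required is True
```

One consequence of this switch, visible in the test changes below, is that example access no longer needs `# type: ignore[index]` comments, since `model_json_schema()` returns a plain dict.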
diff --git a/packages/dask-task-models-library/tests/container_tasks/test_events.py b/packages/dask-task-models-library/tests/container_tasks/test_events.py index 1aa4139720d6..2d49f7d0310d 100644 --- a/packages/dask-task-models-library/tests/container_tasks/test_events.py +++ b/packages/dask-task-models-library/tests/container_tasks/test_events.py @@ -5,12 +5,10 @@ # pylint:disable=protected-access # pylint:disable=too-many-arguments -import logging import pytest from dask_task_models_library.container_tasks.events import ( BaseTaskEvent, - TaskLogEvent, TaskProgressEvent, ) from dask_task_models_library.container_tasks.protocol import TaskOwner @@ -24,7 +22,7 @@ def test_task_event_abstract(): BaseTaskEvent(job_id="some_fake") # type: ignore -@pytest.mark.parametrize("model_cls", [TaskProgressEvent, TaskLogEvent]) +@pytest.mark.parametrize("model_cls", [TaskProgressEvent]) def test_events_models_examples(model_cls): examples = model_cls.model_config["json_schema_extra"]["examples"] @@ -37,13 +35,15 @@ def test_events_models_examples(model_cls): assert model_instance.topic_name() -@pytest.fixture -def job_id(faker: Faker) -> str: - return faker.pystr() +@pytest.fixture(params=["string", "bytes"]) +def job_id(faker: Faker, request: pytest.FixtureRequest) -> str | bytes: + return faker.pystr() if request.param == "string" else faker.pystr().encode() @pytest.fixture() -def mocked_dask_worker_job_id(mocker: MockerFixture, job_id: str) -> str: +def mocked_dask_worker_job_id( + mocker: MockerFixture, job_id: str | bytes +) -> str | bytes: mock_get_worker = mocker.patch( "dask_task_models_library.container_tasks.events.get_worker", autospec=True ) @@ -51,41 +51,41 @@ def mocked_dask_worker_job_id(mocker: MockerFixture, job_id: str) -> str: return job_id -@pytest.fixture(params=TaskOwner.model_config["json_schema_extra"]["examples"]) +@pytest.fixture(params=TaskOwner.model_json_schema()["examples"]) def task_owner(request: pytest.FixtureRequest) -> TaskOwner: return TaskOwner(**request.param) def test_task_progress_from_worker( - mocked_dask_worker_job_id: str, task_owner: TaskOwner + mocked_dask_worker_job_id: str | bytes, task_owner: TaskOwner ): event = TaskProgressEvent.from_dask_worker(0.7, task_owner=task_owner) - assert event.job_id == mocked_dask_worker_job_id - assert event.progress == 0.7 - - -def test_task_log_from_worker(mocked_dask_worker_job_id: str, task_owner: TaskOwner): - event = TaskLogEvent.from_dask_worker( - log="here is the amazing logs", log_level=logging.INFO, task_owner=task_owner + assert ( + event.job_id == mocked_dask_worker_job_id.decode() + if isinstance(mocked_dask_worker_job_id, bytes) + else mocked_dask_worker_job_id ) - - assert event.job_id == mocked_dask_worker_job_id - assert event.log == "here is the amazing logs" - assert event.log_level == logging.INFO + assert event.progress == 0.7 @pytest.mark.parametrize( "progress_value, expected_progress", [(1.5, 1), (-0.5, 0), (0.75, 0.75)] ) def test_task_progress_progress_value_is_capped_between_0_and_1( - mocked_dask_worker_job_id: str, + mocked_dask_worker_job_id: str | bytes, task_owner: TaskOwner, progress_value: float, expected_progress: float, ): event = TaskProgressEvent( - job_id=mocked_dask_worker_job_id, task_owner=task_owner, progress=progress_value + job_id=( + mocked_dask_worker_job_id.decode() + if isinstance(mocked_dask_worker_job_id, bytes) + else mocked_dask_worker_job_id + ), + task_owner=task_owner, + progress=progress_value, ) assert event assert event.progress == expected_progress diff --git 
a/packages/dask-task-models-library/tests/container_tasks/test_io.py b/packages/dask-task-models-library/tests/container_tasks/test_io.py index db6357d930cd..f5340d379c08 100644 --- a/packages/dask-task-models-library/tests/container_tasks/test_io.py +++ b/packages/dask-task-models-library/tests/container_tasks/test_io.py @@ -53,9 +53,9 @@ def _create_fake_outputs( a_file.write_text(faker.text(max_nb_chars=450)) assert a_file.exists() else: - jsonable_data[ - key - ] = "some value just for testing, does not represent any kind of type" + jsonable_data[key] = ( + "some value just for testing, does not represent any kind of type" + ) if jsonable_data: output_file = output_folder / faker.file_name() with output_file.open("wt") as fp: @@ -69,10 +69,7 @@ def _create_fake_outputs( def test_create_task_output_from_task_with_optional_fields_as_required( tmp_path: Path, optional_fields_set: bool, faker: Faker ): - for schema_example in TaskOutputDataSchema.model_config["json_schema_extra"][ - "examples" - ]: - + for schema_example in TaskOutputDataSchema.model_json_schema()["examples"]: task_output_schema = TaskOutputDataSchema.model_validate(schema_example) outputs_file_name = _create_fake_outputs( task_output_schema, tmp_path, optional_fields_set, faker diff --git a/packages/dask-task-models-library/tests/container_tasks/test_protocol.py b/packages/dask-task-models-library/tests/container_tasks/test_protocol.py index 3c70924a0437..68f8aec751ae 100644 --- a/packages/dask-task-models-library/tests/container_tasks/test_protocol.py +++ b/packages/dask-task-models-library/tests/container_tasks/test_protocol.py @@ -9,7 +9,7 @@ @pytest.mark.parametrize("model_cls", [TaskOwner, ContainerTaskParameters]) def test_events_models_examples(model_cls): - examples = model_cls.model_config["json_schema_extra"]["examples"] + examples = model_cls.model_json_schema()["examples"] for index, example in enumerate(examples): print(f"{index:-^10}:\n", example) @@ -19,9 +19,7 @@ def test_events_models_examples(model_cls): def test_task_owner_parent_valid(faker: Faker): - invalid_task_owner_example = TaskOwner.model_config["json_schema_extra"][ - "examples" - ][0] + invalid_task_owner_example = TaskOwner.model_json_schema()["examples"][0] invalid_task_owner_example["parent_project_id"] = faker.uuid4() assert invalid_task_owner_example["parent_node_id"] is None with pytest.raises(ValidationError, match=r".+ are None or both are set!"): diff --git a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py index 61aed94151a4..c42075704b06 100644 --- a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py @@ -43,7 +43,7 @@ def rabbit_env_vars_dict( assert f"{prefix}_rabbit" in docker_stack["services"] port = get_service_published_port( - "rabbit", env_vars_for_docker_compose["RABBIT_PORT"] + "rabbit", int(env_vars_for_docker_compose["RABBIT_PORT"]) ) return { diff --git a/scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py b/scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py index 0d3159f818fc..229dab0c3f40 100644 --- a/scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py +++ b/scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py @@ -139,7 +139,7 @@ async def get_available_disk_space( # Available disk space will be captured here available_space = stdout.read().decode("utf-8").strip() - return ByteSize(available_space) + 
return ByteSize(available_space if available_space else 0) except ( paramiko.AuthenticationException, paramiko.SSHException, diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml index 87f4ef94560f..f0901f1093cd 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml @@ -9,10 +9,12 @@ services: DASK_TLS_CA_FILE: ${DASK_TLS_CA_FILE} DASK_TLS_CERT: ${DASK_TLS_CERT} DASK_TLS_KEY: ${DASK_TLS_KEY} + DASK_SIDECAR_RABBITMQ: ${AUTOSCALING_RABBITMQ} DASK_SCHEDULER_URL: tls://dask-scheduler:8786 DASK_START_AS_SCHEDULER: 1 DASK_WORKER_SATURATION: ${DASK_WORKER_SATURATION} LOG_LEVEL: ${LOG_LEVEL} + ports: - 8786:8786 # dask-scheduler access - 8787:8787 # dashboard @@ -54,11 +56,13 @@ services: DASK_SCHEDULER_URL: tls://dask-scheduler:8786 DASK_SIDECAR_NON_USABLE_RAM: 0 DASK_SIDECAR_NUM_NON_USABLE_CPUS: 0 + DASK_SIDECAR_RABBITMQ: ${AUTOSCALING_RABBITMQ} DASK_TLS_CA_FILE: ${DASK_TLS_CA_FILE} DASK_TLS_CERT: ${DASK_TLS_CERT} DASK_TLS_KEY: ${DASK_TLS_KEY} DASK_WORKER_SATURATION: ${DASK_WORKER_SATURATION} LOG_LEVEL: ${LOG_LEVEL} + SIDECAR_COMP_SERVICES_SHARED_FOLDER: /home/scu/computational_shared_data SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME: computational_shared_data deploy: diff --git a/services/dask-sidecar/docker/boot.sh b/services/dask-sidecar/docker/boot.sh index 89a4d14afc68..e86c8518abc2 100755 --- a/services/dask-sidecar/docker/boot.sh +++ b/services/dask-sidecar/docker/boot.sh @@ -117,7 +117,7 @@ else fi # GPUs - num_gpus=$(python -c "from simcore_service_dask_sidecar.utils import num_available_gpus; print(num_available_gpus());") + num_gpus=$(python -c "from simcore_service_dask_sidecar.utils.gpus import num_available_gpus; print(num_available_gpus());") # RAM (is computed similarly as the default dask-sidecar computation) _value=$(python -c "import psutil; print(int(psutil.virtual_memory().total * $num_cpus/$(nproc)))") @@ -128,7 +128,7 @@ else # add the GPUs if there are any if [ "$num_gpus" -gt 0 ]; then - total_vram=$(python -c "from simcore_service_dask_sidecar.utils import video_memory; print(video_memory());") + total_vram=$(python -c "from simcore_service_dask_sidecar.utils.gpus import video_memory; print(video_memory());") resources="$resources,GPU=$num_gpus,VRAM=$total_vram" fi @@ -173,7 +173,7 @@ else exec watchmedo auto-restart --recursive --pattern="*.py;*/src/*" --ignore-patterns="*test*;pytest_simcore/*;setup.py;*ignore*" --ignore-directories -- \ dask worker "${DASK_SCHEDULER_URL}" \ --local-directory /tmp/dask-sidecar \ - --preload simcore_service_dask_sidecar.tasks \ + --preload simcore_service_dask_sidecar.worker \ --nworkers ${DASK_NPROCS} \ --nthreads "${DASK_NTHREADS}" \ --dashboard-address 8787 \ @@ -183,7 +183,7 @@ else else exec dask worker "${DASK_SCHEDULER_URL}" \ --local-directory /tmp/dask-sidecar \ - --preload simcore_service_dask_sidecar.tasks \ + --preload simcore_service_dask_sidecar.worker \ --nworkers ${DASK_NPROCS} \ --nthreads "${DASK_NTHREADS}" \ --dashboard-address 8787 \ diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/cli.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/cli.py index 827d23d3491c..55fef7662329 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/cli.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/cli.py @@ -4,7 +4,7 @@ 
from settings_library.utils_cli import create_settings_command, create_version_callback from ._meta import PROJECT_NAME, __version__ -from .settings import Settings +from .settings import ApplicationSettings # SEE setup entrypoint 'simcore_service_dask_sidecar.cli:the_app' _logger = logging.getLogger(__name__) @@ -15,4 +15,6 @@ # COMMANDS # main.callback()(create_version_callback(__version__)) -main.command()(create_settings_command(settings_cls=Settings, logger=_logger)) +main.command()( + create_settings_command(settings_cls=ApplicationSettings, logger=_logger) +) diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py index 7b753e306207..2bd094306fbd 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py @@ -25,9 +25,9 @@ from settings_library.s3 import S3Settings from yarl import URL -from ..dask_utils import TaskPublisher -from ..file_utils import pull_file_from_remote, push_file_to_remote -from ..settings import Settings +from ..settings import ApplicationSettings +from ..utils.dask import TaskPublisher +from ..utils.files import pull_file_from_remote, push_file_to_remote from .docker_utils import ( create_container_config, get_computational_shared_data_mount_point, @@ -160,7 +160,7 @@ async def _retrieve_output_data( async def _publish_sidecar_log( self, log: LogMessageStr, log_level: LogLevelInt = logging.INFO ) -> None: - self.task_publishers.publish_logs( + await self.task_publishers.publish_logs( message=f"[sidecar] {log}", log_level=log_level ) @@ -172,16 +172,20 @@ async def run(self, command: list[str]) -> TaskOutputData: # NOTE: this is for tracing purpose _logger.info("Running task owner: %s", self.task_parameters.task_owner) - settings = Settings.create_from_envs() + settings = ApplicationSettings.create_from_envs() run_id = f"{uuid4()}" - async with Docker() as docker_client, TaskSharedVolumes( - Path(f"{settings.SIDECAR_COMP_SERVICES_SHARED_FOLDER}/{run_id}") - ) as task_volumes, ProgressBarData( - num_steps=3, - step_weights=[5 / 100, 90 / 100, 5 / 100], - progress_report_cb=self.task_publishers.publish_progress, - description="running", - ) as progress_bar: + async with ( + Docker() as docker_client, + TaskSharedVolumes( + Path(f"{settings.SIDECAR_COMP_SERVICES_SHARED_FOLDER}/{run_id}") + ) as task_volumes, + ProgressBarData( + num_steps=3, + step_weights=[5 / 100, 90 / 100, 5 / 100], + progress_report_cb=self.task_publishers.publish_progress, + description="running", + ) as progress_bar, + ): # PRE-PROCESSING await pull_image( docker_client, @@ -216,24 +220,28 @@ async def run(self, command: list[str]) -> TaskOutputData: ) await progress_bar.update() # NOTE: (1 step weighting 5%) # PROCESSING (1 step weighted 90%) - async with managed_container( - docker_client, - config, - name=f"{self.task_parameters.image.split(sep='/')[-1]}_{run_id}", - ) as container, progress_bar.sub_progress( - 100, description="processing" - ) as processing_progress_bar, managed_monitor_container_log_task( - container=container, - progress_regexp=image_labels.get_progress_regexp(), - service_key=self.task_parameters.image, - service_version=self.task_parameters.tag, - task_publishers=self.task_publishers, - integration_version=image_labels.get_integration_version(), - task_volumes=task_volumes, - log_file_url=self.log_file_url, - 
log_publishing_cb=self._publish_sidecar_log, - s3_settings=self.s3_settings, - progress_bar=processing_progress_bar, + async with ( + managed_container( + docker_client, + config, + name=f"{self.task_parameters.image.split(sep='/')[-1]}_{run_id}", + ) as container, + progress_bar.sub_progress( + 100, description="processing" + ) as processing_progress_bar, + managed_monitor_container_log_task( + container=container, + progress_regexp=image_labels.get_progress_regexp(), + service_key=self.task_parameters.image, + service_version=self.task_parameters.tag, + task_publishers=self.task_publishers, + integration_version=image_labels.get_integration_version(), + task_volumes=task_volumes, + log_file_url=self.log_file_url, + log_publishing_cb=self._publish_sidecar_log, + s3_settings=self.s3_settings, + progress_bar=processing_progress_bar, + ), ): await container.start() await self._publish_sidecar_log( diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py index 5fd4f24e71d7..9b472fa2f1c4 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py @@ -38,9 +38,9 @@ from servicelib.progress_bar import ProgressBarData from settings_library.s3 import S3Settings -from ..dask_utils import TaskPublisher -from ..file_utils import push_file_to_remote -from ..settings import Settings +from ..settings import ApplicationSettings +from ..utils.dask import TaskPublisher +from ..utils.files import push_file_to_remote from .constants import LEGACY_SERVICE_LOG_FILE_NAME from .models import ( LEGACY_INTEGRATION_VERSION, @@ -188,7 +188,7 @@ async def _parse_and_publish_logs( if progress_value is not None: await progress_bar.set_(round(progress_value * 100.0)) - task_publishers.publish_logs( + await task_publishers.publish_logs( message=log_line, log_level=guess_message_log_level(log_line) ) @@ -474,7 +474,7 @@ async def get_image_labels( async def get_computational_shared_data_mount_point(docker_client: Docker) -> Path: - app_settings = Settings.create_from_envs() + app_settings = ApplicationSettings.create_from_envs() try: logger.debug( "getting computational shared data mount point for %s", diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/errors.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/errors.py index 8e5d1e8794ff..009ae95f650e 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/errors.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/errors.py @@ -1,8 +1,4 @@ -from common_library.errors_classes import OsparcErrorMixin - - -class ComputationalSidecarRuntimeError(OsparcErrorMixin, RuntimeError): - ... +from ..errors import ComputationalSidecarRuntimeError class ServiceBadFormattedOutputError(ComputationalSidecarRuntimeError): diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/errors.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/errors.py new file mode 100644 index 000000000000..1400bf1a2699 --- /dev/null +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/errors.py @@ -0,0 +1,8 @@ +from common_library.errors_classes import OsparcErrorMixin + + +class ComputationalSidecarRuntimeError(OsparcErrorMixin, RuntimeError): ... 
+ + +class ConfigurationError(ComputationalSidecarRuntimeError): + msg_template: str = "Application misconfiguration: {msg}" diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_plugin.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_plugin.py new file mode 100644 index 000000000000..554988aa0b2f --- /dev/null +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_plugin.py @@ -0,0 +1,159 @@ +import asyncio +import logging +import threading +from asyncio import AbstractEventLoop +from collections.abc import Awaitable +from typing import Final + +import distributed +from servicelib.async_utils import cancel_wait_task +from servicelib.logging_utils import log_catch, log_context +from servicelib.rabbitmq import RabbitMQClient, wait_till_rabbitmq_responsive +from servicelib.rabbitmq._models import RabbitMessage +from settings_library.rabbit import RabbitSettings + +from .errors import ConfigurationError + +_logger = logging.getLogger(__name__) + +_RABBITMQ_CONFIGURATION_ERROR: Final[str] = ( + "RabbitMQ client is not available. Please check the configuration." +) + + +class RabbitMQPlugin(distributed.WorkerPlugin): + """Dask Worker Plugin for RabbitMQ integration""" + + name = "rabbitmq_plugin" + _main_thread_loop: AbstractEventLoop | None = None + _client: RabbitMQClient | None = None + _settings: RabbitSettings | None = None + _message_queue: asyncio.Queue | None = None + _message_processor: asyncio.Task | None = None + + def __init__(self, settings: RabbitSettings): + self._settings = settings + + async def _process_messages(self) -> None: + """Process messages from worker threads in the main thread""" + assert self._message_queue is not None # nosec + assert self._client is not None # nosec + + with log_context(_logger, logging.INFO, "RabbitMQ message processor"): + while True: + with log_catch(_logger, reraise=False): + exchange_name, message_data = await self._message_queue.get() + try: + await self._client.publish(exchange_name, message_data) + finally: + self._message_queue.task_done() + + def setup(self, worker: distributed.Worker) -> Awaitable[None]: + """Called when the plugin is attached to a worker""" + + async def _() -> None: + if not self._settings: + _logger.warning( + "RabbitMQ client is de-activated (no settings provided)" + ) + return + + if threading.current_thread() is not threading.main_thread(): + _logger.warning( + "RabbitMQ client plugin setup is not in the main thread! Beware! if in pytest it's ok." + ) + + with log_context( + _logger, + logging.INFO, + f"RabbitMQ client initialization for worker {worker.address}", + ): + self._main_thread_loop = asyncio.get_event_loop() + await wait_till_rabbitmq_responsive(self._settings.dsn) + self._client = RabbitMQClient( + client_name="dask-sidecar", settings=self._settings + ) + + self._message_queue = asyncio.Queue() + self._message_processor = asyncio.create_task( + self._process_messages(), name="rabbit_message_processor" + ) + + return _() + + def teardown(self, worker: distributed.Worker) -> Awaitable[None]: + """Called when the worker shuts down or the plugin is removed""" + + async def _() -> None: + with log_context( + _logger, + logging.INFO, + f"RabbitMQ client teardown for worker {worker.address}", + ): + if not self._client: + return + if threading.current_thread() is threading.main_thread(): + _logger.info( + "RabbitMQ client plugin setup is in the main thread! That is good." 
+ ) + else: + _logger.warning( + "RabbitMQ client plugin setup is not the main thread!" + ) + + # Cancel the message processor task + if self._message_processor: + with log_catch(_logger, reraise=False): + await cancel_wait_task(self._message_processor, max_delay=5) + self._message_processor = None + + # close client + current_loop = asyncio.get_event_loop() + if self._main_thread_loop != current_loop: + _logger.warning("RabbitMQ client is de-activated (loop mismatch)") + assert self._main_thread_loop # nosec + with log_catch(_logger, reraise=False): + await asyncio.wait_for(self._client.close(), timeout=5.0) + + self._client = None + + return _() + + def get_client(self) -> RabbitMQClient: + """Returns the RabbitMQ client or raises an error if not available""" + if not self._client: + raise ConfigurationError(msg=_RABBITMQ_CONFIGURATION_ERROR) + return self._client + + async def publish_message_from_any_thread( + self, exchange_name: str, message_data: RabbitMessage + ) -> None: + """Enqueue a message to be published to RabbitMQ from any thread""" + assert self._message_queue # nosec + + if threading.current_thread() is threading.main_thread(): + # If we're in the main thread, add directly to the queue + await self._message_queue.put((exchange_name, message_data)) + return + + # If we're in a worker thread, we need to use a different approach + assert self._main_thread_loop # nosec + + # Create a Future in the main thread's event loop + future = asyncio.run_coroutine_threadsafe( + self._message_queue.put((exchange_name, message_data)), + self._main_thread_loop, + ) + + # waiting here is quick, just queueing + future.result() + + +def get_rabbitmq_client(worker: distributed.Worker) -> RabbitMQPlugin: + """Returns the RabbitMQ client or raises an error if not available""" + if not worker.plugins: + raise ConfigurationError(msg=_RABBITMQ_CONFIGURATION_ERROR) + rabbitmq_plugin = worker.plugins.get(RabbitMQPlugin.name) + if not isinstance(rabbitmq_plugin, RabbitMQPlugin): + raise ConfigurationError(msg=_RABBITMQ_CONFIGURATION_ERROR) + return rabbitmq_plugin diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py index 8d229c9c8a87..4127fca2528b 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py @@ -1,20 +1,27 @@ import logging -import dask.config import distributed +from servicelib.logging_utils import log_context from ._meta import print_dask_scheduler_banner +from .settings import ApplicationSettings +from .utils.logs import setup_app_logging _logger = logging.getLogger(__name__) async def dask_setup(scheduler: distributed.Scheduler) -> None: - """This is a special function recognized by the dask worker when starting with flag --preload""" - _logger.info("Setting up scheduler...") + """This is a special function recognized by dask when starting with flag --preload""" assert scheduler # nosec - print(f"dask config: {dask.config.config}", flush=True) # noqa: T201 - print_dask_scheduler_banner() + + settings = ApplicationSettings.create_from_envs() + setup_app_logging(settings) + + with log_context(_logger, logging.INFO, "Launch dask scheduler"): + _logger.info("app settings: %s", settings.model_dump_json(indent=1)) + print_dask_scheduler_banner() async def dask_teardown(_worker: distributed.Worker) -> None: - _logger.info("Shutting down scheduler") + with log_context(_logger, logging.INFO, "Tear 
down dask scheduler"): + ... diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/settings.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/settings.py index b77811fd57fb..e0a3e41d3a58 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/settings.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/settings.py @@ -4,16 +4,13 @@ from models_library.basic_types import LogLevel from pydantic import AliasChoices, Field, field_validator from servicelib.logging_utils_filtering import LoggerName, MessageSubstring -from settings_library.base import BaseCustomSettings +from settings_library.application import BaseApplicationSettings +from settings_library.rabbit import RabbitSettings from settings_library.utils_logging import MixinLoggingSettings -class Settings(BaseCustomSettings, MixinLoggingSettings): - """Dask-sidecar app settings""" - - SC_BUILD_TARGET: str | None = None - SC_BOOT_MODE: str | None = None - LOG_LEVEL: Annotated[ +class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): + DASK_SIDECAR_LOGLEVEL: Annotated[ LogLevel, Field( validation_alias=AliasChoices( @@ -22,48 +19,49 @@ class Settings(BaseCustomSettings, MixinLoggingSettings): ), ] = LogLevel.INFO - # sidecar config --- - SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME: str SIDECAR_COMP_SERVICES_SHARED_FOLDER: Path - SIDECAR_INTERVAL_TO_CHECK_TASK_ABORTED_S: int | None = 5 - - # dask config ---- - - DASK_START_AS_SCHEDULER: bool | None = Field( - default=False, description="If this env is set, then the app boots as scheduler" - ) + DASK_SIDECAR_INTERVAL_TO_CHECK_TASK_ABORTED_S: int | None = 5 - DASK_SCHEDULER_HOST: str | None = Field( - None, - description="Address of the scheduler to register (only if started as worker )", - ) + DASK_START_AS_SCHEDULER: Annotated[ + bool | None, + Field(description="If this env is set, then the app boots as scheduler"), + ] = False - DASK_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field( - default=False, - validation_alias=AliasChoices( - "DASK_LOG_FORMAT_LOCAL_DEV_ENABLED", - "LOG_FORMAT_LOCAL_DEV_ENABLED", + DASK_SCHEDULER_HOST: Annotated[ + str | None, + Field( + description="Address of the scheduler to register (only if started as worker )", ), - description="Enables local development log format. WARNING: make sure it is disabled if you want to have structured logs!", - ) - DASK_LOG_FILTER_MAPPING: dict[LoggerName, list[MessageSubstring]] = Field( - default_factory=dict, - validation_alias=AliasChoices("DASK_LOG_FILTER_MAPPING", "LOG_FILTER_MAPPING"), - description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of log message patterns that should be filtered out.", - ) + ] = None - def as_scheduler(self) -> bool: - return bool(self.DASK_START_AS_SCHEDULER) + DASK_LOG_FORMAT_LOCAL_DEV_ENABLED: Annotated[ + bool, + Field( + validation_alias=AliasChoices( + "DASK_LOG_FORMAT_LOCAL_DEV_ENABLED", + "LOG_FORMAT_LOCAL_DEV_ENABLED", + ), + description="Enables local development log format. 
WARNING: make sure it is disabled if you want to have structured logs!", + ), + ] = False + DASK_LOG_FILTER_MAPPING: Annotated[ + dict[LoggerName, list[MessageSubstring]], + Field( + default_factory=dict, + validation_alias=AliasChoices( + "DASK_LOG_FILTER_MAPPING", "LOG_FILTER_MAPPING" + ), + description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of log message patterns that should be filtered out.", + ), + ] - def as_worker(self) -> bool: - as_worker = not self.as_scheduler() - if as_worker: - assert self.DASK_SCHEDULER_HOST is not None # nosec - return as_worker + DASK_SIDECAR_RABBITMQ: Annotated[ + RabbitSettings | None, Field(json_schema_extra={"auto_default_from_env": True}) + ] - @field_validator("LOG_LEVEL", mode="before") + @field_validator("DASK_SIDECAR_LOGLEVEL", mode="before") @classmethod def _validate_loglevel(cls, value: Any) -> str: return cls.validate_log_level(f"{value}") diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/__init__.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/dask.py similarity index 79% rename from services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py rename to services/dask-sidecar/src/simcore_service_dask_sidecar/utils/dask.py index d04682dac075..feab5e4d632a 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/dask.py @@ -9,7 +9,6 @@ from dask_task_models_library.container_tasks.errors import TaskCancelledError from dask_task_models_library.container_tasks.events import ( BaseTaskEvent, - TaskLogEvent, TaskProgressEvent, ) from dask_task_models_library.container_tasks.io import TaskCancelEventName @@ -17,8 +16,11 @@ from distributed.worker import get_worker from distributed.worker_state_machine import TaskState from models_library.progress_bar import ProgressReport +from models_library.rabbitmq_messages import LoggerRabbitMessage from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch +from ..rabbitmq_plugin import get_rabbitmq_client + _logger = logging.getLogger(__name__) @@ -63,11 +65,9 @@ class TaskPublisher: task_owner: TaskOwner progress: distributed.Pub = field(init=False) _last_published_progress_value: float = -1 - logs: distributed.Pub = field(init=False) def __post_init__(self) -> None: self.progress = distributed.Pub(TaskProgressEvent.topic_name()) - self.logs = distributed.Pub(TaskLogEvent.topic_name()) def publish_progress(self, report: ProgressReport) -> None: rounded_value = round(report.percent_value, ndigits=2) @@ -82,19 +82,38 @@ def publish_progress(self, report: ProgressReport) -> None: self._last_published_progress_value = rounded_value _logger.debug("PROGRESS: %s", rounded_value) - def publish_logs( + async def publish_logs( self, *, message: LogMessageStr, log_level: LogLevelInt, ) -> None: with log_catch(logger=_logger, reraise=False): - publish_event( - self.logs, - TaskLogEvent.from_dask_worker( - log=message, log_level=log_level, task_owner=self.task_owner - ), + rabbitmq_client = get_rabbitmq_client(get_worker()) + base_message = LoggerRabbitMessage.model_construct( + user_id=self.task_owner.user_id, + project_id=self.task_owner.project_id, + node_id=self.task_owner.node_id, + 
messages=[message], + log_level=log_level, + ) + await rabbitmq_client.publish_message_from_any_thread( + base_message.channel_name, base_message ) + if self.task_owner.has_parent: + assert self.task_owner.parent_project_id # nosec + assert self.task_owner.parent_node_id # nosec + parent_message = LoggerRabbitMessage.model_construct( + user_id=self.task_owner.user_id, + project_id=self.task_owner.parent_project_id, + node_id=self.task_owner.parent_node_id, + messages=[message], + log_level=log_level, + ) + await rabbitmq_client.publish_message_from_any_thread( + parent_message.channel_name, parent_message + ) + _logger.log(log_level, message) @@ -114,7 +133,7 @@ async def cancel_task(task_name: str) -> None: if task := next( (t for t in asyncio.all_tasks() if t.get_name() == task_name), None ): - task_publishers.publish_logs( + await task_publishers.publish_logs( message="[sidecar] cancelling task...", log_level=logging.INFO ) task.cancel() @@ -134,7 +153,7 @@ async def periodicaly_check_if_aborted(task_name: str) -> None: yield except asyncio.CancelledError as exc: - task_publishers.publish_logs( + await task_publishers.publish_logs( message="[sidecar] task run was aborted", log_level=logging.INFO ) diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/file_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/files.py similarity index 100% rename from services/dask-sidecar/src/simcore_service_dask_sidecar/file_utils.py rename to services/dask-sidecar/src/simcore_service_dask_sidecar/utils/files.py diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/gpus.py similarity index 100% rename from services/dask-sidecar/src/simcore_service_dask_sidecar/utils.py rename to services/dask-sidecar/src/simcore_service_dask_sidecar/utils/gpus.py diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/logs.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/logs.py new file mode 100644 index 000000000000..74b158de9e2e --- /dev/null +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/logs.py @@ -0,0 +1,20 @@ +import logging + +from servicelib.logging_utils import config_all_loggers + +from ..settings import ApplicationSettings + + +def setup_app_logging(settings: ApplicationSettings) -> None: + # set up logging + logging.basicConfig(level=settings.DASK_SIDECAR_LOGLEVEL.value) + logging.root.setLevel(level=settings.DASK_SIDECAR_LOGLEVEL.value) + # NOTE: Dask attaches a StreamHandler to the logger in distributed + # removing them solves dual propagation of logs + for handler in logging.getLogger("distributed").handlers: + logging.getLogger("distributed").removeHandler(handler) + config_all_loggers( + log_format_local_dev_enabled=settings.DASK_LOG_FORMAT_LOCAL_DEV_ENABLED, + logger_filter_mapping=settings.DASK_LOG_FILTER_MAPPING, + tracing_settings=None, # no tracing for dask sidecar + ) diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/worker.py similarity index 69% rename from services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py rename to services/dask-sidecar/src/simcore_service_dask_sidecar/worker.py index cc061d6cd39d..de3ecd6d66cc 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/worker.py @@ -11,14 +11,19 @@ ContainerTaskParameters, LogFileUploadURL, ) -from 
distributed.worker import logger -from servicelib.logging_utils import config_all_loggers +from servicelib.logging_utils import log_context from settings_library.s3 import S3Settings from ._meta import print_dask_sidecar_banner from .computational_sidecar.core import ComputationalSidecar -from .dask_utils import TaskPublisher, get_current_task_resources, monitor_task_abortion -from .settings import Settings +from .rabbitmq_plugin import RabbitMQPlugin +from .settings import ApplicationSettings +from .utils.dask import ( + TaskPublisher, + get_current_task_resources, + monitor_task_abortion, +) +from .utils.logs import setup_app_logging _logger = logging.getLogger(__name__) @@ -40,7 +45,7 @@ def __init__(self, worker: distributed.Worker): def exit_gracefully(self, *_args): tasks = asyncio.all_tasks() - logger.warning( + _logger.warning( "Application shutdown detected!\n %s", pformat([t.get_name() for t in tasks]), ) @@ -52,37 +57,34 @@ def exit_gracefully(self, *_args): async def dask_setup(worker: distributed.Worker) -> None: - """This is a special function recognized by the dask worker when starting with flag --preload""" - settings = Settings.create_from_envs() - # set up logging - logging.basicConfig(level=settings.LOG_LEVEL.value) - logging.root.setLevel(level=settings.LOG_LEVEL.value) - logger.setLevel(level=settings.LOG_LEVEL.value) - # NOTE: Dask attaches a StreamHandler to the logger in distributed - # removing them solves dual propagation of logs - for handler in logging.getLogger("distributed").handlers: - logging.getLogger("distributed").removeHandler(handler) - config_all_loggers( - log_format_local_dev_enabled=settings.DASK_LOG_FORMAT_LOCAL_DEV_ENABLED, - logger_filter_mapping=settings.DASK_LOG_FILTER_MAPPING, - tracing_settings=None, # no tracing for dask sidecar - ) + """This is a special function recognized by dask when starting with flag --preload""" + settings = ApplicationSettings.create_from_envs() + setup_app_logging(settings) + + with log_context(_logger, logging.INFO, "Launch dask worker"): + _logger.info("app settings: %s", settings.model_dump_json(indent=1)) - logger.info("Setting up worker...") - logger.info("Settings: %s", pformat(settings.model_dump())) + if threading.current_thread() is threading.main_thread(): + GracefulKiller(worker) - print_dask_sidecar_banner() + loop = asyncio.get_event_loop() + _logger.info("We do have a running loop in the main thread: %s", f"{loop=}") - if threading.current_thread() is threading.main_thread(): - loop = asyncio.get_event_loop() - logger.info("We do have a running loop in the main thread: %s", f"{loop=}") + if settings.DASK_SIDECAR_RABBITMQ: + try: + await worker.plugin_add( + RabbitMQPlugin(settings.DASK_SIDECAR_RABBITMQ), catch_errors=False + ) + except Exception: + await worker.close() + raise - if threading.current_thread() is threading.main_thread(): - GracefulKiller(worker) + print_dask_sidecar_banner() -async def dask_teardown(_worker: distributed.Worker) -> None: - logger.warning("Tearing down worker!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") +async def dask_teardown(worker: distributed.Worker) -> None: + with log_context(_logger, logging.INFO, f"tear down dask {worker.address}"): + ... 
async def _run_computational_sidecar_async( diff --git a/services/dask-sidecar/tests/unit/conftest.py b/services/dask-sidecar/tests/unit/conftest.py index 4d4801752d94..49d61fb1bd26 100644 --- a/services/dask-sidecar/tests/unit/conftest.py +++ b/services/dask-sidecar/tests/unit/conftest.py @@ -15,6 +15,8 @@ import pytest import simcore_service_dask_sidecar from aiobotocore.session import AioBaseClient, get_session +from common_library.json_serialization import json_dumps +from common_library.serialization import model_dump_with_secrets from dask_task_models_library.container_tasks.protocol import TaskOwner from faker import Faker from models_library.projects import ProjectID @@ -25,8 +27,11 @@ from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.rabbit import RabbitSettings from settings_library.s3 import S3Settings -from simcore_service_dask_sidecar.file_utils import _s3fs_settings_from_s3_settings +from simcore_service_dask_sidecar.utils.files import ( + _s3fs_settings_from_s3_settings, +) from yarl import URL pytest_plugins = [ @@ -37,6 +42,7 @@ "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", "pytest_simcore.faker_users_data", + "pytest_simcore.rabbit_service", "pytest_simcore.repository_paths", ] @@ -80,6 +86,7 @@ def app_environment( monkeypatch: pytest.MonkeyPatch, env_devel_dict: EnvVarsDict, shared_data_folder: Path, + rabbit_service: RabbitSettings, ) -> EnvVarsDict: # configured as worker envs = setenvs_from_dict( @@ -88,6 +95,9 @@ def app_environment( # .env-devel **env_devel_dict, # Variables directly define inside Dockerfile + "DASK_SIDECAR_RABBITMQ": json_dumps( + model_dump_with_secrets(rabbit_service, show_secrets=True) + ), "SC_BOOT_MODE": "debug", "SIDECAR_LOGLEVEL": "DEBUG", "SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME": "simcore_computational_shared_data", @@ -107,10 +117,11 @@ def local_cluster(app_environment: EnvVarsDict) -> Iterator[distributed.LocalClu with distributed.LocalCluster( worker_class=distributed.Worker, resources={"CPU": 10, "GPU": 10}, - preload="simcore_service_dask_sidecar.tasks", + preload="simcore_service_dask_sidecar.worker", ) as cluster: assert cluster assert isinstance(cluster, distributed.LocalCluster) + print(cluster.workers) yield cluster @@ -119,6 +130,7 @@ def dask_client( local_cluster: distributed.LocalCluster, ) -> Iterator[distributed.Client]: with distributed.Client(local_cluster) as client: + client.wait_for_workers(1, timeout=10) yield client @@ -130,7 +142,7 @@ async def async_local_cluster( async with distributed.LocalCluster( worker_class=distributed.Worker, resources={"CPU": 10, "GPU": 10}, - preload="simcore_service_dask_sidecar.tasks", + preload="simcore_service_dask_sidecar.worker", asynchronous=True, ) as cluster: assert cluster @@ -231,7 +243,7 @@ def creator() -> AnyUrl: open_file = fsspec.open(f"{new_remote_file}", mode="wt", **s3_storage_kwargs) with open_file as fp: fp.write( # type: ignore - f"This is the file contents of file #'{(len(list_of_created_files)+1):03}'\n" + f"This is the file contents of file #'{(len(list_of_created_files) + 1):03}'\n" ) for s in faker.sentences(5): fp.write(f"{s}\n") # type: ignore diff --git a/services/dask-sidecar/tests/unit/test__requirements.py b/services/dask-sidecar/tests/unit/test__requirements.py index 737f4417a9fb..de6bd947e8c9 100644 --- a/services/dask-sidecar/tests/unit/test__requirements.py +++ 
b/services/dask-sidecar/tests/unit/test__requirements.py @@ -4,6 +4,7 @@ import re from pathlib import Path +from typing import TypeAlias import pytest @@ -16,11 +17,13 @@ def requirements_folder(project_slug_dir: Path) -> Path: return reqs_dir +NameVersionTuple: TypeAlias = tuple[str, str] + + def test_dask_requirements_in_sync(requirements_folder: Path): """If this test fails, do update requirements to re-sync all listings""" REQS_ENTRY_REGEX = re.compile(r"(\w+)==([\.\w]+)") - NameVersionTuple = tuple[str, str] def get_reqs(fname: str) -> set[NameVersionTuple]: return set(REQS_ENTRY_REGEX.findall((requirements_folder / fname).read_text())) diff --git a/services/dask-sidecar/tests/unit/test_cli.py b/services/dask-sidecar/tests/unit/test_cli.py index 7a359d44cc0e..09762400f4e7 100644 --- a/services/dask-sidecar/tests/unit/test_cli.py +++ b/services/dask-sidecar/tests/unit/test_cli.py @@ -10,9 +10,13 @@ from pytest_simcore.helpers.typing_env import EnvVarsDict from simcore_service_dask_sidecar._meta import API_VERSION from simcore_service_dask_sidecar.cli import main -from simcore_service_dask_sidecar.settings import Settings +from simcore_service_dask_sidecar.settings import ApplicationSettings from typer.testing import CliRunner +pytest_simcore_core_services_selection = [ + "rabbit", +] + def test_cli_help_and_version(cli_runner: CliRunner): # invitations-maker --help @@ -28,5 +32,5 @@ def test_list_settings(cli_runner: CliRunner, app_environment: EnvVarsDict): result = cli_runner.invoke(main, ["settings", "--show-secrets", "--as-json"]) assert result.exit_code == os.EX_OK, result.output - settings = Settings(result.output) - assert settings.model_dump() == Settings.create_from_envs().model_dump() + settings = ApplicationSettings(result.output) + assert settings.model_dump() == ApplicationSettings.create_from_envs().model_dump() diff --git a/services/dask-sidecar/tests/unit/test_docker_utils.py b/services/dask-sidecar/tests/unit/test_computational_docker_utils.py similarity index 100% rename from services/dask-sidecar/tests/unit/test_docker_utils.py rename to services/dask-sidecar/tests/unit/test_computational_docker_utils.py diff --git a/services/dask-sidecar/tests/unit/test_models.py b/services/dask-sidecar/tests/unit/test_computational_models.py similarity index 100% rename from services/dask-sidecar/tests/unit/test_models.py rename to services/dask-sidecar/tests/unit/test_computational_models.py diff --git a/services/dask-sidecar/tests/unit/test_task_shared_volume.py b/services/dask-sidecar/tests/unit/test_computational_sidecar_task_shared_volume.py similarity index 100% rename from services/dask-sidecar/tests/unit/test_task_shared_volume.py rename to services/dask-sidecar/tests/unit/test_computational_sidecar_task_shared_volume.py diff --git a/services/dask-sidecar/tests/unit/test_deployment.py b/services/dask-sidecar/tests/unit/test_deployment.py index 08beb0cd2e7d..ee4fedf3d387 100644 --- a/services/dask-sidecar/tests/unit/test_deployment.py +++ b/services/dask-sidecar/tests/unit/test_deployment.py @@ -1,8 +1,8 @@ -from typing import Any, Dict +from typing import Any def test_sidecar_service_is_deployed_in_global_mode( - simcore_docker_compose: Dict[str, Any] + simcore_docker_compose: dict[str, Any], ): dask_sidecar_deploy_config = simcore_docker_compose["services"]["dask-sidecar"][ "deploy" diff --git a/services/dask-sidecar/tests/unit/test_rabbitmq_plugin.py b/services/dask-sidecar/tests/unit/test_rabbitmq_plugin.py new file mode 100644 index 000000000000..de632c818ec8 --- 
/dev/null +++ b/services/dask-sidecar/tests/unit/test_rabbitmq_plugin.py @@ -0,0 +1,16 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=no-member + +import distributed + +# Selection of core and tool services started in this swarm fixture (integration) +pytest_simcore_core_services_selection = [ + "rabbit", +] + +pytest_simcore_ops_services_selection = [] + + +def test_rabbitmq_plugin_initializes(dask_client: distributed.Client): ... diff --git a/services/dask-sidecar/tests/unit/test_settings.py b/services/dask-sidecar/tests/unit/test_settings.py deleted file mode 100644 index 3f7596469a62..000000000000 --- a/services/dask-sidecar/tests/unit/test_settings.py +++ /dev/null @@ -1,25 +0,0 @@ -# pylint: disable=redefined-outer-name -# pylint: disable=unused-argument -# pylint: disable=unused-variable - - -import pytest -from pytest_simcore.helpers.typing_env import EnvVarsDict -from simcore_service_dask_sidecar.settings import Settings - - -def test_settings_as_worker( - app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch -): - settings = Settings.create_from_envs() - assert settings.as_worker() - - -def test_settings_as_scheduler( - app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch -): - assert app_environment.get("DASK_START_AS_SCHEDULER", None) != "1" - monkeypatch.setenv("DASK_START_AS_SCHEDULER", "1") - - settings = Settings.create_from_envs() - assert settings.as_scheduler() diff --git a/services/dask-sidecar/tests/unit/test_dask_utils.py b/services/dask-sidecar/tests/unit/test_utils_dask.py similarity index 93% rename from services/dask-sidecar/tests/unit/test_dask_utils.py rename to services/dask-sidecar/tests/unit/test_utils_dask.py index 214a95502009..9a1f6c7d18ae 100644 --- a/services/dask-sidecar/tests/unit/test_dask_utils.py +++ b/services/dask-sidecar/tests/unit/test_utils_dask.py @@ -6,7 +6,6 @@ import asyncio import concurrent.futures -import logging import time from collections.abc import AsyncIterator, Callable, Coroutine from typing import Any @@ -14,10 +13,10 @@ import distributed import pytest from dask_task_models_library.container_tasks.errors import TaskCancelledError -from dask_task_models_library.container_tasks.events import TaskLogEvent +from dask_task_models_library.container_tasks.events import TaskProgressEvent from dask_task_models_library.container_tasks.io import TaskCancelEventName from dask_task_models_library.container_tasks.protocol import TaskOwner -from simcore_service_dask_sidecar.dask_utils import ( +from simcore_service_dask_sidecar.utils.dask import ( _DEFAULT_MAX_RESOURCES, TaskPublisher, get_current_task_resources, @@ -33,16 +32,20 @@ DASK_TASK_STARTED_EVENT = "task_started" DASK_TESTING_TIMEOUT_S = 25 +pytest_simcore_core_services_selection = [ + "rabbit", +] + def test_publish_event( dask_client: distributed.Client, job_id: str, task_owner: TaskOwner ): dask_pub = distributed.Pub("some_topic", client=dask_client) dask_sub = distributed.Sub("some_topic", client=dask_client) - event_to_publish = TaskLogEvent( + event_to_publish = TaskProgressEvent( job_id=job_id, - log="the log", - log_level=logging.INFO, + msg="the log", + progress=1, task_owner=task_owner, ) publish_event(dask_pub=dask_pub, event=event_to_publish) @@ -53,7 +56,7 @@ def test_publish_event( message = dask_sub.get(timeout=DASK_TESTING_TIMEOUT_S) assert message is not None assert isinstance(message, str) - received_task_log_event = TaskLogEvent.model_validate_json(message) + 
received_task_log_event = TaskProgressEvent.model_validate_json(message) assert received_task_log_event == event_to_publish @@ -62,8 +65,8 @@ async def test_publish_event_async( ): dask_pub = distributed.Pub("some_topic", client=async_dask_client) dask_sub = distributed.Sub("some_topic", client=async_dask_client) - event_to_publish = TaskLogEvent( - job_id=job_id, log="the log", log_level=logging.INFO, task_owner=task_owner + event_to_publish = TaskProgressEvent( + job_id=job_id, msg="the log", progress=2, task_owner=task_owner ) publish_event(dask_pub=dask_pub, event=event_to_publish) @@ -74,7 +77,7 @@ async def test_publish_event_async( assert isinstance(message, Coroutine) message = await message assert message is not None - received_task_log_event = TaskLogEvent.model_validate_json(message) + received_task_log_event = TaskProgressEvent.model_validate_json(message) assert received_task_log_event == event_to_publish @@ -117,11 +120,10 @@ async def _dask_sub_consumer_task(sub: distributed.Sub) -> None: async def _dask_publisher_task(pub: distributed.Pub) -> None: print("--> starting publisher task") - for n in range(NUMBER_OF_MESSAGES): - event_to_publish = TaskLogEvent( + for _ in range(NUMBER_OF_MESSAGES): + event_to_publish = TaskProgressEvent( job_id=job_id, - log=f"the log {n}", - log_level=logging.INFO, + progress=0.5, task_owner=task_owner, ) publish_event(dask_pub=pub, event=event_to_publish) diff --git a/services/dask-sidecar/tests/unit/test_file_utils.py b/services/dask-sidecar/tests/unit/test_utils_files.py similarity index 97% rename from services/dask-sidecar/tests/unit/test_file_utils.py rename to services/dask-sidecar/tests/unit/test_utils_files.py index b31980b46a50..770d05e3cb87 100644 --- a/services/dask-sidecar/tests/unit/test_file_utils.py +++ b/services/dask-sidecar/tests/unit/test_utils_files.py @@ -19,7 +19,7 @@ from pytest_localftpserver.servers import ProcessFTPServer from pytest_mock.plugin import MockerFixture from settings_library.s3 import S3Settings -from simcore_service_dask_sidecar.file_utils import ( +from simcore_service_dask_sidecar.utils.files import ( _s3fs_settings_from_s3_settings, pull_file_from_remote, push_file_to_remote, @@ -310,14 +310,17 @@ async def test_pull_compressed_zip_file_from_remote( if remote_parameters.s3_settings: storage_kwargs = _s3fs_settings_from_s3_settings(remote_parameters.s3_settings) - with cast( - fsspec.core.OpenFile, - fsspec.open( - f"{destination_url}", - mode="wb", - **storage_kwargs, - ), - ) as dest_fp, local_zip_file_path.open("rb") as src_fp: + with ( + cast( + fsspec.core.OpenFile, + fsspec.open( + f"{destination_url}", + mode="wb", + **storage_kwargs, + ), + ) as dest_fp, + local_zip_file_path.open("rb") as src_fp, + ): dest_fp.write(src_fp.read()) # now we want to download that file so it becomes the source diff --git a/services/dask-sidecar/tests/unit/test_utils.py b/services/dask-sidecar/tests/unit/test_utils_gpus.py similarity index 93% rename from services/dask-sidecar/tests/unit/test_utils.py rename to services/dask-sidecar/tests/unit/test_utils_gpus.py index f3d162952ff8..88e2d5ecec57 100644 --- a/services/dask-sidecar/tests/unit/test_utils.py +++ b/services/dask-sidecar/tests/unit/test_utils_gpus.py @@ -10,13 +10,17 @@ import pytest from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict -from simcore_service_dask_sidecar.utils import num_available_gpus +from simcore_service_dask_sidecar.utils.gpus import num_available_gpus + 
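The rewritten `with` statement in `test_utils_files.py` relies on parenthesized context managers, officially supported since Python 3.10, which lets the two managers be split over several lines without nesting or backslashes. A minimal stdlib-only illustration (the file paths are placeholders):

```python
# Parenthesized context managers (Python 3.10+): both files are opened in one
# `with` statement that can wrap cleanly across lines.
from pathlib import Path

src = Path("source.bin")
dst = Path("destination.bin")
src.write_bytes(b"example payload")

with (
    src.open("rb") as src_fp,
    dst.open("wb") as dest_fp,
):
    dest_fp.write(src_fp.read())
```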
+pytest_simcore_core_services_selection = [ + "rabbit", +] @pytest.fixture def mock_aiodocker(mocker: MockerFixture) -> mock.MagicMock: return mocker.patch( - "simcore_service_dask_sidecar.utils.aiodocker.Docker", autospec=True + "simcore_service_dask_sidecar.utils.gpus.aiodocker.Docker", autospec=True ) diff --git a/services/dask-sidecar/tests/unit/test_tasks.py b/services/dask-sidecar/tests/unit/test_worker.py similarity index 80% rename from services/dask-sidecar/tests/unit/test_tasks.py rename to services/dask-sidecar/tests/unit/test_worker.py index 423f057b3779..d826270fca4e 100644 --- a/services/dask-sidecar/tests/unit/test_tasks.py +++ b/services/dask-sidecar/tests/unit/test_worker.py @@ -8,7 +8,8 @@ import json import logging import re -from collections.abc import Callable, Coroutine, Iterable +import threading +from collections.abc import AsyncIterator, Callable, Iterable # copied out from dask from dataclasses import dataclass @@ -23,10 +24,7 @@ from common_library.json_serialization import json_dumps from dask_task_models_library.container_tasks.docker import DockerBasicAuth from dask_task_models_library.container_tasks.errors import ServiceRuntimeError -from dask_task_models_library.container_tasks.events import ( - TaskLogEvent, - TaskProgressEvent, -) +from dask_task_models_library.container_tasks.events import TaskProgressEvent from dask_task_models_library.container_tasks.io import ( FileUrl, TaskInputData, @@ -39,12 +37,15 @@ ) from faker import Faker from models_library.basic_types import EnvVarKey +from models_library.rabbitmq_messages import LoggerRabbitMessage from models_library.services import ServiceMetaDataPublished from models_library.services_resources import BootMode from packaging import version from pydantic import AnyUrl, SecretStr, TypeAdapter from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.rabbitmq._client import RabbitMQClient +from servicelib.rabbitmq._constants import BIND_TO_ALL_TOPICS from settings_library.s3 import S3Settings from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( LEGACY_SERVICE_LOG_FILE_NAME, @@ -56,24 +57,38 @@ LEGACY_INTEGRATION_VERSION, ImageLabels, ) -from simcore_service_dask_sidecar.dask_utils import _DEFAULT_MAX_RESOURCES -from simcore_service_dask_sidecar.file_utils import _s3fs_settings_from_s3_settings -from simcore_service_dask_sidecar.tasks import run_computational_sidecar +from simcore_service_dask_sidecar.utils.dask import _DEFAULT_MAX_RESOURCES +from simcore_service_dask_sidecar.utils.files import ( + _s3fs_settings_from_s3_settings, +) +from simcore_service_dask_sidecar.worker import run_computational_sidecar +from tenacity import ( + AsyncRetrying, + retry_if_exception_type, + stop_after_delay, + wait_fixed, +) -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) + +pytest_simcore_core_services_selection = [ + "rabbit", +] @pytest.fixture() -def dask_subsystem_mock(mocker: MockerFixture) -> dict[str, mock.Mock]: +def dask_subsystem_mock( + mocker: MockerFixture, create_rabbitmq_client: Callable[[str], RabbitMQClient] +) -> dict[str, mock.Mock]: # mock dask client dask_client_mock = mocker.patch("distributed.Client", autospec=True) # mock tasks get worker and state dask_distributed_worker_mock = mocker.patch( - "simcore_service_dask_sidecar.dask_utils.get_worker", autospec=True + "simcore_service_dask_sidecar.utils.dask.get_worker", autospec=True ) dask_task_mock = mocker.patch( - 
"simcore_service_dask_sidecar.dask_utils.TaskState", autospec=True + "simcore_service_dask_sidecar.utils.dask.TaskState", autospec=True ) dask_task_mock.resource_restrictions = {} dask_distributed_worker_mock.return_value.state.tasks.get.return_value = ( @@ -92,18 +107,33 @@ def dask_subsystem_mock(mocker: MockerFixture) -> dict[str, mock.Mock]: ) # mock dask event publishing dask_utils_publish_event_mock = mocker.patch( - "simcore_service_dask_sidecar.dask_utils.distributed.Pub", + "simcore_service_dask_sidecar.utils.dask.distributed.Pub", autospec=True, ) mocker.patch( - "simcore_service_dask_sidecar.dask_utils.distributed.Sub", + "simcore_service_dask_sidecar.utils.dask.distributed.Sub", autospec=True, ) mocker.patch( - "simcore_service_dask_sidecar.dask_utils.is_current_task_aborted", + "simcore_service_dask_sidecar.utils.dask.is_current_task_aborted", autospec=True, return_value=False, ) + # mock dask rabbitmq plugin + mock_dask_rabbitmq_plugin = mocker.patch( + "simcore_service_dask_sidecar.rabbitmq_plugin.RabbitMQPlugin", autospec=True + ) + mock_rabbitmq_client = create_rabbitmq_client("pytest_dask_sidecar_logs_publisher") + mock_dask_rabbitmq_plugin.get_client.return_value = mock_rabbitmq_client + mock_dask_rabbitmq_plugin.publish_message_from_any_thread = ( + mock_rabbitmq_client.publish + ) + + mocker.patch( + "simcore_service_dask_sidecar.utils.dask.get_rabbitmq_client", + autospec=True, + return_value=mock_dask_rabbitmq_plugin, + ) return { "dask_client": dask_client_mock, @@ -148,10 +178,6 @@ def sidecar_params(self) -> dict[str, Any]: } -pytest_simcore_core_services_selection = ["postgres"] -pytest_simcore_ops_services_selection = [] - - def _bash_check_env_exist(variable_name: str, variable_value: str) -> list[str]: return [ f"if [ -z ${{{variable_name}+x}} ];then echo {variable_name} does not exist && exit 9;fi", @@ -159,7 +185,10 @@ def _bash_check_env_exist(variable_name: str, variable_value: str) -> list[str]: ] -@pytest.fixture(params=list(BootMode), ids=str) +@pytest.fixture( + params=list(BootMode), + ids=lambda v: f"boot_mode.{v.name}", +) def boot_mode(request: pytest.FixtureRequest) -> BootMode: return request.param @@ -443,9 +472,6 @@ def caplog_info_level( yield caplog -# from pydantic.json_schema import JsonDict - - @pytest.fixture def mocked_get_image_labels( integration_version: version.Version, mocker: MockerFixture @@ -462,6 +488,61 @@ def mocked_get_image_labels( ) +@pytest.fixture +async def log_rabbit_client_parser( + create_rabbitmq_client: Callable[[str], RabbitMQClient], mocker: MockerFixture +) -> AsyncIterator[mock.AsyncMock]: + # Create a threading event to track when subscription is ready + ready_event = threading.Event() + shutdown_event = threading.Event() + the_mock = mocker.AsyncMock(return_value=True) + + # Worker function to process messages in a separate thread + def message_processor(a_mock: mock.AsyncMock): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + client = create_rabbitmq_client("dask_sidecar_pytest_logs_consumer") + + async def subscribe_and_process(a_mock: mock.AsyncMock): + queue_name, _ = await client.subscribe( + LoggerRabbitMessage.get_channel_name(), + a_mock, + exclusive_queue=False, + topics=[BIND_TO_ALL_TOPICS], + ) + ready_event.set() + + # Wait until the test is done + while not shutdown_event.is_set(): + await asyncio.sleep(0.1) + + # Cleanup + await client.unsubscribe(queue_name) + + loop.run_until_complete(subscribe_and_process(a_mock)) + loop.run_until_complete(client.close()) + loop.close() + + 
# Start the worker thread + worker = threading.Thread( + target=message_processor, kwargs={"a_mock": the_mock}, daemon=False + ) + worker.start() + + # Wait for subscription to be ready + assert ready_event.wait(timeout=10), "Failed to initialize RabbitMQ subscription" + + try: + yield the_mock + finally: + # Signal the worker thread to shut down + shutdown_event.set() + worker.join(timeout=5) + if worker.is_alive(): + _logger.warning("RabbitMQ worker thread did not terminate properly") + + def test_run_computational_sidecar_real_fct( caplog_info_level: pytest.LogCaptureFixture, event_loop: asyncio.AbstractEventLoop, @@ -470,6 +551,7 @@ def test_run_computational_sidecar_real_fct( sleeper_task: ServiceExampleParam, mocked_get_image_labels: mock.Mock, s3_settings: S3Settings, + log_rabbit_client_parser: mock.AsyncMock, ): output_data = run_computational_sidecar( **sleeper_task.sidecar_params(), @@ -480,10 +562,11 @@ def test_run_computational_sidecar_real_fct( sleeper_task.service_key, sleeper_task.service_version, ) - for event in [TaskProgressEvent, TaskLogEvent]: + for event in [TaskProgressEvent]: dask_subsystem_mock["dask_event_publish"].assert_any_call( name=event.topic_name() ) + assert log_rabbit_client_parser.called # check that the task produces expected logs for log in sleeper_task.expected_logs: @@ -546,7 +629,7 @@ def test_run_multiple_computational_sidecar_dask( results = dask_client.gather(futures) assert results - assert not isinstance(results, Coroutine) + assert isinstance(results, list) # for result in results: # check that the task produce the expected data, not less not more for output_data in results: @@ -557,13 +640,6 @@ def test_run_multiple_computational_sidecar_dask( mocked_get_image_labels.assert_called() -@pytest.fixture -def log_sub( - dask_client: distributed.Client, -) -> distributed.Sub: - return distributed.Sub(TaskLogEvent.topic_name(), client=dask_client) - - @pytest.fixture def progress_sub(dask_client: distributed.Client) -> distributed.Sub: return distributed.Sub(TaskProgressEvent.topic_name(), client=dask_client) @@ -573,12 +649,13 @@ def progress_sub(dask_client: distributed.Client) -> distributed.Sub: "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True ) async def test_run_computational_sidecar_dask( - dask_client: distributed.Client, + app_environment: EnvVarsDict, sleeper_task: ServiceExampleParam, - log_sub: distributed.Sub, progress_sub: distributed.Sub, mocked_get_image_labels: mock.Mock, s3_settings: S3Settings, + log_rabbit_client_parser: mock.AsyncMock, + dask_client: distributed.Client, ): future = dask_client.submit( run_computational_sidecar, @@ -603,15 +680,30 @@ async def test_run_computational_sidecar_dask( ), "ordering of progress values incorrectly sorted!" 
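The `log_rabbit_client_parser` fixture runs the RabbitMQ consumer in a dedicated thread with its own event loop because the sidecar function under test blocks the calling thread; two `threading.Event` objects handshake startup and shutdown. A self-contained, stdlib-only sketch of that pattern (the real fixture plugs the RabbitMQ subscription into the consumer coroutine):

```python
# Run an asyncio consumer in its own thread with its own event loop, and
# coordinate startup/teardown through threading.Event. Stdlib only.
import asyncio
import threading


def run_consumer_in_thread() -> tuple[threading.Thread, threading.Event]:
    ready = threading.Event()
    shutdown = threading.Event()

    def _worker() -> None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        async def consume() -> None:
            ready.set()  # tell the main thread the consumer is up
            while not shutdown.is_set():
                await asyncio.sleep(0.1)  # poll for the shutdown signal

        loop.run_until_complete(consume())
        loop.close()

    thread = threading.Thread(target=_worker, daemon=False)
    thread.start()
    assert ready.wait(timeout=10), "consumer thread failed to start"
    return thread, shutdown


if __name__ == "__main__":
    worker, stop = run_consumer_in_thread()
    stop.set()
    worker.join(timeout=5)
```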
assert worker_progresses[0] == 0, "missing/incorrect initial progress value" assert worker_progresses[-1] == 1, "missing/incorrect final progress value" - worker_logs = [TaskLogEvent.model_validate_json(msg).log for msg in log_sub.buffer] - print(f"<-- we got {len(worker_logs)} lines of logs") - - for log in sleeper_task.expected_logs: - r = re.compile(rf"^({log}).*") - search_results = list(filter(r.search, worker_logs)) - assert ( - len(search_results) > 0 - ), f"Could not find {log} in worker_logs:\n {pformat(worker_logs, width=240)}" + async for attempt in AsyncRetrying( + wait=wait_fixed(1), + stop=stop_after_delay(30), + reraise=True, + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + assert log_rabbit_client_parser.called + worker_logs = [ + message + for msg in log_rabbit_client_parser.call_args_list + for message in LoggerRabbitMessage.model_validate_json( + msg.args[0] + ).messages + ] + + print(f"<-- we got {len(worker_logs)} lines of logs") + + for log in sleeper_task.expected_logs: + r = re.compile(rf"^({log}).*") + search_results = list(filter(r.search, worker_logs)) + assert ( + len(search_results) > 0 + ), f"Could not find {log} in worker_logs:\n {pformat(worker_logs, width=240)}" # check that the task produce the expected data, not less not more assert isinstance(output_data, TaskOutputData) @@ -632,14 +724,16 @@ async def test_run_computational_sidecar_dask( @pytest.mark.parametrize( - "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True + "integration_version, boot_mode, task_owner", + [("1.0.0", BootMode.CPU, "no_parent_node")], + indirect=True, ) async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub( dask_client: distributed.Client, sidecar_task: Callable[..., ServiceExampleParam], - log_sub: distributed.Sub, progress_sub: distributed.Sub, mocked_get_image_labels: mock.Mock, + log_rabbit_client_parser: mock.AsyncMock, ): mocked_get_image_labels.assert_not_called() NUMBER_OF_LOGS = 20000 @@ -675,10 +769,27 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub assert worker_progresses[0] == 0, "missing/incorrect initial progress value" assert worker_progresses[-1] == 1, "missing/incorrect final progress value" - worker_logs = [TaskLogEvent.model_validate_json(msg).log for msg in log_sub.buffer] - # check all the awaited logs are in there - filtered_worker_logs = filter(lambda log: "This is iteration" in log, worker_logs) - assert len(list(filtered_worker_logs)) == NUMBER_OF_LOGS + async for attempt in AsyncRetrying( + wait=wait_fixed(1), + stop=stop_after_delay(30), + reraise=True, + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + assert log_rabbit_client_parser.called + + worker_logs = [ + message + for msg in log_rabbit_client_parser.call_args_list + for message in LoggerRabbitMessage.model_validate_json( + msg.args[0] + ).messages + ] + # check all the awaited logs are in there + filtered_worker_logs = filter( + lambda log: "This is iteration" in log, worker_logs + ) + assert len(list(filtered_worker_logs)) == NUMBER_OF_LOGS mocked_get_image_labels.assert_called() diff --git a/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py b/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py index 75a8a8848ac8..949ef83bbdf6 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py +++ 
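Since log delivery now goes through RabbitMQ, assertions on received logs are eventually consistent and are polled with tenacity until they pass or a deadline expires. The same retry pattern can be distilled into a small helper; the helper name and signature below are illustrative only:

```python
# Re-evaluate an async assertion callback until it stops raising
# AssertionError or the deadline expires, mirroring the retry loop above.
from collections.abc import Awaitable, Callable

from tenacity import (
    AsyncRetrying,
    retry_if_exception_type,
    stop_after_delay,
    wait_fixed,
)


async def eventually(check: Callable[[], Awaitable[None]], *, timeout_s: float = 30) -> None:
    async for attempt in AsyncRetrying(
        wait=wait_fixed(1),
        stop=stop_after_delay(timeout_s),
        retry=retry_if_exception_type(AssertionError),
        reraise=True,  # surface the last AssertionError if the deadline is hit
    ):
        with attempt:
            await check()
```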
b/services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py @@ -30,7 +30,7 @@ def get_base_repository(engine: AsyncEngine, repo_type: type[RepoType]) -> RepoT checkedout = engine.pool.checkedout() # type: ignore # connections in use total_size = engine.pool.size() # type: ignore # current total connections - if checkedin <= 1: + if (checkedin < 2) and (total_size > 1): # noqa: PLR2004 logger.warning( "Database connection pool near limits: total=%d, in_use=%d, available=%d", total_size, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py index 98e5d5432c5b..49465084d5d7 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py @@ -198,7 +198,7 @@ async def _update_run_result_from_tasks( iteration: Iteration, pipeline_tasks: dict[NodeIDStr, CompTaskAtDB], ) -> RunningState: - pipeline_state_from_tasks: RunningState = get_pipeline_state_from_task_states( + pipeline_state_from_tasks = get_pipeline_state_from_task_states( list(pipeline_tasks.values()), ) _logger.debug( @@ -342,6 +342,7 @@ async def _process_started_tasks( tasks: list[CompTaskAtDB], *, user_id: UserID, + project_id: ProjectID, iteration: Iteration, run_metadata: RunMetadataDict, ) -> None: @@ -441,6 +442,12 @@ async def _process_started_tasks( for t in tasks ) ) + await CompRunsRepository.instance(self.db_engine).mark_as_started( + user_id=user_id, + project_id=project_id, + iteration=iteration, + started_time=utc_now, + ) async def _process_waiting_tasks(self, tasks: list[CompTaskAtDB]) -> None: comp_tasks_repo = CompTasksRepository(self.db_engine) @@ -488,6 +495,7 @@ async def _update_states_from_comp_backend( await self._process_started_tasks( sorted_tasks.started, user_id=user_id, + project_id=project_id, iteration=iteration, run_metadata=comp_run.metadata, ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py index 955ca1d1c9ec..77a4c807e6e8 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py @@ -8,7 +8,6 @@ import arrow from dask_task_models_library.container_tasks.errors import TaskCancelledError from dask_task_models_library.container_tasks.events import ( - TaskLogEvent, TaskProgressEvent, ) from dask_task_models_library.container_tasks.io import TaskOutputData @@ -38,7 +37,6 @@ ) from ...utils.dask_client_utils import TaskHandlers from ...utils.rabbitmq import ( - publish_service_log, publish_service_progress, publish_service_resource_tracking_stopped, publish_service_stopped_metrics, @@ -92,7 +90,6 @@ def __post_init__(self) -> None: self.dask_clients_pool.register_handlers( TaskHandlers( self._task_progress_change_handler, - self._task_log_change_handler, ) ) @@ -363,6 +360,7 @@ async def _task_progress_change_handler(self, event: str) -> None: await self._process_started_tasks( [task], user_id=user_id, + project_id=project_id, iteration=run.iteration, run_metadata=run.metadata, ) @@ -377,27 +375,3 @@ async def _task_progress_change_handler(self, event: str) -> None: node_id=node_id, 
progress=task_progress_event.progress, ) - - async def _task_log_change_handler(self, event: str) -> None: - with log_catch(_logger, reraise=False): - task_log_event = TaskLogEvent.model_validate_json(event) - _logger.debug("received task log update: %s", task_log_event) - await publish_service_log( - self.rabbitmq_client, - user_id=task_log_event.task_owner.user_id, - project_id=task_log_event.task_owner.project_id, - node_id=task_log_event.task_owner.node_id, - log=task_log_event.log, - log_level=task_log_event.log_level, - ) - if task_log_event.task_owner.has_parent: - assert task_log_event.task_owner.parent_project_id # nosec - assert task_log_event.task_owner.parent_node_id # nosec - await publish_service_log( - self.rabbitmq_client, - user_id=task_log_event.task_owner.user_id, - project_id=task_log_event.task_owner.parent_project_id, - node_id=task_log_event.task_owner.parent_node_id, - log=task_log_event.log, - log_level=task_log_event.log_level, - ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 63617a3eb617..6ac294238e84 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -187,7 +187,6 @@ async def delete(self) -> None: def register_handlers(self, task_handlers: TaskHandlers) -> None: _event_consumer_map = [ (self.backend.progress_sub, task_handlers.task_progress_handler), - (self.backend.logs_sub, task_handlers.task_log_handler), ] self._subscribed_tasks = [ asyncio.create_task( @@ -223,7 +222,7 @@ def _comp_sidecar_fct( ) -> TaskOutputData: """This function is serialized by the Dask client and sent over to the Dask sidecar(s) Therefore, (screaming here) DO NOT MOVE THAT IMPORT ANYWHERE ELSE EVER!!""" - from simcore_service_dask_sidecar.tasks import ( # type: ignore[import-not-found] # this runs inside the dask-sidecar + from simcore_service_dask_sidecar.worker import ( # type: ignore[import-not-found] # this runs inside the dask-sidecar run_computational_sidecar, ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index 507ac369b173..b2f366b99d47 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -295,7 +295,6 @@ async def create( project_uuid=f"{project_id}", iteration=iteration, result=RUNNING_STATE_TO_DB[RunningState.PUBLISHED], - started=datetime.datetime.now(tz=datetime.UTC), metadata=jsonable_encoder(metadata), use_on_demand_clusters=use_on_demand_clusters, ) @@ -343,6 +342,21 @@ async def set_run_result( **values, ) + async def mark_as_started( + self, + *, + user_id: UserID, + project_id: ProjectID, + iteration: PositiveInt, + started_time: datetime.datetime, + ) -> CompRunsAtDB | None: + return await self.update( + user_id, + project_id, + iteration, + started=started_time, + ) + async def mark_for_cancellation( self, *, user_id: UserID, project_id: ProjectID, iteration: PositiveInt ) -> CompRunsAtDB | None: diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py b/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py index 0ec66eeabdd8..34e11952d314 100644 --- 
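With `mark_as_started`, the `started` column is no longer stamped when the run row is created but when the scheduler first sees a running task, so the stored timestamps can distinguish queueing time from execution time. A hypothetical illustration of that effect (the column and field names are assumptions, not taken from the diff):

```python
# Hypothetical helper: with `started` set by mark_as_started rather than at
# row creation, the queueing latency of a run becomes meaningful.
from datetime import datetime


def queued_duration_seconds(created: datetime, started: datetime | None) -> float | None:
    # `created` is set when the run row is inserted; `started` when the
    # scheduler first processes a running task for that run.
    if started is None:
        return None  # the run has not started yet
    return (started - created).total_seconds()
```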
a/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py @@ -6,7 +6,6 @@ import distributed from dask_task_models_library.container_tasks.events import ( - TaskLogEvent, TaskProgressEvent, ) from models_library.clusters import ClusterAuthentication, TLSAuthentication @@ -19,7 +18,6 @@ @dataclass class TaskHandlers: task_progress_handler: Callable[[str], Awaitable[None]] - task_log_handler: Callable[[str], Awaitable[None]] logger = logging.getLogger(__name__) @@ -30,13 +28,11 @@ class DaskSubSystem: client: distributed.Client scheduler_id: str progress_sub: distributed.Sub = field(init=False) - logs_sub: distributed.Sub = field(init=False) def __post_init__(self) -> None: self.progress_sub = distributed.Sub( TaskProgressEvent.topic_name(), client=self.client ) - self.logs_sub = distributed.Sub(TaskLogEvent.topic_name(), client=self.client) async def close(self) -> None: # NOTE: if the Sub are deleted before closing the connection, diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index 479efcd47b18..c52647b70479 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -20,7 +20,6 @@ from dask_task_models_library.container_tasks.docker import DockerBasicAuth from dask_task_models_library.container_tasks.errors import TaskCancelledError from dask_task_models_library.container_tasks.events import ( - TaskLogEvent, TaskProgressEvent, ) from dask_task_models_library.container_tasks.io import ( @@ -1078,9 +1077,7 @@ def fake_remote_fct( @pytest.fixture async def fake_task_handlers(mocker: MockerFixture) -> TaskHandlers: - return TaskHandlers( - task_progress_handler=mocker.MagicMock(), task_log_handler=mocker.MagicMock() - ) + return TaskHandlers(task_progress_handler=mocker.MagicMock()) async def test_dask_sub_handlers( @@ -1106,9 +1103,7 @@ def fake_remote_fct( s3_settings: S3Settings | None, ) -> TaskOutputData: progress_pub = distributed.Pub(TaskProgressEvent.topic_name()) - logs_pub = distributed.Pub(TaskLogEvent.topic_name()) progress_pub.put("my name is progress") - logs_pub.put("my name is logs") # tell the client we are done published_event = Event(name=_DASK_START_EVENT) published_event.set() @@ -1154,7 +1149,6 @@ def fake_remote_fct( fake_task_handlers.task_progress_handler.assert_called_with( "my name is progress" ) - fake_task_handlers.task_log_handler.assert_called_with("my name is logs") await _assert_wait_for_cb_call(mocked_user_completed_cb) diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py index bb4adba21357..5b1cbf64aa3f 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py @@ -108,7 +108,6 @@ def _assert_dask_client_correctly_initialized( cast( # noqa: SLF001 DaskScheduler, scheduler )._task_progress_change_handler, - cast(DaskScheduler, scheduler)._task_log_change_handler, # noqa: SLF001 ) ) diff --git a/services/docker-compose.yml b/services/docker-compose.yml index f61c6156f47b..3374ed032c44 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -1164,6 +1164,11 @@ services: DASK_SIDECAR_LOGLEVEL: ${DASK_SIDECAR_LOGLEVEL} 
SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME: ${SWARM_STACK_NAME}_computational_shared_data SIDECAR_COMP_SERVICES_SHARED_FOLDER: ${SIDECAR_COMP_SERVICES_SHARED_FOLDER:-/home/scu/computational_shared_data} + RABBIT_HOST: ${RABBIT_HOST} + RABBIT_PASSWORD: ${RABBIT_PASSWORD} + RABBIT_PORT: ${RABBIT_PORT} + RABBIT_SECURE: ${RABBIT_SECURE} + RABBIT_USER: ${RABBIT_USER} networks: - computational_services_subnet secrets: *dask_tls_secrets @@ -1266,6 +1271,7 @@ services: - rabbit_data:/var/lib/rabbitmq networks: - default + - computational_services_subnet - interactive_services_subnet - autoscaling_subnet healthcheck:
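The new `RABBIT_*` variables in `docker-compose.yml` feed the sidecar's RabbitMQ connection settings from the environment. A hedged sketch of how such a nested settings block is typically declared in this codebase, assuming the `settings_library` conventions visible elsewhere in the tests (the class and field names below are assumptions, not from the diff):

```python
# Assumed pattern: a nested RabbitSettings auto-populated from RABBIT_HOST,
# RABBIT_USER, RABBIT_PASSWORD, RABBIT_PORT and RABBIT_SECURE.
from pydantic import Field
from settings_library.base import BaseCustomSettings
from settings_library.rabbit import RabbitSettings


class _SidecarRabbitSettings(BaseCustomSettings):
    DASK_SIDECAR_RABBITMQ: RabbitSettings | None = Field(
        json_schema_extra={"auto_default_from_env": True},
        description="built from the RABBIT_* environment variables when they are set",
    )
```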