diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/errors.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/errors.py
index 1597ddfb6f4..1d4a48bc535 100644
--- a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/errors.py
+++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/errors.py
@@ -1,11 +1,9 @@
-""" Dask task exceptions
+"""Dask task exceptions"""
 
-"""
 from common_library.errors_classes import OsparcErrorMixin
 
 
-class TaskValueError(OsparcErrorMixin, ValueError):
-    ...
+class TaskValueError(OsparcErrorMixin, ValueError): ...
 
 
 class TaskCancelledError(OsparcErrorMixin, RuntimeError):
@@ -18,3 +16,12 @@ class ServiceRuntimeError(OsparcErrorMixin, RuntimeError):
         " running in container {container_id} failed with code"
         " {exit_code}. Last logs:\n{service_logs}"
     )
+
+
+class ServiceInputsUseFileToKeyMapButReceivesZipDataError(
+    OsparcErrorMixin, RuntimeError
+):
+    msg_template = (
+        "The service {service_key}:{service_version} input {input_key} uses a file-to-key map {file_to_key_map} but receives zip data instead. "
+        "TIP: either pass a single file or a zip file and remove the file-to-key map parameter."
+    )
diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py
index 2bd094306fb..c066bd79044 100644
--- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py
+++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py
@@ -13,7 +13,10 @@
 from aiodocker import Docker
 from common_library.json_serialization import json_dumps
 from dask_task_models_library.container_tasks.docker import DockerBasicAuth
-from dask_task_models_library.container_tasks.errors import ServiceRuntimeError
+from dask_task_models_library.container_tasks.errors import (
+    ServiceInputsUseFileToKeyMapButReceivesZipDataError,
+    ServiceRuntimeError,
+)
 from dask_task_models_library.container_tasks.io import FileUrl, TaskOutputData
 from dask_task_models_library.container_tasks.protocol import ContainerTaskParameters
 from models_library.progress_bar import ProgressReport
@@ -27,7 +30,11 @@
 from ..settings import ApplicationSettings
 from ..utils.dask import TaskPublisher
-from ..utils.files import pull_file_from_remote, push_file_to_remote
+from ..utils.files import (
+    check_need_unzipping,
+    pull_file_from_remote,
+    push_file_to_remote,
+)
 from .docker_utils import (
     create_container_config,
     get_computational_shared_data_mount_point,
@@ -75,6 +82,17 @@ async def _write_input_data(
 
             destination_path = task_volumes.inputs_folder / file_name
 
+            need_unzipping = check_need_unzipping(
+                input_params.url, input_params.file_mime_type, destination_path
+            )
+            if input_params.file_mapping and need_unzipping:
+                raise ServiceInputsUseFileToKeyMapButReceivesZipDataError(
+                    service_key=self.task_parameters.image,
+                    service_version=self.task_parameters.tag,
+                    input_key=input_key,
+                    file_to_key_map=input_params.file_mapping,
+                )
+
             if destination_path.parent != task_volumes.inputs_folder:
                 # NOTE: only 'task_volumes.inputs_folder' part of 'destination_path' is guaranteed,
                 # if extra subfolders via file-mapping,
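Review note (not part of the patch): the guard added to _write_input_data raises the new error only when an input both declares a file_mapping and carries a payload that would need unzipping. The sketch below is a minimal stand-in showing why the msg_template placeholders must match the keyword arguments used at the raise site (service_key, service_version, input_key, file_to_key_map). _ErrorMixinStandIn only mimics the assumption that common_library.errors_classes.OsparcErrorMixin formats msg_template with the kwargs passed when raising; all service and input values are invented for illustration.

class _ErrorMixinStandIn(Exception):
    """Simplified stand-in, NOT the real OsparcErrorMixin."""

    msg_template = ""

    def __init__(self, **ctx) -> None:
        # render the template with the kwargs given at raise time
        super().__init__(self.msg_template.format(**ctx))


class _ZipWithFileToKeyMapError(_ErrorMixinStandIn):
    msg_template = (
        "The service {service_key}:{service_version} input {input_key} uses a "
        "file-to-key map {file_to_key_map} but receives zip data instead."
    )


try:
    raise _ZipWithFileToKeyMapError(
        service_key="simcore/services/comp/some-service",  # invented example values
        service_version="1.2.3",
        input_key="some_file_input_with_mapping",
        file_to_key_map={"data.csv": "output_1"},
    )
except _ZipWithFileToKeyMapError as err:
    print(err)  # template placeholders must match the raise-site kwargs, or formatting fails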
diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/task_shared_volume.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/task_shared_volume.py
index a757c8b6306..a6ca199f551 100644
--- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/task_shared_volume.py
+++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/task_shared_volume.py
@@ -23,7 +23,7 @@ def __post_init__(self) -> None:
             assert not folder_path.exists()  # nosec
             folder_path.mkdir(parents=True)
-            logger.debug(
+            logger.info(
                 "created %s in %s",
                 f"{folder=}",
                 f"{self.base_path=}",
diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/files.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/files.py
index 2a108cc595f..c2ba1b86ece 100644
--- a/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/files.py
+++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils/files.py
@@ -5,6 +5,7 @@
 import time
 import zipfile
 from collections.abc import Callable, Coroutine
+from contextlib import AsyncExitStack
 from io import IOBase
 from pathlib import Path
 from typing import Any, Final, TypedDict, cast
@@ -140,6 +141,23 @@ async def _copy_file(
 _ZIP_MIME_TYPE: Final[str] = "application/zip"
 
 
+def check_need_unzipping(
+    src_url: AnyUrl,
+    target_mime_type: str | None,
+    dst_path: Path,
+) -> bool:
+    """
+    Checks if the source URL points to a zip file and if the target mime type is not zip.
+    If so, extraction is needed.
+    """
+    assert src_url.path  # nosec
+    src_mime_type, _ = mimetypes.guess_type(src_url.path)
+    dst_mime_type = target_mime_type
+    if not dst_mime_type:
+        dst_mime_type, _ = mimetypes.guess_type(dst_path)
+    return (src_mime_type == _ZIP_MIME_TYPE) and (dst_mime_type != _ZIP_MIME_TYPE)
+
+
 async def pull_file_from_remote(
     src_url: AnyUrl,
     target_mime_type: str | None,
@@ -156,38 +174,51 @@ async def pull_file_from_remote(
         msg = f"{dst_path.parent=} does not exist. It must be created by the caller"
         raise ValueError(msg)
 
-    src_mime_type, _ = mimetypes.guess_type(f"{src_url.path}")
-    if not target_mime_type:
-        target_mime_type, _ = mimetypes.guess_type(dst_path)
-
     storage_kwargs: S3FsSettingsDict | dict[str, Any] = {}
     if s3_settings and src_url.scheme in S3_FILE_SYSTEM_SCHEMES:
         storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings)
-    await _copy_file(
-        src_url,
-        TypeAdapter(FileUrl).validate_python(dst_path.as_uri()),
-        src_storage_cfg=cast(dict[str, Any], storage_kwargs),
-        log_publishing_cb=log_publishing_cb,
-        text_prefix=f"Downloading '{src_url.path.strip('/')}':",
-    )
-    await log_publishing_cb(
-        f"Download of '{src_url}' into local file '{dst_path}' complete.",
-        logging.INFO,
-    )
-
-    if src_mime_type == _ZIP_MIME_TYPE and target_mime_type != _ZIP_MIME_TYPE:
-        await log_publishing_cb(f"Uncompressing '{dst_path.name}'...", logging.INFO)
-        logger.debug("%s is a zip file and will be now uncompressed", dst_path)
-        with repro_zipfile.ReproducibleZipFile(dst_path, "r") as zip_obj:
-            await asyncio.get_event_loop().run_in_executor(
-                None, zip_obj.extractall, dst_path.parents[0]
-            )
-        # finally remove the zip archive
-        await log_publishing_cb(
-            f"Uncompressing '{dst_path.name}' complete.", logging.INFO
-        )
-        dst_path.unlink()
+
+    need_unzipping = check_need_unzipping(src_url, target_mime_type, dst_path)
+    async with AsyncExitStack() as exit_stack:
+        if need_unzipping:
+            # we need to extract the file, so we create a temporary directory
+            # where the file will be downloaded and extracted
+            tmp_dir = await exit_stack.enter_async_context(
+                aiofiles.tempfile.TemporaryDirectory()
+            )
+            download_dst_path = Path(f"{tmp_dir}") / Path(src_url.path).name
+        else:
+            # no extraction needed, so we can use the provided dst_path directly
+            download_dst_path = dst_path
+
+        await _copy_file(
+            src_url,
+            TypeAdapter(FileUrl).validate_python(f"{download_dst_path.as_uri()}"),
+            src_storage_cfg=cast(dict[str, Any], storage_kwargs),
+            log_publishing_cb=log_publishing_cb,
+            text_prefix=f"Downloading '{src_url.path.strip('/')}':",
+        )
+        await log_publishing_cb(
+            f"Download of '{src_url}' into local file '{download_dst_path}' complete.",
+            logging.INFO,
+        )
+
+        if need_unzipping:
+            await log_publishing_cb(
+                f"Uncompressing '{download_dst_path.name}'...", logging.INFO
+            )
+            logger.debug(
+                "%s is a zip file and will be now uncompressed", download_dst_path
+            )
+            with repro_zipfile.ReproducibleZipFile(download_dst_path, "r") as zip_obj:
+                await asyncio.get_event_loop().run_in_executor(
+                    None, zip_obj.extractall, dst_path.parents[0]
+                )
+            # the zip archive lives in the temporary directory and is removed with it by the exit stack
+            await log_publishing_cb(
+                f"Uncompressing '{download_dst_path.name}' complete.", logging.INFO
+            )
 
 
 async def _push_file_to_http_link(
diff --git a/services/dask-sidecar/tests/unit/test_computational_sidecar_tasks.py b/services/dask-sidecar/tests/unit/test_computational_sidecar_tasks.py
index 57cf06de1ad..94503ba4fd7 100644
--- a/services/dask-sidecar/tests/unit/test_computational_sidecar_tasks.py
+++ b/services/dask-sidecar/tests/unit/test_computational_sidecar_tasks.py
@@ -23,7 +23,10 @@
 import pytest
 from common_library.json_serialization import json_dumps
 from dask_task_models_library.container_tasks.docker import DockerBasicAuth
-from dask_task_models_library.container_tasks.errors import ServiceRuntimeError
+from dask_task_models_library.container_tasks.errors import (
+    ServiceInputsUseFileToKeyMapButReceivesZipDataError,
+    ServiceRuntimeError,
+)
 from dask_task_models_library.container_tasks.events import TaskProgressEvent
 from dask_task_models_library.container_tasks.io import (
     FileUrl,
@@ -417,7 +420,9 @@ def sidecar_task(
     task_owner: TaskOwner,
     s3_settings: S3Settings,
 ) -> Callable[..., ServiceExampleParam]:
-    def _creator(command: list[str] | None = None) -> ServiceExampleParam:
+    def _creator(
+        command: list[str] | None = None, input_data: TaskInputData | None = None
+    ) -> ServiceExampleParam:
         return ServiceExampleParam(
             docker_basic_auth=DockerBasicAuth(
                 server_address="docker.io", username="pytest", password=SecretStr("")
@@ -426,7 +431,7 @@ def _creator(command: list[str] | None = None) -> ServiceExampleParam:
             service_version="latest",
             command=command
             or ["/bin/bash", "-c", "echo 'hello I'm an empty ubuntu task!"],
-            input_data=TaskInputData.model_validate({}),
+            input_data=input_data or TaskInputData.model_validate({}),
             output_data_keys=TaskOutputDataSchema.model_validate({}),
             log_file_url=s3_remote_file_url(file_path="log.dat"),
             expected_output_data=TaskOutputData.model_validate({}),
@@ -456,6 +461,30 @@ def sleeper_task_unexpected_output(
     return sleeper_task
 
 
+@pytest.fixture()
+def task_with_file_to_key_map_in_input_data(
+    sidecar_task: Callable[..., ServiceExampleParam],
+) -> ServiceExampleParam:
+    """This task has a file-to-key map in the input data but receives zip data instead"""
+    return sidecar_task(
+        command=["/bin/bash", "-c", "echo we create nothingness"],
+        input_data=TaskInputData.model_validate(
+            {
+                "input_1": 23,
+                "input_23": "a string input",
+                "the_input_43": 15.0,
+                "the_bool_input_54": False,
+                "some_file_input_with_mapping": FileUrl(
+                    url=TypeAdapter(AnyUrl).validate_python(
+                        "s3://myserver/some_file_url.zip"
+                    ),
+                    file_mapping="some_file_mapping",
+                ),
+            }
+        ),
+    )
+
+
 @pytest.fixture()
 def caplog_info_level(
     caplog: pytest.LogCaptureFixture,
@@ -809,6 +838,21 @@ def test_running_service_that_generates_unexpected_data_raises_exception(
     )
 
 
+@pytest.mark.parametrize(
+    "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True
+)
+def test_running_service_with_incorrect_zip_data_that_uses_a_file_to_key_map_raises_exception(
+    caplog_info_level: pytest.LogCaptureFixture,
+    app_environment: EnvVarsDict,
+    dask_subsystem_mock: dict[str, mock.Mock],
+    task_with_file_to_key_map_in_input_data: ServiceExampleParam,
+):
+    with pytest.raises(ServiceInputsUseFileToKeyMapButReceivesZipDataError):
+        run_computational_sidecar(
+            **task_with_file_to_key_map_in_input_data.sidecar_params(),
+        )
+
+
 @pytest.mark.parametrize(
     "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True
 )
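Review note (not part of the patch): check_need_unzipping only inspects mime types, so the decision can be exercised without any download. The sketch below re-implements the same stdlib-only logic under an invented name (needs_unzipping) with made-up paths, to show the three interesting cases; it is an illustration, not the sidecar code itself.

import mimetypes
from pathlib import Path

_ZIP_MIME_TYPE = "application/zip"


def needs_unzipping(src_path: str, target_mime_type: str | None, dst_path: Path) -> bool:
    # unzip only when the source looks like a zip and the destination is not meant to stay one
    src_mime_type, _ = mimetypes.guess_type(src_path)
    dst_mime_type = target_mime_type
    if not dst_mime_type:
        dst_mime_type, _ = mimetypes.guess_type(dst_path)
    return (src_mime_type == _ZIP_MIME_TYPE) and (dst_mime_type != _ZIP_MIME_TYPE)


# zip source, non-zip destination -> extract
assert needs_unzipping("/bucket/archive.zip", None, Path("inputs/data.csv"))
# zip source, destination explicitly expects a zip -> keep the archive as-is
assert not needs_unzipping("/bucket/archive.zip", _ZIP_MIME_TYPE, Path("inputs/archive.zip"))
# non-zip source -> nothing to extract
assert not needs_unzipping("/bucket/data.csv", None, Path("inputs/data.csv"))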