Skip to content
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
""" Dask task exceptions
"""Dask task exceptions"""

"""
from common_library.errors_classes import OsparcErrorMixin


class TaskValueError(OsparcErrorMixin, ValueError):
...
class TaskValueError(OsparcErrorMixin, ValueError): ...


class TaskCancelledError(OsparcErrorMixin, RuntimeError):
Expand All @@ -18,3 +16,12 @@ class ServiceRuntimeError(OsparcErrorMixin, RuntimeError):
" running in container {container_id} failed with code"
" {exit_code}. Last logs:\n{service_logs}"
)


class ServiceInputsUseFileToKeyMapButReceivesZipDataError(
    OsparcErrorMixin, RuntimeError
):
    """Raised when a service input declares a file-to-key map but its data is a zip archive.

    The placeholders of ``msg_template`` are expected to be provided as keyword
    arguments by the raiser: ``service_key``, ``service_version``,
    ``input_key`` and ``file_to_key_map``.
    """

    # NOTE: the placeholder is named `input_key` (not `input`) so that it matches
    # the keyword argument passed where this error is raised; with a mismatching
    # name the input key never ends up in the message.
    msg_template = (
        "The service {service_key}:{service_version} {input_key} uses a file-to-key {file_to_key_map} map but receives zip data instead. "
        "TIP: either pass a single file or zip file and remove the file-to-key map parameter."
    )
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
from aiodocker import Docker
from common_library.json_serialization import json_dumps
from dask_task_models_library.container_tasks.docker import DockerBasicAuth
from dask_task_models_library.container_tasks.errors import ServiceRuntimeError
from dask_task_models_library.container_tasks.errors import (
ServiceInputsUseFileToKeyMapButReceivesZipDataError,
ServiceRuntimeError,
)
from dask_task_models_library.container_tasks.io import FileUrl, TaskOutputData
from dask_task_models_library.container_tasks.protocol import ContainerTaskParameters
from models_library.progress_bar import ProgressReport
Expand All @@ -27,7 +30,11 @@

from ..settings import ApplicationSettings
from ..utils.dask import TaskPublisher
from ..utils.files import pull_file_from_remote, push_file_to_remote
from ..utils.files import (
check_need_unzipping,
pull_file_from_remote,
push_file_to_remote,
)
from .docker_utils import (
create_container_config,
get_computational_shared_data_mount_point,
Expand Down Expand Up @@ -75,6 +82,17 @@ async def _write_input_data(

destination_path = task_volumes.inputs_folder / file_name

need_unzipping = check_need_unzipping(
input_params.url, input_params.file_mime_type, destination_path
)
if input_params.file_mapping and need_unzipping:
raise ServiceInputsUseFileToKeyMapButReceivesZipDataError(
service_key=self.task_parameters.image,
service_version=self.task_parameters.tag,
input_key=input_key,
file_to_key_map=input_params.file_mapping,
)

if destination_path.parent != task_volumes.inputs_folder:
# NOTE: only 'task_volumes.inputs_folder' part of 'destination_path' is guaranteed,
# if extra subfolders via file-mapping,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __post_init__(self) -> None:

assert not folder_path.exists() # nosec
folder_path.mkdir(parents=True)
logger.debug(
logger.info(
"created %s in %s",
f"{folder=}",
f"{self.base_path=}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import zipfile
from collections.abc import Callable, Coroutine
from contextlib import AsyncExitStack
from io import IOBase
from pathlib import Path
from typing import Any, Final, TypedDict, cast
Expand Down Expand Up @@ -140,6 +141,23 @@ async def _copy_file(
_ZIP_MIME_TYPE: Final[str] = "application/zip"


def check_need_unzipping(
    src_url: AnyUrl,
    target_mime_type: str | None,
    dst_path: Path,
) -> bool:
    """Tell whether the downloaded source must be extracted.

    Extraction is needed when the source URL path is guessed to be a zip
    archive while the target is not meant to be one. The target mime type is
    taken from ``target_mime_type`` when given, otherwise it is guessed from
    ``dst_path``.

    Returns:
        True if the source is a zip archive and the target is not, else False.
    """
    assert src_url.path  # nosec
    source_kind = mimetypes.guess_type(src_url.path)[0]
    # an explicitly requested mime type wins over the one guessed from the file name
    wanted_kind = target_mime_type or mimetypes.guess_type(dst_path)[0]

    source_is_zip = source_kind == _ZIP_MIME_TYPE
    target_is_zip = wanted_kind == _ZIP_MIME_TYPE
    return source_is_zip and not target_is_zip


async def pull_file_from_remote(
src_url: AnyUrl,
target_mime_type: str | None,
Expand All @@ -156,38 +174,51 @@ async def pull_file_from_remote(
msg = f"{dst_path.parent=} does not exist. It must be created by the caller"
raise ValueError(msg)

src_mime_type, _ = mimetypes.guess_type(f"{src_url.path}")
if not target_mime_type:
target_mime_type, _ = mimetypes.guess_type(dst_path)

storage_kwargs: S3FsSettingsDict | dict[str, Any] = {}
if s3_settings and src_url.scheme in S3_FILE_SYSTEM_SCHEMES:
storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings)
await _copy_file(
src_url,
TypeAdapter(FileUrl).validate_python(dst_path.as_uri()),
src_storage_cfg=cast(dict[str, Any], storage_kwargs),
log_publishing_cb=log_publishing_cb,
text_prefix=f"Downloading '{src_url.path.strip('/')}':",
)

await log_publishing_cb(
f"Download of '{src_url}' into local file '{dst_path}' complete.",
logging.INFO,
)

if src_mime_type == _ZIP_MIME_TYPE and target_mime_type != _ZIP_MIME_TYPE:
await log_publishing_cb(f"Uncompressing '{dst_path.name}'...", logging.INFO)
logger.debug("%s is a zip file and will be now uncompressed", dst_path)
with repro_zipfile.ReproducibleZipFile(dst_path, "r") as zip_obj:
await asyncio.get_event_loop().run_in_executor(
None, zip_obj.extractall, dst_path.parents[0]
need_unzipping = check_need_unzipping(src_url, target_mime_type, dst_path)
async with AsyncExitStack() as exit_stack:
if need_unzipping:
# we need to extract the file, so we create a temporary directory
# where the file will be downloaded and extracted
tmp_dir = await exit_stack.enter_async_context(
aiofiles.tempfile.TemporaryDirectory()
)
# finally remove the zip archive
download_dst_path = Path(f"{tmp_dir}") / Path(src_url.path).name
else:
# no extraction needed, so we can use the provided dst_path directly
download_dst_path = dst_path

await _copy_file(
src_url,
TypeAdapter(FileUrl).validate_python(f"{download_dst_path.as_uri()}"),
src_storage_cfg=cast(dict[str, Any], storage_kwargs),
log_publishing_cb=log_publishing_cb,
text_prefix=f"Downloading '{src_url.path.strip('/')}':",
)

await log_publishing_cb(
f"Uncompressing '{dst_path.name}' complete.", logging.INFO
f"Download of '{src_url}' into local file '{download_dst_path}' complete.",
logging.INFO,
)
dst_path.unlink()

if need_unzipping:
await log_publishing_cb(
f"Uncompressing '{download_dst_path.name}'...", logging.INFO
)
logger.debug(
"%s is a zip file and will be now uncompressed", download_dst_path
)
with repro_zipfile.ReproducibleZipFile(download_dst_path, "r") as zip_obj:
await asyncio.get_event_loop().run_in_executor(
None, zip_obj.extractall, dst_path.parents[0]
)
# finally remove the zip archive
await log_publishing_cb(
f"Uncompressing '{download_dst_path.name}' complete.", logging.INFO
)


async def _push_file_to_http_link(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import pytest
from common_library.json_serialization import json_dumps
from dask_task_models_library.container_tasks.docker import DockerBasicAuth
from dask_task_models_library.container_tasks.errors import ServiceRuntimeError
from dask_task_models_library.container_tasks.errors import (
ServiceInputsUseFileToKeyMapButReceivesZipDataError,
ServiceRuntimeError,
)
from dask_task_models_library.container_tasks.events import TaskProgressEvent
from dask_task_models_library.container_tasks.io import (
FileUrl,
Expand Down Expand Up @@ -417,7 +420,9 @@ def sidecar_task(
task_owner: TaskOwner,
s3_settings: S3Settings,
) -> Callable[..., ServiceExampleParam]:
def _creator(command: list[str] | None = None) -> ServiceExampleParam:
def _creator(
command: list[str] | None = None, input_data: TaskInputData | None = None
) -> ServiceExampleParam:
return ServiceExampleParam(
docker_basic_auth=DockerBasicAuth(
server_address="docker.io", username="pytest", password=SecretStr("")
Expand All @@ -426,7 +431,7 @@ def _creator(command: list[str] | None = None) -> ServiceExampleParam:
service_version="latest",
command=command
or ["/bin/bash", "-c", "echo 'hello I'm an empty ubuntu task!"],
input_data=TaskInputData.model_validate({}),
input_data=input_data or TaskInputData.model_validate({}),
output_data_keys=TaskOutputDataSchema.model_validate({}),
log_file_url=s3_remote_file_url(file_path="log.dat"),
expected_output_data=TaskOutputData.model_validate({}),
Expand Down Expand Up @@ -456,6 +461,30 @@ def sleeper_task_unexpected_output(
return sleeper_task


@pytest.fixture()
def task_with_file_to_key_map_in_input_data(
    sidecar_task: Callable[..., ServiceExampleParam],
) -> ServiceExampleParam:
    """Build a task whose file input declares a file-to-key map while pointing at a zip archive."""
    # the file input combines a '.zip' URL with a file mapping
    zipped_file_input = FileUrl(
        url=TypeAdapter(AnyUrl).validate_python("s3://myserver/some_file_url.zip"),
        file_mapping="some_file_mapping",
    )
    raw_inputs = {
        "input_1": 23,
        "input_23": "a string input",
        "the_input_43": 15.0,
        "the_bool_input_54": False,
        "some_file_input_with_mapping": zipped_file_input,
    }
    return sidecar_task(
        command=["/bin/bash", "-c", "echo we create nothingness"],
        input_data=TaskInputData.model_validate(raw_inputs),
    )


@pytest.fixture()
def caplog_info_level(
caplog: pytest.LogCaptureFixture,
Expand Down Expand Up @@ -809,6 +838,21 @@ def test_running_service_that_generates_unexpected_data_raises_exception(
)


@pytest.mark.parametrize(
    "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True
)
def test_running_service_with_incorrect_zip_data_that_uses_a_file_to_key_map_raises_exception(
    caplog_info_level: pytest.LogCaptureFixture,
    app_environment: EnvVarsDict,
    dask_subsystem_mock: dict[str, mock.Mock],
    task_with_file_to_key_map_in_input_data: ServiceExampleParam,
):
    """Running a task that has a file-to-key map but zip input data must raise the dedicated error."""
    with pytest.raises(ServiceInputsUseFileToKeyMapButReceivesZipDataError):
        sidecar_kwargs = task_with_file_to_key_map_in_input_data.sidecar_params()
        run_computational_sidecar(**sidecar_kwargs)


@pytest.mark.parametrize(
"integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True
)
Expand Down
Loading