
Commit dc35a5e

🎨 Computational backend: Fail fast in case of malformed input syntax and improve unzipping (#7804)
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent c3e1573 commit dc35a5e

5 files changed: +135 -35 lines changed


packages/dask-task-models-library/src/dask_task_models_library/container_tasks/errors.py

Lines changed: 11 additions & 4 deletions
@@ -1,11 +1,9 @@
-""" Dask task exceptions
+"""Dask task exceptions"""
 
-"""
 from common_library.errors_classes import OsparcErrorMixin
 
 
-class TaskValueError(OsparcErrorMixin, ValueError):
-    ...
+class TaskValueError(OsparcErrorMixin, ValueError): ...
 
 
 class TaskCancelledError(OsparcErrorMixin, RuntimeError):
@@ -18,3 +16,12 @@ class ServiceRuntimeError(OsparcErrorMixin, RuntimeError):
         " running in container {container_id} failed with code"
         " {exit_code}. Last logs:\n{service_logs}"
     )
+
+
+class ServiceInputsUseFileToKeyMapButReceivesZipDataError(
+    OsparcErrorMixin, RuntimeError
+):
+    msg_template = (
+        "The service {service_key}:{service_version} {input} uses a file-to-key {file_to_key_map} map but receives zip data instead. "
+        "TIP: either pass a single file or zip file and remove the file-to-key map parameter."
+    )
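
For reference, errors built on OsparcErrorMixin declare a msg_template and are raised with keyword arguments that fill the template placeholders. The snippet below is a self-contained sketch of that pattern using a simplified stand-in mixin (it only approximates common_library.errors_classes.OsparcErrorMixin); all field values are made up.

class _ErrorMixinSketch:
    """Simplified stand-in for OsparcErrorMixin: formats msg_template from kwargs."""

    msg_template: str = ""

    def __init__(self, **ctx) -> None:
        self.ctx = ctx
        super().__init__(self.msg_template.format(**ctx))


class ServiceInputsUseFileToKeyMapButReceivesZipDataError(_ErrorMixinSketch, RuntimeError):
    msg_template = (
        "The service {service_key}:{service_version} {input} uses a file-to-key "
        "{file_to_key_map} map but receives zip data instead. "
        "TIP: either pass a single file or zip file and remove the file-to-key map parameter."
    )


try:
    # made-up values, mirroring the context the sidecar attaches when it detects the conflict
    raise ServiceInputsUseFileToKeyMapButReceivesZipDataError(
        service_key="simcore/services/comp/some-solver",
        service_version="1.2.3",
        input="input_1",
        file_to_key_map={"data.csv": "input_1"},
    )
except ServiceInputsUseFileToKeyMapButReceivesZipDataError as err:
    print(err)  # the formatted, context-rich message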

services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py

Lines changed: 20 additions & 2 deletions
@@ -13,7 +13,10 @@
 from aiodocker import Docker
 from common_library.json_serialization import json_dumps
 from dask_task_models_library.container_tasks.docker import DockerBasicAuth
-from dask_task_models_library.container_tasks.errors import ServiceRuntimeError
+from dask_task_models_library.container_tasks.errors import (
+    ServiceInputsUseFileToKeyMapButReceivesZipDataError,
+    ServiceRuntimeError,
+)
 from dask_task_models_library.container_tasks.io import FileUrl, TaskOutputData
 from dask_task_models_library.container_tasks.protocol import ContainerTaskParameters
 from models_library.progress_bar import ProgressReport
@@ -27,7 +30,11 @@
 
 from ..settings import ApplicationSettings
 from ..utils.dask import TaskPublisher
-from ..utils.files import pull_file_from_remote, push_file_to_remote
+from ..utils.files import (
+    check_need_unzipping,
+    pull_file_from_remote,
+    push_file_to_remote,
+)
 from .docker_utils import (
     create_container_config,
     get_computational_shared_data_mount_point,
@@ -75,6 +82,17 @@ async def _write_input_data(
 
                 destination_path = task_volumes.inputs_folder / file_name
 
+                need_unzipping = check_need_unzipping(
+                    input_params.url, input_params.file_mime_type, destination_path
+                )
+                if input_params.file_mapping and need_unzipping:
+                    raise ServiceInputsUseFileToKeyMapButReceivesZipDataError(
+                        service_key=self.task_parameters.image,
+                        service_version=self.task_parameters.tag,
+                        input_key=input_key,
+                        file_to_key_map=input_params.file_mapping,
+                    )
+
                 if destination_path.parent != task_volumes.inputs_folder:
                     # NOTE: only 'task_volumes.inputs_folder' part of 'destination_path' is guaranteed,
                     # if extra subfolders via file-mapping,
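
The guard above fails fast: the zip versus file-to-key conflict is detected while the input spec is being processed, before any download starts or any container is launched. A self-contained sketch of the same decision logic follows; InputFile, needs_unzipping and write_inputs are hypothetical simplifications, not the sidecar's real types.

import mimetypes
from dataclasses import dataclass
from pathlib import Path

ZIP_MIME_TYPE = "application/zip"


@dataclass
class InputFile:  # hypothetical simplification of the sidecar's FileUrl input
    url_path: str                      # e.g. "/bucket/some_file.zip"
    file_mime_type: str | None = None  # explicit mime type, if the schema provides one
    file_mapping: str | None = None    # file-to-key mapping target, if any


def needs_unzipping(input_file: InputFile, destination: Path) -> bool:
    """Same rule as the sidecar: source looks like a zip, target does not."""
    src_mime, _ = mimetypes.guess_type(input_file.url_path)
    dst_mime = input_file.file_mime_type or mimetypes.guess_type(destination)[0]
    return src_mime == ZIP_MIME_TYPE and dst_mime != ZIP_MIME_TYPE


def write_inputs(inputs: dict[str, InputFile], inputs_folder: Path) -> None:
    for input_key, input_file in inputs.items():
        destination = inputs_folder / (input_file.file_mapping or Path(input_file.url_path).name)
        # fail fast: a file-to-key map combined with zip content is ambiguous,
        # so reject it before any download or container start
        if input_file.file_mapping and needs_unzipping(input_file, destination):
            msg = f"input {input_key!r} uses a file-to-key map but receives zip data"
            raise ValueError(msg)
        # ... download input_file.url_path into destination here ...


try:
    # the source is a zip but the mapped target is a csv -> rejected immediately
    write_inputs(
        {"input_1": InputFile(url_path="/bucket/data.zip", file_mapping="data.csv")},
        Path("/tmp/inputs"),
    )
except ValueError as err:
    print(err)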

services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/task_shared_volume.py

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ def __post_init__(self) -> None:
 
             assert not folder_path.exists()  # nosec
             folder_path.mkdir(parents=True)
-            logger.debug(
+            logger.info(
                 "created %s in %s",
                 f"{folder=}",
                 f"{self.base_path=}",

services/dask-sidecar/src/simcore_service_dask_sidecar/utils/files.py

Lines changed: 56 additions & 25 deletions
@@ -5,6 +5,7 @@
 import time
 import zipfile
 from collections.abc import Callable, Coroutine
+from contextlib import AsyncExitStack
 from io import IOBase
 from pathlib import Path
 from typing import Any, Final, TypedDict, cast
@@ -140,6 +141,23 @@ async def _copy_file(
 _ZIP_MIME_TYPE: Final[str] = "application/zip"
 
 
+def check_need_unzipping(
+    src_url: AnyUrl,
+    target_mime_type: str | None,
+    dst_path: Path,
+) -> bool:
+    """
+    Checks if the source URL points to a zip file and if the target mime type is not zip.
+    If so, extraction is needed.
+    """
+    assert src_url.path  # nosec
+    src_mime_type, _ = mimetypes.guess_type(src_url.path)
+    dst_mime_type = target_mime_type
+    if not dst_mime_type:
+        dst_mime_type, _ = mimetypes.guess_type(dst_path)
+    return (src_mime_type == _ZIP_MIME_TYPE) and (dst_mime_type != _ZIP_MIME_TYPE)
+
+
 async def pull_file_from_remote(
     src_url: AnyUrl,
     target_mime_type: str | None,
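
check_need_unzipping relies purely on extension-based mime-type guessing from the standard library. A quick standalone illustration of the decision rule it implements (only mimetypes is used; the URLs and paths are made-up examples):

import mimetypes
from pathlib import Path

ZIP_MIME_TYPE = "application/zip"


def need_unzipping(src_path: str, target_mime_type: str | None, dst_path: Path) -> bool:
    # same rule as check_need_unzipping: zip source + non-zip target => extract
    src_mime, _ = mimetypes.guess_type(src_path)
    dst_mime = target_mime_type or mimetypes.guess_type(dst_path)[0]
    return src_mime == ZIP_MIME_TYPE and dst_mime != ZIP_MIME_TYPE


# zip source, non-zip destination -> extract
assert need_unzipping("/bucket/results.zip", None, Path("/inputs/results.csv")) is True
# zip source, but the consumer explicitly expects a zip -> keep the archive as-is
assert need_unzipping("/bucket/results.zip", "application/zip", Path("/inputs/results.zip")) is False
# non-zip source -> nothing to extract
assert need_unzipping("/bucket/results.csv", None, Path("/inputs/results.csv")) is False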
@@ -156,38 +174,51 @@ async def pull_file_from_remote(
         msg = f"{dst_path.parent=} does not exist. It must be created by the caller"
         raise ValueError(msg)
 
-    src_mime_type, _ = mimetypes.guess_type(f"{src_url.path}")
-    if not target_mime_type:
-        target_mime_type, _ = mimetypes.guess_type(dst_path)
-
     storage_kwargs: S3FsSettingsDict | dict[str, Any] = {}
     if s3_settings and src_url.scheme in S3_FILE_SYSTEM_SCHEMES:
         storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings)
-    await _copy_file(
-        src_url,
-        TypeAdapter(FileUrl).validate_python(dst_path.as_uri()),
-        src_storage_cfg=cast(dict[str, Any], storage_kwargs),
-        log_publishing_cb=log_publishing_cb,
-        text_prefix=f"Downloading '{src_url.path.strip('/')}':",
-    )
 
-    await log_publishing_cb(
-        f"Download of '{src_url}' into local file '{dst_path}' complete.",
-        logging.INFO,
-    )
-
-    if src_mime_type == _ZIP_MIME_TYPE and target_mime_type != _ZIP_MIME_TYPE:
-        await log_publishing_cb(f"Uncompressing '{dst_path.name}'...", logging.INFO)
-        logger.debug("%s is a zip file and will be now uncompressed", dst_path)
-        with repro_zipfile.ReproducibleZipFile(dst_path, "r") as zip_obj:
-            await asyncio.get_event_loop().run_in_executor(
-                None, zip_obj.extractall, dst_path.parents[0]
+    need_unzipping = check_need_unzipping(src_url, target_mime_type, dst_path)
+    async with AsyncExitStack() as exit_stack:
+        if need_unzipping:
+            # we need to extract the file, so we create a temporary directory
+            # where the file will be downloaded and extracted
+            tmp_dir = await exit_stack.enter_async_context(
+                aiofiles.tempfile.TemporaryDirectory()
             )
-        # finally remove the zip archive
+            download_dst_path = Path(f"{tmp_dir}") / Path(src_url.path).name
+        else:
+            # no extraction needed, so we can use the provided dst_path directly
+            download_dst_path = dst_path
+
+        await _copy_file(
+            src_url,
+            TypeAdapter(FileUrl).validate_python(f"{download_dst_path.as_uri()}"),
+            src_storage_cfg=cast(dict[str, Any], storage_kwargs),
+            log_publishing_cb=log_publishing_cb,
+            text_prefix=f"Downloading '{src_url.path.strip('/')}':",
+        )
+
         await log_publishing_cb(
-            f"Uncompressing '{dst_path.name}' complete.", logging.INFO
+            f"Download of '{src_url}' into local file '{download_dst_path}' complete.",
+            logging.INFO,
         )
-        dst_path.unlink()
+
+        if need_unzipping:
+            await log_publishing_cb(
+                f"Uncompressing '{download_dst_path.name}'...", logging.INFO
+            )
+            logger.debug(
+                "%s is a zip file and will be now uncompressed", download_dst_path
+            )
+            with repro_zipfile.ReproducibleZipFile(download_dst_path, "r") as zip_obj:
+                await asyncio.get_event_loop().run_in_executor(
+                    None, zip_obj.extractall, dst_path.parents[0]
+                )
+            # finally remove the zip archive
+            await log_publishing_cb(
+                f"Uncompressing '{download_dst_path.name}' complete.", logging.INFO
+            )
 
 
 async def _push_file_to_http_link(
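
With this refactoring, the archive is downloaded into a throwaway temporary directory and only the extracted content (or a genuinely requested zip) ever lands in dst_path's folder, so there is no leftover archive to unlink afterwards. Below is a reduced, stdlib-only sketch of the same pattern; download is a placeholder for the real remote transfer and there is no S3 handling or progress reporting.

import asyncio
import shutil
import tempfile
import zipfile
from pathlib import Path


async def download(src: Path, dst: Path) -> None:
    # placeholder for the actual remote transfer (e.g. an fsspec/s3fs copy)
    await asyncio.to_thread(shutil.copyfile, src, dst)


async def pull_and_maybe_extract(src: Path, dst_path: Path, *, need_unzipping: bool) -> None:
    with tempfile.TemporaryDirectory() as tmp_dir:
        # download either into the temporary folder (when extraction follows)
        # or straight into the requested destination
        download_dst = Path(tmp_dir) / src.name if need_unzipping else dst_path
        await download(src, download_dst)

        if need_unzipping:
            with zipfile.ZipFile(download_dst) as zip_obj:
                # extraction is blocking, so keep it off the event loop
                await asyncio.to_thread(zip_obj.extractall, dst_path.parent)
        # the temporary directory (and the archive inside it) is removed on exit


async def _demo() -> None:
    work = Path(tempfile.mkdtemp())
    (work / "payload.txt").write_text("hello")
    archive = work / "payload.zip"
    with zipfile.ZipFile(archive, "w") as zf:
        zf.write(work / "payload.txt", arcname="payload.txt")

    inputs = work / "inputs"
    inputs.mkdir()
    await pull_and_maybe_extract(archive, inputs / "payload.txt", need_unzipping=True)
    print((inputs / "payload.txt").read_text())  # -> hello


if __name__ == "__main__":
    asyncio.run(_demo())

Unlike this sketch, the production code only enters the temporary directory when extraction is actually needed, by registering it lazily on an AsyncExitStack.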

services/dask-sidecar/tests/unit/test_computational_sidecar_tasks.py

Lines changed: 47 additions & 3 deletions
@@ -23,7 +23,10 @@
 import pytest
 from common_library.json_serialization import json_dumps
 from dask_task_models_library.container_tasks.docker import DockerBasicAuth
-from dask_task_models_library.container_tasks.errors import ServiceRuntimeError
+from dask_task_models_library.container_tasks.errors import (
+    ServiceInputsUseFileToKeyMapButReceivesZipDataError,
+    ServiceRuntimeError,
+)
 from dask_task_models_library.container_tasks.events import TaskProgressEvent
 from dask_task_models_library.container_tasks.io import (
     FileUrl,
@@ -417,7 +420,9 @@ def sidecar_task(
     task_owner: TaskOwner,
     s3_settings: S3Settings,
 ) -> Callable[..., ServiceExampleParam]:
-    def _creator(command: list[str] | None = None) -> ServiceExampleParam:
+    def _creator(
+        command: list[str] | None = None, input_data: TaskInputData | None = None
+    ) -> ServiceExampleParam:
         return ServiceExampleParam(
             docker_basic_auth=DockerBasicAuth(
                 server_address="docker.io", username="pytest", password=SecretStr("")
@@ -426,7 +431,7 @@ def _creator(command: list[str] | None = None) -> ServiceExampleParam:
             service_version="latest",
             command=command
             or ["/bin/bash", "-c", "echo 'hello I'm an empty ubuntu task!"],
-            input_data=TaskInputData.model_validate({}),
+            input_data=input_data or TaskInputData.model_validate({}),
            output_data_keys=TaskOutputDataSchema.model_validate({}),
             log_file_url=s3_remote_file_url(file_path="log.dat"),
             expected_output_data=TaskOutputData.model_validate({}),
@@ -456,6 +461,30 @@ def sleeper_task_unexpected_output(
     return sleeper_task
 
 
+@pytest.fixture()
+def task_with_file_to_key_map_in_input_data(
+    sidecar_task: Callable[..., ServiceExampleParam],
+) -> ServiceExampleParam:
+    """This task has a file-to-key map in the input data but receives zip data instead"""
+    return sidecar_task(
+        command=["/bin/bash", "-c", "echo we create nothingness"],
+        input_data=TaskInputData.model_validate(
+            {
+                "input_1": 23,
+                "input_23": "a string input",
+                "the_input_43": 15.0,
+                "the_bool_input_54": False,
+                "some_file_input_with_mapping": FileUrl(
+                    url=TypeAdapter(AnyUrl).validate_python(
+                        "s3://myserver/some_file_url.zip"
+                    ),
+                    file_mapping="some_file_mapping",
+                ),
+            }
+        ),
+    )
+
+
 @pytest.fixture()
 def caplog_info_level(
     caplog: pytest.LogCaptureFixture,
@@ -809,6 +838,21 @@ def test_running_service_that_generates_unexpected_data_raises_exception(
     )
 
 
+@pytest.mark.parametrize(
+    "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True
+)
+def test_running_service_with_incorrect_zip_data_that_uses_a_file_to_key_map_raises_exception(
+    caplog_info_level: pytest.LogCaptureFixture,
+    app_environment: EnvVarsDict,
+    dask_subsystem_mock: dict[str, mock.Mock],
+    task_with_file_to_key_map_in_input_data: ServiceExampleParam,
+):
+    with pytest.raises(ServiceInputsUseFileToKeyMapButReceivesZipDataError):
+        run_computational_sidecar(
+            **task_with_file_to_key_map_in_input_data.sidecar_params(),
+        )
+
+
 @pytest.mark.parametrize(
     "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True
 )
