Skip to content

Commit 21a15b3

Browse files
authored
🐛 Dask scheduler: Fix issue when value and/or size is not set in file download progress (ITISFoundation#2627)
* add hostname as the worker name so it's easier to find which dask-sidecar ran a task
1 parent 79c9788 commit 21a15b3

File tree

22 files changed

+1065
-419
lines changed

22 files changed

+1065
-419
lines changed

packages/pytest-simcore/src/pytest_simcore/helpers/utils_docker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,4 +179,5 @@ def save_docker_infos(destination_path: Path):
179179
cont.logs(timestamps=True, stdout=True, stderr=True).decode(), width=200
180180
),
181181
)
182-
print("\n\twrote docker log files in ", destination_path)
182+
if all_containers:
183+
print("\n\twrote docker log files in ", destination_path)

packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,3 +318,14 @@ async def get_file_metadata(
318318
raise exceptions.StorageInvalidCall(f"The file '{s3_object}' cannot be found")
319319
log.debug("Result for metadata s3_object=%s, result=%s", s3_object, result)
320320
return (f"{result.get('location_id', '')}", result.get("entity_tag", ""))
321+
322+
323+
async def delete_file(
    user_id: int,
    store_id: str,
    s3_object: str,
    client_session: Optional[ClientSession] = None,
) -> None:
    """Delete the file identified by ``s3_object`` through the storage client.

    :param user_id: user on whose behalf the deletion is requested
    :param store_id: storage location, forwarded as the client's ``location_id``
    :param s3_object: file identifier, forwarded as the client's ``file_id``
    :param client_session: optional session handed to
        ``ClientSessionContextManager``
        (presumably a new one is created when ``None`` - TODO confirm)
    """
    session_context = ClientSessionContextManager(client_session)
    async with session_context as http_session:
        log.debug("Will delete file for s3_object=%s", s3_object)
        # storage_client expects (session, file_id, location_id, user_id)
        await storage_client.delete_file(http_session, s3_object, store_id, user_id)

packages/simcore-sdk/src/simcore_sdk/node_ports_common/storage_client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,26 @@ async def get_file_metadata(
149149
if file_metadata_enveloped.data is None:
150150
raise exceptions.S3InvalidPathError(file_id)
151151
return file_metadata_enveloped.data.dict(by_alias=True)
152+
153+
154+
@handle_client_exception
async def delete_file(
    session: ClientSession, file_id: str, location_id: str, user_id: UserID
) -> None:
    """Issue a DELETE request to the storage service for one file.

    :param session: HTTP client session used to send the request
    :param file_id: file identifier; it is URL-quoted (no safe characters)
        before being placed in the request path
    :param location_id: storage location the file belongs to
    :param user_id: sent as the ``user_id`` query parameter
    :raises exceptions.StorageInvalidCall: if any of ``file_id``,
        ``location_id`` or ``user_id`` is ``None`` or has the wrong type
    """
    # ``None`` has to be tested before the type checks: it fails every
    # ``isinstance`` test, so checking types first made this branch (and its
    # more specific message) unreachable.
    if file_id is None or location_id is None or user_id is None:
        raise exceptions.StorageInvalidCall(
            f"invalid call: user_id '{user_id}', location_id '{location_id}', file_id '{file_id}' are not allowed to be empty",
        )
    if (
        not isinstance(file_id, str)
        or not isinstance(location_id, str)
        or not isinstance(user_id, int)
    ):
        raise exceptions.StorageInvalidCall(
            f"invalid call: user_id '{user_id}', location_id '{location_id}', file_id '{file_id}' are invalid",
        )
    async with session.delete(
        f"{_base_url()}/locations/{location_id}/files/{quote(file_id, safe='')}",
        params={"user_id": f"{user_id}"},
    ) as response:
        # turn any HTTP error status into an exception
        response.raise_for_status()

packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,28 @@ async def get_upload_link_from_storage(
9191
return parse_obj_as(AnyUrl, f"{link}")
9292

9393

94+
async def target_link_exists(
    user_id: int, project_id: str, node_id: str, file_name: str
) -> bool:
    """Tell whether storage has an entry for ``file_name`` of the given node.

    The file identifier is built with ``data_items_utils.encode_file_id`` from
    the file name, project and node, then looked up in store ``"0"``.

    :return: the result of ``filemanager.entry_exists`` for that identifier
    """
    log.debug(
        "checking if target of link to file from storage for %s exists", file_name
    )
    file_id = data_items_utils.encode_file_id(Path(file_name), project_id, node_id)
    exists = await filemanager.entry_exists(
        user_id=user_id, store_id="0", s3_object=file_id
    )
    return exists
104+
105+
106+
async def delete_target_link(
    user_id: int, project_id: str, node_id: str, file_name: str
) -> None:
    """Delete the storage entry for ``file_name`` of the given node.

    The file identifier is built with ``data_items_utils.encode_file_id`` from
    the file name, project and node, and removed from store ``"0"`` through
    ``filemanager.delete_file``.
    """
    log.debug("deleting target of link to file from storage for %s", file_name)
    file_id = data_items_utils.encode_file_id(Path(file_name), project_id, node_id)
    deletion_result = await filemanager.delete_file(
        user_id=user_id, store_id="0", s3_object=file_id
    )
    return deletion_result
114+
115+
94116
async def pull_file_from_store(
95117
user_id: int,
96118
key: str,

packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def __getitem__(self, key: Union[int, PortKey]) -> Port:
1818
key = list(self.__root__.keys())[key]
1919
if not key in self.__root__:
2020
raise UnboundPortError(key)
21+
assert isinstance(key, str) # no sec
2122
return self.__root__[key]
2223

2324
def __iter__(self) -> Iterator[PortKey]:

packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import filecmp
77
from pathlib import Path
8-
from typing import Callable
8+
from typing import Any, Awaitable, Callable, Optional
99
from uuid import uuid4
1010

1111
import np_helpers
@@ -125,7 +125,9 @@ async def test_errors_upon_invalid_file_identifiers(
125125
await filemanager.download_file_from_s3(
126126
user_id=user_id,
127127
store_id=store,
128-
s3_object=np_helpers.file_uuid("invisible.txt", project_id, f"{uuid4()}"),
128+
s3_object=np_helpers.file_uuid(
129+
Path("invisible.txt"), project_id, f"{uuid4()}"
130+
),
129131
local_folder=download_folder,
130132
)
131133

@@ -201,27 +203,70 @@ async def test_valid_metadata(
201203
assert is_metadata_present is True
202204

203205

206+
@pytest.mark.parametrize(
207+
"fct",
208+
[filemanager.entry_exists, filemanager.delete_file, filemanager.get_file_metadata],
209+
)
204210
async def test_invalid_call_raises_exception(
205211
tmpdir: Path,
206212
bucket: str,
207213
filemanager_cfg: None,
208214
user_id: int,
209215
create_valid_file_uuid: Callable[[Path], str],
210216
s3_simcore_location: str,
217+
fct: Callable[[int, str, str, Optional[Any]], Awaitable],
211218
):
212219
file_path = Path(tmpdir) / "test.test"
213220
file_id = create_valid_file_uuid(file_path)
214221
assert file_path.exists() is False
215222

216223
with pytest.raises(exceptions.StorageInvalidCall):
217-
await filemanager.entry_exists(
224+
await fct(
218225
user_id=None, store_id=s3_simcore_location, s3_object=file_id # type: ignore
219226
)
220227
with pytest.raises(exceptions.StorageInvalidCall):
221-
await filemanager.entry_exists(
222-
user_id=user_id, store_id=None, s3_object=file_id # type: ignore
223-
)
228+
await fct(user_id=user_id, store_id=None, s3_object=file_id) # type: ignore
224229
with pytest.raises(exceptions.StorageInvalidCall):
225-
await filemanager.entry_exists(
230+
await fct(
226231
user_id=user_id, store_id=s3_simcore_location, s3_object=None # type: ignore
227232
)
233+
234+
235+
async def test_delete_File(
    tmpdir: Path,
    bucket: str,
    filemanager_cfg: None,
    user_id: int,
    create_valid_file_uuid: Callable[[Path], str],
    s3_simcore_location: str,
):
    """Upload a file, check it exists, delete it, then check it is gone."""
    file_path = Path(tmpdir) / "test.test"
    file_path.write_text("I am a test file")
    assert file_path.exists()

    file_id = create_valid_file_uuid(file_path)
    store_id, e_tag = await filemanager.upload_file(
        user_id=user_id,
        store_id=s3_simcore_location,
        s3_object=file_id,
        local_file_path=file_path,
    )
    assert store_id == s3_simcore_location
    assert e_tag

    is_metadata_present = await filemanager.entry_exists(
        user_id=user_id, store_id=store_id, s3_object=file_id
    )
    assert is_metadata_present is True

    await filemanager.delete_file(
        user_id=user_id, store_id=s3_simcore_location, s3_object=file_id
    )

    # check that it disappeared; compare by identity, consistent with the
    # ``is True`` assertion above
    is_metadata_present = await filemanager.entry_exists(
        user_id=user_id, store_id=store_id, s3_object=file_id
    )
    assert is_metadata_present is False

services/dask-sidecar/Dockerfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ COPY --chown=scu:scu services/dask-sidecar/docker services/dask-sidecar/docker
125125
# make sure dask worker is started as ``dask-worker --dashboard-address 8787``.
126126
# Otherwise the worker will take random ports to serve the /health entrypoint.
127127
HEALTHCHECK \
128-
--interval=60s \
129-
--timeout=60s \
130-
--start-period=10s \
128+
--interval=30s \
129+
--timeout=15s \
130+
--start-period=5s \
131131
--retries=3 \
132132
CMD ["curl", "-Lf", "http://127.0.0.1:8787/health"]
133133

services/dask-sidecar/docker/boot.sh

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,23 @@ else
111111
--preload simcore_service_dask_sidecar.tasks \
112112
--reconnect \
113113
--no-nanny \
114+
--nprocs 1 \
114115
--nthreads "$num_cpus" \
115116
--dashboard-address 8787 \
116117
--memory-limit "$ram" \
117-
--resources "$resources"
118+
--resources "$resources" \
119+
--name "$(hostname)"
118120
else
119121
exec dask-worker "${DASK_SCHEDULER_ADDRESS}" \
120122
--local-directory /tmp/dask-sidecar \
121123
--preload simcore_service_dask_sidecar.tasks \
122124
--reconnect \
123125
--no-nanny \
126+
--nprocs 1 \
124127
--nthreads "$num_cpus" \
125128
--dashboard-address 8787 \
126129
--memory-limit "$ram" \
127-
--resources "$resources"
130+
--resources "$resources" \
131+
--name "$(hostname)"
128132
fi
129133
fi

services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from pathlib import Path
77
from pprint import pformat
88
from types import TracebackType
9-
from typing import Any, Awaitable, Dict, List, Optional, Type
9+
from typing import Any, Dict, List, Optional, Type
1010
from uuid import uuid4
1111

1212
from aiodocker import Docker
@@ -160,7 +160,7 @@ async def _publish_sidecar_state(
160160
async def run(self, command: List[str]) -> TaskOutputData:
161161
await self._publish_sidecar_state(RunningState.STARTED)
162162
await self._publish_sidecar_log(
163-
f"Starting task for {self.service_key}:{self.service_version}..."
163+
f"Starting task for {self.service_key}:{self.service_version} on {socket.gethostname()}..."
164164
)
165165

166166
settings = Settings.create_from_envs()
@@ -222,10 +222,6 @@ async def run(self, command: List[str]) -> TaskOutputData:
222222
msg=f"error while running container '{container.id}' for '{self.service_key}:{self.service_version}'",
223223
)
224224

225-
await self._publish_sidecar_log(
226-
f"Error while running container: task FAILED with exit code '{container_data['State']['ExitCode']}'"
227-
)
228-
229225
raise ServiceRunError(
230226
self.service_key,
231227
self.service_version,
@@ -250,6 +246,9 @@ async def __aexit__(
250246
exc_type: Optional[Type[BaseException]],
251247
exc: Optional[BaseException],
252248
tb: Optional[TracebackType],
253-
) -> Awaitable[Optional[bool]]:
254-
"""NOTE: this is empty but this is intended. the ComputationSidecar
255-
is meant to be used as context manager"""
249+
) -> None:
250+
if exc:
251+
await self._publish_sidecar_log(f"Task error:\n{exc}")
252+
await self._publish_sidecar_log(
253+
"There might be more information in the service log file"
254+
)

services/dask-sidecar/src/simcore_service_dask_sidecar/file_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ def _file_progress_cb(
3333
asyncio.run_coroutine_threadsafe(
3434
log_publishing_cb(
3535
f"{text_prefix}"
36-
f" {100.0 * float(value)/float(size):.1f}%"
37-
f" ({ByteSize(value).human_readable()} / {ByteSize(size).human_readable() if size != 0 else 'NaN'})"
36+
f" {100.0 * float(value or 0)/float(size or 1):.1f}%"
37+
f" ({ByteSize(value).human_readable() if value else 0} / {ByteSize(size).human_readable() if size else 'NaN'})"
3838
),
3939
main_loop,
4040
)

0 commit comments

Comments
 (0)