Skip to content

Commit 25178f5

Browse files
Andrei Neagu authored
🐛 Making storage logs more readable & faster decompression (ITISFoundation#3024)
* cancelled error is now captured and logged * errors are more meaningful now * pylint * refactor * better wording * making file unzipping way faster * docstrings * pylint * using create task * improved log messages * pylint * raised error to debug * updated docstring * making parameter mandatory Co-authored-by: Andrei Neagu <[email protected]>
1 parent 046b370 commit 25178f5

File tree

7 files changed

+109
-39
lines changed

7 files changed

+109
-39
lines changed

packages/service-library/src/servicelib/archiving_utils.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,42 @@ def _read_in_chunks(file_object, chunk_size=1024 * 8):
4242
yield data
4343

4444

45+
class _FastZipFileReader(zipfile.ZipFile):
46+
"""
47+
Used to gain a speed boost of several orders of magnitude.
48+
49+
When opening archives the `_RealGetContents` is called
50+
generating the list of files contained in the zip archive.
51+
This is done by the constructor.
52+
53+
If the archive contains a very large amount, the file scan operation
54+
can take up to seconds. This was observed with 10000+ files.
55+
56+
When opening the zip file in the background worker the entire file
57+
list generation can be skipped because the `zipfile.ZipFile.open`
58+
is used passing `ZipInfo` object as file to decompress.
59+
Using a `ZipInfo` object does not require having the list of
60+
files contained in the archive.
61+
"""
62+
63+
def _RealGetContents(self):
64+
"""method disabled"""
65+
66+
4567
def _zipfile_single_file_extract_worker(
46-
zip_file_path: Path, file_in_archive: str, destination_folder: Path, is_dir: bool
68+
zip_file_path: Path,
69+
file_in_archive: zipfile.ZipInfo,
70+
destination_folder: Path,
71+
is_dir: bool,
4772
) -> Path:
4873
"""Extracts file_in_archive from the archive zip_file_path -> destination_folder/file_in_archive
4974
5075
Extracts in chunks to avoid memory pressure on zip/unzip
51-
Retuns a path to extracted file or directory
76+
returns: a path to extracted file or directory
5277
"""
53-
with zipfile.ZipFile(zip_file_path) as zf:
78+
with _FastZipFileReader(zip_file_path) as zf:
5479
# assemble destination and ensure it exits
55-
destination_path = destination_folder / file_in_archive
80+
destination_path = destination_folder / file_in_archive.filename
5681

5782
if is_dir:
5883
destination_path.mkdir(parents=True, exist_ok=True)
@@ -109,7 +134,7 @@ async def unarchive_dir(
109134
pool,
110135
_zipfile_single_file_extract_worker,
111136
archive_to_extract,
112-
zip_entry.filename,
137+
zip_entry,
113138
destination_folder,
114139
zip_entry.is_dir(),
115140
)

packages/service-library/src/servicelib/pools.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
from concurrent.futures import ProcessPoolExecutor
33
from contextlib import contextmanager
4-
from typing import Any, Callable
4+
from typing import Any, Callable, Iterator
55

66
# only gets created on use and is guaranteed to be the s
77
# ame for the entire lifetime of the application
@@ -21,7 +21,7 @@ def get_shared_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
2121

2222

2323
@contextmanager
24-
def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
24+
def non_blocking_process_pool_executor(**kwargs) -> Iterator[ProcessPoolExecutor]:
2525
"""
2626
Avoids default context manger behavior which calls
2727
shutdown with wait=True an blocks.

packages/service-library/src/servicelib/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,10 @@ def fire_and_forget_task(
6969
def log_exception_callback(fut: asyncio.Future):
7070
try:
7171
fut.result()
72+
except asyncio.CancelledError:
73+
logger.exception("%s spawned as fire&forget was cancelled", fut)
7274
except Exception: # pylint: disable=broad-except
73-
logger.exception("Error occured while running task!")
75+
logger.exception("Error occurred while running task!")
7476

7577
future.add_done_callback(log_exception_callback)
7678
return future

packages/service-library/tests/test_utils.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
import asyncio
66
from pathlib import Path
7+
from typing import Awaitable, Coroutine, Union, Type
78

89
import pytest
9-
from servicelib.utils import logged_gather
10+
from servicelib.utils import logged_gather, fire_and_forget_task
1011

1112

1213
async def _value_error(uid, *, delay=1):
@@ -91,3 +92,45 @@ def print_tree(path: Path, level=0):
9192
print(f"{tab}{'+' if path.is_dir() else '-'} {path if level==0 else path.name}")
9293
for p in path.glob("*"):
9394
print_tree(p, level + 1)
95+
96+
97+
@pytest.fixture(params=[Awaitable, Coroutine])
98+
def future_type(request) -> Type[Union[asyncio.Future, Awaitable]]:
99+
return request.param
100+
101+
102+
@pytest.fixture()
103+
async def future_to_test(
104+
future_type: Type[Union[asyncio.Future, Awaitable]]
105+
) -> Union[asyncio.Future, Awaitable]:
106+
async def _self_cancelling() -> None:
107+
raise asyncio.CancelledError("manual cancellation")
108+
109+
if future_type == Awaitable:
110+
return asyncio.create_task(_self_cancelling())
111+
if future_type == Coroutine:
112+
return _self_cancelling()
113+
114+
raise RuntimeError("not defined")
115+
116+
117+
async def test_fire_and_forget_cancellation_errors_raised_when_awaited(
118+
future_type: Type[Union[asyncio.Future, Awaitable]],
119+
future_to_test: Union[asyncio.Future, Awaitable],
120+
) -> None:
121+
assert isinstance(future_to_test, future_type)
122+
123+
task = fire_and_forget_task(future_to_test)
124+
with pytest.raises(asyncio.CancelledError):
125+
await task
126+
127+
128+
async def test_fire_and_forget_cancellation_no_errors_raised(
129+
future_type: Type[Union[asyncio.Future, Awaitable]],
130+
future_to_test: Union[asyncio.Future, Awaitable],
131+
) -> None:
132+
assert isinstance(future_to_test, future_type)
133+
134+
task = fire_and_forget_task(future_to_test)
135+
await asyncio.sleep(0.1)
136+
assert task.cancelled() is True

services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,14 @@ async def _get_networks_with_aliases_for_default_network(
205205
project_id=project_id,
206206
node_id=UUID(node_uuid),
207207
messages=[
208+
# pylint:disable=anomalous-backslash-in-string
208209
(
209-
f"Service with label '{node_content.label}' cannot "
210-
f"be connected to network {default_network}. If you "
211-
"whish for your services in this study to communicate "
212-
"provide a label containing the following characters "
213-
"'a-zA-Z0-9_-' and must be max 64 letters long."
210+
f"Service with label '{node_content.label}' cannot be "
211+
"identified on service network due to invalid name. "
212+
"To communicate over the network, please rename the "
213+
"service alphanumeric characters <64 characters, "
214+
"e.g. re.sub(r'\W+', '', SERVICE_NAME). "
215+
f"Network name is {default_network}"
214216
)
215217
],
216218
)

services/storage/src/simcore_service_storage/dsm.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
from sqlalchemy.dialects.postgresql import insert as pg_insert
3232
from sqlalchemy.sql.expression import literal_column
3333
from tenacity import retry
34+
from tenacity._asyncio import AsyncRetrying
3435
from tenacity.before_sleep import before_sleep_log
35-
from tenacity.retry import retry_if_exception_type, retry_if_result
36+
from tenacity.retry import retry_if_exception_type
3637
from tenacity.stop import stop_after_delay
3738
from tenacity.wait import wait_exponential
3839
from yarl import URL
@@ -234,6 +235,7 @@ async def list_files(
234235
dex.fmd.file_uuid,
235236
dex.fmd.bucket_name,
236237
dex.fmd.object_name,
238+
reraise_exceptions=False,
237239
)
238240
if dex:
239241
data.append(dex)
@@ -410,6 +412,7 @@ async def list_file(
410412
file_metadata.fmd.file_uuid,
411413
file_metadata.fmd.bucket_name,
412414
file_metadata.fmd.object_name,
415+
reraise_exceptions=False,
413416
)
414417
return file_metadata
415418
# FIXME: returns None in both cases: file does not exist or use has no access
@@ -441,7 +444,8 @@ async def try_update_database_from_storage(
441444
file_uuid: str,
442445
bucket_name: str,
443446
object_name: str,
444-
silence_exception: bool = False,
447+
*,
448+
reraise_exceptions: bool,
445449
) -> Optional[FileMetaDataEx]:
446450
try:
447451
async with self._create_aiobotocore_client_context() as aioboto_client:
@@ -472,29 +476,23 @@ async def try_update_database_from_storage(
472476

473477
return to_meta_data_extended(row)
474478
except botocore.exceptions.ClientError:
475-
if silence_exception:
476-
logger.debug("Error happened while trying to access %s", file_uuid)
477-
else:
478-
logger.warning(
479-
"Error happened while trying to access %s", file_uuid, exc_info=True
480-
)
481-
# the file is not existing or some error happened
482-
return None
479+
logger.warning("Error happened while trying to access %s", file_uuid)
480+
if reraise_exceptions:
481+
raise
483482

484-
@retry(
485-
stop=stop_after_delay(1 * _HOUR),
486-
wait=wait_exponential(multiplier=0.1, exp_base=1.2, max=30),
487-
retry=(
488-
retry_if_exception_type() | retry_if_result(lambda result: result is None)
489-
),
490-
before_sleep=before_sleep_log(logger, logging.INFO),
491-
)
492483
async def auto_update_database_from_storage_task(
493484
self, file_uuid: str, bucket_name: str, object_name: str
494485
) -> Optional[FileMetaDataEx]:
495-
return await self.try_update_database_from_storage(
496-
file_uuid, bucket_name, object_name, silence_exception=True
497-
)
486+
async for attempt in AsyncRetrying(
487+
stop=stop_after_delay(1 * _HOUR),
488+
wait=wait_exponential(multiplier=0.1, exp_base=1.2, max=30),
489+
retry=(retry_if_exception_type()),
490+
before_sleep=before_sleep_log(logger, logging.INFO),
491+
):
492+
with attempt:
493+
return await self.try_update_database_from_storage(
494+
file_uuid, bucket_name, object_name, reraise_exceptions=True
495+
)
498496

499497
async def update_metadata(
500498
self, file_uuid: str, user_id: int

services/storage/tests/unit/test_dsm.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,29 +180,29 @@ async def test_update_metadata_from_storage(
180180

181181
assert (
182182
await dsm_fixture.try_update_database_from_storage( # pylint: disable=protected-access
183-
"some_fake_uuid", fmd.bucket_name, fmd.object_name
183+
"some_fake_uuid", fmd.bucket_name, fmd.object_name, reraise_exceptions=False
184184
)
185185
is None
186186
)
187187

188188
assert (
189189
await dsm_fixture.try_update_database_from_storage( # pylint: disable=protected-access
190-
fmd.file_uuid, "some_fake_bucket", fmd.object_name
190+
fmd.file_uuid, "some_fake_bucket", fmd.object_name, reraise_exceptions=False
191191
)
192192
is None
193193
)
194194

195195
assert (
196196
await dsm_fixture.try_update_database_from_storage( # pylint: disable=protected-access
197-
fmd.file_uuid, fmd.bucket_name, "some_fake_object"
197+
fmd.file_uuid, fmd.bucket_name, "some_fake_object", reraise_exceptions=False
198198
)
199199
is None
200200
)
201201

202202
file_metadata: Optional[
203203
FileMetaDataEx
204204
] = await dsm_fixture.try_update_database_from_storage( # pylint: disable=protected-access
205-
fmd.file_uuid, fmd.bucket_name, fmd.object_name
205+
fmd.file_uuid, fmd.bucket_name, fmd.object_name, reraise_exceptions=False
206206
)
207207
assert file_metadata is not None
208208
assert file_metadata.fmd.file_size == Path(tmp_file).stat().st_size

0 commit comments

Comments
 (0)