
Commit 7656dc0

🐛 is659/decompressing-hangs (ITISFoundation#3291)
1 parent 7a9bd3f commit 7656dc0

File tree

3 files changed: +149 −73 lines

packages/service-library/src/servicelib/archiving_utils.py

Lines changed: 123 additions & 72 deletions
```diff
@@ -6,18 +6,28 @@
 from contextlib import contextmanager
 from functools import partial
 from pathlib import Path
-from typing import Final, Iterator, Optional, Union
+from typing import Final, Iterator, Optional

-from servicelib.pools import non_blocking_process_pool_executor
 from tqdm import tqdm
 from tqdm.asyncio import tqdm_asyncio
 from tqdm.contrib.logging import logging_redirect_tqdm, tqdm_logging_redirect

-MAX_UNARCHIVING_WORKER_COUNT: Final[int] = 2
-CHUNK_SIZE: Final[int] = 1024 * 8
+from .file_utils import remove_directory
+from .pools import non_blocking_process_pool_executor
+
+_MIN: Final[int] = 60  # secs
+_MAX_UNARCHIVING_WORKER_COUNT: Final[int] = 2
+_CHUNK_SIZE: Final[int] = 1024 * 8
+
 log = logging.getLogger(__name__)


+class ArchiveError(Exception):
+    """
+    Error raised while archiving or unarchiving
+    """
+
+
 def _human_readable_size(size, decimal_places=3):
     human_readable_file_size = float(size)
     unit = "B"
```
```diff
@@ -109,13 +119,13 @@ def _zipfile_single_file_extract_worker(
     ) as dest_fp, tqdm_logging_redirect(
         total=file_in_archive.file_size, desc=desc, **_TQDM_FILE_OPTIONS
     ) as pbar:
-        while chunk := zip_fp.read(CHUNK_SIZE):
+        while chunk := zip_fp.read(_CHUNK_SIZE):
             dest_fp.write(chunk)
             pbar.update(len(chunk))
     return destination_path


-def ensure_destination_subdirectories_exist(
+def _ensure_destination_subdirectories_exist(
     zip_file_handler: zipfile.ZipFile, destination_folder: Path
 ) -> None:
     # assemble full destination paths
```
```diff
@@ -133,32 +143,40 @@ async def unarchive_dir(
     archive_to_extract: Path,
     destination_folder: Path,
     *,
-    max_workers: int = MAX_UNARCHIVING_WORKER_COUNT,
+    max_workers: int = _MAX_UNARCHIVING_WORKER_COUNT,
 ) -> set[Path]:
     """Extracts zipped file archive_to_extract to destination_folder,
     preserving all relative files and folders inside the archive

     Returns a set with all the paths extracted from archive. It includes
     all tree leafs, which might include files or empty folders
+
+
+    NOTE: ``destination_folder`` is fully deleted after an error
+
+    ::raise ArchiveError
     """
     with zipfile.ZipFile(
         archive_to_extract, mode="r"
     ) as zip_file_handler, logging_redirect_tqdm():
-        with non_blocking_process_pool_executor(max_workers=max_workers) as pool:
-            loop = asyncio.get_event_loop()
+        with non_blocking_process_pool_executor(
+            max_workers=max_workers
+        ) as process_pool:
+            event_loop = asyncio.get_event_loop()

             # running in a process pool is not ideal for concurrency issues:
             # to avoid race conditions all subdirectories where files will be extracted need to exist
             # creating them before the extraction is under way avoids the issue
             # the following avoids race conditions while unzipping in parallel
-            ensure_destination_subdirectories_exist(
+            _ensure_destination_subdirectories_exist(
                 zip_file_handler=zip_file_handler,
                 destination_folder=destination_folder,
             )

-            tasks = [
-                loop.run_in_executor(
-                    pool,
+            futures: list[asyncio.Future] = [
+                event_loop.run_in_executor(
+                    process_pool,
+                    # ---------
                     _zipfile_single_file_extract_worker,
                     archive_to_extract,
                     zip_entry,
```
```diff
@@ -168,20 +186,41 @@ async def unarchive_dir(
                 for zip_entry in zip_file_handler.infolist()
             ]

-            extracted_paths: list[Path] = await tqdm_asyncio.gather(
-                *tasks,
-                desc=f"decompressing {archive_to_extract} -> {destination_folder} [{len(tasks)} file{'s' if len(tasks) > 1 else ''}"
-                f"/{_human_readable_size(archive_to_extract.stat().st_size)}]\n",
-                total=len(tasks),
-                **_TQDM_MULTI_FILES_OPTIONS,
-            )
+            try:
+                extracted_paths: list[Path] = await tqdm_asyncio.gather(
+                    *futures,
+                    desc=f"decompressing {archive_to_extract} -> {destination_folder} [{len(futures)} file{'s' if len(futures) > 1 else ''}"
+                    f"/{_human_readable_size(archive_to_extract.stat().st_size)}]\n",
+                    total=len(futures),
+                    **_TQDM_MULTI_FILES_OPTIONS,
+                )
+
+            except Exception as err:
+                for f in futures:
+                    f.cancel()
+
+                # wait until all tasks are cancelled
+                await asyncio.wait(
+                    futures, timeout=2 * _MIN, return_when=asyncio.ALL_COMPLETED
+                )
+
+                # now we can cleanup
+                if destination_folder.exists() and destination_folder.is_dir():
+                    await remove_directory(destination_folder, ignore_errors=True)

-            # NOTE: extracted_paths includes all tree leafs, which might include files and empty folders
-            return {
-                p
-                for p in extracted_paths
-                if p.is_file() or (p.is_dir() and not any(p.glob("*")))
-            }
+                raise ArchiveError(
+                    f"Failed unarchiving {archive_to_extract} -> {destination_folder} due to {type(err)}. "
+                    f"Details: {err}"
+                ) from err
+
+            else:
+
+                # NOTE: extracted_paths includes all tree leafs, which might include files and empty folders
+                return {
+                    p
+                    for p in extracted_paths
+                    if p.is_file() or (p.is_dir() and not any(p.glob("*")))
+                }


 @contextmanager
```
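The `except` branch above is the heart of the hang fix: on a failed `gather` it cancels every worker future, waits a bounded `2 * _MIN` (120 s) for them to settle, deletes the partially extracted `destination_folder`, and only then re-raises as `ArchiveError`. A minimal, self-contained sketch of the same cancel-then-wait pattern, with an invented `_work` coroutine standing in for the extraction workers:

```python
import asyncio

async def _work(i: int) -> int:
    # stand-in for a single-file extraction worker
    if i == 3:
        raise RuntimeError("boom")  # simulate one failing extraction
    await asyncio.sleep(i)
    return i

async def main() -> None:
    futures = [asyncio.ensure_future(_work(i)) for i in range(5)]
    try:
        print(await asyncio.gather(*futures))
    except Exception:
        for f in futures:
            f.cancel()  # ask every still-pending future to stop
        # bounded wait: a stuck worker cannot hang the event loop forever
        await asyncio.wait(futures, timeout=120, return_when=asyncio.ALL_COMPLETED)
        raise  # cleanup of partial output would happen here

try:
    asyncio.run(main())
except RuntimeError as err:
    print(f"unarchiving failed: {err}")
```

One caveat worth noting: `Future.cancel()` on an executor-backed future only stops work that has not started yet; a task already running in a pool process runs to completion, which is why the wait is bounded by a timeout rather than open-ended.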
```diff
@@ -215,39 +254,33 @@ def _add_to_archive(
     compress: bool,
     store_relative_path: bool,
     exclude_patterns: Optional[set[str]] = None,
-) -> Optional[Exception]:
-    try:
-        compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
-        folder_size_bytes = sum(
-            file.stat().st_size
-            for file in _iter_files_to_compress(dir_to_compress, exclude_patterns)
-        )
-        desc = f"compressing {dir_to_compress} -> {destination}"
-        with tqdm_logging_redirect(
-            desc=f"{desc}\n", total=folder_size_bytes, **_TQDM_FILE_OPTIONS
-        ) as progress_bar, _progress_enabled_zip_write_handler(
-            zipfile.ZipFile(destination, "w", compression=compression), progress_bar
-        ) as zip_file_handler:
-            for file_to_add in _iter_files_to_compress(
-                dir_to_compress, exclude_patterns
-            ):
-                progress_bar.set_description(f"{desc}/{file_to_add.name}\n")
-                file_name_in_archive = (
-                    _strip_directory_from_path(file_to_add, dir_to_compress)
-                    if store_relative_path
-                    else file_to_add
-                )
+) -> None:
+    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
+    folder_size_bytes = sum(
+        file.stat().st_size
+        for file in _iter_files_to_compress(dir_to_compress, exclude_patterns)
+    )
+    desc = f"compressing {dir_to_compress} -> {destination}"
+    with tqdm_logging_redirect(
+        desc=f"{desc}\n", total=folder_size_bytes, **_TQDM_FILE_OPTIONS
+    ) as progress_bar, _progress_enabled_zip_write_handler(
+        zipfile.ZipFile(destination, "w", compression=compression), progress_bar
+    ) as zip_file_handler:
+        for file_to_add in _iter_files_to_compress(dir_to_compress, exclude_patterns):
+            progress_bar.set_description(f"{desc}/{file_to_add.name}\n")
+            file_name_in_archive = (
+                _strip_directory_from_path(file_to_add, dir_to_compress)
+                if store_relative_path
+                else file_to_add
+            )

-                # because surrogates are not allowed in zip files,
-                # replacing them will ensure errors will not happen.
-                escaped_file_name_in_archive = _strip_undecodable_in_path(
-                    file_name_in_archive
-                )
+            # because surrogates are not allowed in zip files,
+            # replacing them will ensure errors will not happen.
+            escaped_file_name_in_archive = _strip_undecodable_in_path(
+                file_name_in_archive
+            )

-                zip_file_handler.write(file_to_add, escaped_file_name_in_archive)
-    except Exception as e:  # pylint: disable=broad-except
-        return e
-    return None
+            zip_file_handler.write(file_to_add, escaped_file_name_in_archive)


 async def archive_dir(
```
```diff
@@ -265,22 +298,38 @@ async def archive_dir(

     The **exclude_patterns** is a set of patterns created using
     Unix shell-style wildcards to exclude files and directories.
+
+    destination: Path is deleted if an error occurs
+
+    ::raise ArchiveError
     """
-    with non_blocking_process_pool_executor(max_workers=1) as pool:
-        add_to_archive_error: Union[
-            None, Exception
-        ] = await asyncio.get_event_loop().run_in_executor(
-            pool,
-            _add_to_archive,
-            dir_to_compress,
-            destination,
-            compress,
-            store_relative_path,
-            exclude_patterns,
-        )
-
-        if isinstance(add_to_archive_error, Exception):
-            raise add_to_archive_error
+    with non_blocking_process_pool_executor(max_workers=1) as process_pool:
+        event_loop = asyncio.get_event_loop()
+
+        try:
+            await event_loop.run_in_executor(
+                process_pool,
+                # ---------
+                _add_to_archive,
+                dir_to_compress,
+                destination,
+                compress,
+                store_relative_path,
+                exclude_patterns,
+            )
+        except Exception as err:
+            if destination.is_file():
+                destination.unlink(missing_ok=True)
+
+            raise ArchiveError(
+                f"Failed archiving {dir_to_compress} -> {destination} due to {type(err)}. "
+                f"Details: {err}"
+            ) from err
+
+        except BaseException:
+            if destination.is_file():
+                destination.unlink(missing_ok=True)
+            raise


 def is_leaf_path(p: Path) -> bool:
```
```diff
@@ -347,6 +396,8 @@ def prune(self, exclude: set[Path]) -> None:

 __all__ = (
     "archive_dir",
+    "ArchiveError",
+    "is_leaf_path",
     "PrunableFolder",
     "unarchive_dir",
 )
```
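Taken together, both helpers now clean up their own partial output and raise a single exception type, so a caller only needs one `except` clause. A hypothetical caller-side sketch (the `repack` helper and its paths are invented for illustration, not part of this commit):

```python
from pathlib import Path

from servicelib.archiving_utils import ArchiveError, archive_dir, unarchive_dir

async def repack(archive: Path, workdir: Path) -> Path:
    """Unpack an archive and compress it again into workdir (illustration only)."""
    target = workdir / "repacked.zip"
    try:
        await unarchive_dir(archive, workdir / "unzipped")
        await archive_dir(
            dir_to_compress=workdir / "unzipped",
            destination=target,
            compress=True,
            store_relative_path=True,
        )
    except ArchiveError as err:
        # partial outputs were already removed by the helpers
        raise RuntimeError(f"could not repack {archive}") from err
    return target
```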

packages/service-library/src/servicelib/file_utils.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -25,4 +25,4 @@ async def remove_directory(
     if only_children:
         await asyncio.gather(*[_rm(child, ignore_errors) for child in path.glob("*")])
     else:
-        shutil.rmtree(path, ignore_errors=ignore_errors)
+        await _shutil_rmtree(path, ignore_errors=ignore_errors)
```
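`_shutil_rmtree` itself is not shown in this diff. Presumably it hands the blocking `shutil.rmtree` off to an executor so that deleting a large directory no longer stalls the event loop; a sketch of such a wrapper, under that assumption:

```python
import asyncio
import functools
import shutil
from pathlib import Path

async def _shutil_rmtree(path: Path, *, ignore_errors: bool = False) -> None:
    # run the blocking rmtree in the default thread pool executor
    await asyncio.get_event_loop().run_in_executor(
        None, functools.partial(shutil.rmtree, path, ignore_errors=ignore_errors)
    )
```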

packages/service-library/tests/test_archiving_utils.py

Lines changed: 25 additions & 0 deletions
```diff
@@ -240,6 +240,31 @@ def assert_unarchived_paths(
 # TESTS


+@pytest.mark.skip(reason="DEV: only for manual testing")
+async def test_archiving_utils_against_sample(
+    osparc_simcore_root_dir: Path, tmp_path: Path
+):
+    """
+    ONLY for manual testing
+    User MUST provide a sample zip file in ``sample_path``
+    """
+    sample_path = osparc_simcore_root_dir / "keep.ignore" / "workspace.zip"
+    destination = tmp_path / "unzipped"
+
+    extracted_paths = await unarchive_dir(sample_path, destination)
+    assert extracted_paths
+
+    for p in extracted_paths:
+        assert isinstance(p, Path), p
+
+    await archive_dir(
+        dir_to_compress=destination,
+        destination=tmp_path / "test_it.zip",
+        compress=True,
+        store_relative_path=True,
+    )
+
+
 @pytest.mark.parametrize(
     "compress,store_relative_path",
     itertools.product([True, False], repeat=2),
```
