
Commit ba3684c

GitHK and Andrei Neagu authored
Avoids memory issues with big files while extracting archives (#2192)
* no longer requires creating the folder before extracting
* adding a chunked extractor to avoid huge memory usage
* this is still required
* no need to create the destination
* moved archiving to servicelib
* fixing tests
* pylint does not understand
* replaced archiving and unarchiving - fixes an error with high memory consumption
* disabled compression
* adding extra test information to the error
* adding extra tests
* remove extra line
* removing error from destination_folder
* properly fixed the error
* only extract files
* handle directories differently, just create them
* fixing a very badly written test

Co-authored-by: Andrei Neagu <[email protected]>
1 parent dbcaf9f commit ba3684c

File tree

5 files changed: +401 −124 lines changed

Lines changed: 130 additions & 0 deletions
@@ -0,0 +1,130 @@
import asyncio
import logging
import zipfile
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
from typing import Iterator


log = logging.getLogger(__name__)


def _full_file_path_from_dir_and_subdirs(dir_path: Path) -> Iterator[Path]:
    for path in dir_path.rglob("*"):
        if path.is_file():
            yield path


def _strip_directory_from_path(input_path: Path, to_strip: Path) -> Path:
    to_strip = f"{str(to_strip)}/"
    return Path(str(input_path).replace(to_strip, ""))


def _read_in_chunks(file_object, chunk_size=1024 * 8):
    """Lazy generator that reads a file piece by piece.
    Default chunk size: 8k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


def _zipfile_single_file_extract_worker(
    zip_file_path: Path, file_in_archive: str, destination_folder: Path, is_dir: bool
) -> None:
    """Extracts in chunks to avoid memory pressure while zipping/unzipping"""
    with zipfile.ZipFile(zip_file_path) as zf:
        # assemble destination and ensure it exists
        destination_path = destination_folder / file_in_archive

        if is_dir:
            destination_path.mkdir(parents=True, exist_ok=True)
            return

        with zf.open(name=file_in_archive) as zip_fp:
            with open(destination_path, "wb") as destination_fp:
                for chunk in _read_in_chunks(zip_fp):
                    destination_fp.write(chunk)


def ensure_destination_subdirectories_exist(
    zip_file_handler: zipfile.ZipFile, destination_folder: Path
) -> None:
    # assemble full destination paths
    full_destination_paths = {
        destination_folder / entry.filename for entry in zip_file_handler.infolist()
    }
    # extract all possible subdirectories
    subdirectories = {x.parent for x in full_destination_paths}
    # create all subdirectories before extracting
    for subdirectory in subdirectories:
        Path(subdirectory).mkdir(parents=True, exist_ok=True)


async def unarchive_dir(archive_to_extract: Path, destination_folder: Path) -> None:
    with zipfile.ZipFile(archive_to_extract, mode="r") as zip_file_handler:
        with ProcessPoolExecutor() as pool:
            loop = asyncio.get_event_loop()

            # running in a process pool is not ideal for concurrency:
            # to avoid race conditions, all subdirectories where files will be
            # extracted must exist before the extraction is under way;
            # creating them here avoids races while unzipping in parallel
            ensure_destination_subdirectories_exist(
                zip_file_handler=zip_file_handler,
                destination_folder=destination_folder,
            )

            tasks = [
                loop.run_in_executor(
                    pool,
                    _zipfile_single_file_extract_worker,
                    archive_to_extract,
                    zip_entry.filename,
                    destination_folder,
                    zip_entry.is_dir(),
                )
                for zip_entry in zip_file_handler.infolist()
            ]

            await asyncio.gather(*tasks)


def _serial_add_to_archive(
    dir_to_compress: Path, destination: Path, compress: bool, store_relative_path: bool
) -> bool:
    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    with zipfile.ZipFile(destination, "w", compression=compression) as zip_file_handler:
        files_to_compress_generator = _full_file_path_from_dir_and_subdirs(
            dir_to_compress
        )
        for file_to_add in files_to_compress_generator:
            try:
                file_name_in_archive = (
                    _strip_directory_from_path(file_to_add, dir_to_compress)
                    if store_relative_path
                    else file_to_add
                )
                zip_file_handler.write(file_to_add, file_name_in_archive)
            except ValueError:
                log.exception("Could not write files to archive, please check logs")
                return False
    return True


async def archive_dir(
    dir_to_compress: Path, destination: Path, compress: bool, store_relative_path: bool
) -> bool:
    with ProcessPoolExecutor(max_workers=1) as pool:
        return await asyncio.get_event_loop().run_in_executor(
            pool,
            _serial_add_to_archive,
            dir_to_compress,
            destination,
            compress,
            store_relative_path,
        )


__all__ = ["archive_dir", "unarchive_dir"]
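
For reference, the public surface of this new module is just archive_dir and unarchive_dir. Below is a minimal usage sketch; the /tmp paths and the main coroutine are purely illustrative and not part of the commit, and asyncio.run requires Python 3.7+.

import asyncio
from pathlib import Path

from servicelib.archiving_utils import archive_dir, unarchive_dir


async def main() -> None:
    # compress /tmp/my_data into /tmp/my_data.zip, storing entries relative to the source dir;
    # compress=False stores entries uncompressed (ZIP_STORED)
    created = await archive_dir(
        dir_to_compress=Path("/tmp/my_data"),
        destination=Path("/tmp/my_data.zip"),
        compress=False,
        store_relative_path=True,
    )
    assert created is True

    # extract it again; unarchive_dir pre-creates all destination subdirectories itself,
    # so the destination folder does not need to exist beforehand
    await unarchive_dir(
        archive_to_extract=Path("/tmp/my_data.zip"),
        destination_folder=Path("/tmp/extracted"),
    )


if __name__ == "__main__":
    asyncio.run(main())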
Lines changed: 237 additions & 0 deletions
@@ -0,0 +1,237 @@
# pylint:disable=redefined-outer-name,unused-argument

import os
import tempfile
import hashlib
import random
from pathlib import Path
import asyncio
from typing import Set, List, Dict, Tuple
from concurrent.futures import ProcessPoolExecutor
import string
import secrets

import pytest

from servicelib.archiving_utils import archive_dir, unarchive_dir


@pytest.fixture
def temp_dir_one() -> Path:
    with tempfile.TemporaryDirectory() as temp_dir:
        yield Path(temp_dir)


@pytest.fixture
def temp_dir_two(tmpdir) -> Path:
    with tempfile.TemporaryDirectory() as temp_dir:
        yield Path(temp_dir)


@pytest.fixture
def dir_with_random_content() -> Path:
    def random_string(length: int) -> str:
        return "".join(secrets.choice(string.ascii_letters) for i in range(length))

    def make_files_in_dir(dir_path: Path, file_count: int) -> None:
        for _ in range(file_count):
            (dir_path / f"{random_string(8)}.bin").write_bytes(
                os.urandom(random.randint(1, 10))
            )

    def ensure_dir(path_to_ensure: Path) -> Path:
        path_to_ensure.mkdir(parents=True, exist_ok=True)
        return path_to_ensure

    def make_subdirectory_with_content(subdir_name: Path, max_file_count: int) -> None:
        subdir_name = ensure_dir(subdir_name)
        make_files_in_dir(
            dir_path=subdir_name,
            file_count=random.randint(1, max_file_count),
        )

    def make_subdirectories_with_content(
        subdir_name: Path, max_subdirectories_count: int, max_file_count: int
    ) -> None:
        subdirectories_count = random.randint(1, max_subdirectories_count)
        for _ in range(subdirectories_count):
            make_subdirectory_with_content(
                subdir_name=subdir_name / f"{random_string(4)}",
                max_file_count=max_file_count,
            )

    def get_dirs_and_subdirs_in_path(path_to_scan: Path) -> List[Path]:
        return [path for path in path_to_scan.rglob("*") if path.is_dir()]

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        data_container = ensure_dir(temp_dir_path / "study_data")

        make_subdirectories_with_content(
            subdir_name=data_container, max_subdirectories_count=5, max_file_count=5
        )
        make_files_in_dir(dir_path=data_container, file_count=5)

        # creates a good amount of files
        for _ in range(4):
            for subdirectory_path in get_dirs_and_subdirs_in_path(data_container):
                make_subdirectories_with_content(
                    subdir_name=subdirectory_path,
                    max_subdirectories_count=3,
                    max_file_count=3,
                )

        yield temp_dir_path


def strip_directory_from_path(input_path: Path, to_strip: Path) -> Path:
    to_strip = f"{str(to_strip)}/"
    return Path(str(input_path).replace(to_strip, ""))


def get_all_files_in_dir(dir_path: Path) -> Set[Path]:
    return {
        strip_directory_from_path(x, dir_path)
        for x in dir_path.rglob("*")
        if x.is_file()
    }


def _compute_hash(file_path: Path) -> Tuple[Path, str]:
    with open(file_path, "rb") as file_to_hash:
        file_hash = hashlib.md5()
        chunk = file_to_hash.read(8192)
        while chunk:
            file_hash.update(chunk)
            chunk = file_to_hash.read(8192)

    return file_path, file_hash.hexdigest()


async def compute_hashes(file_paths: List[Path]) -> Dict[Path, str]:
    """given a list of files, computes their hashes on a process pool"""

    loop = asyncio.get_event_loop()

    with ProcessPoolExecutor() as process_pool_executor:
        tasks = [
            loop.run_in_executor(process_pool_executor, _compute_hash, file_path)
            for file_path in file_paths
        ]
        # pylint: disable=unnecessary-comprehension
        # _compute_hash returns a Tuple[Path, str]; mapping List[Tuple[Path, str]] to Dict[Path, str] here
        return {k: v for k, v in await asyncio.gather(*tasks)}


def full_file_path_from_dir_and_subdirs(dir_path: Path) -> List[Path]:
    return [x for x in dir_path.rglob("*") if x.is_file()]


async def assert_same_directory_content(
    dir_to_compress: Path, output_dir: Path, inject_relative_path: Path = None
) -> None:
    def _relative_path(input_path: Path) -> Path:
        return Path(str(inject_relative_path / str(input_path))[1:])

    input_set = get_all_files_in_dir(dir_to_compress)
    output_set = get_all_files_in_dir(output_dir)

    if inject_relative_path is not None:
        input_set = {_relative_path(x) for x in input_set}

    assert (
        input_set == output_set
    ), f"The following files are missing: {input_set - output_set}"

    # compute the hashes for dir_to_compress and map them in a dict
    # keyed by the path relative to the root of the directory
    dir_to_compress_hashes = {
        strip_directory_from_path(k, dir_to_compress): v
        for k, v in (
            await compute_hashes(full_file_path_from_dir_and_subdirs(dir_to_compress))
        ).items()
    }

    # compute the hashes for output_dir and map them in a dict
    # keyed by the path relative to the root of the directory
    output_dir_hashes = {
        strip_directory_from_path(k, output_dir): v
        for k, v in (
            await compute_hashes(full_file_path_from_dir_and_subdirs(output_dir))
        ).items()
    }

    # finally check if hashes are mapped 1 to 1 in order to verify
    # that the compress/decompress worked correctly
    for key in dir_to_compress_hashes:
        assert (
            dir_to_compress_hashes[key]
            == output_dir_hashes[_relative_path(key) if inject_relative_path else key]
        )


# end utils


@pytest.mark.parametrize(
    "compress,store_relative_path",
    [[True, True], [True, False], [False, True], [False, False]],
)
async def test_archive_unarchive_same_structure_dir(
    dir_with_random_content: Path,
    temp_dir_one: Path,
    temp_dir_two: Path,
    compress: bool,
    store_relative_path: bool,
):
    archive_file = temp_dir_one / "archive.zip"

    archive_result = await archive_dir(
        dir_to_compress=dir_with_random_content,
        destination=archive_file,
        store_relative_path=store_relative_path,
        compress=compress,
    )
    assert archive_result is True

    await unarchive_dir(
        archive_to_extract=archive_file, destination_folder=temp_dir_two
    )

    await assert_same_directory_content(
        dir_with_random_content,
        temp_dir_two,
        None if store_relative_path else dir_with_random_content,
    )


@pytest.mark.parametrize(
    "compress,store_relative_path",
    [[True, True], [True, False], [False, True], [False, False]],
)
async def test_unarchive_in_same_dir_as_archive(
    dir_with_random_content: Path,
    temp_dir_one: Path,
    compress: bool,
    store_relative_path: bool,
):
    archive_file = temp_dir_one / "archive.zip"

    archive_result = await archive_dir(
        dir_to_compress=dir_with_random_content,
        destination=archive_file,
        store_relative_path=store_relative_path,
        compress=compress,
    )
    assert archive_result is True

    await unarchive_dir(
        archive_to_extract=archive_file, destination_folder=temp_dir_one
    )
    archive_file.unlink()
    await assert_same_directory_content(
        dir_with_random_content,
        temp_dir_one,
        None if store_relative_path else dir_with_random_content,
    )
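
Note that the two test coroutines above carry no explicit asyncio marker, so they rely on the repository's pytest configuration (not shown in this diff) to run them on an event loop. A minimal, hypothetical conftest.py that would achieve the same locally, assuming the pytest-asyncio plugin is installed, could look like this:

# conftest.py - hypothetical sketch, not part of this commit
import asyncio

import pytest


def pytest_collection_modifyitems(items):
    # mark every collected coroutine test so pytest-asyncio runs it on an event loop
    for item in items:
        if asyncio.iscoroutinefunction(getattr(item, "function", None)):
            item.add_marker(pytest.mark.asyncio)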
