@@ -3,6 +3,7 @@
import logging
import os
import re
import shlex
from collections.abc import Awaitable, Callable
from contextlib import AsyncExitStack
from pathlib import Path
@@ -214,7 +215,7 @@ async def archive_dir(
"-mta=off", # Don't store file access time
]
)
-command = f"{_7ZIP_EXECUTABLE} {options} {destination} {dir_to_compress}/*"
+command = f"{_7ZIP_EXECUTABLE} {options} {shlex.quote(f'{destination}')} {shlex.quote(f'{dir_to_compress}')}/*"

folder_size_bytes = sum(
file.stat().st_size for file in iter_files_to_compress(dir_to_compress)
@@ -295,7 +296,7 @@ async def unarchive_dir(
# get archive information
archive_info_parser = _7ZipArchiveInfoParser()
list_output = await _run_cli_command(
-f"{_7ZIP_EXECUTABLE} l {archive_to_extract}",
+f"{_7ZIP_EXECUTABLE} l {shlex.quote(f'{archive_to_extract}')}",
output_handler=archive_info_parser.parse_chunk,
)
file_names_in_archive = _extract_file_names_from_archive(list_output)
@@ -330,7 +331,7 @@ async def _decompressed_bytes(byte_progress: NonNegativeInt) -> None:
]
)
await _run_cli_command(
-f"{_7ZIP_EXECUTABLE} {options} {archive_to_extract} -o{destination_folder}",
+f"{_7ZIP_EXECUTABLE} {options} {shlex.quote(f'{archive_to_extract}')} -o{shlex.quote(f'{destination_folder}')}",
output_handler=_7ZipProgressParser(_decompressed_bytes).parse_chunk,
)

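The change above routes every interpolated path through shlex.quote before the command string reaches the 7zip CLI. A minimal sketch of the resulting command, using hypothetical paths and a literal 7z in place of _7ZIP_EXECUTABLE (the options value is a placeholder, not the option list assembled in archive_dir):

import shlex
from pathlib import Path

# Hypothetical inputs mirroring the archive_dir() call site above.
destination = Path("/tmp/archived version (v1).7z")
dir_to_compress = Path("/tmp/source of files")
options = "a"  # placeholder for the option list built in archive_dir

# Same pattern as the diff: quote each path before formatting the command string.
command = f"7z {options} {shlex.quote(f'{destination}')} {shlex.quote(f'{dir_to_compress}')}/*"
print(command)
# 7z a '/tmp/archived version (v1).7z' '/tmp/source of files'/*

shlex.quote only adds the single quotes (escaping any embedded ones) when the argument contains characters a POSIX shell would interpret; plain paths pass through unchanged, so the quoting is a no-op for the common case.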
@@ -5,6 +5,7 @@
from pathlib import Path

import pytest
from helpers import print_tree
from pydantic import NonNegativeInt
from servicelib.archiving_utils._interface_7zip import (
_7ZipProgressParser,
@@ -61,12 +62,19 @@ async def _progress_handler(byte_progress: NonNegativeInt) -> None:
assert sum(detected_entries) == expected_size


def _assert_same_folder_content(f1: Path, f2: Path) -> None:
in_f1 = {x.relative_to(f1) for x in f1.rglob("*")}
in_f2 = {x.relative_to(f2) for x in f2.rglob("*")}
assert in_f1 == in_f2


@pytest.mark.parametrize("compress", [True, False])
async def test_archive_unarchive(
mixed_file_types: Path, archive_path: Path, unpacked_archive: Path, compress: bool
):
await archive_dir(mixed_file_types, archive_path, compress=compress)
await unarchive_dir(archive_path, unpacked_archive)
_assert_same_folder_content(mixed_file_types, unpacked_archive)


@pytest.fixture
@@ -82,6 +90,7 @@ async def test_archive_unarchive_empty_folder(
):
await archive_dir(empty_folder, archive_path, compress=compress)
await unarchive_dir(archive_path, unpacked_archive)
_assert_same_folder_content(empty_folder, unpacked_archive)


@pytest.mark.parametrize(
@@ -102,3 +111,27 @@ def test__extract_file_names_from_archive(
archive_list_stdout_path.read_text()
files = _extract_file_names_from_archive(archive_list_stdout_path.read_text())
assert len(files) == expected_file_count


@pytest.mark.parametrize("compress", [True, False])
async def test_archive_unarchive_with_names_with_spaces(tmp_path: Path, compress: bool):
to_archive_path = tmp_path / "'source of files!a ads now strange'"
to_archive_path.mkdir(parents=True, exist_ok=True)
assert to_archive_path.exists()

# generate some content
for i in range(10):
(to_archive_path / f"f{i}.txt").write_text("*" * i)
print_tree(to_archive_path)

archive_path = tmp_path / "archived version herre!)!(/£)!'"
assert not archive_path.exists()

extracted_to_path = tmp_path / "this is where i want them to be extracted to''''"
extracted_to_path.mkdir(parents=True, exist_ok=True)
assert extracted_to_path.exists()

# source and destination all with spaces
await archive_dir(to_archive_path, archive_path, compress=compress)
await unarchive_dir(archive_path, extracted_to_path)
_assert_same_folder_content(to_archive_path, extracted_to_path)
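The odd directory and archive names in this new test exercise exactly the failure mode the quoting fixes: when the command string is handed to a shell, unquoted whitespace splits a single path into several arguments. A small illustration using shlex.split as a stand-in for POSIX word splitting (paths here are hypothetical):

import shlex

# Without quoting, a path containing spaces falls apart into several argv entries.
print(shlex.split("7z a /tmp/out.7z /tmp/source of files/*"))
# ['7z', 'a', '/tmp/out.7z', '/tmp/source', 'of', 'files/*']

# Quoted with shlex.quote, the same path survives as a single argument.
print(shlex.split(f"7z a /tmp/out.7z {shlex.quote('/tmp/source of files')}/*"))
# ['7z', 'a', '/tmp/out.7z', '/tmp/source of files/*']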