From d1acae3f7054c0ce67a9eadc8d32ea5d4544d3d2 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 16 Jan 2025 14:11:12 +0100 Subject: [PATCH] adde escaping support --- .../archiving_utils/_interface_7zip.py | 7 ++-- .../test_archiving__interface_7zip.py | 33 +++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py b/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py index 483fefd26e2..c14b25726a5 100644 --- a/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py +++ b/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py @@ -3,6 +3,7 @@ import logging import os import re +import shlex from collections.abc import Awaitable, Callable from contextlib import AsyncExitStack from pathlib import Path @@ -214,7 +215,7 @@ async def archive_dir( "-mta=off", # Don't store file access time ] ) - command = f"{_7ZIP_EXECUTABLE} {options} {destination} {dir_to_compress}/*" + command = f"{_7ZIP_EXECUTABLE} {options} {shlex.quote(f'{destination}')} {shlex.quote(f'{dir_to_compress}')}/*" folder_size_bytes = sum( file.stat().st_size for file in iter_files_to_compress(dir_to_compress) @@ -295,7 +296,7 @@ async def unarchive_dir( # get archive information archive_info_parser = _7ZipArchiveInfoParser() list_output = await _run_cli_command( - f"{_7ZIP_EXECUTABLE} l {archive_to_extract}", + f"{_7ZIP_EXECUTABLE} l {shlex.quote(f'{archive_to_extract}')}", output_handler=archive_info_parser.parse_chunk, ) file_names_in_archive = _extract_file_names_from_archive(list_output) @@ -330,7 +331,7 @@ async def _decompressed_bytes(byte_progress: NonNegativeInt) -> None: ] ) await _run_cli_command( - f"{_7ZIP_EXECUTABLE} {options} {archive_to_extract} -o{destination_folder}", + f"{_7ZIP_EXECUTABLE} {options} {shlex.quote(f'{archive_to_extract}')} -o{shlex.quote(f'{destination_folder}')}", output_handler=_7ZipProgressParser(_decompressed_bytes).parse_chunk, ) diff --git a/packages/service-library/tests/archiving_utils/test_archiving__interface_7zip.py b/packages/service-library/tests/archiving_utils/test_archiving__interface_7zip.py index 5d49c14fe3b..da6ba3260c2 100644 --- a/packages/service-library/tests/archiving_utils/test_archiving__interface_7zip.py +++ b/packages/service-library/tests/archiving_utils/test_archiving__interface_7zip.py @@ -5,6 +5,7 @@ from pathlib import Path import pytest +from helpers import print_tree from pydantic import NonNegativeInt from servicelib.archiving_utils._interface_7zip import ( _7ZipProgressParser, @@ -61,12 +62,19 @@ async def _progress_handler(byte_progress: NonNegativeInt) -> None: assert sum(detected_entries) == expected_size +def _assert_same_folder_content(f1: Path, f2: Path) -> None: + in_f1 = {x.relative_to(f1) for x in f1.rglob("*")} + in_f2 = {x.relative_to(f2) for x in f2.rglob("*")} + assert in_f1 == in_f2 + + @pytest.mark.parametrize("compress", [True, False]) async def test_archive_unarchive( mixed_file_types: Path, archive_path: Path, unpacked_archive: Path, compress: bool ): await archive_dir(mixed_file_types, archive_path, compress=compress) await unarchive_dir(archive_path, unpacked_archive) + _assert_same_folder_content(mixed_file_types, unpacked_archive) @pytest.fixture @@ -82,6 +90,7 @@ async def test_archive_unarchive_empty_folder( ): await archive_dir(empty_folder, archive_path, compress=compress) await unarchive_dir(archive_path, unpacked_archive) + _assert_same_folder_content(empty_folder, unpacked_archive) @pytest.mark.parametrize( @@ -102,3 +111,27 @@ def test__extract_file_names_from_archive( archive_list_stdout_path.read_text() files = _extract_file_names_from_archive(archive_list_stdout_path.read_text()) assert len(files) == expected_file_count + + +@pytest.mark.parametrize("compress", [True, False]) +async def test_archive_unarchive_with_names_with_spaces(tmp_path: Path, compress: bool): + to_archive_path = tmp_path / "'source of files!a ads now strange'" + to_archive_path.mkdir(parents=True, exist_ok=True) + assert to_archive_path.exists() + + # generate some content + for i in range(10): + (to_archive_path / f"f{i}.txt").write_text("*" * i) + print_tree(to_archive_path) + + archive_path = tmp_path / "archived version herre!)!(/£)!'" + assert not archive_path.exists() + + extracted_to_path = tmp_path / "this is where i want them to be extracted to''''" + extracted_to_path.mkdir(parents=True, exist_ok=True) + assert extracted_to_path.exists() + + # source and destination all with spaces + await archive_dir(to_archive_path, archive_path, compress=compress) + await unarchive_dir(archive_path, extracted_to_path) + _assert_same_folder_content(to_archive_path, extracted_to_path)