|
1 | 1 | import zipfile |
2 | 2 |
|
3 | 3 | from pathlib import Path |
4 | | -from typing import List |
| 4 | +from typing import List, Set |
5 | 5 |
|
6 | 6 | from launchpad.utils.file_utils import cleanup_directory, create_temp_directory |
7 | 7 | from launchpad.utils.logging import get_logger |
|
# Default limits: maximum entry count and maximum total uncompressed bytes.
DEFAULT_MAX_FILE_COUNT = 100_000
DEFAULT_MAX_UNCOMPRESSED_SIZE = 10 * 1024**3  # 10 GiB

# Numeric compression-method identifier that marks a Zstandard-compressed entry.
COMPRESSION_ZSTD = 93
| 16 | + |
14 | 17 |
|
15 | 18 | class UnreasonableZipError(ValueError): |
16 | 19 | """Raised when a zip file exceeds reasonable limits.""" |
@@ -92,13 +95,43 @@ def extract_to_temp_directory(self) -> Path: |
92 | 95 | self._temp_dirs.append(temp_dir) |
93 | 96 |
|
94 | 97 | self._safe_extract(str(self.path), str(temp_dir)) |
95 | | - logger.debug(f"Extracted zip contents to {temp_dir} using system unzip") |
| 98 | + logger.debug(f"Extracted zip contents to {temp_dir}") |
96 | 99 |
|
97 | 100 | return temp_dir |
98 | 101 |
|
| 102 | + def _detect_compression_methods(self, zip_path: str) -> Set[int]: |
| 103 | + """Detect compression methods used in the zip file. |
| 104 | +
|
| 105 | + Args: |
| 106 | + zip_path: Path to the zip file |
| 107 | +
|
| 108 | + Returns: |
| 109 | + Set of compression method integers used in the zip file |
| 110 | + """ |
| 111 | + with zipfile.ZipFile(zip_path, "r") as zf: |
| 112 | + return {info.compress_type for info in zf.infolist()} |
| 113 | + |
99 | 114 | def _safe_extract(self, zip_path: str, extract_path: str): |
100 | | - """Extract the zip contents to a temporary directory, ensuring that the paths are safe from path traversal attacks.""" |
| 115 | + """Extract the zip contents to a temporary directory, ensuring that the paths are safe from path traversal attacks. |
| 116 | +
|
| 117 | + Supports both standard compression methods and Zstandard compression. |
| 118 | + """ |
101 | 119 | base_dir = Path(extract_path) |
| 120 | + |
| 121 | + # Detect if zstandard compression is used |
| 122 | + compression_methods = self._detect_compression_methods(zip_path) |
| 123 | + uses_zstd = COMPRESSION_ZSTD in compression_methods |
| 124 | + |
| 125 | + if uses_zstd: |
| 126 | + logger.debug("Detected Zstandard compression in zip file") |
| 127 | + try: |
| 128 | + import zipfile_zstd # noqa: F401 |
| 129 | + except ImportError: |
| 130 | + raise RuntimeError( |
| 131 | + "Zstandard-compressed zip file detected, but zipfile-zstd package is not installed. " |
| 132 | + "Install it with: pip install zipfile-zstd" |
| 133 | + ) |
| 134 | + |
102 | 135 | with zipfile.ZipFile(zip_path, "r") as zip_ref: |
103 | 136 | check_reasonable_zip(zip_ref) |
104 | 137 | for member in zip_ref.namelist(): |
|
0 commit comments