diff --git a/Doc/library/zipfile.rst b/Doc/library/zipfile.rst index bf9136a2139112..78c906208c4191 100644 --- a/Doc/library/zipfile.rst +++ b/Doc/library/zipfile.rst @@ -45,6 +45,26 @@ The module defines the following items: not been enabled. +.. exception:: ZipStructuralError + + The error raised when ZIP file structure is invalid or inconsistent. + This includes issues like mismatched offsets, invalid sizes, + or structural inconsistencies between different parts of the archive. + This is a subclass of :exc:`BadZipFile`. + + .. versionadded:: next + + +.. exception:: ZipValidationError + + The error raised when ZIP file validation fails. + This includes CRC mismatches, compression validation failures, + or other data integrity issues. + This is a subclass of :exc:`BadZipFile`. + + .. versionadded:: next + + .. class:: ZipFile :noindex: @@ -144,6 +164,32 @@ The module defines the following items: .. versionadded:: 3.14 + +.. class:: ZipValidationLevel + + An :class:`~enum.IntEnum` for the ZIP file validation levels that can be + specified for the *strict_validation* parameter of :class:`ZipFile`. + + .. data:: ZipValidationLevel.BASIC + + Basic validation with magic number checks only (default behavior). + This provides backward compatibility with existing code. + + .. data:: ZipValidationLevel.STRUCTURAL + + Comprehensive structure validation including consistency checks between + different parts of the ZIP archive. This detects issues like mismatched + offsets, invalid sizes, entry count mismatches, and potential zip bombs + through compression ratio analysis. + + .. data:: ZipValidationLevel.STRICT + + Includes all structural validation plus CRC verification during file + reading and deep validation checks. This provides the highest level of + validation but may impact performance. + + .. versionadded:: next + .. note:: The ZIP file format specification has included support for bzip2 compression @@ -171,7 +217,7 @@ ZipFile Objects .. 
class:: ZipFile(file, mode='r', compression=ZIP_STORED, allowZip64=True, \ compresslevel=None, *, strict_timestamps=True, \ - metadata_encoding=None) + metadata_encoding=None, strict_validation=ZipValidationLevel.BASIC) Open a ZIP file, where *file* can be a path to a file (a string), a file-like object or a :term:`path-like object`. @@ -224,6 +270,23 @@ ZipFile Objects which will be used to decode metadata such as the names of members and ZIP comments. + The *strict_validation* parameter controls the level of validation performed + on the ZIP file structure. It can be set to one of the :class:`ZipValidationLevel` + values: + + * :data:`ZipValidationLevel.BASIC` (default): Performs only basic magic number + validation, maintaining backward compatibility with existing code. + * :data:`ZipValidationLevel.STRUCTURAL`: Enables comprehensive structure + validation including consistency checks between different parts of the ZIP + archive, entry count validation, compression ratio analysis for zip bomb + detection, and overlap detection. + * :data:`ZipValidationLevel.STRICT`: Includes all structural validation plus + CRC verification during file reading and additional deep validation checks. + + Higher validation levels provide better security against malformed or + malicious ZIP files but may impact performance and compatibility with some + malformed but readable archives. + If the file is created with mode ``'w'``, ``'x'`` or ``'a'`` and then :meth:`closed <close>` without adding any files to the archive, the appropriate ZIP structures for an empty archive will be written to the file. @@ -278,6 +341,10 @@ ZipFile Objects Added support for specifying member name encoding for reading metadata in the zipfile's directory and file headers. + .. versionchanged:: next + Added the *strict_validation* parameter for controlling ZIP file + structure validation levels. + .. 
method:: ZipFile.close() diff --git a/Lib/test/test_zipfile/test_validation.py b/Lib/test/test_zipfile/test_validation.py new file mode 100644 index 00000000000000..4dfb485e1ecaad --- /dev/null +++ b/Lib/test/test_zipfile/test_validation.py @@ -0,0 +1,240 @@ +""" +Test suite for zipfile validation features. +""" + +import io +import os +import struct +import tempfile +import unittest +import zipfile +from zipfile import ( + ZipFile, ZipValidationLevel, ZipStructuralError, ZipValidationError, + BadZipFile, sizeEndCentDir, stringEndArchive, structEndArchive, + sizeCentralDir, stringCentralDir, structCentralDir, + sizeFileHeader, stringFileHeader, structFileHeader, + _ECD_ENTRIES_TOTAL, _ECD_SIZE, _ECD_OFFSET, _ECD_COMMENT_SIZE +) +from test.support.os_helper import TESTFN, unlink + + +class TestZipValidation(unittest.TestCase): + """Test zipfile validation functionality.""" + + def setUp(self): + """Set up test fixtures.""" + self.temp_files = [] + + def tearDown(self): + """Clean up test fixtures.""" + for temp_file in self.temp_files: + try: + unlink(temp_file) + except OSError: + pass + + def create_temp_file(self, content=b''): + """Create a temporary file with given content.""" + fd, path = tempfile.mkstemp() + self.temp_files.append(path) + with os.fdopen(fd, 'wb') as f: + f.write(content) + return path + + def test_basic_validation_backward_compatibility(self): + """Test that basic validation mode maintains backward compatibility.""" + # Create a valid ZIP file + temp_path = self.create_temp_file() + with ZipFile(temp_path, 'w') as zf: + zf.writestr('test.txt', 'Hello, World!') + + # Test default behavior (should be BASIC validation) + with ZipFile(temp_path, 'r') as zf: + self.assertEqual(zf._strict_validation, ZipValidationLevel.BASIC) + self.assertEqual(zf.read('test.txt'), b'Hello, World!') + + # Test explicit BASIC validation + with ZipFile(temp_path, 'r', strict_validation=ZipValidationLevel.BASIC) as zf: + self.assertEqual(zf._strict_validation, 
ZipValidationLevel.BASIC) + self.assertEqual(zf.read('test.txt'), b'Hello, World!') + + def test_validation_level_enum(self): + """Test validation level enum values.""" + self.assertEqual(ZipValidationLevel.BASIC, 0) + self.assertEqual(ZipValidationLevel.STRUCTURAL, 1) + self.assertEqual(ZipValidationLevel.STRICT, 2) + + # Test enum conversion + self.assertEqual(ZipValidationLevel(0), ZipValidationLevel.BASIC) + self.assertEqual(ZipValidationLevel(1), ZipValidationLevel.STRUCTURAL) + self.assertEqual(ZipValidationLevel(2), ZipValidationLevel.STRICT) + + def test_structural_validation_valid_file(self): + """Test structural validation with a valid ZIP file.""" + temp_path = self.create_temp_file() + with ZipFile(temp_path, 'w') as zf: + zf.writestr('test.txt', 'Hello, World!') + zf.writestr('dir/nested.txt', 'Nested content') + + # Should pass structural validation + with ZipFile(temp_path, 'r', strict_validation=ZipValidationLevel.STRUCTURAL) as zf: + self.assertEqual(len(zf.filelist), 2) + self.assertEqual(zf.read('test.txt'), b'Hello, World!') + self.assertEqual(zf.read('dir/nested.txt'), b'Nested content') + + def test_strict_validation_valid_file(self): + """Test strict validation with a valid ZIP file.""" + temp_path = self.create_temp_file() + with ZipFile(temp_path, 'w') as zf: + zf.writestr('test.txt', 'Hello, World!') + + # Should pass strict validation + with ZipFile(temp_path, 'r', strict_validation=ZipValidationLevel.STRICT) as zf: + self.assertEqual(zf.read('test.txt'), b'Hello, World!') + + def test_malformed_eocd_too_many_entries(self): + """Test detection of EOCD with too many entries.""" + # Create a basic ZIP file first + temp_path = self.create_temp_file() + with ZipFile(temp_path, 'w') as zf: + zf.writestr('test.txt', 'Hello') + + # Read the file and modify the EOCD to claim too many entries + with open(temp_path, 'rb') as f: + data = bytearray(f.read()) + + # Find EOCD signature and modify entry count + eocd_pos = data.rfind(stringEndArchive) + 
class TestValidationIntegration(unittest.TestCase):
    """Check that validation coexists with the regular ZipFile API."""

    def setUp(self):
        # Paths handed out by create_temp_file(); removed again in tearDown().
        self.temp_files = []

    def tearDown(self):
        for path in self.temp_files:
            try:
                unlink(path)
            except OSError:
                # Best-effort cleanup: a leftover file must not fail the test.
                pass

    def create_temp_file(self, content=b''):
        """Write *content* to a new temporary file and return its path."""
        handle, path = tempfile.mkstemp()
        self.temp_files.append(path)
        with os.fdopen(handle, 'wb') as stream:
            stream.write(content)
        return path

    def test_existing_methods_work_with_validation(self):
        """The usual read-side methods keep working under STRUCTURAL."""
        archive_path = self.create_temp_file()
        members = {'test1.txt': 'Content 1', 'test2.txt': 'Content 2'}
        with ZipFile(archive_path, 'w') as archive:
            for name, text in members.items():
                archive.writestr(name, text)

        level = ZipValidationLevel.STRUCTURAL
        with ZipFile(archive_path, 'r', strict_validation=level) as archive:
            # Listing members
            self.assertEqual(set(archive.namelist()),
                             {'test1.txt', 'test2.txt'})
            self.assertEqual(len(archive.infolist()), 2)

            # Looking up and reading a single member
            self.assertEqual(archive.getinfo('test1.txt').filename,
                             'test1.txt')
            self.assertEqual(archive.read('test1.txt'), b'Content 1')

            # testzip() reports no bad member
            self.assertIsNone(archive.testzip())

    def test_validation_with_different_compression_methods(self):
        """Members written with different methods pass STRUCTURAL validation."""
        archive_path = self.create_temp_file()
        with ZipFile(archive_path, 'w') as archive:
            archive.writestr('stored.txt', 'Stored content',
                             compress_type=zipfile.ZIP_STORED)
            # The deflated member is only added when zlib can be imported.
            try:
                import zlib
                archive.writestr('deflated.txt', 'Deflated content',
                                 compress_type=zipfile.ZIP_DEFLATED)
                has_zlib = True
            except ImportError:
                has_zlib = False

        expected = [('stored.txt', b'Stored content')]
        if has_zlib:
            expected.append(('deflated.txt', b'Deflated content'))

        level = ZipValidationLevel.STRUCTURAL
        with ZipFile(archive_path, 'r', strict_validation=level) as archive:
            for name, payload in expected:
                self.assertEqual(archive.read(name), payload)
class ZipValidationError(BadZipFile):
    """
    Raised when ZIP file validation fails.
    This includes CRC mismatches, compression validation failures,
    or other data integrity issues.
    """


class ZipValidationLevel(IntEnum):
    """
    ZIP file validation levels for controlling structural validation.

    The members are ordered integers, so levels can be compared with
    ``<`` and ``>=``.

    BASIC: Only basic magic number validation (default, backward compatible)
    STRUCTURAL: Comprehensive structure validation including consistency checks
    STRICT: Includes CRC verification and deep validation
    """
    BASIC = 0
    STRUCTURAL = 1
    STRICT = 2


# Validation limits
MAX_COMPRESSION_RATIO = 1000  # Detect zip bombs
MAX_REASONABLE_FILE_SIZE = 1 << 40  # 1 TiB limit


def _validate_eocd_consistency(endrec, filesize, concat, strict_level):
    """Validate End of Central Directory (EOCD) record consistency.

    endrec: EOCD record, indexed with the _ECD_* constants.
    filesize: Size in bytes of the file that holds the archive.
    concat: Value added to the recorded central directory offset to obtain
        its real position (accounts for data prepended to the archive).
    strict_level: A ZipValidationLevel; nothing is checked below STRUCTURAL.

    Raises ZipStructuralError when the record contradicts the file.
    """
    if strict_level < ZipValidationLevel.STRUCTURAL:
        return

    total_entries = endrec[_ECD_ENTRIES_TOTAL]
    cd_size = endrec[_ECD_SIZE]
    cd_offset = endrec[_ECD_OFFSET]
    comment_size = endrec[_ECD_COMMENT_SIZE]

    # Reject negative positions before they take part in any bounds
    # arithmetic, so a negative start cannot mask an oversized directory.
    if cd_offset < 0:
        raise ZipStructuralError(f"Invalid central directory offset: {cd_offset}")

    # Actual central directory position accounting for prepended data
    actual_cd_start = cd_offset + concat
    if actual_cd_start < 0:
        raise ZipStructuralError(f"Invalid actual central directory start: {actual_cd_start}")

    if cd_size > filesize:
        raise ZipStructuralError("Central directory size exceeds file size")

    # The central directory should not extend beyond the end of the file
    if actual_cd_start + cd_size > filesize:
        raise ZipStructuralError("Central directory extends beyond file end")

    # The 16-bit entry limit only applies to classic EOCD records.  A ZIP64
    # record stores the count in a 64-bit field and may legitimately exceed
    # it, so such archives must not be rejected here.
    is_zip64 = endrec[_ECD_SIGNATURE] == stringEndArchive64
    if not is_zip64 and total_entries > ZIP_FILECOUNT_LIMIT:
        raise ZipStructuralError(f"Too many entries: {total_entries}")

    if comment_size > ZIP_MAX_COMMENT:
        raise ZipStructuralError(f"Comment too large: {comment_size}")


def _validate_zipinfo_fields(zinfo, strict_level):
    """Validate individual ZipInfo entry fields.

    zinfo: The central directory entry to check.
    strict_level: A ZipValidationLevel; nothing is checked below STRUCTURAL,
        and unknown compression methods are only rejected at STRICT.

    Raises ZipStructuralError for oversized entries, suspicious compression
    ratios, negative header offsets and (STRICT only) unknown methods.
    """
    if strict_level < ZipValidationLevel.STRUCTURAL:
        return

    if zinfo.file_size > MAX_REASONABLE_FILE_SIZE:
        raise ZipStructuralError(f"File too large: {zinfo.filename}")

    # Zip bomb heuristic.  Compared with integer arithmetic so the result
    # never depends on float rounding.
    if (zinfo.compress_size > 0 and
            zinfo.file_size > MAX_COMPRESSION_RATIO * zinfo.compress_size):
        raise ZipStructuralError(
            f"Suspicious compression ratio in {zinfo.filename}")

    if zinfo.header_offset < 0:
        raise ZipStructuralError(f"Invalid header offset: {zinfo.header_offset}")

    known_methods = (ZIP_STORED, ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA, ZIP_ZSTANDARD)
    if (zinfo.compress_type not in known_methods and
            strict_level >= ZipValidationLevel.STRICT):
        raise ZipStructuralError(f"Unknown compression method: {zinfo.compress_type}")


def _validate_central_directory_integrity(endrec, filelist, strict_level):
    """Validate central directory structure and entries.

    endrec: EOCD record, indexed with the _ECD_* constants.
    filelist: The ZipInfo entries that were read from the central directory.
    strict_level: A ZipValidationLevel; nothing is checked below STRUCTURAL.

    Raises ZipStructuralError when the number of entries read differs from
    the number the EOCD record claims, or when an entry fails
    _validate_zipinfo_fields().
    """
    if strict_level < ZipValidationLevel.STRUCTURAL:
        return

    claimed = endrec[_ECD_ENTRIES_TOTAL]
    found = len(filelist)
    if claimed != found:
        raise ZipStructuralError(
            f"Entry count mismatch: claimed {claimed}, "
            f"found {found}")

    for zinfo in filelist:
        _validate_zipinfo_fields(zinfo, strict_level)
def _min_entry_end(zinfo):
    """Return a lower bound for the file offset just past *zinfo*'s data."""
    # Only the central directory record is available here.  The local header
    # may carry a different extra field, and the raw bytes of a legacy-encoded
    # name are unknown, so only count what is certain: the fixed header, the
    # name (exact for UTF-8 names, otherwise at least one byte per character)
    # and the compressed data.
    if zinfo.flag_bits & _MASK_UTF_FILENAME:
        name_size = len(zinfo.orig_filename.encode('utf-8'))
    else:
        name_size = len(zinfo.orig_filename)
    return (zinfo.header_offset + sizeFileHeader + name_size +
            zinfo.compress_size)


def _validate_archive_structure(filelist, endrec, strict_level):
    """Validate overall archive structure and detect overlaps.

    filelist: The ZipInfo entries read from the central directory.
    endrec: EOCD record, indexed with the _ECD_* constants.
    strict_level: A ZipValidationLevel; nothing is checked below STRUCTURAL.

    Raises ZipStructuralError when an entry runs into the next entry or
    into the central directory.
    """
    if strict_level < ZipValidationLevel.STRUCTURAL:
        return

    if not filelist:
        return

    # Walk the entries in file order and compare each with its successor
    ordered = sorted(filelist, key=lambda z: z.header_offset)
    for current, following in zip(ordered, ordered[1:]):
        if _min_entry_end(current) > following.header_offset:
            raise ZipStructuralError(
                f"Overlapping entries detected: {current.filename} and {following.filename}")

    # Locate the central directory relative to the EOCD record rather than
    # from the recorded offset, so the result stays correct when data was
    # prepended to the archive.
    # NOTE(review): assumes header_offset values are already adjusted for
    # prepended data by the caller - confirm in _RealGetContents().
    cd_start = endrec[_ECD_LOCATION] - endrec[_ECD_SIZE]
    if endrec[_ECD_SIGNATURE] == stringEndArchive64:
        # ZIP64 structures sit between the directory and the EOCD record
        cd_start -= sizeEndCentDir64 + sizeEndCentDir64Locator

    for zinfo in filelist:
        if _min_entry_end(zinfo) > cd_start:
            raise ZipStructuralError(
                f"Entry {zinfo.filename} overlaps with central directory")


def _validate_local_header_consistency(fheader, fname, zinfo, strict_level,
                                       metadata_encoding=None):
    """Validate local header matches central directory entry.

    fheader: Unpacked local file header, indexed with the _FH_* constants.
    fname: Raw file name bytes read from the local header.
    zinfo: The matching central directory entry.
    strict_level: A ZipValidationLevel; nothing is checked below STRUCTURAL.
    metadata_encoding: Codec for names without the UTF-8 flag; cp437 is used
        when it is None.
        NOTE(review): ZipFile.open() should pass self.metadata_encoding here.

    Raises ZipStructuralError when name, compression method, sizes or CRC
    disagree between the two records.
    """
    if strict_level < ZipValidationLevel.STRUCTURAL:
        return

    if fheader[_FH_GENERAL_PURPOSE_FLAG_BITS] & _MASK_UTF_FILENAME:
        fname_str = fname.decode('utf-8')
    else:
        fname_str = fname.decode(metadata_encoding or 'cp437')
    if fname_str != zinfo.orig_filename:
        raise ZipStructuralError(
            f'File name in directory {zinfo.orig_filename!r} and header {fname_str!r} differ.')

    if fheader[_FH_COMPRESSION_METHOD] != zinfo.compress_type:
        raise ZipStructuralError(
            f"Compression method mismatch for {zinfo.filename}: "
            f"central dir={zinfo.compress_type}, local header={fheader[_FH_COMPRESSION_METHOD]}")

    # With a data descriptor the local header does not hold the final sizes
    # and CRC, so there is nothing further to compare.
    if zinfo.flag_bits & _MASK_USE_DATA_DESCRIPTOR:
        return

    # ZIP64 entries store this placeholder in the 32-bit size fields and keep
    # the real values in the extra field; a placeholder is not a mismatch.
    zip64_placeholder = 0xFFFFFFFF

    local_compressed = fheader[_FH_COMPRESSED_SIZE]
    if (local_compressed != zip64_placeholder and
            local_compressed != zinfo.compress_size):
        raise ZipStructuralError(
            f"Compressed size mismatch for {zinfo.filename}: "
            f"central dir={zinfo.compress_size}, local header={fheader[_FH_COMPRESSED_SIZE]}")

    local_uncompressed = fheader[_FH_UNCOMPRESSED_SIZE]
    if (local_uncompressed != zip64_placeholder and
            local_uncompressed != zinfo.file_size):
        raise ZipStructuralError(
            f"Uncompressed size mismatch for {zinfo.filename}: "
            f"central dir={zinfo.file_size}, local header={fheader[_FH_UNCOMPRESSED_SIZE]}")

    if fheader[_FH_CRC] != zinfo.CRC:
        raise ZipStructuralError(
            f"CRC mismatch for {zinfo.filename}: "
            f"central dir={zinfo.CRC}, local header={fheader[_FH_CRC]}")


def _enable_strict_crc_validation(zext_file, strict_level):
    """Restart the running CRC of *zext_file* when STRICT validation is on.

    zext_file: The file object returned for an archive member.
    strict_level: A ZipValidationLevel; nothing happens below STRICT.
    """
    if strict_level < ZipValidationLevel.STRICT:
        return

    # NOTE(review): this only resets the running checksum when an expected
    # CRC is already present; it never sets an expected CRC itself.  Confirm
    # that this is enough to force verification in every read path.
    if getattr(zext_file, '_expected_crc', None) is not None:
        zext_file._running_crc = crc32(b'')
+ strict_validation: Controls ZIP file structure validation level. + ZipValidationLevel.BASIC (default): Basic magic number checks only + ZipValidationLevel.STRUCTURAL: Comprehensive structure validation + ZipValidationLevel.STRICT: Includes CRC verification and deep checks + + Note: Higher validation levels provide better security but may impact + performance and compatibility with some malformed but readable ZIP files. """ @@ -1399,7 +1604,8 @@ class ZipFile: _windows_illegal_name_trans_table = None def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True, - compresslevel=None, *, strict_timestamps=True, metadata_encoding=None): + compresslevel=None, *, strict_timestamps=True, metadata_encoding=None, + strict_validation=ZipValidationLevel.BASIC): """Open the ZIP file with mode read 'r', write 'w', exclusive create 'x', or append 'a'.""" if mode not in ('r', 'w', 'x', 'a'): @@ -1419,6 +1625,7 @@ def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True, self._comment = b'' self._strict_timestamps = strict_timestamps self.metadata_encoding = metadata_encoding + self._strict_validation = ZipValidationLevel(strict_validation) # Check that we don't try to write with nonconforming codecs if self.metadata_encoding and mode != 'r': @@ -1599,6 +1806,23 @@ def _RealGetContents(self): zinfo._end_offset = end_offset end_offset = zinfo.header_offset + # Perform validation if requested + if self._strict_validation >= ZipValidationLevel.STRUCTURAL: + try: + # Use the original file pointer to get actual file size + original_pos = self.fp.tell() + self.fp.seek(0, 2) # Go to end of file + filesize = self.fp.tell() + self.fp.seek(original_pos) # Restore position + _validate_eocd_consistency(endrec, filesize, concat, self._strict_validation) + _validate_central_directory_integrity(endrec, self.filelist, self._strict_validation) + _validate_archive_structure(self.filelist, endrec, self._strict_validation) + except Exception as e: + if isinstance(e, 
(ZipStructuralError, ZipValidationError)): + raise + # Convert other validation errors to structural errors + raise ZipStructuralError(f"Validation failed: {e}") from e + @property def data_offset(self): """The offset to the start of zip data in the file or None if @@ -1761,6 +1985,10 @@ def open(self, name, mode="r", pwd=None, *, force_zip64=False): 'File name in directory %r and header %r differ.' % (zinfo.orig_filename, fname)) + # Perform additional local header validation if requested + if self._strict_validation >= ZipValidationLevel.STRUCTURAL: + _validate_local_header_consistency(fheader, fname, zinfo, self._strict_validation) + if (zinfo._end_offset is not None and zef_file.tell() + zinfo.compress_size > zinfo._end_offset): if zinfo._end_offset == zinfo.header_offset: @@ -1787,7 +2015,9 @@ def open(self, name, mode="r", pwd=None, *, force_zip64=False): else: pwd = None - return ZipExtFile(zef_file, mode + 'b', zinfo, pwd, True) + zip_ext_file = ZipExtFile(zef_file, mode + 'b', zinfo, pwd, True) + _enable_strict_crc_validation(zip_ext_file, self._strict_validation) + return zip_ext_file except: zef_file.close() raise