diff --git a/Lib/test/test_zipfile/test_core.py b/Lib/test/test_zipfile/test_core.py index 7c8a82d821a020..ad50bb38158f89 100644 --- a/Lib/test/test_zipfile/test_core.py +++ b/Lib/test/test_zipfile/test_core.py @@ -14,6 +14,7 @@ import unittest import unittest.mock as mock import zipfile +import filecmp from tempfile import TemporaryFile @@ -610,14 +611,15 @@ def test_io_on_closed_zipextfile(self): self.assertRaises(ValueError, fid.seek, 0) self.assertRaises(ValueError, fid.tell) - def test_write_to_readonly(self): - """Check that trying to call write() on a readonly ZipFile object + def test_writing_to_readonly(self): + """Check that trying to write to a readonly ZipFile object raises a ValueError.""" with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: zipfp.writestr("somefile.txt", "bogus") with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: self.assertRaises(ValueError, zipfp.write, TESTFN) + self.assertRaises(ValueError, zipfp.writestr, TESTFN, "data") with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: with self.assertRaises(ValueError): @@ -2057,6 +2059,7 @@ def test_closed_zip_raises_ValueError(self): # and report that the first file in the archive was corrupt. 
self.assertRaises(ValueError, zipf.read, "foo.txt") self.assertRaises(ValueError, zipf.open, "foo.txt") + self.assertRaises(ValueError, zipf.open, "foo.txt", "w") self.assertRaises(ValueError, zipf.testzip) self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus") with open(TESTFN, 'w', encoding='utf-8') as f: @@ -2307,6 +2310,16 @@ def test_zipfile_with_short_extra_field(self): # testzip returns the name of the first corrupt file, or None self.assertIsNone(zipf.testzip()) + def test_open_for_write_issues_exception_when_pwd_provided(self): + with zipfile.ZipFile(TESTFN2, 'w') as zipf: + with self.assertRaises(ValueError): + zipf.open("foo.txt", mode='w', pwd="password") + + def test_open_for_write_issues_exception_when_force_zip_not_allowed(self): + with zipfile.ZipFile(TESTFN2, 'w', allowZip64=False) as zipf: + with self.assertRaises(ValueError): + zipf.open("foo.txt", mode='w', force_zip64=True) + def test_open_conflicting_handles(self): # It's only possible to open one writable file handle at a time msg1 = b"It's fun to charter an accountant!" @@ -2320,6 +2333,8 @@ def test_open_conflicting_handles(self): zipf.open('handle', mode='w') with self.assertRaises(ValueError): zipf.open('foo', mode='r') + with self.assertRaises(ValueError): + zipf.read('foo') with self.assertRaises(ValueError): zipf.writestr('str', 'abcde') with self.assertRaises(ValueError): @@ -2670,12 +2685,11 @@ class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): b'\x00>\x00\x00\x00\x00\x00') -class DecryptionTests(unittest.TestCase): - """Check that ZIP decryption works. 
Since the library does not - support encryption at the moment, we use a pre-generated encrypted - ZIP file.""" +class EncryptedFiles: + """ Since the library does not support encryption at the moment, + we use pre-generated encrypted ZIP files.""" - data = ( + encrypted_zip1_data = ( b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' @@ -2683,7 +2697,10 @@ class DecryptionTests(unittest.TestCase): b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' b'\x00\x00L\x00\x00\x00\x00\x00' ) - data2 = ( + + zip1_filename = "test.txt" + + encrypted_zip2_data = ( b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' b'\x00\xe8\x03\xe8\x03\xc7 ZIP64_LIMIT: @@ -1322,7 +1345,6 @@ def close(self): self._zipfile._writing = False - class ZipFile: """ Class with methods to open, read, write, close, list zip files. @@ -1663,21 +1685,43 @@ def open(self, name, mode="r", pwd=None, *, force_zip64=False): raise ValueError( "Attempt to use ZIP archive that was already closed") - # Make sure we have an info object + if mode == 'w': + return self._open_to_write(name, force_zip64=force_zip64) + if mode == "r": + return self._open_to_read(name, pwd) + + def _raise_if_cannot_write(self, force_zip64): + if not self.fp: + raise ValueError( + "Attempt to use ZIP archive that was already closed") + if force_zip64 and not self._allowZip64: + raise ValueError( + "force_zip64 is True, but allowZip64 was False when opening " + "the ZIP file." + ) + if self._writing: + raise ValueError("Can't write to the ZIP file while there is " + "another write handle open on it. 
" + "Close the first handle before opening another.") + + def _setup_for_writing(self, zinfo, is_zip64): + if self._seekable: + self.fp.seek(self.start_dir) + zinfo.header_offset = self.fp.tell() + + self._writecheck(zinfo) + self._didModify = True + + self.fp.write(zinfo.FileHeader(is_zip64)) + + self._writing = True + + def _open_to_read(self, name, pwd=None, *, decompress=True): if isinstance(name, ZipInfo): - # 'name' is already an info object zinfo = name - elif mode == 'w': - zinfo = ZipInfo(name) - zinfo.compress_type = self.compression - zinfo.compress_level = self.compresslevel else: - # Get info object for name zinfo = self.getinfo(name) - if mode == 'w': - return self._open_to_write(zinfo, force_zip64=force_zip64) - if self._writing: raise ValueError("Can't read from the ZIP file while there " "is an open writing handle on it. " @@ -1734,7 +1778,7 @@ def open(self, name, mode="r", pwd=None, *, force_zip64=False): # check for encrypted flag & handle password is_encrypted = zinfo.flag_bits & _MASK_ENCRYPTED - if is_encrypted: + if is_encrypted and decompress: if not pwd: pwd = self.pwd if pwd and not isinstance(pwd, bytes): @@ -1745,21 +1789,30 @@ def open(self, name, mode="r", pwd=None, *, force_zip64=False): else: pwd = None - return ZipExtFile(zef_file, mode + 'b', zinfo, pwd, True) + return ZipExtFile(zef_file, 'rb', zinfo, pwd, True, decompress) except: zef_file.close() raise - def _open_to_write(self, zinfo, force_zip64=False): - if force_zip64 and not self._allowZip64: - raise ValueError( - "force_zip64 is True, but allowZip64 was False when opening " - "the ZIP file." - ) - if self._writing: - raise ValueError("Can't write to the ZIP file while there is " - "another write handle open on it. 
" - "Close the first handle before opening another.") + def _open_to_write_precompressed(self, zinfo, force_zip64=False): + self._raise_if_cannot_write(force_zip64) + + zip64 = force_zip64 or (zinfo.compress_size > ZIP64_LIMIT) + if not self._allowZip64 and zip64: + raise LargeZipFile("Filesize would require ZIP64 extensions") + + self._setup_for_writing(zinfo, zip64) + return _ZipWriteFile(self, zinfo, zip64, precompressed=True) + + def _open_to_write(self, name, force_zip64=False): + self._raise_if_cannot_write(force_zip64) + + if isinstance(name, ZipInfo): + zinfo = name + else: + zinfo = ZipInfo(name) + zinfo.compress_type = self.compression + zinfo._compresslevel = self.compresslevel # Size and CRC are overwritten with correct data after processing the file zinfo.compress_size = 0 @@ -1780,16 +1833,7 @@ def _open_to_write(self, zinfo, force_zip64=False): if not self._allowZip64 and zip64: raise LargeZipFile("Filesize would require ZIP64 extensions") - if self._seekable: - self.fp.seek(self.start_dir) - zinfo.header_offset = self.fp.tell() - - self._writecheck(zinfo) - self._didModify = True - - self.fp.write(zinfo.FileHeader(zip64)) - - self._writing = True + self._setup_for_writing(zinfo, zip64) return _ZipWriteFile(self, zinfo, zip64) def extract(self, member, path=None, pwd=None): @@ -1911,6 +1955,25 @@ def _writecheck(self, zinfo): raise LargeZipFile(requires_zip64 + " would require ZIP64 extensions") + def _raise_if_archive_not_available_for_writing(self): + if not self.fp: + raise ValueError( + "Attempt to write to ZIP archive that was already closed") + if self._writing: + raise ValueError( + "Can't write to ZIP archive while an open writing handle exists" + ) + + def _write_precompressed(self, zinfo, file_contents): + self._raise_if_archive_not_available_for_writing() + + if zinfo.is_dir(): + self.mkdir(zinfo) + return + + with self._open_to_write_precompressed(zinfo) as destination: + shutil.copyfileobj(file_contents, destination, 1024*8) + def 
write(self, filename, arcname=None, compress_type=None, compresslevel=None): """Put the bytes from filename into the archive under the name @@ -2012,6 +2075,37 @@ def mkdir(self, zinfo_or_directory_name, mode=511): self.fp.write(zinfo.FileHeader(False)) self.start_dir = self.fp.tell() + def _copy_file(self, source_zipfile, source_zinfo): + with source_zipfile._open_to_read(source_zinfo, 'r', + decompress=False) as source_file_contents: + self._write_precompressed(source_zinfo, source_file_contents) + + def copy_files(self, source_zipfile, files=None): + """ + copy files from the source_zipfile to the ZIP archive. This method + copies the files as is, avoiding the overhead of decompressing and + compressing the data again. + + source_zipfile is a string of the path of the ZIP archive to copy from. + + files is an iterable of strings, where each string is a file name found + within the source_zipfile. If files=None, all files from source_zipfile + are copied. + """ + if self.mode not in ('w', 'x', 'a'): + raise ValueError( + "copy_files() requires mode 'w', 'x', or 'a', but " + f"mode is '{self.mode}'") + + with ZipFile(source_zipfile, 'r') as source: + if files is None: + zinfos = source.infolist() + else: + zinfos = [source.getinfo(file) for file in files] + + for zinfo in zinfos: + self._copy_file(source, zinfo) + def __del__(self): """Call the "close()" method in case the user forgot.""" self.close() diff --git a/Misc/NEWS.d/next/Library/2024-10-21-00-08-11.gh-issue-113924.IuVDzq.rst b/Misc/NEWS.d/next/Library/2024-10-21-00-08-11.gh-issue-113924.IuVDzq.rst new file mode 100644 index 00000000000000..0b657c30c53e75 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-10-21-00-08-11.gh-issue-113924.IuVDzq.rst @@ -0,0 +1 @@ +Added :meth:`zipfile.ZipFile.copy_files` to allow copying files from another ZIP archive without decompressing and recompressing them.