Skip to content
71 changes: 71 additions & 0 deletions Lib/test/test_zipfile/test_core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import _pyio
import array
import contextlib
import importlib.util
Expand Down Expand Up @@ -3448,5 +3449,75 @@ def test_too_short(self):
b"zzz", zipfile._Extra.strip(b"zzz", (self.ZIP64_EXTRA,)))


class StatIO(_pyio.BytesIO):
"""Buffer which remembers the number of bytes that were read."""

def __init__(self):
super().__init__()
self.bytes_read = 0

def read(self, size=-1):
bs = super().read(size)
self.bytes_read += len(bs)
return bs


class StoredZipExtFileRandomReadTest(unittest.TestCase):
def test_random_read(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a docstring describing what this function is testing. Give an overview of what the problem is and what is being guaranteed by the test. Also, link to the reported issue, where the problem should be described in exquisite detail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this function seems too broad. It's not simply testing a random read, but it's also testing the case where it's following an optimized path for an uncompressed file. Consider test_stored_seek.


sio = StatIO()
# 20000 bytes
txt = b'0123456789' * 2000

# The seek length must be greater than ZipExtFile.MIN_READ_SIZE
# as `ZipExtFile._read2()` reads in blocks of this size and we
# need to seek out of the buffered data
min_size = zipfile.ZipExtFile.MIN_READ_SIZE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call this variable "seek_length", since that's what's described in the preceding comment.

self.assertGreaterEqual(10002, min_size) # for forward seek test
self.assertGreaterEqual(5003, min_size) # for backward seek test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's set variables for these constants.

Suggested change
self.assertGreaterEqual(10002, min_size) # for forward seek test
self.assertGreaterEqual(5003, min_size) # for backward seek test
forward_seek = 10002
backward_seek = 5003
self.assertGreaterEqual(forward_seek, min_size)
self.assertGreaterEqual(backward_seek, min_size)

# The read length must be less than MIN_READ_SIZE, since we assume that
# only 1 block is read in the test.
self.assertGreaterEqual(min_size, 100) # for read() calls
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider setting a read_length variable to describe the 100. Then, in this assertion, use the same language. e.g.

Suggested change
# The read length must be less than MIN_READ_SIZE, since we assume that
# only 1 block is read in the test.
self.assertGreaterEqual(min_size, 100) # for read() calls
# Set read length less than MIN_READ_SIZE to ensure only 1 block is read.
read_length = 100
self.assertLessThan(read_length, min_size)


with zipfile.ZipFile(sio, "w", compression=zipfile.ZIP_STORED) as zipf:
zipf.writestr("foo.txt", txt)

# check random seek and read on a file
with zipfile.ZipFile(sio, "r") as zipf:
with zipf.open("foo.txt", "r") as fp:
# Test this optimized read hasn't rewound and read from the
# start of the file (as in the case of the unoptimized path)

# forward seek
old_count = sio.bytes_read
fp.seek(10002, os.SEEK_CUR)
self.assertEqual(fp.tell(), 10002)
self.assertEqual(fp._left, fp._compress_left)
arr = fp.read(100)
self.assertEqual(fp.tell(), 10102)
self.assertEqual(arr, txt[10002:10102])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-use the variables defined above.

Suggested change
arr = fp.read(100)
self.assertEqual(fp.tell(), 10102)
self.assertEqual(arr, txt[10002:10102])
arr = fp.read(read_length)
self.assertEqual(fp.tell(), forward_seek + read_length)
self.assertEqual(arr, txt[forward_seek:forward_seek + read_length])

self.assertEqual(fp._left, fp._compress_left)
d = sio.bytes_read - old_count
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a better name for d that indicates its meaning, or just inline it if it's only needed once and its meaning is inconsequential.

self.assertLessEqual(d, min_size)

# backward seek
old_count = sio.bytes_read
fp.seek(-5003, os.SEEK_CUR)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fp.seek(-5003, os.SEEK_CUR)
fp.seek(-backward_seek, os.SEEK_CUR)

self.assertEqual(fp.tell(), 5099) # 5099 = 10102 - 5003
self.assertEqual(fp._left, fp._compress_left)
arr = fp.read(100)
self.assertEqual(fp.tell(), 5199)
self.assertEqual(arr, txt[5099:5199])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.assertEqual(fp.tell(), 5099) # 5099 = 10102 - 5003
self.assertEqual(fp._left, fp._compress_left)
arr = fp.read(100)
self.assertEqual(fp.tell(), 5199)
self.assertEqual(arr, txt[5099:5199])
self.assertEqual(fp.tell(), forward_seek - backward_seek + read_length)
self.assertEqual(fp._left, fp._compress_left)
arr = fp.read(read_length)
backward_pos = forward_seek - backward_seek + read_length
self.assertEqual(fp.tell(), backward_pos + read_length)
self.assertEqual(arr, txt[backward_pos:backward_pos + read_length])

self.assertEqual(fp._left, fp._compress_left)
d = sio.bytes_read - old_count
self.assertLessEqual(d, min_size)

# eof flags test
fp.seek(0, os.SEEK_END)
self.assertTrue(fp._eof)
fp.seek(12345, os.SEEK_SET)
self.assertFalse(fp._eof)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it should be a separate test (or two). In fact, I'm not even sure I understand why this private flag is even relevant to the issue at hand. If it's not a separate test with a separate justification, can you explain why it's related to the issue at hand? If these are private attributes, what is the public effect that's being validated?

Copy link
Contributor Author

@5ec1cff 5ec1cff Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is to ensure that the eof flag is correctly updated after seeking to the end of the file and then seeking back.



if __name__ == "__main__":
unittest.main()
4 changes: 3 additions & 1 deletion Lib/zipfile/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,13 +1162,15 @@ def seek(self, offset, whence=os.SEEK_SET):
self._offset = buff_offset
read_offset = 0
# Fast seek uncompressed unencrypted file
elif self._compress_type == ZIP_STORED and self._decrypter is None and read_offset > 0:
elif self._compress_type == ZIP_STORED and self._decrypter is None:
# disable CRC checking after first seeking - it would be invalid
self._expected_crc = None
# seek actual file taking already buffered data into account
read_offset -= len(self._readbuffer) - self._offset
self._fileobj.seek(read_offset, os.SEEK_CUR)
self._left -= read_offset
self._compress_left -= read_offset
self._eof = self._left <= 0
read_offset = 0
# flush read buffer
self._readbuffer = b''
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Completely support random access of uncompressed unencrypted read-only
zip files obtained by :meth:`ZipFile.open <zipfile.ZipFile.open>`.
Loading