diff --git a/CHANGELOG.md b/CHANGELOG.md index 4557f27..9fc7aea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,12 @@ All notable changes to this project will be documented in this file. ## Unreleased - Drop support for Python 3.9 and below +- Build free-threaded wheels for CPython 3.13 and 3.14, even if the feature is not supported +- Refactor SeekableZstdFile to make it independent of ZstdFile ## 0.18.0 (October 5, 2025) - Support for Python 3.14 -- Build free-threaded wheels for CPython 3.13 and 3.14, even if the feature is not supported - Deprecate the `read_size` and `write_size` parameters of `ZstdFile` and `SeekableZstdFile` - Deprecate `richmem_compress` and `RichMemZstdCompressor` - Rework documentation to suggest using `compression.zstd` from Python stdlib, and provide a migration guide diff --git a/src/_seekable_zstdfile.py b/src/_seekable_zstdfile.py index 41dece6..7998c96 100644 --- a/src/_seekable_zstdfile.py +++ b/src/_seekable_zstdfile.py @@ -1,15 +1,22 @@ from array import array from bisect import bisect_right +import io +from os import PathLike from os.path import isfile from struct import Struct -from warnings import warn +import warnings -from pyzstd._zstdfile import _ZstdDecompressReader, ZstdFile, \ - _MODE_CLOSED, _MODE_READ, _MODE_WRITE, \ - PathLike, io, _DEPRECATED_PLACEHOLDER +from pyzstd import _ZSTD_DStreamSizes, ZstdCompressor, ZstdDecompressor +from pyzstd._zstdfile import _DEPRECATED_PLACEHOLDER __all__ = ('SeekableFormatError', 'SeekableZstdFile') +_ZSTD_DStreamOutSize = _ZSTD_DStreamSizes[1] + +_MODE_CLOSED = 0 +_MODE_READ = 1 +_MODE_WRITE = 2 + class SeekableFormatError(Exception): 'An error related to Zstandard Seekable Format.' def __init__(self, msg): @@ -246,12 +253,13 @@ def write_seek_table(self, fp): # Exceeded format limit if self._frames_count > 0xFFFFFFFF: # Emit a warning - warn(('SeekableZstdFile\'s seek table has %d entries, ' - 'which exceeds the maximal value allowed by ' - 'Zstandard Seekable Format (0xFFFFFFFF). The ' - 'entries will be merged into 0xFFFFFFFF entries, ' - 'this may reduce seeking performance.') % self._frames_count, - RuntimeWarning, 3) + warnings.warn( + ('SeekableZstdFile\'s seek table has %d entries, ' + 'which exceeds the maximal value allowed by ' + 'Zstandard Seekable Format (0xFFFFFFFF). The ' + 'entries will be merged into 0xFFFFFFFF entries, ' + 'this may reduce seeking performance.') % self._frames_count, + RuntimeWarning, 3) # Merge frames self._merge_frames(0xFFFFFFFF) @@ -293,7 +301,12 @@ def get_info(self): self._full_c_size, self._full_d_size) -class _SeekableDecompressReader(_ZstdDecompressReader): + +class _EOFSuccess(EOFError): + pass + + +class _SeekableDecompressReader(io.RawIOBase): def __init__(self, fp, zstd_dict, option, read_size): # Check fp readable/seekable if not hasattr(fp, 'readable') or not hasattr(fp, "seekable"): @@ -310,22 +323,69 @@ def __init__(self, fp, zstd_dict, option, read_size): "be seekable. If the file object is not seekable, it can be " "read sequentially using ZstdFile class.")) + self._fp = fp + self._zstd_dict = zstd_dict + self._option = option + self._read_size = read_size + # Load seek table self._seek_table = _SeekTable(read_mode=True) self._seek_table.load_seek_table(fp, seek_to_0=True) + self._size = self._seek_table.get_full_d_size() + + self._pos = 0 + self._decompressor = ZstdDecompressor(self._zstd_dict, self._option) - # Initialize super() - super().__init__(fp, zstd_dict, option, read_size) - self._decomp.size = self._seek_table.get_full_d_size() + def close(self): + self._decompressor = None + return super().close() + + def readable(self): + return True - # super().seekable() returns self._fp.seekable(). - # Seekable has been checked in .__init__() method. - # BufferedReader.seek() checks this in each invoke, if self._fp.seekable() - # becomes False at runtime, .seek() method just raise OSError instead of - # io.UnsupportedOperation. def seekable(self): return True + def tell(self): + return self._pos + + def _decompress(self, size): + """ + Decompress up to size bytes. + May return b"", in which case try again. + Raises _EOFSuccess if EOF is reached at frame edge. + Raises EOFError if EOF is reached elsewhere. + """ + if self._decompressor is None: # frame edge + data = self._fp.read(self._read_size) + if not data: # EOF + raise _EOFSuccess + elif self._decompressor.needs_input: + data = self._fp.read(self._read_size) + if not data: # EOF + raise EOFError("Compressed file ended before the end-of-stream marker was reached") + else: + data = self._decompressor.unused_data + if self._decompressor.eof: # frame edge + self._decompressor = None + if not data: # may not be at EOF + return b"" + if self._decompressor is None: + self._decompressor = ZstdDecompressor(self._zstd_dict, self._option) + out = self._decompressor.decompress(data, size) + self._pos += len(out) + return out + + def readinto(self, b): + with memoryview(b) as view, view.cast('B') as byte_view: + try: + while True: + if out := self._decompress(byte_view.nbytes): + byte_view[:len(out)] = out + return len(out) + except _EOFSuccess: + return 0 + # If the new position is within BufferedReader's buffer, # this method may not be called. def seek(self, offset, whence=0): @@ -333,9 +393,9 @@ def seek(self, offset, whence=0): if whence == 0: # SEEK_SET pass elif whence == 1: # SEEK_CUR - offset = self._decomp.pos + offset + offset = self._pos + offset elif whence == 2: # SEEK_END - offset = self._decomp.size + offset + offset = self._size + offset else: raise ValueError("Invalid value for whence: {}".format(whence)) @@ -343,13 +403,13 @@ def seek(self, offset, whence=0): new_frame = self._seek_table.index_by_dpos(offset) # offset >= EOF if new_frame is None: - self._decomp.eof = True - self._decomp.pos = self._decomp.size + self._pos = self._size + self._decompressor = None self._fp.seek(self._seek_table.file_size) - return self._decomp.pos + return self._pos # Prepare to jump - old_frame = self._seek_table.index_by_dpos(self._decomp.pos) + old_frame = self._seek_table.index_by_dpos(self._pos) c_pos, d_pos = self._seek_table.get_frame_sizes(new_frame) # If at P1, seeking to P2 will unnecessarily read the skippable @@ -358,22 +418,21 @@ def seek(self, offset, whence=0): # cpos: ^P1 # dpos: ^P1 ^P2 if new_frame == old_frame and \ - offset >= self._decomp.pos and \ + offset >= self._pos and \ self._fp.tell() >= c_pos: pass else: # Jump - self._decomp.eof = False - self._decomp.pos = d_pos - self._decomp.reset_session() + self._pos = d_pos + self._decompressor = None self._fp.seek(c_pos) # offset is bytes number to skip forward - offset -= self._decomp.pos - # If offset <= 0, .forward() method does nothing. - self._decomp.forward(offset) + offset -= self._pos + while offset > 0: + offset -= len(self._decompress(offset)) - return self._decomp.pos + return self._pos def get_seek_table_info(self): return self._seek_table.get_info() @@ -382,7 +441,7 @@ def get_seek_table_info(self): # of underlying file object carefully. Need to check seekable in # each situation. For example, there may be a CD-R file system that # is seekable when reading, but not seekable when appending. -class SeekableZstdFile(ZstdFile): +class SeekableZstdFile(io.BufferedIOBase): """This class can only create/write/read Zstandard Seekable Format file, or read 0-size file. It provides relatively fast seeking ability in read mode. @@ -393,7 +452,8 @@ class SeekableZstdFile(ZstdFile): # Zstd seekable format's example code also use 1GiB as max content size. FRAME_MAX_D_SIZE = 1*1024*1024*1024 - _READER_CLASS = _SeekableDecompressReader + FLUSH_BLOCK = ZstdCompressor.FLUSH_BLOCK + FLUSH_FRAME = ZstdCompressor.FLUSH_FRAME def __init__(self, filename, mode="r", *, level_or_option=None, zstd_dict=None, @@ -428,19 +488,52 @@ def __init__(self, filename, mode="r", *, it will reduce seeking speed, but increase compression ratio. You can also manually generate a frame using f.flush(f.FLUSH_FRAME). """ - # For self.close() - self._write_in_close = False - # For super().close() + if read_size == _DEPRECATED_PLACEHOLDER: + read_size = 131075 + else: + warnings.warn("pyzstd.SeekableZstdFile()'s read_size parameter is deprecated", DeprecationWarning, stacklevel=2) + if write_size == _DEPRECATED_PLACEHOLDER: + write_size = 131591 + else: + warnings.warn("pyzstd.SeekableZstdFile()'s write_size parameter is deprecated", DeprecationWarning, stacklevel=2) + self._fp = None - self._closefp = False + self._close_fp = False self._mode = _MODE_CLOSED - - if mode in ("r", "rb"): + self._buffer = None + + if not isinstance(mode, str): + raise ValueError('mode must be a str') + mode = mode.removesuffix('b') # handle rb, wb, xb, ab + + # Read or write mode + if mode == "r": + if not isinstance(level_or_option, (type(None), dict)): + raise TypeError( + ("In read mode (decompression), level_or_option argument " + "should be a dict object, that represents decompression " + "option. It doesn't support int type compression level " + "in this case.")) + if read_size <= 0: + raise ValueError("read_size argument should > 0") + if write_size != 131591: + raise ValueError( + "write_size argument is only valid in write modes.") # Specified max_frame_content_size argument if max_frame_content_size != 1024*1024*1024: raise ValueError(('max_frame_content_size argument is only ' 'valid in write modes (compression).')) - elif mode in ("w", "wb", "a", "ab", "x", "xb"): + mode_code = _MODE_READ + + elif mode in {"w", "a", "x"}: + if not isinstance(level_or_option, (type(None), int, dict)): + raise TypeError(("level_or_option argument " + "should be int or dict object.")) + if read_size != 131075: + raise ValueError( + "read_size argument is only valid in read mode.") + if write_size <= 0: + raise ValueError("write_size argument should > 0") if not (0 < max_frame_content_size <= self.FRAME_MAX_D_SIZE): raise ValueError( ('max_frame_content_size argument should be ' @@ -452,8 +545,13 @@ def __init__(self, filename, mode="r", *, self._reset_frame_sizes() self._seek_table = _SeekTable(read_mode=False) + mode_code = _MODE_WRITE + self._compressor = ZstdCompressor(level_or_option=level_or_option, + zstd_dict=zstd_dict) + self._pos = 0 + # Load seek table in append mode - if mode in ("a", "ab"): + if mode == "a": if not isinstance(filename, (str, bytes, PathLike)): raise TypeError( ("In append mode ('a', 'ab'), " @@ -471,14 +569,30 @@ def __init__(self, filename, mode="r", *, "object should be seekable.")) self._seek_table.load_seek_table(f, seek_to_0=False) - super().__init__(filename, mode, - level_or_option=level_or_option, - zstd_dict=zstd_dict, - read_size=read_size, - write_size=write_size) + else: + raise ValueError("Invalid mode: {!r}".format(mode)) + + # File object + if isinstance(filename, (str, bytes, PathLike)): + self._fp = io.open(filename, mode + "b") + self._close_fp = True + elif hasattr(filename, "read") or hasattr(filename, "write"): + self._fp = filename + else: + raise TypeError(("filename must be a str, bytes, " + "file or PathLike object")) + + self._mode = mode_code - # Overwrite seek table in append mode - if mode in ("a", "ab"): + if self._mode == _MODE_READ: + raw = _SeekableDecompressReader( + self._fp, + zstd_dict=zstd_dict, + option=level_or_option, + read_size=read_size) + self._buffer = io.BufferedReader(raw, _ZSTD_DStreamOutSize) + + elif mode == "a": if self._fp.seekable(): self._fp.seek(self._seek_table.get_full_c_size()) # Necessary if the current table has many (0, 0) entries @@ -488,43 +602,62 @@ def __init__(self, filename, mode="r", *, self._seek_table.append_entry( self._seek_table.seek_frame_size, 0) # Emit a warning - warn(("SeekableZstdFile is opened in append mode " - "('a', 'ab'), but the underlying file object " - "is not seekable. Therefore the seek table (a " - "zstd skippable frame) at the end of the file " - "can't be overwritten. Each time open such file " - "in append mode, it will waste some storage " - "space. %d bytes were wasted this time.") % \ - self._seek_table.seek_frame_size, + warnings.warn(("SeekableZstdFile is opened in append mode " + "('a', 'ab'), but the underlying file object " + "is not seekable. Therefore the seek table (a " + "zstd skippable frame) at the end of the file " + "can't be overwritten. Each time open such file " + "in append mode, it will waste some storage " + "space. %d bytes were wasted this time.") % \ + self._seek_table.seek_frame_size, RuntimeWarning, 2) - # Initialized successfully - self._write_in_close = (self._mode == _MODE_WRITE) - def _reset_frame_sizes(self): self._current_c_size = 0 self._current_d_size = 0 self._left_d_size = self._max_frame_content_size + def _check_not_closed(self): + if self.closed: + raise ValueError("I/O operation on closed file") + + def _check_can_read(self): + if not self.readable(): + raise io.UnsupportedOperation("File not open for reading") + + def _check_can_write(self): + if not self.writable(): + raise io.UnsupportedOperation("File not open for writing") + def close(self): """Flush and close the file. May be called more than once without error. Once the file is closed, any other operation on it will raise a ValueError. """ + if self._mode == _MODE_CLOSED: + return + + if self._fp is None: + return try: - if self._write_in_close: - try: - self.flush(self.FLUSH_FRAME) - self._seek_table.write_seek_table(self._fp) - finally: - # For multiple calls to .close() - self._write_in_close = False + if self._mode == _MODE_READ: + if getattr(self, '_buffer', None): + self._buffer.close() + self._buffer = None + elif self._mode == _MODE_WRITE: + self.flush(self.FLUSH_FRAME) + self._seek_table.write_seek_table(self._fp) + self._compressor = None finally: - # Clear write mode's seek table. - # Put here for failures in/after super().__init__(). + self._mode = _MODE_CLOSED self._seek_table = None - super().close() + try: + if self._close_fp: + self._fp.close() + finally: + self._fp = None + self._close_fp = False def write(self, data): """Write a bytes-like object to the file. @@ -534,9 +667,7 @@ def write(self, data): the file on disk may not reflect the data written until .flush() or .close() is called. """ - if self._mode != _MODE_WRITE: - self._check_mode(_MODE_WRITE) - + self._check_can_write() # Accept any data that supports the buffer protocol. # And memoryview's subview is faster than slice. with memoryview(data) as view, view.cast('B') as byte_view: @@ -547,18 +678,10 @@ def write(self, data): # Write size write_size = min(nbytes, self._left_d_size) - # Use inserted super().write() method, to prevent - # self._fp.tell() from reporting incorrect position. - # ------------------------- - # super().write() begin - # ------------------------- # Compress & write - _, output_size = self._writer.write( - byte_view[pos:pos+write_size]) + compressed = self._compressor.compress(byte_view[pos:pos+write_size]) + output_size = self._fp.write(compressed) self._pos += write_size - # ----------------------- - # super().write() end - # ----------------------- pos += write_size nbytes -= write_size @@ -575,7 +698,7 @@ def write(self, data): return pos - def flush(self, mode=ZstdFile.FLUSH_BLOCK): + def flush(self, mode=ZstdCompressor.FLUSH_BLOCK): """Flush remaining data to the underlying stream. The mode argument can be ZstdFile.FLUSH_BLOCK, ZstdFile.FLUSH_FRAME. @@ -587,29 +710,26 @@ def flush(self, mode=ZstdFile.FLUSH_BLOCK): This method does nothing in reading mode. """ - if self._mode != _MODE_WRITE: - # Like IOBase.flush(), do nothing in reading mode. - # TextIOWrapper.close() relies on this behavior. - if self._mode == _MODE_READ: - return - # Closed, raise ValueError. - self._check_mode() - - # Use inserted super().flush() method, to prevent - # self._fp.tell() from reporting incorrect position. - # ------------------------- - # super().flush() begin - # ------------------------- - # Flush zstd block/frame, and write. - _, output_size = self._writer.flush(mode) - # ----------------------- - # super().flush() end - # ----------------------- - - # Cumulate - self._current_c_size += output_size - # self._current_d_size += 0 - # self._left_d_size -= 0 + if self._mode == _MODE_READ: + return + + self._check_not_closed() + if mode not in {self.FLUSH_BLOCK, self.FLUSH_FRAME}: + raise ValueError('Invalid mode argument, expected either ' + 'ZstdFile.FLUSH_FRAME or ' + 'ZstdFile.FLUSH_BLOCK') + + if self._compressor.last_mode != mode: + # Flush zstd block/frame, and write. + compressed = self._compressor.flush(mode) + output_size = self._fp.write(compressed) + if hasattr(self._fp, 'flush'): + self._fp.flush() + + # Cumulate + self._current_c_size += output_size + # self._current_d_size += 0 + # self._left_d_size -= 0 if mode == self.FLUSH_FRAME and \ self._current_c_size != 0: @@ -618,6 +738,131 @@ def flush(self, mode=ZstdFile.FLUSH_BLOCK): self._current_d_size) self._reset_frame_sizes() + def read(self, size=-1): + """Read up to size uncompressed bytes from the file. + + If size is negative or omitted, read until EOF is reached. + Returns b"" if the file is already at EOF. + """ + if size is None: + size = -1 + self._check_can_read() + return self._buffer.read(size) + + def read1(self, size=-1): + """Read up to size uncompressed bytes, while trying to avoid + making multiple reads from the underlying stream. Reads up to a + buffer's worth of data if size is negative. + + Returns b"" if the file is at EOF. + """ + self._check_can_read() + if size < 0: + size = _ZSTD_DStreamOutSize + return self._buffer.read1(size) + + def readinto(self, b): + """Read bytes into b. + + Returns the number of bytes read (0 for EOF). + """ + self._check_can_read() + return self._buffer.readinto(b) + + def readinto1(self, b): + """Read bytes into b, while trying to avoid making multiple reads + from the underlying stream. + + Returns the number of bytes read (0 for EOF). + """ + self._check_can_read() + return self._buffer.readinto1(b) + + def readline(self, size=-1): + """Read a line of uncompressed bytes from the file. + + The terminating newline (if present) is retained. If size is + non-negative, no more than size bytes will be read (in which + case the line may be incomplete). Returns b'' if already at EOF. + """ + self._check_can_read() + return self._buffer.readline(size) + + def seek(self, offset, whence=io.SEEK_SET): + """Change the file position. + + The new position is specified by offset, relative to the + position indicated by whence. Possible values for whence are: + + 0: start of stream (default): offset must not be negative + 1: current stream position + 2: end of stream; offset must not be positive + + Returns the new file position. + + Note that seeking is emulated, so depending on the arguments, + this operation may be extremely slow. + """ + self._check_can_read() + return self._buffer.seek(offset, whence) + + def peek(self, size=-1): + """Return buffered data without advancing the file position. + + Always returns at least one byte of data, unless at EOF. + The exact number of bytes returned is unspecified. + """ + self._check_can_read() + return self._buffer.peek(size) + + def __iter__(self): + self._check_can_read() + return self + + def __next__(self): + self._check_can_read() + if ret := self._buffer.readline(): + return ret + raise StopIteration + + def tell(self): + """Return the current file position.""" + self._check_not_closed() + if self._mode == _MODE_READ: + return self._buffer.tell() + elif self._mode == _MODE_WRITE: + return self._pos + + def fileno(self): + """Return the file descriptor for the underlying file.""" + self._check_not_closed() + return self._fp.fileno() + + @property + def name(self): + """Return the file name for the underlying file.""" + self._check_not_closed() + return self._fp.name + + @property + def closed(self): + """True if this file is closed.""" + return self._mode == _MODE_CLOSED + + def writable(self): + """Return whether the file was opened for writing.""" + self._check_not_closed() + return self._mode == _MODE_WRITE + + def readable(self): + """Return whether the file was opened for reading.""" + self._check_not_closed() + return self._mode == _MODE_READ + + def seekable(self): + """Return whether the file supports seeking.""" + return self.readable() and self._buffer.seekable() + @property def seek_table_info(self): """A tuple: (frames_number, compressed_size, decompressed_size) diff --git a/src/_zstdfile.py b/src/_zstdfile.py index 3e241a3..553ab98 100644 --- a/src/_zstdfile.py +++ b/src/_zstdfile.py @@ -1,11 +1,6 @@ import io import warnings -try: - from os import PathLike -except ImportError: - # For Python 3.5 - class PathLike: - pass +from os import PathLike from pyzstd import ZstdCompressor, _ZstdFileReader, \ _ZstdFileWriter, _ZSTD_DStreamSizes @@ -127,11 +122,15 @@ def __init__(self, filename, mode="r", *, warnings.warn("pyzstd.ZstdFile()'s write_size parameter is deprecated", DeprecationWarning, stacklevel=2) self._fp = None - self._closefp = False + self._close_fp = False self._mode = _MODE_CLOSED + if not isinstance(mode, str): + raise ValueError('mode must be a str') + mode = mode.removesuffix('b') # handle rb, wb, xb, ab + # Read or write mode - if mode in ("r", "rb"): + if mode == "r": if not isinstance(level_or_option, (type(None), dict)): raise TypeError( ("In read mode (decompression), level_or_option argument " @@ -142,7 +141,7 @@ def __init__(self, filename, mode="r", *, raise ValueError( "write_size argument is only valid in write modes.") mode_code = _MODE_READ - elif mode in ("w", "wb", "a", "ab", "x", "xb"): + elif mode in {"w", "a", "x"}: if not isinstance(level_or_option, (type(None), int, dict)): raise TypeError(("level_or_option argument " "should be int or dict object.")) @@ -155,17 +154,15 @@ def __init__(self, filename, mode="r", *, # File object if isinstance(filename, (str, bytes, PathLike)): - if "b" not in mode: - mode += "b" - self._fp = io.open(filename, mode) - self._closefp = True + self._fp = io.open(filename, mode + "b") + self._close_fp = True elif hasattr(filename, "read") or hasattr(filename, "write"): self._fp = filename else: raise TypeError(("filename must be a str, bytes, " "file or PathLike object")) - # Set ._mode here for ._closefp in .close(). If the following code + # Set ._mode here for ._close_fp in .close(). If the following code # fails, IOBase's cleanup code will call .close(), so that ._fp can # be closed. self._mode = mode_code @@ -212,11 +209,11 @@ def close(self): self._writer = None finally: try: - if self._closefp: + if self._close_fp: self._fp.close() finally: self._fp = None - self._closefp = False + self._close_fp = False self._mode = _MODE_CLOSED # None argument means the file should be closed diff --git a/tests/test_seekable.py b/tests/test_seekable.py index c29e309..d3eb070 100644 --- a/tests/test_seekable.py +++ b/tests/test_seekable.py @@ -40,8 +40,8 @@ def _check_deprecated(testcase): testcase.assertIn( str(warn.message), [ - "pyzstd.ZstdFile()'s read_size parameter is deprecated", - "pyzstd.ZstdFile()'s write_size parameter is deprecated", + "pyzstd.SeekableZstdFile()'s read_size parameter is deprecated", + "pyzstd.SeekableZstdFile()'s write_size parameter is deprecated", ] )