Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- `stream_io.open_stream()` now respects Boto3's configuration files
and environment variables when searching for object storage credentials to use

### Fixed

- `stream_io.open_stream()` now uses virtual-hosted-style
bucket addressing for the `cwobject.com` and `cwlota.com` endpoints
- `stream_io.open_stream()` now allows the `use_https` entry of `.s3cfg`
configuration files to fill in its `force_http` parameter if `force_http` is
not explicitly specified as `True` or `False`
- `TensorSerializer` no longer throws an error when attempting to serialize
very large tensors on some non-Linux platforms
- Object storage uploads managed by `stream_io.open_stream()` now finalize
correctly on Python 3.12+ even without an explicit call to their `close()`
method
- A fix for this was originally implemented in release 2.7.2,
but it only worked for Python versions below 3.12

## [2.9.3] - 2025-05-09

### Changed
Expand Down Expand Up @@ -424,6 +446,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `get_gpu_name`
- `no_init_or_tensor`

[Unreleased]: https://github.com/coreweave/tensorizer/compare/v2.9.3...HEAD
[2.9.3]: https://github.com/coreweave/tensorizer/compare/v2.9.2...v2.9.3
[2.9.2]: https://github.com/coreweave/tensorizer/compare/v2.9.1...v2.9.2
[2.9.1]: https://github.com/coreweave/tensorizer/compare/v2.9.0...v2.9.1
Expand Down
30 changes: 24 additions & 6 deletions tensorizer/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import queue
import stat
import struct
import sys
import threading
import time
import types
Expand Down Expand Up @@ -3506,16 +3507,33 @@ def _pwrite(
raise RuntimeError("pwrite was called before being initialized")

@staticmethod
def _mv_suffix(data: "collections.abc.Buffer", start: int):
def _mv_slice(data: "collections.abc.Buffer", s: slice):
if not isinstance(data, memoryview):
data = memoryview(data)
try:
if data.ndim != 1 or data.format != "B":
data = data.cast("B")
return data[start:]
return data[s]
finally:
del data

if sys.platform == "linux":
    # Linux handles arbitrarily large pwrite lengths itself (it simply
    # returns a short count), so the real syscall wrapper is used directly.
    _pwrite_compat = staticmethod(os.pwrite)
else:

    @staticmethod
    def _pwrite_compat(fd: int, buffer, offset: int, /) -> int:
        """``os.pwrite`` wrapper that caps single-call write sizes.

        Some systems error on single I/O calls larger than the maximum
        value of a signed 32-bit integer, so writes are limited to about
        one memory page less than that; callers are expected to loop on
        the returned byte count, as with any partial write.
        """
        MAX_LEN: typing.Final[int] = 2147479552

        if TensorSerializer._buffer_size(buffer) <= MAX_LEN:
            # Small enough for a single unrestricted syscall
            return os.pwrite(fd, buffer, offset)
        # Oversized: write only the first MAX_LEN bytes and report a
        # partial write, letting the caller retry with the remainder
        with TensorSerializer._mv_slice(buffer, slice(MAX_LEN)) as view:
            return os.pwrite(fd, view, offset)

def _pwrite_syscall(
self, data, offset: int, verify: Union[bool, int] = True
) -> int:
Expand All @@ -3525,14 +3543,14 @@ def _pwrite_syscall(
expected_bytes_written: int = (
verify if isinstance(verify, int) else self._buffer_size(data)
)
bytes_just_written: int = os.pwrite(self._fd, data, offset)
bytes_just_written: int = self._pwrite_compat(self._fd, data, offset)
if bytes_just_written > 0:
bytes_written += bytes_just_written
while bytes_written < expected_bytes_written and bytes_just_written > 0:
# Writes larger than ~2 GiB may not complete in a single pwrite call
offset += bytes_just_written
with self._mv_suffix(data, bytes_written) as mv:
bytes_just_written = os.pwrite(self._fd, mv, offset)
with self._mv_slice(data, slice(bytes_written, None)) as mv:
bytes_just_written = self._pwrite_compat(self._fd, mv, offset)
if bytes_just_written > 0:
bytes_written += bytes_just_written
if isinstance(verify, int) or verify:
Expand All @@ -3553,7 +3571,7 @@ def _write(self, data, expected_bytes_written: Optional[int] = None) -> int:
if bytes_just_written > expected_bytes_written:
raise ValueError("Wrote more data than expected")
while bytes_written < expected_bytes_written and bytes_just_written > 0:
with self._mv_suffix(data, bytes_written) as mv:
with self._mv_slice(data, slice(bytes_written, None)) as mv:
bytes_just_written = self._file.write(mv)
bytes_written += bytes_just_written
return bytes_written
Expand Down
Loading