Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions obstore/python/obstore/_buffered.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,51 @@ if sys.version_info >= (3, 12):
else:
from typing_extensions import Buffer

def open_reader(store: ObjectStore, path: str) -> ReadableFile:
def open_reader(
store: ObjectStore, path: str, *, buffer_size: int = 1024 * 1024
) -> ReadableFile:
"""Open a readable file object from the specified location.

Args:
store: The ObjectStore instance to use.
path: The path within ObjectStore to retrieve.

Keyword Args:
buffer_size: The number of bytes to read in a single request. Up to `buffer_size` bytes will be buffered in memory. Once the buffer is depleted, it is refilled with a new range request.

Returns:
ReadableFile
"""

async def open_reader_async(store: ObjectStore, path: str) -> AsyncReadableFile:
async def open_reader_async(
store: ObjectStore, path: str, *, buffer_size: int = 1024 * 1024
) -> AsyncReadableFile:
"""Call `open_reader` asynchronously, returning a readable file object with asynchronous operations.

Refer to the documentation for [open_reader][obstore.open_reader].
"""

class ReadableFile:
"""A readable file object with synchronous operations.
"""A synchronous buffered reader that implements an interface similar to a Python
[`BufferedReader`][io.BufferedReader].

Internally this maintains a buffer of the requested size, and uses
[`get_range`][obstore.get_range] to populate its internal buffer once depleted. This
buffer is cleared on seek.

Whilst simple, this interface will typically be outperformed by the native `obstore`
methods that better map to the network APIs. This is because most object stores have
very [high first-byte latencies], on the order of 100-200ms, and so avoiding
unnecessary round-trips is critical to throughput.

This implements a similar interface as a generic readable Python binary file-like
object.
Systems looking to sequentially scan a file should instead consider using
[`get`][obstore.get], or [`get_range`][obstore.get_range] to read a particular
range.

Systems looking to read multiple ranges of a file should instead consider using
[`get_ranges`][obstore.get_ranges], which will optimise the vectored IO.

[high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
"""

def close(self) -> None:
Expand Down Expand Up @@ -78,7 +101,27 @@ class ReadableFile:
"""Return the current stream position."""

class AsyncReadableFile:
"""A readable file object with **asynchronous** operations."""
"""An async buffered reader that implements an interface similar to a Python
[`BufferedReader`][io.BufferedReader].

Internally this maintains a buffer of the requested size, and uses
[`get_range`][obstore.get_range] to populate its internal buffer once depleted. This
buffer is cleared on seek.

Whilst simple, this interface will typically be outperformed by the native `obstore`
methods that better map to the network APIs. This is because most object stores have
very [high first-byte latencies], on the order of 100-200ms, and so avoiding
unnecessary round-trips is critical to throughput.

Systems looking to sequentially scan a file should instead consider using
[`get`][obstore.get], or [`get_range`][obstore.get_range] to read a particular
range.

Systems looking to read multiple ranges of a file should instead consider using
[`get_ranges`][obstore.get_ranges], which will optimise the vectored IO.

[high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
"""

def close(self) -> None:
"""Close the current file.
Expand Down
27 changes: 20 additions & 7 deletions obstore/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,48 @@ use crate::runtime::get_runtime;
use crate::tags::PyTagSet;

// Open a synchronous buffered reader for the object at `path`.
//
// `buffer_size` is keyword-only on the Python side and defaults to
// `1024 * 1024` bytes. Construction of the reader (including the metadata
// lookup) is delegated to `create_reader`, so exactly one HEAD request is
// issued per call.
//
// Errors: propagates a failure from `get_runtime` or from `create_reader`.
//
// NOTE: written as plain comments rather than `///` so the Python-visible
// docstring of this function is left unchanged.
#[pyfunction]
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024))]
pub(crate) fn open_reader(
    py: Python,
    store: PyObjectStore,
    path: String,
    buffer_size: usize,
) -> PyObjectStoreResult<PyReadableFile> {
    let store = store.into_inner();
    let runtime = get_runtime(py)?;
    // Block on the async constructor inside `allow_threads` so other Python
    // threads can run while the network request is in flight.
    let reader = py.allow_threads(|| runtime.block_on(create_reader(store, path, buffer_size)))?;
    // Second argument is presumably the "is async" flag (the async variant
    // passes `true`) — confirm against `PyReadableFile::new`.
    Ok(PyReadableFile::new(reader, false))
}

// Open a buffered reader for the object at `path`, returning a Python
// awaitable that resolves to the file object.
//
// `buffer_size` is keyword-only on the Python side and defaults to
// `1024 * 1024` bytes. Construction of the reader (including the metadata
// lookup) is delegated to `create_reader`, so exactly one HEAD request is
// issued per call.
//
// Errors: a failure from `create_reader` is propagated with `?` inside the
// future.
//
// NOTE: written as plain comments rather than `///` so the Python-visible
// docstring of this function is left unchanged.
#[pyfunction]
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024))]
pub(crate) fn open_reader_async(
    py: Python,
    store: PyObjectStore,
    path: String,
    buffer_size: usize,
) -> PyResult<Bound<PyAny>> {
    let store = store.into_inner();
    future_into_py(py, async move {
        let reader = create_reader(store, path, buffer_size).await?;
        // Second argument is presumably the "is async" flag (the sync variant
        // passes `false`) — confirm against `PyReadableFile::new`.
        Ok(PyReadableFile::new(reader, true))
    })
}

/// Build the shared `BufReader` used by both the sync and async entry points.
///
/// Looks up the object's metadata with a `head` request on `path`, then wraps a
/// `BufReader` created with the given `capacity` in `Arc<Mutex<_>>`.
///
/// # Errors
///
/// Returns `PyObjectStoreError::ObjectStoreError` if the `head` request fails.
async fn create_reader(
    store: Arc<dyn ObjectStore>,
    path: String,
    capacity: usize,
) -> PyObjectStoreResult<Arc<Mutex<BufReader>>> {
    let meta = match store.head(&path.into()).await {
        Ok(found) => found,
        Err(source) => return Err(PyObjectStoreError::ObjectStoreError(source)),
    };
    let buffered = BufReader::with_capacity(store, &meta, capacity);
    Ok(Arc::new(Mutex::new(buffered)))
}

#[pyclass(name = "ReadableFile", frozen)]
pub(crate) struct PyReadableFile {
reader: Arc<Mutex<BufReader>>,
Expand Down