Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ Simple, fast integration with object storage services like Amazon S3, Google Clo

- Sync and async API.
- Streaming downloads with configurable chunking.
- Streaming uploads from async or sync iterators.
- Streaming `list`, with no need to paginate.
- File-like object API and [fsspec](https://github.com/fsspec/filesystem_spec) integration.
- Support for conditional put ("put if not exists"), as well as custom tags and attributes.
- Automatically uses [multipart uploads](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html) under the hood for large file objects.
- Optionally return list results as [Arrow](https://arrow.apache.org/), which is faster than materializing Python `dict`/`list` objects.
- Easy to install with no required Python dependencies.
- The [underlying Rust library](https://docs.rs/object_store) is production quality and used in large scale production systems, such as the Rust package registry [crates.io](https://crates.io/).
- Support for zero-copy data exchange from Rust into Python in `get_range` and `get_ranges`.
- Zero-copy data exchange between Rust and Python in `get_range`, `get_ranges`, and `put` via the Python buffer protocol.
- Simple API with static type checking.
- Helpers for constructing from environment variables and `boto3.Session` objects

Expand Down Expand Up @@ -56,6 +57,10 @@ Classes to construct a store are exported from the `obstore.store` submodule:
- [`LocalStore`](https://developmentseed.org/obstore/latest/api/store/local/): Local filesystem storage providing the same object store interface.
- [`MemoryStore`](https://developmentseed.org/obstore/latest/api/store/memory/): A fully in-memory implementation of ObjectStore.

Additionally, some middlewares exist:

- [`PrefixStore`](https://developmentseed.org/obstore/latest/api/store/middleware/#obstore.store.PrefixStore): Store wrapper that applies a constant prefix to all paths handled by the store.

#### Example

```py
Expand All @@ -81,7 +86,7 @@ All methods for interacting with a store are exported as **top-level functions**
- [`get`](https://developmentseed.org/obstore/latest/api/get/): Return the bytes that are stored at the specified location.
- [`head`](https://developmentseed.org/obstore/latest/api/head/): Return the metadata for the specified location
- [`list`](https://developmentseed.org/obstore/latest/api/list/): List all the objects with the given prefix.
- [`put`](https://developmentseed.org/obstore/latest/api/put/): Save the provided bytes to the specified location
- [`put`](https://developmentseed.org/obstore/latest/api/put/): Save the provided buffer to the specified location.
- [`rename`](https://developmentseed.org/obstore/latest/api/rename/): Move an object from one path to another in the same object store.

There are a few additional APIs useful for specific use cases:
Expand Down
3 changes: 2 additions & 1 deletion docs/api/get.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
::: obstore.get_ranges_async
::: obstore.GetOptions
::: obstore.GetResult
::: obstore.Buffer
::: obstore.BytesStream
::: obstore.Bytes
::: obstore.OffsetRange
::: obstore.SuffixRange
3 changes: 2 additions & 1 deletion obstore/python/obstore/_buffer.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ if sys.version_info >= (3, 12):
else:
from typing_extensions import Buffer as _Buffer

class Buffer(_Buffer):
class Bytes(_Buffer):
"""
A buffer implementing the Python buffer protocol, allowing zero-copy access to the
underlying memory provided by Rust.
Expand All @@ -15,4 +15,5 @@ class Buffer(_Buffer):

def to_bytes(self) -> bytes:
"""Copy this buffer into a Python `bytes` object."""
def __repr__(self) -> str: ...
def __len__(self) -> int: ...
43 changes: 32 additions & 11 deletions obstore/python/obstore/_get.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ from datetime import datetime
from typing import List, Sequence, Tuple, TypedDict

from ._attributes import Attributes
from ._buffer import Buffer
from ._buffer import Bytes
from ._list import ObjectMeta
from .store import ObjectStore

Expand Down Expand Up @@ -180,7 +180,7 @@ class GetResult:
Args:
min_chunk_size: The minimum size in bytes for each chunk in the returned
`BytesStream`. All chunks except for the last chunk will be at least
this size. Defaults to 10*1024*1024 (10MB).
this size. Defaults to 10\\*1024\\*1024 (10MB).

Returns:
A chunked stream
Expand All @@ -199,7 +199,30 @@ class GetResult:
"""

class BytesStream:
"""An async stream of bytes."""
"""An async stream of bytes.

!!! note "Request timeouts"
The underlying stream needs to stay alive until the last chunk is polled. If the
file is large, it may exceed the default timeout of 30 seconds. In this case,
you may see an error like:

```
GenericError: Generic {
store: "HTTP",
source: reqwest::Error {
kind: Decode,
source: reqwest::Error {
kind: Body,
source: TimedOut,
},
},
}
```

To fix this, set the `timeout` parameter in the `client_options` passed to the
initial `get` or `get_async` call. See
[ClientConfigKey][obstore.store.ClientConfigKey].
"""

def __aiter__(self) -> BytesStream:
"""Return `Self` as an async iterator."""
Expand Down Expand Up @@ -235,7 +258,7 @@ async def get_async(
Refer to the documentation for [get][obstore.get].
"""

def get_range(store: ObjectStore, path: str, start: int, end: int) -> Buffer:
def get_range(store: ObjectStore, path: str, start: int, end: int) -> Bytes:
"""
Return the bytes that are stored at the specified location in the given byte range.

Expand All @@ -251,21 +274,19 @@ def get_range(store: ObjectStore, path: str, start: int, end: int) -> Buffer:
end: The end of the byte range (exclusive).

Returns:
A `Buffer` object implementing the Python buffer protocol, allowing
A `Bytes` object implementing the Python buffer protocol, allowing
zero-copy access to the underlying memory provided by Rust.
"""

async def get_range_async(
store: ObjectStore, path: str, start: int, end: int
) -> Buffer:
async def get_range_async(store: ObjectStore, path: str, start: int, end: int) -> Bytes:
"""Call `get_range` asynchronously.

Refer to the documentation for [get_range][obstore.get_range].
"""

def get_ranges(
store: ObjectStore, path: str, starts: Sequence[int], ends: Sequence[int]
) -> List[Buffer]:
) -> List[Bytes]:
"""
Return the bytes that are stored at the specified location in the given byte ranges

Expand All @@ -281,14 +302,14 @@ def get_ranges(
ends: A sequence of `int` where each offset ends (exclusive).

Returns:
A sequence of `Buffer`, one for each range. This `Buffer` object implements the
A sequence of `Bytes`, one for each range. This `Bytes` object implements the
Python buffer protocol, allowing zero-copy access to the underlying memory
provided by Rust.
"""

async def get_ranges_async(
store: ObjectStore, path: str, starts: Sequence[int], ends: Sequence[int]
) -> List[Buffer]:
) -> List[Bytes]:
"""Call `get_ranges` asynchronously.

Refer to the documentation for [get_ranges][obstore.get_ranges].
Expand Down
3 changes: 2 additions & 1 deletion obstore/python/obstore/_obstore.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ from ._copy import copy as copy
from ._copy import copy_async as copy_async
from ._delete import delete as delete
from ._delete import delete_async as delete_async
from ._get import Buffer as Buffer
from ._get import Bytes as Bytes
from ._get import BytesStream as BytesStream
from ._get import GetOptions as GetOptions
from ._get import GetResult as GetResult
from ._get import OffsetRange as OffsetRange
Expand Down
57 changes: 51 additions & 6 deletions obstore/python/obstore/_put.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
import sys
from pathlib import Path
from typing import IO, Dict, Literal, TypedDict
from typing import (
IO,
AsyncIterable,
AsyncIterator,
Dict,
Iterable,
Iterator,
Literal,
TypedDict,
)

from ._attributes import Attributes
from .store import ObjectStore

if sys.version_info >= (3, 12):
from collections.abc import Buffer
else:
from typing_extensions import Buffer

class UpdateVersion(TypedDict, total=False):
"""
Uniquely identifies a version of an object to update
Expand Down Expand Up @@ -53,7 +68,7 @@ class PutResult(TypedDict):
def put(
store: ObjectStore,
path: str,
file: IO[bytes] | Path | bytes,
file: IO[bytes] | Path | bytes | Buffer | Iterator[Buffer] | Iterable[Buffer],
*,
attributes: Attributes | None = None,
tags: Dict[str, str] | None = None,
Expand All @@ -71,8 +86,16 @@ def put(
Args:
store: The ObjectStore instance to use.
path: The path within ObjectStore for where to save the file.
file: The object to upload. Can either be file-like, a `Path` to a local file,
or a `bytes` object.
file: The object to upload. Supports various input:

- A file-like object opened in binary read mode
- A [`Path`][pathlib.Path] to a local file
- A [`bytes`][] object.
- Any object implementing the Python [buffer
protocol](https://docs.python.org/3/c-api/buffer.html) (includes `bytes`
but also `memoryview`, numpy arrays, and more).
- An iterator or iterable of objects implementing the Python buffer
protocol.

Keyword args:
mode: Configure the `PutMode` for this operation. If this provided and is not `"overwrite"`, a non-multipart upload will be performed. Defaults to `"overwrite"`.
Expand All @@ -86,7 +109,14 @@ def put(
async def put_async(
store: ObjectStore,
path: str,
file: IO[bytes] | Path | bytes,
file: IO[bytes]
| Path
| bytes
| Buffer
| AsyncIterator[Buffer]
| AsyncIterable[Buffer]
| Iterator[Buffer]
| Iterable[Buffer],
*,
attributes: Attributes | None = None,
tags: Dict[str, str] | None = None,
Expand All @@ -97,5 +127,20 @@ async def put_async(
) -> PutResult:
"""Call `put` asynchronously.

Refer to the documentation for [put][obstore.put].
Refer to the documentation for [`put`][obstore.put]. In addition to what the
synchronous `put` allows for the `file` parameter, this **also supports an async
iterator or iterable** of objects implementing the Python buffer protocol.

This means, for example, you can pass the result of `get_async` directly to
`put_async`, and the request will be streamed through Python during the put
operation:

```py
import obstore as obs

# This only constructs the stream, it doesn't materialize the data in memory
resp = await obs.get_async(store1, path1)
# A streaming upload is created to copy the file to path2
await obs.put_async(store2, path2)
```
"""
3 changes: 0 additions & 3 deletions obstore/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ impl<'py> FromPyObject<'py> for PyGetRange {
} else if let Ok(suffix_range) = ob.extract::<PySuffixRange>() {
Ok(Self(suffix_range.into()))
} else {
// dbg!(ob);
// let x = ob.extract::<PyOffsetRange>()?;
// dbg!(x.offset);
Err(PyValueError::new_err("Unexpected input for byte range.\nExpected two-integer tuple or list, or dict with 'offset' or 'suffix' key." ))
}
}
Expand Down
2 changes: 1 addition & 1 deletion obstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use pyo3::intern;
use pyo3::prelude::*;

mod attributes;
Expand Down Expand Up @@ -28,6 +27,7 @@ fn check_debug_build(_py: Python) -> PyResult<()> {
#[cfg(debug_assertions)]
{
use pyo3::exceptions::PyRuntimeWarning;
use pyo3::intern;
use pyo3::types::PyTuple;

let warnings_mod = _py.import(intern!(_py, "warnings"))?;
Expand Down
Loading
Loading