Skip to content

Commit 78860c6

Browse files
authored
Async stream upload (#54)
* wip: async stream upload * Async stream upload * Update comment and typing * comment * Improved error messages with underlying debug info * Remove unused comment * Accept iterator as well as iterable into put * Fix markdown escaping * Fix import for clippy * Improved docstrings * Document bytes stream timeout * Add async and sync iterable put tests * Updated put docs * comment * remove unused comment
1 parent b8d218b commit 78860c6

File tree

11 files changed

+361
-65
lines changed

11 files changed

+361
-65
lines changed

README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@ Simple, fast integration with object storage services like Amazon S3, Google Clo
1212

1313
- Sync and async API.
1414
- Streaming downloads with configurable chunking.
15+
- Streaming uploads from async or sync iterators.
1516
- Streaming `list`, with no need to paginate.
1617
- File-like object API and [fsspec](https://github.com/fsspec/filesystem_spec) integration.
1718
- Support for conditional put ("put if not exists"), as well as custom tags and attributes.
1819
- Automatically uses [multipart uploads](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html) under the hood for large file objects.
1920
- Optionally return list results as [Arrow](https://arrow.apache.org/), which is faster than materializing Python `dict`/`list` objects.
2021
- Easy to install with no required Python dependencies.
2122
- The [underlying Rust library](https://docs.rs/object_store) is production quality and used in large scale production systems, such as the Rust package registry [crates.io](https://crates.io/).
22-
- Support for zero-copy data exchange from Rust into Python in `get_range` and `get_ranges`.
23+
- Zero-copy data exchange between Rust and Python in `get_range`, `get_ranges`, and `put` via the Python buffer protocol.
2324
- Simple API with static type checking.
2425
- Helpers for constructing from environment variables and `boto3.Session` objects
2526

@@ -56,6 +57,10 @@ Classes to construct a store are exported from the `obstore.store` submodule:
5657
- [`LocalStore`](https://developmentseed.org/obstore/latest/api/store/local/): Local filesystem storage providing the same object store interface.
5758
- [`MemoryStore`](https://developmentseed.org/obstore/latest/api/store/memory/): A fully in-memory implementation of ObjectStore.
5859

60+
Additionally, some middlewares exist:
61+
62+
- [`PrefixStore`](https://developmentseed.org/obstore/latest/api/store/middleware/#obstore.store.PrefixStore): Store wrapper that applies a constant prefix to all paths handled by the store.
63+
5964
#### Example
6065

6166
```py
@@ -81,7 +86,7 @@ All methods for interacting with a store are exported as **top-level functions**
8186
- [`get`](https://developmentseed.org/obstore/latest/api/get/): Return the bytes that are stored at the specified location.
8287
- [`head`](https://developmentseed.org/obstore/latest/api/head/): Return the metadata for the specified location
8388
- [`list`](https://developmentseed.org/obstore/latest/api/list/): List all the objects with the given prefix.
84-
- [`put`](https://developmentseed.org/obstore/latest/api/put/): Save the provided bytes to the specified location
89+
- [`put`](https://developmentseed.org/obstore/latest/api/put/): Save the provided buffer to the specified location.
8590
- [`rename`](https://developmentseed.org/obstore/latest/api/rename/): Move an object from one path to another in the same object store.
8691

8792
There are a few additional APIs useful for specific use cases:

docs/api/get.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
::: obstore.get_ranges_async
99
::: obstore.GetOptions
1010
::: obstore.GetResult
11-
::: obstore.Buffer
11+
::: obstore.BytesStream
12+
::: obstore.Bytes
1213
::: obstore.OffsetRange
1314
::: obstore.SuffixRange

obstore/python/obstore/_buffer.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ if sys.version_info >= (3, 12):
55
else:
66
from typing_extensions import Buffer as _Buffer
77

8-
class Buffer(_Buffer):
8+
class Bytes(_Buffer):
99
"""
1010
A buffer implementing the Python buffer protocol, allowing zero-copy access to the
1111
underlying memory provided by Rust.
@@ -15,4 +15,5 @@ class Buffer(_Buffer):
1515

1616
def to_bytes(self) -> bytes:
1717
"""Copy this buffer into a Python `bytes` object."""
18+
def __repr__(self) -> str: ...
1819
def __len__(self) -> int: ...

obstore/python/obstore/_get.pyi

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ from datetime import datetime
22
from typing import List, Sequence, Tuple, TypedDict
33

44
from ._attributes import Attributes
5-
from ._buffer import Buffer
5+
from ._buffer import Bytes
66
from ._list import ObjectMeta
77
from .store import ObjectStore
88

@@ -180,7 +180,7 @@ class GetResult:
180180
Args:
181181
min_chunk_size: The minimum size in bytes for each chunk in the returned
182182
`BytesStream`. All chunks except for the last chunk will be at least
183-
this size. Defaults to 10*1024*1024 (10MB).
183+
this size. Defaults to 10\\*1024\\*1024 (10MB).
184184
185185
Returns:
186186
A chunked stream
@@ -199,7 +199,30 @@ class GetResult:
199199
"""
200200

201201
class BytesStream:
202-
"""An async stream of bytes."""
202+
"""An async stream of bytes.
203+
204+
!!! note "Request timeouts"
205+
The underlying stream needs to stay alive until the last chunk is polled. If the
206+
file is large, it may exceed the default timeout of 30 seconds. In this case,
207+
you may see an error like:
208+
209+
```
210+
GenericError: Generic {
211+
store: "HTTP",
212+
source: reqwest::Error {
213+
kind: Decode,
214+
source: reqwest::Error {
215+
kind: Body,
216+
source: TimedOut,
217+
},
218+
},
219+
}
220+
```
221+
222+
To fix this, set the `timeout` parameter in the `client_options` passed to the
223+
initial `get` or `get_async` call. See
224+
[ClientConfigKey][obstore.store.ClientConfigKey].
225+
"""
203226

204227
def __aiter__(self) -> BytesStream:
205228
"""Return `Self` as an async iterator."""
@@ -235,7 +258,7 @@ async def get_async(
235258
Refer to the documentation for [get][obstore.get].
236259
"""
237260

238-
def get_range(store: ObjectStore, path: str, start: int, end: int) -> Buffer:
261+
def get_range(store: ObjectStore, path: str, start: int, end: int) -> Bytes:
239262
"""
240263
Return the bytes that are stored at the specified location in the given byte range.
241264
@@ -251,21 +274,19 @@ def get_range(store: ObjectStore, path: str, start: int, end: int) -> Buffer:
251274
end: The end of the byte range (exclusive).
252275
253276
Returns:
254-
A `Buffer` object implementing the Python buffer protocol, allowing
277+
A `Bytes` object implementing the Python buffer protocol, allowing
255278
zero-copy access to the underlying memory provided by Rust.
256279
"""
257280

258-
async def get_range_async(
259-
store: ObjectStore, path: str, start: int, end: int
260-
) -> Buffer:
281+
async def get_range_async(store: ObjectStore, path: str, start: int, end: int) -> Bytes:
261282
"""Call `get_range` asynchronously.
262283
263284
Refer to the documentation for [get_range][obstore.get_range].
264285
"""
265286

266287
def get_ranges(
267288
store: ObjectStore, path: str, starts: Sequence[int], ends: Sequence[int]
268-
) -> List[Buffer]:
289+
) -> List[Bytes]:
269290
"""
270291
Return the bytes that are stored at the specified location in the given byte ranges
271292
@@ -281,14 +302,14 @@ def get_ranges(
281302
ends: A sequence of `int` where each offset ends (exclusive).
282303
283304
Returns:
284-
A sequence of `Buffer`, one for each range. This `Buffer` object implements the
305+
A sequence of `Bytes`, one for each range. This `Bytes` object implements the
285306
Python buffer protocol, allowing zero-copy access to the underlying memory
286307
provided by Rust.
287308
"""
288309

289310
async def get_ranges_async(
290311
store: ObjectStore, path: str, starts: Sequence[int], ends: Sequence[int]
291-
) -> List[Buffer]:
312+
) -> List[Bytes]:
292313
"""Call `get_ranges` asynchronously.
293314
294315
Refer to the documentation for [get_ranges][obstore.get_ranges].

obstore/python/obstore/_obstore.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ from ._copy import copy as copy
88
from ._copy import copy_async as copy_async
99
from ._delete import delete as delete
1010
from ._delete import delete_async as delete_async
11-
from ._get import Buffer as Buffer
11+
from ._get import Bytes as Bytes
12+
from ._get import BytesStream as BytesStream
1213
from ._get import GetOptions as GetOptions
1314
from ._get import GetResult as GetResult
1415
from ._get import OffsetRange as OffsetRange

obstore/python/obstore/_put.pyi

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,24 @@
1+
import sys
12
from pathlib import Path
2-
from typing import IO, Dict, Literal, TypedDict
3+
from typing import (
4+
IO,
5+
AsyncIterable,
6+
AsyncIterator,
7+
Dict,
8+
Iterable,
9+
Iterator,
10+
Literal,
11+
TypedDict,
12+
)
313

414
from ._attributes import Attributes
515
from .store import ObjectStore
616

17+
if sys.version_info >= (3, 12):
18+
from collections.abc import Buffer
19+
else:
20+
from typing_extensions import Buffer
21+
722
class UpdateVersion(TypedDict, total=False):
823
"""
924
Uniquely identifies a version of an object to update
@@ -53,7 +68,7 @@ class PutResult(TypedDict):
5368
def put(
5469
store: ObjectStore,
5570
path: str,
56-
file: IO[bytes] | Path | bytes,
71+
file: IO[bytes] | Path | bytes | Buffer | Iterator[Buffer] | Iterable[Buffer],
5772
*,
5873
attributes: Attributes | None = None,
5974
tags: Dict[str, str] | None = None,
@@ -71,8 +86,16 @@ def put(
7186
Args:
7287
store: The ObjectStore instance to use.
7388
path: The path within ObjectStore for where to save the file.
74-
file: The object to upload. Can either be file-like, a `Path` to a local file,
75-
or a `bytes` object.
89+
file: The object to upload. Supports various input:
90+
91+
- A file-like object opened in binary read mode
92+
- A [`Path`][pathlib.Path] to a local file
93+
- A [`bytes`][] object.
94+
- Any object implementing the Python [buffer
95+
protocol](https://docs.python.org/3/c-api/buffer.html) (includes `bytes`
96+
but also `memoryview`, numpy arrays, and more).
97+
- An iterator or iterable of objects implementing the Python buffer
98+
protocol.
7699
77100
Keyword args:
78101
mode: Configure the `PutMode` for this operation. If this provided and is not `"overwrite"`, a non-multipart upload will be performed. Defaults to `"overwrite"`.
@@ -86,7 +109,14 @@ def put(
86109
async def put_async(
87110
store: ObjectStore,
88111
path: str,
89-
file: IO[bytes] | Path | bytes,
112+
file: IO[bytes]
113+
| Path
114+
| bytes
115+
| Buffer
116+
| AsyncIterator[Buffer]
117+
| AsyncIterable[Buffer]
118+
| Iterator[Buffer]
119+
| Iterable[Buffer],
90120
*,
91121
attributes: Attributes | None = None,
92122
tags: Dict[str, str] | None = None,
@@ -97,5 +127,20 @@ async def put_async(
97127
) -> PutResult:
98128
"""Call `put` asynchronously.
99129
100-
Refer to the documentation for [put][obstore.put].
130+
Refer to the documentation for [`put`][obstore.put]. In addition to what the
131+
synchronous `put` allows for the `file` parameter, this **also supports an async
132+
iterator or iterable** of objects implementing the Python buffer protocol.
133+
134+
This means, for example, you can pass the result of `get_async` directly to
135+
`put_async`, and the request will be streamed through Python during the put
136+
operation:
137+
138+
```py
139+
import obstore as obs
140+
141+
# This only constructs the stream, it doesn't materialize the data in memory
142+
resp = await obs.get_async(store1, path1)
143+
# A streaming upload is created to copy the file to path2
144+
await obs.put_async(store2, path2)
145+
```
101146
"""

obstore/src/get.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,6 @@ impl<'py> FromPyObject<'py> for PyGetRange {
110110
} else if let Ok(suffix_range) = ob.extract::<PySuffixRange>() {
111111
Ok(Self(suffix_range.into()))
112112
} else {
113-
// dbg!(ob);
114-
// let x = ob.extract::<PyOffsetRange>()?;
115-
// dbg!(x.offset);
116113
Err(PyValueError::new_err("Unexpected input for byte range.\nExpected two-integer tuple or list, or dict with 'offset' or 'suffix' key." ))
117114
}
118115
}

obstore/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use pyo3::intern;
21
use pyo3::prelude::*;
32

43
mod attributes;
@@ -28,6 +27,7 @@ fn check_debug_build(_py: Python) -> PyResult<()> {
2827
#[cfg(debug_assertions)]
2928
{
3029
use pyo3::exceptions::PyRuntimeWarning;
30+
use pyo3::intern;
3131
use pyo3::types::PyTuple;
3232

3333
let warnings_mod = _py.import(intern!(_py, "warnings"))?;

0 commit comments

Comments
 (0)