Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions docs/api/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

Native support for reading from object stores as a file-like object.

Use `obstore.open` or `obstore.open_async` to open files. Writing files in this way is not yet supported.
Use `obstore.open_reader` or `obstore.open_reader_async` to open readable files. Use `obstore.open_writer` or `obstore.open_writer_async` to open writable files.

::: obstore.open
::: obstore.open_async
::: obstore.open_reader
::: obstore.open_reader_async
::: obstore.open_writer
::: obstore.open_writer_async
::: obstore.ReadableFile
::: obstore.AsyncReadableFile
::: obstore.WritableFile
::: obstore.AsyncWritableFile
3 changes: 2 additions & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ There are a few additional APIs useful for specific use cases:

File-like object support is also provided:

- [`open`][obstore.open]: Open a remote object as a Python file-like object.
- [`open_reader`][obstore.open_reader]: Open a remote object as a readable file-like object, similar to a Python [`BufferedReader`](https://docs.python.org/3/library/io.html#io.BufferedReader).
- [`open_writer`][obstore.open_writer]: Open a remote object as a writable file-like object, similar to a Python [`BufferedWriter`](https://docs.python.org/3/library/io.html#io.BufferedWriter).
- [`AsyncFsspecStore`][obstore.fsspec.AsyncFsspecStore] adapter for use with [`fsspec`](https://github.com/fsspec/filesystem_spec).

**All operations have a comparable async method** with the same name plus an `_async` suffix.
Expand Down
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ nav:
- api/attributes.md
- api/exceptions.md
- api/file.md
- obstore.fsspec: api/fsspec.md
- fsspec Integration: api/fsspec.md
- CHANGELOG.md

watch:
Expand Down
111 changes: 105 additions & 6 deletions obstore/python/obstore/_buffered.pyi
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import os
from typing import List
import sys
from contextlib import AbstractAsyncContextManager, AbstractContextManager
from typing import Dict, List, Self

from ._attributes import Attributes
from ._bytes import Bytes
from .store import ObjectStore

def open(store: ObjectStore, path: str) -> ReadableFile:
"""Open a file object from the specified location.
if sys.version_info >= (3, 12):
from collections.abc import Buffer
else:
from typing_extensions import Buffer

def open_reader(store: ObjectStore, path: str) -> ReadableFile:
"""Open a readable file object from the specified location.

Args:
store: The ObjectStore instance to use.
Expand All @@ -15,10 +23,10 @@ def open(store: ObjectStore, path: str) -> ReadableFile:
ReadableFile
"""

async def open_async(store: ObjectStore, path: str) -> AsyncReadableFile:
"""Call `open` asynchronously, returning a file object with asynchronous operations.
async def open_reader_async(store: ObjectStore, path: str) -> AsyncReadableFile:
"""Call `open_reader` asynchronously, returning a readable file object with asynchronous operations.

Refer to the documentation for [open][obstore.open].
Refer to the documentation for [open_reader][obstore.open_reader].
"""

class ReadableFile:
Expand Down Expand Up @@ -112,3 +120,94 @@ class AsyncReadableFile:

async def tell(self) -> int:
"""Return the current stream position."""

def open_writer(
store: ObjectStore,
path: str,
*,
attributes: Attributes | None = None,
buffer_size: int = 10 * 1024 * 1024,
tags: Dict[str, str] | None = None,
max_concurrency: int = 12,
) -> WritableFile:
"""Open a writable file object at the specified location.

Args:
store: The ObjectStore instance to use.
path: The path within ObjectStore to write to.

Keyword args:
attributes: Provide a set of `Attributes`. Defaults to `None`.
buffer_size: The underlying buffer size to use. Up to `buffer_size` bytes will be buffered in memory. If `buffer_size` is exceeded, data will be uploaded as a multipart upload in chunks of `buffer_size`. Defaults to 10 MiB (`10 * 1024 * 1024` bytes).
tags: Provide tags for this object. Defaults to `None`.
max_concurrency: The maximum number of chunks to upload concurrently. Defaults to 12.

Returns:
WritableFile
"""

def open_writer_async(
store: ObjectStore,
path: str,
*,
attributes: Attributes | None = None,
buffer_size: int = 10 * 1024 * 1024,
tags: Dict[str, str] | None = None,
max_concurrency: int = 12,
) -> AsyncWritableFile:
"""Open an **asynchronous** writable file object at the specified location.

Refer to the documentation for [open_writer][obstore.open_writer].
"""

class WritableFile(AbstractContextManager):
"""A buffered writable file object with synchronous operations.

This implements a similar interface as a Python
[`BufferedWriter`][io.BufferedWriter].
"""

def __enter__(self) -> Self: ...
def __exit__(self, exc_type, exc_value, traceback) -> None: ...
def close(self) -> None:
"""Close the current file."""

def closed(self) -> bool:
"""Returns `True` if the current file has already been closed.

Note that this is a method, not an attribute.
"""

def flush(self) -> None:
"""
Flushes this output stream, ensuring that all intermediately buffered contents reach their destination.
"""

def write(self, buffer: bytes | Buffer, /) -> int:
"""
Write the [bytes-like object](https://docs.python.org/3/glossary.html#term-bytes-like-object), `buffer`, and return the number of bytes written.
"""

class AsyncWritableFile(AbstractAsyncContextManager):
"""A buffered writable file object with **asynchronous** operations."""

async def __aenter__(self) -> Self: ...
async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
async def close(self) -> None:
"""Close the current file."""

async def closed(self) -> bool:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little unfortunate that this is a method instead of an attribute.

We need an Option somewhere in order to be able to drop the internal BufWriter to check that it has already been closed. (The object_store API will error if the file is closed twice, but doesn't give a way to check if the file has already been closed).

This being an async method is an artifact of storing the underlying BufWriter inside of an

Arc<Mutex<Option<BufWriter>>>

where the Mutex is a tokio::sync::Mutex.

Thus we need to use async to open the mutex. We could add a second layer of mutex, where the top-level mutex is a std::sync::Mutex, but I assume that two levels of mutexes would be detrimental for performance.

"""Returns `True` if the current file has already been closed.

Note that this is an async method, not an attribute.
"""

async def flush(self) -> None:
"""
Flushes this output stream, ensuring that all intermediately buffered contents reach their destination.
"""

async def write(self, buffer: bytes | Buffer, /) -> int:
"""
Write the [bytes-like object](https://docs.python.org/3/glossary.html#term-bytes-like-object), `buffer`, and return the number of bytes written.
"""
8 changes: 6 additions & 2 deletions obstore/python/obstore/_obstore.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from ._attributes import Attribute as Attribute
from ._attributes import Attributes as Attributes
from ._buffered import AsyncReadableFile as AsyncReadableFile
from ._buffered import AsyncWritableFile as AsyncWritableFile
from ._buffered import ReadableFile as ReadableFile
from ._buffered import open as open
from ._buffered import open_async as open_async
from ._buffered import WritableFile as WritableFile
from ._buffered import open_reader as open_reader
from ._buffered import open_reader_async as open_reader_async
from ._buffered import open_writer as open_writer
from ._buffered import open_writer_async as open_writer_async
from ._bytes import Bytes as Bytes
from ._copy import copy as copy
from ._copy import copy_async as copy_async
Expand Down
8 changes: 6 additions & 2 deletions obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""[fsspec] integration.
"""Integration with the [fsspec] library.

This integration is designed for compatibility and may not provide the same performance
as other obstore APIs.

[fsspec]: https://github.com/fsspec/filesystem_spec

Expand Down Expand Up @@ -188,7 +191,8 @@ def __init__(self, fs, path, mode="rb", **kwargs):
def read(self, length: int = -1):
"""Return bytes from the remote file

length: if positive, returns up to this many bytes; if negative, return all
Args:
length: if positive, returns up to this many bytes; if negative, return all
remaining bytes.
"""
if length < 0:
Expand Down
Loading