fsspec integration #63
@@ -0,0 +1,159 @@

"""Fsspec integration.

The underlying `object_store` Rust crate cautions against relying too strongly on stateful filesystem representations of object stores:

> The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems.
>
> This design provides the following advantages:
>
> - All operations are atomic, and readers cannot observe partial and/or failed writes
> - Methods map directly to object store APIs, providing both efficiency and predictability
> - Abstracts away filesystem and operating system specific quirks, ensuring portability
> - Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads

Where possible, implementations should use the underlying `object-store-rs` APIs
directly. Only where this is not possible should users fall back to this fsspec
integration.
"""
from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Any, Coroutine, Dict, List, Tuple

import fsspec.asyn
import fsspec.spec

import obstore as obs


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
    def __init__(
        self,
        store,
        *args,
        asynchronous=False,
        loop=None,
        batch_size=None,
        **kwargs,
    ):
        self.store = store
        super().__init__(
            *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size, **kwargs
        )

Review thread on the `loop` parameter:

> What's the right type for `loop`? If it is unused, should it be removed?

> I would suggest leaving it here even if we don't use it, to match the normal AsyncFileSystem signature.
    async def _rm_file(self, path, **kwargs):
        return await obs.delete_async(self.store, path)

    async def _cp_file(self, path1, path2, **kwargs):
        return await obs.copy_async(self.store, path1, path2)

    async def _pipe_file(self, path, value, **kwargs):
        return await obs.put_async(self.store, path, value)

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        if start is None and end is None:
            resp = await obs.get_async(self.store, path)
            return await resp.bytes_async()

        if start is not None and end is not None:
            return await obs.get_range_async(
                self.store, path, offset=start, length=end - start
            )

        raise NotImplementedError("todo: handle open-ended ranges")
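One possible way to close that TODO for the start-only case, using only calls that already appear in this diff (a hypothetical, untested sketch: `obs.head_async` supplies the object size, after which the range is fully specified):

    if start is not None and end is None:
        head = await obs.head_async(self.store, path)
        return await obs.get_range_async(
            self.store, path, offset=start, length=head["size"] - start
        )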
    async def _cat_ranges(
        self,
        paths: List[str],
        starts: List[int],
        ends: List[int],
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        # TODO: need to go through this again and test it
        per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list)
        for idx, (path, start, end) in enumerate(zip(paths, starts, ends)):
            per_file_requests[path].append((start, end, idx))

        futs: List[Coroutine[Any, Any, List[bytes]]] = []
        for path, ranges in per_file_requests.items():
            offsets = [r[0] for r in ranges]
            lengths = [r[1] - r[0] for r in ranges]
            fut = obs.get_ranges_async(
                self.store, path, offsets=offsets, lengths=lengths
            )
            futs.append(fut)

        result = await asyncio.gather(*futs)

        output_buffers: List[bytes] = [b""] * len(paths)
        for per_file_request, buffers in zip(per_file_requests.items(), result):
            path, ranges = per_file_request
            for buffer, ranges_ in zip(buffers, ranges):
                initial_index = ranges_[2]
                output_buffers[initial_index] = buffer

        return output_buffers

Review thread on `_cat_ranges`:

> Would you be able to add some tests for this? I think this function is the least stable of everything, because it relies on custom code to split the ranges into per-file ranges and piece the outputs together again. We should have some tests for multiple ranges in a single file, a single range for multiple files, multiple ranges for multiple files, and maybe overlapping ranges too.

> I'll look into this. I strongly suspect that it will "just work" with the upstream fsspec code, actually, rather than having to carve out the per-file requests and repeat the code in Rust. The only downside of that would be bytes copies in Python.

> Yes, it would just work with the upstream code, but the benefit here is that we get request merging in the underlying Rust code. We should absolutely use the underlying implementation here.

> As opposed to doing it in Python, you mean? I wonder if it makes a practical difference.

> True. My position is mostly: we should use whatever tools the underlying library gives us. I don't think it's too much work to have this helper function, and it would also be useful in places that don't depend on fsspec.

> OK, I can agree with that reasoning.
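A sketch of the tests requested in the thread above (hypothetical and untested; it assumes the `fs` fixture from the fsspec test file later in this diff, plus the `afile` object containing b"hello world" created by the shared fixtures):

    def test_cat_ranges(fs):
        fs.pipe_file("bfile", b"0123456789")

        # multiple ranges within a single file
        assert fs.cat_ranges(["bfile", "bfile"], [0, 4], [4, 8]) == [b"0123", b"4567"]

        # a single range across multiple files
        assert fs.cat_ranges(["afile", "bfile"], [0, 0], [5, 5]) == [b"hello", b"01234"]

        # overlapping ranges in one file
        assert fs.cat_ranges(["bfile", "bfile"], [0, 2], [6, 8]) == [b"012345", b"234567"]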
    async def _put_file(self, lpath, rpath, **kwargs):
        with open(lpath, "rb") as f:
            await obs.put_async(self.store, rpath, f)

    async def _get_file(self, rpath, lpath, **kwargs):
        with open(lpath, "wb") as f:
            resp = await obs.get_async(self.store, rpath)
            async for buffer in resp.stream():
                f.write(buffer)

    async def _info(self, path, **kwargs):
        head = await obs.head_async(self.store, path)
        return {
            # Required of `info`: (?)
            "name": head["path"],
            "size": head["size"],
            "type": "directory" if head["path"].endswith("/") else "file",
            # Implementation-specific keys
            "e_tag": head["e_tag"],
            "last_modified": head["last_modified"],
            "version": head["version"],
        }
    async def _ls(self, path, detail=True, **kwargs):
        result = await obs.list_with_delimiter_async(self.store, path)
        objects = result["objects"]
        prefs = result["common_prefixes"]
        if detail:
            return [
                {
                    "name": obj["path"],
                    "size": obj["size"],
                    "type": "file",
                    "ETag": obj["e_tag"],
                }
                for obj in objects
            ] + [{"name": pref, "size": 0, "type": "directory"} for pref in prefs]
        else:
            return sorted([obj["path"] for obj in objects] + prefs)

    def _open(self, path, mode="rb", **kwargs):
        """Return raw bytes-mode file-like from the file-system."""
        return BufferedFileSimple(self, path, mode)


class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
    def __init__(self, fs, path, mode="rb", cache_type="none", **kwargs):
        super().__init__(fs, path, mode, cache_type=cache_type, **kwargs)

    def read(self, length=-1):
        """Read up to `length` bytes from the current location."""
        if length < 0:
            data = self.fs.cat_file(self.path, self.loc, self.size)
            self.loc = self.size
        else:
            # Clamp to EOF so `loc` never advances past the data actually read.
            end = min(self.loc + length, self.size)
            data = self.fs.cat_file(self.path, self.loc, end)
            self.loc = end
        return data
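A quick end-to-end usage sketch of the class above (the bucket and keys are hypothetical; the synchronous fsspec methods used here wrap the async underscore-prefixed ones defined in this file):

    from obstore.store import S3Store
    from obstore.fsspec import AsyncFsspecStore

    store = S3Store.from_url("s3://example-bucket/")  # hypothetical bucket
    fs = AsyncFsspecStore(store)

    print(fs.ls("", detail=False))   # list the bucket root
    data = fs.cat_file("afile")      # read a whole object
    with fs.open("afile") as f:      # file-like access via BufferedFileSimple
        first = f.read(16)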
@@ -1,74 +1,17 @@
-import boto3
 import pytest
-from botocore import UNSIGNED
-from botocore.client import Config
-from moto.moto_server.threaded_moto_server import ThreadedMotoServer
 
 import obstore as obs
 from obstore.store import S3Store
 
-TEST_BUCKET_NAME = "test"
-
-
-# See docs here: https://docs.getmoto.org/en/latest/docs/server_mode.html
-@pytest.fixture(scope="module")
-def moto_server_uri():
-    """Fixture to run a mocked AWS server for testing."""
-    # Note: pass `port=0` to get a random free port.
-    server = ThreadedMotoServer(ip_address="localhost", port=0)
-    server.start()
-    host, port = server.get_host_and_port()
-    uri = f"http://{host}:{port}"
-    yield uri
-    server.stop()
-
-
-@pytest.fixture()
-def s3(moto_server_uri: str):
-    client = boto3.client(
-        "s3",
-        config=Config(signature_version=UNSIGNED),
-        region_name="us-east-1",
-        endpoint_url=moto_server_uri,
-    )
-    client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read")
-    client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world")
-    return moto_server_uri
-
-
-# @pytest.fixture(autouse=True)
-# def reset_s3_fixture(moto_server_uri):
-#     import requests
-
-#     # We reuse the MotoServer for all tests
-#     # But we do want a clean state for every test
-#     try:
-#         requests.post(f"{moto_server_uri}/moto-api/reset")
-#     except:
-#         pass
-
-
-@pytest.fixture()
-def store(s3):
-    return S3Store.from_url(
-        f"s3://{TEST_BUCKET_NAME}/",
-        config={
-            "AWS_ENDPOINT_URL": s3,
-            "AWS_REGION": "us-east-1",
-            "AWS_SKIP_SIGNATURE": "True",
-            "AWS_ALLOW_HTTP": "true",
-        },
-    )
-
-
 @pytest.mark.asyncio
-async def test_list_async(store: S3Store):
-    list_result = await obs.list(store).collect_async()
+async def test_list_async(s3_store: S3Store):
+    list_result = await obs.list(s3_store).collect_async()
     assert any("afile" in x["path"] for x in list_result)
 
 
 @pytest.mark.asyncio
-async def test_get_async(store: S3Store):
-    resp = await obs.get_async(store, "afile")
+async def test_get_async(s3_store: S3Store):
+    resp = await obs.get_async(s3_store, "afile")
     buf = await resp.bytes_async()
     assert buf == b"hello world"
@@ -0,0 +1,30 @@
import pytest

pytest.importorskip("moto")

import pyarrow.parquet as pq

import obstore as obs
from obstore.fsspec import AsyncFsspecStore


@pytest.fixture()
def fs(s3_store):
    return AsyncFsspecStore(s3_store)


def test_list(fs):
    out = fs.ls("", detail=False)
    assert out == ["afile"]
    fs.pipe_file("dir/bfile", b"data")
    out = fs.ls("", detail=False)
    assert out == ["afile", "dir"]
    out = fs.ls("", detail=True)
    assert out[0]["type"] == "file"
    assert out[1]["type"] == "directory"


def test_remote_parquet():
    store = obs.store.HTTPStore.from_url("https://github.com")
    fs = AsyncFsspecStore(store)
    url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
    pq.read_metadata(url, filesystem=fs)