From 0a6d0563f03d04ae3c6b2b426dd6aaba5f5d463e Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 21 Oct 2024 16:40:39 -0400 Subject: [PATCH 01/26] Initial fsspec integration --- .../python/object_store_rs/fsspec.py | 93 +++++++++++++++++++ pyproject.toml | 1 + uv.lock | 11 +++ 3 files changed, 105 insertions(+) create mode 100644 object-store-rs/python/object_store_rs/fsspec.py diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py new file mode 100644 index 00000000..61648ccf --- /dev/null +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -0,0 +1,93 @@ +"""Fsspec integration. + +The underlying `object_store` Rust crate cautions against relying too strongly on stateful filesystem representations of object stores: + +> The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems. +> +> This design provides the following advantages: +> +> - All operations are atomic, and readers cannot observe partial and/or failed writes +> - Methods map directly to object store APIs, providing both efficiency and predictability +> - Abstracts away filesystem and operating system specific quirks, ensuring portability +> - Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads + +Where possible, implementations should use the underlying `object-store-rs` APIs +directly. Only where this is not possible should users fall back to this fsspec +integration. +""" + +import fsspec.asyn + +import object_store_rs as obs +from object_store_rs.store import ObjectStore + + +class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): + store: ObjectStore + + def __init__( + self, + store: ObjectStore, + *args, + asynchronous=False, + loop=None, + batch_size=None, + **kwargs, + ): + self.store = store + super().__init__( + *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size, **kwargs + ) + + async def _rm_file(self, path, **kwargs): + return await obs.delete_async(self.store, path) + + async def _cp_file(self, path1, path2, **kwargs): + return await obs.copy_async(self.store, path1, path2) + + # pipe_file + + async def _cat_file(self, path, start=None, end=None, **kwargs): + if start is None and end is None: + resp = await obs.get_async(self.store, path) + return await resp.bytes_async() + + if start and end: + return await obs.get_range_async( + self.store, path, offset=start, length=end - start + ) + + raise NotImplementedError("todo: handle open-ended ranges") + + # cat_ranges + + async def _put_file(self, lpath, rpath, **kwargs): + with open(lpath, "rb") as f: + await obs.put_async(self.store, rpath, f) + + async def _get_file(self, rpath, lpath, **kwargs): + with open(lpath, "wb") as f: + resp = await obs.get_async(self.store, rpath) + async for buffer in resp.stream(): + f.write(buffer) + + async def _info(self, path, **kwargs): + head = await obs.head_async(self.store, path) + return { + # Required of `info`: (?) + "name": head["path"], + "size": head["size"], + "type": "directory" if head["path"].endswith("/") else "file", + # Implementation-specific keys + "e_tag": head["e_tag"], + "last_modified": head["last_modified"], + "version": head["version"], + } + + async def _ls(self, path, detail=True, **kwargs): + if detail: + raise NotImplementedError("Not sure how to format these dicts") + + result = await obs.list_with_delimiter_async(self.store, path) + objects = result["objects"] + return [object["path"] for object in objects] diff --git a/pyproject.toml b/pyproject.toml index 326f4087..a685100e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [] dev-dependencies = [ "black>=24.10.0", "boto3>=1.35.38", + "fsspec>=2024.10.0", "griffe-inherited-docstrings>=1.0.1", "ipykernel>=6.29.5", "maturin>=1.7.4", diff --git a/uv.lock b/uv.lock index 46dd163c..eeebc0e5 100644 --- a/uv.lock +++ b/uv.lock @@ -321,6 +321,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b5/fd/afcd0496feca3276f509df3dbd5dae726fcc756f1a08d9e25abe1733f962/executing-2.1.0-py2.py3-none-any.whl", hash = "sha256:8d63781349375b5ebccc3142f4b30350c0cd9c79f921cde38be2be4637e98eaf", size = 25805 }, ] +[[package]] +name = "fsspec" +version = "2024.10.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a0/52/f16a068ebadae42526484c31f4398e62962504e5724a8ba5dc3409483df2/fsspec-2024.10.0.tar.gz", hash = "sha256:eda2d8a4116d4f2429db8550f2457da57279247dd930bb12f821b58391359493", size = 286853 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c6/b2/454d6e7f0158951d8a78c2e1eb4f69ae81beb8dca5fee9809c6c99e9d0d0/fsspec-2024.10.0-py3-none-any.whl", hash = "sha256:03b9a6785766a4de40368b88906366755e2819e758b83705c88cd7cb5fe81871", size = 179641 }, +] + [[package]] name = "ghp-import" version = "2.1.0" @@ -1335,6 +1344,7 @@ source = { virtual = "." } dev = [ { name = "black" }, { name = "boto3" }, + { name = "fsspec" }, { name = "griffe-inherited-docstrings" }, { name = "ipykernel" }, { name = "maturin" }, @@ -1354,6 +1364,7 @@ dev = [ dev = [ { name = "black", specifier = ">=24.10.0" }, { name = "boto3", specifier = ">=1.35.38" }, + { name = "fsspec", specifier = ">=2024.10.0" }, { name = "griffe-inherited-docstrings", specifier = ">=1.0.1" }, { name = "ipykernel", specifier = ">=6.29.5" }, { name = "maturin", specifier = ">=1.7.4" }, From ed0073e9ad16eaf805dec374ff5d317b49e59b71 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 21 Oct 2024 16:52:56 -0400 Subject: [PATCH 02/26] Add _cat_ranges --- .../python/object_store_rs/fsspec.py | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py index 61648ccf..39bd25d5 100644 --- a/object-store-rs/python/object_store_rs/fsspec.py +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -16,6 +16,9 @@ integration. """ +import asyncio +from collections import defaultdict +from typing import Any, Coroutine, Dict, List, Tuple import fsspec.asyn import object_store_rs as obs @@ -59,7 +62,40 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): raise NotImplementedError("todo: handle open-ended ranges") - # cat_ranges + async def _cat_ranges( + self, + paths: List[str], + starts: List[int], + ends: List[int], + max_gap=None, + batch_size=None, + on_error="return", + **kwargs, + ): + # TODO: need to go through this again and test it + per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list) + for idx, (path, start, end) in enumerate(zip(paths, starts, ends)): + per_file_requests[path].append((start, end, idx)) + + futs: List[Coroutine[Any, Any, List[bytes]]] = [] + for path, ranges in per_file_requests.items(): + offsets = [r[0] for r in ranges] + lengths = [r[1] - r[0] for r in ranges] + fut = obs.get_ranges_async( + self.store, path, offsets=offsets, lengths=lengths + ) + futs.append(fut) + + result = await asyncio.gather(*futs) + + output_buffers: List[bytes] = [b""] * len(paths) + for per_file_request, buffers in zip(per_file_requests.items(), result): + path, ranges = per_file_request + for buffer, ranges_ in zip(buffers, ranges): + initial_index = ranges_[2] + output_buffers[initial_index] = buffer + + return output_buffers async def _put_file(self, lpath, rpath, **kwargs): with open(lpath, "rb") as f: From 18bde17693f3755c775770bcb508a9ac87769ca3 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 21 Oct 2024 19:56:24 -0400 Subject: [PATCH 03/26] Add failing test --- .../python/object_store_rs/fsspec.py | 9 +++++-- pyproject.toml | 1 + tests/test_fsspec.py | 11 ++++++++ uv.lock | 27 +++++++++++++++++++ 4 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 tests/test_fsspec.py diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py index 39bd25d5..52987f76 100644 --- a/object-store-rs/python/object_store_rs/fsspec.py +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -16,13 +16,18 @@ integration. """ +from __future__ import annotations + import asyncio from collections import defaultdict -from typing import Any, Coroutine, Dict, List, Tuple +from typing import TYPE_CHECKING, Any, Coroutine, Dict, List, Tuple + import fsspec.asyn import object_store_rs as obs -from object_store_rs.store import ObjectStore + +if TYPE_CHECKING: + from object_store_rs.store import ObjectStore class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): diff --git a/pyproject.toml b/pyproject.toml index a685100e..4b7e3eba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dev-dependencies = [ "mkdocstrings[python]>=0.26.1", "pandas>=2.2.3", "pip>=24.2", + "pyarrow>=17.0.0", "pytest-asyncio>=0.24.0", "pytest>=8.3.3", ] diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py new file mode 100644 index 00000000..a6e3ce5d --- /dev/null +++ b/tests/test_fsspec.py @@ -0,0 +1,11 @@ +import boto3 +import object_store_rs as obs +import pyarrow.parquet as pq +from object_store_rs.fsspec import AsyncFsspecStore + +# session = boto3.Session() + +store = obs.store.HTTPStore.from_url("https://github.com") +fs = AsyncFsspecStore(store) +url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet" +test = pq.read_metadata(url, filesystem=fs) diff --git a/uv.lock b/uv.lock index eeebc0e5..2316bb8f 100644 --- a/uv.lock +++ b/uv.lock @@ -1025,6 +1025,31 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8e/37/efad0257dc6e593a18957422533ff0f87ede7c9c6ea010a2177d738fb82f/pure_eval-0.2.3-py3-none-any.whl", hash = "sha256:1db8e35b67b3d218d818ae653e27f06c3aa420901fa7b081ca98cbedc874e0d0", size = 11842 }, ] +[[package]] +name = "pyarrow" +version = "17.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/27/4e/ea6d43f324169f8aec0e57569443a38bab4b398d09769ca64f7b4d467de3/pyarrow-17.0.0.tar.gz", hash = "sha256:4beca9521ed2c0921c1023e68d097d0299b62c362639ea315572a58f3f50fd28", size = 1112479 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f9/46/ce89f87c2936f5bb9d879473b9663ce7a4b1f4359acc2f0eb39865eaa1af/pyarrow-17.0.0-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:1c8856e2ef09eb87ecf937104aacfa0708f22dfeb039c363ec99735190ffb977", size = 29028748 }, + { url = "https://files.pythonhosted.org/packages/8d/8e/ce2e9b2146de422f6638333c01903140e9ada244a2a477918a368306c64c/pyarrow-17.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2e19f569567efcbbd42084e87f948778eb371d308e137a0f97afe19bb860ccb3", size = 27190965 }, + { url = "https://files.pythonhosted.org/packages/3b/c8/5675719570eb1acd809481c6d64e2136ffb340bc387f4ca62dce79516cea/pyarrow-17.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b244dc8e08a23b3e352899a006a26ae7b4d0da7bb636872fa8f5884e70acf15", size = 39269081 }, + { url = "https://files.pythonhosted.org/packages/5e/78/3931194f16ab681ebb87ad252e7b8d2c8b23dad49706cadc865dff4a1dd3/pyarrow-17.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b72e87fe3e1db343995562f7fff8aee354b55ee83d13afba65400c178ab2597", size = 39864921 }, + { url = "https://files.pythonhosted.org/packages/d8/81/69b6606093363f55a2a574c018901c40952d4e902e670656d18213c71ad7/pyarrow-17.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:dc5c31c37409dfbc5d014047817cb4ccd8c1ea25d19576acf1a001fe07f5b420", size = 38740798 }, + { url = "https://files.pythonhosted.org/packages/4c/21/9ca93b84b92ef927814cb7ba37f0774a484c849d58f0b692b16af8eebcfb/pyarrow-17.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:e3343cb1e88bc2ea605986d4b94948716edc7a8d14afd4e2c097232f729758b4", size = 39871877 }, + { url = "https://files.pythonhosted.org/packages/30/d1/63a7c248432c71c7d3ee803e706590a0b81ce1a8d2b2ae49677774b813bb/pyarrow-17.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:a27532c38f3de9eb3e90ecab63dfda948a8ca859a66e3a47f5f42d1e403c4d03", size = 25151089 }, + { url = "https://files.pythonhosted.org/packages/d4/62/ce6ac1275a432b4a27c55fe96c58147f111d8ba1ad800a112d31859fae2f/pyarrow-17.0.0-cp312-cp312-macosx_10_15_x86_64.whl", hash = "sha256:9b8a823cea605221e61f34859dcc03207e52e409ccf6354634143e23af7c8d22", size = 29019418 }, + { url = "https://files.pythonhosted.org/packages/8e/0a/dbd0c134e7a0c30bea439675cc120012337202e5fac7163ba839aa3691d2/pyarrow-17.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f1e70de6cb5790a50b01d2b686d54aaf73da01266850b05e3af2a1bc89e16053", size = 27152197 }, + { url = "https://files.pythonhosted.org/packages/cb/05/3f4a16498349db79090767620d6dc23c1ec0c658a668d61d76b87706c65d/pyarrow-17.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0071ce35788c6f9077ff9ecba4858108eebe2ea5a3f7cf2cf55ebc1dbc6ee24a", size = 39263026 }, + { url = "https://files.pythonhosted.org/packages/c2/0c/ea2107236740be8fa0e0d4a293a095c9f43546a2465bb7df34eee9126b09/pyarrow-17.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:757074882f844411fcca735e39aae74248a1531367a7c80799b4266390ae51cc", size = 39880798 }, + { url = "https://files.pythonhosted.org/packages/f6/b0/b9164a8bc495083c10c281cc65064553ec87b7537d6f742a89d5953a2a3e/pyarrow-17.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:9ba11c4f16976e89146781a83833df7f82077cdab7dc6232c897789343f7891a", size = 38715172 }, + { url = "https://files.pythonhosted.org/packages/f1/c4/9625418a1413005e486c006e56675334929fad864347c5ae7c1b2e7fe639/pyarrow-17.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:b0c6ac301093b42d34410b187bba560b17c0330f64907bfa4f7f7f2444b0cf9b", size = 39874508 }, + { url = "https://files.pythonhosted.org/packages/ae/49/baafe2a964f663413be3bd1cf5c45ed98c5e42e804e2328e18f4570027c1/pyarrow-17.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:392bc9feabc647338e6c89267635e111d71edad5fcffba204425a7c8d13610d7", size = 25099235 }, +] + [[package]] name = "pycparser" version = "2.22" @@ -1354,6 +1379,7 @@ dev = [ { name = "mkdocstrings", extra = ["python"] }, { name = "pandas" }, { name = "pip" }, + { name = "pyarrow" }, { name = "pytest" }, { name = "pytest-asyncio" }, ] @@ -1374,6 +1400,7 @@ dev = [ { name = "mkdocstrings", extras = ["python"], specifier = ">=0.26.1" }, { name = "pandas", specifier = ">=2.2.3" }, { name = "pip", specifier = ">=24.2" }, + { name = "pyarrow", specifier = ">=17.0.0" }, { name = "pytest", specifier = ">=8.3.3" }, { name = "pytest-asyncio", specifier = ">=0.24.0" }, ] From a9b0d51e1ad16b386f9eed18b86da1d96c96c3f6 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 22 Oct 2024 11:03:41 -0400 Subject: [PATCH 04/26] Add pipe_file --- object-store-rs/python/object_store_rs/fsspec.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py index 52987f76..6181d1dc 100644 --- a/object-store-rs/python/object_store_rs/fsspec.py +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -53,14 +53,15 @@ async def _rm_file(self, path, **kwargs): async def _cp_file(self, path1, path2, **kwargs): return await obs.copy_async(self.store, path1, path2) - # pipe_file + async def _pipe_file(self, path, value, **kwargs): + return await obs.put_async(self.store, path, value) async def _cat_file(self, path, start=None, end=None, **kwargs): if start is None and end is None: resp = await obs.get_async(self.store, path) return await resp.bytes_async() - if start and end: + if start is not None and end is not None: return await obs.get_range_async( self.store, path, offset=start, length=end - start ) From bf4f5e0a14ea0d2398b9337fee6b6d7dea3838e6 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 30 Oct 2024 11:00:06 -0400 Subject: [PATCH 05/26] Add test fixtures --- object-store-rs/python/object_store_rs/fsspec.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py index 39bd25d5..8092dff2 100644 --- a/object-store-rs/python/object_store_rs/fsspec.py +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -16,21 +16,23 @@ integration. """ +from __future__ import annotations + import asyncio from collections import defaultdict -from typing import Any, Coroutine, Dict, List, Tuple +from typing import TYPE_CHECKING, Any, Coroutine, Dict, List, Tuple + import fsspec.asyn import object_store_rs as obs -from object_store_rs.store import ObjectStore + class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): - store: ObjectStore def __init__( self, - store: ObjectStore, + store, *args, asynchronous=False, loop=None, @@ -48,14 +50,15 @@ async def _rm_file(self, path, **kwargs): async def _cp_file(self, path1, path2, **kwargs): return await obs.copy_async(self.store, path1, path2) - # pipe_file + async def _pipe_file(self, path, value, **kwargs): + return await obs.put_async(self.store, path, value) async def _cat_file(self, path, start=None, end=None, **kwargs): if start is None and end is None: resp = await obs.get_async(self.store, path) return await resp.bytes_async() - if start and end: + if start is not None and end is not None: return await obs.get_range_async( self.store, path, offset=start, length=end - start ) From 68a2d1f365c997bed2a0fa8a97a02f4951840c25 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 31 Oct 2024 09:52:11 -0400 Subject: [PATCH 06/26] Simple file override --- obstore/python/obstore/fsspec.py | 38 +++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 45d524dd..dc258ce7 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -20,7 +20,7 @@ import asyncio from collections import defaultdict -from typing import TYPE_CHECKING, Any, Coroutine, Dict, List, Tuple +from typing import Any, Coroutine, Dict, List, Tuple import fsspec.asyn import fsspec.spec @@ -29,7 +29,6 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): - def __init__( self, store, @@ -128,19 +127,32 @@ async def _ls(self, path, detail=True, **kwargs): objects = result["objects"] if detail: return [ - {"path": object["path"], "size": object["size"], "type": "file", - "ETag": object["e_tag"]} + { + "path": object["path"], + "size": object["size"], + "type": "file", + "ETag": object["e_tag"], + } for object in objects ] else: return [object["path"] for object in objects] - def _open(self, *args, cache_type=None, **kwargs): - """Return raw bytes-mode file-like from the file-system""" - out = fsspec.spec.AbstractBufferedFile( - self, - *args, - cache_type="none", - **kwargs, - ) - return out + def _open(self, path, mode="rb", **kwargs): + """Return raw bytes-mode file-like from the file-system""" + out = BufferedFileSimple(self, path, mode) + return out + + +class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): + def __init__(self, fs, path, mode="rb", cache_type="none", **kwargs): + super().__init__(fs, path, mode, mode, cache_type=cache_type, **kwargs) + + def read(self, length=-1): + if length < 0: + data = self.fs.cat_file(self.path, self.loc, self.size) + self.loc = self.size + else: + data = self.fs.cat_file(self.path, self.loc, self.loc + length) + self.loc += length + return data From ec9c55940d6aa3da93834c0837a0d354f8fe36ae Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 31 Oct 2024 10:12:54 -0400 Subject: [PATCH 07/26] silghtly more --- .github/workflows/test-python.yml | 2 +- obstore/python/obstore/fsspec.py | 7 +++-- tests/test_fsspec.py | 12 ++++++-- uv.lock | 50 +++++++++++++++++++++++++++++-- 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index d4797d8e..5563bca0 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -62,4 +62,4 @@ jobs: - name: Run python tests run: | - uv run pytest tests + uv run pytest diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index dc258ce7..2339d455 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -125,18 +125,19 @@ async def _info(self, path, **kwargs): async def _ls(self, path, detail=True, **kwargs): result = await obs.list_with_delimiter_async(self.store, path) objects = result["objects"] + prefs = result["common_prefixes"] if detail: return [ { - "path": object["path"], + "name": object["path"], "size": object["size"], "type": "file", "ETag": object["e_tag"], } for object in objects - ] + ] + [{"name": object, "size": 0, "type": "directory"} for object in prefs] else: - return [object["path"] for object in objects] + return sorted([object["path"] for object in objects] + prefs) def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index fd681580..982c8ec0 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -1,9 +1,9 @@ -import os - import pytest + pytest.importorskip("moto") -import obstore as obs import pyarrow.parquet as pq + +import obstore as obs from obstore.fsspec import AsyncFsspecStore @@ -15,6 +15,12 @@ def fs(s3_store): def test_list(fs): out = fs.ls("", detail=False) assert out == ["afile"] + fs.pipe_file("dir/bfile", b"data") + out = fs.ls("", detail=False) + assert out == ["afile", "dir"] + out = fs.ls("", detail=True) + assert out[0]["type"] == "file" + assert out[1]["type"] == "directory" def test_remote_parquet(): diff --git a/uv.lock b/uv.lock index 36ab8583..58e975dd 100644 --- a/uv.lock +++ b/uv.lock @@ -504,6 +504,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/56/07/1afa0514c876282bebc1c9aee83c6bb98fe6415cf57b88d9b06e7e29bf9c/Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc", size = 14463 }, ] +[[package]] +name = "fsspec" +version = "2024.10.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a0/52/f16a068ebadae42526484c31f4398e62962504e5724a8ba5dc3409483df2/fsspec-2024.10.0.tar.gz", hash = "sha256:eda2d8a4116d4f2429db8550f2457da57279247dd930bb12f821b58391359493", size = 286853 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c6/b2/454d6e7f0158951d8a78c2e1eb4f69ae81beb8dca5fee9809c6c99e9d0d0/fsspec-2024.10.0-py3-none-any.whl", hash = "sha256:03b9a6785766a4de40368b88906366755e2819e758b83705c88cd7cb5fe81871", size = 179641 }, +] + [[package]] name = "ghp-import" version = "2.1.0" @@ -1415,8 +1424,6 @@ version = "6.0.0" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/18/c7/8c6872f7372eb6a6b2e4708b88419fb46b857f7a2e1892966b851cc79fc9/psutil-6.0.0.tar.gz", hash = "sha256:8faae4f310b6d969fa26ca0545338b21f73c6b15db7c4a8d934a5482faa818f2", size = 508067 } wheels = [ - { url = "https://files.pythonhosted.org/packages/c5/66/78c9c3020f573c58101dc43a44f6855d01bbbd747e24da2f0c4491200ea3/psutil-6.0.0-cp27-none-win32.whl", hash = "sha256:02b69001f44cc73c1c5279d02b30a817e339ceb258ad75997325e0e6169d8b35", size = 249766 }, - { url = "https://files.pythonhosted.org/packages/e1/3f/2403aa9558bea4d3854b0e5e567bc3dd8e9fbc1fc4453c0aa9aafeb75467/psutil-6.0.0-cp27-none-win_amd64.whl", hash = "sha256:21f1fb635deccd510f69f485b87433460a603919b45e2a324ad65b0cc74f8fb1", size = 253024 }, { url = "https://files.pythonhosted.org/packages/0b/37/f8da2fbd29690b3557cca414c1949f92162981920699cd62095a984983bf/psutil-6.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:c588a7e9b1173b6e866756dde596fd4cad94f9399daf99ad8c3258b3cb2b47a0", size = 250961 }, { url = "https://files.pythonhosted.org/packages/35/56/72f86175e81c656a01c4401cd3b1c923f891b31fbcebe98985894176d7c9/psutil-6.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6ed2440ada7ef7d0d608f20ad89a04ec47d2d3ab7190896cd62ca5fc4fe08bf0", size = 287478 }, { url = "https://files.pythonhosted.org/packages/19/74/f59e7e0d392bc1070e9a70e2f9190d652487ac115bb16e2eff6b22ad1d24/psutil-6.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5fd9a97c8e94059b0ef54a7d4baf13b405011176c3b6ff257c247cae0d560ecd", size = 290455 }, @@ -1453,6 +1460,41 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2c/24/46262d45ee9e54a181c440fe1a3d87fd538d69f10c8311f699e555119d1f/py_partiql_parser-0.5.6-py2.py3-none-any.whl", hash = "sha256:622d7b0444becd08c1f4e9e73b31690f4b1c309ab6e5ed45bf607fe71319309f", size = 23237 }, ] +[[package]] +name = "pyarrow" +version = "18.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ec/41/6bfd027410ba2cc35da4682394fdc4285dc345b1d99f7bd55e96255d0c7d/pyarrow-18.0.0.tar.gz", hash = "sha256:a6aa027b1a9d2970cf328ccd6dbe4a996bc13c39fd427f502782f5bdb9ca20f5", size = 1118457 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d6/63/a4854246fb3d1387e176e2989d919b8186ce3806ca244fbed27217608708/pyarrow-18.0.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:d5795e37c0a33baa618c5e054cd61f586cf76850a251e2b21355e4085def6280", size = 29532160 }, + { url = "https://files.pythonhosted.org/packages/53/dc/9a6672fb35d36323f4548b08064fb264353024538f60adaedf0c6df6b31d/pyarrow-18.0.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:5f0510608ccd6e7f02ca8596962afb8c6cc84c453e7be0da4d85f5f4f7b0328a", size = 30844030 }, + { url = "https://files.pythonhosted.org/packages/8e/f9/cfcee70dcb48bc0fee6265a5d2502ea85ccdab54957fd2dd5b327dfc8807/pyarrow-18.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:616ea2826c03c16e87f517c46296621a7c51e30400f6d0a61be645f203aa2b93", size = 39177238 }, + { url = "https://files.pythonhosted.org/packages/17/de/cd37c379dc1aa379956b15d9c89ff920cf48c239f64fbed0ca97dffa3acc/pyarrow-18.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a1824f5b029ddd289919f354bc285992cb4e32da518758c136271cf66046ef22", size = 40089208 }, + { url = "https://files.pythonhosted.org/packages/dd/80/83453dcceaa49d7aa42b0b6aaa7a0797231b9aee1cc213f286e0be3bdf89/pyarrow-18.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:6dd1b52d0d58dd8f685ced9971eb49f697d753aa7912f0a8f50833c7a7426319", size = 38606715 }, + { url = "https://files.pythonhosted.org/packages/18/f4/5687ead1672920b5ed8840398551cc3a96a1389be68b68d18aca3944e525/pyarrow-18.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:320ae9bd45ad7ecc12ec858b3e8e462578de060832b98fc4d671dee9f10d9954", size = 40040879 }, + { url = "https://files.pythonhosted.org/packages/49/11/ea314ad45f45d3245f0768dba711fd3d5deb25a9e08af298d0924ab94aee/pyarrow-18.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:2c992716cffb1088414f2b478f7af0175fd0a76fea80841b1706baa8fb0ebaad", size = 25105360 }, + { url = "https://files.pythonhosted.org/packages/e4/ea/a7f77688e6c529723b37589af4db3e7179414e223878301907c5bd49d6bc/pyarrow-18.0.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:e7ab04f272f98ebffd2a0661e4e126036f6936391ba2889ed2d44c5006237802", size = 29493113 }, + { url = "https://files.pythonhosted.org/packages/79/8a/a3af902af623a1cf4f9d4d27d81e634caf1585a819b7530728a8147e391c/pyarrow-18.0.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:03f40b65a43be159d2f97fd64dc998f769d0995a50c00f07aab58b0b3da87e1f", size = 30833386 }, + { url = "https://files.pythonhosted.org/packages/46/1e/f38b22e12e2ce9ee7c9d805ce234f68b23a0568b9a6bea223e3a99ca0068/pyarrow-18.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:be08af84808dff63a76860847c48ec0416928a7b3a17c2f49a072cac7c45efbd", size = 39170798 }, + { url = "https://files.pythonhosted.org/packages/f8/fb/fd0ef3e0f03227ab183f8dc941f4ef59636d8c382e246954601dd29cf1b0/pyarrow-18.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8c70c1965cde991b711a98448ccda3486f2a336457cf4ec4dca257a926e149c9", size = 40103326 }, + { url = "https://files.pythonhosted.org/packages/7c/bd/5de139adba486db5ccc1b7ecab51e328a9dce354c82c6d26c2f642b178d3/pyarrow-18.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:00178509f379415a3fcf855af020e3340254f990a8534294ec3cf674d6e255fd", size = 38583592 }, + { url = "https://files.pythonhosted.org/packages/8d/1f/9bb3b3a644892d631dbbe99053cdb5295092d2696b4bcd3d21f29624c689/pyarrow-18.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:a71ab0589a63a3e987beb2bc172e05f000a5c5be2636b4b263c44034e215b5d7", size = 40043128 }, + { url = "https://files.pythonhosted.org/packages/74/39/323621402c2b1ce7ba600d03c81cf9645b862350d7c495f3fcef37850d1d/pyarrow-18.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:fe92efcdbfa0bcf2fa602e466d7f2905500f33f09eb90bf0bcf2e6ca41b574c8", size = 25075300 }, + { url = "https://files.pythonhosted.org/packages/13/38/4a8f8e97301adbb51c0bae7e0bc39e6878609c9337543bbbd2e9b1b3046e/pyarrow-18.0.0-cp313-cp313-macosx_12_0_arm64.whl", hash = "sha256:907ee0aa8ca576f5e0cdc20b5aeb2ad4d3953a3b4769fc4b499e00ef0266f02f", size = 29475921 }, + { url = "https://files.pythonhosted.org/packages/11/75/43aad9b0678dfcdf5cc4d632f0ead92abe5666ce5b5cc985abab75e0d410/pyarrow-18.0.0-cp313-cp313-macosx_12_0_x86_64.whl", hash = "sha256:66dcc216ebae2eb4c37b223feaf82f15b69d502821dde2da138ec5a3716e7463", size = 30811777 }, + { url = "https://files.pythonhosted.org/packages/1e/b7/477bcba6ff7e65d8045d0b6c04b36f12051385f533189617a652f551e742/pyarrow-18.0.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bc1daf7c425f58527900876354390ee41b0ae962a73ad0959b9d829def583bb1", size = 39163582 }, + { url = "https://files.pythonhosted.org/packages/c8/a7/37be6828370a98b3ed1125daf41dc651b27e2a9506a3682da305db757f32/pyarrow-18.0.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:871b292d4b696b09120ed5bde894f79ee2a5f109cb84470546471df264cae136", size = 40095799 }, + { url = "https://files.pythonhosted.org/packages/5a/a0/a4eb68c3495c5e72b404c9106c4af2d02860b0a64bc9450023ed9a412c0b/pyarrow-18.0.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:082ba62bdcb939824ba1ce10b8acef5ab621da1f4c4805e07bfd153617ac19d4", size = 38575191 }, + { url = "https://files.pythonhosted.org/packages/95/1f/6c629156ed4b8e2262da57868930cbb8cffba318b8413043acd02db9ad97/pyarrow-18.0.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:2c664ab88b9766413197733c1720d3dcd4190e8fa3bbdc3710384630a0a7207b", size = 40031824 }, + { url = "https://files.pythonhosted.org/packages/00/4f/5add0884b3ee6f4f1875e9cd0e69a30905798fa1497a80ab6df4645b54b4/pyarrow-18.0.0-cp313-cp313-win_amd64.whl", hash = "sha256:dc892be34dbd058e8d189b47db1e33a227d965ea8805a235c8a7286f7fd17d3a", size = 25068305 }, + { url = "https://files.pythonhosted.org/packages/84/f7/fa53f3062dd2e390b8b021ce2d8de064a141b4bffc2add05471b5b2ee0eb/pyarrow-18.0.0-cp313-cp313t-macosx_12_0_arm64.whl", hash = "sha256:28f9c39a56d2c78bf6b87dcc699d520ab850919d4a8c7418cd20eda49874a2ea", size = 29503390 }, + { url = "https://files.pythonhosted.org/packages/2b/d3/03bc8a5356d95098878c0fa076e69992c6abc212898cd7286cfeab0f2c60/pyarrow-18.0.0-cp313-cp313t-macosx_12_0_x86_64.whl", hash = "sha256:f1a198a50c409ab2d009fbf20956ace84567d67f2c5701511d4dd561fae6f32e", size = 30806216 }, + { url = "https://files.pythonhosted.org/packages/75/04/3b27d1352d3252abf42b0a83a2e7f6fcb7665cc98a5d3777f427eaa166bc/pyarrow-18.0.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b5bd7fd32e3ace012d43925ea4fc8bd1b02cc6cc1e9813b518302950e89b5a22", size = 39086243 }, + { url = "https://files.pythonhosted.org/packages/30/97/861dfbe3987156f817f3d7e6feb239de1e085a6b576f62454b7bc42c2713/pyarrow-18.0.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:336addb8b6f5208be1b2398442c703a710b6b937b1a046065ee4db65e782ff5a", size = 40055188 }, + { url = "https://files.pythonhosted.org/packages/25/3a/14f024a1c8fb5ff67d79b616fe218bbfa06f23f198e762c6a900a843796a/pyarrow-18.0.0-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:45476490dd4adec5472c92b4d253e245258745d0ccaabe706f8d03288ed60a79", size = 38511444 }, + { url = "https://files.pythonhosted.org/packages/92/a2/81c1dd744b322c0c548f793deb521bf23500806d754128ddf6f978736dff/pyarrow-18.0.0-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:b46591222c864e7da7faa3b19455196416cd8355ff6c2cc2e65726a760a3c420", size = 40006508 }, +] + [[package]] name = "pycparser" version = "2.22" @@ -1941,6 +1983,7 @@ dev = [ { name = "arro3-core" }, { name = "black" }, { name = "boto3" }, + { name = "fsspec" }, { name = "griffe-inherited-docstrings" }, { name = "ipykernel" }, { name = "maturin" }, @@ -1951,6 +1994,7 @@ dev = [ { name = "moto", extra = ["s3", "server"] }, { name = "pandas" }, { name = "pip" }, + { name = "pyarrow" }, { name = "pytest" }, { name = "pytest-asyncio" }, ] @@ -1962,6 +2006,7 @@ dev = [ { name = "arro3-core", specifier = ">=0.4.2" }, { name = "black", specifier = ">=24.10.0" }, { name = "boto3", specifier = ">=1.35.38" }, + { name = "fsspec", specifier = ">=2024.10.0" }, { name = "griffe-inherited-docstrings", specifier = ">=1.0.1" }, { name = "ipykernel", specifier = ">=6.29.5" }, { name = "maturin", specifier = ">=1.7.4" }, @@ -1972,6 +2017,7 @@ dev = [ { name = "moto", extras = ["s3", "server"], specifier = ">=5.0.18" }, { name = "pandas", specifier = ">=2.2.3" }, { name = "pip", specifier = ">=24.2" }, + { name = "pyarrow", specifier = ">=17.0.0" }, { name = "pytest", specifier = ">=8.3.3" }, { name = "pytest-asyncio", specifier = ">=0.24.0" }, ] From 956b1c00d272a5da63457171858ac07086a6aee0 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 31 Oct 2024 10:27:14 -0400 Subject: [PATCH 08/26] lint --- .github/workflows/test-python.yml | 2 +- pyproject.toml | 4 ++++ tests/store/test_s3.py | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index 5563bca0..d4797d8e 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -62,4 +62,4 @@ jobs: - name: Run python tests run: | - uv run pytest + uv run pytest tests diff --git a/pyproject.toml b/pyproject.toml index 526c2ad8..88237cd2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,3 +43,7 @@ select = [ "F401", # Allow unused imports in __init__.py files "F403", # unable to detect undefined names ] + +[tool.pytest.ini_options] +addopts = "-v" +testpaths = ["tests"] diff --git a/tests/store/test_s3.py b/tests/store/test_s3.py index 934ba5a4..aa2eb6f7 100644 --- a/tests/store/test_s3.py +++ b/tests/store/test_s3.py @@ -1,7 +1,7 @@ import pytest -from obstore.store import S3Store import obstore as obs +from obstore.store import S3Store @pytest.mark.asyncio From a5a8e826e81912f4f3d1f844304429aab233473f Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 31 Oct 2024 11:32:02 -0400 Subject: [PATCH 09/26] Update obstore/python/obstore/fsspec.py Co-authored-by: Kyle Barron --- obstore/python/obstore/fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 2339d455..948a49ca 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -11,7 +11,7 @@ > - Abstracts away filesystem and operating system specific quirks, ensuring portability > - Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads -Where possible, implementations should use the underlying `object-store-rs` APIs +Where possible, implementations should use the underlying `obstore` APIs directly. Only where this is not possible should users fall back to this fsspec integration. """ From 0a0a2fc7d7119c290b33e1b1985f625dfbb2b79e Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 31 Oct 2024 15:48:59 -0400 Subject: [PATCH 10/26] add conftest --- tests/conftest.py | 51 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_fsspec.py | 4 +--- 2 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 tests/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..7dfbcc91 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,51 @@ +import boto3 +import pytest +from botocore import UNSIGNED +from botocore.client import Config +from moto.moto_server.threaded_moto_server import ThreadedMotoServer + +from obstore.store import S3Store + +TEST_BUCKET_NAME = "test" + + +# See docs here: https://docs.getmoto.org/en/latest/docs/server_mode.html +@pytest.fixture() +def moto_server_uri(): + """Fixture to run a mocked AWS server for testing.""" + # Note: pass `port=0` to get a random free port. + server = ThreadedMotoServer(ip_address="localhost", port=0) + server.start() + if hasattr(server, "get_host_and_port"): + host, port = server.get_host_and_port() + else: + host, port = server._server.server_address + uri = f"http://{host}:{port}" + yield uri + server.stop() + + +@pytest.fixture() +def s3(moto_server_uri: str): + client = boto3.client( + "s3", + config=Config(signature_version=UNSIGNED), + region_name="us-east-1", + endpoint_url=moto_server_uri, + ) + client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read") + client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world") + return moto_server_uri + + +@pytest.fixture() +def s3_store(s3): + return S3Store.from_url( + f"s3://{TEST_BUCKET_NAME}/", + config={ + "AWS_ENDPOINT_URL": s3, + "AWS_REGION": "us-east-1", + "AWS_SKIP_SIGNATURE": "True", + "AWS_ALLOW_HTTP": "true", + }, + ) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index 982c8ec0..e84b8d86 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -1,7 +1,5 @@ -import pytest - -pytest.importorskip("moto") import pyarrow.parquet as pq +import pytest import obstore as obs from obstore.fsspec import AsyncFsspecStore From 032c9767999abe8cb7a8a4a1521e86b1d33420bf Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 1 Nov 2024 16:22:48 -0400 Subject: [PATCH 11/26] tests and docstrings --- obstore/python/obstore/fsspec.py | 36 ++++++++++++--- tests/conftest.py | 4 +- tests/test_fsspec.py | 75 ++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 7 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 948a49ca..5459d51b 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -29,15 +29,30 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): + """An fsspec implementation based on a obstore Store""" + def __init__( self, - store, + store: obs.store.ObjectStore, *args, - asynchronous=False, + asynchronous: bool = False, loop=None, - batch_size=None, + batch_size: int | None = None, **kwargs, ): + """ + store: a configured instance of one of the store classes in objstore.store + asynchronous: id this instance meant to be be called using the async API? This + should only be set to true when running within a coroutine + loop: since both fsspec/python and tokio/rust may be using loops, this should + be kept None for now + batch_size: some operations on many files will batch their requests; if you + are seeing timeouts, you may want to set this number smaller than the defaults, + which are determined in fsspec.asyn._get_batch_size + kwargs: not currently supported; extra configuration for the backend should be + done to the Store passed in the first argument. + """ + self.store = store super().__init__( *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size, **kwargs @@ -74,7 +89,9 @@ async def _cat_ranges( on_error="return", **kwargs, ): - # TODO: need to go through this again and test it + if not len(paths) == len(starts) == len(ends): + raise ValueError + per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list) for idx, (path, start, end) in enumerate(zip(paths, starts, ends)): per_file_requests[path].append((start, end, idx)) @@ -95,7 +112,7 @@ async def _cat_ranges( path, ranges = per_file_request for buffer, ranges_ in zip(buffers, ranges): initial_index = ranges_[2] - output_buffers[initial_index] = buffer + output_buffers[initial_index] = buffer.as_bytes() return output_buffers @@ -147,9 +164,16 @@ def _open(self, path, mode="rb", **kwargs): class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): def __init__(self, fs, path, mode="rb", cache_type="none", **kwargs): + if mode != "rb": + raise ValueError("Only 'rb' mode is currently supported") super().__init__(fs, path, mode, mode, cache_type=cache_type, **kwargs) - def read(self, length=-1): + def read(self, length: int = -1): + """Return bytes from the remote file + + length: if positive, returns up to this many bytes; if negative, return all + remaining byets. + """ if length < 0: data = self.fs.cat_file(self.path, self.loc, self.size) self.loc = self.size diff --git a/tests/conftest.py b/tests/conftest.py index 7dfbcc91..9739e932 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import boto3 import pytest +import urllib3 from botocore import UNSIGNED from botocore.client import Config from moto.moto_server.threaded_moto_server import ThreadedMotoServer @@ -35,7 +36,8 @@ def s3(moto_server_uri: str): ) client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read") client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world") - return moto_server_uri + yield moto_server_uri + urllib3.request(method="post", url=f"{moto_server_uri}/moto-api/reset") @pytest.fixture() diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index e84b8d86..19b35685 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -1,3 +1,5 @@ +import os + import pyarrow.parquet as pq import pytest @@ -21,8 +23,81 @@ def test_list(fs): assert out[1]["type"] == "directory" +@pytest.mark.asyncio +async def test_list_async(s3_store): + fs = AsyncFsspecStore(s3_store, asynchronous=True) + out = await fs._ls("", detail=False) + assert out == ["afile"] + await fs._pipe_file("dir/bfile", b"data") + out = await fs._ls("", detail=False) + assert out == ["afile", "dir"] + out = await fs._ls("", detail=True) + assert out[0]["type"] == "file" + assert out[1]["type"] == "directory" + + +@pytest.mark.network def test_remote_parquet(): store = obs.store.HTTPStore.from_url("https://github.com") fs = AsyncFsspecStore(store) url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet" pq.read_metadata(url, filesystem=fs) + + +def test_multi_file_ops(fs): + data = {"dir/test1": b"test data1", "dir/test2": b"test data2"} + fs.pipe(data) + out = fs.cat(list(data)) + assert out == data + out = fs.cat("dir", recursive=True) + assert out == data + fs.cp("dir", "dir2", recursive=True) + out = fs.find("", detail=False) + assert out == ["afile", "dir/test1", "dir/test2", "dir2/test1", "dir2/test2"] + fs.rm(["dir", "dir2"], recursive=True) + out = fs.find("", detail=False) + assert out == ["afile"] + + +def test_cat_ranges_one(fs): + data1 = os.urandom(10000) + fs.pipe_file("data1", data1) + + # single range + out = fs.cat_ranges(["data1"], [10], [20]) + assert out == [data1[10:20]] + + # range oob + out = fs.cat_ranges(["data1"], [0], [11000]) + assert out == [data1] + + # two disjoint ranges, one file + out = fs.cat_ranges(["data1", "data1"], [10, 40], [20, 60]) + assert out == [data1[10:20], data1[40:60]] + + # two adjoining ranges, one file + out = fs.cat_ranges(["data1", "data1"], [10, 30], [20, 60]) + assert out == [data1[10:20], data1[30:60]] + + # two overlapping ranges, one file + out = fs.cat_ranges(["data1", "data1"], [10, 15], [20, 60]) + assert out == [data1[10:20], data1[15:60]] + + # completely overlapping ranges, one file + out = fs.cat_ranges(["data1", "data1"], [10, 0], [20, 60]) + assert out == [data1[10:20], data1[0:60]] + + +def test_cat_ranges_two(fs): + data1 = os.urandom(10000) + data2 = os.urandom(10000) + fs.pipe({"data1": data1, "data2": data2}) + + # single range in each file + out = fs.cat_ranges(["data1", "data2"], [10, 10], [20, 20]) + assert out == [data1[10:20], data2[10:20]] + + +def test_cat_ranges_error(fs): + with pytest.raises(ValueError): + fs.cat_ranges(["path"], [], []) From d26a651ccd5bddc084b8bdf49c342fe0ec0337f9 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 1 Nov 2024 17:08:01 -0400 Subject: [PATCH 12/26] make fs not cachable --- obstore/python/obstore/fsspec.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 5459d51b..03a73c87 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -31,6 +31,8 @@ class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): """An fsspec implementation based on a obstore Store""" + cachable = False + def __init__( self, store: obs.store.ObjectStore, From 3072e4ead32ed8f34b693ad08a3e1c41741c5401 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 1 Nov 2024 17:25:57 -0400 Subject: [PATCH 13/26] start/end --- obstore/python/obstore/fsspec.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 03a73c87..8c5d41b9 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -101,10 +101,8 @@ async def _cat_ranges( futs: List[Coroutine[Any, Any, List[bytes]]] = [] for path, ranges in per_file_requests.items(): offsets = [r[0] for r in ranges] - lengths = [r[1] - r[0] for r in ranges] - fut = obs.get_ranges_async( - self.store, path, offsets=offsets, lengths=lengths - ) + ends = [r[1] for r in ranges] + fut = obs.get_ranges_async(self.store, path, starts=offsets, ends=ends) futs.append(fut) result = await asyncio.gather(*futs) From 71d9ef75fc0f33c0141725d36fc40552b0ae992a Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 1 Nov 2024 17:28:45 -0400 Subject: [PATCH 14/26] in cat also --- obstore/python/obstore/fsspec.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 8c5d41b9..386fc0e3 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -75,9 +75,7 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): return await resp.bytes_async() if start is not None and end is not None: - return await obs.get_range_async( - self.store, path, offset=start, length=end - start - ) + return await obs.get_range_async(self.store, path, start=start, end=end) raise NotImplementedError("todo: handle open-ended ranges") From 31fbb589749c7b2643f494ee45ecd3db0f52200a Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 5 Nov 2024 11:57:39 -0500 Subject: [PATCH 15/26] Try mixed ranges --- obstore/python/obstore/fsspec.py | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 386fc0e3..73765759 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -147,7 +147,7 @@ async def _ls(self, path, detail=True, **kwargs): "name": object["path"], "size": object["size"], "type": "file", - "ETag": object["e_tag"], + "e_tag": object["e_tag"], } for object in objects ] + [{"name": object, "size": 0, "type": "directory"} for object in prefs] @@ -156,26 +156,4 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" - out = BufferedFileSimple(self, path, mode) - return out - - -class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): - def __init__(self, fs, path, mode="rb", cache_type="none", **kwargs): - if mode != "rb": - raise ValueError("Only 'rb' mode is currently supported") - super().__init__(fs, path, mode, mode, cache_type=cache_type, **kwargs) - - def read(self, length: int = -1): - """Return bytes from the remote file - - length: if positive, returns up to this many bytes; if negative, return all - remaining byets. - """ - if length < 0: - data = self.fs.cat_file(self.path, self.loc, self.size) - self.loc = self.size - else: - data = self.fs.cat_file(self.path, self.loc, self.loc + length) - self.loc += length - return data + return fsspec.spec.AbstractBufferedFile(self, path, mode, **kwargs) From 79449fd2b93d35c7c39a0181367cf2998079c728 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 5 Nov 2024 11:58:53 -0500 Subject: [PATCH 16/26] Allow None ranges --- obstore/python/obstore/fsspec.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 73765759..55da368b 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -74,10 +74,7 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): resp = await obs.get_async(self.store, path) return await resp.bytes_async() - if start is not None and end is not None: - return await obs.get_range_async(self.store, path, start=start, end=end) - - raise NotImplementedError("todo: handle open-ended ranges") + return await obs.get_range_async(self.store, path, start=start, end=end) async def _cat_ranges( self, From abe7a4459816762df6a3afbff13b23134b2ec3e7 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 6 Nov 2024 09:54:32 -0500 Subject: [PATCH 17/26] overwrite test --- tests/test_fsspec.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index 19b35685..d00f76c9 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -97,6 +97,27 @@ def test_cat_ranges_two(fs): out = fs.cat_ranges(["data1", "data2"], [10, 10], [20, 20]) assert out == [data1[10:20], data2[10:20]] + # all of one file + out = fs.cat_ranges(["data1", "data2"], [10, None], [20, None]) + assert out == [data1[10:20], data2] + + +def test_cat_ranges_mixed(fs): + data1 = os.urandom(10000) + data2 = os.urandom(10000) + fs.pipe({"data1": data1, "data2": data2}) + + # single range in each file + out = fs.cat_ranges(["data1"], [-10, None, 10], [None, -10, -10]) + assert out == [data1[-10:], data1[:-10], data1[10:-10]] + + +def test_atomic_write(fs): + fs.pipe_file("data1", b"data1") + fs.pipe_file("data1", b"data1", mode="overwrite") + with pytest.raises(ValueError): + fs.pipe_file("data1", b"data1", mode="create") + def test_cat_ranges_error(fs): with pytest.raises(ValueError): From 4a05dcb2ff6a9af0694f93b3d9b28cb61427b09c Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 8 Nov 2024 14:02:57 -0500 Subject: [PATCH 18/26] fix --- tests/test_fsspec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index d00f76c9..f780b6a0 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -108,8 +108,8 @@ def test_cat_ranges_mixed(fs): fs.pipe({"data1": data1, "data2": data2}) # single range in each file - out = fs.cat_ranges(["data1"], [-10, None, 10], [None, -10, -10]) - assert out == [data1[-10:], data1[:-10], data1[10:-10]] + out = fs.cat_ranges(["data1", "data1", "data2"], [-10, None, 10], [None, -10, -10]) + assert out == [data1[-10:], data1[:-10], data2[10:-10]] def test_atomic_write(fs): From 2f70443f6e3ee9e02c2fc217e5fdd504a94c156b Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 8 Nov 2024 14:05:19 -0500 Subject: [PATCH 19/26] revive subclass --- obstore/python/obstore/fsspec.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 55da368b..64cd7f87 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -153,4 +153,25 @@ async def _ls(self, path, detail=True, **kwargs): def _open(self, path, mode="rb", **kwargs): """Return raw bytes-mode file-like from the file-system""" - return fsspec.spec.AbstractBufferedFile(self, path, mode, **kwargs) + return BufferedFileSimple(self, path, mode, **kwargs) + + +class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): + def __init__(self, fs, path, mode="rb", cache_type="none", **kwargs): + if mode != "rb": + raise ValueError("Only 'rb' mode is currently supported") + super().__init__(fs, path, mode, mode, cache_type=cache_type, **kwargs) + + def read(self, length: int = -1): + """Return bytes from the remote file + + length: if positive, returns up to this many bytes; if negative, return all + remaining byets. + """ + if length < 0: + data = self.fs.cat_file(self.path, self.loc, self.size) + self.loc = self.size + else: + data = self.fs.cat_file(self.path, self.loc, self.loc + length) + self.loc += length + return data From fca4619c71f43aafa4b6f188eba7c0297592f981 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 8 Nov 2024 14:16:43 -0500 Subject: [PATCH 20/26] xfails --- obstore/python/obstore/fsspec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 64cd7f87..0746a13a 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -157,10 +157,10 @@ def _open(self, path, mode="rb", **kwargs): class BufferedFileSimple(fsspec.spec.AbstractBufferedFile): - def __init__(self, fs, path, mode="rb", cache_type="none", **kwargs): + def __init__(self, fs, path, mode="rb", **kwargs): if mode != "rb": raise ValueError("Only 'rb' mode is currently supported") - super().__init__(fs, path, mode, mode, cache_type=cache_type, **kwargs) + super().__init__(fs, path, mode, **kwargs) def read(self, length: int = -1): """Return bytes from the remote file From 3fc017e94c9dc67a33d23fc0012d1837c28f6480 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 8 Nov 2024 14:28:14 -0500 Subject: [PATCH 21/26] xfails didn't stick --- tests/test_fsspec.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index f780b6a0..97a582c9 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -97,11 +97,8 @@ def test_cat_ranges_two(fs): out = fs.cat_ranges(["data1", "data2"], [10, 10], [20, 20]) assert out == [data1[10:20], data2[10:20]] - # all of one file - out = fs.cat_ranges(["data1", "data2"], [10, None], [20, None]) - assert out == [data1[10:20], data2] - +@pytest.mark.xfail("negative and mixed ranges not implemented") def test_cat_ranges_mixed(fs): data1 = os.urandom(10000) data2 = os.urandom(10000) @@ -112,6 +109,7 @@ def test_cat_ranges_mixed(fs): assert out == [data1[-10:], data1[:-10], data2[10:-10]] +@pytest.mark.xfail("atomic writes not working on moto") def test_atomic_write(fs): fs.pipe_file("data1", b"data1") fs.pipe_file("data1", b"data1", mode="overwrite") From f02fa2b7a90951681c08db2b83f3fafe9aa59713 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 8 Nov 2024 14:46:43 -0500 Subject: [PATCH 22/26] give reason --- tests/test_fsspec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py index 97a582c9..ce9a1bf6 100644 --- a/tests/test_fsspec.py +++ b/tests/test_fsspec.py @@ -98,7 +98,7 @@ def test_cat_ranges_two(fs): assert out == [data1[10:20], data2[10:20]] -@pytest.mark.xfail("negative and mixed ranges not implemented") +@pytest.mark.xfail(reason="negative and mixed ranges not implemented") def test_cat_ranges_mixed(fs): data1 = os.urandom(10000) data2 = os.urandom(10000) @@ -109,7 +109,7 @@ def test_cat_ranges_mixed(fs): assert out == [data1[-10:], data1[:-10], data2[10:-10]] -@pytest.mark.xfail("atomic writes not working on moto") +@pytest.mark.xfail(reason="atomic writes not working on moto") def test_atomic_write(fs): fs.pipe_file("data1", b"data1") fs.pipe_file("data1", b"data1", mode="overwrite") From c1ee71d68c5b4f7abca9402d2c948db823b195da Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 11 Nov 2024 15:49:06 -0500 Subject: [PATCH 23/26] Update obstore/python/obstore/fsspec.py --- obstore/python/obstore/fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 0746a13a..a495d94c 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -1,6 +1,6 @@ """Fsspec integration. -The underlying `object_store` Rust crate cautions against relying too strongly on stateful filesystem representations of object stores: +The underlying `object_store` Rust crate [cautions](https://docs.rs/object_store/latest/object_store/#why-not-a-filesystem-interface) against relying too strongly on stateful filesystem representations of object stores: > The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems. > From cdfa2ab131f6e7d99c6dab954019e999736849ec Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 11 Nov 2024 16:01:25 -0500 Subject: [PATCH 24/26] Update obstore/python/obstore/fsspec.py --- obstore/python/obstore/fsspec.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index a495d94c..37a0a11f 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -42,7 +42,8 @@ def __init__( batch_size: int | None = None, **kwargs, ): - """ + """Construct a new AsyncFsspecStore + store: a configured instance of one of the store classes in objstore.store asynchronous: id this instance meant to be be called using the async API? This should only be set to true when running within a coroutine From 5b9b0f70f34d34f39c52f0521ee28de9da57b02d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 13 Nov 2024 10:44:13 -0500 Subject: [PATCH 25/26] update for signature --- obstore/python/obstore/fsspec.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 0746a13a..ace80035 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -40,7 +40,6 @@ def __init__( asynchronous: bool = False, loop=None, batch_size: int | None = None, - **kwargs, ): """ store: a configured instance of one of the store classes in objstore.store @@ -51,13 +50,11 @@ def __init__( batch_size: some operations on many files will batch their requests; if you are seeing timeouts, you may want to set this number smaller than the defaults, which are determined in fsspec.asyn._get_batch_size - kwargs: not currently supported; extra configuration for the backend should be - done to the Store passed in the first argument. """ self.store = store super().__init__( - *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size, **kwargs + *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size ) async def _rm_file(self, path, **kwargs): @@ -79,13 +76,17 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): async def _cat_ranges( self, paths: List[str], - starts: List[int], - ends: List[int], + starts: List[int] | int, + ends: List[int] | int, max_gap=None, batch_size=None, on_error="return", **kwargs, ): + if isinstance(starts, int): + starts = [starts] * len(paths) + if isinstance(ends, int): + ends = [ends] * len(paths) if not len(paths) == len(starts) == len(ends): raise ValueError From ecc939935b50af0709b13ab70eb8399ca7e67cd7 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 13 Nov 2024 11:00:26 -0500 Subject: [PATCH 26/26] Update obstore/python/obstore/fsspec.py --- obstore/python/obstore/fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py index 9347fa5f..a6f6c15d 100644 --- a/obstore/python/obstore/fsspec.py +++ b/obstore/python/obstore/fsspec.py @@ -47,7 +47,7 @@ def __init__( asynchronous: id this instance meant to be be called using the async API? This should only be set to true when running within a coroutine loop: since both fsspec/python and tokio/rust may be using loops, this should - be kept None for now + be kept `None` for now, and will not be used. batch_size: some operations on many files will batch their requests; if you are seeing timeouts, you may want to set this number smaller than the defaults, which are determined in fsspec.asyn._get_batch_size