From 0a6d0563f03d04ae3c6b2b426dd6aaba5f5d463e Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 21 Oct 2024 16:40:39 -0400 Subject: [PATCH 1/4] Initial fsspec integration --- .../python/object_store_rs/fsspec.py | 93 +++++++++++++++++++ pyproject.toml | 1 + uv.lock | 11 +++ 3 files changed, 105 insertions(+) create mode 100644 object-store-rs/python/object_store_rs/fsspec.py diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py new file mode 100644 index 00000000..61648ccf --- /dev/null +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -0,0 +1,93 @@ +"""Fsspec integration. + +The underlying `object_store` Rust crate cautions against relying too strongly on stateful filesystem representations of object stores: + +> The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems. +> +> This design provides the following advantages: +> +> - All operations are atomic, and readers cannot observe partial and/or failed writes +> - Methods map directly to object store APIs, providing both efficiency and predictability +> - Abstracts away filesystem and operating system specific quirks, ensuring portability +> - Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads + +Where possible, implementations should use the underlying `object-store-rs` APIs +directly. Only where this is not possible should users fall back to this fsspec +integration. +""" + +import fsspec.asyn + +import object_store_rs as obs +from object_store_rs.store import ObjectStore + + +class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): + store: ObjectStore + + def __init__( + self, + store: ObjectStore, + *args, + asynchronous=False, + loop=None, + batch_size=None, + **kwargs, + ): + self.store = store + super().__init__( + *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size, **kwargs + ) + + async def _rm_file(self, path, **kwargs): + return await obs.delete_async(self.store, path) + + async def _cp_file(self, path1, path2, **kwargs): + return await obs.copy_async(self.store, path1, path2) + + # pipe_file + + async def _cat_file(self, path, start=None, end=None, **kwargs): + if start is None and end is None: + resp = await obs.get_async(self.store, path) + return await resp.bytes_async() + + if start and end: + return await obs.get_range_async( + self.store, path, offset=start, length=end - start + ) + + raise NotImplementedError("todo: handle open-ended ranges") + + # cat_ranges + + async def _put_file(self, lpath, rpath, **kwargs): + with open(lpath, "rb") as f: + await obs.put_async(self.store, rpath, f) + + async def _get_file(self, rpath, lpath, **kwargs): + with open(lpath, "wb") as f: + resp = await obs.get_async(self.store, rpath) + async for buffer in resp.stream(): + f.write(buffer) + + async def _info(self, path, **kwargs): + head = await obs.head_async(self.store, path) + return { + # Required of `info`: (?) + "name": head["path"], + "size": head["size"], + "type": "directory" if head["path"].endswith("/") else "file", + # Implementation-specific keys + "e_tag": head["e_tag"], + "last_modified": head["last_modified"], + "version": head["version"], + } + + async def _ls(self, path, detail=True, **kwargs): + if detail: + raise NotImplementedError("Not sure how to format these dicts") + + result = await obs.list_with_delimiter_async(self.store, path) + objects = result["objects"] + return [object["path"] for object in objects] diff --git a/pyproject.toml b/pyproject.toml index 326f4087..a685100e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [] dev-dependencies = [ "black>=24.10.0", "boto3>=1.35.38", + "fsspec>=2024.10.0", "griffe-inherited-docstrings>=1.0.1", "ipykernel>=6.29.5", "maturin>=1.7.4", diff --git a/uv.lock b/uv.lock index 46dd163c..eeebc0e5 100644 --- a/uv.lock +++ b/uv.lock @@ -321,6 +321,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b5/fd/afcd0496feca3276f509df3dbd5dae726fcc756f1a08d9e25abe1733f962/executing-2.1.0-py2.py3-none-any.whl", hash = "sha256:8d63781349375b5ebccc3142f4b30350c0cd9c79f921cde38be2be4637e98eaf", size = 25805 }, ] +[[package]] +name = "fsspec" +version = "2024.10.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a0/52/f16a068ebadae42526484c31f4398e62962504e5724a8ba5dc3409483df2/fsspec-2024.10.0.tar.gz", hash = "sha256:eda2d8a4116d4f2429db8550f2457da57279247dd930bb12f821b58391359493", size = 286853 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c6/b2/454d6e7f0158951d8a78c2e1eb4f69ae81beb8dca5fee9809c6c99e9d0d0/fsspec-2024.10.0-py3-none-any.whl", hash = "sha256:03b9a6785766a4de40368b88906366755e2819e758b83705c88cd7cb5fe81871", size = 179641 }, +] + [[package]] name = "ghp-import" version = "2.1.0" @@ -1335,6 +1344,7 @@ source = { virtual = "." } dev = [ { name = "black" }, { name = "boto3" }, + { name = "fsspec" }, { name = "griffe-inherited-docstrings" }, { name = "ipykernel" }, { name = "maturin" }, @@ -1354,6 +1364,7 @@ dev = [ dev = [ { name = "black", specifier = ">=24.10.0" }, { name = "boto3", specifier = ">=1.35.38" }, + { name = "fsspec", specifier = ">=2024.10.0" }, { name = "griffe-inherited-docstrings", specifier = ">=1.0.1" }, { name = "ipykernel", specifier = ">=6.29.5" }, { name = "maturin", specifier = ">=1.7.4" }, From ed0073e9ad16eaf805dec374ff5d317b49e59b71 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 21 Oct 2024 16:52:56 -0400 Subject: [PATCH 2/4] Add _cat_ranges --- .../python/object_store_rs/fsspec.py | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py index 61648ccf..39bd25d5 100644 --- a/object-store-rs/python/object_store_rs/fsspec.py +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -16,6 +16,9 @@ integration. """ +import asyncio +from collections import defaultdict +from typing import Any, Coroutine, Dict, List, Tuple import fsspec.asyn import object_store_rs as obs @@ -59,7 +62,40 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): raise NotImplementedError("todo: handle open-ended ranges") - # cat_ranges + async def _cat_ranges( + self, + paths: List[str], + starts: List[int], + ends: List[int], + max_gap=None, + batch_size=None, + on_error="return", + **kwargs, + ): + # TODO: need to go through this again and test it + per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list) + for idx, (path, start, end) in enumerate(zip(paths, starts, ends)): + per_file_requests[path].append((start, end, idx)) + + futs: List[Coroutine[Any, Any, List[bytes]]] = [] + for path, ranges in per_file_requests.items(): + offsets = [r[0] for r in ranges] + lengths = [r[1] - r[0] for r in ranges] + fut = obs.get_ranges_async( + self.store, path, offsets=offsets, lengths=lengths + ) + futs.append(fut) + + result = await asyncio.gather(*futs) + + output_buffers: List[bytes] = [b""] * len(paths) + for per_file_request, buffers in zip(per_file_requests.items(), result): + path, ranges = per_file_request + for buffer, ranges_ in zip(buffers, ranges): + initial_index = ranges_[2] + output_buffers[initial_index] = buffer + + return output_buffers async def _put_file(self, lpath, rpath, **kwargs): with open(lpath, "rb") as f: From 18bde17693f3755c775770bcb508a9ac87769ca3 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 21 Oct 2024 19:56:24 -0400 Subject: [PATCH 3/4] Add failing test --- .../python/object_store_rs/fsspec.py | 9 +++++-- pyproject.toml | 1 + tests/test_fsspec.py | 11 ++++++++ uv.lock | 27 +++++++++++++++++++ 4 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 tests/test_fsspec.py diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py index 39bd25d5..52987f76 100644 --- a/object-store-rs/python/object_store_rs/fsspec.py +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -16,13 +16,18 @@ integration. """ +from __future__ import annotations + import asyncio from collections import defaultdict -from typing import Any, Coroutine, Dict, List, Tuple +from typing import TYPE_CHECKING, Any, Coroutine, Dict, List, Tuple + import fsspec.asyn import object_store_rs as obs -from object_store_rs.store import ObjectStore + +if TYPE_CHECKING: + from object_store_rs.store import ObjectStore class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): diff --git a/pyproject.toml b/pyproject.toml index a685100e..4b7e3eba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dev-dependencies = [ "mkdocstrings[python]>=0.26.1", "pandas>=2.2.3", "pip>=24.2", + "pyarrow>=17.0.0", "pytest-asyncio>=0.24.0", "pytest>=8.3.3", ] diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py new file mode 100644 index 00000000..a6e3ce5d --- /dev/null +++ b/tests/test_fsspec.py @@ -0,0 +1,11 @@ +import boto3 +import object_store_rs as obs +import pyarrow.parquet as pq +from object_store_rs.fsspec import AsyncFsspecStore + +# session = boto3.Session() + +store = obs.store.HTTPStore.from_url("https://github.com") +fs = AsyncFsspecStore(store) +url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet" +test = pq.read_metadata(url, filesystem=fs) diff --git a/uv.lock b/uv.lock index eeebc0e5..2316bb8f 100644 --- a/uv.lock +++ b/uv.lock @@ -1025,6 +1025,31 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8e/37/efad0257dc6e593a18957422533ff0f87ede7c9c6ea010a2177d738fb82f/pure_eval-0.2.3-py3-none-any.whl", hash = "sha256:1db8e35b67b3d218d818ae653e27f06c3aa420901fa7b081ca98cbedc874e0d0", size = 11842 }, ] +[[package]] +name = "pyarrow" +version = "17.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/27/4e/ea6d43f324169f8aec0e57569443a38bab4b398d09769ca64f7b4d467de3/pyarrow-17.0.0.tar.gz", hash = "sha256:4beca9521ed2c0921c1023e68d097d0299b62c362639ea315572a58f3f50fd28", size = 1112479 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f9/46/ce89f87c2936f5bb9d879473b9663ce7a4b1f4359acc2f0eb39865eaa1af/pyarrow-17.0.0-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:1c8856e2ef09eb87ecf937104aacfa0708f22dfeb039c363ec99735190ffb977", size = 29028748 }, + { url = "https://files.pythonhosted.org/packages/8d/8e/ce2e9b2146de422f6638333c01903140e9ada244a2a477918a368306c64c/pyarrow-17.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2e19f569567efcbbd42084e87f948778eb371d308e137a0f97afe19bb860ccb3", size = 27190965 }, + { url = "https://files.pythonhosted.org/packages/3b/c8/5675719570eb1acd809481c6d64e2136ffb340bc387f4ca62dce79516cea/pyarrow-17.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b244dc8e08a23b3e352899a006a26ae7b4d0da7bb636872fa8f5884e70acf15", size = 39269081 }, + { url = "https://files.pythonhosted.org/packages/5e/78/3931194f16ab681ebb87ad252e7b8d2c8b23dad49706cadc865dff4a1dd3/pyarrow-17.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b72e87fe3e1db343995562f7fff8aee354b55ee83d13afba65400c178ab2597", size = 39864921 }, + { url = "https://files.pythonhosted.org/packages/d8/81/69b6606093363f55a2a574c018901c40952d4e902e670656d18213c71ad7/pyarrow-17.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:dc5c31c37409dfbc5d014047817cb4ccd8c1ea25d19576acf1a001fe07f5b420", size = 38740798 }, + { url = "https://files.pythonhosted.org/packages/4c/21/9ca93b84b92ef927814cb7ba37f0774a484c849d58f0b692b16af8eebcfb/pyarrow-17.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:e3343cb1e88bc2ea605986d4b94948716edc7a8d14afd4e2c097232f729758b4", size = 39871877 }, + { url = "https://files.pythonhosted.org/packages/30/d1/63a7c248432c71c7d3ee803e706590a0b81ce1a8d2b2ae49677774b813bb/pyarrow-17.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:a27532c38f3de9eb3e90ecab63dfda948a8ca859a66e3a47f5f42d1e403c4d03", size = 25151089 }, + { url = "https://files.pythonhosted.org/packages/d4/62/ce6ac1275a432b4a27c55fe96c58147f111d8ba1ad800a112d31859fae2f/pyarrow-17.0.0-cp312-cp312-macosx_10_15_x86_64.whl", hash = "sha256:9b8a823cea605221e61f34859dcc03207e52e409ccf6354634143e23af7c8d22", size = 29019418 }, + { url = "https://files.pythonhosted.org/packages/8e/0a/dbd0c134e7a0c30bea439675cc120012337202e5fac7163ba839aa3691d2/pyarrow-17.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f1e70de6cb5790a50b01d2b686d54aaf73da01266850b05e3af2a1bc89e16053", size = 27152197 }, + { url = "https://files.pythonhosted.org/packages/cb/05/3f4a16498349db79090767620d6dc23c1ec0c658a668d61d76b87706c65d/pyarrow-17.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0071ce35788c6f9077ff9ecba4858108eebe2ea5a3f7cf2cf55ebc1dbc6ee24a", size = 39263026 }, + { url = "https://files.pythonhosted.org/packages/c2/0c/ea2107236740be8fa0e0d4a293a095c9f43546a2465bb7df34eee9126b09/pyarrow-17.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:757074882f844411fcca735e39aae74248a1531367a7c80799b4266390ae51cc", size = 39880798 }, + { url = "https://files.pythonhosted.org/packages/f6/b0/b9164a8bc495083c10c281cc65064553ec87b7537d6f742a89d5953a2a3e/pyarrow-17.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:9ba11c4f16976e89146781a83833df7f82077cdab7dc6232c897789343f7891a", size = 38715172 }, + { url = "https://files.pythonhosted.org/packages/f1/c4/9625418a1413005e486c006e56675334929fad864347c5ae7c1b2e7fe639/pyarrow-17.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:b0c6ac301093b42d34410b187bba560b17c0330f64907bfa4f7f7f2444b0cf9b", size = 39874508 }, + { url = "https://files.pythonhosted.org/packages/ae/49/baafe2a964f663413be3bd1cf5c45ed98c5e42e804e2328e18f4570027c1/pyarrow-17.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:392bc9feabc647338e6c89267635e111d71edad5fcffba204425a7c8d13610d7", size = 25099235 }, +] + [[package]] name = "pycparser" version = "2.22" @@ -1354,6 +1379,7 @@ dev = [ { name = "mkdocstrings", extra = ["python"] }, { name = "pandas" }, { name = "pip" }, + { name = "pyarrow" }, { name = "pytest" }, { name = "pytest-asyncio" }, ] @@ -1374,6 +1400,7 @@ dev = [ { name = "mkdocstrings", extras = ["python"], specifier = ">=0.26.1" }, { name = "pandas", specifier = ">=2.2.3" }, { name = "pip", specifier = ">=24.2" }, + { name = "pyarrow", specifier = ">=17.0.0" }, { name = "pytest", specifier = ">=8.3.3" }, { name = "pytest-asyncio", specifier = ">=0.24.0" }, ] From a9b0d51e1ad16b386f9eed18b86da1d96c96c3f6 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 22 Oct 2024 11:03:41 -0400 Subject: [PATCH 4/4] Add pipe_file --- object-store-rs/python/object_store_rs/fsspec.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py index 52987f76..6181d1dc 100644 --- a/object-store-rs/python/object_store_rs/fsspec.py +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -53,14 +53,15 @@ async def _rm_file(self, path, **kwargs): async def _cp_file(self, path1, path2, **kwargs): return await obs.copy_async(self.store, path1, path2) - # pipe_file + async def _pipe_file(self, path, value, **kwargs): + return await obs.put_async(self.store, path, value) async def _cat_file(self, path, start=None, end=None, **kwargs): if start is None and end is None: resp = await obs.get_async(self.store, path) return await resp.bytes_async() - if start and end: + if start is not None and end is not None: return await obs.get_range_async( self.store, path, offset=start, length=end - start )