diff --git a/object-store-rs/python/object_store_rs/fsspec.py b/object-store-rs/python/object_store_rs/fsspec.py new file mode 100644 index 00000000..6181d1dc --- /dev/null +++ b/object-store-rs/python/object_store_rs/fsspec.py @@ -0,0 +1,135 @@ +"""Fsspec integration. + +The underlying `object_store` Rust crate cautions against relying too strongly on stateful filesystem representations of object stores: + +> The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems. +> +> This design provides the following advantages: +> +> - All operations are atomic, and readers cannot observe partial and/or failed writes +> - Methods map directly to object store APIs, providing both efficiency and predictability +> - Abstracts away filesystem and operating system specific quirks, ensuring portability +> - Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads + +Where possible, implementations should use the underlying `object-store-rs` APIs +directly. Only where this is not possible should users fall back to this fsspec +integration. +""" + +from __future__ import annotations + +import asyncio +from collections import defaultdict +from typing import TYPE_CHECKING, Any, Coroutine, Dict, List, Tuple + +import fsspec.asyn + +import object_store_rs as obs + +if TYPE_CHECKING: + from object_store_rs.store import ObjectStore + + +class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem): + store: ObjectStore + + def __init__( + self, + store: ObjectStore, + *args, + asynchronous=False, + loop=None, + batch_size=None, + **kwargs, + ): + self.store = store + super().__init__( + *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size, **kwargs + ) + + async def _rm_file(self, path, **kwargs): + return await obs.delete_async(self.store, path) + + async def _cp_file(self, path1, path2, **kwargs): + return await obs.copy_async(self.store, path1, path2) + + async def _pipe_file(self, path, value, **kwargs): + return await obs.put_async(self.store, path, value) + + async def _cat_file(self, path, start=None, end=None, **kwargs): + if start is None and end is None: + resp = await obs.get_async(self.store, path) + return await resp.bytes_async() + + if start is not None and end is not None: + return await obs.get_range_async( + self.store, path, offset=start, length=end - start + ) + + raise NotImplementedError("todo: handle open-ended ranges") + + async def _cat_ranges( + self, + paths: List[str], + starts: List[int], + ends: List[int], + max_gap=None, + batch_size=None, + on_error="return", + **kwargs, + ): + # TODO: need to go through this again and test it + per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list) + for idx, (path, start, end) in enumerate(zip(paths, starts, ends)): + per_file_requests[path].append((start, end, idx)) + + futs: List[Coroutine[Any, Any, List[bytes]]] = [] + for path, ranges in per_file_requests.items(): + offsets = [r[0] for r in ranges] + lengths = [r[1] - r[0] for r in ranges] + fut = obs.get_ranges_async( + self.store, path, offsets=offsets, lengths=lengths + ) + futs.append(fut) + + result = await asyncio.gather(*futs) + + output_buffers: List[bytes] = [b""] * len(paths) + for per_file_request, buffers in zip(per_file_requests.items(), result): + path, ranges = per_file_request + for buffer, ranges_ in zip(buffers, ranges): + initial_index = ranges_[2] + output_buffers[initial_index] = buffer + + return output_buffers + + async def _put_file(self, lpath, rpath, **kwargs): + with open(lpath, "rb") as f: + await obs.put_async(self.store, rpath, f) + + async def _get_file(self, rpath, lpath, **kwargs): + with open(lpath, "wb") as f: + resp = await obs.get_async(self.store, rpath) + async for buffer in resp.stream(): + f.write(buffer) + + async def _info(self, path, **kwargs): + head = await obs.head_async(self.store, path) + return { + # Required of `info`: (?) + "name": head["path"], + "size": head["size"], + "type": "directory" if head["path"].endswith("/") else "file", + # Implementation-specific keys + "e_tag": head["e_tag"], + "last_modified": head["last_modified"], + "version": head["version"], + } + + async def _ls(self, path, detail=True, **kwargs): + if detail: + raise NotImplementedError("Not sure how to format these dicts") + + result = await obs.list_with_delimiter_async(self.store, path) + objects = result["objects"] + return [object["path"] for object in objects] diff --git a/pyproject.toml b/pyproject.toml index 326f4087..4b7e3eba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [] dev-dependencies = [ "black>=24.10.0", "boto3>=1.35.38", + "fsspec>=2024.10.0", "griffe-inherited-docstrings>=1.0.1", "ipykernel>=6.29.5", "maturin>=1.7.4", @@ -19,6 +20,7 @@ dev-dependencies = [ "mkdocstrings[python]>=0.26.1", "pandas>=2.2.3", "pip>=24.2", + "pyarrow>=17.0.0", "pytest-asyncio>=0.24.0", "pytest>=8.3.3", ] diff --git a/tests/test_fsspec.py b/tests/test_fsspec.py new file mode 100644 index 00000000..a6e3ce5d --- /dev/null +++ b/tests/test_fsspec.py @@ -0,0 +1,11 @@ +import boto3 +import object_store_rs as obs +import pyarrow.parquet as pq +from object_store_rs.fsspec import AsyncFsspecStore + +# session = boto3.Session() + +store = obs.store.HTTPStore.from_url("https://github.com") +fs = AsyncFsspecStore(store) +url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet" +test = pq.read_metadata(url, filesystem=fs) diff --git a/uv.lock b/uv.lock index 46dd163c..2316bb8f 100644 --- a/uv.lock +++ b/uv.lock @@ -321,6 +321,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b5/fd/afcd0496feca3276f509df3dbd5dae726fcc756f1a08d9e25abe1733f962/executing-2.1.0-py2.py3-none-any.whl", hash = "sha256:8d63781349375b5ebccc3142f4b30350c0cd9c79f921cde38be2be4637e98eaf", size = 25805 }, ] +[[package]] +name = "fsspec" +version = "2024.10.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a0/52/f16a068ebadae42526484c31f4398e62962504e5724a8ba5dc3409483df2/fsspec-2024.10.0.tar.gz", hash = "sha256:eda2d8a4116d4f2429db8550f2457da57279247dd930bb12f821b58391359493", size = 286853 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c6/b2/454d6e7f0158951d8a78c2e1eb4f69ae81beb8dca5fee9809c6c99e9d0d0/fsspec-2024.10.0-py3-none-any.whl", hash = "sha256:03b9a6785766a4de40368b88906366755e2819e758b83705c88cd7cb5fe81871", size = 179641 }, +] + [[package]] name = "ghp-import" version = "2.1.0" @@ -1016,6 +1025,31 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8e/37/efad0257dc6e593a18957422533ff0f87ede7c9c6ea010a2177d738fb82f/pure_eval-0.2.3-py3-none-any.whl", hash = "sha256:1db8e35b67b3d218d818ae653e27f06c3aa420901fa7b081ca98cbedc874e0d0", size = 11842 }, ] +[[package]] +name = "pyarrow" +version = "17.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/27/4e/ea6d43f324169f8aec0e57569443a38bab4b398d09769ca64f7b4d467de3/pyarrow-17.0.0.tar.gz", hash = "sha256:4beca9521ed2c0921c1023e68d097d0299b62c362639ea315572a58f3f50fd28", size = 1112479 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f9/46/ce89f87c2936f5bb9d879473b9663ce7a4b1f4359acc2f0eb39865eaa1af/pyarrow-17.0.0-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:1c8856e2ef09eb87ecf937104aacfa0708f22dfeb039c363ec99735190ffb977", size = 29028748 }, + { url = "https://files.pythonhosted.org/packages/8d/8e/ce2e9b2146de422f6638333c01903140e9ada244a2a477918a368306c64c/pyarrow-17.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2e19f569567efcbbd42084e87f948778eb371d308e137a0f97afe19bb860ccb3", size = 27190965 }, + { url = "https://files.pythonhosted.org/packages/3b/c8/5675719570eb1acd809481c6d64e2136ffb340bc387f4ca62dce79516cea/pyarrow-17.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b244dc8e08a23b3e352899a006a26ae7b4d0da7bb636872fa8f5884e70acf15", size = 39269081 }, + { url = "https://files.pythonhosted.org/packages/5e/78/3931194f16ab681ebb87ad252e7b8d2c8b23dad49706cadc865dff4a1dd3/pyarrow-17.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b72e87fe3e1db343995562f7fff8aee354b55ee83d13afba65400c178ab2597", size = 39864921 }, + { url = "https://files.pythonhosted.org/packages/d8/81/69b6606093363f55a2a574c018901c40952d4e902e670656d18213c71ad7/pyarrow-17.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:dc5c31c37409dfbc5d014047817cb4ccd8c1ea25d19576acf1a001fe07f5b420", size = 38740798 }, + { url = "https://files.pythonhosted.org/packages/4c/21/9ca93b84b92ef927814cb7ba37f0774a484c849d58f0b692b16af8eebcfb/pyarrow-17.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:e3343cb1e88bc2ea605986d4b94948716edc7a8d14afd4e2c097232f729758b4", size = 39871877 }, + { url = "https://files.pythonhosted.org/packages/30/d1/63a7c248432c71c7d3ee803e706590a0b81ce1a8d2b2ae49677774b813bb/pyarrow-17.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:a27532c38f3de9eb3e90ecab63dfda948a8ca859a66e3a47f5f42d1e403c4d03", size = 25151089 }, + { url = "https://files.pythonhosted.org/packages/d4/62/ce6ac1275a432b4a27c55fe96c58147f111d8ba1ad800a112d31859fae2f/pyarrow-17.0.0-cp312-cp312-macosx_10_15_x86_64.whl", hash = "sha256:9b8a823cea605221e61f34859dcc03207e52e409ccf6354634143e23af7c8d22", size = 29019418 }, + { url = "https://files.pythonhosted.org/packages/8e/0a/dbd0c134e7a0c30bea439675cc120012337202e5fac7163ba839aa3691d2/pyarrow-17.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f1e70de6cb5790a50b01d2b686d54aaf73da01266850b05e3af2a1bc89e16053", size = 27152197 }, + { url = "https://files.pythonhosted.org/packages/cb/05/3f4a16498349db79090767620d6dc23c1ec0c658a668d61d76b87706c65d/pyarrow-17.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0071ce35788c6f9077ff9ecba4858108eebe2ea5a3f7cf2cf55ebc1dbc6ee24a", size = 39263026 }, + { url = "https://files.pythonhosted.org/packages/c2/0c/ea2107236740be8fa0e0d4a293a095c9f43546a2465bb7df34eee9126b09/pyarrow-17.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:757074882f844411fcca735e39aae74248a1531367a7c80799b4266390ae51cc", size = 39880798 }, + { url = "https://files.pythonhosted.org/packages/f6/b0/b9164a8bc495083c10c281cc65064553ec87b7537d6f742a89d5953a2a3e/pyarrow-17.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:9ba11c4f16976e89146781a83833df7f82077cdab7dc6232c897789343f7891a", size = 38715172 }, + { url = "https://files.pythonhosted.org/packages/f1/c4/9625418a1413005e486c006e56675334929fad864347c5ae7c1b2e7fe639/pyarrow-17.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:b0c6ac301093b42d34410b187bba560b17c0330f64907bfa4f7f7f2444b0cf9b", size = 39874508 }, + { url = "https://files.pythonhosted.org/packages/ae/49/baafe2a964f663413be3bd1cf5c45ed98c5e42e804e2328e18f4570027c1/pyarrow-17.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:392bc9feabc647338e6c89267635e111d71edad5fcffba204425a7c8d13610d7", size = 25099235 }, +] + [[package]] name = "pycparser" version = "2.22" @@ -1335,6 +1369,7 @@ source = { virtual = "." } dev = [ { name = "black" }, { name = "boto3" }, + { name = "fsspec" }, { name = "griffe-inherited-docstrings" }, { name = "ipykernel" }, { name = "maturin" }, @@ -1344,6 +1379,7 @@ dev = [ { name = "mkdocstrings", extra = ["python"] }, { name = "pandas" }, { name = "pip" }, + { name = "pyarrow" }, { name = "pytest" }, { name = "pytest-asyncio" }, ] @@ -1354,6 +1390,7 @@ dev = [ dev = [ { name = "black", specifier = ">=24.10.0" }, { name = "boto3", specifier = ">=1.35.38" }, + { name = "fsspec", specifier = ">=2024.10.0" }, { name = "griffe-inherited-docstrings", specifier = ">=1.0.1" }, { name = "ipykernel", specifier = ">=6.29.5" }, { name = "maturin", specifier = ">=1.7.4" }, @@ -1363,6 +1400,7 @@ dev = [ { name = "mkdocstrings", extras = ["python"], specifier = ">=0.26.1" }, { name = "pandas", specifier = ">=2.2.3" }, { name = "pip", specifier = ">=24.2" }, + { name = "pyarrow", specifier = ">=17.0.0" }, { name = "pytest", specifier = ">=8.3.3" }, { name = "pytest-asyncio", specifier = ">=0.24.0" }, ]