diff --git a/obstore/src/list.rs b/obstore/src/list.rs index 9ba9568c..20755a86 100644 --- a/obstore/src/list.rs +++ b/obstore/src/list.rs @@ -8,18 +8,70 @@ use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use futures::stream::{BoxStream, Fuse}; use futures::StreamExt; use indexmap::IndexMap; +use object_store::list::{PaginatedListOptions, PaginatedListStore}; use object_store::path::Path; use object_store::{ListResult, ObjectMeta, ObjectStore}; -use pyo3::exceptions::{PyImportError, PyStopAsyncIteration, PyStopIteration}; +use pyo3::exceptions::{PyImportError, PyStopAsyncIteration, PyStopIteration, PyValueError}; use pyo3::prelude::*; +use pyo3::pybacked::PyBackedStr; use pyo3::types::PyDict; -use pyo3::{intern, IntoPyObjectExt}; +use pyo3::{intern, IntoPyObjectExt, PyTypeInfo}; use pyo3_arrow::export::{Arro3RecordBatch, Arro3Table}; use pyo3_arrow::PyTable; use pyo3_async_runtimes::tokio::get_runtime; -use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult}; +use pyo3_object_store::{ + PyAzureStore, PyGCSStore, PyHttpStore, PyLocalStore, PyMemoryStore, PyObjectStore, + PyObjectStoreError, PyObjectStoreResult, PyS3Store, +}; use tokio::sync::Mutex; +pub(crate) enum MaybePaginatedListStore { + SupportsPagination(Arc), + NoPagination(Arc), +} + +impl<'py> FromPyObject<'py> for MaybePaginatedListStore { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + if let Ok(store) = ob.downcast::() { + Ok(Self::SupportsPagination(store.get().as_ref().clone())) + } else if let Ok(store) = ob.downcast::() { + Ok(Self::SupportsPagination(store.get().as_ref().clone())) + } else if let Ok(store) = ob.downcast::() { + Ok(Self::SupportsPagination(store.get().as_ref().clone())) + } else if let Ok(store) = ob.downcast::() { + Ok(Self::NoPagination(store.get().as_ref().clone())) + } else if let Ok(store) = ob.downcast::() { + Ok(Self::NoPagination(store.get().as_ref().clone())) + } else if let Ok(store) = ob.downcast::() { + Ok(Self::NoPagination(store.get().as_ref().clone())) + } else { + let py = ob.py(); + // Check for object-store instance from other library + let cls_name = ob + .getattr(intern!(py, "__class__"))? + .getattr(intern!(py, "__name__"))? + .extract::()?; + if [ + PyAzureStore::NAME, + PyGCSStore::NAME, + PyHttpStore::NAME, + PyLocalStore::NAME, + PyMemoryStore::NAME, + PyS3Store::NAME, + ] + .contains(&cls_name.as_ref()) + { + return Err(PyValueError::new_err("You must use an object store instance exported from **the same library** as this function. They cannot be used across libraries.\nThis is because object store instances are compiled with a specific version of Rust and Python." )); + } + + Err(PyValueError::new_err(format!( + "Expected an object store instance, got {}", + ob.repr()? + ))) + } + } +} + pub(crate) struct PyObjectMeta(ObjectMeta); impl PyObjectMeta { @@ -352,7 +404,7 @@ impl<'py> IntoPyObject<'py> for PyListResult { #[pyo3(signature = (store, prefix=None, *, offset=None, chunk_size=50, return_arrow=false))] pub(crate) fn list( py: Python, - store: PyObjectStore, + store: MaybePaginatedListStore, prefix: Option, offset: Option, chunk_size: usize, @@ -370,12 +422,13 @@ pub(crate) fn list( .map_err(|err| PyImportError::new_err(format!("{msg}\n\n{err}")))?; } - let store = store.into_inner().clone(); - let prefix = prefix.map(|s| s.into()); - let stream = if let Some(offset) = offset { - store.list_with_offset(prefix.as_ref(), &offset.into()) - } else { - store.list(prefix.as_ref()) + let stream = match store { + MaybePaginatedListStore::SupportsPagination(paginated_store) => { + create_paginated_stream(paginated_store, prefix, offset, chunk_size) + } + MaybePaginatedListStore::NoPagination(object_store) => { + create_filtered_stream(object_store, prefix, offset) + } }; Ok(PyListStream::new(stream, chunk_size, return_arrow)) } @@ -426,3 +479,102 @@ async fn list_with_delimiter_materialize( let list_result = store.list_with_delimiter(prefix).await?; Ok(PyListResult::new(list_result, return_arrow)) } + +fn create_paginated_stream( + store: Arc, + prefix: Option, + offset: Option, + chunk_size: usize, +) -> BoxStream<'static, object_store::Result> { + // Create a stream that will fetch from the paginated store + let stream = futures::stream::unfold( + (store, prefix, offset, None, true), + move |(store, prefix, offset, page_token, has_more)| async move { + if !has_more { + return None; + } + + let opts = PaginatedListOptions { + offset: offset.clone(), + delimiter: None, + max_keys: Some(chunk_size), + page_token, + ..Default::default() + }; + + match store.list_paginated(prefix.as_deref(), opts).await { + Ok(result) => { + let next_has_more = result.page_token.is_some(); + let next_page_token = result.page_token; + let objects = result.result.objects; + + let next_state = (store, prefix, offset, next_page_token, next_has_more); + Some((objects, next_state)) + } + Err(_e) => { + // TODO: propagate error + // For errors, return empty list and stop + Some((Vec::new(), (store, prefix, offset, None, false))) + } + } + }, + ) + .flat_map(|objects| futures::stream::iter(objects.into_iter().map(Ok))); + + Box::pin(stream) +} + +fn create_filtered_stream( + store: Arc, + prefix: Option, + offset: Option, +) -> BoxStream<'static, object_store::Result> { + // For substring filtering, we need to split the prefix into: + // 1. A directory prefix for efficient listing + // 2. A substring filter to apply to the results + let (list_prefix, substring_filter) = if let Some(prefix_str) = &prefix { + if let Some((dir_prefix, substring)) = prefix_str.rsplit_once('/') { + (Some(dir_prefix.to_string()), Some(substring.to_string())) + } else { + (None, Some(prefix_str.clone())) + } + } else { + (None, None) + }; + + let prefix_path = list_prefix.map(|s| s.into()); + let base_stream = if let Some(offset) = offset { + store.list_with_offset(prefix_path.as_ref(), &offset.into()) + } else { + store.list(prefix_path.as_ref()) + }; + + // Apply substring filtering if needed + let filtered_stream = if let Some(substring) = substring_filter { + Box::pin(base_stream.filter_map(move |result| { + let substring = substring.clone(); + async move { + match result { + Ok(meta) => { + // Extract filename from path for substring matching + let path_str = meta.location.as_ref(); + if let Some(filename) = path_str.split('/').last() { + if filename.contains(&substring) { + Some(Ok(meta)) + } else { + None + } + } else { + Some(Ok(meta)) + } + } + Err(e) => Some(Err(e)), + } + } + })) + } else { + base_stream + }; + + filtered_stream +} diff --git a/pyo3-object_store/src/aws/store.rs b/pyo3-object_store/src/aws/store.rs index f2c36d9d..d4d30626 100644 --- a/pyo3-object_store/src/aws/store.rs +++ b/pyo3-object_store/src/aws/store.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use itertools::Itertools; use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey}; +use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore}; use object_store::ObjectStoreScheme; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; @@ -420,3 +421,14 @@ fn parse_url( Ok(config) } + +#[async_trait::async_trait] +impl PaginatedListStore for PyS3Store { + async fn list_paginated( + &self, + prefix: Option<&str>, + opts: PaginatedListOptions, + ) -> object_store::Result { + self.store.list_paginated(prefix, opts).await + } +} diff --git a/pyo3-object_store/src/azure/store.rs b/pyo3-object_store/src/azure/store.rs index e5a9fcfe..8a5f6781 100644 --- a/pyo3-object_store/src/azure/store.rs +++ b/pyo3-object_store/src/azure/store.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use object_store::azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder}; +use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore}; use object_store::ObjectStoreScheme; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; @@ -470,3 +471,14 @@ fn parse_url(config: Option, parsed: &Url) -> object_store::Resul Ok(config) } + +#[async_trait::async_trait] +impl PaginatedListStore for PyAzureStore { + async fn list_paginated( + &self, + prefix: Option<&str>, + opts: PaginatedListOptions, + ) -> object_store::Result { + self.store.list_paginated(prefix, opts).await + } +} diff --git a/pyo3-object_store/src/gcp/store.rs b/pyo3-object_store/src/gcp/store.rs index 0b52b971..b5718fd0 100644 --- a/pyo3-object_store/src/gcp/store.rs +++ b/pyo3-object_store/src/gcp/store.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder, GoogleConfigKey}; +use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore}; use object_store::ObjectStoreScheme; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; @@ -374,3 +375,14 @@ fn parse_url(config: Option, parsed: &Url) -> object_store::Resu Ok(config) } + +#[async_trait::async_trait] +impl PaginatedListStore for PyGCSStore { + async fn list_paginated( + &self, + prefix: Option<&str>, + opts: PaginatedListOptions, + ) -> object_store::Result { + self.store.list_paginated(prefix, opts).await + } +} diff --git a/pyo3-object_store/src/local.rs b/pyo3-object_store/src/local.rs index 692cd9c1..740a42bc 100644 --- a/pyo3-object_store/src/local.rs +++ b/pyo3-object_store/src/local.rs @@ -1,8 +1,9 @@ use std::fs::create_dir_all; use std::sync::Arc; +use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore}; use object_store::local::LocalFileSystem; -use object_store::ObjectStoreScheme; +use object_store::{ListResult, ObjectStore, ObjectStoreScheme}; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::{PyDict, PyTuple, PyType}; @@ -141,3 +142,66 @@ impl PyLocalStore { } } } + +/// A custom implementation of PaginatedListStore for LocalFileSystem +/// +/// PaginatedListStore is not implemented in upstream for LocalFileSystem because there's no way to +/// get a stable offset in local FS APIs. +/// https://github.com/apache/arrow-rs-object-store/issues/388 +/// +/// Instead, we collect _all_ results and filter them in memory with the provided substring. +#[async_trait::async_trait] +impl PaginatedListStore for PyLocalStore { + async fn list_paginated( + &self, + prefix: Option<&str>, + _opts: PaginatedListOptions, + ) -> object_store::Result { + // Split a path like "some/prefix/abc" into (Some(Path("some/prefix")), Some("abc")) + // This allows us to do a substring prefix match after the / delimiter + let (list_path, list_prefix_match): (Option, Option) = + if let Some(list_prefix) = prefix { + if let Some((list_path, list_prefix_match)) = list_prefix.rsplit_once('/') { + // There's a / in the prefix, so we assume the part before the last / is a + // path, and the end is a substring match + ( + Some(object_store::path::Path::parse(list_path)?), + Some(list_prefix_match.to_string()), + ) + } else { + // No / in prefix, so we assume it's a substring + (None, Some(list_prefix.to_string())) + } + } else { + (None, None) + }; + + let list_result = self.store.list_with_delimiter(list_path.as_ref()).await?; + + // Filter list result to include only results with the given prefix after the / delimiter + let filtered_list_result = if let Some(list_prefix_match) = list_prefix_match { + let filtered_common_prefixes = list_result + .common_prefixes + .into_iter() + .filter(|p| p.as_ref().starts_with(&list_prefix_match)) + .collect(); + let filtered_objects = list_result + .objects + .into_iter() + .filter(|obj| obj.location.as_ref().starts_with(&list_prefix_match)) + .collect(); + ListResult { + common_prefixes: filtered_common_prefixes, + objects: filtered_objects, + } + } else { + list_result + }; + + Ok(PaginatedListResult { + result: filtered_list_result, + // Local FS does not support pagination + page_token: None, + }) + } +} diff --git a/pyo3-object_store/src/prefix.rs b/pyo3-object_store/src/prefix.rs index 3c376afa..60e006f5 100644 --- a/pyo3-object_store/src/prefix.rs +++ b/pyo3-object_store/src/prefix.rs @@ -5,6 +5,10 @@ use bytes::Bytes; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use object_store::aws::AmazonS3; +use object_store::azure::MicrosoftAzure; +use object_store::gcp::GoogleCloudStorage; +use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore}; use std::borrow::Cow; use std::ops::Range; use std::sync::OnceLock; @@ -58,58 +62,47 @@ impl MaybePrefixedStore { Cow::Borrowed(location) } } - - /// Strip the constant prefix from a given path - fn strip_prefix(&self, path: Path) -> Path { - if let Some(prefix) = &self.prefix { - // Note cannot use match because of borrow checker - if let Some(suffix) = path.prefix_match(prefix) { - return suffix.collect(); - } - path - } else { - path - } - } - - /// Strip the constant prefix from a given ObjectMeta - fn strip_meta(&self, meta: ObjectMeta) -> ObjectMeta { - ObjectMeta { - last_modified: meta.last_modified, - size: meta.size, - location: self.strip_prefix(meta.location), - e_tag: meta.e_tag, - version: None, - } - } } -// Note: This is a relative hack to move these two functions to pure functions so they don't rely -// on the `self` lifetime. Expected to be cleaned up before merge. -// /// Strip the constant prefix from a given path -fn strip_prefix(prefix: &Path, path: Path) -> Path { - // Note cannot use match because of borrow checker - if let Some(suffix) = path.prefix_match(prefix) { - return suffix.collect(); +fn strip_prefix(prefix: Option<&Path>, path: Path) -> Path { + if let Some(prefix) = prefix { + // Note cannot use match because of borrow checker + if let Some(suffix) = path.prefix_match(prefix) { + return suffix.collect(); + } + path + } else { + path } - path } /// Strip the constant prefix from a given ObjectMeta fn strip_meta(prefix: Option<&Path>, meta: ObjectMeta) -> ObjectMeta { - if let Some(prefix) = prefix { - ObjectMeta { - last_modified: meta.last_modified, - size: meta.size, - location: strip_prefix(prefix, meta.location), - e_tag: meta.e_tag, - version: None, - } - } else { - meta + ObjectMeta { + last_modified: meta.last_modified, + size: meta.size, + location: strip_prefix(prefix, meta.location), + e_tag: meta.e_tag, + version: None, } } + +fn strip_list_result(prefix: Option<&Path>, lst: ListResult) -> ListResult { + ListResult { + common_prefixes: lst + .common_prefixes + .into_iter() + .map(|p| strip_prefix(prefix, p)) + .collect(), + objects: lst + .objects + .into_iter() + .map(|meta| strip_meta(prefix, meta)) + .collect(), + } +} + #[async_trait::async_trait] impl ObjectStore for MaybePrefixedStore { async fn put(&self, location: &Path, payload: PutPayload) -> Result { @@ -166,7 +159,7 @@ impl ObjectStore for MaybePrefixedStore { async fn head(&self, location: &Path) -> Result { let full_path = self.full_path(location); let meta = self.inner.head(&full_path).await?; - Ok(self.strip_meta(meta)) + Ok(strip_meta(self.prefix.as_ref(), meta)) } async fn delete(&self, location: &Path) -> Result<()> { @@ -200,18 +193,7 @@ impl ObjectStore for MaybePrefixedStore { self.inner .list_with_delimiter(Some(&prefix)) .await - .map(|lst| ListResult { - common_prefixes: lst - .common_prefixes - .into_iter() - .map(|p| self.strip_prefix(p)) - .collect(), - objects: lst - .objects - .into_iter() - .map(|meta| self.strip_meta(meta)) - .collect(), - }) + .map(|lst| strip_list_result(self.prefix.as_ref(), lst)) } async fn copy(&self, from: &Path, to: &Path) -> Result<()> { @@ -238,3 +220,83 @@ impl ObjectStore for MaybePrefixedStore { self.inner.rename_if_not_exists(&full_from, &full_to).await } } + +fn create_paginated_list_prefix<'a>( + store_prefix: Option<&'a Path>, + list_prefix: Option<&'a str>, + delimiter: Option<&Cow<'static, str>>, +) -> Option> { + match (store_prefix, list_prefix) { + (None, None) => None, + (Some(store_prefix), None) => Some(Cow::Borrowed(store_prefix.as_ref())), + (None, Some(list_prefix)) => Some(Cow::Borrowed(list_prefix)), + (Some(store_prefix), Some(list_prefix)) => { + let delimiter = delimiter.map(|x| x.as_ref()).unwrap_or("/"); + let combined = format!("{}{delimiter}{list_prefix}", store_prefix.as_ref()); + Some(Cow::Owned(combined)) + } + } +} + +#[async_trait::async_trait] +impl PaginatedListStore for MaybePrefixedStore { + async fn list_paginated( + &self, + prefix: Option<&str>, + opts: PaginatedListOptions, + ) -> Result { + let store_prefix = self.prefix.as_ref(); + let list_prefix = + create_paginated_list_prefix(store_prefix, prefix, opts.delimiter.as_ref()); + let lst = self + .inner + .list_paginated(list_prefix.as_deref(), opts) + .await?; + Ok(PaginatedListResult { + result: strip_list_result(store_prefix, lst.result), + page_token: lst.page_token, + }) + } +} + +#[async_trait::async_trait] +impl PaginatedListStore for MaybePrefixedStore { + async fn list_paginated( + &self, + prefix: Option<&str>, + opts: PaginatedListOptions, + ) -> Result { + let store_prefix = self.prefix.as_ref(); + let list_prefix = + create_paginated_list_prefix(store_prefix, prefix, opts.delimiter.as_ref()); + let lst = self + .inner + .list_paginated(list_prefix.as_deref(), opts) + .await?; + Ok(PaginatedListResult { + result: strip_list_result(store_prefix, lst.result), + page_token: lst.page_token, + }) + } +} + +#[async_trait::async_trait] +impl PaginatedListStore for MaybePrefixedStore { + async fn list_paginated( + &self, + prefix: Option<&str>, + opts: PaginatedListOptions, + ) -> Result { + let store_prefix = self.prefix.as_ref(); + let list_prefix = + create_paginated_list_prefix(store_prefix, prefix, opts.delimiter.as_ref()); + let lst = self + .inner + .list_paginated(list_prefix.as_deref(), opts) + .await?; + Ok(PaginatedListResult { + result: strip_list_result(store_prefix, lst.result), + page_token: lst.page_token, + }) + } +} diff --git a/tests/test_list.py b/tests/test_list.py index a151d760..03f12128 100644 --- a/tests/test_list.py +++ b/tests/test_list.py @@ -1,9 +1,12 @@ +import tempfile +from pathlib import Path + import polars as pl import pyarrow as pa import pytest from arro3.core import RecordBatch, Table -from obstore.store import MemoryStore +from obstore.store import LocalStore, MemoryStore def test_list(): @@ -130,6 +133,64 @@ async def test_list_with_delimiter_async(): assert objects["path"][1].as_py() == "a/file2.txt" +def test_list_substring_filtering(): + store = MemoryStore() + + # Add files with various patterns + store.put("data/file1.txt", b"foo") + store.put("data/test_file.txt", b"bar") + store.put("data/another.csv", b"baz") + store.put("data/test_data.json", b"qux") + store.put("logs/test_log.txt", b"log") + + # Test substring filtering for files containing "test" + result = store.list("data/test").collect() + paths = [item["path"] for item in result] + + # Should match files with "test" in the filename within data/ directory + assert "data/test_file.txt" in paths + assert "data/test_data.json" in paths + assert "data/file1.txt" not in paths + assert "data/another.csv" not in paths + assert "logs/test_log.txt" not in paths + + # Test with arrow format + stream = store.list("data/test", return_arrow=True) + batch = stream.collect() + assert isinstance(batch, RecordBatch) + assert batch.num_rows == 2 + + +def test_list_substring_filtering_local_store(): + with tempfile.TemporaryDirectory() as temp_dir: + temp_dir_path = Path(temp_dir) + store = LocalStore(temp_dir_path) + + # Create directory structure + data_dir = temp_dir_path / "data" + data_dir.mkdir(parents=True, exist_ok=True) + + # Write test files + with (data_dir / "file1.txt").open("w") as f: + f.write("foo") + with (data_dir / "test_file.txt").open("w") as f: + f.write("bar") + with (data_dir / "another.csv").open("w") as f: + f.write("baz") + with (data_dir / "test_data.json").open("w") as f: + f.write("qux") + + # Test substring filtering for files containing "test" + result = store.list("data/test").collect() + paths = [item["path"] for item in result] + + # Should match files with "test" in the filename within data/ directory + assert "data/test_file.txt" in paths + assert "data/test_data.json" in paths + assert "data/file1.txt" not in paths + assert "data/another.csv" not in paths + + def test_list_as_arrow_to_polars(): store = MemoryStore()