Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 162 additions & 10 deletions obstore/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,70 @@ use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use futures::stream::{BoxStream, Fuse};
use futures::StreamExt;
use indexmap::IndexMap;
use object_store::list::{PaginatedListOptions, PaginatedListStore};
use object_store::path::Path;
use object_store::{ListResult, ObjectMeta, ObjectStore};
use pyo3::exceptions::{PyImportError, PyStopAsyncIteration, PyStopIteration};
use pyo3::exceptions::{PyImportError, PyStopAsyncIteration, PyStopIteration, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::PyDict;
use pyo3::{intern, IntoPyObjectExt};
use pyo3::{intern, IntoPyObjectExt, PyTypeInfo};
use pyo3_arrow::export::{Arro3RecordBatch, Arro3Table};
use pyo3_arrow::PyTable;
use pyo3_async_runtimes::tokio::get_runtime;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
use pyo3_object_store::{
PyAzureStore, PyGCSStore, PyHttpStore, PyLocalStore, PyMemoryStore, PyObjectStore,
PyObjectStoreError, PyObjectStoreResult, PyS3Store,
};
use tokio::sync::Mutex;

pub(crate) enum MaybePaginatedListStore {
SupportsPagination(Arc<dyn PaginatedListStore>),
NoPagination(Arc<dyn ObjectStore>),
}

impl<'py> FromPyObject<'py> for MaybePaginatedListStore {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
if let Ok(store) = ob.downcast::<PyS3Store>() {
Ok(Self::SupportsPagination(store.get().as_ref().clone()))
} else if let Ok(store) = ob.downcast::<PyAzureStore>() {
Ok(Self::SupportsPagination(store.get().as_ref().clone()))
} else if let Ok(store) = ob.downcast::<PyGCSStore>() {
Ok(Self::SupportsPagination(store.get().as_ref().clone()))
} else if let Ok(store) = ob.downcast::<PyHttpStore>() {
Ok(Self::NoPagination(store.get().as_ref().clone()))
} else if let Ok(store) = ob.downcast::<PyLocalStore>() {
Ok(Self::NoPagination(store.get().as_ref().clone()))
} else if let Ok(store) = ob.downcast::<PyMemoryStore>() {
Ok(Self::NoPagination(store.get().as_ref().clone()))
} else {
let py = ob.py();
// Check for object-store instance from other library
let cls_name = ob
.getattr(intern!(py, "__class__"))?
.getattr(intern!(py, "__name__"))?
.extract::<PyBackedStr>()?;
if [
PyAzureStore::NAME,
PyGCSStore::NAME,
PyHttpStore::NAME,
PyLocalStore::NAME,
PyMemoryStore::NAME,
PyS3Store::NAME,
]
.contains(&cls_name.as_ref())
{
return Err(PyValueError::new_err("You must use an object store instance exported from **the same library** as this function. They cannot be used across libraries.\nThis is because object store instances are compiled with a specific version of Rust and Python." ));
}

Err(PyValueError::new_err(format!(
"Expected an object store instance, got {}",
ob.repr()?
)))
}
}
}

pub(crate) struct PyObjectMeta(ObjectMeta);

impl PyObjectMeta {
Expand Down Expand Up @@ -352,7 +404,7 @@ impl<'py> IntoPyObject<'py> for PyListResult {
#[pyo3(signature = (store, prefix=None, *, offset=None, chunk_size=50, return_arrow=false))]
pub(crate) fn list(
py: Python,
store: PyObjectStore,
store: MaybePaginatedListStore,
prefix: Option<String>,
offset: Option<String>,
chunk_size: usize,
Expand All @@ -370,12 +422,13 @@ pub(crate) fn list(
.map_err(|err| PyImportError::new_err(format!("{msg}\n\n{err}")))?;
}

let store = store.into_inner().clone();
let prefix = prefix.map(|s| s.into());
let stream = if let Some(offset) = offset {
store.list_with_offset(prefix.as_ref(), &offset.into())
} else {
store.list(prefix.as_ref())
let stream = match store {
MaybePaginatedListStore::SupportsPagination(paginated_store) => {
create_paginated_stream(paginated_store, prefix, offset, chunk_size)
}
MaybePaginatedListStore::NoPagination(object_store) => {
create_filtered_stream(object_store, prefix, offset)
}
};
Ok(PyListStream::new(stream, chunk_size, return_arrow))
}
Expand Down Expand Up @@ -426,3 +479,102 @@ async fn list_with_delimiter_materialize(
let list_result = store.list_with_delimiter(prefix).await?;
Ok(PyListResult::new(list_result, return_arrow))
}

fn create_paginated_stream(
store: Arc<dyn PaginatedListStore>,
prefix: Option<String>,
offset: Option<String>,
chunk_size: usize,
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
// Create a stream that will fetch from the paginated store
let stream = futures::stream::unfold(
(store, prefix, offset, None, true),
move |(store, prefix, offset, page_token, has_more)| async move {
if !has_more {
return None;
}

let opts = PaginatedListOptions {
offset: offset.clone(),
delimiter: None,
max_keys: Some(chunk_size),
page_token,
..Default::default()
};

match store.list_paginated(prefix.as_deref(), opts).await {
Ok(result) => {
let next_has_more = result.page_token.is_some();
let next_page_token = result.page_token;
let objects = result.result.objects;

let next_state = (store, prefix, offset, next_page_token, next_has_more);
Some((objects, next_state))
}
Err(_e) => {
// TODO: propagate error
// For errors, return empty list and stop
Some((Vec::new(), (store, prefix, offset, None, false)))
}
}
},
)
.flat_map(|objects| futures::stream::iter(objects.into_iter().map(Ok)));

Box::pin(stream)
}

fn create_filtered_stream(
store: Arc<dyn ObjectStore>,
prefix: Option<String>,
offset: Option<String>,
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
// For substring filtering, we need to split the prefix into:
// 1. A directory prefix for efficient listing
// 2. A substring filter to apply to the results
let (list_prefix, substring_filter) = if let Some(prefix_str) = &prefix {
if let Some((dir_prefix, substring)) = prefix_str.rsplit_once('/') {
(Some(dir_prefix.to_string()), Some(substring.to_string()))
} else {
(None, Some(prefix_str.clone()))
}
} else {
(None, None)
};

let prefix_path = list_prefix.map(|s| s.into());
let base_stream = if let Some(offset) = offset {
store.list_with_offset(prefix_path.as_ref(), &offset.into())
} else {
store.list(prefix_path.as_ref())
};

// Apply substring filtering if needed
let filtered_stream = if let Some(substring) = substring_filter {
Box::pin(base_stream.filter_map(move |result| {
let substring = substring.clone();
async move {
match result {
Ok(meta) => {
// Extract filename from path for substring matching
let path_str = meta.location.as_ref();
if let Some(filename) = path_str.split('/').last() {
if filename.contains(&substring) {
Some(Ok(meta))
} else {
None
}
} else {
Some(Ok(meta))
}
}
Err(e) => Some(Err(e)),
}
}
}))
} else {
base_stream
};

filtered_stream
}
12 changes: 12 additions & 0 deletions pyo3-object_store/src/aws/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use itertools::Itertools;
use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey};
use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
use object_store::ObjectStoreScheme;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
Expand Down Expand Up @@ -420,3 +421,14 @@ fn parse_url(

Ok(config)
}

#[async_trait::async_trait]
impl PaginatedListStore for PyS3Store {
async fn list_paginated(
&self,
prefix: Option<&str>,
opts: PaginatedListOptions,
) -> object_store::Result<PaginatedListResult> {
self.store.list_paginated(prefix, opts).await
}
}
12 changes: 12 additions & 0 deletions pyo3-object_store/src/azure/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use object_store::azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
use object_store::ObjectStoreScheme;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
Expand Down Expand Up @@ -470,3 +471,14 @@ fn parse_url(config: Option<PyAzureConfig>, parsed: &Url) -> object_store::Resul

Ok(config)
}

#[async_trait::async_trait]
impl PaginatedListStore for PyAzureStore {
async fn list_paginated(
&self,
prefix: Option<&str>,
opts: PaginatedListOptions,
) -> object_store::Result<PaginatedListResult> {
self.store.list_paginated(prefix, opts).await
}
}
12 changes: 12 additions & 0 deletions pyo3-object_store/src/gcp/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder, GoogleConfigKey};
use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
use object_store::ObjectStoreScheme;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
Expand Down Expand Up @@ -374,3 +375,14 @@ fn parse_url(config: Option<PyGoogleConfig>, parsed: &Url) -> object_store::Resu

Ok(config)
}

#[async_trait::async_trait]
impl PaginatedListStore for PyGCSStore {
async fn list_paginated(
&self,
prefix: Option<&str>,
opts: PaginatedListOptions,
) -> object_store::Result<PaginatedListResult> {
self.store.list_paginated(prefix, opts).await
}
}
66 changes: 65 additions & 1 deletion pyo3-object_store/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::fs::create_dir_all;
use std::sync::Arc;

use object_store::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
use object_store::local::LocalFileSystem;
use object_store::ObjectStoreScheme;
use object_store::{ListResult, ObjectStore, ObjectStoreScheme};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyTuple, PyType};
Expand Down Expand Up @@ -141,3 +142,66 @@ impl PyLocalStore {
}
}
}

/// A custom implementation of PaginatedListStore for LocalFileSystem
///
/// PaginatedListStore is not implemented in upstream for LocalFileSystem because there's no way to
/// get a stable offset in local FS APIs.
/// https://github.com/apache/arrow-rs-object-store/issues/388
///
/// Instead, we collect _all_ results and filter them in memory with the provided substring.
#[async_trait::async_trait]
impl PaginatedListStore for PyLocalStore {
async fn list_paginated(
&self,
prefix: Option<&str>,
_opts: PaginatedListOptions,
) -> object_store::Result<PaginatedListResult> {
// Split a path like "some/prefix/abc" into (Some(Path("some/prefix")), Some("abc"))
// This allows us to do a substring prefix match after the / delimiter
let (list_path, list_prefix_match): (Option<object_store::path::Path>, Option<String>) =
if let Some(list_prefix) = prefix {
if let Some((list_path, list_prefix_match)) = list_prefix.rsplit_once('/') {
// There's a / in the prefix, so we assume the part before the last / is a
// path, and the end is a substring match
(
Some(object_store::path::Path::parse(list_path)?),
Some(list_prefix_match.to_string()),
)
} else {
// No / in prefix, so we assume it's a substring
(None, Some(list_prefix.to_string()))
}
} else {
(None, None)
};

let list_result = self.store.list_with_delimiter(list_path.as_ref()).await?;

// Filter list result to include only results with the given prefix after the / delimiter
let filtered_list_result = if let Some(list_prefix_match) = list_prefix_match {
let filtered_common_prefixes = list_result
.common_prefixes
.into_iter()
.filter(|p| p.as_ref().starts_with(&list_prefix_match))
.collect();
let filtered_objects = list_result
.objects
.into_iter()
.filter(|obj| obj.location.as_ref().starts_with(&list_prefix_match))
.collect();
ListResult {
common_prefixes: filtered_common_prefixes,
objects: filtered_objects,
}
} else {
list_result
};

Ok(PaginatedListResult {
result: filtered_list_result,
// Local FS does not support pagination
page_token: None,
})
}
}
Loading
Loading