Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyo3-object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,8 @@ serde = "1"
thiserror = "1"
url = "2"

[patch.crates-io]
object_store = { git = "https://github.com/apache/arrow-rs", rev = "74499c0e7846cfbc498bf9fd7a2c1a4c8731c897" }

[lib]
crate-type = ["rlib"]
2 changes: 1 addition & 1 deletion pyo3-object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl PyLocalStore {
) -> PyObjectStoreResult<Self> {
let fs = if let Some(prefix) = &prefix {
if mkdir {
create_dir_all(&prefix)?;
create_dir_all(prefix)?;
}
LocalFileSystem::new_with_prefix(prefix)?
} else {
Expand Down
88 changes: 8 additions & 80 deletions pyo3-object_store/src/prefix.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
//! An object store wrapper handling a constant path prefix
//! This is vendored from https://github.com/apache/arrow-rs/blob/3bf29a2c7474e59722d885cd11fafd0dca13a28e/object_store/src/prefix.rs#L4 so that we can access the raw `T` underlying the MaybePrefixedStore.
//! This was originally vendored from https://github.com/apache/arrow-rs/blob/3bf29a2c7474e59722d885cd11fafd0dca13a28e/object_store/src/prefix.rs#L4 so that we can access the raw `T` underlying the MaybePrefixedStore.
//! It was further edited to use an `Option<Path>` internally so that we can apply a
//! `MaybePrefixedStore` to all store classes.

use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use std::borrow::Cow;
use std::cell::OnceCell;
use std::ops::Range;
use std::sync::OnceLock;

use object_store::path::Path;
use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
PutOptions, PutPayload, PutResult, Result,
};

const DEFAULT_PATH: OnceCell<Path> = OnceCell::new();
static DEFAULT_PATH: OnceLock<Path> = OnceLock::new();

/// Store wrapper that applies a constant prefix to all paths handled by the store.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -169,8 +171,7 @@ impl<T: ObjectStore> ObjectStore for MaybePrefixedStore<T> {
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let binding = DEFAULT_PATH;
let prefix = self.full_path(prefix.unwrap_or(binding.get_or_init(|| Path::default())));
let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
let s = self.inner.list(Some(&prefix));
let slf_prefix = self.prefix.clone();
s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
Expand All @@ -182,18 +183,16 @@ impl<T: ObjectStore> ObjectStore for MaybePrefixedStore<T> {
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
let binding = DEFAULT_PATH;
let offset = self.full_path(offset);
let prefix = self.full_path(prefix.unwrap_or(binding.get_or_init(|| Path::default())));
let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
let s = self.inner.list_with_offset(Some(&prefix), &offset);
let slf_prefix = self.prefix.clone();
s.map_ok(move |meta| strip_meta(slf_prefix.as_ref(), meta))
.boxed()
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let binding = DEFAULT_PATH;
let prefix = self.full_path(prefix.unwrap_or(binding.get_or_init(|| Path::default())));
let prefix = self.full_path(prefix.unwrap_or(DEFAULT_PATH.get_or_init(Path::default)));
self.inner
.list_with_delimiter(Some(&prefix))
.await
Expand Down Expand Up @@ -235,74 +234,3 @@ impl<T: ObjectStore> ObjectStore for MaybePrefixedStore<T> {
self.inner.rename_if_not_exists(&full_from, &full_to).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::integration::*;
use crate::local::LocalFileSystem;

use tempfile::TempDir;

#[tokio::test]
async fn prefix_test() {
let root = TempDir::new().unwrap();
let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap();
let integration = MaybePrefixedStore::new(inner, Some("prefix"));

put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
}

#[tokio::test]
async fn prefix_test_applies_prefix() {
let tmpdir = TempDir::new().unwrap();
let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();

let location = Path::from("prefix/test_file.json");
let data = Bytes::from("arbitrary data");

local.put(&location, data.clone().into()).await.unwrap();

let prefix = MaybePrefixedStore::new(local, Some("prefix"));
let location_prefix = Path::from("test_file.json");

let content_list = flatten_list_stream(&prefix, None).await.unwrap();
assert_eq!(content_list, &[location_prefix.clone()]);

let root = Path::from("/");
let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap();
assert_eq!(content_list, &[location_prefix.clone()]);

let read_data = prefix
.get(&location_prefix)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, data);

let target_prefix = Path::from("/test_written.json");
prefix
.put(&target_prefix, data.clone().into())
.await
.unwrap();

prefix.delete(&location_prefix).await.unwrap();

let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();

let err = local.get(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);

let location = Path::from("prefix/test_written.json");
let read_data = local.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(&*read_data, data)
}
}
6 changes: 2 additions & 4 deletions pyo3-object_store/src/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub fn from_url(
retry_config: Option<PyRetryConfig>,
kwargs: Option<Bound<PyAny>>,
) -> PyObjectStoreResult<PyObject> {
let (scheme, _) = ObjectStoreScheme::parse(&url.as_ref()).map_err(object_store::Error::from)?;
let (scheme, _) = ObjectStoreScheme::parse(url.as_ref()).map_err(object_store::Error::from)?;
match scheme {
ObjectStoreScheme::AmazonS3 => {
let store = PyS3Store::from_url(
Expand Down Expand Up @@ -99,9 +99,7 @@ pub fn from_url(
let store: PyMemoryStore = Arc::new(InMemory::new()).into();
Ok(store.into_pyobject(py)?.into_py_any(py)?)
}
scheme => {
return Err(GenericError::new_err(format!("Unknown URL scheme {:?}", scheme,)).into());
}
scheme => Err(GenericError::new_err(format!("Unknown URL scheme {:?}", scheme,)).into()),
}
}

Expand Down