Skip to content

Commit cddd07e

Browse files
authored
Add option to return objects as Arrow in list_with_delimiter (#238)
* Return ListResult as Arrow * Add tests for list_with_delimiter
1 parent 01b1aef commit cddd07e

File tree

7 files changed

+165
-53
lines changed

7 files changed

+165
-53
lines changed

obstore/python/obstore/_list.pyi

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class ObjectMeta(TypedDict):
2626
version: str | None
2727
"""A version indicator for this object"""
2828

29-
class ListResult(TypedDict):
29+
class ListResult(TypedDict, Generic[ChunkType]):
3030
"""
3131
Result of a list call that includes objects, prefixes (directories) and a token for
3232
the next set of results. Individual result sets may be limited to 1,000 objects
@@ -36,7 +36,7 @@ class ListResult(TypedDict):
3636
common_prefixes: List[str]
3737
"""Prefixes that are common (like directories)"""
3838

39-
objects: List[ObjectMeta]
39+
objects: ChunkType
4040
"""Object metadata for the listing"""
4141

4242
ChunkType = TypeVar("ChunkType", List[ObjectMeta], RecordBatch)
@@ -188,7 +188,26 @@ def list(
188188
A ListStream, which you can iterate through to access list results.
189189
"""
190190

191-
def list_with_delimiter(store: ObjectStore, prefix: str | None = None) -> ListResult:
191+
@overload
192+
def list_with_delimiter(
193+
store: ObjectStore,
194+
prefix: str | None = None,
195+
*,
196+
return_arrow: Literal[True],
197+
) -> ListResult[RecordBatch]: ...
198+
@overload
199+
def list_with_delimiter(
200+
store: ObjectStore,
201+
prefix: str | None = None,
202+
*,
203+
return_arrow: Literal[False] = False,
204+
) -> ListResult[List[ObjectMeta]]: ...
205+
def list_with_delimiter(
206+
store: ObjectStore,
207+
prefix: str | None = None,
208+
*,
209+
return_arrow: bool = False,
210+
) -> ListResult[RecordBatch] | ListResult[List[ObjectMeta]]:
192211
"""
193212
List objects with the given prefix and an implementation specific
194213
delimiter. Returns common prefixes (directories) in addition to object
@@ -209,9 +228,26 @@ def list_with_delimiter(store: ObjectStore, prefix: str | None = None) -> ListRe
209228
ListResult
210229
"""
211230

231+
@overload
212232
async def list_with_delimiter_async(
213-
store: ObjectStore, prefix: str | None = None
214-
) -> ListResult:
233+
store: ObjectStore,
234+
prefix: str | None = None,
235+
*,
236+
return_arrow: Literal[True],
237+
) -> ListResult[RecordBatch]: ...
238+
@overload
239+
async def list_with_delimiter_async(
240+
store: ObjectStore,
241+
prefix: str | None = None,
242+
*,
243+
return_arrow: Literal[False] = False,
244+
) -> ListResult[List[ObjectMeta]]: ...
245+
async def list_with_delimiter_async(
246+
store: ObjectStore,
247+
prefix: str | None = None,
248+
*,
249+
return_arrow: bool = False,
250+
) -> ListResult[RecordBatch] | ListResult[List[ObjectMeta]]:
215251
"""Call `list_with_delimiter` asynchronously.
216252
217253
Refer to the documentation for

obstore/src/list.rs

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ use indexmap::IndexMap;
1111
use object_store::path::Path;
1212
use object_store::{ListResult, ObjectMeta, ObjectStore};
1313
use pyo3::exceptions::{PyImportError, PyStopAsyncIteration, PyStopIteration};
14-
use pyo3::intern;
1514
use pyo3::prelude::*;
1615
use pyo3::types::PyDict;
16+
use pyo3::{intern, IntoPyObjectExt};
1717
use pyo3_arrow::PyRecordBatch;
1818
use pyo3_object_store::{get_runtime, PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
1919
use tokio::sync::Mutex;
@@ -47,17 +47,11 @@ impl<'py> IntoPyObject<'py> for PyObjectMeta {
4747
let mut dict = IndexMap::with_capacity(5);
4848
// Note, this uses "path" instead of "location" because we standardize the API to accept
4949
// the keyword "path" everywhere.
50-
dict.insert(
51-
"path",
52-
self.0.location.as_ref().into_pyobject(py)?.into_any(),
53-
);
54-
dict.insert(
55-
"last_modified",
56-
self.0.last_modified.into_pyobject(py)?.into_any(),
57-
);
58-
dict.insert("size", self.0.size.into_pyobject(py)?.into_any());
59-
dict.insert("e_tag", self.0.e_tag.into_pyobject(py)?.into_any());
60-
dict.insert("version", self.0.version.into_pyobject(py)?);
50+
dict.insert("path", self.0.location.as_ref().into_bound_py_any(py)?);
51+
dict.insert("last_modified", self.0.last_modified.into_bound_py_any(py)?);
52+
dict.insert("size", self.0.size.into_bound_py_any(py)?);
53+
dict.insert("e_tag", self.0.e_tag.into_bound_py_any(py)?);
54+
dict.insert("version", self.0.version.into_bound_py_any(py)?);
6155
dict.into_pyobject(py)
6256
}
6357
}
@@ -317,7 +311,19 @@ fn object_meta_to_arrow(metas: &[PyObjectMeta]) -> PyRecordBatchWrapper {
317311
PyRecordBatchWrapper::new(batch)
318312
}
319313

320-
pub(crate) struct PyListResult(ListResult);
314+
pub(crate) struct PyListResult {
315+
result: ListResult,
316+
return_arrow: bool,
317+
}
318+
319+
impl PyListResult {
320+
fn new(result: ListResult, return_arrow: bool) -> Self {
321+
Self {
322+
result,
323+
return_arrow,
324+
}
325+
}
326+
}
321327

322328
impl<'py> IntoPyObject<'py> for PyListResult {
323329
type Target = PyDict;
@@ -328,24 +334,25 @@ impl<'py> IntoPyObject<'py> for PyListResult {
328334
let mut dict = IndexMap::with_capacity(2);
329335
dict.insert(
330336
"common_prefixes",
331-
self.0
337+
self.result
332338
.common_prefixes
333339
.into_iter()
334340
.map(String::from)
335341
.collect::<Vec<_>>()
336-
.into_pyobject(py)?
337-
.into_any(),
338-
);
339-
dict.insert(
340-
"objects",
341-
self.0
342-
.objects
343-
.into_iter()
344-
.map(PyObjectMeta)
345-
.collect::<Vec<_>>()
346-
.into_pyobject(py)?
347-
.into_any(),
342+
.into_bound_py_any(py)?,
348343
);
344+
let objects = self
345+
.result
346+
.objects
347+
.into_iter()
348+
.map(PyObjectMeta)
349+
.collect::<Vec<_>>();
350+
let objects = if self.return_arrow {
351+
object_meta_to_arrow(&objects).into_bound_py_any(py)
352+
} else {
353+
objects.into_bound_py_any(py)
354+
}?;
355+
dict.insert("objects", objects);
349356
dict.into_pyobject(py)
350357
}
351358
}
@@ -383,41 +390,48 @@ pub(crate) fn list(
383390
}
384391

385392
#[pyfunction]
386-
#[pyo3(signature = (store, prefix=None))]
393+
#[pyo3(signature = (store, prefix=None, *, return_arrow=false))]
387394
pub(crate) fn list_with_delimiter(
388395
py: Python,
389396
store: PyObjectStore,
390397
prefix: Option<String>,
398+
return_arrow: bool,
391399
) -> PyObjectStoreResult<PyListResult> {
392400
let runtime = get_runtime(py)?;
393401
py.allow_threads(|| {
394402
let out = runtime.block_on(list_with_delimiter_materialize(
395403
store.into_inner(),
396404
prefix.map(|s| s.into()).as_ref(),
405+
return_arrow,
397406
))?;
398407
Ok::<_, PyObjectStoreError>(out)
399408
})
400409
}
401410

402411
#[pyfunction]
403-
#[pyo3(signature = (store, prefix=None))]
412+
#[pyo3(signature = (store, prefix=None, *, return_arrow=false))]
404413
pub(crate) fn list_with_delimiter_async(
405414
py: Python,
406415
store: PyObjectStore,
407416
prefix: Option<String>,
417+
return_arrow: bool,
408418
) -> PyResult<Bound<PyAny>> {
409419
pyo3_async_runtimes::tokio::future_into_py(py, async move {
410-
let out =
411-
list_with_delimiter_materialize(store.into_inner(), prefix.map(|s| s.into()).as_ref())
412-
.await?;
420+
let out = list_with_delimiter_materialize(
421+
store.into_inner(),
422+
prefix.map(|s| s.into()).as_ref(),
423+
return_arrow,
424+
)
425+
.await?;
413426
Ok(out)
414427
})
415428
}
416429

417430
async fn list_with_delimiter_materialize(
418431
store: Arc<dyn ObjectStore>,
419432
prefix: Option<&Path>,
433+
return_arrow: bool,
420434
) -> PyObjectStoreResult<PyListResult> {
421435
let list_result = store.list_with_delimiter(prefix).await?;
422-
Ok(PyListResult(list_result))
436+
Ok(PyListResult::new(list_result, return_arrow))
423437
}

obstore/src/put.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use object_store::{
1212
WriteMultipart,
1313
};
1414
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
15-
use pyo3::intern;
1615
use pyo3::prelude::*;
1716
use pyo3::pybacked::PyBackedStr;
1817
use pyo3::types::PyDict;
18+
use pyo3::{intern, IntoPyObjectExt};
1919
use pyo3_bytes::PyBytes;
2020
use pyo3_file::PyFileLikeObject;
2121
use pyo3_object_store::{get_runtime, PyObjectStore, PyObjectStoreResult};
@@ -283,8 +283,8 @@ impl<'py> IntoPyObject<'py> for PyPutResult {
283283

284284
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
285285
let mut dict = IndexMap::with_capacity(2);
286-
dict.insert("e_tag", self.0.e_tag.into_pyobject(py)?.into_any());
287-
dict.insert("version", self.0.version.into_pyobject(py)?.into_any());
286+
dict.insert("e_tag", self.0.e_tag.into_bound_py_any(py)?);
287+
dict.insert("version", self.0.version.into_bound_py_any(py)?);
288288
dict.into_pyobject(py)
289289
}
290290
}

pyo3-object_store/src/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ struct HTTPConfig {
1717

1818
impl HTTPConfig {
1919
fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
20-
let args = PyTuple::new(py, vec![self.url.clone().into_pyobject(py)?])?.into_py_any(py)?;
20+
let args = PyTuple::new(py, vec![self.url.clone()])?.into_py_any(py)?;
2121
let kwargs = PyDict::new(py);
2222

2323
if let Some(client_options) = &self.client_options {

pyo3-object_store/src/local.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ struct LocalConfig {
2020

2121
impl LocalConfig {
2222
fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
23-
let args =
24-
PyTuple::new(py, vec![self.prefix.clone().into_pyobject(py)?])?.into_py_any(py)?;
23+
let args = PyTuple::new(py, vec![self.prefix.clone()])?.into_py_any(py)?;
2524
let kwargs = PyDict::new(py);
2625
kwargs.set_item(intern!(py, "automatic_cleanup"), self.automatic_cleanup)?;
2726
kwargs.set_item(intern!(py, "mkdir"), self.mkdir)?;
@@ -119,10 +118,8 @@ impl PyLocalStore {
119118
// So we manually convert to a pathlib.Path
120119
if let Some(prefix) = &self.config.prefix {
121120
let pathlib_mod = py.import(intern!(py, "pathlib"))?;
122-
let path_object = pathlib_mod.call_method1(
123-
intern!(py, "Path"),
124-
PyTuple::new(py, vec![prefix.into_pyobject(py)?])?,
125-
)?;
121+
let path_object =
122+
pathlib_mod.call_method1(intern!(py, "Path"), PyTuple::new(py, vec![prefix])?)?;
126123
path_object.into_py_any(py)
127124
} else {
128125
Ok(py.None())

pyo3-object_store/src/simple.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub fn from_url(
3939
retry_config,
4040
kwargs.map(|x| x.extract()).transpose()?,
4141
)?;
42-
Ok(store.into_pyobject(py)?.into_py_any(py)?)
42+
Ok(store.into_py_any(py)?)
4343
}
4444
ObjectStoreScheme::GoogleCloudStorage => {
4545
let store = PyGCSStore::from_url(
@@ -50,7 +50,7 @@ pub fn from_url(
5050
retry_config,
5151
kwargs.map(|x| x.extract()).transpose()?,
5252
)?;
53-
Ok(store.into_pyobject(py)?.into_py_any(py)?)
53+
Ok(store.into_py_any(py)?)
5454
}
5555
ObjectStoreScheme::MicrosoftAzure => {
5656
let store = PyAzureStore::from_url(
@@ -61,7 +61,7 @@ pub fn from_url(
6161
retry_config,
6262
kwargs.map(|x| x.extract()).transpose()?,
6363
)?;
64-
Ok(store.into_pyobject(py)?.into_py_any(py)?)
64+
Ok(store.into_py_any(py)?)
6565
}
6666
ObjectStoreScheme::Http => {
6767
raise_if_config_passed(config, kwargs, "http")?;
@@ -71,7 +71,7 @@ pub fn from_url(
7171
client_options,
7272
retry_config,
7373
)?;
74-
Ok(store.into_pyobject(py)?.into_py_any(py)?)
74+
Ok(store.into_py_any(py)?)
7575
}
7676
ObjectStoreScheme::Local => {
7777
let mut automatic_cleanup = false;
@@ -92,12 +92,12 @@ pub fn from_url(
9292
automatic_cleanup,
9393
mkdir,
9494
)?;
95-
Ok(store.into_pyobject(py)?.into_py_any(py)?)
95+
Ok(store.into_py_any(py)?)
9696
}
9797
ObjectStoreScheme::Memory => {
9898
raise_if_config_passed(config, kwargs, "memory")?;
9999
let store: PyMemoryStore = Arc::new(InMemory::new()).into();
100-
Ok(store.into_pyobject(py)?.into_py_any(py)?)
100+
Ok(store.into_py_any(py)?)
101101
}
102102
scheme => Err(GenericError::new_err(format!("Unknown URL scheme {:?}", scheme,)).into()),
103103
}

tests/test_list.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,68 @@ async def test_list_stream_async():
5858
batch = await stream.collect_async()
5959
assert isinstance(batch, RecordBatch)
6060
assert batch.num_rows == 100
61+
62+
63+
def test_list_with_delimiter():
64+
store = MemoryStore()
65+
66+
obs.put(store, "a/file1.txt", b"foo")
67+
obs.put(store, "a/file2.txt", b"bar")
68+
obs.put(store, "b/file3.txt", b"baz")
69+
70+
list_result1 = obs.list_with_delimiter(store)
71+
assert list_result1["common_prefixes"] == ["a", "b"]
72+
assert list_result1["objects"] == []
73+
74+
list_result2 = obs.list_with_delimiter(store, "a")
75+
assert list_result2["common_prefixes"] == []
76+
assert list_result2["objects"][0]["path"] == "a/file1.txt"
77+
assert list_result2["objects"][1]["path"] == "a/file2.txt"
78+
79+
list_result3 = obs.list_with_delimiter(store, "b")
80+
assert list_result3["common_prefixes"] == []
81+
assert list_result3["objects"][0]["path"] == "b/file3.txt"
82+
83+
# Test returning arrow
84+
list_result1 = obs.list_with_delimiter(store, return_arrow=True)
85+
assert list_result1["common_prefixes"] == ["a", "b"]
86+
assert list_result1["objects"].num_rows == 0
87+
88+
list_result2 = obs.list_with_delimiter(store, "a", return_arrow=True)
89+
assert list_result2["common_prefixes"] == []
90+
assert list_result2["objects"].num_rows == 2
91+
assert list_result2["objects"]["path"][0].as_py() == "a/file1.txt"
92+
assert list_result2["objects"]["path"][1].as_py() == "a/file2.txt"
93+
94+
95+
@pytest.mark.asyncio
96+
async def test_list_with_delimiter_async():
97+
store = MemoryStore()
98+
99+
await obs.put_async(store, "a/file1.txt", b"foo")
100+
await obs.put_async(store, "a/file2.txt", b"bar")
101+
await obs.put_async(store, "b/file3.txt", b"baz")
102+
103+
list_result1 = await obs.list_with_delimiter_async(store)
104+
assert list_result1["common_prefixes"] == ["a", "b"]
105+
assert list_result1["objects"] == []
106+
107+
list_result2 = await obs.list_with_delimiter_async(store, "a")
108+
assert list_result2["common_prefixes"] == []
109+
assert list_result2["objects"][0]["path"] == "a/file1.txt"
110+
assert list_result2["objects"][1]["path"] == "a/file2.txt"
111+
112+
list_result3 = await obs.list_with_delimiter_async(store, "b")
113+
assert list_result3["common_prefixes"] == []
114+
assert list_result3["objects"][0]["path"] == "b/file3.txt"
115+
116+
# Test returning arrow
117+
list_result1 = await obs.list_with_delimiter_async(store, return_arrow=True)
118+
assert list_result1["common_prefixes"] == ["a", "b"]
119+
assert list_result1["objects"].num_rows == 0
120+
121+
list_result2 = await obs.list_with_delimiter_async(store, "a", return_arrow=True)
122+
assert list_result2["common_prefixes"] == []
123+
assert list_result2["objects"].num_rows == 2
124+
assert list_result2["objects"]["path"][0].as_py() == "a/file1.txt"
125+
assert list_result2["objects"]["path"][1].as_py() == "a/file2.txt"

0 commit comments

Comments
 (0)