Skip to content

Commit 0b091c3

Browse files
authored
Arrow iterator conversions (#3875)
Signed-off-by: Nicholas Gates <[email protected]>
1 parent b0be264 commit 0b091c3

File tree

7 files changed

+46
-9
lines changed

7 files changed

+46
-9
lines changed

vortex-array/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ rustversion = { workspace = true }
2323
arbitrary = { workspace = true, optional = true }
2424
arcref = { workspace = true }
2525
arrow-arith = { workspace = true }
26-
arrow-array = { workspace = true }
26+
arrow-array = { workspace = true, features = ["ffi"] }
2727
arrow-buffer = { workspace = true }
2828
arrow-cast = { workspace = true }
2929
arrow-data = { workspace = true }
Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,47 @@
44
use std::sync::Arc;
55

66
use arrow_array::cast::AsArray;
7-
use arrow_array::{RecordBatch, RecordBatchReader};
7+
use arrow_array::{RecordBatch, RecordBatchReader, ffi_stream};
88
use arrow_schema::{ArrowError, DataType, SchemaRef};
9-
use vortex::arrow::compute::to_arrow;
10-
use vortex::error::VortexResult;
11-
use vortex::iter::ArrayIterator;
9+
use vortex_dtype::DType;
10+
use vortex_dtype::arrow::FromArrowType;
11+
use vortex_error::{VortexError, VortexResult};
12+
13+
use crate::ArrayRef;
14+
use crate::arrow::FromArrowArray;
15+
use crate::arrow::compute::to_arrow;
16+
use crate::iter::ArrayIterator;
17+
18+
/// An adapter for converting an `ArrowArrayStreamReader` into a Vortex [`ArrayIterator`].
19+
pub struct ArrowArrayStreamAdapter {
20+
stream: ffi_stream::ArrowArrayStreamReader,
21+
dtype: DType,
22+
}
23+
24+
impl ArrowArrayStreamAdapter {
25+
pub fn new(stream: ffi_stream::ArrowArrayStreamReader, dtype: DType) -> Self {
26+
Self { stream, dtype }
27+
}
28+
}
29+
30+
impl ArrayIterator for ArrowArrayStreamAdapter {
31+
fn dtype(&self) -> &DType {
32+
&self.dtype
33+
}
34+
}
35+
36+
impl Iterator for ArrowArrayStreamAdapter {
37+
type Item = VortexResult<ArrayRef>;
38+
39+
fn next(&mut self) -> Option<Self::Item> {
40+
let batch = self.stream.next()?;
41+
42+
Some(batch.map_err(VortexError::from).map(|b| {
43+
debug_assert_eq!(&self.dtype, &DType::from_arrow(b.schema()));
44+
ArrayRef::from_arrow(b, false)
45+
}))
46+
}
47+
}
1248

1349
/// Adapter for converting an [`ArrayIterator`] into an Arrow [`RecordBatchReader`].
1450
pub struct VortexRecordBatchReader<I> {

vortex-array/src/arrow/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ mod array;
1111
pub mod compute;
1212
mod convert;
1313
mod datum;
14+
mod iter;
1415
mod record_batch;
1516

1617
pub use datum::*;
18+
pub use iter::*;
1719

1820
use crate::arrow::compute::ToArrowOptions;
1921

vortex-python/src/dataset.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use arrow_schema::SchemaRef;
99
use pyo3::exceptions::PyTypeError;
1010
use pyo3::prelude::*;
1111
use pyo3::types::PyString;
12+
use vortex::arrow::VortexRecordBatchReader;
1213
use vortex::dtype::FieldName;
1314
use vortex::error::VortexResult;
1415
use vortex::expr::{ExprRef, SelectExpr, root};
@@ -20,7 +21,6 @@ use crate::arrays::PyArrayRef;
2021
use crate::expr::PyExpr;
2122
use crate::iter::ArrayStreamToIterator;
2223
use crate::object_store_urls::object_store_from_url;
23-
use crate::record_batch_reader::VortexRecordBatchReader;
2424
use crate::{TOKIO_RUNTIME, install_module};
2525

2626
pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {

vortex-python/src/file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use pyo3::exceptions::PyTypeError;
99
use pyo3::prelude::*;
1010
use pyo3::types::PyList;
1111
use vortex::ToCanonical;
12+
use vortex::arrow::VortexRecordBatchReader;
1213
use vortex::compute::cast;
1314
use vortex::dtype::Nullability::NonNullable;
1415
use vortex::dtype::{DType, PType};
@@ -24,7 +25,6 @@ use crate::dataset::PyVortexDataset;
2425
use crate::dtype::PyDType;
2526
use crate::expr::PyExpr;
2627
use crate::iter::{ArrayStreamToIterator, PyArrayIterator};
27-
use crate::record_batch_reader::VortexRecordBatchReader;
2828
use crate::{TOKIO_RUNTIME, install_module};
2929

3030
pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {

vortex-python/src/iter/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use pyo3::prelude::*;
1313
use pyo3::types::PyIterator;
1414
use pyo3::{Bound, PyResult, Python};
1515
pub(crate) use stream::*;
16+
use vortex::arrow::VortexRecordBatchReader;
1617
use vortex::dtype::DType;
1718
use vortex::iter::{ArrayIterator, ArrayIteratorAdapter, ArrayIteratorExt};
1819
use vortex::{Canonical, IntoArray};
@@ -21,7 +22,6 @@ use crate::arrays::PyArrayRef;
2122
use crate::dtype::PyDType;
2223
use crate::install_module;
2324
use crate::iter::python::PythonArrayIterator;
24-
use crate::record_batch_reader::VortexRecordBatchReader;
2525

2626
pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {
2727
let m = PyModule::new(py, "iter")?;

vortex-python/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ mod io;
1919
mod iter;
2020
mod object_store_urls;
2121
mod python_repr;
22-
mod record_batch_reader;
2322
mod registry;
2423
pub(crate) mod scalar;
2524
mod serde;

0 commit comments

Comments
 (0)