
Commit 9884295

robert3005 and danking authored
chore: Inline arrow-pyarrow crate to avoid upstream pyo3 version pin (#4342)
fix #2181

Signed-off-by: Robert Kruszewski <[email protected]>

---------

Signed-off-by: Robert Kruszewski <[email protected]>
Signed-off-by: Daniel King <[email protected]>
Co-authored-by: Dan King <[email protected]>
1 parent 5fd22b1 commit 9884295

File tree: 12 files changed (+312 -25 lines)

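For orientation (this note and sketch are not part of the commit): the vendored module exposes the same FromPyArrow, ToPyArrow, and IntoPyArrow traits that the upstream arrow-pyarrow crate provided, so the rest of the diff only swaps import paths from arrow_pyarrow to crate::arrow. A minimal sketch, using a hypothetical pyo3 function, of how the traits are typically used inside vortex-python:

use arrow_array::RecordBatch;
use pyo3::prelude::*;

use crate::arrow::{FromPyArrow, ToPyArrow}; // module added by this commit

// Hypothetical round-trip helper for illustration only; not part of this diff.
#[pyfunction]
fn roundtrip_batch(py: Python<'_>, obj: &Bound<'_, PyAny>) -> PyResult<PyObject> {
    // Accept any object exposing __arrow_c_array__ (e.g. a pyarrow.RecordBatch).
    let batch = RecordBatch::from_pyarrow_bound(obj)?;
    // Hand it back to Python as a pyarrow.RecordBatch.
    batch.to_pyarrow(py)
}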

Cargo.lock

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -66,7 +66,6 @@ arrow-cast = "55.2.0"
 arrow-data = "55.2.0"
 arrow-ipc = "55.2.0"
 arrow-ord = "55.2.0"
-arrow-pyarrow = "55.2.0"
 arrow-schema = "55.2.0"
 arrow-select = "55.2.0"
 arrow-string = "55.2.0"

vortex-python/Cargo.toml

Lines changed: 0 additions & 2 deletions
@@ -20,12 +20,10 @@ workspace = true
 
 [lib]
 crate-type = ["rlib", "cdylib"]
-doctest = false # TODO(ngates): why?
 
 [dependencies]
 arrow-array = { workspace = true }
 arrow-data = { workspace = true }
-arrow-pyarrow = { workspace = true }
 arrow-schema = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }

vortex-python/src/arrays/from_arrow.rs

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,6 @@
 use arrow_array::ffi_stream::ArrowArrayStreamReader;
 use arrow_array::{RecordBatchReader, make_array};
 use arrow_data::ArrayData as ArrowArrayData;
-use arrow_pyarrow::FromPyArrow;
 use arrow_schema::{DataType, Field};
 use itertools::Itertools;
 use pyo3::exceptions::PyValueError;
@@ -17,6 +16,7 @@ use vortex::error::{VortexError, VortexResult};
 use vortex::{ArrayRef, IntoArray};
 
 use crate::arrays::PyArrayRef;
+use crate::arrow::FromPyArrow;
 
 /// Convert an Arrow object to a Vortex array.
 pub(super) fn from_arrow(obj: &Bound<'_, PyAny>) -> PyResult<PyArrayRef> {

vortex-python/src/arrays/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,6 @@ mod native;
 pub(crate) mod py;
 
 use arrow_array::{Array as ArrowArray, ArrayRef as ArrowArrayRef};
-use arrow_pyarrow::ToPyArrow;
 use pyo3::exceptions::{PyTypeError, PyValueError};
 use pyo3::prelude::*;
 use pyo3::types::{PyDict, PyList};
@@ -22,6 +21,7 @@ use vortex::{Array, ArrayRef};
 
 use crate::arrays::native::PyNativeArray;
 use crate::arrays::py::{PyPythonArray, PythonArray};
+use crate::arrow::ToPyArrow;
 use crate::dtype::PyDType;
 use crate::python_repr::PythonRepr;
 use crate::scalar::PyScalar;

vortex-python/src/arrow.rs

Lines changed: 302 additions & 0 deletions
@@ -0,0 +1,302 @@
+// SPDX-FileCopyrightText: 2016-2025 Copyright The Apache Software Foundation
+// SPDX-FileCopyrightText: 2025 Copyright the Vortex contributors
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileComment: Derived from upstream file arrow-pyarrow/src/lib.rs at commit 549709fb at https://github.com/apache/arrow-rs
+// SPDX-FileNotice: https://github.com/apache/arrow-rs/blob/549709fbdf91cd1f6c263a7e4540c542b6fecf6b/NOTICE.txt
+#![allow(clippy::same_name_method)]
+
+use std::convert::{From, TryFrom};
+use std::ptr::addr_of;
+use std::sync::Arc;
+
+use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow_array::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
+use arrow_array::{
+    RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader, StructArray, ffi,
+};
+use arrow_data::ArrayData;
+use arrow_schema::{ArrowError, DataType, Field, Schema};
+use pyo3::exceptions::{PyTypeError, PyValueError};
+use pyo3::ffi::Py_uintptr_t;
+use pyo3::import_exception;
+use pyo3::prelude::*;
+use pyo3::types::{PyCapsule, PyTuple};
+
+import_exception!(pyarrow, ArrowException);
+/// Represents an exception raised by PyArrow.
+pub type PyArrowException = ArrowException;
+
+fn to_py_err(err: ArrowError) -> PyErr {
+    PyArrowException::new_err(err.to_string())
+}
+
+/// Trait for converting Python objects to arrow-rs types.
+pub trait FromPyArrow: Sized {
+    /// Convert a Python object to an arrow-rs type.
+    ///
+    /// Takes a GIL-bound value from Python and returns a result with the arrow-rs type.
+    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self>;
+}
+
+/// Create a new PyArrow object from a arrow-rs type.
+pub trait ToPyArrow {
+    /// Convert the implemented type into a Python object without consuming it.
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject>;
+}
+
+/// Convert an arrow-rs type into a PyArrow object.
+pub trait IntoPyArrow {
+    /// Convert the implemented type into a Python object while consuming it.
+    fn into_pyarrow(self, py: Python) -> PyResult<PyObject>;
+}
+
+fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
+    let Some(capsule_name) = capsule.name()?.map(|s| s.to_str()).transpose()? else {
+        return Err(PyValueError::new_err(
+            "Expected schema PyCapsule to have name set.",
+        ));
+    };
+
+    if capsule_name != name {
+        return Err(PyValueError::new_err(format!(
+            "Expected name '{}' in PyCapsule, instead got '{}'",
+            name, capsule_name
+        )));
+    }
+
+    Ok(())
+}
+
+impl FromPyArrow for DataType {
+    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
+        if !value.hasattr("__arrow_c_schema__")? {
+            return Err(PyValueError::new_err(
+                "Expected __arrow_c_schema__ attribute to be set.",
+            ));
+        }
+
+        let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
+        let capsule = capsule.downcast::<PyCapsule>()?;
+        validate_pycapsule(capsule, "arrow_schema")?;
+
+        let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
+        let dtype = DataType::try_from(schema_ptr).map_err(to_py_err)?;
+        Ok(dtype)
+    }
+}
+
+impl ToPyArrow for DataType {
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("DataType")?;
+        let dtype = class.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))?;
+        Ok(dtype.into())
+    }
+}
+
+impl FromPyArrow for Field {
+    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
+        if !value.hasattr("__arrow_c_schema__")? {
+            return Err(PyValueError::new_err(
+                "Expected __arrow_c_schema__ attribute to be set.",
+            ));
+        }
+
+        let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
+        let capsule = capsule.downcast::<PyCapsule>()?;
+        validate_pycapsule(capsule, "arrow_schema")?;
+
+        let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
+        let field = Field::try_from(schema_ptr).map_err(to_py_err)?;
+        Ok(field)
+    }
+}
+
+impl ToPyArrow for Field {
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("Field")?;
+        let dtype = class.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))?;
+        Ok(dtype.into())
+    }
+}
+
+impl FromPyArrow for Schema {
+    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
+        if !value.hasattr("__arrow_c_schema__")? {
+            return Err(PyValueError::new_err(
+                "Expected __arrow_c_schema__ attribute to be set.",
+            ));
+        }
+
+        let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
+        let capsule = capsule.downcast::<PyCapsule>()?;
+        validate_pycapsule(capsule, "arrow_schema")?;
+
+        let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
+        let schema = Schema::try_from(schema_ptr).map_err(to_py_err)?;
+        Ok(schema)
+    }
+}
+
+impl ToPyArrow for Schema {
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("Schema")?;
+        let schema =
+            class.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))?;
+        Ok(schema.into())
+    }
+}
+
+impl FromPyArrow for ArrayData {
+    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
+        if !value.hasattr("__arrow_c_array__")? {
+            return Err(PyValueError::new_err(
+                "Expected __arrow_c_array__ attribute to be set.",
+            ));
+        }
+
+        let tuple = value.getattr("__arrow_c_array__")?.call0()?;
+
+        if !tuple.is_instance_of::<PyTuple>() {
+            return Err(PyTypeError::new_err(
+                "Expected __arrow_c_array__ to return a tuple.",
+            ));
+        }
+
+        let schema_capsule = tuple.get_item(0)?;
+        let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
+        let array_capsule = tuple.get_item(1)?;
+        let array_capsule = array_capsule.downcast::<PyCapsule>()?;
+
+        validate_pycapsule(schema_capsule, "arrow_schema")?;
+        validate_pycapsule(array_capsule, "arrow_array")?;
+
+        let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
+        let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
+        unsafe { ffi::from_ffi(array, schema_ptr) }.map_err(to_py_err)
+    }
+}
+
+impl ToPyArrow for ArrayData {
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let array = FFI_ArrowArray::new(self);
+        let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;
+
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("Array")?;
+        let array = class.call_method1(
+            "_import_from_c",
+            (
+                addr_of!(array) as Py_uintptr_t,
+                addr_of!(schema) as Py_uintptr_t,
+            ),
+        )?;
+        Ok(array.unbind())
+    }
+}
+
+impl FromPyArrow for RecordBatch {
+    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
+        if !value.hasattr("__arrow_c_array__")? {
+            return Err(PyValueError::new_err(
+                "Expected __arrow_c_array__ attribute to be set.",
+            ));
+        }
+
+        let tuple = value.getattr("__arrow_c_array__")?.call0()?;
+
+        if !tuple.is_instance_of::<PyTuple>() {
+            return Err(PyTypeError::new_err(
+                "Expected __arrow_c_array__ to return a tuple.",
+            ));
+        }
+
+        let schema_capsule = tuple.get_item(0)?;
+        let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
+        let array_capsule = tuple.get_item(1)?;
+        let array_capsule = array_capsule.downcast::<PyCapsule>()?;
+
+        validate_pycapsule(schema_capsule, "arrow_schema")?;
+        validate_pycapsule(array_capsule, "arrow_array")?;
+
+        let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
+        let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
+        let mut array_data = unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
+        if !matches!(array_data.data_type(), DataType::Struct(_)) {
+            return Err(PyTypeError::new_err(
+                "Expected Struct type from __arrow_c_array.",
+            ));
+        }
+        let options = RecordBatchOptions::default().with_row_count(Some(array_data.len()));
+        // Ensure data is aligned (by potentially copying the buffers).
+        // This is needed because some python code (for example the
+        // python flight client) produces unaligned buffers
+        // See https://github.com/apache/arrow/issues/43552 for details
+        array_data.align_buffers();
+        let array = StructArray::from(array_data);
+        // StructArray does not embed metadata from schema. We need to override
+        // the output schema with the schema from the capsule.
+        let schema = Arc::new(Schema::try_from(schema_ptr).map_err(to_py_err)?);
+        let (_fields, columns, nulls) = array.into_parts();
+        assert_eq!(
+            nulls.map(|n| n.null_count()).unwrap_or_default(),
+            0,
+            "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
+        );
+
+        RecordBatch::try_new_with_options(schema, columns, &options).map_err(to_py_err)
+    }
+}
+
+impl ToPyArrow for RecordBatch {
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        // Workaround apache/arrow#37669 by returning RecordBatchIterator
+        let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema());
+        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
+        let py_reader = reader.into_pyarrow(py)?;
+        py_reader.call_method0(py, "read_next_batch")
+    }
+}
+
+/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader].
+impl FromPyArrow for ArrowArrayStreamReader {
+    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
+        if !value.hasattr("__arrow_c_stream__")? {
+            return Err(PyValueError::new_err(
+                "Expected __arrow_c_stream__ attribute to be set.",
+            ));
+        }
+
+        let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
+        let capsule = capsule.downcast::<PyCapsule>()?;
+        validate_pycapsule(capsule, "arrow_array_stream")?;
+
+        let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
+
+        let stream_reader = ArrowArrayStreamReader::try_new(stream)
+            .map_err(|err| PyValueError::new_err(err.to_string()))?;
+
+        Ok(stream_reader)
+    }
+}
+
+/// Convert a [`RecordBatchReader`] into a `pyarrow.RecordBatchReader`.
+impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
+    // We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
+    // there is already a blanket implementation for `T: ToPyArrow`.
+    fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
+        let mut stream = FFI_ArrowArrayStream::new(self);
+
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("RecordBatchReader")?;
+        let args = PyTuple::new(py, [&raw mut stream as Py_uintptr_t])?;
+        let reader = class.call_method1("_import_from_c", args)?;
+
+        Ok(PyObject::from(reader))
+    }
+}
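
For context (also not part of the diff): the IntoPyArrow impl for Box<dyn RecordBatchReader + Send> above is how a Rust reader is handed to Python as a pyarrow.RecordBatchReader over the Arrow C stream interface, and it is the trait dataset.rs imports below. A minimal sketch with a hypothetical helper, assuming the same crate::arrow module:

use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::SchemaRef;
use pyo3::prelude::*;

use crate::arrow::IntoPyArrow; // vendored trait from the file above

// Hypothetical helper for illustration only: expose in-memory batches to
// Python as a pyarrow.RecordBatchReader.
fn batches_to_python(
    py: Python<'_>,
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
) -> PyResult<PyObject> {
    let iter = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
    let reader: Box<dyn RecordBatchReader + Send> = Box::new(iter);
    reader.into_pyarrow(py)
}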

vortex-python/src/dataset.rs

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,6 @@
 use std::sync::Arc;
 
 use arrow_array::RecordBatchReader;
-use arrow_pyarrow::{IntoPyArrow, ToPyArrow};
 use arrow_schema::SchemaRef;
 use itertools::Itertools;
 use pyo3::exceptions::{PyTypeError, PyValueError};
@@ -19,6 +18,7 @@ use vortex::scan::SplitBy;
 use vortex::{ArrayRef, ToCanonical};
 
 use crate::arrays::PyArrayRef;
+use crate::arrow::{IntoPyArrow, ToPyArrow};
 use crate::expr::PyExpr;
 use crate::object_store_urls::object_store_from_url;
 use crate::{TOKIO_RUNTIME, install_module};
