Skip to content

Commit 34addca

Browse files
authored
bugfix: preserve schema metadata for record batch in FFI (#19293)
## Which issue does this PR close? - Closes #19292. ## Rationale for this change Metadata on a schema for a record batch is lost during FFI conversion. This is not always obvious because our other integrations like `ExecutionPlan` and `TableProvider` have their own ways to provide the schema. This is currently an issue because we have FFI table providers who say they have a specific schema but the schema of the actual record batches does not match. The metadata is dropped. ## What changes are included in this PR? We already have the schema in the FFI object, so use it both when converting to and from FFI. ## Are these changes tested? Unit test added. ## Are there any user-facing changes? No
1 parent 3420a2d commit 34addca

File tree

1 file changed

+36
-6
lines changed

1 file changed

+36
-6
lines changed

datafusion/ffi/src/record_batch_stream.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,11 @@ unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_RecordBatchStream) {
105105
pub(crate) fn record_batch_to_wrapped_array(
106106
record_batch: RecordBatch,
107107
) -> FFIResult<WrappedArray> {
108+
let schema = WrappedSchema::from(record_batch.schema());
108109
let struct_array = StructArray::from(record_batch);
109110
rresult!(
110-
to_ffi(&struct_array.to_data()).map(|(array, schema)| WrappedArray {
111-
array,
112-
schema: WrappedSchema(schema)
113-
})
111+
to_ffi(&struct_array.to_data())
112+
.map(|(array, _schema)| WrappedArray { array, schema })
114113
)
115114
}
116115

@@ -157,6 +156,7 @@ impl RecordBatchStream for FFI_RecordBatchStream {
157156
pub(crate) fn wrapped_array_to_record_batch(array: WrappedArray) -> Result<RecordBatch> {
158157
let array_data =
159158
unsafe { from_ffi(array.array, &array.schema.0).map_err(DataFusionError::from)? };
159+
let schema: arrow::datatypes::SchemaRef = array.schema.into();
160160
let array = make_array(array_data);
161161
let struct_array = array
162162
.as_any()
@@ -165,7 +165,9 @@ pub(crate) fn wrapped_array_to_record_batch(array: WrappedArray) -> Result<Recor
165165
"Unexpected array type during record batch collection in FFI_RecordBatchStream - expected StructArray"
166166
))?;
167167

168-
Ok(struct_array.into())
168+
let rb: RecordBatch = struct_array.into();
169+
170+
rb.with_schema(schema).map_err(Into::into)
169171
}
170172

171173
fn maybe_wrapped_array_to_record_batch(
@@ -219,7 +221,11 @@ mod tests {
219221
use datafusion::test_util::bounded_stream;
220222
use futures::StreamExt;
221223

222-
use super::FFI_RecordBatchStream;
224+
use super::{
225+
FFI_RecordBatchStream, record_batch_to_wrapped_array,
226+
wrapped_array_to_record_batch,
227+
};
228+
use crate::df_result;
223229

224230
#[tokio::test]
225231
async fn test_round_trip_record_batch_stream() -> Result<()> {
@@ -252,4 +258,28 @@ mod tests {
252258

253259
Ok(())
254260
}
261+
262+
#[test]
263+
fn round_trip_record_batch_with_metadata() -> Result<()> {
264+
let rb = record_batch!(
265+
("a", Int32, vec![1, 2, 3]),
266+
("b", Float64, vec![Some(4.0), None, Some(5.0)])
267+
)?;
268+
269+
let schema = rb
270+
.schema()
271+
.as_ref()
272+
.clone()
273+
.with_metadata([("some_key".to_owned(), "some_value".to_owned())].into())
274+
.into();
275+
276+
let rb = rb.with_schema(schema)?;
277+
278+
let ffi_rb = df_result!(record_batch_to_wrapped_array(rb.clone()))?;
279+
280+
let round_trip_rb = wrapped_array_to_record_batch(ffi_rb)?;
281+
282+
assert_eq!(rb, round_trip_rb);
283+
Ok(())
284+
}
255285
}

0 commit comments

Comments
 (0)