Skip to content

Commit cce3f3f

Browse files
authored
fix: column indices in FFI partition evaluator (#16480)
* Column indices were not computed correctly, causing a panic
* Add unit tests
1 parent b4ba1c6 commit cce3f3f

File tree

2 files changed

+84
-11
lines changed

2 files changed

+84
-11
lines changed

datafusion/ffi/src/udwf/mod.rs

Lines changed: 67 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -363,4 +363,70 @@ impl From<&FFI_SortOptions> for SortOptions {
363363
}
364364

365365
#[cfg(test)]
366-
mod tests {}
366+
#[cfg(feature = "integration-tests")]
367+
mod tests {
368+
use crate::tests::create_record_batch;
369+
use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF};
370+
use arrow::array::{create_array, ArrayRef};
371+
use datafusion::functions_window::lead_lag::{lag_udwf, WindowShift};
372+
use datafusion::logical_expr::expr::Sort;
373+
use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF, WindowUDFImpl};
374+
use datafusion::prelude::SessionContext;
375+
use std::sync::Arc;
376+
377+
fn create_test_foreign_udwf(
378+
original_udwf: impl WindowUDFImpl + 'static,
379+
) -> datafusion::common::Result<WindowUDF> {
380+
let original_udwf = Arc::new(WindowUDF::from(original_udwf));
381+
382+
let local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into();
383+
384+
let foreign_udwf: ForeignWindowUDF = (&local_udwf).try_into()?;
385+
Ok(foreign_udwf.into())
386+
}
387+
388+
/// Checks that the UDF name is preserved when the built-in `lag` window
/// function is converted to its FFI form and back to a native `WindowUDF`.
#[test]
fn test_round_trip_udwf() -> datafusion::common::Result<()> {
    let native = lag_udwf();
    let expected_name = native.name().to_owned();

    // Native -> FFI representation.
    let exported = FFI_WindowUDF::from(Arc::clone(&native));

    // FFI representation -> foreign wrapper -> native `WindowUDF`.
    let round_tripped: WindowUDF = ForeignWindowUDF::try_from(&exported)?.into();

    assert_eq!(expected_name, round_tripped.name());
    Ok(())
}
403+
404+
#[tokio::test]
405+
async fn test_lag_udwf() -> datafusion::common::Result<()> {
406+
let udwf = create_test_foreign_udwf(WindowShift::lag())?;
407+
408+
let ctx = SessionContext::default();
409+
let df = ctx.read_batch(create_record_batch(-5, 5))?;
410+
411+
let df = df.select(vec![
412+
col("a"),
413+
udwf.call(vec![col("a")])
414+
.order_by(vec![Sort::new(col("a"), true, true)])
415+
.build()
416+
.unwrap()
417+
.alias("lag_a"),
418+
])?;
419+
420+
df.clone().show().await?;
421+
422+
let result = df.collect().await?;
423+
let expected =
424+
create_array!(Int32, [None, Some(-5), Some(-4), Some(-3), Some(-2)])
425+
as ArrayRef;
426+
427+
assert_eq!(result.len(), 1);
428+
assert_eq!(result[0].column(1), &expected);
429+
430+
Ok(())
431+
}
432+
}

datafusion/ffi/src/udwf/partition_evaluator_args.rs

Lines changed: 17 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -75,17 +75,24 @@ impl TryFrom<PartitionEvaluatorArgs<'_>> for FFI_PartitionEvaluatorArgs {
7575
})
7676
.collect();
7777

78-
let max_column = required_columns.keys().max().unwrap_or(&0).to_owned();
79-
let fields: Vec<_> = (0..max_column)
80-
.map(|idx| match required_columns.get(&idx) {
81-
Some((name, data_type)) => Field::new(*name, (*data_type).clone(), true),
82-
None => Field::new(
83-
format!("ffi_partition_evaluator_col_{idx}"),
84-
DataType::Null,
85-
true,
86-
),
78+
let max_column = required_columns.keys().max();
79+
let fields: Vec<_> = max_column
80+
.map(|max_column| {
81+
(0..(max_column + 1))
82+
.map(|idx| match required_columns.get(&idx) {
83+
Some((name, data_type)) => {
84+
Field::new(*name, (*data_type).clone(), true)
85+
}
86+
None => Field::new(
87+
format!("ffi_partition_evaluator_col_{idx}"),
88+
DataType::Null,
89+
true,
90+
),
91+
})
92+
.collect()
8793
})
88-
.collect();
94+
.unwrap_or_default();
95+
8996
let schema = Arc::new(Schema::new(fields));
9097

9198
let codec = DefaultPhysicalExtensionCodec {};

0 commit comments

Comments
 (0)