Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions datafusion/functions-nested/benches/array_set_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use criterion::{
};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
use datafusion_functions_nested::except::ArrayExcept;
use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion};
use rand::SeedableRng;
use rand::prelude::SliceRandom;
Expand All @@ -38,6 +39,7 @@ const SEED: u64 = 42;
/// Registers every array set-operation benchmark with Criterion.
///
/// Each `bench_*` helper creates its own benchmark group; the order here
/// only affects the order groups appear in the report.
fn criterion_benchmark(c: &mut Criterion) {
bench_array_union(c);
bench_array_intersect(c);
bench_array_except(c);
bench_array_distinct(c);
}

Expand Down Expand Up @@ -98,6 +100,25 @@ fn bench_array_intersect(c: &mut Criterion) {
group.finish();
}

/// Benchmarks the `array_except` UDF across two element-overlap ratios
/// (high/low) and the standard set of array sizes.
fn bench_array_except(c: &mut Criterion) {
let udf = ArrayExcept::new();
let mut group = c.benchmark_group("array_except");

// Scenarios pair a report label with the fraction of shared elements
// between the two input arrays.
let scenarios = [("high_overlap", 0.8), ("low_overlap", 0.2)];
for &(overlap_label, overlap_ratio) in scenarios.iter() {
for &array_size in ARRAY_SIZES {
let (lhs, rhs) =
create_arrays_with_overlap(NUM_ROWS, array_size, overlap_ratio);
let id = BenchmarkId::new(overlap_label, array_size);
group.bench_with_input(id, &array_size, |b, _| {
b.iter(|| invoke_udf(&udf, &lhs, &rhs))
});
}
}

group.finish();
}

fn bench_array_distinct(c: &mut Criterion) {
let mut group = c.benchmark_group("array_distinct");
let udf = ArrayDistinct::new();
Expand Down
42 changes: 28 additions & 14 deletions datafusion/functions-nested/src/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

use crate::utils::{check_datatypes, make_scalar_function};
use arrow::array::new_null_array;
use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, cast::AsArray};
use arrow::array::{
Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
cast::AsArray,
};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute::take;
use arrow::datatypes::{DataType, FieldRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::utils::{ListCoercion, take_function_args};
Expand Down Expand Up @@ -179,7 +183,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
offsets.push(OffsetSize::usize_as(0));

let mut rows = Vec::with_capacity(l_values.num_rows());
let mut indices: Vec<usize> = Vec::with_capacity(l_values.num_rows());
let mut dedup = HashSet::new();

let nulls = NullBuffer::union(l.nulls(), r.nulls());
Expand All @@ -193,7 +197,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
.as_ref()
.is_some_and(|nulls| nulls.is_null(list_index))
{
offsets.push(OffsetSize::usize_as(rows.len()));
offsets.push(OffsetSize::usize_as(indices.len()));
continue;
}

Expand All @@ -204,22 +208,32 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
for element_index in l_start.as_usize()..l_end.as_usize() {
let left_row = l_values.row(element_index);
if dedup.insert(left_row) {
rows.push(left_row);
indices.push(element_index);
}
}

offsets.push(OffsetSize::usize_as(rows.len()));
offsets.push(OffsetSize::usize_as(indices.len()));
dedup.clear();
}

if let Some(values) = converter.convert_rows(rows)?.first() {
Ok(GenericListArray::<OffsetSize>::new(
field.to_owned(),
OffsetBuffer::new(offsets.into()),
values.to_owned(),
nulls,
))
// Gather distinct left-side values by index.
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
let values = if indices.is_empty() {
arrow::array::new_empty_array(&l.value_type())
} else if OffsetSize::IS_LARGE {
let indices =
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
take(l.values().as_ref(), &indices, None)?
} else {
internal_err!("array_except failed to convert rows")
}
let indices =
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
take(l.values().as_ref(), &indices, None)?
};

Ok(GenericListArray::<OffsetSize>::new(
field.to_owned(),
OffsetBuffer::new(offsets.into()),
values,
nulls,
))
}
80 changes: 57 additions & 23 deletions datafusion/functions-nested/src/set_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

use crate::utils::make_scalar_function;
use arrow::array::{
Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array,
Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
new_empty_array, new_null_array,
};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute::{concat, take};
use arrow::datatypes::DataType::{LargeList, List, Null};
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::row::{RowConverter, SortField};
Expand Down Expand Up @@ -373,12 +375,28 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?;
let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?;

// Combine value arrays so indices from both sides share a single index space.
let combined_values = concat(&[l.values().as_ref(), r.values().as_ref()])?;
let r_offset = l.values().len();

match set_op {
SetOp::Union => generic_set_loop::<OffsetSize, true>(
l, r, &rows_l, &rows_r, field, &converter,
l,
r,
&rows_l,
&rows_r,
field,
&combined_values,
r_offset,
),
SetOp::Intersect => generic_set_loop::<OffsetSize, false>(
l, r, &rows_l, &rows_r, field, &converter,
l,
r,
&rows_l,
&rows_r,
field,
&combined_values,
r_offset,
),
}
}
Expand All @@ -391,7 +409,8 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
rows_l: &arrow::row::Rows,
rows_r: &arrow::row::Rows,
field: Arc<Field>,
converter: &RowConverter,
combined_values: &ArrayRef,
r_offset: usize,
) -> Result<ArrayRef> {
let l_offsets = l.value_offsets();
let r_offsets = r.value_offsets();
Expand All @@ -406,7 +425,7 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
rows_l.num_rows().min(rows_r.num_rows())
};

let mut final_rows = Vec::with_capacity(initial_capacity);
let mut indices: Vec<usize> = Vec::with_capacity(initial_capacity);

// Reuse hash sets across iterations
let mut seen = HashSet::new();
Expand All @@ -430,25 +449,27 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
for idx in l_start..l_end {
let row = rows_l.row(idx);
if seen.insert(row) {
final_rows.push(row);
indices.push(idx);
}
}
for idx in r_start..r_end {
let row = rows_r.row(idx);
if seen.insert(row) {
final_rows.push(row);
indices.push(idx + r_offset);
}
}
} else {
let l_len = l_end - l_start;
let r_len = r_end - r_start;

// Select shorter side for lookup, longer side for probing
let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len {
(rows_l, l_start..l_end, rows_r, r_start..r_end)
} else {
(rows_r, r_start..r_end, rows_l, l_start..l_end)
};
// Select shorter side for lookup, longer side for probing.
// Track the probe side's offset into the combined values array.
let (lookup_rows, lookup_range, probe_rows, probe_range, probe_offset) =
if l_len < r_len {
(rows_l, l_start..l_end, rows_r, r_start..r_end, r_offset)
} else {
(rows_r, r_start..r_end, rows_l, l_start..l_end, 0)
};
lookup_set.clear();
lookup_set.reserve(lookup_range.len());

Expand All @@ -461,18 +482,25 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
for idx in probe_range {
let row = probe_rows.row(idx);
if lookup_set.contains(&row) && seen.insert(row) {
final_rows.push(row);
indices.push(idx + probe_offset);
}
}
}
result_offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
}

let final_values = if final_rows.is_empty() {
// Gather distinct values by index from the combined values array.
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
let final_values = if indices.is_empty() {
new_empty_array(&l.value_type())
} else if OffsetSize::IS_LARGE {
let indices =
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
take(combined_values.as_ref(), &indices, None)?
} else {
let arrays = converter.convert_rows(final_rows)?;
Arc::clone(&arrays[0])
let indices =
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
take(combined_values.as_ref(), &indices, None)?
};

let arr = GenericListArray::<OffsetSize>::try_new(
Expand Down Expand Up @@ -539,7 +567,7 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
// Convert all values to row format in a single batch for performance
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
let rows = converter.convert_columns(&[Arc::clone(array.values())])?;
let mut final_rows = Vec::with_capacity(rows.num_rows());
let mut indices: Vec<usize> = Vec::with_capacity(rows.num_rows());
let mut seen = HashSet::new();
for i in 0..array.len() {
let last_offset = *offsets.last().unwrap();
Expand All @@ -559,18 +587,24 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
for idx in start..end {
let row = rows.row(idx);
if seen.insert(row) {
final_rows.push(row);
indices.push(idx);
}
}
offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
}

// Convert all collected distinct rows back
let final_values = if final_rows.is_empty() {
// Gather distinct values in a single pass, using the computed `indices`.
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
let final_values = if indices.is_empty() {
new_empty_array(&dt)
} else if OffsetSize::IS_LARGE {
let indices =
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
take(array.values().as_ref(), &indices, None)?
} else {
let arrays = converter.convert_rows(final_rows)?;
Arc::clone(&arrays[0])
let indices =
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
take(array.values().as_ref(), &indices, None)?
};

Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
Expand Down