Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/functions-nested/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ paste = { workspace = true }
criterion = { workspace = true, features = ["async_tokio"] }
rand = { workspace = true }

[[bench]]
harness = false
name = "array_concat"

[[bench]]
harness = false
name = "array_expression"
Expand Down
94 changes: 94 additions & 0 deletions datafusion/functions-nested/benches/array_concat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::hint::black_box;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, ListArray};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion_functions_nested::concat::array_concat_inner;

const SEED: u64 = 42;

/// Build a `ListArray<i32>` with `num_lists` rows, each containing
/// `elements_per_list` random i32 values. Every 10th row is null.
/// Construct a `ListArray<i32>` with `num_lists` rows, each holding
/// `elements_per_list` random i32 values; every 10th row is marked null
/// so the benchmark exercises the null-handling path of `array_concat`.
fn make_list_array(
    rng: &mut StdRng,
    num_lists: usize,
    elements_per_list: usize,
) -> ArrayRef {
    // Flat child array containing all list elements back-to-back.
    let element_count = num_lists * elements_per_list;
    let child: Vec<i32> = (0..element_count).map(|_| rng.random()).collect();
    let child = Arc::new(Int32Array::from(child));

    // Evenly spaced offsets: row `i` spans
    // [i * elements_per_list, (i + 1) * elements_per_list).
    let boundaries: Vec<i32> = (0..=num_lists)
        .map(|row| (row * elements_per_list) as i32)
        .collect();
    let offsets = OffsetBuffer::new(ScalarBuffer::from(boundaries));

    // Rows 0, 10, 20, ... are null; all others are valid.
    let validity = NullBuffer::from(
        (0..num_lists).map(|row| row % 10 != 0).collect::<Vec<bool>>(),
    );

    let field = Arc::new(Field::new("item", DataType::Int32, false));
    Arc::new(ListArray::new(field, offsets, child, Some(validity)))
}

/// Criterion entry point: measures `array_concat_inner` while scaling
/// first the row count (fixed list length) and then the per-list element
/// count (fixed row count). Inputs are built outside the timed closure so
/// only the concat kernel itself is measured.
fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("array_concat");

    // Vary the number of rows; each list holds 20 elements.
    for &num_rows in &[100, 1000, 10000] {
        // Fresh seeded RNG per case keeps inputs reproducible across runs.
        let mut rng = StdRng::seed_from_u64(SEED);
        let args: Vec<ArrayRef> = vec![
            make_list_array(&mut rng, num_rows, 20),
            make_list_array(&mut rng, num_rows, 20),
        ];

        group.bench_with_input(BenchmarkId::new("rows", num_rows), &args, |b, input| {
            b.iter(|| black_box(array_concat_inner(input).unwrap()));
        });
    }

    // Vary the list length; the row count is fixed at 1000.
    for &elements_per_list in &[5, 50, 500] {
        let mut rng = StdRng::seed_from_u64(SEED);
        let args: Vec<ArrayRef> = vec![
            make_list_array(&mut rng, 1000, elements_per_list),
            make_list_array(&mut rng, 1000, elements_per_list),
        ];

        group.bench_with_input(
            BenchmarkId::new("elements_per_list", elements_per_list),
            &args,
            |b, input| {
                b.iter(|| black_box(array_concat_inner(input).unwrap()));
            },
        );
    }

    group.finish();
}

// Wire the benchmark into criterion's custom harness; Cargo.toml sets
// `harness = false` for this bench target so criterion provides `main`.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
101 changes: 54 additions & 47 deletions datafusion/functions-nested/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use crate::make_array::make_array_inner;
use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
use arrow::array::{
Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
NullBufferBuilder, OffsetSizeTrait,
OffsetSizeTrait,
};
use arrow::buffer::OffsetBuffer;
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{DataType, Field};
use datafusion_common::Result;
use datafusion_common::utils::{
Expand Down Expand Up @@ -352,7 +352,7 @@ impl ScalarUDFImpl for ArrayConcat {
}
}

fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
pub fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.is_empty() {
return exec_err!("array_concat expects at least one argument");
}
Expand Down Expand Up @@ -396,58 +396,65 @@ fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
.iter()
.map(|arg| as_generic_list_array::<O>(arg))
.collect::<Result<Vec<_>>>()?;
// Assume number of rows is the same for all arrays
let row_count = list_arrays[0].len();

let mut array_lengths = vec![];
let mut arrays = vec![];
let mut valid = NullBufferBuilder::new(row_count);
for i in 0..row_count {
let nulls = list_arrays
// Extract underlying values ArrayData from each list array for MutableArrayData.
let values_data: Vec<ArrayData> =
list_arrays.iter().map(|la| la.values().to_data()).collect();
let values_data_refs: Vec<&ArrayData> = values_data.iter().collect();

// Estimate capacity as the sum of all values arrays' lengths.
let total_capacity: usize = values_data.iter().map(|d| d.len()).sum();

let mut mutable = MutableArrayData::with_capacities(
values_data_refs,
false,
Capacities::Array(total_capacity),
);
let mut offsets: Vec<O> = Vec::with_capacity(row_count + 1);
offsets.push(O::zero());

// Compute the output null buffer: a row is null only if null in ALL input
// arrays. This is the bitwise OR of validity bits (valid if valid in ANY
// input). If any array has no null buffer (all valid), no output row can be
// null.
let nulls = list_arrays
.iter()
.filter_map(|la| la.nulls())
.collect::<Vec<_>>();
let valid = if nulls.len() == list_arrays.len() {
nulls
.iter()
.map(|arr| arr.is_null(i))
.collect::<Vec<_>>();

// If all the arrays are null, the concatenated array is null
let is_null = nulls.iter().all(|&x| x);
if is_null {
array_lengths.push(0);
valid.append_null();
} else {
// Get all the arrays on i-th row
let values = list_arrays
.iter()
.map(|arr| arr.value(i))
.collect::<Vec<_>>();

let elements = values
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();

// Concatenated array on i-th row
let concatenated_array = arrow::compute::concat(elements.as_slice())?;
array_lengths.push(concatenated_array.len());
arrays.push(concatenated_array);
valid.append_non_null();
.map(|n| n.inner().clone())
.reduce(|a, b| &a | &b)
.map(NullBuffer::new)
} else {
None
};

for row_idx in 0..row_count {
for (arr_idx, list_array) in list_arrays.iter().enumerate() {
if list_array.is_null(row_idx) {
continue;
}
let start = list_array.offsets()[row_idx].to_usize().unwrap();
let end = list_array.offsets()[row_idx + 1].to_usize().unwrap();
if start < end {
mutable.extend(arr_idx, start, end);
}
}
offsets.push(O::usize_as(mutable.len()));
}
// Assume all arrays have the same data type
let data_type = list_arrays[0].value_type();

let elements = arrays
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();
let data_type = list_arrays[0].value_type();
let data = mutable.freeze();

let list_arr = GenericListArray::<O>::new(
Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new_list_field(data_type, true)),
OffsetBuffer::from_lengths(array_lengths),
Arc::new(arrow::compute::concat(elements.as_slice())?),
valid.finish(),
);

Ok(Arc::new(list_arr))
OffsetBuffer::new(offsets.into()),
arrow::array::make_array(data),
valid,
)?))
}

// Kernel functions
Expand Down
16 changes: 16 additions & 0 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3453,6 +3453,22 @@ select
----
[1, 2, 3] List(Utf8View)

# array_concat with NULL elements inside arrays
query ?
select array_concat([1, NULL, 3], [NULL, 5]);
----
[1, NULL, 3, NULL, 5]

query ?
select array_concat([NULL, NULL], [1, 2], [NULL]);
----
[NULL, NULL, 1, 2, NULL]

query ?
select array_concat([NULL, NULL], [NULL, NULL]);
----
[NULL, NULL, NULL, NULL]

# array_concat error
query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support type Int64"
select array_concat(1, 2);
Expand Down
Loading