Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/functions-nested/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ paste = { workspace = true }
criterion = { workspace = true, features = ["async_tokio"] }
rand = { workspace = true }

[[bench]]
harness = false
name = "array_concat"

[[bench]]
harness = false
name = "array_expression"
Expand Down
94 changes: 94 additions & 0 deletions datafusion/functions-nested/benches/array_concat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::hint::black_box;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, ListArray};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion_functions_nested::concat::array_concat_inner;

const SEED: u64 = 42;

/// Construct a `ListArray<i32>` holding `num_lists` rows, each filled with
/// `elements_per_list` random i32 values drawn from `rng`; rows whose index
/// is divisible by 10 are marked null at the list level.
fn make_list_array(
    rng: &mut StdRng,
    num_lists: usize,
    elements_per_list: usize,
) -> ArrayRef {
    // Flat child buffer: one random value per element across all rows.
    let flat: Vec<i32> = (0..num_lists * elements_per_list)
        .map(|_| rng.random::<i32>())
        .collect();
    let child = Arc::new(Int32Array::from(flat));

    // Evenly spaced boundaries: row i spans
    // [i * elements_per_list, (i + 1) * elements_per_list).
    let boundaries: Vec<i32> = (0..=num_lists)
        .map(|row| (row * elements_per_list) as i32)
        .collect();
    let offsets = OffsetBuffer::new(ScalarBuffer::from(boundaries));

    // Validity bitmap: rows 0, 10, 20, ... are null.
    let validity: Vec<bool> = (0..num_lists).map(|row| row % 10 != 0).collect();

    let item_field = Arc::new(Field::new("item", DataType::Int32, false));
    Arc::new(ListArray::new(
        item_field,
        offsets,
        child,
        Some(NullBuffer::from(validity)),
    ))
}

/// Benchmark `array_concat_inner` along two axes: number of rows and
/// number of elements per list. Inputs are seeded so runs are reproducible.
fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("array_concat");

    // Axis 1: scale the row count while holding list width fixed at 20.
    for &rows in &[100usize, 1000, 10000] {
        let mut rng = StdRng::seed_from_u64(SEED);
        let inputs: Vec<ArrayRef> = vec![
            make_list_array(&mut rng, rows, 20),
            make_list_array(&mut rng, rows, 20),
        ];

        group.bench_with_input(BenchmarkId::new("rows", rows), &inputs, |b, input| {
            b.iter(|| black_box(array_concat_inner(input).unwrap()));
        });
    }

    // Axis 2: hold the row count at 1000 and scale the per-row list width.
    for &width in &[5usize, 50, 500] {
        let mut rng = StdRng::seed_from_u64(SEED);
        let inputs: Vec<ArrayRef> = vec![
            make_list_array(&mut rng, 1000, width),
            make_list_array(&mut rng, 1000, width),
        ];

        group.bench_with_input(
            BenchmarkId::new("elements_per_list", width),
            &inputs,
            |b, input| {
                b.iter(|| black_box(array_concat_inner(input).unwrap()));
            },
        );
    }

    group.finish();
}

// Register the benchmark group and generate the `main` entry point;
// needed because this bench target sets `harness = false` in Cargo.toml.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
81 changes: 39 additions & 42 deletions datafusion/functions-nested/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ impl ScalarUDFImpl for ArrayConcat {
}
}

fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
pub fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.is_empty() {
return exec_err!("array_concat expects at least one argument");
}
Expand Down Expand Up @@ -396,58 +396,55 @@ fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
.iter()
.map(|arg| as_generic_list_array::<O>(arg))
.collect::<Result<Vec<_>>>()?;
// Assume number of rows is the same for all arrays
let row_count = list_arrays[0].len();

let mut array_lengths = vec![];
let mut arrays = vec![];
// Extract underlying values ArrayData from each list array for MutableArrayData.
let values_data: Vec<ArrayData> =
list_arrays.iter().map(|la| la.values().to_data()).collect();
let values_data_refs: Vec<&ArrayData> = values_data.iter().collect();

// Estimate capacity as the sum of all values arrays' lengths.
let total_capacity: usize = values_data.iter().map(|d| d.len()).sum();

let mut mutable = MutableArrayData::with_capacities(
values_data_refs,
false,
Capacities::Array(total_capacity),
);
let mut offsets: Vec<O> = Vec::with_capacity(row_count + 1);
offsets.push(O::zero());
let mut valid = NullBufferBuilder::new(row_count);
for i in 0..row_count {
let nulls = list_arrays
.iter()
.map(|arr| arr.is_null(i))
.collect::<Vec<_>>();

// If all the arrays are null, the concatenated array is null
let is_null = nulls.iter().all(|&x| x);
if is_null {
array_lengths.push(0);
valid.append_null();
} else {
// Get all the arrays on i-th row
let values = list_arrays
.iter()
.map(|arr| arr.value(i))
.collect::<Vec<_>>();

let elements = values
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();

// Concatenated array on i-th row
let concatenated_array = arrow::compute::concat(elements.as_slice())?;
array_lengths.push(concatenated_array.len());
arrays.push(concatenated_array);
for row_idx in 0..row_count {
let mut row_has_values = false;
for (arr_idx, list_array) in list_arrays.iter().enumerate() {
if list_array.is_null(row_idx) {
continue;
}
row_has_values = true;
let start = list_array.offsets()[row_idx].to_usize().unwrap();
let end = list_array.offsets()[row_idx + 1].to_usize().unwrap();
if start < end {
mutable.extend(arr_idx, start, end);
}
}
if row_has_values {
valid.append_non_null();
} else {
valid.append_null();
}
offsets.push(O::usize_as(mutable.len()));
}
// Assume all arrays have the same data type
let data_type = list_arrays[0].value_type();

let elements = arrays
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();
let data_type = list_arrays[0].value_type();
let data = mutable.freeze();

let list_arr = GenericListArray::<O>::new(
Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new_list_field(data_type, true)),
OffsetBuffer::from_lengths(array_lengths),
Arc::new(arrow::compute::concat(elements.as_slice())?),
OffsetBuffer::new(offsets.into()),
arrow::array::make_array(data),
valid.finish(),
);

Ok(Arc::new(list_arr))
)?))
}

// Kernel functions
Expand Down
16 changes: 16 additions & 0 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3453,6 +3453,22 @@ select
----
[1, 2, 3] List(Utf8View)

# array_concat with NULL elements inside arrays
query ?
select array_concat([1, NULL, 3], [NULL, 5]);
----
[1, NULL, 3, NULL, 5]

query ?
select array_concat([NULL, NULL], [1, 2], [NULL]);
----
[NULL, NULL, 1, 2, NULL]

query ?
select array_concat([NULL, NULL], [NULL, NULL]);
----
[NULL, NULL, NULL, NULL]

# array_concat error
query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support type Int64"
select array_concat(1, 2);
Expand Down