Skip to content

Commit 0c62e84

Browse files
alamb2010YOUY01
andauthored
[branch-50] perf: Fix NLJ slow join with condition array_has (apache#18161) (apache#18179)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Related to apache#18070 - Part of apache#18072 ## Rationale for this change Fix performance regression in Datafusion 50 ## What changes are included in this PR? Backport apache#18161 to `branch-50` ## Are these changes tested? Yes ## Are there any user-facing changes? Fix performance regression Co-authored-by: Yongting You <[email protected]>
1 parent ade5232 commit 0c62e84

File tree

2 files changed

+92
-6
lines changed

2 files changed

+92
-6
lines changed

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,15 @@ use crate::{
4848

4949
use arrow::array::{
5050
new_null_array, Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions,
51+
UInt64Array,
5152
};
5253
use arrow::buffer::BooleanBuffer;
53-
use arrow::compute::{concat_batches, filter, filter_record_batch, not, BatchCoalescer};
54+
use arrow::compute::{
55+
concat_batches, filter, filter_record_batch, not, take, BatchCoalescer,
56+
};
5457
use arrow::datatypes::{Schema, SchemaRef};
5558
use arrow::record_batch::RecordBatch;
59+
use arrow_schema::DataType;
5660
use datafusion_common::cast::as_boolean_array;
5761
use datafusion_common::{
5862
arrow_err, internal_datafusion_err, internal_err, project_schema,
@@ -1658,11 +1662,30 @@ fn build_row_join_batch(
16581662
// Broadcast the single build-side row to match the filtered
16591663
// probe-side batch length
16601664
let original_left_array = build_side_batch.column(column_index.index);
1661-
let scalar_value = ScalarValue::try_from_array(
1662-
original_left_array.as_ref(),
1663-
build_side_index,
1664-
)?;
1665-
scalar_value.to_array_of_size(filtered_probe_batch.num_rows())?
1665+
// Avoid using `ScalarValue::to_array_of_size()` for `List(Utf8View)` to avoid
1666+
// deep copies for buffers inside `Utf8View` array. See below for details.
1667+
// https://github.com/apache/datafusion/issues/18159
1668+
//
1669+
// In other cases, `to_array_of_size()` is faster.
1670+
match original_left_array.data_type() {
1671+
DataType::List(field) | DataType::LargeList(field)
1672+
if field.data_type() == &DataType::Utf8View =>
1673+
{
1674+
let indices_iter = std::iter::repeat_n(
1675+
build_side_index as u64,
1676+
filtered_probe_batch.num_rows(),
1677+
);
1678+
let indices_array = UInt64Array::from_iter_values(indices_iter);
1679+
take(original_left_array.as_ref(), &indices_array, None)?
1680+
}
1681+
_ => {
1682+
let scalar_value = ScalarValue::try_from_array(
1683+
original_left_array.as_ref(),
1684+
build_side_index,
1685+
)?;
1686+
scalar_value.to_array_of_size(filtered_probe_batch.num_rows())?
1687+
}
1688+
}
16661689
} else {
16671690
// Take the filtered probe-side column using compute::take
16681691
Arc::clone(filtered_probe_batch.column(column_index.index))
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
20+
## Ensure test coverage for NLJ using joining on LISTS
21+
22+
## Reproducer for https://github.com/apache/datafusion/issues/18070
23+
24+
statement ok
25+
CREATE TABLE categories_raw
26+
AS SELECT arrow_cast('cat_' || value, 'Utf8View') AS category_id FROM generate_series(1, 5);
27+
28+
statement ok
29+
CREATE TABLE places
30+
AS SELECT column1 as id, column2 as fsq_category_ids, column3 as date_refreshed
31+
FROM VALUES
32+
(1, ['cat_1', 'cat_2', 'cat_3'], DATE '2023-05-10'),
33+
(2, ['cat_4', 'cat_5'], DATE '2021-12-01'),
34+
(3, ['cat_6', 'cat_7', 'cat_8', 'cat_9'], DATE '2024-01-15'); --> NOTE these categories do not exist in categories_raw
35+
36+
37+
query I
38+
WITH categories_arr AS (
39+
SELECT array_agg(category_id) AS category_ids FROM categories_raw LIMIT 500
40+
)
41+
SELECT COUNT(*)
42+
FROM places p
43+
WHERE array_has_any(p.fsq_category_ids, (SELECT category_ids FROM categories_arr));
44+
----
45+
2
46+
47+
query I
48+
WITH categories_arr AS (
49+
SELECT array_agg(category_id) AS category_ids FROM categories_raw LIMIT 500
50+
)
51+
SELECT COUNT(*)
52+
FROM places p
53+
WHERE id <> 1 AND array_has_any(p.fsq_category_ids, (SELECT category_ids FROM categories_arr));
54+
----
55+
1
56+
57+
# cleanup
58+
statement ok
59+
DROP TABLE categories_raw;
60+
61+
statement ok
62+
DROP TABLE places;
63+

0 commit comments

Comments
 (0)