Commit 2a6f3aa

perf: Improve NLJ for very small right side case (#17562)
## Which issue does this PR close?

- Closes #17547

## Rationale for this change

See the issue for the background. If the optimizer makes the wrong join-order decision and puts a very small input on the probe side of the NLJ, the NLJ operator can now handle it much faster than before.

In terms of implementation, the inner loop previously always handled `(one_left_row X right_batch)`. With this PR, if the right batch is very small, multiple left rows are joined with the right batch at once.

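To make the "multiple left rows at once" idea concrete, here is a minimal sketch of how the index vectors for a `left_range X right_batch` Cartesian product could be laid out. It uses plain `Vec<u32>` instead of Arrow arrays, and the function name `cartesian_indices` is hypothetical, not part of the PR.

```rust
/// Build flat index vectors for the Cartesian product of a left row range
/// and a whole right batch. Row `k` of the product pairs
/// `left_indices[k]` with `right_indices[k]`.
///
/// Illustration only: plain vectors stand in for Arrow index arrays.
fn cartesian_indices(
    l_start: usize,
    l_count: usize,
    right_rows: usize,
) -> (Vec<u32>, Vec<u32>) {
    // Each left index is repeated once per right row:
    // [s, s, s, s+1, s+1, s+1, ...]
    let left_indices: Vec<u32> = (0..l_count)
        .flat_map(|i| std::iter::repeat((l_start + i) as u32).take(right_rows))
        .collect();

    // The right indices cycle through the batch once per left row:
    // [0, 1, 2, 0, 1, 2, ...]
    let right_indices: Vec<u32> = (0..l_count)
        .flat_map(|_| 0..right_rows as u32)
        .collect();

    (left_indices, right_indices)
}
```

For example, `cartesian_indices(5, 2, 3)` pairs left rows 5 and 6 with right rows 0 to 2, so one filter evaluation covers six candidate pairs instead of three.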
The NLJ microbench result (only Q13 targets this workload):

```
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ Query        ┃     before ┃ improve-nlj-small-right ┃         Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ QQuery 1     │   85.31 ms │                85.75 ms │      no change │
│ QQuery 2     │  111.36 ms │               109.88 ms │      no change │
│ QQuery 3     │  180.99 ms │               181.56 ms │      no change │
│ QQuery 4     │  340.38 ms │               355.24 ms │      no change │
│ QQuery 5     │  248.62 ms │               231.90 ms │  +1.07x faster │
│ QQuery 6     │ 1680.89 ms │              1682.07 ms │      no change │
│ QQuery 7     │  233.65 ms │               234.83 ms │      no change │
│ QQuery 8     │ 1679.12 ms │              1675.63 ms │      no change │
│ QQuery 9     │  266.52 ms │               266.54 ms │      no change │
│ QQuery 10    │  544.66 ms │               544.71 ms │      no change │
│ QQuery 11    │  274.43 ms │               265.71 ms │      no change │
│ QQuery 12    │  275.11 ms │               274.72 ms │      no change │
│ QQuery 13    │   76.56 ms │                 1.88 ms │ +40.70x faster │
└──────────────┴────────────┴─────────────────────────┴────────────────┘
```

In DF49 it's around 4 ms.

## What changes are included in this PR?

- Added one microbench query targeting the small-right-input workload.
- Added one branch in the NLJ's right input handling logic: if the current right batch is very small, try to join it with multiple left rows.

## Are these changes tested?

This is covered by existing tests: the additional path is not only triggered when the entire right input is small. For regular workloads, the final input batch can also be very small, so the new path is triggered and tested.

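The claim that regular workloads also hit the new path can be illustrated with a small sketch. It assumes the right input arrives in chunks of at most `batch_size` rows, which is a simplification, and it mirrors the `batch_size / right_rows > 10` check from the diff. The helper names and the example value 8192 are assumptions for illustration.

```rust
/// Sizes of the batches produced when `total_rows` rows are emitted in
/// chunks of at most `batch_size` rows (a simplifying assumption).
fn batch_sizes(total_rows: usize, batch_size: usize) -> Vec<usize> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut sizes = vec![batch_size; total_rows / batch_size];
    let tail = total_rows % batch_size;
    if tail > 0 {
        // The final batch holds whatever is left over and may be tiny.
        sizes.push(tail);
    }
    sizes
}

/// Mirrors the check in the diff: the multi-row path is taken when
/// `batch_size / right_rows > 10` (integer division).
fn takes_small_right_path(batch_size: usize, right_rows: usize) -> bool {
    right_rows > 0 && batch_size / right_rows > 10
}
```

With a batch size of 8192, an input of 16,500 rows splits into batches of 8192, 8192, and 116 rows. Only the last one satisfies the check (8192 / 116 = 70), so an ordinary test with a non-aligned row count would exercise the new branch on its final batch.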
## Are there any user-facing changes?
1 parent d8c63ee commit 2a6f3aa

1 file changed, +247 -15 lines changed
datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 247 additions & 15 deletions
@@ -50,7 +50,7 @@ use crate::{
 
 use arrow::array::{
     new_null_array, Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions,
-    UInt64Array,
+    UInt32Array, UInt64Array,
 };
 use arrow::buffer::BooleanBuffer;
 use arrow::compute::{
@@ -1269,11 +1269,51 @@ impl NestedLoopJoinStream {
         // and push the result into output_buffer
         // ========
 
+        // Special case:
+        // When the right batch is very small, join with multiple left rows at once,
+        //
+        // The regular implementation is not efficient if the plan's right child is
+        // very small (e.g. 1 row total), because inside the inner loop of NLJ, it's
+        // handling one input right batch at once, if it's not large enough, the
+        // overheads like filter evaluation can't be amortized through vectorization.
+        debug_assert_ne!(
+            right_batch.num_rows(),
+            0,
+            "When fetching the right batch, empty batches will be skipped"
+        );
+
+        let l_row_cnt_ratio = self.batch_size / right_batch.num_rows();
+        if l_row_cnt_ratio > 10 {
+            // Calculate max left rows to handle at once. This operator tries to handle
+            // up to `datafusion.execution.batch_size` rows at once in the intermediate
+            // batch.
+            let l_row_count = std::cmp::min(
+                l_row_cnt_ratio,
+                left_data.batch().num_rows() - self.left_probe_idx,
+            );
+
+            debug_assert!(l_row_count != 0, "This function should only be entered when there are remaining left rows to process");
+            let joined_batch = self.process_left_range_join(
+                &left_data,
+                &right_batch,
+                self.left_probe_idx,
+                l_row_count,
+            )?;
+
+            if let Some(batch) = joined_batch {
+                self.output_buffer.push_batch(batch)?;
+            }
+
+            self.left_probe_idx += l_row_count;
+
+            return Ok(true);
+        }
+
         let l_idx = self.left_probe_idx;
-        let join_batch =
+        let joined_batch =
             self.process_single_left_row_join(&left_data, &right_batch, l_idx)?;
 
-        if let Some(batch) = join_batch {
+        if let Some(batch) = joined_batch {
             self.output_buffer.push_batch(batch)?;
         }
 
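As a rough illustration of the hunk above, the following sketch isolates the row-count decision and counts how many inner-loop steps are needed to join a left buffer with one right batch. The function names are hypothetical, and the real operator works on Arrow batches and stream state rather than plain integers.

```rust
/// Number of left rows to join with the right batch in one step, following
/// the rule in the hunk above. `None` means "fall back to one left row at a
/// time". Hypothetical helper for illustration only.
fn left_rows_per_step(
    batch_size: usize,
    right_rows: usize,
    remaining_left: usize,
) -> Option<usize> {
    assert!(right_rows > 0 && remaining_left > 0);
    let ratio = batch_size / right_rows;
    if ratio > 10 {
        // Cap by the number of left rows that are still unprocessed.
        Some(ratio.min(remaining_left))
    } else {
        None
    }
}

/// Count the inner-loop steps needed to join `left_rows` buffered left rows
/// with a single right batch of `right_rows` rows.
fn count_steps(batch_size: usize, right_rows: usize, left_rows: usize) -> usize {
    let mut left_probe_idx = 0;
    let mut steps = 0;
    while left_probe_idx < left_rows {
        let remaining = left_rows - left_probe_idx;
        left_probe_idx +=
            left_rows_per_step(batch_size, right_rows, remaining).unwrap_or(1);
        steps += 1;
    }
    steps
}
```

With a batch size of 8192 and a one-row right batch, 100,000 left rows need 13 steps instead of 100,000, which is where the amortization of per-step overhead such as filter evaluation comes from.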
@@ -1286,8 +1326,196 @@ impl NestedLoopJoinStream {
         Ok(true)
     }
 
+    /// Process [l_start_index, l_start_index + l_count) JOIN right_batch
+    /// Returns a RecordBatch containing the join results (None if empty)
+    ///
+    /// Side Effect: If the join type requires, left or right side matched bitmap
+    /// will be set for matched indices.
+    fn process_left_range_join(
+        &mut self,
+        left_data: &JoinLeftData,
+        right_batch: &RecordBatch,
+        l_start_index: usize,
+        l_row_count: usize,
+    ) -> Result<Option<RecordBatch>> {
+        // Construct the Cartesian product between the specified range of left rows
+        // and the entire right_batch. First, it calculates the index vectors, then
+        // materializes the intermediate batch, and finally applies the join filter
+        // to it.
+        // -----------------------------------------------------------
+        let right_rows = right_batch.num_rows();
+        let total_rows = l_row_count * right_rows;
+
+        // Build index arrays for cartesian product: left_range X right_batch
+        let left_indices: UInt32Array =
+            UInt32Array::from_iter_values((0..l_row_count).flat_map(|i| {
+                std::iter::repeat_n((l_start_index + i) as u32, right_rows)
+            }));
+        let right_indices: UInt32Array = UInt32Array::from_iter_values(
+            (0..l_row_count).flat_map(|_| 0..right_rows as u32),
+        );
+
+        debug_assert!(
+            left_indices.len() == right_indices.len()
+                && right_indices.len() == total_rows,
+            "The length or cartesian product should be (left_size * right_size)",
+        );
+
+        // Evaluate the join filter (if any) over an intermediate batch built
+        // using the filter's own schema/column indices.
+        let bitmap_combined = if let Some(filter) = &self.join_filter {
+            // Build the intermediate batch for filter evaluation
+            let intermediate_batch = if filter.schema.fields().is_empty() {
+                // Constant predicate (e.g., TRUE/FALSE). Use an empty schema with row_count
+                create_record_batch_with_empty_schema(
+                    Arc::new((*filter.schema).clone()),
+                    total_rows,
+                )?
+            } else {
+                let mut filter_columns: Vec<Arc<dyn Array>> =
+                    Vec::with_capacity(filter.column_indices().len());
+                for column_index in filter.column_indices() {
+                    let array = if column_index.side == JoinSide::Left {
+                        let col = left_data.batch().column(column_index.index);
+                        take(col.as_ref(), &left_indices, None)?
+                    } else {
+                        let col = right_batch.column(column_index.index);
+                        take(col.as_ref(), &right_indices, None)?
+                    };
+                    filter_columns.push(array);
+                }
+
+                RecordBatch::try_new(Arc::new((*filter.schema).clone()), filter_columns)?
+            };
+
+            let filter_result = filter
+                .expression()
+                .evaluate(&intermediate_batch)?
+                .into_array(intermediate_batch.num_rows())?;
+            let filter_arr = as_boolean_array(&filter_result)?;
+
+            // Combine with null bitmap to get a unified mask
+            boolean_mask_from_filter(filter_arr)
+        } else {
+            // No filter: all pairs match
+            BooleanArray::from(vec![true; total_rows])
+        };
+
+        // Update the global left or right bitmap for matched indices
+        // -----------------------------------------------------------
+
+        // None means we don't have to update left bitmap for this join type
+        let mut left_bitmap = if need_produce_result_in_final(self.join_type) {
+            Some(left_data.bitmap().lock())
+        } else {
+            None
+        };
+
+        // 'local' meaning: we want to collect 'is_matched' flag for the current
+        // right batch, after it has joining all of the left buffer, here it's only
+        // the partial result for joining given left range
+        let mut local_right_bitmap = if self.should_track_unmatched_right {
+            let mut current_right_batch_bitmap = BooleanBufferBuilder::new(right_rows);
+            // Ensure builder has logical length so set_bit is in-bounds
+            current_right_batch_bitmap.append_n(right_rows, false);
+            Some(current_right_batch_bitmap)
+        } else {
+            None
+        };
+
+        // Set the matched bit for left and right side bitmap
+        for (i, is_matched) in bitmap_combined.iter().enumerate() {
+            let is_matched = is_matched.ok_or_else(|| {
+                internal_datafusion_err!("Must be Some after the previous combining step")
+            })?;
+
+            let l_index = l_start_index + i / right_rows;
+            let r_index = i % right_rows;
+
+            if let Some(bitmap) = left_bitmap.as_mut() {
+                if is_matched {
+                    // Map local index back to absolute left index within the batch
+                    bitmap.set_bit(l_index, true);
+                }
+            }
+
+            if let Some(bitmap) = local_right_bitmap.as_mut() {
+                if is_matched {
+                    bitmap.set_bit(r_index, true);
+                }
+            }
+        }
+
+        // Apply the local right bitmap to the global bitmap
+        if self.should_track_unmatched_right {
+            // Remember to put it back after update
+            let global_right_bitmap =
+                std::mem::take(&mut self.current_right_batch_matched).ok_or_else(
+                    || internal_datafusion_err!("right batch's bitmap should be present"),
+                )?;
+            let (buf, nulls) = global_right_bitmap.into_parts();
+            debug_assert!(nulls.is_none());
+
+            let current_right_bitmap = local_right_bitmap
+                .ok_or_else(|| {
+                    internal_datafusion_err!(
+                        "Should be Some if the current join type requires right bitmap"
+                    )
+                })?
+                .finish();
+            let updated_global_right_bitmap = buf.bitor(&current_right_bitmap);
+
+            self.current_right_batch_matched =
+                Some(BooleanArray::new(updated_global_right_bitmap, None));
+        }
+
+        // For the following join types: only bitmaps are updated; do not emit rows now
+        if matches!(
+            self.join_type,
+            JoinType::LeftAnti
+                | JoinType::LeftSemi
+                | JoinType::LeftMark
+                | JoinType::RightAnti
+                | JoinType::RightMark
+                | JoinType::RightSemi
+        ) {
+            return Ok(None);
+        }
+
+        // Build the projected output batch (using output schema/column_indices),
+        // then apply the bitmap filter to it.
+        if self.output_schema.fields().is_empty() {
+            // Empty projection: only row count matters
+            let row_count = bitmap_combined.true_count();
+            return Ok(Some(create_record_batch_with_empty_schema(
+                Arc::clone(&self.output_schema),
+                row_count,
+            )?));
+        }
+
+        let mut out_columns: Vec<Arc<dyn Array>> =
+            Vec::with_capacity(self.output_schema.fields().len());
+        for column_index in &self.column_indices {
+            let array = if column_index.side == JoinSide::Left {
+                let col = left_data.batch().column(column_index.index);
+                take(col.as_ref(), &left_indices, None)?
+            } else {
+                let col = right_batch.column(column_index.index);
+                take(col.as_ref(), &right_indices, None)?
+            };
+            out_columns.push(array);
+        }
+        let pre_filtered =
+            RecordBatch::try_new(Arc::clone(&self.output_schema), out_columns)?;
+        let filtered = filter_record_batch(&pre_filtered, &bitmap_combined)?;
+        Ok(Some(filtered))
+    }
+
     /// Process a single left row join with the current right batch.
     /// Returns a RecordBatch containing the join results (None if empty)
+    ///
+    /// Side Effect: If the join type requires, left or right side matched bitmap
+    /// will be set for matched indices.
     fn process_single_left_row_join(
         &mut self,
         left_data: &JoinLeftData,
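The bitmap bookkeeping in `process_left_range_join` relies on the product being laid out left-major, so a flat position `i` decodes to `(l_start_index + i / right_rows, i % right_rows)`. A minimal sketch of that decoding, with plain `bool` slices standing in for the Arrow bitmaps and hypothetical function names, might look like this:

```rust
/// Mark matched rows on both sides, given a flat match mask over the
/// Cartesian product (left range x right batch) laid out left-major.
/// Plain slices stand in for the bitmaps used by the real operator.
fn mark_matches(
    mask: &[bool],
    l_start: usize,
    right_rows: usize,
    left_matched: &mut [bool],  // one flag per buffered left row
    right_matched: &mut [bool], // one flag per row of the current right batch
) {
    for (i, &is_matched) in mask.iter().enumerate() {
        if !is_matched {
            continue;
        }
        // Decode the flat position back into a (left, right) row pair.
        let l_index = l_start + i / right_rows;
        let r_index = i % right_rows;
        left_matched[l_index] = true;
        right_matched[r_index] = true;
    }
}

/// OR the flags collected for one left range into the flags accumulated
/// so far for the current right batch.
fn merge_right_matched(global: &mut [bool], local: &[bool]) {
    for (g, l) in global.iter_mut().zip(local) {
        *g |= *l;
    }
}
```

The local right-side flags only describe one left range, which is why they are OR-ed into the accumulated flags rather than replacing them: a right row counts as matched if any left range matched it.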
@@ -1584,22 +1812,26 @@ fn apply_filter_to_row_join_batch(
         .into_array(intermediate_batch.num_rows())?;
     let filter_arr = as_boolean_array(&filter_result)?;
 
-    // [Caution] This step has previously introduced bugs
-    // The filter result is NOT a bitmap; it contains true/false/null values.
-    // For example, 1 < NULL is evaluated to NULL. Therefore, we must combine (AND)
-    // the boolean array with its null bitmap to construct a unified bitmap.
-    let (is_filtered, nulls) = filter_arr.clone().into_parts();
-    let bitmap_combined = match nulls {
-        Some(nulls) => {
-            let combined = nulls.inner() & &is_filtered;
-            BooleanArray::new(combined, None)
-        }
-        None => BooleanArray::new(is_filtered, None),
-    };
+    // Convert boolean array with potential nulls into a unified mask bitmap
+    let bitmap_combined = boolean_mask_from_filter(filter_arr);
 
     Ok(bitmap_combined)
 }
 
+/// Convert a boolean filter array into a unified mask bitmap.
+///
+/// Caution: The filter result is NOT a bitmap; it contains true/false/null values.
+/// For example, `1 < NULL` evaluates to NULL. Therefore, we must combine (AND)
+/// the boolean array with its null bitmap to construct a unified bitmap.
+#[inline]
+fn boolean_mask_from_filter(filter_arr: &BooleanArray) -> BooleanArray {
+    let (values, nulls) = filter_arr.clone().into_parts();
+    match nulls {
+        Some(nulls) => BooleanArray::new(nulls.inner() & &values, None),
+        None => BooleanArray::new(values, None),
+    }
+}
+
 /// This function performs the following steps:
 /// 1. Apply filter to probe-side batch
 /// 2. Broadcast the left row (build_side_batch\[build_side_index\]) to the
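The reason `boolean_mask_from_filter` ANDs the values with the validity bitmap can be shown with a toy model. The `NullableBools` type below is an assumption made for illustration and is not Arrow's `BooleanArray`; it only mimics the split between a values buffer and an optional validity buffer.

```rust
/// Toy model of a nullable boolean column: `values[i]` is only meaningful
/// when `validity` is `None` or `validity[i]` is true. Under a null slot the
/// stored value bit is arbitrary.
struct NullableBools {
    values: Vec<bool>,
    validity: Option<Vec<bool>>,
}

/// Collapse true/false/null into a plain mask where null counts as false,
/// by AND-ing each value with its validity flag.
fn mask_from_filter(arr: &NullableBools) -> Vec<bool> {
    match &arr.validity {
        Some(valid) => arr
            .values
            .iter()
            .zip(valid)
            .map(|(value, is_valid)| *value && *is_valid)
            .collect(),
        // No validity buffer: every slot is non-null, so values are the mask.
        None => arr.values.clone(),
    }
}
```

In this model, a slot whose value bit happens to be `true` but whose validity flag is `false` (for example the result of `1 < NULL`) ends up `false` in the mask, so the pair is not treated as a match.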
