From af5d6c51570baf01a204bddf8d6e95db141f1e99 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sun, 25 Jan 2026 13:08:47 +0000 Subject: [PATCH] feat(datafusion): Optimize HashJoinExec for single-batch build side Adds a fast path to the `collect_left_input` function to avoid the expensive `concat_batches` operation when the build side consists of a single `RecordBatch`. This optimization reduces memory allocation and CPU usage for this common scenario. --- .../physical-plan/src/joins/hash_join/exec.rs | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe5..f3eb1299e7447 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -172,7 +172,12 @@ fn try_create_array_map( let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - let batch = concat_batches(schema, batches)?; + // Fast path for single batch, avoiding `concat_batches` which is expensive. + let batch = if batches.len() == 1 { + batches[0].clone() + } else { + concat_batches(schema, batches)? + }; let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; @@ -1656,10 +1661,8 @@ async fn collect_left_input( let mut hashes_buffer = Vec::new(); let mut offset = 0; - let batches_iter = batches.iter().rev(); - // Updating hashmap starting from the last batch - for batch in batches_iter.clone() { + for batch in batches.iter().rev() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); update_hash( @@ -1675,8 +1678,13 @@ async fn collect_left_input( offset += batch.num_rows(); } - // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter.clone())?; + // Merge all batches into a single batch, so we can directly index into the arrays. + // Fast path for single batch, avoiding `concat_batches` which is expensive. + let batch = if batches.len() == 1 { + batches[0].clone() + } else { + concat_batches(&schema, batches.iter().rev())? + }; let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;