diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index f3ec083efb24..f9bc93f60bac 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -327,6 +327,34 @@ pub fn adjust_input_keys_ordering( ) .map(Transformed::yes); } + PartitionMode::LazyPartitioned => { + // LazyPartitioned mode uses the same key reordering as Partitioned, + // but with LazyPartitioned mode preserved + let join_constructor = |new_conditions: ( + Vec<(PhysicalExprRef, PhysicalExprRef)>, + Vec, + )| { + HashJoinExec::try_new( + Arc::clone(left), + Arc::clone(right), + new_conditions.0, + filter.clone(), + join_type, + projection.clone(), + PartitionMode::LazyPartitioned, + *null_equality, + *null_aware, + ) + .map(|e| Arc::new(e) as _) + }; + return reorder_partitioned_join_keys( + requirements, + on, + &[], + &join_constructor, + ) + .map(Transformed::yes); + } PartitionMode::CollectLeft => { // Push down requirements to the right side requirements.children[1].data = match join_type { @@ -624,7 +652,10 @@ pub fn reorder_join_keys_to_inputs( .. }) = plan_any.downcast_ref::() { - if matches!(mode, PartitionMode::Partitioned) { + if matches!( + mode, + PartitionMode::Partitioned | PartitionMode::LazyPartitioned + ) { let (join_keys, positions) = reorder_current_join_keys( extract_join_keys(on), Some(left.output_partitioning()), @@ -645,7 +676,7 @@ pub fn reorder_join_keys_to_inputs( filter.clone(), join_type, projection.clone(), - PartitionMode::Partitioned, + *mode, *null_equality, *null_aware, )?)); @@ -1257,6 +1288,10 @@ pub fn ensure_distribution( // // CollectLeft/CollectRight modes are safe because one side is collected // to a single partition which eliminates partition-to-partition mapping. + // + // LazyPartitioned mode is also safe from this issue because the build side + // is not pre-partitioned; instead, rows are filtered locally during hash + // table construction. Only the probe side is hash-partitioned. let is_partitioned_join = plan .as_any() .downcast_ref::() diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 7412d0ba9781..3bcc877346fa 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -293,7 +293,7 @@ fn statistical_join_selection_subrule( || partitioned_hash_join(hash_join).map(Some), |v| Ok(Some(v)), )?, - PartitionMode::Partitioned => { + PartitionMode::Partitioned | PartitionMode::LazyPartitioned => { let left = hash_join.left(); let right = hash_join.right(); // Don't swap null-aware anti joins as they have specific side requirements @@ -302,7 +302,7 @@ fn statistical_join_selection_subrule( && should_swap_join_order(&**left, &**right)? { hash_join - .swap_inputs(PartitionMode::Partitioned) + .swap_inputs(*hash_join.partition_mode()) .map(Some)? } else { None @@ -540,6 +540,9 @@ pub(crate) fn swap_join_according_to_unboundedness( (PartitionMode::Partitioned, _) => { hash_join.swap_inputs(PartitionMode::Partitioned) } + (PartitionMode::LazyPartitioned, _) => { + hash_join.swap_inputs(PartitionMode::LazyPartitioned) + } (PartitionMode::CollectLeft, _) => { hash_join.swap_inputs(PartitionMode::CollectLeft) } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe..ffc3b2e978cf 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -27,6 +27,7 @@ use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; +use crate::hash_utils::create_hashes; use crate::joins::Map; use crate::joins::array_map::ArrayMap; use crate::joins::hash_join::inlist_builder::build_struct_inlist_values; @@ -49,6 +50,7 @@ use crate::projection::{ }; use crate::repartition::REPARTITION_RANDOM_STATE; use crate::spill::get_record_batch_memory_size; +use crate::stream::RecordBatchReceiverStream; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -61,8 +63,8 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{ArrayRef, BooleanBufferBuilder}; -use arrow::compute::concat_batches; +use arrow::array::{ArrayRef, BooleanArray, BooleanBufferBuilder}; +use arrow::compute::{concat_batches, filter_record_batch}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -740,6 +742,12 @@ impl HashJoinExec { PartitionMode::Partitioned => { symmetric_join_output_partitioning(left, right, &join_type)? } + PartitionMode::LazyPartitioned => { + // LazyPartitioned: output partitioning is determined by the probe (right) side, + // since each partition builds its own hash table from filtered build rows. + // This is similar to Partitioned mode but the build side isn't pre-partitioned. + symmetric_join_output_partitioning(left, right, &join_type)? + } }; let emission_type = if left.boundedness().is_unbounded() { @@ -958,6 +966,16 @@ impl ExecutionPlan for HashJoinExec { Distribution::HashPartitioned(right_expr), ] } + PartitionMode::LazyPartitioned => { + // LazyPartitioned mode: build side IS hash-partitioned as usual, + // but probe side is NOT repartitioned (we filter locally during probing). + // This saves repartitioning the often-larger probe side. + let left_expr = self.on.iter().map(|(l, _)| Arc::clone(l)).collect(); + vec![ + Distribution::HashPartitioned(left_expr), + Distribution::UnspecifiedDistribution, + ] + } PartitionMode::Auto => vec![ Distribution::UnspecifiedDistribution, Distribution::UnspecifiedDistribution, @@ -1116,6 +1134,7 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + None, // No partition filtering for CollectLeft mode )) })?, PartitionMode::Partitioned => { @@ -1137,6 +1156,32 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + None, // No partition filtering - already pre-partitioned by RepartitionExec + )) + } + PartitionMode::LazyPartitioned => { + // LazyPartitioned mode: build side IS hash-partitioned (same as Partitioned), + // but probe side is NOT repartitioned - we merge all partitions and filter + // lazily during probing. This saves repartitioning the often-larger probe side. + let left_stream = self.left.execute(partition, Arc::clone(&context))?; + + let reservation = + MemoryConsumer::new(format!("HashJoinInput[{partition}]")) + .register(context.memory_pool()); + + OnceFut::new(collect_left_input( + self.random_state.random_state().clone(), + left_stream, + on_left.clone(), + join_metrics.clone(), + reservation, + need_produce_result_in_final(self.join_type), + 1, // Each partition has its own hash table (same as Partitioned) + enable_dynamic_filter_pushdown, + Arc::clone(context.session_config().options()), + self.null_equality, + array_map_created_count, + None, // No partition filtering on build side - already hash-partitioned )) } PartitionMode::Auto => { @@ -1176,9 +1221,41 @@ impl ExecutionPlan for HashJoinExec { .flatten() .flatten(); - // we have the batches and the hash map with their keys. We can how create a stream + // we have the batches and the hash map with their keys. We can now create a stream // over the right that uses this information to issue new batches. - let right_stream = self.right.execute(partition, context)?; + let right_stream: SendableRecordBatchStream = match self.mode { + PartitionMode::LazyPartitioned => { + // LazyPartitioned mode: merge ALL probe partitions into one stream + // and filter lazily during probing based on hash % num_partitions + let right_partitions = self.right.output_partitioning().partition_count(); + if right_partitions == 1 { + self.right.execute(0, Arc::clone(&context))? + } else { + let mut builder = RecordBatchReceiverStream::builder( + self.right.schema(), + right_partitions, + ); + for part_i in 0..right_partitions { + builder.run_input( + Arc::clone(&self.right), + part_i, + Arc::clone(&context), + ); + } + builder.build() + } + } + _ => self.right.execute(partition, Arc::clone(&context))?, + }; + + // For LazyPartitioned mode, we need to filter probe rows by hash during probing + let probe_partition_filter = match self.mode { + PartitionMode::LazyPartitioned => Some(PartitionFilter { + current_partition: partition, + total_partitions: self.left.output_partitioning().partition_count(), + }), + _ => None, + }; // update column indices to reflect the projection let column_indices_after_projection = match &self.projection { @@ -1214,6 +1291,7 @@ impl ExecutionPlan for HashJoinExec { build_accumulator, self.mode, self.null_aware, + probe_partition_filter, ))) } @@ -1514,6 +1592,62 @@ fn should_collect_min_max_for_perfect_hash( Ok(ArrayMap::is_supported_type(&data_type)) } +/// Partition filter configuration for lazy partitioning. +/// +/// When set, only rows where `hash(join_keys) % total_partitions == current_partition` +/// are kept during hash table construction. This avoids the overhead of +/// pre-partitioning with `RepartitionExec` by filtering rows locally. +#[derive(Debug, Clone, Copy)] +pub(super) struct PartitionFilter { + /// The partition index this execution is responsible for + current_partition: usize, + /// Total number of partitions in the join + total_partitions: usize, +} + +/// Filters a record batch to only include rows that belong to the specified partition. +/// +/// Uses the same hash function and seeds as `RepartitionExec` to ensure that +/// rows are routed consistently. Rows where `hash(join_keys) % total_partitions == current_partition` +/// are kept; all others are filtered out. +/// +/// This function is used in `LazyPartitioned` mode to avoid the overhead of +/// `RepartitionExec` by filtering rows during hash table construction instead +/// of pre-partitioning. +pub(super) fn filter_batch_by_partition( + batch: &RecordBatch, + on_left: &[PhysicalExprRef], + partition_filter: &PartitionFilter, +) -> Result { + let num_rows = batch.num_rows(); + if num_rows == 0 { + return Ok(batch.clone()); + } + + // Evaluate join key columns + let arrays = evaluate_expressions_to_arrays(on_left, batch)?; + + // Compute hashes using the same random state as RepartitionExec + let mut hashes_buffer = vec![0u64; num_rows]; + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut hashes_buffer, + )?; + + // Create a boolean mask for rows belonging to this partition + let mask: BooleanArray = hashes_buffer + .iter() + .map(|hash| { + *hash % partition_filter.total_partitions as u64 + == partition_filter.current_partition as u64 + }) + .collect(); + + // Filter the batch + Ok(filter_record_batch(batch, &mask)?) +} + /// Collects all batches from the left (build) side stream and creates a hash map for joining. /// /// This function is responsible for: @@ -1531,6 +1665,8 @@ fn should_collect_min_max_for_perfect_hash( /// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins) /// * `probe_threads_count` - Number of threads that will probe this hash table /// * `should_compute_dynamic_filters` - Whether to compute min/max bounds for dynamic filtering +/// * `partition_filter` - Optional partition filter for lazy partitioning mode. +/// When set, only rows belonging to the specified partition are included in the hash table. /// /// # Dynamic Filter Coordination /// When `should_compute_dynamic_filters` is true, this function computes the min/max bounds @@ -1555,6 +1691,7 @@ async fn collect_left_input( config: Arc, null_equality: NullEquality, array_map_created_count: Count, + partition_filter: Option, ) -> Result { let schema = left_stream.schema(); @@ -1569,28 +1706,46 @@ async fn collect_left_input( should_compute_dynamic_filters || should_collect_min_max_for_phj, )?; + // Clone on_left for use in the closure + let on_left_for_filter = on_left.clone(); + let state = left_stream - .try_fold(initial, |mut state, batch| async move { - // Update accumulators if computing bounds - if let Some(ref mut accumulators) = state.bounds_accumulators { - for accumulator in accumulators { - accumulator.update_batch(&batch)?; + .try_fold(initial, |mut state, batch| { + let on_left_clone = on_left_for_filter.clone(); + async move { + // Apply partition filter if in lazy partitioning mode + let batch = if let Some(ref pf) = partition_filter { + let filtered = filter_batch_by_partition(&batch, &on_left_clone, pf)?; + // Skip empty batches after filtering + if filtered.num_rows() == 0 { + return Ok(state); + } + filtered + } else { + batch + }; + + // Update accumulators if computing bounds + if let Some(ref mut accumulators) = state.bounds_accumulators { + for accumulator in accumulators { + accumulator.update_batch(&batch)?; + } } - } - // Decide if we spill or not - let batch_size = get_record_batch_memory_size(&batch); - // Reserve memory for incoming batch - state.reservation.try_grow(batch_size)?; - // Update metrics - state.metrics.build_mem_used.add(batch_size); - state.metrics.build_input_batches.add(1); - state.metrics.build_input_rows.add(batch.num_rows()); - // Update row count - state.num_rows += batch.num_rows(); - // Push batch to output - state.batches.push(batch); - Ok(state) + // Decide if we spill or not + let batch_size = get_record_batch_memory_size(&batch); + // Reserve memory for incoming batch + state.reservation.try_grow(batch_size)?; + // Update metrics + state.metrics.build_mem_used.add(batch_size); + state.metrics.build_input_batches.add(1); + state.metrics.build_input_rows.add(batch.num_rows()); + // Update row count + state.num_rows += batch.num_rows(); + // Push batch to output + state.batches.push(batch); + Ok(state) + } }) .await?; @@ -1979,10 +2134,13 @@ mod tests { let left_repartitioned: Arc = match partition_mode { PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)), - PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(left_expr, partition_count), - )?), + PartitionMode::Partitioned | PartitionMode::LazyPartitioned => { + // For both Partitioned and LazyPartitioned, build side is hash partitioned + Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(left_expr, partition_count), + )?) + } PartitionMode::Auto => { return internal_err!("Unexpected PartitionMode::Auto in join tests"); } @@ -2000,10 +2158,17 @@ mod tests { Partitioning::Hash(partition_expr, partition_count), )?) as _ } - PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(right_expr, partition_count), - )?), + PartitionMode::Partitioned => { + // For Partitioned, probe side is hash partitioned + Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(right_expr, partition_count), + )?) + } + PartitionMode::LazyPartitioned => { + // For LazyPartitioned, probe side is NOT repartitioned - merged instead + Arc::new(CoalescePartitionsExec::new(right)) + } PartitionMode::Auto => { return internal_err!("Unexpected PartitionMode::Auto in join tests"); } diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index f32dc7fa8026..80cb5b561ef6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -314,6 +314,10 @@ impl SharedBuildAccumulator { PartitionMode::Partitioned => { left_child.output_partitioning().partition_count() } + // LazyPartitioned: each probe partition builds its own data from filtered build rows + PartitionMode::LazyPartitioned => { + right_child.output_partitioning().partition_count() + } // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two) PartitionMode::Auto => unreachable!( "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" @@ -327,6 +331,12 @@ impl SharedBuildAccumulator { left_child.output_partitioning().partition_count() ], }, + PartitionMode::LazyPartitioned => AccumulatedBuildData::Partitioned { + partitions: vec![ + None; + right_child.output_partitioning().partition_count() + ], + }, PartitionMode::CollectLeft => { AccumulatedBuildData::CollectLeft { data: None } } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 54e620f99de7..78409b01b7d6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -27,7 +27,9 @@ use std::task::Poll; use crate::joins::Map; use crate::joins::MapOffset; use crate::joins::PartitionMode; -use crate::joins::hash_join::exec::JoinLeftData; +use crate::joins::hash_join::exec::{ + JoinLeftData, PartitionFilter, filter_batch_by_partition, +}; use crate::joins::hash_join::shared_bounds::{ PartitionBounds, PartitionBuildData, SharedBuildAccumulator, }; @@ -227,6 +229,8 @@ pub(super) struct HashJoinStream { output_buffer: Box, /// Whether this is a null-aware anti join null_aware: bool, + /// Optional partition filter for LazyPartitioned mode - filters probe rows by hash + probe_partition_filter: Option, } impl RecordBatchStream for HashJoinStream { @@ -375,6 +379,7 @@ impl HashJoinStream { build_accumulator: Option>, mode: PartitionMode, null_aware: bool, + probe_partition_filter: Option, ) -> Self { // Create output buffer with coalescing. // Use biggest_coalesce_batch_size to bypass coalescing for batches @@ -407,6 +412,7 @@ impl HashJoinStream { mode, output_buffer, null_aware, + probe_partition_filter, } } @@ -502,7 +508,9 @@ impl HashJoinStream { let build_accumulator = Arc::clone(build_accumulator); let left_side_partition_id = match self.mode { - PartitionMode::Partitioned => self.partition, + PartitionMode::Partitioned | PartitionMode::LazyPartitioned => { + self.partition + } PartitionMode::CollectLeft => 0, PartitionMode::Auto => unreachable!( "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" @@ -514,14 +522,16 @@ impl HashJoinStream { // Construct the appropriate build data enum variant based on partition mode let build_data = match self.mode { - PartitionMode::Partitioned => PartitionBuildData::Partitioned { - partition_id: left_side_partition_id, - pushdown, - bounds: left_data - .bounds - .clone() - .unwrap_or_else(|| PartitionBounds::new(vec![])), - }, + PartitionMode::Partitioned | PartitionMode::LazyPartitioned => { + PartitionBuildData::Partitioned { + partition_id: left_side_partition_id, + pushdown, + bounds: left_data + .bounds + .clone() + .unwrap_or_else(|| PartitionBounds::new(vec![])), + } + } PartitionMode::CollectLeft => PartitionBuildData::CollectLeft { pushdown, bounds: left_data @@ -559,6 +569,19 @@ impl HashJoinStream { self.state = HashJoinStreamState::ExhaustedProbeSide; } Some(Ok(batch)) => { + // For LazyPartitioned mode, filter probe batch to only keep rows + // belonging to this partition based on hash(join_keys) % num_partitions + let batch = if let Some(ref filter) = self.probe_partition_filter { + filter_batch_by_partition(&batch, &self.on_right, filter)? + } else { + batch + }; + + // Skip empty batches after filtering + if batch.num_rows() == 0 { + return Poll::Ready(Ok(StatefulStreamResult::Continue)); + } + // Precalculate hash values for fetched batch let keys_values = evaluate_expressions_to_arrays(&self.on_right, &batch)?; diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 848d0472fe88..b0b48d7df13b 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -89,6 +89,15 @@ pub enum PartitionMode { /// mode(Partitioned/CollectLeft) is optimal based on statistics. It will /// also consider swapping the left and right inputs for the Join Auto, + /// Lazy partitioning: build side is not pre-partitioned by RepartitionExec. + /// Instead, each join partition reads all build partitions and filters rows + /// by computing `hash % num_partitions` during hash table construction. + /// The probe side is still hash-partitioned to ensure correct matching. + /// + /// This mode reduces overhead for wide build tables by avoiding the column + /// copies that occur in RepartitionExec, at the cost of each partition + /// reading the entire build side and filtering locally. + LazyPartitioned, } /// Partitioning mode to use for symmetric hash join diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 5f590560c467..6183139bf07f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1100,6 +1100,7 @@ enum PartitionMode { COLLECT_LEFT = 0; PARTITIONED = 1; AUTO = 2; + LAZY_PARTITIONED = 3; } message HashJoinExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index f6d364f269b4..2e9a3063530f 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -14441,6 +14441,7 @@ impl serde::Serialize for PartitionMode { Self::CollectLeft => "COLLECT_LEFT", Self::Partitioned => "PARTITIONED", Self::Auto => "AUTO", + Self::LazyPartitioned => "LAZY_PARTITIONED", }; serializer.serialize_str(variant) } @@ -14455,6 +14456,7 @@ impl<'de> serde::Deserialize<'de> for PartitionMode { "COLLECT_LEFT", "PARTITIONED", "AUTO", + "LAZY_PARTITIONED", ]; struct GeneratedVisitor; @@ -14498,6 +14500,7 @@ impl<'de> serde::Deserialize<'de> for PartitionMode { "COLLECT_LEFT" => Ok(PartitionMode::CollectLeft), "PARTITIONED" => Ok(PartitionMode::Partitioned), "AUTO" => Ok(PartitionMode::Auto), + "LAZY_PARTITIONED" => Ok(PartitionMode::LazyPartitioned), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c1afd73ec3c5..01b2ff548215 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2281,6 +2281,7 @@ pub enum PartitionMode { CollectLeft = 0, Partitioned = 1, Auto = 2, + LazyPartitioned = 3, } impl PartitionMode { /// String value of the enum field names used in the ProtoBuf definition. @@ -2292,6 +2293,7 @@ impl PartitionMode { Self::CollectLeft => "COLLECT_LEFT", Self::Partitioned => "PARTITIONED", Self::Auto => "AUTO", + Self::LazyPartitioned => "LAZY_PARTITIONED", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -2300,6 +2302,7 @@ impl PartitionMode { "COLLECT_LEFT" => Some(Self::CollectLeft), "PARTITIONED" => Some(Self::Partitioned), "AUTO" => Some(Self::Auto), + "LAZY_PARTITIONED" => Some(Self::LazyPartitioned), _ => None, } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 45868df4ced6..75bbdbf23d88 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1218,6 +1218,7 @@ impl protobuf::PhysicalPlanNode { protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft, protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned, protobuf::PartitionMode::Auto => PartitionMode::Auto, + protobuf::PartitionMode::LazyPartitioned => PartitionMode::LazyPartitioned, }; let projection = if !hashjoin.projection.is_empty() { Some( @@ -2218,6 +2219,7 @@ impl protobuf::PhysicalPlanNode { PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft, PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned, PartitionMode::Auto => protobuf::PartitionMode::Auto, + PartitionMode::LazyPartitioned => protobuf::PartitionMode::LazyPartitioned, }; Ok(protobuf::PhysicalPlanNode {