@@ -96,45 +96,12 @@ pub(crate) fn repartition(
9696 let partitioning_strategy =
9797 determine_partitioning_strategy ( & input, & table_metadata, target_partitions) ?;
9898
99- if !needs_repartitioning ( & input, & partitioning_strategy) {
100- return Ok ( input) ;
101- }
102-
10399 Ok ( Arc :: new ( RepartitionExec :: try_new (
104100 input,
105101 partitioning_strategy,
106102 ) ?) )
107103}
108104
109- /// Returns whether repartitioning is actually needed by comparing input and desired partitioning
110- fn needs_repartitioning ( input : & Arc < dyn ExecutionPlan > , desired : & Partitioning ) -> bool {
111- let input_partitioning = input. properties ( ) . output_partitioning ( ) ;
112- match ( input_partitioning, desired) {
113- ( Partitioning :: RoundRobinBatch ( a) , Partitioning :: RoundRobinBatch ( b) ) => a != b,
114- ( Partitioning :: Hash ( a_exprs, a_n) , Partitioning :: Hash ( b_exprs, b_n) ) => {
115- a_n != b_n || !same_columns ( a_exprs, b_exprs)
116- }
117- _ => true ,
118- }
119- }
120-
121- /// Helper function to check if two sets of column expressions are the same
122- fn same_columns ( a_exprs : & [ Arc < dyn PhysicalExpr > ] , b_exprs : & [ Arc < dyn PhysicalExpr > ] ) -> bool {
123- if a_exprs. len ( ) != b_exprs. len ( ) {
124- return false ;
125- }
126- a_exprs. iter ( ) . zip ( b_exprs. iter ( ) ) . all ( |( a, b) | {
127- if let ( Some ( a_col) , Some ( b_col) ) = (
128- a. as_any ( ) . downcast_ref :: < Column > ( ) ,
129- b. as_any ( ) . downcast_ref :: < Column > ( ) ,
130- ) {
131- a_col. name ( ) == b_col. name ( ) && a_col. index ( ) == b_col. index ( )
132- } else {
133- std:: ptr:: eq ( a. as_ref ( ) , b. as_ref ( ) )
134- }
135- } )
136- }
137-
138105/// Determine the optimal partitioning strategy based on table metadata.
139106///
140107/// Analyzes the table's partition specification to select the most appropriate
0 commit comments