@@ -131,10 +131,10 @@ impl DistributedPhysicalOptimizerRule {
131131 }
132132
133133 // If this is a hash RepartitionExec, introduce a shuffle.
134- if let Some ( node) = plan . as_any ( ) . downcast_ref :: < RepartitionExec > ( ) {
135- let Some ( tasks ) = self . network_shuffle_tasks else {
136- return Ok ( Transformed :: no ( plan ) ) ;
137- } ;
134+ if let ( Some ( node) , Some ( tasks ) ) = (
135+ plan . as_any ( ) . downcast_ref :: < RepartitionExec > ( ) ,
136+ self . network_shuffle_tasks ,
137+ ) {
138138 if !matches ! ( node. partitioning( ) , Partitioning :: Hash ( _, _) ) {
139139 return Ok ( Transformed :: no ( plan) ) ;
140140 }
@@ -146,11 +146,10 @@ impl DistributedPhysicalOptimizerRule {
146146 // If this is a CoalescePartitionsExec, it means that the original plan is trying to
147147 // merge all partitions into one. We need to go one step ahead and also merge all tasks
148148 // into one.
149- if let Some ( node) = plan. as_any ( ) . downcast_ref :: < CoalescePartitionsExec > ( ) {
150- let Some ( tasks) = self . network_coalesce_tasks else {
151- return Ok ( Transformed :: no ( plan) ) ;
152- } ;
153-
149+ if let ( Some ( node) , Some ( tasks) ) = (
150+ plan. as_any ( ) . downcast_ref :: < CoalescePartitionsExec > ( ) ,
151+ self . network_coalesce_tasks ,
152+ ) {
154153 // If the immediate child is a PartitionIsolatorExec, it means that the rest of the
155154 // plan is just a couple of non-computational nodes that are probably not worth
156155 // distributing.
@@ -171,10 +170,10 @@ impl DistributedPhysicalOptimizerRule {
171170 // The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
172171 // We need to account for it and help it by also coalescing all tasks into one, therefore
173172 // a NetworkCoalesceExec is introduced.
174- if let Some ( node) = plan . as_any ( ) . downcast_ref :: < SortPreservingMergeExec > ( ) {
175- let Some ( tasks ) = self . network_coalesce_tasks else {
176- return Ok ( Transformed :: no ( plan ) ) ;
177- } ;
173+ if let ( Some ( node) , Some ( tasks ) ) = (
174+ plan . as_any ( ) . downcast_ref :: < SortPreservingMergeExec > ( ) ,
175+ self . network_coalesce_tasks ,
176+ ) {
178177 let node = NetworkCoalesceExec :: from_sort_preserving_merge_exec ( node, tasks) ?;
179178
180179 let plan = plan. with_new_children ( vec ! [ Arc :: new( node) ] ) ?;
0 commit comments