@@ -136,15 +136,7 @@ pub struct NetworkShuffleReadyExec {
136136 /// the properties we advertise for this execution plan
137137 pub ( crate ) properties : PlanProperties ,
138138 pub ( crate ) stage_num : usize ,
139- /// metrics_collection is used to collect metrics from child tasks. It is empty when an
140- /// is instantiated (deserialized, created via [NetworkShuffleExec::new_ready] etc...).
141- /// Metrics are populated in this map via [NetworkShuffleExec::execute].
142- ///
143- /// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks in
144- /// the stage it is reading from. This is because, by convention, the ArrowFlightEndpoint
145- /// sends metrics for a task to the last NetworkShuffleExec to read from it, which may or may
146- /// not be this instance.
147- pub ( crate ) metrics_collection : Arc < DashMap < StageKey , Vec < MetricsSetProto > > > ,
139+ pub ( crate ) child_task_metrics : Arc < DashMap < StageKey , Vec < MetricsSetProto > > > ,
148140}
149141
150142impl NetworkShuffleExec {
@@ -209,7 +201,7 @@ impl NetworkBoundary for NetworkShuffleExec {
209201 NetworkShuffleExec :: Ready ( prev) => NetworkShuffleExec :: Ready ( NetworkShuffleReadyExec {
210202 properties : prev. properties . clone ( ) ,
211203 stage_num : prev. stage_num ,
212- metrics_collection : Arc :: clone ( & prev. metrics_collection ) ,
204+ child_task_metrics : Arc :: clone ( & prev. child_task_metrics ) ,
213205 } ) ,
214206 } )
215207 }
@@ -226,11 +218,18 @@ impl NetworkBoundary for NetworkShuffleExec {
226218 let ready = NetworkShuffleReadyExec {
227219 properties : pending. repartition_exec . properties ( ) . clone ( ) ,
228220 stage_num,
229- metrics_collection : Default :: default ( ) ,
221+ child_task_metrics : Default :: default ( ) ,
230222 } ;
231223
232224 Ok ( Arc :: new ( Self :: Ready ( ready) ) )
233225 }
226+
227+ fn metrics_collection ( & self ) -> Option < Arc < DashMap < StageKey , Vec < MetricsSetProto > > > > {
228+ match self {
229+ NetworkShuffleExec :: Pending ( _) => None ,
230+ NetworkShuffleExec :: Ready ( v) => Some ( v. child_task_metrics . clone ( ) ) ,
231+ }
232+ }
234233}
235234
236235impl DisplayAs for NetworkShuffleExec {
@@ -331,7 +330,7 @@ impl ExecutionPlan for NetworkShuffleExec {
331330 } ,
332331 ) ;
333332
334- let metrics_collection_capture = self_ready. metrics_collection . clone ( ) ;
333+ let metrics_collection_capture = self_ready. child_task_metrics . clone ( ) ;
335334 async move {
336335 let url = task. url . ok_or ( internal_datafusion_err ! (
337336 "NetworkShuffleExec: task is unassigned, cannot proceed"
0 commit comments