Skip to content

Commit 8d54d00

Browse files
distributed_physical_optimizer_rule: move metrics_collection to NetworkBoundary trait
This is a small refactor which moves metrics_collection to the NetworkBoundary trait so all network boundaries must obey the metrics collecting protocol.
1 parent 5129632 commit 8d54d00

File tree

5 files changed

+55
-41
lines changed

5 files changed

+55
-41
lines changed

src/distributed_physical_optimizer_rule.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use super::{NetworkShuffleExec, PartitionIsolatorExec, StageExec};
22
use crate::execution_plans::NetworkCoalesceExec;
3+
use crate::metrics::proto::MetricsSetProto;
4+
use crate::protobuf::StageKey;
5+
use dashmap::DashMap;
36
use datafusion::common::plan_err;
47
use datafusion::common::tree_node::TreeNodeRecursion;
58
use datafusion::datasource::source::DataSourceExec;
@@ -318,6 +321,20 @@ pub trait NetworkBoundary: ExecutionPlan {
318321
}
319322
Ok(Arc::clone(children.first().unwrap()))
320323
}
324+
325+
/// metrics_collection is used to collect metrics from child tasks. It is empty when a
326+
/// [NetworkBoundary] is instantiated (deserialized, created via new(), etc.).
327+
/// Metrics are populated by calling execute() on the [NetworkBoundary]. It's expected that the
328+
/// collection is complete after the [NetworkBoundary] has been executed. It is undefined
329+
/// what this returns during execution.
330+
///
331+
/// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks
332+
/// in the stage it is reading from. This is because, by convention, the ArrowFlightEndpoint
333+
/// sends metrics for a task to the last [NetworkBoundary] to read from it, which may or may
334+
/// not be this instance.
335+
fn metrics_collection(&self) -> Option<Arc<DashMap<StageKey, Vec<MetricsSetProto>>>> {
336+
None
337+
}
321338
}
322339

323340
/// Error thrown during distributed planning that prompts the planner to change something and

src/execution_plans/metrics.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::distributed_physical_optimizer_rule::NetworkBoundary;
12
use crate::execution_plans::{NetworkCoalesceExec, NetworkShuffleExec, StageExec};
23
use crate::metrics::proto::{MetricsSetProto, metrics_set_proto_to_df};
34
use arrow::ipc::writer::DictionaryTracker;
@@ -50,26 +51,24 @@ impl TreeNodeRewriter for TaskMetricsCollector {
5051
type Node = Arc<dyn ExecutionPlan>;
5152

5253
fn f_down(&mut self, plan: Self::Node) -> Result<Transformed<Self::Node>> {
53-
// If the plan is an NetworkShuffleExec, assume it has collected metrics already
54+
// If the plan is a NetworkBoundary, assume it has collected metrics already
5455
// from child tasks.
5556
let metrics_collection =
5657
if let Some(node) = plan.as_any().downcast_ref::<NetworkShuffleExec>() {
57-
let NetworkShuffleExec::Ready(ready) = node else {
58-
return internal_err!(
59-
"unexpected NetworkShuffleExec::Pending during metrics collection"
60-
);
61-
};
62-
Some(Arc::clone(&ready.metrics_collection))
58+
node.metrics_collection()
59+
.map(Some)
60+
.ok_or(DataFusionError::Internal(
61+
"could not collect metrics from NetworkShuffleExec".to_string(),
62+
))
6363
} else if let Some(node) = plan.as_any().downcast_ref::<NetworkCoalesceExec>() {
64-
let NetworkCoalesceExec::Ready(ready) = node else {
65-
return internal_err!(
66-
"unexpected NetworkCoalesceExec::Pending during metrics collection"
67-
);
68-
};
69-
Some(Arc::clone(&ready.metrics_collection))
64+
node.metrics_collection()
65+
.map(Some)
66+
.ok_or(DataFusionError::Internal(
67+
"could not collect metrics from NetworkCoalesceExec".to_string(),
68+
))
7069
} else {
71-
None
72-
};
70+
Ok(None)
71+
}?;
7372

7473
if let Some(metrics_collection) = metrics_collection {
7574
for mut entry in metrics_collection.iter_mut() {

src/execution_plans/network_coalesce.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,7 @@ pub struct NetworkCoalesceReady {
9090
pub(crate) properties: PlanProperties,
9191
pub(crate) stage_num: usize,
9292
pub(crate) input_tasks: usize,
93-
/// metrics_collection is used to collect metrics from child tasks. It is empty when an
94-
/// is instantiated (deserialized, created via [NetworkCoalesceExec::new_ready] etc...).
95-
/// Metrics are populated in this map via [NetworkCoalesceExec::execute].
96-
///
97-
/// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks in
98-
/// the stage it is reading from. This is because, by convention, the ArrowFlightEndpoint
99-
/// sends metrics for a task to the last NetworkCoalesceExec to read from it, which may or may
100-
/// not be this instance.
101-
pub(crate) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
93+
pub(crate) child_task_metrics: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
10294
}
10395

10496
impl NetworkCoalesceExec {
@@ -164,7 +156,7 @@ impl NetworkBoundary for NetworkCoalesceExec {
164156
}),
165157
stage_num,
166158
input_tasks: pending.input_tasks,
167-
metrics_collection: Default::default(),
159+
child_task_metrics: Default::default(),
168160
};
169161

170162
Ok(Arc::new(Self::Ready(ready)))
@@ -185,10 +177,17 @@ impl NetworkBoundary for NetworkCoalesceExec {
185177
}),
186178
stage_num: ready.stage_num,
187179
input_tasks,
188-
metrics_collection: Arc::clone(&ready.metrics_collection),
180+
child_task_metrics: Arc::clone(&ready.child_task_metrics),
189181
}),
190182
})
191183
}
184+
185+
fn metrics_collection(&self) -> Option<Arc<DashMap<StageKey, Vec<MetricsSetProto>>>> {
186+
match self {
187+
NetworkCoalesceExec::Pending(_) => None,
188+
NetworkCoalesceExec::Ready(v) => Some(v.child_task_metrics.clone()),
189+
}
190+
}
192191
}
193192

194193
impl DisplayAs for NetworkCoalesceExec {
@@ -298,7 +297,7 @@ impl ExecutionPlan for NetworkCoalesceExec {
298297
return internal_err!("NetworkCoalesceExec: task is unassigned, cannot proceed");
299298
};
300299

301-
let metrics_collection_capture = self_ready.metrics_collection.clone();
300+
let metrics_collection_capture = self_ready.child_task_metrics.clone();
302301
let stream = async move {
303302
let channel = channel_resolver.get_channel_for_url(&url).await?;
304303
let stream = FlightServiceClient::new(channel)

src/execution_plans/network_shuffle.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,7 @@ pub struct NetworkShuffleReadyExec {
136136
/// the properties we advertise for this execution plan
137137
pub(crate) properties: PlanProperties,
138138
pub(crate) stage_num: usize,
139-
/// metrics_collection is used to collect metrics from child tasks. It is empty when an
140-
/// is instantiated (deserialized, created via [NetworkShuffleExec::new_ready] etc...).
141-
/// Metrics are populated in this map via [NetworkShuffleExec::execute].
142-
///
143-
/// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks in
144-
/// the stage it is reading from. This is because, by convention, the ArrowFlightEndpoint
145-
/// sends metrics for a task to the last NetworkShuffleExec to read from it, which may or may
146-
/// not be this instance.
147-
pub(crate) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
139+
pub(crate) child_task_metrics: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
148140
}
149141

150142
impl NetworkShuffleExec {
@@ -209,7 +201,7 @@ impl NetworkBoundary for NetworkShuffleExec {
209201
NetworkShuffleExec::Ready(prev) => NetworkShuffleExec::Ready(NetworkShuffleReadyExec {
210202
properties: prev.properties.clone(),
211203
stage_num: prev.stage_num,
212-
metrics_collection: Arc::clone(&prev.metrics_collection),
204+
child_task_metrics: Arc::clone(&prev.child_task_metrics),
213205
}),
214206
})
215207
}
@@ -226,11 +218,18 @@ impl NetworkBoundary for NetworkShuffleExec {
226218
let ready = NetworkShuffleReadyExec {
227219
properties: pending.repartition_exec.properties().clone(),
228220
stage_num,
229-
metrics_collection: Default::default(),
221+
child_task_metrics: Default::default(),
230222
};
231223

232224
Ok(Arc::new(Self::Ready(ready)))
233225
}
226+
227+
fn metrics_collection(&self) -> Option<Arc<DashMap<StageKey, Vec<MetricsSetProto>>>> {
228+
match self {
229+
NetworkShuffleExec::Pending(_) => None,
230+
NetworkShuffleExec::Ready(v) => Some(v.child_task_metrics.clone()),
231+
}
232+
}
234233
}
235234

236235
impl DisplayAs for NetworkShuffleExec {
@@ -331,7 +330,7 @@ impl ExecutionPlan for NetworkShuffleExec {
331330
},
332331
);
333332

334-
let metrics_collection_capture = self_ready.metrics_collection.clone();
333+
let metrics_collection_capture = self_ready.child_task_metrics.clone();
335334
async move {
336335
let url = task.url.ok_or(internal_datafusion_err!(
337336
"NetworkShuffleExec: task is unassigned, cannot proceed"

src/protobuf/distributed_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ fn new_network_hash_shuffle_exec(
234234
Boundedness::Bounded,
235235
),
236236
stage_num,
237-
metrics_collection: Default::default(),
237+
child_task_metrics: Default::default(),
238238
})
239239
}
240240

@@ -268,7 +268,7 @@ fn new_network_coalesce_tasks_exec(
268268
),
269269
stage_num,
270270
input_tasks,
271-
metrics_collection: Default::default(),
271+
child_task_metrics: Default::default(),
272272
})
273273
}
274274

0 commit comments

Comments
 (0)