
Commit 4c8ac58

Rename ArrowFlightReadExec to NetworkShuffleExec
1 parent 0d29d17 commit 4c8ac58

File tree (5 files changed, +17 −19 lines)

  src/execution_plans/metrics.rs
  src/execution_plans/network_coalesce.rs
  src/execution_plans/network_shuffle.rs
  src/execution_plans/stage.rs
  tests/highly_distributed_query.rs

src/execution_plans/metrics.rs

Lines changed: 8 additions & 8 deletions
```diff
@@ -38,7 +38,7 @@ impl TreeNodeRewriter for TaskMetricsCollector {
     type Node = Arc<dyn ExecutionPlan>;

     fn f_down(&mut self, plan: Self::Node) -> Result<Transformed<Self::Node>> {
-        // If the plan is an ArrowFlightReadExec, assume it has collected metrics already
+        // If the plan is a NetworkShuffleExec, assume it has collected metrics already
         // from child tasks.
         let metrics_collection =
             if let Some(node) = plan.as_any().downcast_ref::<NetworkShuffleExec>() {
@@ -79,7 +79,7 @@ impl TreeNodeRewriter for TaskMetricsCollector {
                 }
             }
         }
-        // Skip the subtree of the ArrowFlightReadExec.
+        // Skip the subtree of the NetworkShuffleExec.
         return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump));
     }
```
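The two hunks above only rename the operator in comments, but the logic around them is the interesting part: when the metrics collector reaches a network boundary it takes the metrics that node already gathered from its child tasks and then declines to walk into the subtree. A minimal sketch of that pattern, using a name check as a stand-in for the real `downcast_ref::<NetworkShuffleExec>()` and a placeholder collector type (not the crate's actual `TaskMetricsCollector`):

```rust
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNodeRecursion, TreeNodeRewriter};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Illustrative collector; the real TaskMetricsCollector stores MetricsSetProto values
/// keyed by child task rather than plain strings.
struct BoundaryMetricsCollector {
    child_task_metrics: Vec<String>,
}

impl TreeNodeRewriter for BoundaryMetricsCollector {
    type Node = Arc<dyn ExecutionPlan>;

    fn f_down(&mut self, plan: Self::Node) -> Result<Transformed<Self::Node>> {
        // A network boundary has already merged the metrics reported by its child tasks,
        // so record those ...
        if plan.name() == "NetworkShuffleExec" {
            self.child_task_metrics
                .push(format!("metrics collected by {}", plan.name()));
            // ... and skip its subtree: those operators ran in other tasks, so walking
            // them here would find empty or double-counted metrics.
            return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump));
        }
        // Any other node: keep descending and let the caller record its metrics.
        Ok(Transformed::no(plan))
    }
}
```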

```diff
@@ -107,7 +107,7 @@ impl TaskMetricsCollector {
     /// collect metrics from a StageExec plan and any child tasks.
     /// Returns
     /// - a vec representing the metrics for the current task (ordered using a pre-order traversal)
-    /// - a map representing the metrics for some subset of child tasks collected from ArrowFlightReadExec leaves
+    /// - a map representing the metrics for some subset of child tasks collected from NetworkShuffleExec leaves
     #[allow(dead_code)]
     pub fn collect(mut self, stage: &StageExec) -> Result<MetricsCollectorResult, DataFusionError> {
         stage.plan.clone().rewrite(&mut self)?;
@@ -123,14 +123,14 @@ impl TaskMetricsCollector {
 /// Ex. for a plan with the form
 /// AggregateExec
 ///   └── ProjectionExec
-///         └── ArrowFlightReadExec
+///         └── NetworkShuffleExec
 ///
 /// the task will be rewritten as
 ///
 /// MetricsWrapperExec (wrapped: AggregateExec)
 ///   └── MetricsWrapperExec (wrapped: ProjectionExec)
-///         └── ArrowFlightReadExec
-/// (Note that the ArrowFlightReadExec node is not wrapped)
+///         └── NetworkShuffleExec
+/// (Note that the NetworkShuffleExec node is not wrapped)
 pub struct TaskMetricsRewriter {
     metrics: Vec<MetricsSetProto>,
     idx: usize,
```
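The doc comment describes the reverse direction: a pre-order walk that attaches one metrics set to each node it wraps and leaves the NetworkShuffleExec leaf untouched. A rough counterpart to the sketch above, again with placeholder types (`MetricsWrapperExec` is internal to the crate, so the wrapping step is only indicated by a comment):

```rust
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNodeRecursion, TreeNodeRewriter};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Illustrative rewriter: `metrics` holds one entry per non-network node in pre-order,
/// mirroring the AggregateExec -> ProjectionExec example in the doc comment above.
struct PreorderMetricsRewriter {
    metrics: Vec<String>, // the real struct uses Vec<MetricsSetProto>
    idx: usize,
}

impl TreeNodeRewriter for PreorderMetricsRewriter {
    type Node = Arc<dyn ExecutionPlan>;

    fn f_down(&mut self, plan: Self::Node) -> Result<Transformed<Self::Node>> {
        // The network boundary stays unwrapped and its subtree is not visited, so the
        // flat metrics list only has to cover nodes that executed in this task.
        if plan.name() == "NetworkShuffleExec" {
            return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump));
        }
        // Consume the next pre-order entry; the real rewriter wraps `plan` in a
        // MetricsWrapperExec carrying this metrics set instead of discarding it.
        let _metrics_for_node = self.metrics.get(self.idx).cloned();
        self.idx += 1;
        Ok(Transformed::no(plan))
    }
}
```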
```diff
@@ -402,7 +402,7 @@ mod tests {
     #[ignore]
     async fn test_metrics_rewriter() {
         let (test_stage, _ctx) = make_test_stage_exec_with_5_nodes().await;
-        let test_metrics_sets = (0..5) // 5 nodes excluding ArrowFlightReadExec
+        let test_metrics_sets = (0..5) // 5 nodes excluding NetworkShuffleExec
             .map(|i| make_distinct_metrics_set(i + 10))
             .collect::<Vec<MetricsSetProto>>();
@@ -420,7 +420,7 @@ mod tests {
             r" ProjectionExec: expr=[id@0 as id, count(Int64(1))@1 as count], metrics=[output_rows=12, elapsed_compute=12ns, start_timestamp=2025-09-18 13:00:12 UTC, end_timestamp=2025-09-18 13:00:13 UTC]",
             r" AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=13, elapsed_compute=13ns, start_timestamp=2025-09-18 13:00:13 UTC, end_timestamp=2025-09-18 13:00:14 UTC]",
             r" CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=14, elapsed_compute=14ns, start_timestamp=2025-09-18 13:00:14 UTC, end_timestamp=2025-09-18 13:00:15 UTC]",
-            r" ArrowFlightReadExec, metrics=[]",
+            r" NetworkShuffleExec, metrics=[]",
             "" // trailing newline
         ].join("\n");
         assert_eq!(expected, plan_str.to_string());
```

src/execution_plans/network_coalesce.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -95,7 +95,7 @@ pub struct NetworkCoalesceReady {
     ///
     /// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks in
     /// the stage it is reading from. This is because, by convention, the ArrowFlightEndpoint
-    /// sends metrics for a task to the last ArrowFlightReadExec to read from it, which may or may
+    /// sends metrics for a task to the last NetworkCoalesceExec to read from it, which may or may
     /// not be this instance.
     pub(crate) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
 }
```

src/execution_plans/network_shuffle.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -141,7 +141,7 @@ pub struct NetworkShuffleReadyExec {
     ///
     /// An instance may receive metrics for 0 to N child tasks, where N is the number of tasks in
     /// the stage it is reading from. This is because, by convention, the ArrowFlightEndpoint
-    /// sends metrics for a task to the last ArrowFlightReadExec to read from it, which may or may
+    /// sends metrics for a task to the last NetworkShuffleExec to read from it, which may or may
     /// not be this instance.
     pub(crate) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
 }
```
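NetworkShuffleReadyExec and NetworkCoalesceReady (previous file) carry the same `metrics_collection` field, and the convention described in the comment is easiest to see with the map itself: the endpoint appends metrics under a stage/task key, and whichever reader ends up being the last to pull from that task takes the accumulated entry. A small sketch with placeholder key and value types (the real code uses `StageKey` and `MetricsSetProto`):

```rust
use std::sync::Arc;

use dashmap::DashMap;

// Placeholder aliases for illustration only; not the crate's types.
type StageKey = String;
type MetricsSetProto = String;

fn main() {
    let metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>> =
        Arc::new(DashMap::new());

    // Endpoint side: append metrics for a finished child task under its stage key.
    metrics_collection
        .entry("stage-2/task-0".to_string())
        .or_default()
        .push("metrics for task 0".to_string());

    // Reader side: the last NetworkShuffleExec / NetworkCoalesceExec instance to read from
    // that task takes ownership of the accumulated entry.
    if let Some((key, task_metrics)) = metrics_collection.remove("stage-2/task-0") {
        println!("{key}: received {} metric set(s)", task_metrics.len());
    }
}
```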

src/execution_plans/stage.rs

Lines changed: 4 additions & 6 deletions
```diff
@@ -220,16 +220,14 @@ impl StageExec {
         ctx.session_config()
             .get_extension::<StageExec>()
             .ok_or(internal_datafusion_err!(
-                "ArrowFlightReadExec requires an ExecutionStage in the session config"
+                "missing ExecutionStage in session config"
             ))
     }

     pub fn child_stage(&self, i: usize) -> Result<&StageExec, DataFusionError> {
         self.child_stages_iter()
             .find(|s| s.num == i)
-            .ok_or(internal_datafusion_err!(
-                "ArrowFlightReadExec: no child stage with num {i}"
-            ))
+            .ok_or(internal_datafusion_err!("no child stage with num {i}"))
     }
 }
```
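Both reworded errors come from small helpers on StageExec: one reads the stage back out of the session config's extensions, the other looks up a child stage by number. A hedged sketch of the extension round trip, with a stand-in `StageExec` and a free function instead of the real method (the real code builds the error with the `internal_datafusion_err!` macro shown in the hunk):

```rust
use std::sync::Arc;

use datafusion::common::DataFusionError;
use datafusion::prelude::SessionConfig;

// Stand-in for the crate's StageExec; only here to show the extension round trip.
#[derive(Debug)]
struct StageExec {
    num: usize,
}

// Mirrors the error path touched above: the message no longer names the old
// ArrowFlightReadExec operator, since several node types can hit it.
fn stage_from_config(config: &SessionConfig) -> Result<Arc<StageExec>, DataFusionError> {
    config.get_extension::<StageExec>().ok_or_else(|| {
        DataFusionError::Internal("missing ExecutionStage in session config".to_string())
    })
}

fn main() -> Result<(), DataFusionError> {
    // The planner registers the stage as a session-config extension; executors read it back.
    let config = SessionConfig::new().with_extension(Arc::new(StageExec { num: 1 }));
    let stage = stage_from_config(&config)?;
    println!("found stage {}", stage.num);
    Ok(())
}
```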

```diff
@@ -670,7 +668,7 @@ pub fn display_plan(
 /// An example of such a node would be:
 ///
 /// ```text
-/// ArrowFlightReadExec [label=<
+/// NetworkShuffleExec [label=<
 ///     <TABLE BORDER="0" CELLBORDER="0" CELLSPACING="0" CELLPADDING="0">
 ///         <TR>
 ///             <TD CELLBORDER="0">
@@ -686,7 +684,7 @@ pub fn display_plan(
 ///             <TD BORDER="0" CELLPADDING="0" CELLSPACING="0">
 ///                 <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
 ///                     <TR>
-///                         <TD>ArrowFlightReadExec</TD>
+///                         <TD>NetworkShuffleExec</TD>
 ///                     </TR>
 ///                 </TABLE>
 ///             </TD>
```

tests/highly_distributed_query.rs

Lines changed: 3 additions & 3 deletions
```diff
@@ -45,15 +45,15 @@ mod tests {
         assert_snapshot!(physical_distributed_str,
             @r"
             ┌───── Stage 4 Tasks: t0:[p0,p1,p2,p3,p4]
-              ArrowFlightReadExec input_stage=3, input_partitions=5, input_tasks=1
+              NetworkShuffleExec input_stage=3, input_partitions=5, input_tasks=1
             └──────────────────────────────────────────────────
             ┌───── Stage 3 Tasks: t0:[p0,p1,p2,p3,p4]
             │ RepartitionExec: partitioning=RoundRobinBatch(5), input_partitions=10
-              ArrowFlightReadExec input_stage=2, input_partitions=10, input_tasks=1
+              NetworkShuffleExec input_stage=2, input_partitions=10, input_tasks=1
             └──────────────────────────────────────────────────
             ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
             │ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
-              ArrowFlightReadExec input_stage=1, input_partitions=1, input_tasks=1
+              NetworkShuffleExec input_stage=1, input_partitions=1, input_tasks=1
             └──────────────────────────────────────────────────
             ┌───── Stage 1 Tasks: t0:[p0]
             │ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
```
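This test pins the rendered distributed plan with insta inline snapshots, which is why the rename shows up as literal edits inside the `@r"..."` block. A minimal, hedged illustration of the pattern (the real test renders a full StageExec tree rather than a formatted string):

```rust
// Inline-snapshot style used above (insta crate, assumed as a dev-dependency). When the
// rendered plan changes, `cargo insta review` rewrites the @"..." literal in place, which
// is how renames like this one typically land in such tests.
#[test]
fn renders_network_shuffle_node() {
    let rendered = format!("NetworkShuffleExec input_stage={}, input_tasks={}", 3, 1);
    insta::assert_snapshot!(rendered, @"NetworkShuffleExec input_stage=3, input_tasks=1");
}
```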
