Skip to content

Commit d213fb4

Browse files
committed
Refactor to NetworkShuffleExec and NetworkCoalesceExec
1 parent 2d48e02 commit d213fb4

20 files changed

+795
-895
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ impl DistributedSessionBuilder for RunOpt {
143143
if !self.workers.is_empty() {
144144
let tasks = self.max_tasks.unwrap_or(self.workers.len());
145145
let rule = DistributedPhysicalOptimizerRule::new()
146-
.with_coalesce_partitions_exec_tasks(tasks)
147-
.with_network_shuffle_exec_tasks(tasks);
146+
.with_network_coalesce_tasks(tasks)
147+
.with_network_shuffle_tasks(tasks);
148148
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
149149
}
150150

src/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
mod composed_extension_codec;
2+
mod partitioning;
23
#[allow(unused)]
34
pub mod ttl_map;
45

56
pub(crate) use composed_extension_codec::ComposedPhysicalExtensionCodec;
7+
pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};

src/common/partitioning.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use datafusion::physical_expr::Partitioning;
2+
use datafusion::physical_plan::PlanProperties;
3+
4+
/// Builds a new [PlanProperties] that mirrors `props` except for its partitioning.
///
/// The equivalence properties are cloned, and the emission type and boundedness are
/// carried over unchanged. The partitioning is recomputed by handing
/// `props.partitioning` and `f` to [scale_partitioning], so `f` receives the current
/// partition count and returns the count to advertise instead.
///
/// `props` itself is not modified; a fresh [PlanProperties] is returned.
pub fn scale_partitioning_props(
5+
props: &PlanProperties,
6+
f: impl FnOnce(usize) -> usize,
7+
) -> PlanProperties {
8+
PlanProperties::new(
9+
props.eq_properties.clone(),
10+
scale_partitioning(&props.partitioning, f),
11+
props.emission_type,
12+
props.boundedness,
13+
)
14+
}
15+
16+
/// Returns a [Partitioning] of the same variant as `partitioning`, with its partition
/// count replaced by `f(count)`.
///
/// - `RoundRobinBatch(p)` becomes `RoundRobinBatch(f(p))`.
/// - `Hash(exprs, p)` becomes `Hash(exprs.clone(), f(p))`; the hash expressions are
///   cloned, not recomputed.
/// - `UnknownPartitioning(p)` becomes `UnknownPartitioning(f(p))`.
///
/// `f` is invoked exactly once. No validation is applied to the value it returns, so
/// callers are responsible for producing a sensible partition count.
pub fn scale_partitioning(
17+
partitioning: &Partitioning,
18+
f: impl FnOnce(usize) -> usize,
19+
) -> Partitioning {
20+
match &partitioning {
21+
Partitioning::RoundRobinBatch(p) => Partitioning::RoundRobinBatch(f(*p)),
22+
Partitioning::Hash(hash, p) => Partitioning::Hash(hash.clone(), f(*p)),
23+
Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)),
24+
}
25+
}

src/distributed_physical_optimizer_rule.rs

Lines changed: 79 additions & 58 deletions
Large diffs are not rendered by default.

src/execution_plans/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
mod network_coalesce_tasks;
2-
mod network_hash_shuffle;
1+
mod network_coalesce;
2+
mod network_shuffle;
33
mod partition_isolator;
44
mod stage;
55

6-
pub use network_coalesce_tasks::{NetworkCoalesceTasksExec, NetworkCoalesceTasksReadyExec};
7-
pub use network_hash_shuffle::{NetworkHashShuffleExec, NetworkHashShuffleReadyExec};
6+
pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
7+
pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
88
pub use partition_isolator::PartitionIsolatorExec;
99
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};

src/execution_plans/network_coalesce_tasks.rs renamed to src/execution_plans/network_coalesce.rs

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
2+
use crate::common::scale_partitioning_props;
23
use crate::config_extension_ext::ContextGrpcMetadata;
34
use crate::distributed_physical_optimizer_rule::{limit_tasks_err, DistributedExecutionPlan};
45
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
@@ -13,7 +14,6 @@ use arrow_flight::Ticket;
1314
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
1415
use datafusion::error::DataFusionError;
1516
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
16-
use datafusion::physical_expr::Partitioning;
1717
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
1818
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
1919
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -32,34 +32,34 @@ use tonic::Request;
3232
/// 2. Ready: runs within a distributed stage and queries the next input stage over the network
3333
/// using Arrow Flight.
3434
#[derive(Debug, Clone)]
35-
pub enum NetworkCoalesceTasksExec {
36-
Pending(NetworkCoalesceTasksPendingExec),
37-
Ready(NetworkCoalesceTasksReadyExec),
35+
pub enum NetworkCoalesceExec {
36+
Pending(NetworkCoalescePending),
37+
Ready(NetworkCoalesceReady),
3838
}
3939

40-
/// Placeholder version of the [NetworkCoalesceTasksExec] node. It acts as a marker for the
40+
/// Placeholder version of the [NetworkCoalesceExec] node. It acts as a marker for the
4141
/// distributed optimization step, which will replace it with the appropriate
42-
/// [NetworkCoalesceTasksReadyExec] node.
42+
/// [NetworkCoalesceReady] node.
4343
#[derive(Debug, Clone)]
44-
pub struct NetworkCoalesceTasksPendingExec {
44+
pub struct NetworkCoalescePending {
4545
properties: PlanProperties,
4646
input_tasks: usize,
4747
child: Arc<dyn ExecutionPlan>,
4848
}
4949

50-
/// Ready version of the [NetworkCoalesceTasksExec] node. This node can be created in
50+
/// Ready version of the [NetworkCoalesceExec] node. This node can be created in
5151
/// just two ways:
52-
/// - by the distributed optimization step based on an original [NetworkCoalesceTasksPendingExec]
52+
/// - by the distributed optimization step based on an original [NetworkCoalescePending]
5353
/// - deserialized from a protobuf plan sent over the network.
5454
#[derive(Debug, Clone)]
55-
pub struct NetworkCoalesceTasksReadyExec {
55+
pub struct NetworkCoalesceReady {
5656
/// the properties we advertise for this execution plan
5757
pub(crate) properties: PlanProperties,
5858
pub(crate) stage_num: usize,
5959
pub(crate) input_tasks: usize,
6060
}
6161

62-
impl NetworkCoalesceTasksExec {
62+
impl NetworkCoalesceExec {
6363
pub fn from_coalesce_partitions_exec(
6464
input: &CoalescePartitionsExec,
6565
input_tasks: usize,
@@ -83,15 +83,15 @@ impl NetworkCoalesceTasksExec {
8383
return internal_err!("Expected a single child");
8484
};
8585

86-
Ok(Self::Pending(NetworkCoalesceTasksPendingExec {
86+
Ok(Self::Pending(NetworkCoalescePending {
8787
properties: child.properties().clone(),
8888
input_tasks,
8989
child: Arc::clone(child),
9090
}))
9191
}
9292
}
9393

94-
impl DistributedExecutionPlan for NetworkCoalesceTasksExec {
94+
impl DistributedExecutionPlan for NetworkCoalesceExec {
9595
fn to_stage_info(
9696
&self,
9797
n_tasks: usize,
@@ -112,12 +112,14 @@ impl DistributedExecutionPlan for NetworkCoalesceTasksExec {
112112
stage_num: usize,
113113
stage_head: &Arc<dyn ExecutionPlan>,
114114
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
115-
let NetworkCoalesceTasksExec::Pending(pending) = self else {
116-
return internal_err!("NetworkCoalesceTasksExec is already distributed");
115+
let NetworkCoalesceExec::Pending(pending) = self else {
116+
return internal_err!("NetworkCoalesceExec is already distributed");
117117
};
118118

119-
let ready = NetworkCoalesceTasksReadyExec {
120-
properties: scale_partitioning(stage_head.properties(), |p| p * pending.input_tasks),
119+
let ready = NetworkCoalesceReady {
120+
properties: scale_partitioning_props(stage_head.properties(), |p| {
121+
p * pending.input_tasks
122+
}),
121123
stage_num,
122124
input_tasks: pending.input_tasks,
123125
};
@@ -127,49 +129,33 @@ impl DistributedExecutionPlan for NetworkCoalesceTasksExec {
127129

128130
fn with_input_tasks(&self, input_tasks: usize) -> Arc<dyn DistributedExecutionPlan> {
129131
Arc::new(match self {
130-
NetworkCoalesceTasksExec::Pending(pending) => {
131-
NetworkCoalesceTasksExec::Pending(NetworkCoalesceTasksPendingExec {
132+
NetworkCoalesceExec::Pending(pending) => {
133+
NetworkCoalesceExec::Pending(NetworkCoalescePending {
132134
properties: pending.properties.clone(),
133135
input_tasks,
134136
child: pending.child.clone(),
135137
})
136138
}
137-
NetworkCoalesceTasksExec::Ready(ready) => {
138-
NetworkCoalesceTasksExec::Ready(NetworkCoalesceTasksReadyExec {
139-
properties: scale_partitioning(&ready.properties, |p| {
140-
p * input_tasks / ready.input_tasks
141-
}),
142-
stage_num: ready.stage_num,
143-
input_tasks,
144-
})
145-
}
139+
NetworkCoalesceExec::Ready(ready) => NetworkCoalesceExec::Ready(NetworkCoalesceReady {
140+
properties: scale_partitioning_props(&ready.properties, |p| {
141+
p * input_tasks / ready.input_tasks
142+
}),
143+
stage_num: ready.stage_num,
144+
input_tasks,
145+
}),
146146
})
147147
}
148148
}
149149

150-
fn scale_partitioning(props: &PlanProperties, f: impl FnOnce(usize) -> usize) -> PlanProperties {
151-
let partitioning = match &props.partitioning {
152-
Partitioning::RoundRobinBatch(p) => Partitioning::RoundRobinBatch(f(*p)),
153-
Partitioning::Hash(hash, p) => Partitioning::Hash(hash.clone(), f(*p)),
154-
Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)),
155-
};
156-
PlanProperties::new(
157-
props.eq_properties.clone(),
158-
partitioning,
159-
props.emission_type,
160-
props.boundedness,
161-
)
162-
}
163-
164-
impl DisplayAs for NetworkCoalesceTasksExec {
150+
impl DisplayAs for NetworkCoalesceExec {
165151
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
166-
write!(f, "NetworkCoalesceTasksExec")
152+
write!(f, "NetworkCoalesceExec")
167153
}
168154
}
169155

170-
impl ExecutionPlan for NetworkCoalesceTasksExec {
156+
impl ExecutionPlan for NetworkCoalesceExec {
171157
fn name(&self) -> &str {
172-
"NetworkCoalesceTasksExec"
158+
"NetworkCoalesceExec"
173159
}
174160

175161
fn as_any(&self) -> &dyn Any {
@@ -178,15 +164,15 @@ impl ExecutionPlan for NetworkCoalesceTasksExec {
178164

179165
fn properties(&self) -> &PlanProperties {
180166
match self {
181-
NetworkCoalesceTasksExec::Pending(v) => &v.properties,
182-
NetworkCoalesceTasksExec::Ready(v) => &v.properties,
167+
NetworkCoalesceExec::Pending(v) => &v.properties,
168+
NetworkCoalesceExec::Ready(v) => &v.properties,
183169
}
184170
}
185171

186172
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
187173
match self {
188-
NetworkCoalesceTasksExec::Pending(v) => vec![&v.child],
189-
NetworkCoalesceTasksExec::Ready(_) => vec![],
174+
NetworkCoalesceExec::Pending(v) => vec![&v.child],
175+
NetworkCoalesceExec::Ready(_) => vec![],
190176
}
191177
}
192178

@@ -196,7 +182,7 @@ impl ExecutionPlan for NetworkCoalesceTasksExec {
196182
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
197183
if !children.is_empty() {
198184
return plan_err!(
199-
"NetworkCoalesceTasksExec: wrong number of children, expected 0, got {}",
185+
"NetworkCoalesceExec: wrong number of children, expected 0, got {}",
200186
children.len()
201187
);
202188
}
@@ -208,16 +194,16 @@ impl ExecutionPlan for NetworkCoalesceTasksExec {
208194
partition: usize,
209195
context: Arc<TaskContext>,
210196
) -> Result<SendableRecordBatchStream, DataFusionError> {
211-
let NetworkCoalesceTasksExec::Ready(self_ready) = self else {
197+
let NetworkCoalesceExec::Ready(self_ready) = self else {
212198
return exec_err!(
213-
"NetworkCoalesceTasksExec is not ready, was the distributed optimization step performed?"
199+
"NetworkCoalesceExec is not ready, was the distributed optimization step performed?"
214200
);
215201
};
216202

217203
// get the channel manager and current stage from our context
218204
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
219205

220-
// the `NetworkCoalesceTasksExec` node can only be executed in the context of a `StageExec`
206+
// the `NetworkCoalesceExec` node can only be executed in the context of a `StageExec`
221207
let stage = StageExec::from_ctx(&context)?;
222208

223209
// of our child stages find the one that matches the one we are supposed to be
@@ -226,15 +212,13 @@ impl ExecutionPlan for NetworkCoalesceTasksExec {
226212

227213
let codec = DistributedCodec::new_combined_with_user(context.session_config());
228214
let child_stage_proto = proto_from_stage(child_stage, &codec).map_err(|e| {
229-
internal_datafusion_err!(
230-
"NetworkCoalesceTasksExec: failed to convert stage to proto: {e}"
231-
)
215+
internal_datafusion_err!("NetworkCoalesceExec: failed to convert stage to proto: {e}")
232216
})?;
233217

234218
let context_headers = ContextGrpcMetadata::headers_from_ctx(&context);
235219
let task_context = DistributedTaskContext::from_ctx(&context);
236220
if task_context.task_index > 0 {
237-
return exec_err!("NetworkCoalesceTasksExec cannot be executed in more than one task");
221+
return exec_err!("NetworkCoalesceExec cannot be executed in more than one task");
238222
}
239223

240224
let partitions_per_task =
@@ -267,7 +251,7 @@ impl ExecutionPlan for NetworkCoalesceTasksExec {
267251
};
268252

269253
let Some(url) = task.url.clone() else {
270-
return internal_err!("NetworkCoalesceTasksExec: task is unassigned, cannot proceed");
254+
return internal_err!("NetworkCoalesceExec: task is unassigned, cannot proceed");
271255
};
272256

273257
let stream = async move {

0 commit comments

Comments
 (0)