Skip to content

Commit c31844b

Browse files
committed
Misc improvements to public API
1 parent 4b5a1c1 commit c31844b

File tree

10 files changed

+59
-40
lines changed

10 files changed

+59
-40
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
4747
};
4848
use datafusion_distributed::{
4949
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
50-
DistributedSessionBuilderContext, NetworkBoundaryExt, Stage,
50+
DistributedSessionBuilderContext, NetworkBoundaryExt,
5151
};
5252
use log::info;
5353
use std::fs;

src/distributed_physical_optimizer_rule.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use super::{NetworkShuffleExec, PartitionIsolatorExec, Stage};
1+
use super::{NetworkShuffleExec, PartitionIsolatorExec};
22
use crate::execution_plans::{DistributedExec, NetworkCoalesceExec};
3+
use crate::stage::Stage;
34
use datafusion::common::plan_err;
45
use datafusion::common::tree_node::TreeNodeRecursion;
56
use datafusion::datasource::source::DataSourceExec;
@@ -232,19 +233,19 @@ impl DistributedPhysicalOptimizerRule {
232233
};
233234

234235
let stage = loop {
235-
let (inner_plan, in_tasks) = dnode.as_ref().to_stage_info(n_tasks)?;
236+
let input_stage_info = dnode.as_ref().get_input_stage_info(n_tasks)?;
236237
// If the current stage has just 1 task, and the next stage is only going to have
237238
// 1 task, there's no point in having a network boundary in between, they can just
238239
// communicate in memory.
239-
if n_tasks == 1 && in_tasks == 1 {
240+
if n_tasks == 1 && input_stage_info.task_count == 1 {
240241
let mut n = dnode.as_ref().rollback()?;
241242
if let Some(node) = n.as_any().downcast_ref::<PartitionIsolatorExec>() {
242243
// Also trim PartitionIsolatorExec out of the plan.
243244
n = Arc::clone(node.children().first().unwrap());
244245
}
245246
return Ok(Transformed::yes(n));
246247
}
247-
match Self::_distribute_plan_inner(query_id, inner_plan.clone(), num, depth + 1, in_tasks) {
248+
match Self::_distribute_plan_inner(query_id, input_stage_info.plan, num, depth + 1, input_stage_info.task_count) {
248249
Ok(v) => break v,
249250
Err(e) => match get_distribute_plan_err(&e) {
250251
None => return Err(e),
@@ -253,7 +254,7 @@ impl DistributedPhysicalOptimizerRule {
253254
// that no more than `limit` tasks can be used for it, so we are going
254255
// to limit the amount of tasks to the requested number and try building
255256
// the stage again.
256-
if in_tasks == *limit {
257+
if input_stage_info.task_count == *limit {
257258
return plan_err!("A node requested {limit} tasks for the stage its in, but that stage already has that many tasks");
258259
}
259260
dnode = Referenced::Arced(dnode.as_ref().with_input_task_count(*limit)?);
@@ -278,14 +279,27 @@ impl DistributedPhysicalOptimizerRule {
278279
}
279280
}
280281

282+
/// Necessary information for building a [Stage] during distributed planning.
283+
///
284+
/// [NetworkBoundary]s return this piece of data so that the distributed planner know how to
285+
/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
286+
///
287+
/// Some network boundaries might perform some modifications in their children, like scaling
288+
/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
289+
pub struct InputStageInfo {
290+
/// The head plan of the [Stage] that is about to be built.
291+
pub plan: Arc<dyn ExecutionPlan>,
292+
/// The amount of tasks the [Stage] will have.
293+
pub task_count: usize,
294+
}
295+
281296
/// This trait represents a node that introduces the necessity of a network boundary in the plan.
282297
/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
283298
/// out of it.
284299
pub trait NetworkBoundary: ExecutionPlan {
285-
/// Returns the information necessary for building the next stage.
286-
/// - The head node of the stage.
287-
/// - the amount of tasks that stage will have.
288-
fn to_stage_info(&self, n_tasks: usize) -> Result<(Arc<dyn ExecutionPlan>, usize)>;
300+
/// Returns the information necessary for building the next stage from which this
301+
/// [NetworkBoundary] is going to collect data.
302+
fn get_input_stage_info(&self, task_count: usize) -> Result<InputStageInfo>;
289303

290304
/// re-assigns a different number of input tasks to the current [NetworkBoundary].
291305
///
@@ -295,6 +309,8 @@ pub trait NetworkBoundary: ExecutionPlan {
295309

296310
/// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
297311
/// information to perform any internal transformations necessary for distributed execution.
312+
///
313+
/// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
298314
fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;
299315

300316
/// Returns the assigned input [Stage], if any.

src/execution_plans/distributed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
33
use crate::execution_plans::common::require_one_child;
44
use crate::protobuf::DistributedCodec;
5-
use crate::{ExecutionTask, Stage};
5+
use crate::stage::{ExecutionTask, Stage};
66
use datafusion::common::exec_err;
77
use datafusion::common::tree_node::{Transformed, TreeNode};
88
use datafusion::execution::{SendableRecordBatchStream, TaskContext};

src/execution_plans/network_coalesce.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::config_extension_ext::ContextGrpcMetadata;
3-
use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
3+
use crate::distributed_physical_optimizer_rule::{
4+
InputStageInfo, NetworkBoundary, limit_tasks_err,
5+
};
46
use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
57
use crate::flight_service::DoGet;
68
use crate::metrics::MetricsCollectingStream;
79
use crate::metrics::proto::MetricsSetProto;
810
use crate::protobuf::{StageKey, map_flight_to_datafusion_error, map_status_to_datafusion_error};
9-
use crate::stage::MaybeEncodedPlan;
10-
use crate::{ChannelResolver, DistributedTaskContext, Stage};
11+
use crate::stage::{MaybeEncodedPlan, Stage};
12+
use crate::{ChannelResolver, DistributedTaskContext};
1113
use arrow_flight::Ticket;
1214
use arrow_flight::decode::FlightRecordBatchStream;
1315
use arrow_flight::error::FlightError;
@@ -115,10 +117,7 @@ impl NetworkCoalesceExec {
115117
}
116118

117119
impl NetworkBoundary for NetworkCoalesceExec {
118-
fn to_stage_info(
119-
&self,
120-
n_tasks: usize,
121-
) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
120+
fn get_input_stage_info(&self, n_tasks: usize) -> Result<InputStageInfo, DataFusionError> {
122121
let Self::Pending(pending) = self else {
123122
return plan_err!("can only return wrapped child if on Pending state");
124123
};
@@ -128,7 +127,10 @@ impl NetworkBoundary for NetworkCoalesceExec {
128127
return Err(limit_tasks_err(1));
129128
}
130129

131-
Ok((Arc::clone(&pending.input), pending.input_tasks))
130+
Ok(InputStageInfo {
131+
plan: Arc::clone(&pending.input),
132+
task_count: pending.input_tasks,
133+
})
132134
}
133135

134136
fn with_input_stage(

src/execution_plans/network_shuffle.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::config_extension_ext::ContextGrpcMetadata;
3-
use crate::distributed_physical_optimizer_rule::NetworkBoundary;
3+
use crate::distributed_physical_optimizer_rule::{InputStageInfo, NetworkBoundary};
44
use crate::execution_plans::common::{require_one_child, scale_partitioning};
55
use crate::flight_service::DoGet;
66
use crate::metrics::MetricsCollectingStream;
77
use crate::metrics::proto::MetricsSetProto;
88
use crate::protobuf::StageKey;
99
use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
10-
use crate::stage::MaybeEncodedPlan;
11-
use crate::{ChannelResolver, DistributedTaskContext, Stage};
10+
use crate::stage::{MaybeEncodedPlan, Stage};
11+
use crate::{ChannelResolver, DistributedTaskContext};
1212
use arrow_flight::Ticket;
1313
use arrow_flight::decode::FlightRecordBatchStream;
1414
use arrow_flight::error::FlightError;
@@ -168,10 +168,7 @@ impl NetworkShuffleExec {
168168
}
169169

170170
impl NetworkBoundary for NetworkShuffleExec {
171-
fn to_stage_info(
172-
&self,
173-
n_tasks: usize,
174-
) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
171+
fn get_input_stage_info(&self, n_tasks: usize) -> Result<InputStageInfo, DataFusionError> {
175172
let Self::Pending(pending) = self else {
176173
return plan_err!("cannot only return wrapped child if on Pending state");
177174
};
@@ -186,7 +183,10 @@ impl NetworkBoundary for NetworkShuffleExec {
186183
scale_partitioning(r_exe.partitioning(), |p| p * n_tasks),
187184
)?);
188185

189-
Ok((next_stage_plan, pending.input_tasks))
186+
Ok(InputStageInfo {
187+
plan: next_stage_plan,
188+
task_count: pending.input_tasks,
189+
})
190190
}
191191

192192
fn with_input_task_count(

src/flight_service/do_get.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,8 @@ fn collect_and_create_metrics_flight_data(
209209
#[cfg(test)]
210210
mod tests {
211211
use super::*;
212-
use crate::ExecutionTask;
213212
use crate::flight_service::session_builder::DefaultSessionBuilder;
213+
use crate::stage::ExecutionTask;
214214
use arrow::datatypes::{Schema, SchemaRef};
215215
use arrow_flight::Ticket;
216216
use datafusion::physical_expr::Partitioning;

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub mod test_utils;
1717
pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
1818
pub use distributed_ext::DistributedExt;
1919
pub use distributed_physical_optimizer_rule::{
20-
DistributedPhysicalOptimizerRule, NetworkBoundaryExt,
20+
DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary, NetworkBoundaryExt,
2121
};
2222
pub use execution_plans::{
2323
DistributedExec, NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec,

src/protobuf/distributed_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use super::get_distributed_user_codecs;
22
use crate::distributed_physical_optimizer_rule::NetworkBoundary;
33
use crate::execution_plans::{NetworkCoalesceExec, NetworkCoalesceReady, NetworkShuffleReadyExec};
4-
use crate::stage::MaybeEncodedPlan;
5-
use crate::{ExecutionTask, NetworkShuffleExec, PartitionIsolatorExec, Stage};
4+
use crate::stage::{ExecutionTask, MaybeEncodedPlan, Stage};
5+
use crate::{NetworkShuffleExec, PartitionIsolatorExec};
66
use bytes::Bytes;
77
use datafusion::arrow::datatypes::Schema;
88
use datafusion::arrow::datatypes::SchemaRef;

src/stage.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,11 @@ use uuid::Uuid;
7272
#[derive(Debug, Clone)]
7373
pub struct Stage {
7474
/// Our query_id
75-
pub query_id: Uuid,
75+
pub(crate) query_id: Uuid,
7676
/// Our stage number
77-
pub num: usize,
77+
pub(crate) num: usize,
7878
/// The physical execution plan that this stage will execute.
79-
pub plan: MaybeEncodedPlan,
79+
pub(crate) plan: MaybeEncodedPlan,
8080
/// Our tasks which tell us how finely grained to execute the partitions in
8181
/// the plan
8282
pub tasks: Vec<ExecutionTask>,
@@ -86,15 +86,15 @@ pub struct Stage {
8686
pub struct ExecutionTask {
8787
/// The url of the worker that will execute this task. A None value is interpreted as
8888
/// unassigned.
89-
pub url: Option<Url>,
89+
pub(crate) url: Option<Url>,
9090
}
9191

9292
/// An [ExecutionPlan] that can be either:
9393
/// - Decoded: the inner [ExecutionPlan] is stored as-is.
9494
/// - Encoded: the inner [ExecutionPlan] is stored as protobuf [Bytes]. Storing it this way allow us
9595
/// to thread it through the project and eventually send it through gRPC in a zero copy manner.
9696
#[derive(Debug, Clone)]
97-
pub enum MaybeEncodedPlan {
97+
pub(crate) enum MaybeEncodedPlan {
9898
/// The decoded [ExecutionPlan].
9999
Decoded(Arc<dyn ExecutionPlan>),
100100
/// A protobuf encoded version of the [ExecutionPlan]. The inner [Bytes] represent the full
@@ -106,7 +106,7 @@ pub enum MaybeEncodedPlan {
106106
}
107107

108108
impl MaybeEncodedPlan {
109-
pub fn to_encoded(&self, codec: &dyn PhysicalExtensionCodec) -> Result<Self> {
109+
pub(crate) fn to_encoded(&self, codec: &dyn PhysicalExtensionCodec) -> Result<Self> {
110110
Ok(match self {
111111
Self::Decoded(plan) => Self::Encoded(
112112
PhysicalPlanNode::try_from_physical_plan(Arc::clone(plan), codec)?
@@ -117,14 +117,14 @@ impl MaybeEncodedPlan {
117117
})
118118
}
119119

120-
pub fn decoded(&self) -> Result<&Arc<dyn ExecutionPlan>> {
120+
pub(crate) fn decoded(&self) -> Result<&Arc<dyn ExecutionPlan>> {
121121
match self {
122122
MaybeEncodedPlan::Decoded(v) => Ok(v),
123123
MaybeEncodedPlan::Encoded(_) => plan_err!("Expected plan to be in a decoded state"),
124124
}
125125
}
126126

127-
pub fn encoded(&self) -> Result<&Bytes> {
127+
pub(crate) fn encoded(&self) -> Result<&Bytes> {
128128
match self {
129129
MaybeEncodedPlan::Decoded(_) => plan_err!("Expected plan to be in a encoded state"),
130130
MaybeEncodedPlan::Encoded(v) => Ok(v),

src/test_utils/plans.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use std::sync::Arc;
77

88
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
99
use crate::execution_plans::DistributedExec;
10-
use crate::{Stage, protobuf::StageKey};
10+
use crate::protobuf::StageKey;
11+
use crate::stage::Stage;
1112

1213
/// count_plan_nodes counts the number of execution plan nodes in a plan using BFS traversal.
1314
/// This does NOT traverse child stages, only the execution plan tree within this stage.

0 commit comments

Comments
 (0)