Skip to content

Commit 3ae9c4c

Browse files
committed
Scope common execution_plan helpers to src/execution_plans/common.rs
1 parent 76cbed2 commit 3ae9c4c

File tree

7 files changed

+23
-29
lines changed

7 files changed

+23
-29
lines changed

src/common/execution_plan_ops.rs

Lines changed: 0 additions & 13 deletions
This file was deleted.

src/common/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
mod execution_plan_ops;
21
mod map_last_stream;
3-
mod partitioning;
4-
#[allow(unused)]
52
pub mod ttl_map;
63

7-
pub(crate) use execution_plan_ops::*;
84
pub(crate) use map_last_stream::map_last_stream;
9-
pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};
Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
1+
use datafusion::common::{DataFusionError, plan_err};
12
use datafusion::physical_expr::Partitioning;
2-
use datafusion::physical_plan::PlanProperties;
3+
use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
4+
use std::sync::Arc;
35

4-
pub fn scale_partitioning_props(
6+
pub(super) fn require_one_child(
7+
children: &[Arc<dyn ExecutionPlan>],
8+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
9+
if children.len() != 1 {
10+
return plan_err!("Expected exactly 1 children, got {}", children.len());
11+
}
12+
Ok(children[0].clone())
13+
}
14+
15+
pub(super) fn scale_partitioning_props(
516
props: &PlanProperties,
617
f: impl FnOnce(usize) -> usize,
718
) -> PlanProperties {
@@ -13,7 +24,7 @@ pub fn scale_partitioning_props(
1324
)
1425
}
1526

16-
pub fn scale_partitioning(
27+
pub(super) fn scale_partitioning(
1728
partitioning: &Partitioning,
1829
f: impl FnOnce(usize) -> usize,
1930
) -> Partitioning {

src/execution_plans/distributed.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
2-
use crate::common::one_child;
32
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
3+
use crate::execution_plans::common::require_one_child;
44
use crate::protobuf::DistributedCodec;
55
use crate::{ExecutionTask, Stage};
66
use datafusion::common::exec_err;
@@ -98,7 +98,7 @@ impl ExecutionPlan for DistributedExec {
9898
children: Vec<Arc<dyn ExecutionPlan>>,
9999
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
100100
Ok(Arc::new(DistributedExec {
101-
plan: one_child(&children)?,
101+
plan: require_one_child(&children)?,
102102
}))
103103
}
104104

src/execution_plans/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod common;
12
mod distributed;
23
mod metrics;
34
mod network_coalesce;

src/execution_plans/network_coalesce.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
2-
use crate::common::{one_child, scale_partitioning_props};
32
use crate::config_extension_ext::ContextGrpcMetadata;
43
use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
4+
use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
55
use crate::flight_service::DoGet;
66
use crate::metrics::MetricsCollectingStream;
77
use crate::metrics::proto::MetricsSetProto;
@@ -230,12 +230,12 @@ impl ExecutionPlan for NetworkCoalesceExec {
230230
match self.as_ref() {
231231
Self::Pending(v) => {
232232
let mut v = v.clone();
233-
v.child = one_child(&children)?;
233+
v.child = require_one_child(&children)?;
234234
Ok(Arc::new(Self::Pending(v)))
235235
}
236236
Self::Ready(v) => {
237237
let mut v = v.clone();
238-
v.input_stage.plan = MaybeEncodedPlan::Decoded(one_child(&children)?);
238+
v.input_stage.plan = MaybeEncodedPlan::Decoded(require_one_child(&children)?);
239239
Ok(Arc::new(Self::Ready(v)))
240240
}
241241
}

src/execution_plans/network_shuffle.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
2-
use crate::common::{one_child, scale_partitioning};
32
use crate::config_extension_ext::ContextGrpcMetadata;
43
use crate::distributed_physical_optimizer_rule::NetworkBoundary;
4+
use crate::execution_plans::common::{require_one_child, scale_partitioning};
55
use crate::flight_service::DoGet;
66
use crate::metrics::MetricsCollectingStream;
77
use crate::metrics::proto::MetricsSetProto;
@@ -292,12 +292,12 @@ impl ExecutionPlan for NetworkShuffleExec {
292292
match self.as_ref() {
293293
Self::Pending(v) => {
294294
let mut v = v.clone();
295-
v.repartition_exec = one_child(&children)?;
295+
v.repartition_exec = require_one_child(&children)?;
296296
Ok(Arc::new(Self::Pending(v)))
297297
}
298298
Self::Ready(v) => {
299299
let mut v = v.clone();
300-
v.input_stage.plan = MaybeEncodedPlan::Decoded(one_child(&children)?);
300+
v.input_stage.plan = MaybeEncodedPlan::Decoded(require_one_child(&children)?);
301301
Ok(Arc::new(Self::Ready(v)))
302302
}
303303
}

0 commit comments

Comments
 (0)