Skip to content

Commit c8d1e7f

Browse files
committed
Move out of single-node/distributed into pending/ready
1 parent 76708e6 commit c8d1e7f

File tree

7 files changed

+37
-39
lines changed

7 files changed

+37
-39
lines changed

src/physical_optimizer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl DistributedPhysicalOptimizerRule {
109109
};
110110

111111
return Ok(Transformed::yes(Arc::new(
112-
ArrowFlightReadExec::new_single_node(
112+
ArrowFlightReadExec::new_pending(
113113
Arc::clone(&maybe_isolated_plan),
114114
maybe_isolated_plan.output_partitioning().clone(),
115115
),

src/plan/arrow_flight_read.rs

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use arrow_flight::error::FlightError;
88
use arrow_flight::flight_service_client::FlightServiceClient;
99
use arrow_flight::Ticket;
1010
use datafusion::arrow::datatypes::SchemaRef;
11-
use datafusion::common::{internal_datafusion_err, internal_err, plan_err};
11+
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
1212
use datafusion::error::DataFusionError;
1313
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1414
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
@@ -22,40 +22,39 @@ use std::fmt::Formatter;
2222
use std::sync::Arc;
2323
use url::Url;
2424

25-
/// This node can operate in two modes:
26-
/// 1. Single-node: it acts as a pure passthrough the child plan, as if it was not there.
27-
/// 2. Distributed: runs within a distributed stage and queries the next input stage over
28-
/// the network using Arrow Flight.
25+
/// This node has two variants:
26+
/// 1. Pending: it acts as a placeholder that the distributed optimization step converts into the ready variant.
27+
/// 2. Ready: runs within a distributed stage and queries the next input stage over the network
28+
/// using Arrow Flight.
2929
#[derive(Debug, Clone)]
3030
pub enum ArrowFlightReadExec {
31-
SingleNode(ArrowFlightReadSingleNodeExec),
32-
Distributed(ArrowFlightReadDistributedExec),
31+
Pending(ArrowFlightReadPendingExec),
32+
Ready(ArrowFlightReadReadyExec),
3333
}
3434

35-
/// Single-node version of the [ArrowFlightReadExec] node. Users can choose to place
36-
/// this node wherever they want in their plan wherever they want, and the distributed
37-
/// optimization step will replace it with the appropriate [ArrowFlightReadDistributedExec]
38-
/// node.
35+
/// Placeholder version of the [ArrowFlightReadExec] node. It acts as a marker for the
36+
/// distributed optimization step, which will replace it with the appropriate
37+
/// [ArrowFlightReadReadyExec] node.
3938
#[derive(Debug, Clone)]
40-
pub struct ArrowFlightReadSingleNodeExec {
39+
pub struct ArrowFlightReadPendingExec {
4140
properties: PlanProperties,
4241
child: Arc<dyn ExecutionPlan>,
4342
}
4443

45-
/// Distributed version of the [ArrowFlightReadExec] node. This node can be created in
44+
/// Ready version of the [ArrowFlightReadExec] node. This node can be created in
4645
/// just two ways:
47-
/// - by the distributed optimization step based on an original [ArrowFlightReadSingleNodeExec]
46+
/// - by the distributed optimization step based on an original [ArrowFlightReadPendingExec]
4847
/// - deserialized from a protobuf plan sent over the network.
4948
#[derive(Debug, Clone)]
50-
pub struct ArrowFlightReadDistributedExec {
49+
pub struct ArrowFlightReadReadyExec {
5150
/// the properties we advertise for this execution plan
5251
properties: PlanProperties,
5352
pub(crate) stage_num: usize,
5453
}
5554

5655
impl ArrowFlightReadExec {
57-
pub fn new_single_node(child: Arc<dyn ExecutionPlan>, partitioning: Partitioning) -> Self {
58-
Self::SingleNode(ArrowFlightReadSingleNodeExec {
56+
pub fn new_pending(child: Arc<dyn ExecutionPlan>, partitioning: Partitioning) -> Self {
57+
Self::Pending(ArrowFlightReadPendingExec {
5958
properties: PlanProperties::new(
6059
EquivalenceProperties::new(child.schema()),
6160
partitioning,
@@ -66,7 +65,7 @@ impl ArrowFlightReadExec {
6665
})
6766
}
6867

69-
pub(crate) fn new_distributed(
68+
pub(crate) fn new_ready(
7069
partitioning: Partitioning,
7170
schema: SchemaRef,
7271
stage_num: usize,
@@ -77,15 +76,15 @@ impl ArrowFlightReadExec {
7776
EmissionType::Incremental,
7877
Boundedness::Bounded,
7978
);
80-
Self::Distributed(ArrowFlightReadDistributedExec {
79+
Self::Ready(ArrowFlightReadReadyExec {
8180
properties,
8281
stage_num,
8382
})
8483
}
8584

8685
pub(crate) fn to_distributed(&self, stage_num: usize) -> Result<Self, DataFusionError> {
8786
match self {
88-
ArrowFlightReadExec::SingleNode(p) => Ok(Self::new_distributed(
87+
ArrowFlightReadExec::Pending(p) => Ok(Self::new_ready(
8988
p.properties.partitioning.clone(),
9089
p.child.schema(),
9190
stage_num,
@@ -98,8 +97,8 @@ impl ArrowFlightReadExec {
9897
impl DisplayAs for ArrowFlightReadExec {
9998
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
10099
match self {
101-
ArrowFlightReadExec::SingleNode(_) => write!(f, "ArrowFlightReadExec"),
102-
ArrowFlightReadExec::Distributed(v) => {
100+
ArrowFlightReadExec::Pending(_) => write!(f, "ArrowFlightReadExec"),
101+
ArrowFlightReadExec::Ready(v) => {
103102
write!(f, "ArrowFlightReadExec: Stage {:<3}", v.stage_num)
104103
}
105104
}
@@ -117,15 +116,15 @@ impl ExecutionPlan for ArrowFlightReadExec {
117116

118117
fn properties(&self) -> &PlanProperties {
119118
match self {
120-
ArrowFlightReadExec::SingleNode(v) => &v.properties,
121-
ArrowFlightReadExec::Distributed(v) => &v.properties,
119+
ArrowFlightReadExec::Pending(v) => &v.properties,
120+
ArrowFlightReadExec::Ready(v) => &v.properties,
122121
}
123122
}
124123

125124
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
126125
match self {
127-
ArrowFlightReadExec::SingleNode(v) => vec![&v.child],
128-
ArrowFlightReadExec::Distributed(_) => vec![],
126+
ArrowFlightReadExec::Pending(v) => vec![&v.child],
127+
ArrowFlightReadExec::Ready(_) => vec![],
129128
}
130129
}
131130

@@ -147,9 +146,8 @@ impl ExecutionPlan for ArrowFlightReadExec {
147146
partition: usize,
148147
context: Arc<TaskContext>,
149148
) -> Result<SendableRecordBatchStream, DataFusionError> {
150-
let this = match self {
151-
ArrowFlightReadExec::SingleNode(this) => return this.child.execute(partition, context),
152-
ArrowFlightReadExec::Distributed(this) => this,
149+
let ArrowFlightReadExec::Ready(this) = self else {
150+
return exec_err!("ArrowFlightReadExec is not ready, was the distributed optimization step performed?");
153151
};
154152

155153
// get the channel manager and current stage from our context

src/plan/codec.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
5252
)?
5353
.ok_or(proto_error("ArrowFlightReadExec is missing partitioning"))?;
5454

55-
Ok(Arc::new(ArrowFlightReadExec::new_distributed(
55+
Ok(Arc::new(ArrowFlightReadExec::new_ready(
5656
partioning,
5757
Arc::new(schema),
5858
stage_num as usize,
@@ -84,9 +84,9 @@ impl PhysicalExtensionCodec for DistributedCodec {
8484
buf: &mut Vec<u8>,
8585
) -> datafusion::common::Result<()> {
8686
if let Some(node) = node.as_any().downcast_ref::<ArrowFlightReadExec>() {
87-
let ArrowFlightReadExec::Distributed(ready_node) = node else {
87+
let ArrowFlightReadExec::Ready(ready_node) = node else {
8888
return Err(proto_error(
89-
"deserialized an ArrowFlightReadExec that is not distributed",
89+
"deserialized an ArrowFlightReadExec that is not ready",
9090
));
9191
};
9292
ArrowFlightReadExecProto {

tests/common/plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub fn distribute_aggregate(
3636
let child = node.children()[0].clone();
3737

3838
let node = node.with_new_children(vec![Arc::new(
39-
ArrowFlightReadExec::new_single_node(child, Partitioning::Hash(expr, 1)),
39+
ArrowFlightReadExec::new_pending(child, Partitioning::Hash(expr, 1)),
4040
)])?;
4141
Ok(Transformed::yes(node))
4242
}
@@ -54,7 +54,7 @@ pub fn distribute_aggregate(
5454
let child = node.children()[0].clone();
5555

5656
let node = node.with_new_children(vec![Arc::new(
57-
ArrowFlightReadExec::new_single_node(child, Partitioning::RoundRobinBatch(8)),
57+
ArrowFlightReadExec::new_pending(child, Partitioning::RoundRobinBatch(8)),
5858
)])?;
5959
Ok(Transformed::yes(node))
6060
}

tests/custom_extension_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ mod tests {
126126
)?);
127127

128128
if distributed {
129-
plan = Arc::new(ArrowFlightReadExec::new_single_node(
129+
plan = Arc::new(ArrowFlightReadExec::new_pending(
130130
plan.clone(),
131131
Partitioning::Hash(vec![col("numbers", &plan.schema())?], 1),
132132
));
@@ -141,7 +141,7 @@ mod tests {
141141
));
142142

143143
if distributed {
144-
plan = Arc::new(ArrowFlightReadExec::new_single_node(
144+
plan = Arc::new(ArrowFlightReadExec::new_pending(
145145
plan.clone(),
146146
Partitioning::RoundRobinBatch(10),
147147
));

tests/error_propagation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ mod tests {
5151
let mut plan: Arc<dyn ExecutionPlan> = Arc::new(ErrorExec::new("something failed"));
5252

5353
for size in [1, 2, 3] {
54-
plan = Arc::new(ArrowFlightReadExec::new_single_node(
54+
plan = Arc::new(ArrowFlightReadExec::new_pending(
5555
plan,
5656
Partitioning::RoundRobinBatch(size),
5757
));

tests/highly_distributed_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ mod tests {
3131

3232
let mut physical_distributed = physical.clone();
3333
for size in [1, 10, 5] {
34-
physical_distributed = Arc::new(ArrowFlightReadExec::new_single_node(
34+
physical_distributed = Arc::new(ArrowFlightReadExec::new_pending(
3535
physical_distributed,
3636
Partitioning::RoundRobinBatch(size),
3737
));

0 commit comments

Comments
 (0)