Skip to content

Commit 3b052fd

Browse files
committed
WIP commit for execution functionality
1 parent bd9e61f commit 3b052fd

File tree

23 files changed

+2024
-261
lines changed

23 files changed

+2024
-261
lines changed

src/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod result;
2+
pub mod util;

src/common/result.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

src/common/util.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use datafusion::error::Result;
2+
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
3+
4+
use std::fmt::Write;
5+
6+
pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<String> {
7+
let mut f = String::new();
8+
9+
fn visit(plan: &dyn ExecutionPlan, indent: usize, f: &mut String) -> Result<()> {
10+
let output_partitions = plan.output_partitioning().partition_count();
11+
let input_partitions = plan
12+
.children()
13+
.first()
14+
.map(|child| child.output_partitioning().partition_count());
15+
16+
write!(
17+
f,
18+
"partitions [out:{:<3}{}]{} {}",
19+
output_partitions,
20+
input_partitions
21+
.map(|p| format!("<-- in:{:<3}", p))
22+
.unwrap_or(" ".to_string()),
23+
" ".repeat(indent),
24+
displayable(plan).one_line()
25+
)?;
26+
27+
plan.children()
28+
.iter()
29+
.try_for_each(|input| visit(input.as_ref(), indent + 2, f))?;
30+
31+
Ok(())
32+
}
33+
34+
visit(plan, 0, &mut f)?;
35+
Ok(f)
36+
}

src/exec/distributed.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use std::{any::Any, fmt::Formatter, pin::Pin, sync::Arc};
2+
3+
use datafusion::{
4+
execution::{RecordBatchStream, TaskContext},
5+
physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan},
6+
};
7+
8+
use crate::{remote::WorkerDiscovery, stage::ExecutionStage, task::WorkerAddr};
9+
10+
/// This execution plan provides the capability to execute its input plan in
11+
/// a distributed manner across multiple nodes in a cluster.
12+
///
13+
/// When `execute()` is called, it is intended to use the provided `WorkerDiscovery`
14+
/// to discover worker nodes and distribute the execution of the input plan
15+
/// across those nodes. NOTE(review): `execute()` is still a stub in this commit.
16+
#[derive(Debug)]
17+
pub struct DistributedExec {
18+
/// This is the unmodified input plan that this execution plan will execute.
19+
/// We keep it for display purposes; it also backs `properties()` and `children()`.
20+
input: Arc<dyn ExecutionPlan>,
21+
/// This is the worker discovery mechanism that will be used to provide
22+
/// worker addresses.
23+
discovery: Arc<dyn WorkerDiscovery>,
24+
/// This contains stages as defined by the `crate::stage::planner::StageOptimizer`.
25+
stage: Arc<ExecutionStage>,
26+
}
27+
28+
impl DistributedExec {
29+
/// Creates a new `DistributedExec` with the given input plan.
30+
pub fn new(
31+
input: Arc<dyn ExecutionPlan>,
32+
discovery: Arc<dyn WorkerDiscovery>,
33+
stage: Arc<ExecutionStage>,
34+
) -> Self {
35+
DistributedExec {
36+
input,
37+
discovery,
38+
stage,
39+
}
40+
}
41+
}
42+
43+
impl DisplayAs for DistributedExec {
44+
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
45+
match t {
46+
DisplayFormatType::Default => write!(f, "DistributedExec: {}", self.input.name()),
47+
DisplayFormatType::Verbose => write!(
48+
f,
49+
"DistributedExec: input = {}, properties = {:?}",
50+
self.input.name(),
51+
self.input.properties()
52+
),
53+
DisplayFormatType::TreeRender => write!(f, ""),
54+
}
55+
}
56+
}
57+
58+
impl ExecutionPlan for DistributedExec {
59+
fn name(&self) -> &str {
60+
"DistributedExec"
61+
}
62+
63+
fn as_any(&self) -> &dyn Any {
64+
self
65+
}
66+
67+
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
68+
&self.input.properties()
69+
}
70+
71+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
72+
vec![&self.input]
73+
}
74+
75+
fn with_new_children(
76+
self: Arc<Self>,
77+
children: Vec<Arc<dyn ExecutionPlan>>,
78+
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
79+
if children.len() != 1 {
80+
return Err(datafusion::error::DataFusionError::Internal(
81+
"DistributedExec requires exactly one child".to_string(),
82+
));
83+
}
84+
Ok(Arc::new(DistributedExec {
85+
input: children[0].clone(),
86+
discovery: self.discovery.clone(),
87+
stage: self.stage.clone(),
88+
}))
89+
}
90+
91+
fn execute(
92+
&self,
93+
partition: usize,
94+
context: Arc<TaskContext>,
95+
) -> datafusion::error::Result<Pin<Box<dyn RecordBatchStream + Send>>> {
96+
todo!()
97+
}
98+
}

src/exec/isolator.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use std::{fmt::Formatter, sync::Arc};
2+
3+
use datafusion::{
4+
common::internal_datafusion_err,
5+
error::Result,
6+
execution::SendableRecordBatchStream,
7+
physical_plan::{
8+
DisplayAs, DisplayFormatType, EmptyRecordBatchStream, ExecutionPlan,
9+
ExecutionPlanProperties, Partitioning, PlanProperties,
10+
},
11+
};
12+
13+
// TODO: REFACTOR NOT COMPLETE, just copied here and gutted unnecessary parts for now
14+
// IGNORE
15+
16+
/// This is a simple execution plan that isolates a partition from the input
17+
/// plan. It will advertise that it has `partition_count` partitions and, when
18+
/// asked to execute, it will execute a particular partition from the child
19+
/// input plan.
20+
///
21+
/// This allows us to execute Repartition Exec's on different processes
22+
/// by showing each one only a single child partition
23+
#[derive(Debug)]
24+
pub struct PartitionIsolatorExec {
25+
/// The child plan whose partitions are selectively executed.
pub input: Arc<dyn ExecutionPlan>,
26+
/// The input's properties with the partitioning replaced by
/// `Partitioning::UnknownPartitioning(partition_count)`.
properties: PlanProperties,
27+
/// Number of partitions this node advertises.
pub partition_count: usize,
28+
}
29+
30+
impl PartitionIsolatorExec {
31+
pub fn new(input: Arc<dyn ExecutionPlan>, partition_count: usize) -> Self {
32+
// We advertise that we only have partition_count partitions
33+
let properties = input
34+
.properties()
35+
.clone()
36+
.with_partitioning(Partitioning::UnknownPartitioning(partition_count));
37+
38+
Self {
39+
input,
40+
properties,
41+
partition_count,
42+
}
43+
}
44+
}
45+
46+
impl DisplayAs for PartitionIsolatorExec {
47+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
48+
write!(
49+
f,
50+
"PartitionIsolatorExec [providing upto {} partitions]",
51+
self.partition_count
52+
)
53+
}
54+
}
55+
56+
impl ExecutionPlan for PartitionIsolatorExec {
57+
fn name(&self) -> &str {
58+
"PartitionIsolatorExec"
59+
}
60+
61+
fn as_any(&self) -> &dyn std::any::Any {
62+
self
63+
}
64+
65+
fn properties(&self) -> &PlanProperties {
66+
&self.properties
67+
}
68+
69+
fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
70+
vec![&self.input]
71+
}
72+
73+
fn with_new_children(
74+
self: std::sync::Arc<Self>,
75+
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
76+
) -> Result<std::sync::Arc<dyn ExecutionPlan>> {
77+
// TODO: generalize this
78+
assert_eq!(children.len(), 1);
79+
Ok(Arc::new(Self::new(
80+
children[0].clone(),
81+
self.partition_count,
82+
)))
83+
}
84+
85+
fn execute(
86+
&self,
87+
partition: usize,
88+
context: std::sync::Arc<datafusion::execution::TaskContext>,
89+
) -> Result<SendableRecordBatchStream> {
90+
let config = context.session_config();
91+
let partition_group = &[0, 1];
92+
93+
let partitions_in_input = self.input.output_partitioning().partition_count() as u64;
94+
95+
let output_stream = match partition_group.get(partition) {
96+
Some(actual_partition_number) => {
97+
if *actual_partition_number >= partitions_in_input {
98+
//trace!("{} returning empty stream", ctx_name);
99+
Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
100+
as SendableRecordBatchStream)
101+
} else {
102+
//trace!("{} returning actual stream", ctx_name);
103+
self.input
104+
.execute(*actual_partition_number as usize, context)
105+
}
106+
}
107+
None => Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
108+
as SendableRecordBatchStream),
109+
};
110+
output_stream
111+
}
112+
}

src/exec/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
mod distributed;
2+
mod isolator;

src/lib.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
mod channel_manager;
2-
mod composed_extension_codec;
3-
mod flight_service;
2+
mod common;
3+
mod exec;
4+
//mod flight_service;
45
mod plan;
5-
mod stage_delegation;
6-
#[cfg(test)]
7-
pub mod test_utils;
6+
mod remote;
7+
mod stage;
8+
pub mod task;
9+
mod test_utils;
810

911
pub use channel_manager::{
1012
ArrowFlightChannel, BoxCloneSyncChannel, ChannelManager, ChannelResolver,
1113
};
12-
pub use flight_service::{ArrowFlightEndpoint, SessionBuilder};
14+
1315
pub use plan::ArrowFlightReadExec;
16+
pub use stage::{display_stage, display_stage_tree, ExecutionStage, StagePlanner};

0 commit comments

Comments
 (0)