
Commit 4692166

add renamed file
1 parent aa30d88 commit 4692166

3 files changed: +295, -4 lines changed


src/errors/mod.rs

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@ mod schema_error;
 pub fn datafusion_error_to_tonic_status(err: &DataFusionError) -> tonic::Status {
     let err = DataFusionErrorProto::from_datafusion_error(err);
     let err = err.encode_to_vec();
-
+
     tonic::Status::with_details(tonic::Code::Internal, "DataFusionError", err.into())
 }
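This helper converts a DataFusionError into a tonic::Status whose details field carries the protobuf-encoded error. A minimal sketch of how it might be used at a gRPC boundary follows; the `to_rpc` wrapper and the `crate::errors` import path are assumptions for illustration, only `datafusion_error_to_tonic_status` comes from this commit.

```rust
use datafusion::error::{DataFusionError, Result};

use crate::errors::datafusion_error_to_tonic_status; // assumed module path

// Hypothetical adapter: map any DataFusion failure onto a tonic::Status so a
// Flight/gRPC handler can return it directly; the protobuf-encoded
// DataFusionError travels in the status details.
fn to_rpc<T>(res: Result<T>) -> std::result::Result<T, tonic::Status> {
    res.map_err(|e: DataFusionError| datafusion_error_to_tonic_status(&e))
}
```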

src/physical_optimizer.rs

Lines changed: 1 addition & 3 deletions

@@ -12,9 +12,7 @@ use datafusion::{
     config::ConfigOptions,
     error::Result,
     physical_optimizer::PhysicalOptimizerRule,
-    physical_plan::{
-        repartition::RepartitionExec, ExecutionPlan, ExecutionPlanProperties,
-    },
+    physical_plan::{repartition::RepartitionExec, ExecutionPlan, ExecutionPlanProperties},
 };
 use uuid::Uuid;

src/stage/execution_stage.rs

Lines changed: 293 additions & 0 deletions

@@ -0,0 +1,293 @@
use std::sync::Arc;

use datafusion::common::internal_err;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;

use itertools::Itertools;
use rand::Rng;
use url::Url;
use uuid::Uuid;

use crate::task::ExecutionTask;
use crate::ChannelManager;

/// A unit of isolation for a portion of a physical execution plan
/// that can be executed independently and across a network boundary.
/// It implements [`ExecutionPlan`] and can be executed to produce a
/// stream of record batches.
///
/// An ExecutionTask is a finer grained unit of work compared to an ExecutionStage.
/// One ExecutionStage will create one or more ExecutionTasks.
///
/// When an [`ExecutionStage`] is execute()'d it will execute its plan and return a stream
/// of record batches.
///
/// If the stage has input stages, those input stages will be executed on remote resources
/// and will be provided with the remainder of the stage tree.
///
/// For example, if our stage tree looks like this:
///
/// ```text
///                 ┌─────────┐
///                 │ Stage 1 │
///                 └────┬────┘
///                      │
///              ┌───────┴───────┐
///         ┌────┴────┐     ┌────┴────┐
///         │ Stage 2 │     │ Stage 3 │
///         └────┬────┘     └─────────┘
///              │
///      ┌───────┴───────┐
/// ┌────┴────┐     ┌────┴────┐
/// │ Stage 4 │     │ Stage 5 │
/// └─────────┘     └─────────┘
///
/// ```
///
/// Then executing Stage 1 will run its plan locally. Stage 1 has two inputs, Stage 2 and Stage 3. We
/// know these will execute on remote resources. As such the plan for Stage 1 must contain an
/// [`ArrowFlightReadExec`] node that will read the results of Stage 2 and Stage 3 and coalesce the
/// results.
///
/// When Stage 1's [`ArrowFlightReadExec`] node is executed, it makes an Arrow Flight request to the
/// host assigned in the Stage. It provides the following Stage tree serialized in the body of the
/// Arrow Flight Ticket:
///
/// ```text
///         ┌─────────┐
///         │ Stage 2 │
///         └────┬────┘
///              │
///      ┌───────┴───────┐
/// ┌────┴────┐     ┌────┴────┐
/// │ Stage 4 │     │ Stage 5 │
/// └─────────┘     └─────────┘
///
/// ```
///
/// The receiving ArrowFlightEndpoint will then execute Stage 2 and will repeat this process.
///
/// When Stage 4 is executed, it has no input stages, so it is assumed that the plan included in that
/// Stage can complete on its own; it is likely holding a leaf node in the overall physical plan and
/// producing data from a [`DataSourceExec`].
#[derive(Debug, Clone)]
pub struct ExecutionStage {
    /// Our query_id
    pub query_id: Uuid,
    /// Our stage number
    pub num: usize,
    /// Our stage name
    pub name: String,
    /// The physical execution plan that this stage will execute.
    pub plan: Arc<dyn ExecutionPlan>,
    /// The input stages to this stage
    pub inputs: Vec<Arc<dyn ExecutionPlan>>,
    /// Our tasks which tell us how finely grained to execute the partitions in
    /// the plan
    pub tasks: Vec<ExecutionTask>,
    /// tree depth of our location in the stage tree, used for display only
    pub depth: usize,
}

impl ExecutionStage {
    /// Creates a new `ExecutionStage` with the given plan and inputs. One task will be created
    /// responsible for partitions in the plan.
    pub fn new(
        query_id: Uuid,
        num: usize,
        plan: Arc<dyn ExecutionPlan>,
        inputs: Vec<Arc<ExecutionStage>>,
    ) -> Self {
        let name = format!("Stage {:<3}", num);
        let partition_group = (0..plan.properties().partitioning.partition_count())
            .map(|p| p as u64)
            .collect();
        ExecutionStage {
            query_id,
            num,
            name,
            plan,
            inputs: inputs
                .into_iter()
                .map(|s| s as Arc<dyn ExecutionPlan>)
                .collect(),
            tasks: vec![ExecutionTask::new(partition_group)],
            depth: 0,
        }
    }

    /// Recalculate the tasks for this stage based on the number of partitions in the plan
    /// and the maximum number of partitions per task.
    ///
    /// This will unset any worker assignments.
    pub fn with_maximum_partitions_per_task(mut self, max_partitions_per_task: usize) -> Self {
        let partitions = self.plan.properties().partitioning.partition_count();

        self.tasks = (0..partitions)
            .chunks(max_partitions_per_task)
            .into_iter()
            .map(|partition_group| {
                ExecutionTask::new(
                    partition_group
                        .collect::<Vec<_>>()
                        .into_iter()
                        .map(|p| p as u64)
                        .collect(),
                )
            })
            .collect();
        self
    }

    /// Returns the name of this stage
    pub fn name(&self) -> String {
        format!("Stage {:<3}", self.num)
    }

    /// Returns an iterator over the child stages of this stage cast as &ExecutionStage,
    /// which can be useful when the concrete stage type is needed.
    pub fn child_stages_iter(&self) -> impl Iterator<Item = &ExecutionStage> {
        self.inputs
            .iter()
            .filter_map(|s| s.as_any().downcast_ref::<ExecutionStage>())
    }

    /// Returns the name of this stage including child stage numbers if any.
    pub fn name_with_children(&self) -> String {
        let child_str = if self.inputs.is_empty() {
            "".to_string()
        } else {
            format!(
                " Child Stages:[{}] ",
                self.child_stages_iter()
                    .map(|s| format!("{}", s.num))
                    .collect::<Vec<_>>()
                    .join(", ")
            )
        };
        format!("Stage {:<3}{}", self.num, child_str)
    }

    pub fn try_assign(
        self,
        channel_manager: impl TryInto<ChannelManager, Error = DataFusionError>,
    ) -> Result<Self> {
        let urls: Vec<Url> = channel_manager.try_into()?.get_urls()?;
        if urls.is_empty() {
            return internal_err!("No URLs found in ChannelManager");
        }

        Ok(self)
    }

    fn try_assign_urls(&self, urls: &[Url]) -> Result<Self> {
        let assigned_children = self
            .child_stages_iter()
            .map(|child| {
                child
                    .clone() // TODO: avoid cloning if possible
                    .try_assign_urls(urls)
                    .map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>)
            })
            .collect::<Result<Vec<_>>>()?;

        // pick a random starting position
        let mut rng = rand::thread_rng();
        let start_idx = rng.gen_range(0..urls.len());

        let assigned_tasks = self
            .tasks
            .iter()
            .enumerate()
            .map(|(i, task)| {
                let url = &urls[(start_idx + i) % urls.len()];
                task.clone().with_assignment(url)
            })
            .collect::<Vec<_>>();

        let assigned_stage = ExecutionStage {
            query_id: self.query_id,
            num: self.num,
            name: self.name.clone(),
            plan: self.plan.clone(),
            inputs: assigned_children,
            tasks: assigned_tasks,
            depth: self.depth,
        };

        Ok(assigned_stage)
    }
}

impl ExecutionPlan for ExecutionStage {
    fn name(&self) -> &str {
        &self.name
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        self.inputs.iter().collect()
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(ExecutionStage {
            query_id: self.query_id,
            num: self.num,
            name: self.name.clone(),
            plan: self.plan.clone(),
            inputs: children,
            tasks: self.tasks.clone(),
            depth: self.depth,
        }))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        self.plan.properties()
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
        let stage = self
            .as_any()
            .downcast_ref::<ExecutionStage>()
            .expect("Unwrapping myself should always work");

        let channel_manager = context
            .session_config()
            .get_extension::<ChannelManager>()
            .ok_or(DataFusionError::Execution(
                "ChannelManager not found in session config".to_string(),
            ))?;

        let urls = channel_manager.get_urls()?;

        let assigned_stage = stage
            .try_assign_urls(&urls)
            .map(Arc::new)
            .map_err(|e| DataFusionError::Execution(e.to_string()))?;

        // insert the stage into the context so that ExecutionPlan nodes
        // that care about the stage can access it
        let config = context
            .session_config()
            .clone()
            .with_extension(assigned_stage.clone());

        let new_ctx =
            SessionContext::new_with_config_rt(config, context.runtime_env().clone()).task_ctx();

        assigned_stage.plan.execute(partition, new_ctx)
    }
}
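Below is a minimal usage sketch for the new stage type, assuming the file is reachable as `crate::stage::ExecutionStage` and that a `SessionContext` with some registered data is available; `make_stage` and the choice of four partitions per task are illustrative only, while `ExecutionStage::new` and `with_maximum_partitions_per_task` are the constructors added in this commit.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use uuid::Uuid;

use crate::stage::ExecutionStage; // assumed re-export path for the new module

// Hypothetical helper: plan a query, wrap the physical plan in a single root
// stage, then split its partitions into tasks of at most four partitions each.
// With ten partitions this yields the groups [0..=3], [4..=7], [8..=9], i.e.
// three ExecutionTasks.
async fn make_stage(ctx: &SessionContext, sql: &str) -> Result<ExecutionStage> {
    let df = ctx.sql(sql).await?;
    let plan: Arc<dyn ExecutionPlan> = df.create_physical_plan().await?;

    let stage = ExecutionStage::new(Uuid::new_v4(), 1, plan, vec![])
        .with_maximum_partitions_per_task(4);
    Ok(stage)
}
```

Worker assignment is not done here: as the `execute()` implementation above shows, URLs are pulled from the `ChannelManager` extension in the session config and distributed across tasks via `try_assign_urls` at execution time.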
