Skip to content

Commit 2d48e02

Browse files
committed
Introduce new NetworkHashShuffleExec and NetworkCoalesceTasksExec
1 parent abaefd9 commit 2d48e02

22 files changed

+2036
-1411
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ pub struct RunOpt {
102102
sorted: bool,
103103

104104
/// Number of partitions per task.
105-
#[structopt(long = "ppt")]
106-
partitions_per_task: Option<usize>,
105+
#[structopt(long)]
106+
max_tasks: Option<usize>,
107107

108108
/// Spawns a worker in the specified port.
109109
#[structopt(long)]
@@ -141,10 +141,10 @@ impl DistributedSessionBuilder for RunOpt {
141141
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
142142
}
143143
if !self.workers.is_empty() {
144-
let mut rule = DistributedPhysicalOptimizerRule::new();
145-
if let Some(partitions_per_task) = self.partitions_per_task {
146-
rule = rule.with_maximum_partitions_per_task(partitions_per_task)
147-
}
144+
let tasks = self.max_tasks.unwrap_or(self.workers.len());
145+
let rule = DistributedPhysicalOptimizerRule::new()
146+
.with_coalesce_partitions_exec_tasks(tasks)
147+
.with_network_shuffle_exec_tasks(tasks);
148148
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
149149
}
150150

src/distributed_physical_optimizer_rule.rs

Lines changed: 213 additions & 166 deletions
Large diffs are not rendered by default.

src/execution_plans/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
mod arrow_flight_read;
1+
mod network_coalesce_tasks;
2+
mod network_hash_shuffle;
23
mod partition_isolator;
34
mod stage;
45

5-
pub use arrow_flight_read::ArrowFlightReadExec;
6-
pub use partition_isolator::{PartitionGroup, PartitionIsolatorExec};
7-
pub use stage::{display_stage_graphviz, ExecutionTask, StageExec};
6+
pub use network_coalesce_tasks::{NetworkCoalesceTasksExec, NetworkCoalesceTasksReadyExec};
7+
pub use network_hash_shuffle::{NetworkHashShuffleExec, NetworkHashShuffleReadyExec};
8+
pub use partition_isolator::PartitionIsolatorExec;
9+
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
use crate::channel_resolver_ext::get_distributed_channel_resolver;
2+
use crate::config_extension_ext::ContextGrpcMetadata;
3+
use crate::distributed_physical_optimizer_rule::{limit_tasks_err, DistributedExecutionPlan};
4+
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
5+
use crate::execution_plans::{DistributedTaskContext, StageExec};
6+
use crate::flight_service::{DoGet, StageKey};
7+
use crate::protobuf::{proto_from_stage, DistributedCodec};
8+
use crate::ChannelResolver;
9+
use arrow_flight::decode::FlightRecordBatchStream;
10+
use arrow_flight::error::FlightError;
11+
use arrow_flight::flight_service_client::FlightServiceClient;
12+
use arrow_flight::Ticket;
13+
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
14+
use datafusion::error::DataFusionError;
15+
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
16+
use datafusion::physical_expr::Partitioning;
17+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
18+
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
19+
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
20+
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
21+
use futures::{TryFutureExt, TryStreamExt};
22+
use http::Extensions;
23+
use prost::Message;
24+
use std::any::Any;
25+
use std::fmt::Formatter;
26+
use std::sync::Arc;
27+
use tonic::metadata::MetadataMap;
28+
use tonic::Request;
29+
30+
/// This node has two variants.
31+
/// 1. Pending: it acts as a placeholder for the distributed optimization step to mark it as ready.
32+
/// 2. Ready: runs within a distributed stage and queries the next input stage over the network
33+
/// using Arrow Flight.
34+
#[derive(Debug, Clone)]
35+
pub enum NetworkCoalesceTasksExec {
36+
Pending(NetworkCoalesceTasksPendingExec),
37+
Ready(NetworkCoalesceTasksReadyExec),
38+
}
39+
40+
/// Placeholder version of the [NetworkCoalesceTasksExec] node. It acts as a marker for the
41+
/// distributed optimization step, which will replace it with the appropriate
42+
/// [NetworkCoalesceTasksReadyExec] node.
43+
#[derive(Debug, Clone)]
44+
pub struct NetworkCoalesceTasksPendingExec {
45+
properties: PlanProperties,
46+
input_tasks: usize,
47+
child: Arc<dyn ExecutionPlan>,
48+
}
49+
50+
/// Ready version of the [NetworkCoalesceTasksExec] node. This node can be created in
51+
/// just two ways:
52+
/// - by the distributed optimization step based on an original [NetworkCoalesceTasksPendingExec]
53+
/// - deserialized from a protobuf plan sent over the network.
54+
#[derive(Debug, Clone)]
55+
pub struct NetworkCoalesceTasksReadyExec {
56+
/// the properties we advertise for this execution plan
57+
pub(crate) properties: PlanProperties,
58+
pub(crate) stage_num: usize,
59+
pub(crate) input_tasks: usize,
60+
}
61+
62+
impl NetworkCoalesceTasksExec {
63+
pub fn from_coalesce_partitions_exec(
64+
input: &CoalescePartitionsExec,
65+
input_tasks: usize,
66+
) -> Result<Self, DataFusionError> {
67+
Self::from_input(input, input_tasks)
68+
}
69+
70+
pub fn from_sort_preserving_merge_exec(
71+
input: &SortPreservingMergeExec,
72+
input_tasks: usize,
73+
) -> Result<Self, DataFusionError> {
74+
Self::from_input(input, input_tasks)
75+
}
76+
77+
pub fn from_input(
78+
input: &dyn ExecutionPlan,
79+
input_tasks: usize,
80+
) -> Result<Self, DataFusionError> {
81+
let children = input.children();
82+
let Some(child) = children.first() else {
83+
return internal_err!("Expected a single child");
84+
};
85+
86+
Ok(Self::Pending(NetworkCoalesceTasksPendingExec {
87+
properties: child.properties().clone(),
88+
input_tasks,
89+
child: Arc::clone(child),
90+
}))
91+
}
92+
}
93+
94+
impl DistributedExecutionPlan for NetworkCoalesceTasksExec {
95+
fn to_stage_info(
96+
&self,
97+
n_tasks: usize,
98+
) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
99+
let Self::Pending(ref pending) = self else {
100+
return plan_err!("can only return wrapped child if on Pending state");
101+
};
102+
103+
if n_tasks > 1 {
104+
return Err(limit_tasks_err(1));
105+
}
106+
107+
Ok((Arc::clone(&pending.child), pending.input_tasks))
108+
}
109+
110+
fn to_distributed(
111+
&self,
112+
stage_num: usize,
113+
stage_head: &Arc<dyn ExecutionPlan>,
114+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
115+
let NetworkCoalesceTasksExec::Pending(pending) = self else {
116+
return internal_err!("NetworkCoalesceTasksExec is already distributed");
117+
};
118+
119+
let ready = NetworkCoalesceTasksReadyExec {
120+
properties: scale_partitioning(stage_head.properties(), |p| p * pending.input_tasks),
121+
stage_num,
122+
input_tasks: pending.input_tasks,
123+
};
124+
125+
Ok(Arc::new(Self::Ready(ready)))
126+
}
127+
128+
fn with_input_tasks(&self, input_tasks: usize) -> Arc<dyn DistributedExecutionPlan> {
129+
Arc::new(match self {
130+
NetworkCoalesceTasksExec::Pending(pending) => {
131+
NetworkCoalesceTasksExec::Pending(NetworkCoalesceTasksPendingExec {
132+
properties: pending.properties.clone(),
133+
input_tasks,
134+
child: pending.child.clone(),
135+
})
136+
}
137+
NetworkCoalesceTasksExec::Ready(ready) => {
138+
NetworkCoalesceTasksExec::Ready(NetworkCoalesceTasksReadyExec {
139+
properties: scale_partitioning(&ready.properties, |p| {
140+
p * input_tasks / ready.input_tasks
141+
}),
142+
stage_num: ready.stage_num,
143+
input_tasks,
144+
})
145+
}
146+
})
147+
}
148+
}
149+
150+
fn scale_partitioning(props: &PlanProperties, f: impl FnOnce(usize) -> usize) -> PlanProperties {
151+
let partitioning = match &props.partitioning {
152+
Partitioning::RoundRobinBatch(p) => Partitioning::RoundRobinBatch(f(*p)),
153+
Partitioning::Hash(hash, p) => Partitioning::Hash(hash.clone(), f(*p)),
154+
Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)),
155+
};
156+
PlanProperties::new(
157+
props.eq_properties.clone(),
158+
partitioning,
159+
props.emission_type,
160+
props.boundedness,
161+
)
162+
}
163+
164+
impl DisplayAs for NetworkCoalesceTasksExec {
165+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
166+
write!(f, "NetworkCoalesceTasksExec")
167+
}
168+
}
169+
170+
impl ExecutionPlan for NetworkCoalesceTasksExec {
171+
fn name(&self) -> &str {
172+
"NetworkCoalesceTasksExec"
173+
}
174+
175+
fn as_any(&self) -> &dyn Any {
176+
self
177+
}
178+
179+
fn properties(&self) -> &PlanProperties {
180+
match self {
181+
NetworkCoalesceTasksExec::Pending(v) => &v.properties,
182+
NetworkCoalesceTasksExec::Ready(v) => &v.properties,
183+
}
184+
}
185+
186+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
187+
match self {
188+
NetworkCoalesceTasksExec::Pending(v) => vec![&v.child],
189+
NetworkCoalesceTasksExec::Ready(_) => vec![],
190+
}
191+
}
192+
193+
fn with_new_children(
194+
self: Arc<Self>,
195+
children: Vec<Arc<dyn ExecutionPlan>>,
196+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
197+
if !children.is_empty() {
198+
return plan_err!(
199+
"NetworkCoalesceTasksExec: wrong number of children, expected 0, got {}",
200+
children.len()
201+
);
202+
}
203+
Ok(self)
204+
}
205+
206+
fn execute(
207+
&self,
208+
partition: usize,
209+
context: Arc<TaskContext>,
210+
) -> Result<SendableRecordBatchStream, DataFusionError> {
211+
let NetworkCoalesceTasksExec::Ready(self_ready) = self else {
212+
return exec_err!(
213+
"NetworkCoalesceTasksExec is not ready, was the distributed optimization step performed?"
214+
);
215+
};
216+
217+
// get the channel manager and current stage from our context
218+
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
219+
220+
// the `NetworkCoalesceTasksExec` node can only be executed in the context of a `StageExec`
221+
let stage = StageExec::from_ctx(&context)?;
222+
223+
// of our child stages find the one that matches the one we are supposed to be
224+
// reading from
225+
let child_stage = stage.child_stage(self_ready.stage_num)?;
226+
227+
let codec = DistributedCodec::new_combined_with_user(context.session_config());
228+
let child_stage_proto = proto_from_stage(child_stage, &codec).map_err(|e| {
229+
internal_datafusion_err!(
230+
"NetworkCoalesceTasksExec: failed to convert stage to proto: {e}"
231+
)
232+
})?;
233+
234+
let context_headers = ContextGrpcMetadata::headers_from_ctx(&context);
235+
let task_context = DistributedTaskContext::from_ctx(&context);
236+
if task_context.task_index > 0 {
237+
return exec_err!("NetworkCoalesceTasksExec cannot be executed in more than one task");
238+
}
239+
240+
let partitions_per_task =
241+
self.properties().partitioning.partition_count() / child_stage.tasks.len();
242+
243+
let target_task = partition / partitions_per_task;
244+
let target_partition = partition % partitions_per_task;
245+
246+
let ticket = Request::from_parts(
247+
MetadataMap::from_headers(context_headers.clone()),
248+
Extensions::default(),
249+
Ticket {
250+
ticket: DoGet {
251+
stage_proto: Some(child_stage_proto.clone()),
252+
target_partition: target_partition as u64,
253+
stage_key: Some(StageKey {
254+
query_id: stage.query_id.to_string(),
255+
stage_id: child_stage.num as u64,
256+
task_number: target_task as u64,
257+
}),
258+
target_task_index: target_task as u64,
259+
}
260+
.encode_to_vec()
261+
.into(),
262+
},
263+
);
264+
265+
let Some(task) = child_stage.tasks.get(target_task) else {
266+
return internal_err!("ProgrammingError: Task {target_task} not found");
267+
};
268+
269+
let Some(url) = task.url.clone() else {
270+
return internal_err!("NetworkCoalesceTasksExec: task is unassigned, cannot proceed");
271+
};
272+
273+
let stream = async move {
274+
let channel = channel_resolver.get_channel_for_url(&url).await?;
275+
let stream = FlightServiceClient::new(channel)
276+
.do_get(ticket)
277+
.await
278+
.map_err(map_status_to_datafusion_error)?
279+
.into_inner()
280+
.map_err(|err| FlightError::Tonic(Box::new(err)));
281+
282+
Ok(FlightRecordBatchStream::new_from_flight_data(stream)
283+
.map_err(map_flight_to_datafusion_error))
284+
}
285+
.try_flatten_stream();
286+
287+
Ok(Box::pin(RecordBatchStreamAdapter::new(
288+
self.schema(),
289+
stream,
290+
)))
291+
}
292+
}

0 commit comments

Comments
 (0)