Skip to content

Commit a94556a

Browse files
authored
Distribute UNION operations (#262)
* Split channel resolver in two
* Simplify WorkerResolverExtension and ChannelResolverExtension
* Add default builder to ArrowFlightEndpoint
* Add some docs
* Listen to clippy
* Split get_flight_client_for_url in two
* Fix conflicts
* Remove unnecessary channel resolver
* Improve WorkerResolver docs
* Use one ChannelResolver per runtime
* Improve error reporting on client connection failure
* Add a from_session_builder method for constructing an InMemoryChannelResolver
* Add ChannelResolver and WorkerResolver default implementations for Arcs
* Make TPC-DS tests use DataFusion test dataset
* Remove non-working in-memory option from benchmarks
* Remove unnecessary utils folder
* Refactor benchmark folder
* Rename to prepare_tpch.rs
* Adapt benchmarks for TPC-DS
* Update benchmarks README.md
* Fix conflicts
* Use default session state builder
* Add ChildrenIsolatorUnionExec
* Add proto serde for ChildrenIsolatorUnionExec
* Wire up ChildrenIsolatorUnionExec to planner
* Add integration tests for distributed UNIONs
* Skip query 72 in TPC-DS benchmarks
* Allow setting children isolator unions
* Allow passing multiple queries in benchmarks
1 parent 831d617 commit a94556a

File tree

12 files changed

+2828
-1583
lines changed

12 files changed

+2828
-1583
lines changed

benchmarks/get-tpcds.sh

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/usr/bin/env bash
2+
3+
set -e
4+
5+
SCALE_FACTOR=${SCALE_FACTOR:-1}
6+
PARTITIONS=${PARTITIONS:-16}
7+
8+
echo "Generating TPC-DS dataset with SCALE_FACTOR=${SCALE_FACTOR} and PARTITIONS=${PARTITIONS}"
9+
10+
# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
11+
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
12+
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
13+
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
14+
TPCDS_DIR="${DATA_DIR}/tpcds_sf${SCALE_FACTOR}"
15+
16+
echo "Creating tpcds dataset at Scale Factor ${SCALE_FACTOR} in ${TPCDS_DIR}..."
17+
18+
# Ensure the target data directory exists
19+
mkdir -p "${TPCDS_DIR}"
20+
21+
$CARGO_COMMAND -- prepare-tpcds --output "${TPCDS_DIR}" --partitions "$PARTITIONS"

benchmarks/src/run.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ use tonic::transport::Server;
5858
#[structopt(verbatim_doc_comment)]
5959
pub struct RunOpt {
6060
/// Query number(s), comma-separated (e.g. `1,5,12`). If not specified, runs all queries
61-
#[structopt(short, long)]
62-
pub query: Option<usize>,
61+
#[structopt(short, long, use_delimiter = true)]
62+
pub query: Vec<usize>,
6363

6464
/// Path to data files
6565
#[structopt(parse(from_os_str), short = "p", long = "path")]
@@ -89,6 +89,10 @@ pub struct RunOpt {
8989
#[structopt(long)]
9090
cardinality_task_sf: Option<f64>,
9191

92+
/// Use children isolator UNIONs for distributing UNION operations.
93+
#[structopt(long)]
94+
children_isolator_unions: bool,
95+
9296
/// Collects metrics across network boundaries
9397
#[structopt(long)]
9498
collect_metrics: bool,
@@ -143,7 +147,9 @@ impl Dataset {
143147
Dataset::Tpch => (1..22 + 1)
144148
.map(|i| Ok((i as usize, tpch::get_test_tpch_query(i)?)))
145149
.collect(),
146-
Dataset::Tpcds => (1..99 + 1)
150+
Dataset::Tpcds => (1..72)
151+
// skip query 72, it's ridiculously slow
152+
.chain(73..99 + 1)
147153
.map(|i| Ok((i, tpcds::get_test_tpcds_query(i)?)))
148154
.collect(),
149155
Dataset::Clickbench => (0..42 + 1)
@@ -203,6 +209,7 @@ impl RunOpt {
203209
.with_distributed_cardinality_effect_task_scale_factor(
204210
self.cardinality_task_sf.unwrap_or(1.0),
205211
)?
212+
.with_distributed_children_isolator_unions(self.children_isolator_unions)?
206213
.with_distributed_metrics_collection(self.collect_metrics)?
207214
.build();
208215
let ctx = SessionContext::new_with_state(state);
@@ -219,7 +226,7 @@ impl RunOpt {
219226
let dataset = Dataset::infer_from_data_path(path.clone())?;
220227

221228
for (id, sql) in dataset.queries()? {
222-
if self.query.is_some_and(|v| v != id) {
229+
if !self.query.is_empty() && !self.query.contains(&id) {
223230
continue;
224231
}
225232
let query_id = format!("{dataset:?} {id}");

src/distributed_ext.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,35 @@ pub trait DistributedExt: Sized {
418418

419419
/// Same as [DistributedExt::with_distributed_metrics_collection] but with an in-place mutation.
420420
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
421+
422+
/// Enables children isolator unions for distributing UNION operations across as many tasks as
423+
/// the sum of all the tasks required for each child.
424+
///
425+
/// For example, if there is a UNION with 3 children, requiring one task each, it will result
426+
/// in a plan with 3 tasks where each task runs one child:
427+
///
428+
/// ```text
429+
/// ┌─────────────────────────────┐┌─────────────────────────────┐┌─────────────────────────────┐
430+
/// │           Task 1            ││           Task 2            ││           Task 3            │
431+
/// │┌───────────────────────────┐││┌───────────────────────────┐││┌───────────────────────────┐│
432+
/// ││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││
433+
/// │└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘│
434+
/// │    │                        ││              │              ││                        │    │
435+
/// │┌───┴───┐ ┌  ─│ ─   ┌  ─│ ─  ││┌  ─│ ─   ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌  ─│ ─   ┌───┴───┐│
436+
/// ││Child 1│  Child 2│  Child 3│││ Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2│ │Child 3││
437+
/// │└───────┘ └  ─  ─   └  ─  ─  ││└  ─  ─   └───────┘ └  ─  ─  ││└  ─  ─   └  ─  ─   └───────┘│
438+
/// └─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘
439+
/// ```
440+
fn with_distributed_children_isolator_unions(
441+
self,
442+
enabled: bool,
443+
) -> Result<Self, DataFusionError>;
444+
445+
/// Same as [DistributedExt::with_distributed_children_isolator_unions] but with an in-place mutation.
446+
fn set_distributed_children_isolator_unions(
447+
&mut self,
448+
enabled: bool,
449+
) -> Result<(), DataFusionError>;
421450
}
422451

423452
impl DistributedExt for SessionConfig {
@@ -489,6 +518,15 @@ impl DistributedExt for SessionConfig {
489518
Ok(())
490519
}
491520

521+
fn set_distributed_children_isolator_unions(
522+
&mut self,
523+
enabled: bool,
524+
) -> Result<(), DataFusionError> {
525+
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
526+
d_cfg.children_isolator_unions = enabled;
527+
Ok(())
528+
}
529+
492530
delegate! {
493531
to self {
494532
#[call(set_distributed_option_extension)]
@@ -530,6 +568,10 @@ impl DistributedExt for SessionConfig {
530568
#[call(set_distributed_metrics_collection)]
531569
#[expr($?;Ok(self))]
532570
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
571+
572+
#[call(set_distributed_children_isolator_unions)]
573+
#[expr($?;Ok(self))]
574+
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
533575
}
534576
}
535577
}
@@ -586,6 +628,11 @@ impl DistributedExt for SessionStateBuilder {
586628
#[call(set_distributed_metrics_collection)]
587629
#[expr($?;Ok(self))]
588630
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
631+
632+
fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
633+
#[call(set_distributed_children_isolator_unions)]
634+
#[expr($?;Ok(self))]
635+
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
589636
}
590637
}
591638
}
@@ -642,6 +689,11 @@ impl DistributedExt for SessionState {
642689
#[call(set_distributed_metrics_collection)]
643690
#[expr($?;Ok(self))]
644691
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
692+
693+
fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
694+
#[call(set_distributed_children_isolator_unions)]
695+
#[expr($?;Ok(self))]
696+
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
645697
}
646698
}
647699
}
@@ -698,6 +750,11 @@ impl DistributedExt for SessionContext {
698750
#[call(set_distributed_metrics_collection)]
699751
#[expr($?;Ok(self))]
700752
fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
753+
754+
fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
755+
#[call(set_distributed_children_isolator_unions)]
756+
#[expr($?;Ok(self))]
757+
fn with_distributed_children_isolator_unions(self, enabled: bool) -> Result<Self, DataFusionError>;
701758
}
702759
}
703760
}

src/distributed_planner/distributed_config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@ extensions_options! {
2828
/// batches over the wire.
2929
/// If set to 0, batch coalescing is disabled on network shuffle operations.
3030
pub shuffle_batch_size: usize, default = 8192
31+
/// When encountering a UNION operation, isolate its children depending on the task context.
32+
/// For example, on a UNION operation with 3 children running in 3 distributed tasks,
33+
/// instead of executing the 3 children in each of the 3 tasks with a DistributedTaskContext of
34+
/// 1/3, 2/3, and 3/3 respectively, execute:
35+
/// - The first child in the first task with a DistributedTaskContext of 1/1
36+
/// - The second child in the second task with a DistributedTaskContext of 1/1
37+
/// - The third child in the third task with a DistributedTaskContext of 1/1
38+
pub children_isolator_unions: bool, default = true
3139
/// Propagate collected metrics from all nodes in the plan across network boundaries
3240
/// so that they can be reconstructed on the head node of the plan.
3341
pub collect_metrics: bool, default = true

0 commit comments

Comments
 (0)