Skip to content
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
a6e01e7
Split channel resolver in two
gabotechs Dec 22, 2025
fc9bfc8
Simplify WorkerResolverExtension and ChannelResolverExtension
gabotechs Dec 22, 2025
9e15f2b
Add default builder to ArrowFlightEndpoint
gabotechs Dec 22, 2025
34cf529
Add some docs
gabotechs Dec 22, 2025
312901d
Listen to clippy
gabotechs Dec 23, 2025
f026e41
Split get_flight_client_for_url in two
gabotechs Dec 23, 2025
2508e48
Fix conflicts
gabotechs Dec 24, 2025
f7218b0
Remove unnecessary channel resolver
gabotechs Dec 25, 2025
b49289a
Improve WorkerResolver docs
gabotechs Dec 26, 2025
793f898
Use one ChannelResolver per runtime
gabotechs Dec 26, 2025
eaad60f
Improve error reporting on client connection failure
gabotechs Dec 26, 2025
ea4e09a
Add a from_session_builder method for constructing an InMemoryChannel…
gabotechs Dec 26, 2025
33b0cc7
Add ChannelResolver and WorkerResolver default implementations for Arcs
gabotechs Dec 26, 2025
1aeb719
Make TPC-DS tests use DataFusion test dataset
gabotechs Dec 24, 2025
e377698
Remove non-working in-memory option from benchmarks
gabotechs Dec 24, 2025
7a0b296
Remove unnecessary utils folder
gabotechs Dec 24, 2025
41f90a1
Refactor benchmark folder
gabotechs Dec 24, 2025
c88058e
Rename to prepare_tpch.rs
gabotechs Dec 24, 2025
b3bdd2b
Adapt benchmarks for TPC-DS
gabotechs Dec 24, 2025
05a30cc
Update benchmarks README.md
gabotechs Dec 24, 2025
0c736fd
Fix conflicts
gabotechs Dec 24, 2025
f9f4439
Use default session state builder
gabotechs Dec 26, 2025
21e8581
Update benchmarks README.md
gabotechs Dec 27, 2025
c306c6d
add broadcast join
gene-bordegaray Dec 27, 2025
12512af
don't distribute 1 consumer tasks
gene-bordegaray Dec 28, 2025
8927012
fix analyze tests
gene-bordegaray Dec 28, 2025
d13a28c
dont strictibute a single consumer, use coalesce
gene-bordegaray Dec 28, 2025
e0e5f50
intriduce broadcast operator that does caching
gene-bordegaray Jan 3, 2026
ec607b5
Merge branch 'main' into gene.bordegaray/2025/12/add_broadcast_exec
gene-bordegaray Jan 3, 2026
47d4ab9
refactored distributed planner to contain less broadcast logic and ad…
gene-bordegaray Jan 3, 2026
eae78c5
fix docs
gene-bordegaray Jan 3, 2026
7152752
add comment for follow up streaming work
gene-bordegaray Jan 3, 2026
83bfed2
add comment explaining cache solution for 1->1 task stage collapses
gene-bordegaray Jan 4, 2026
5d02692
refactor network broadcast to be cleaner
gene-bordegaray Jan 6, 2026
9669518
add new pass to the annotation
gene-bordegaray Jan 9, 2026
4a3af9a
put broadcast joins behind feature flag
gene-bordegaray Jan 12, 2026
9bcd287
Merge branch 'main' into gene.bordegaray/2025/12/add_broadcast_exec
gene-bordegaray Jan 12, 2026
b3cd8e0
ony distribute joins when broadcast enabled
gene-bordegaray Jan 13, 2026
92b81a6
add benchmark config and update docs
gene-bordegaray Jan 13, 2026
c1610a3
everything but tpcds query 75 works
gene-bordegaray Jan 14, 2026
2cebb41
propagagate unions correctly and don't allow left joins
gene-bordegaray Jan 17, 2026
1bd0922
refactor tests
gene-bordegaray Jan 17, 2026
b8064e2
Merge branch 'main' into gene.bordegaray/2025/12/add_broadcast_exec
gene-bordegaray Jan 17, 2026
c50f9b5
add comments
gene-bordegaray Jan 17, 2026
a4392a4
fix broadcast with new children
gene-bordegaray Jan 17, 2026
f1922d7
fix benches
gene-bordegaray Jan 17, 2026
728a031
add insert broaaadcast tests
gene-bordegaray Jan 17, 2026
5f884ac
refactor / address comments
gene-bordegaray Jan 18, 2026
c9d2a11
remove no-op check
gene-bordegaray Jan 18, 2026
c271cd5
remove check to allow build side to cap join
gene-bordegaray Jan 23, 2026
0809dee
Merge branch 'main' into gene.bordegaray/2025/12/add_broadcast_exec
gene-bordegaray Jan 23, 2026
3c0d1c1
fix indenting
gene-bordegaray Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ struct Cmd {
/// The bucket name.
#[structopt(long, default_value = "datafusion-distributed-benchmarks")]
bucket: String,

// Turns broadcast joins on.
#[structopt(long)]
broadcast_joins: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -67,12 +71,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
let runtime_env = Arc::new(RuntimeEnv::default());
runtime_env.register_object_store(&s3_url, s3);

let state = SessionStateBuilder::new()
let mut state = SessionStateBuilder::new()
.with_default_features()
.with_runtime_env(Arc::clone(&runtime_env))
.with_distributed_worker_resolver(Ec2WorkerResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.build();
if cmd.broadcast_joins {
state = state.with_distributed_broadcast_joins_enabled(true)?;
}
let ctx = SessionContext::from(state);

let worker = Worker::default().with_runtime_env(runtime_env);
Expand Down
5 changes: 5 additions & 0 deletions benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ pub struct RunOpt {
#[structopt(long)]
children_isolator_unions: bool,

/// Turns on broadcast joins.
#[structopt(long)]
enable_broadcast_joins: bool,

/// Collects metrics across network boundaries
#[structopt(long)]
collect_metrics: bool,
Expand Down Expand Up @@ -210,6 +214,7 @@ impl RunOpt {
self.cardinality_task_sf.unwrap_or(1.0),
)?
.with_distributed_children_isolator_unions(self.children_isolator_unions)?
.with_distributed_broadcast_joins_enabled(self.enable_broadcast_joins)?
.with_distributed_metrics_collection(self.collect_metrics)?
.build();
let ctx = SessionContext::new_with_state(state);
Expand Down
45 changes: 45 additions & 0 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,23 @@ pub trait DistributedExt: Sized {
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;

/// Enables broadcast joins for CollectLeft hash joins. When enabled, the build side of
/// a CollectLeft join is broadcast to all consumer tasks instead of being coalesced
/// into a single partition.
///
/// Note: This option is disabled by default until the implementation is smarter about when to
/// broadcast.
fn with_distributed_broadcast_joins_enabled(
self,
enabled: bool,
) -> Result<Self, DataFusionError>;

/// Same as [DistributedExt::with_distributed_broadcast_joins_enabled] but with an in-place mutation.
fn set_distributed_broadcast_joins_enabled(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;
}

impl DistributedExt for SessionConfig {
Expand Down Expand Up @@ -529,6 +546,15 @@ impl DistributedExt for SessionConfig {
Ok(())
}

fn set_distributed_broadcast_joins_enabled(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError> {
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
d_cfg.broadcast_joins_enabled = enabled;
Ok(())
}

delegate! {
to self {
#[call(set_distributed_option_extension)]
Expand Down Expand Up @@ -574,6 +600,10 @@ impl DistributedExt for SessionConfig {
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;

#[call(set_distributed_broadcast_joins_enabled)]
#[expr($?;Ok(self))]
fn with_distributed_broadcast_joins_enabled(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -635,6 +665,11 @@ impl DistributedExt for SessionStateBuilder {
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_broadcast_joins_enabled(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_broadcast_joins_enabled)]
#[expr($?;Ok(self))]
fn with_distributed_broadcast_joins_enabled(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -696,6 +731,11 @@ impl DistributedExt for SessionState {
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_broadcast_joins_enabled(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_broadcast_joins_enabled)]
#[expr($?;Ok(self))]
fn with_distributed_broadcast_joins_enabled(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -757,6 +797,11 @@ impl DistributedExt for SessionContext {
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_broadcast_joins_enabled(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_broadcast_joins_enabled)]
#[expr($?;Ok(self))]
fn with_distributed_broadcast_joins_enabled(self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
6 changes: 6 additions & 0 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ extensions_options! {
/// Propagate collected metrics from all nodes in the plan across network boundaries
/// so that they can be reconstructed on the head node of the plan.
pub collect_metrics: bool, default = true
/// Enable broadcast joins for CollectLeft hash joins. When enabled, the build side of
/// a CollectLeft join is broadcast to all consumer tasks.
/// TODO: This option exists temporarily until we become smarter about when to actually
/// use broadcasting like checking build side size.
/// For now, broadcasting all CollectLeft joins is not always beneficial.
pub broadcast_joins_enabled: bool, default = false
/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
Expand Down
Loading