Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a6e01e7
Split channel resolver in two
gabotechs Dec 22, 2025
fc9bfc8
Simplify WorkerResolverExtension and ChannelResolverExtension
gabotechs Dec 22, 2025
9e15f2b
Add default builder to ArrowFlightEndpoint
gabotechs Dec 22, 2025
34cf529
Add some docs
gabotechs Dec 22, 2025
312901d
Listen to clippy
gabotechs Dec 23, 2025
f026e41
Split get_flight_client_for_url in two
gabotechs Dec 23, 2025
2508e48
Fix conflicts
gabotechs Dec 24, 2025
f7218b0
Remove unnecessary channel resolver
gabotechs Dec 25, 2025
b49289a
Improve WorkerResolver docs
gabotechs Dec 26, 2025
793f898
Use one ChannelResolver per runtime
gabotechs Dec 26, 2025
eaad60f
Improve error reporting on client connection failure
gabotechs Dec 26, 2025
ea4e09a
Add a from_session_builder method for constructing an InMemoryChannel…
gabotechs Dec 26, 2025
33b0cc7
Add ChannelResolver and WorkerResolver default implementations for Arcs
gabotechs Dec 26, 2025
1aeb719
Make TPC-DS tests use DataFusion test dataset
gabotechs Dec 24, 2025
e377698
Remove non-working in-memory option from benchmarks
gabotechs Dec 24, 2025
7a0b296
Remove unnecessary utils folder
gabotechs Dec 24, 2025
41f90a1
Refactor benchmark folder
gabotechs Dec 24, 2025
c88058e
Rename to prepare_tpch.rs
gabotechs Dec 24, 2025
b3bdd2b
Adapt benchmarks for TPC-DS
gabotechs Dec 24, 2025
05a30cc
Update benchmarks README.md
gabotechs Dec 24, 2025
0c736fd
Fix conflicts
gabotechs Dec 24, 2025
f9f4439
Use default session state builder
gabotechs Dec 26, 2025
21e8581
Update benchmarks README.md
gabotechs Dec 27, 2025
c306c6d
add broadcast join
gene-bordegaray Dec 27, 2025
12512af
don't distribute 1 consumer tasks
gene-bordegaray Dec 28, 2025
8927012
fix analyze tests
gene-bordegaray Dec 28, 2025
d13a28c
dont strictibute a single consumer, use coalesce
gene-bordegaray Dec 28, 2025
e0e5f50
intriduce broadcast operator that does caching
gene-bordegaray Jan 3, 2026
ec607b5
Merge branch 'main' into gene.bordegaray/2025/12/add_broadcast_exec
gene-bordegaray Jan 3, 2026
47d4ab9
refactored distributed planner to contain less broadcast logic and ad…
gene-bordegaray Jan 3, 2026
eae78c5
fix docs
gene-bordegaray Jan 3, 2026
7152752
add comment for follow up streaming work
gene-bordegaray Jan 3, 2026
83bfed2
add comment explaining cache solution for 1->1 task stage collapses
gene-bordegaray Jan 4, 2026
5d02692
refactor network broadcast to be cleaner
gene-bordegaray Jan 6, 2026
9669518
add new pass to the annotation
gene-bordegaray Jan 9, 2026
4a3af9a
put broadcast joins behind feature flag
gene-bordegaray Jan 12, 2026
9bcd287
Merge branch 'main' into gene.bordegaray/2025/12/add_broadcast_exec
gene-bordegaray Jan 12, 2026
b3cd8e0
ony distribute joins when broadcast enabled
gene-bordegaray Jan 13, 2026
92b81a6
add benchmark config and update docs
gene-bordegaray Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ struct Cmd {
/// The bucket name.
#[structopt(long, default_value = "datafusion-distributed-benchmarks")]
bucket: String,

// Turns broadcast joins on.
#[structopt(long)]
broadcast_joins: bool,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😄 it's funny how you managed to choose three different arguments for the broadcast joins flags in three different places (broadcast_joins, enable_broadcast_joins, broadcast_joins_enabled).

It's a pretty good idea to add this option, I'd probably just choose one flag name in all places, probably broadcast_joins since its shorter.

}

#[tokio::main]
Expand Down Expand Up @@ -67,12 +71,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
let runtime_env = Arc::new(RuntimeEnv::default());
runtime_env.register_object_store(&s3_url, s3);

let state = SessionStateBuilder::new()
let mut state = SessionStateBuilder::new()
.with_default_features()
.with_runtime_env(Arc::clone(&runtime_env))
.with_distributed_worker_resolver(Ec2WorkerResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.build();
if cmd.broadcast_joins {
state = state.with_distributed_broadcast_joins_enabled(true)?;
}
Comment on lines 78 to +82
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: even sorter:

Suggested change
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.build();
if cmd.broadcast_joins {
state = state.with_distributed_broadcast_joins_enabled(true)?;
}
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_broadcast_joins_enabled(cmd.broadcast_joins)?
.build();

let ctx = SessionContext::from(state);

let worker = Worker::default().with_runtime_env(runtime_env);
Expand Down
5 changes: 5 additions & 0 deletions benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ pub struct RunOpt {
#[structopt(long)]
children_isolator_unions: bool,

/// Turns on broadcast joins.
#[structopt(long)]
enable_broadcast_joins: bool,

/// Collects metrics across network boundaries
#[structopt(long)]
collect_metrics: bool,
Expand Down Expand Up @@ -210,6 +214,7 @@ impl RunOpt {
self.cardinality_task_sf.unwrap_or(1.0),
)?
.with_distributed_children_isolator_unions(self.children_isolator_unions)?
.with_distributed_broadcast_joins_enabled(self.enable_broadcast_joins)?
.with_distributed_metrics_collection(self.collect_metrics)?
.build();
let ctx = SessionContext::new_with_state(state);
Expand Down
45 changes: 45 additions & 0 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,23 @@ pub trait DistributedExt: Sized {
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;

/// Enables broadcast joins for CollectLeft hash joins. When enabled, the build side of
/// a CollectLeft join is broadcast to all consumer tasks instead of being coalesced
/// into a single partition.
///
/// Note: This option is disabled by default until the implementation is smarter about when to
/// broadcast.
fn with_distributed_broadcast_joins_enabled(
self,
enabled: bool,
) -> Result<Self, DataFusionError>;

/// Same as [DistributedExt::with_distributed_broadcast_joins_enabled] but with an in-place mutation.
fn set_distributed_broadcast_joins_enabled(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;
}

impl DistributedExt for SessionConfig {
Expand Down Expand Up @@ -529,6 +546,15 @@ impl DistributedExt for SessionConfig {
Ok(())
}

fn set_distributed_broadcast_joins_enabled(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError> {
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
d_cfg.broadcast_joins_enabled = enabled;
Ok(())
}

delegate! {
to self {
#[call(set_distributed_option_extension)]
Expand Down Expand Up @@ -574,6 +600,10 @@ impl DistributedExt for SessionConfig {
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;

#[call(set_distributed_broadcast_joins_enabled)]
#[expr($?;Ok(self))]
fn with_distributed_broadcast_joins_enabled(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -635,6 +665,11 @@ impl DistributedExt for SessionStateBuilder {
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_broadcast_joins_enabled(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_broadcast_joins_enabled)]
#[expr($?;Ok(self))]
fn with_distributed_broadcast_joins_enabled(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -696,6 +731,11 @@ impl DistributedExt for SessionState {
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_broadcast_joins_enabled(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_broadcast_joins_enabled)]
#[expr($?;Ok(self))]
fn with_distributed_broadcast_joins_enabled(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -757,6 +797,11 @@ impl DistributedExt for SessionContext {
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_broadcast_joins_enabled(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_broadcast_joins_enabled)]
#[expr($?;Ok(self))]
fn with_distributed_broadcast_joins_enabled(self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
6 changes: 6 additions & 0 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ extensions_options! {
/// Propagate collected metrics from all nodes in the plan across network boundaries
/// so that they can be reconstructed on the head node of the plan.
pub collect_metrics: bool, default = true
/// Enable broadcast joins for CollectLeft hash joins. When enabled, the build side of
/// a CollectLeft join is broadcast to all consumer tasks.
/// TODO: This option exists temporarily until we become smarter about when to actually
/// use broadcasting like checking build side size.
/// For now, broadcasting all CollectLeft joins is not always beneficial.
pub broadcast_joins_enabled: bool, default = false
/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
Expand Down
Loading