@gabotechs
Collaborator

@gabotechs gabotechs commented Oct 30, 2025

This PR refactors the public API people use for enriching a DataFusion SessionState with distributed capabilities.

Before, the with_distributed_* and set_distributed_* methods were available on several DataFusion structs:

  • SessionContext
  • SessionConfig
  • SessionState
  • SessionStateBuilder

But there's an ergonomic issue with that:

Users have no option but to use SessionStateBuilder for configuring distributed capabilities, because that's the only place in DataFusion where a PhysicalOptimizerRule implementation can be added (builder.with_physical_optimizer_rule()).

This means that no matter how many ergonomic improvements we bring to other structs, people need to configure things through SessionStateBuilder.

This PR allows configuring distributed DataFusion through a new with_distributed_execution() method that automatically:

  • set the ChannelResolver implementation
  • inject a DistributedConfig struct in the ConfigOptions
  • add a DistributedPhysicalOptimizerRule as a physical optimizer rule

Since users need to do all of these things one way or another, we now expose them through a single method:

let state = SessionStateBuilder::new()
    .with_distributed_execution(CustomChannelResolver)
    .build();
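To make the three steps listed above concrete, here is a minimal, self-contained sketch of what such a convenience method could bundle. Everything in it is a hypothetical stand-in (MockStateBuilder, the simplified ChannelResolver and PhysicalOptimizerRule traits, and the Option<usize> field types); it is not the real DataFusion or datafusion-distributed API, only an illustration of the bundling idea.

```rust
use std::sync::Arc;

// Hypothetical stand-ins: NOT the real DataFusion / datafusion-distributed types.
trait ChannelResolver {
    fn name(&self) -> &str;
}

trait PhysicalOptimizerRule {
    fn name(&self) -> &str;
}

// Field names come from the PR discussion; the Option<usize> types are assumed.
#[derive(Default, Debug, PartialEq)]
struct DistributedConfig {
    network_coalesce_tasks: Option<usize>,
    network_shuffle_tasks: Option<usize>,
}

struct DistributedPhysicalOptimizerRule;

impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
    fn name(&self) -> &str {
        "DistributedPhysicalOptimizerRule"
    }
}

// Hypothetical builder that only tracks the three pieces of distributed state.
#[derive(Default)]
struct MockStateBuilder {
    channel_resolver: Option<Arc<dyn ChannelResolver>>,
    distributed_config: Option<DistributedConfig>,
    physical_optimizer_rules: Vec<Arc<dyn PhysicalOptimizerRule>>,
}

impl MockStateBuilder {
    // One call that performs the three steps described in the PR text.
    fn with_distributed_execution(mut self, resolver: impl ChannelResolver + 'static) -> Self {
        // 1. Set the ChannelResolver implementation.
        self.channel_resolver = Some(Arc::new(resolver));
        // 2. Inject a default DistributedConfig.
        self.distributed_config = Some(DistributedConfig::default());
        // 3. Register the distributed physical optimizer rule.
        self.physical_optimizer_rules
            .push(Arc::new(DistributedPhysicalOptimizerRule));
        self
    }
}

struct CustomChannelResolver;

impl ChannelResolver for CustomChannelResolver {
    fn name(&self) -> &str {
        "custom"
    }
}
```

Calling MockStateBuilder::default().with_distributed_execution(CustomChannelResolver) leaves the mock builder with a resolver, a default config, and exactly one registered rule.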

This is a preparation PR for a bigger one:

… from anything that's not a SessionStateBuilder
Comment on lines +69 to +71
if cfg.network_coalesce_tasks.is_none() && cfg.network_shuffle_tasks.is_none() {
    return Ok(plan);
}
Collaborator Author

Since DistributedConfig is now always present in the config, we can no longer rely on its absence as a signal that the query should not be distributed. Instead, we skip distribution when neither network_coalesce_tasks nor network_shuffle_tasks is set.
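A self-contained sketch of that guard follows. The DistributedConfig struct here is a hypothetical stand-in with assumed Option<usize> fields, and the plan is modelled as a plain String purely for illustration.

```rust
// Hypothetical stand-in: field names come from the diff, the types are assumed.
#[derive(Default)]
struct DistributedConfig {
    network_coalesce_tasks: Option<usize>,
    network_shuffle_tasks: Option<usize>,
}

// The plan is modelled as a String only to keep the sketch self-contained.
fn optimize(plan: String, cfg: &DistributedConfig) -> Result<String, String> {
    // The config is always present, so "not configured" now means that
    // neither task count has been set.
    if cfg.network_coalesce_tasks.is_none() && cfg.network_shuffle_tasks.is_none() {
        return Ok(plan);
    }
    // Placeholder for the real distribution logic.
    Ok(format!("Distributed({plan})"))
}
```

With a default config the plan comes back untouched; setting either field is enough to take the distributed path.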

Comment on lines +186 to +188
if plan.as_any().is::<DistributedExec>() {
    return Ok(plan);
}
Collaborator Author

Not really related to this PR, but I found this footgun while making the tests pass, so I thought it would be fine to add the check here. It would have helped me.
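The guard presumably makes the step a no-op when the plan is already a DistributedExec. Below is a minimal sketch of that pattern using std::any::Any; the Plan trait, Scan, and this DistributedExec struct are hypothetical stand-ins, not the real DataFusion ExecutionPlan types.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical minimal stand-in for an execution plan trait.
trait Plan {
    fn as_any(&self) -> &dyn Any;
}

struct Scan;

// Hypothetical wrapper node marking a plan as already distributed.
struct DistributedExec {
    child: Arc<dyn Plan>,
}

impl Plan for Scan {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Plan for DistributedExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Wraps the plan once; an already wrapped plan is returned unchanged.
fn distribute(plan: Arc<dyn Plan>) -> Arc<dyn Plan> {
    if plan.as_any().is::<DistributedExec>() {
        return plan;
    }
    Arc::new(DistributedExec { child: plan })
}

// Counts how many DistributedExec layers sit on top of the plan.
fn depth(plan: &Arc<dyn Plan>) -> usize {
    match plan.as_any().downcast_ref::<DistributedExec>() {
        Some(exec) => 1 + depth(&exec.child),
        None => 0,
    }
}
```

Without the early return, applying distribute twice would nest one DistributedExec inside another.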

@gabotechs gabotechs marked this pull request as ready for review November 4, 2025 21:56
@gabotechs
Collaborator Author

@adriangb any feedback on this? for us (DataDog) this API change makes sense, but let me know what you guys think

@adriangb
Contributor

adriangb commented Nov 5, 2025

I think we may need slightly finer-grained control. While we're still in the stage of getting datafusion-distributed working for all of our queries, we are using a feature flag to toggle it on/off. We have a system of runtime feature flags that can be embedded in queries, and we usually use floats instead of bools, so I can set an env var along the lines of:

DATAFUSION_DISTRIBUTED_CHANCE=0.25  # run 25% of queries via `datafusion-distributed` by default

This sets the default for an ExtensionOptions (similar to DistributedConfig). Then we can tweak it on a per-customer/tenant basis and even per query:

-- SET extension.distributed_chance = 1.0
SELECT * FROM t LIMIT 10;

We currently do this with an optimizer rule wrapper, FeatureFlaggedOptimizerRule, that lets you dynamically run a wrapped optimizer rule (DistributedPhysicalOptimizerRule) depending on ConfigOptions.
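The actual FeatureFlaggedOptimizerRule is not shown in this thread, so the following is only a rough sketch of the wrapper idea under stated assumptions: a simplified Rule trait over String plans, a hypothetical FeatureFlaggedRule wrapper, and a roll value in [0, 1) supplied by the caller (in practice it would come from a random number generator).

```rust
// Hypothetical simplified rule trait; plans are Strings for illustration only.
trait Rule {
    fn optimize(&self, plan: String) -> String;
}

struct DistributedRule;

impl Rule for DistributedRule {
    fn optimize(&self, plan: String) -> String {
        format!("Distributed({plan})")
    }
}

// Hypothetical wrapper: runs the inner rule only when the flag "fires".
struct FeatureFlaggedRule<R: Rule> {
    inner: R,
}

impl<R: Rule> FeatureFlaggedRule<R> {
    // `chance` plays the role of distributed_chance (0.0 = never, 1.0 = always).
    // `roll` is assumed to be a uniform sample in [0, 1) provided by the caller.
    fn optimize(&self, plan: String, chance: f64, roll: f64) -> String {
        if roll < chance {
            self.inner.optimize(plan)
        } else {
            plan
        }
    }
}
```

Passing roll in explicitly keeps the sketch deterministic; a chance of 0.25 then distributes only the queries whose roll lands below 0.25.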

I think we could do something similar with this new system by tracking distributed_chance outside of ConfigOptions and then setting network_coalesce_tasks and network_shuffle_tasks to None on a per-query basis. I'll try to confirm tomorrow by folding in this PR. That might actually simplify things for us.

But overall I think it's a much nicer / simpler API for most if not all users 🥳

@gabotechs
Collaborator Author

I think we could do something similar with this new system by tracking distributed_chance outside of ConfigOptions and then setting network_coalesce_tasks and network_shuffle_tasks to None on a per-query basis.

🤔 it seems like even if you do that, #216 will make things even more complicated for you, as that PR removes network_coalesce_tasks and network_shuffle_tasks entirely.

My impression then is that we probably should still let people manually inject the DistributedPhysicalOptimizerRule (or any other custom rule like FeatureFlaggedOptimizerRule) rather than doing it ourselves in the set_distributed_execution() method.

@gabotechs
Collaborator Author

gabotechs commented Nov 5, 2025

Given that #216 is going to rework how this all works, I'm leaning towards not shipping this PR and just working on #216, keeping something like:

    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_distributed_channel_resolver(localhost_resolver)
        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
        .build();

instead of

    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_distributed_execution(localhost_resolver)
        .build();

The ergonomic improvement does not seem significant enough to justify removing the ability to provide rules other than DistributedPhysicalOptimizerRule.

@adriangb
Contributor

adriangb commented Nov 5, 2025

Why can't we have both? In general I like it when software makes the "default" or "simple" thing easy but allows for unwrapping the onion and poking into the inner layers if users need to do something more complex.

@gabotechs
Collaborator Author

yeah, we could, that'd be fine. I don't have a strong opinion, as the ergonomic difference between the two options is not super big, but if you think that can give a "cleaner" API for the general use case, let's go for it!
