Commit 897b5c1

timsaucer and Copilot authored
feat: support repartitioning of FFI execution plans (apache#20449)
## Which issue does this PR close?

This is a blocker for apache#20450

## Rationale for this change

This PR introduces an important concept in the FFI work: avoid creating wrappers upon wrappers of plans. It was discovered as part of the work to create FFI physical optimizer rules.

Suppose we have a foreign plan and then attempt to turn it into an FFI plan. What we currently end up with is an FFI plan whose underlying private data is a foreign plan that in turn contains an FFI plan. Instead, *any* time we create an FFI object we should check whether it is locally downcastable to a Foreign plan and, if so, simply access the already existing FFI object. This pattern is adopted across all FFI objects in this PR.

With this work in place we can also properly support repartitioning via FFI as well as `new_with_children` via FFI.

## What changes are included in this PR?

- Adds an access pattern for creating new FFI objects. When the object is already locally downcastable to a Foreign wrapper, we simply get the underlying existing FFI object instead of creating a wrapper around a wrapper.
- Implements repartitioning and `new_with_children` via FFI on execution plans.

## Are these changes tested?

Integration tests are added.

## Are there any user-facing changes?

The one user-facing change is that some of the aggregates and accumulators that take closures now require those closures to be `'static` so that we can downcast the boxed traits.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent c792700 commit 897b5c1

29 files changed: +429 -97 lines changed

datafusion/catalog/src/table.rs

Lines changed: 1 addition & 1 deletion
@@ -486,7 +486,7 @@ pub trait TableProviderFactory: Debug + Sync + Send {
 }

 /// A trait for table function implementations
-pub trait TableFunctionImpl: Debug + Sync + Send {
+pub trait TableFunctionImpl: Debug + Sync + Send + Any {
     /// Create a table provider
     fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
 }

datafusion/expr-common/src/accumulator.rs

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ use std::fmt::Debug;
 /// [`evaluate`]: Self::evaluate
 /// [`merge_batch`]: Self::merge_batch
 /// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
-pub trait Accumulator: Send + Sync + Debug {
+pub trait Accumulator: Send + Sync + Debug + std::any::Any {
     /// Updates the accumulator's state from its input.
     ///
     /// `values` contains the arguments to this aggregate function.
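Adding `Any` as a supertrait is what ties this change to the `'static` requirement on closures: `Any` is only implemented for `'static` types, so any type implementing the trait, including one that stores a closure, must itself be `'static`. The following is a minimal sketch of that consequence, not code from this commit. `Accumulate`, `ClosureAccumulator`, `SumAccumulator`, `add`, and `boxed_sum` are hypothetical names, and the explicit `as_any` helper is an assumption made so the sketch compiles on older toolchains; the commit itself may perform the downcast differently.

```rust
use std::any::Any;

/// Hypothetical stand-in for a trait such as `Accumulator` with an `Any` supertrait.
trait Accumulate: Any {
    fn update(&mut self, value: i64);
    fn value(&self) -> i64;
    /// Assumed helper that exposes the concrete type for downcasting.
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical accumulator that folds values with a user-supplied closure.
struct ClosureAccumulator<F> {
    state: i64,
    combine: F,
}

// The `'static` bound on `F` is required here: `Any` is only implemented for
// `'static` types, so removing the bound makes this impl fail to compile.
impl<F: Fn(i64, i64) -> i64 + 'static> Accumulate for ClosureAccumulator<F> {
    fn update(&mut self, value: i64) {
        self.state = (self.combine)(self.state, value);
    }

    fn value(&self) -> i64 {
        self.state
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn add(a: i64, b: i64) -> i64 {
    a + b
}

/// A nameable concrete type, so that a boxed trait object can be downcast to it.
type SumAccumulator = ClosureAccumulator<fn(i64, i64) -> i64>;

fn boxed_sum() -> Box<dyn Accumulate> {
    Box::new(ClosureAccumulator {
        state: 0,
        combine: add as fn(i64, i64) -> i64,
    })
}

fn main() {
    let mut acc = boxed_sum();
    acc.update(2);
    acc.update(3);
    // Recover the concrete type from the boxed trait object.
    let is_sum = acc.as_any().downcast_ref::<SumAccumulator>().is_some();
    println!("value = {}, is SumAccumulator = {}", acc.value(), is_sum);
}
```

A closure that borrows a local variable would not satisfy the `'static` bound, which is the kind of caller this user-facing change affects.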

datafusion/expr-common/src/groups_accumulator.rs

Lines changed: 1 addition & 1 deletion
@@ -108,7 +108,7 @@ impl EmitTo {
 ///
 /// [`Accumulator`]: crate::accumulator::Accumulator
 /// [Aggregating Millions of Groups Fast blog]: https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/
-pub trait GroupsAccumulator: Send {
+pub trait GroupsAccumulator: Send + std::any::Any {
     /// Updates the accumulator's state from its arguments, encoded as
     /// a vector of [`ArrayRef`]s.
     ///

datafusion/expr/src/partition_evaluator.rs

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ use crate::window_state::WindowAggState;
 /// For more background, please also see the [User defined Window Functions in DataFusion blog]
 ///
 /// [User defined Window Functions in DataFusion blog]: https://datafusion.apache.org/blog/2025/04/19/user-defined-window-functions
-pub trait PartitionEvaluator: Debug + Send {
+pub trait PartitionEvaluator: Debug + Send + std::any::Any {
     /// When the window frame has a fixed beginning (e.g UNBOUNDED
     /// PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and
     /// NTH_VALUE do not need the (unbounded) input once they have

datafusion/ffi/src/catalog_provider.rs

Lines changed: 5 additions & 0 deletions
@@ -250,6 +250,11 @@ impl FFI_CatalogProvider {
         runtime: Option<Handle>,
         logical_codec: FFI_LogicalExtensionCodec,
     ) -> Self {
+        if let Some(provider) = provider.as_any().downcast_ref::<ForeignCatalogProvider>()
+        {
+            return provider.0.clone();
+        }
+
         let private_data = Box::new(ProviderPrivateData { provider, runtime });

         Self {
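The early return added in the hunk above can be illustrated in isolation. The following is a minimal sketch, not the DataFusion implementation: `Provider`, `LocalProvider`, `FfiProvider`, `ForeignProvider`, `wrapper_depth`, and `new_always_wrap` are hypothetical names introduced only for this example, and the real FFI structs carry function pointers and codecs that are omitted here. It shows that checking for the Foreign wrapper before wrapping returns the existing FFI object, whereas wrapping unconditionally adds a layer.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for a provider trait such as `CatalogProvider`.
trait Provider: Send + Sync {
    fn as_any(&self) -> &dyn Any;
    /// Number of Foreign wrappers between this object and the real provider.
    fn wrapper_depth(&self) -> usize;
}

/// A provider implemented in the local library.
struct LocalProvider;

impl Provider for LocalProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn wrapper_depth(&self) -> usize {
        0
    }
}

/// Hypothetical stand-in for an FFI-stable struct such as `FFI_CatalogProvider`.
#[derive(Clone)]
struct FfiProvider {
    private_data: Arc<dyn Provider>,
}

/// Hypothetical stand-in for the consumer-side wrapper such as `ForeignCatalogProvider`.
struct ForeignProvider(FfiProvider);

impl Provider for ForeignProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn wrapper_depth(&self) -> usize {
        1 + self.0.private_data.wrapper_depth()
    }
}

impl FfiProvider {
    /// Reuse the existing FFI object when `provider` is already a Foreign wrapper.
    fn new(provider: Arc<dyn Provider>) -> Self {
        if let Some(foreign) = provider.as_any().downcast_ref::<ForeignProvider>() {
            return foreign.0.clone();
        }
        Self { private_data: provider }
    }

    /// Contrast case: always wrap, which nests a Foreign wrapper inside the FFI object.
    fn new_always_wrap(provider: Arc<dyn Provider>) -> Self {
        Self { private_data: provider }
    }
}

fn main() {
    let ffi = FfiProvider::new(Arc::new(LocalProvider));
    let foreign: Arc<dyn Provider> = Arc::new(ForeignProvider(ffi));

    let unwrapped = FfiProvider::new(Arc::clone(&foreign));
    let nested = FfiProvider::new_always_wrap(foreign);

    println!("with check: depth {}", unwrapped.private_data.wrapper_depth());
    println!("without check: depth {}", nested.private_data.wrapper_depth());
}
```

In this sketch the checked constructor yields an FFI object whose private data is the original local provider, while the unchecked one stores the Foreign wrapper and therefore grows by one layer on every round trip.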

datafusion/ffi/src/catalog_provider_list.rs

Lines changed: 7 additions & 0 deletions
@@ -212,6 +212,13 @@ impl FFI_CatalogProviderList {
         runtime: Option<Handle>,
         logical_codec: FFI_LogicalExtensionCodec,
     ) -> Self {
+        if let Some(provider) = provider
+            .as_any()
+            .downcast_ref::<ForeignCatalogProviderList>()
+        {
+            return provider.0.clone();
+        }
+
         let private_data = Box::new(ProviderPrivateData { provider, runtime });

         Self {
