
Commit d17b83a

chore(cubestore): Upgrade DF: push down GlobalLimit after adding it to workers
1 parent: e4a1574

3 files changed, 18 additions and 7 deletions

rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs

Lines changed: 10 additions & 3 deletions

@@ -3,8 +3,11 @@ use crate::queryplanner::planning::WorkerExec;
 use crate::queryplanner::query_executor::ClusterSendExec;
 use crate::queryplanner::tail_limit::TailLimitExec;
 use crate::queryplanner::topk::AggregateTopKExec;
+use datafusion::config::ConfigOptions;
 use datafusion::error::DataFusionError;
 use datafusion::physical_expr::LexOrdering;
+use datafusion::physical_optimizer::limit_pushdown::LimitPushdown;
+use datafusion::physical_optimizer::PhysicalOptimizerRule as _;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::limit::GlobalLimitExec;
@@ -192,10 +195,13 @@ pub fn ensure_partition_merge_with_acceptable_parent(
     }
 }
 
-///Add `GlobalLimitExec` behind worker node if this node has `limit` property set
-///Should be executed after all optimizations which can move `Worker` node or change it input
+/// Add `GlobalLimitExec` behind worker node if this node has `limit` property set and applies DF
+/// `LimitPushdown` optimizer. Should be executed after all optimizations which can move `Worker`
+/// node or change its input. `config` is ignored -- we pass it to DF's `LimitPushdown` optimizer,
+/// which also ignores it (as of DF 46.0.1).
 pub fn add_limit_to_workers(
     p: Arc<dyn ExecutionPlan>,
+    config: &ConfigOptions,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let limit_and_reverse;
     let input;
@@ -217,6 +223,7 @@ pub fn add_limit_to_workers(
         p.with_new_children(vec![limit])
     } else {
         let limit = Arc::new(GlobalLimitExec::new(input.clone(), 0, Some(limit)));
-        p.with_new_children(vec![limit])
+        let limit_optimized = LimitPushdown::new().optimize(limit, config)?;
+        p.with_new_children(vec![limit_optimized])
     }
 }
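
For context, here is a minimal stand-alone sketch of the pattern the `else` branch above relies on, assuming the same DataFusion 46 APIs this file imports (`GlobalLimitExec`, `LimitPushdown`, `PhysicalOptimizerRule`). The helper `limit_and_push_down` is hypothetical and not part of the commit; it only illustrates wrapping a worker's input in a limit and letting DataFusion push that limit toward the leaves.

use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::physical_optimizer::limit_pushdown::LimitPushdown;
use datafusion::physical_optimizer::PhysicalOptimizerRule as _;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper (illustration only): cap `input` at `limit` rows, then let
// DataFusion's LimitPushdown rule move the limit as close to the scan as it can.
fn limit_and_push_down(
    input: Arc<dyn ExecutionPlan>,
    limit: usize,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    // skip = 0, fetch = Some(limit): keep only the first `limit` rows.
    let limited = Arc::new(GlobalLimitExec::new(input, 0, Some(limit)));
    // LimitPushdown ignores `config` as of DF 46.0.1, but the
    // PhysicalOptimizerRule trait requires it to be passed in.
    LimitPushdown::new().optimize(limited, config)
}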

rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Lines changed: 7 additions & 4 deletions

@@ -93,12 +93,13 @@ impl QueryPlanner for CubeQueryPlanner {
         ])
         .create_physical_plan(logical_plan, ctx_state)
         .await?;
-        // TODO: assert there is only a single ClusterSendExec in the plan. Update: This is no longer true.
-        finalize_physical_plan(
+        let result = finalize_physical_plan(
             p,
             self.memory_handler.clone(),
             self.data_loaded_size.clone(),
-        )
+            ctx_state.config().options(),
+        );
+        result
     }
 }
 
@@ -145,12 +146,14 @@ fn finalize_physical_plan(
     p: Arc<dyn ExecutionPlan>,
     memory_handler: Arc<dyn MemoryHandler>,
    data_loaded_size: Option<Arc<DataLoadedSize>>,
+    config: &ConfigOptions,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let p = rewrite_physical_plan(p, &mut |p| add_check_memory_exec(p, memory_handler.clone()))?;
     let p = if let Some(data_loaded_size) = data_loaded_size {
         rewrite_physical_plan(p, &mut |p| add_trace_data_loaded_exec(p, &data_loaded_size))?
     } else {
         p
     };
-    rewrite_physical_plan(p, &mut |p| add_limit_to_workers(p))
+    let p = rewrite_physical_plan(p, &mut |p| add_limit_to_workers(p, config))?;
+    Ok(p)
 }
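
A small sketch of where the new `ConfigOptions` argument comes from, assuming DataFusion 46's session API: `SessionState` exposes a `SessionConfig`, whose `options()` accessor yields the `&ConfigOptions` that physical optimizer rules expect. The function `planner_options` below is a hypothetical illustration, not code from the commit; the commit itself calls `ctx_state.config().options()` inline.

use datafusion::config::ConfigOptions;
use datafusion::execution::session_state::SessionState;

// Hypothetical helper mirroring the `ctx_state.config().options()` call above.
fn planner_options(ctx_state: &SessionState) -> &ConfigOptions {
    // SessionState -> &SessionConfig -> &ConfigOptions
    ctx_state.config().options()
}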

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 1 addition & 0 deletions

@@ -23,6 +23,7 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::memory::MemorySourceConfig;
 use datafusion_datasource::source::DataSourceExec;
 use itertools::{repeat_n, Itertools};
+use std::fmt::Write;
 use std::sync::Arc;
 
 use crate::queryplanner::check_memory::CheckMemoryExec;
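
The added `use std::fmt::Write;` is presumably there so the pretty printer can `write!` into a `String`, which only compiles with that trait in scope. A minimal, hypothetical illustration (not code from pretty_printers.rs):

use std::fmt::Write;

// Hypothetical example: appending formatted text to a String requires std::fmt::Write.
fn describe_limit(name: &str, fetch: Option<usize>) -> String {
    let mut out = String::new();
    let _ = write!(out, "{}", name);
    if let Some(n) = fetch {
        let _ = write!(out, ", fetch={}", n);
    }
    out
}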
