Skip to content

Commit 6724cfc

Browse files
committed
chore(cubestore): Upgrade DF: Disable parallelization of SortPreservingMergeExec inputs
This makes it behave like pre-DF-upgrade MergeSortExec
1 parent 5f09187 commit 6724cfc

File tree

3 files changed

+51
-32
lines changed

3 files changed

+51
-32
lines changed

rust/cubestore/Cargo.lock

Lines changed: 29 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,13 @@ impl QueryPlannerImpl {
282282
}
283283

284284
impl QueryPlannerImpl {
285-
pub fn make_execution_context(config: SessionConfig) -> SessionContext {
285+
pub fn make_execution_context(mut config: SessionConfig) -> SessionContext {
286+
// The config parameter is from metadata_cache_factor (which we need to rename) but doesn't
287+
// include all necessary configs.
288+
config
289+
.options_mut()
290+
.execution
291+
.dont_parallelize_sort_preserving_merge_exec_inputs = true;
286292
let context = SessionContext::new_with_config(config);
287293
// TODO upgrade DF: build SessionContexts consistently
288294
for udaf in registerable_aggregate_udfs() {

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ impl QueryExecutorImpl {
411411
})
412412
}
413413

414+
/// Only used for create_physical_plan, not executing the plan. TODO upgrade DF: Make fewer distinct SessionContexts.
414415
fn router_context(
415416
&self,
416417
cluster: Arc<dyn Cluster>,
@@ -423,6 +424,7 @@ impl QueryExecutorImpl {
423424
))
424425
}
425426

427+
/// Only used for create_physical_plan, not executing the plan. TODO upgrade DF: Make fewer distinct SessionContexts.
426428
fn worker_context(
427429
&self,
428430
serialized_plan: Arc<PreSerializedPlan>,
@@ -437,6 +439,7 @@ impl QueryExecutorImpl {
437439
))
438440
}
439441

442+
/// Currently, only used for physical planning, not even execution. TODO upgrade DF: Make fewer distinct SessionContexts.
440443
fn make_context(
441444
&self,
442445
query_planner: CubeQueryPlanner,
@@ -450,13 +453,13 @@ impl QueryExecutorImpl {
450453
.with_query_planner(Arc::new(query_planner))
451454
.with_aggregate_functions(registerable_arc_aggregate_udfs())
452455
.with_scalar_functions(registerable_arc_scalar_udfs())
453-
.with_physical_optimizer_rules(self.optimizer_rules())
456+
.with_physical_optimizer_rules(self.physical_optimizer_rules())
454457
.build();
455458
let ctx = SessionContext::new_with_state(session_state);
456459
Ok(Arc::new(ctx))
457460
}
458461

459-
fn optimizer_rules(&self) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
462+
fn physical_optimizer_rules(&self) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
460463
vec![
461464
// Cube rules
462465
Arc::new(PreOptimizeRule::new()),
@@ -481,6 +484,8 @@ impl QueryExecutorImpl {
481484
}
482485

483486
fn session_config(&self) -> SessionConfig {
487+
// Currently, only used for physical planning.
488+
484489
let mut config = self
485490
.metadata_cache_factory
486491
.make_session_config()
@@ -490,6 +495,14 @@ impl QueryExecutorImpl {
490495
.with_prefer_existing_sort(true)
491496
.with_round_robin_repartition(false);
492497
config.options_mut().optimizer.prefer_hash_join = false;
498+
// Redundant with the commented CoalesceBatches::new() line in `Self::optimizer_rules`
499+
config.options_mut().execution.coalesce_batches = false;
500+
// Not used in physical planning... included in QueryPlannerImpl::make_execution_context
501+
// too; we should try and dedup these two places.
502+
config
503+
.options_mut()
504+
.execution
505+
.dont_parallelize_sort_preserving_merge_exec_inputs = true;
493506
config
494507
}
495508
}

0 commit comments

Comments
 (0)