Skip to content

Commit 5f09187

Browse files
committed
chore(cubestore): Upgrade DF: Disable CoalesceBatches
1 parent 3f58992 commit 5f09187

File tree

3 files changed

+83
-100
lines changed

3 files changed

+83
-100
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 70 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -3192,11 +3192,10 @@ async fn planning_inplace_aggregate(service: Box<dyn SqlClient>) {
31923192
"PartiallySortedFinalAggregate, partitions: 1\
31933193
\n Worker, partitions: 1\
31943194
\n PartiallySortedPartialAggregate, partitions: 1\
3195-
\n CoalesceBatches, partitions: 1\
3196-
\n Filter, partitions: 1\
3197-
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *, partitions: 1\
3198-
\n Sort, partitions: 1\
3199-
\n Empty, partitions: 1"
3195+
\n Filter, partitions: 1\
3196+
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *, partitions: 1\
3197+
\n Sort, partitions: 1\
3198+
\n Empty, partitions: 1"
32003199
);
32013200
let p = service
32023201
.plan_query(
@@ -3210,11 +3209,10 @@ async fn planning_inplace_aggregate(service: Box<dyn SqlClient>) {
32103209
"PartiallySortedFinalAggregate, partitions: 1\
32113210
\n Worker, partitions: 1\
32123211
\n PartiallySortedPartialAggregate, partitions: 1\
3213-
\n CoalesceBatches, partitions: 1\
3214-
\n Filter, partitions: 1\
3215-
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *, partitions: 1\
3216-
\n Sort, partitions: 1\
3217-
\n Empty, partitions: 1"
3212+
\n Filter, partitions: 1\
3213+
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *, partitions: 1\
3214+
\n Sort, partitions: 1\
3215+
\n Empty, partitions: 1"
32183216
);
32193217
}
32203218

@@ -3295,10 +3293,9 @@ async fn planning_hints(service: Box<dyn SqlClient>) {
32953293
\n Worker, single_vals: [1]\
32963294
\n CoalescePartitions, single_vals: [1]\
32973295
\n Projection, [id3, id2], single_vals: [1]\
3298-
\n CoalesceBatches, single_vals: [0]\
3299-
\n Filter, single_vals: [0]\
3300-
\n Scan, index: default:1:[1], fields: [id2, id3]\
3301-
\n Empty"
3296+
\n Filter, single_vals: [0]\
3297+
\n Scan, index: default:1:[1], fields: [id2, id3]\
3298+
\n Empty"
33023299
);
33033300

33043301
// Removing single value columns should keep the sort order of the rest.
@@ -3309,11 +3306,10 @@ async fn planning_hints(service: Box<dyn SqlClient>) {
33093306
assert_eq!(
33103307
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
33113308
"Worker, sort_order: [0]\
3312-
\n CoalesceBatches, sort_order: [0]\
3313-
\n Filter, sort_order: [0]\
3314-
\n Scan, index: default:1:[1]:sort_on[id1, id2], fields: *, sort_order: [0, 1, 2]\
3315-
\n Sort, sort_order: [0, 1, 2]\
3316-
\n Empty"
3309+
\n Filter, sort_order: [0]\
3310+
\n Scan, index: default:1:[1]:sort_on[id1, id2], fields: *, sort_order: [0, 1, 2]\
3311+
\n Sort, sort_order: [0, 1, 2]\
3312+
\n Empty"
33173313
);
33183314
let p = service
33193315
.plan_query("SELECT id1, id3 FROM s.Data WHERE id2 = 234")
@@ -3322,12 +3318,11 @@ async fn planning_hints(service: Box<dyn SqlClient>) {
33223318
assert_eq!(
33233319
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
33243320
"Worker, sort_order: [0, 1]\
3325-
\n CoalesceBatches, sort_order: [0, 1]\
3326-
\n Filter, sort_order: [0, 1]\
3327-
\n CoalescePartitions, sort_order: [0, 1, 2]\
3328-
\n Scan, index: default:1:[1], fields: *, sort_order: [0, 1, 2]\
3329-
\n Sort, sort_order: [0, 1, 2]\
3330-
\n Empty"
3321+
\n Filter, sort_order: [0, 1]\
3322+
\n CoalescePartitions, sort_order: [0, 1, 2]\
3323+
\n Scan, index: default:1:[1], fields: *, sort_order: [0, 1, 2]\
3324+
\n Sort, sort_order: [0, 1, 2]\
3325+
\n Empty"
33313326
);
33323327
}
33333328

@@ -3380,17 +3375,15 @@ async fn planning_inplace_aggregate2(service: Box<dyn SqlClient>) {
33803375
\n LinearSingleAggregate\
33813376
\n CoalescePartitions\
33823377
\n Union\
3383-
\n CoalesceBatches\
3378+
\n Filter\
3379+
\n Scan, index: default:1:[1]:sort_on[allowed, site_id, url], fields: *, sort_order: [0, 1, 2, 3, 4]\
3380+
\n Sort, by: [allowed@0, site_id@1, url@2, day@3, hits@4], sort_order: [0, 1, 2, 3, 4]\
3381+
\n Empty\
3382+
\n CoalescePartitions\
33843383
\n Filter\
3385-
\n Scan, index: default:1:[1]:sort_on[allowed, site_id, url], fields: *, sort_order: [0, 1, 2, 3, 4]\
3384+
\n Scan, index: default:2:[2]:sort_on[allowed, site_id, url], fields: *, sort_order: [0, 1, 2, 3, 4]\
33863385
\n Sort, by: [allowed@0, site_id@1, url@2, day@3, hits@4], sort_order: [0, 1, 2, 3, 4]\
3387-
\n Empty\
3388-
\n CoalescePartitions\
3389-
\n CoalesceBatches\
3390-
\n Filter\
3391-
\n Scan, index: default:2:[2]:sort_on[allowed, site_id, url], fields: *, sort_order: [0, 1, 2, 3, 4]\
3392-
\n Sort, by: [allowed@0, site_id@1, url@2, day@3, hits@4], sort_order: [0, 1, 2, 3, 4]\
3393-
\n Empty"
3386+
\n Empty"
33943387
);
33953388
}
33963389

@@ -3643,12 +3636,11 @@ async fn planning_simple(service: Box<dyn SqlClient>) {
36433636
assert_eq!(
36443637
pp_phys_plan(p.worker.as_ref()),
36453638
"Worker\
3646-
\n CoalesceBatches\
3647-
\n Filter\
3648-
\n CoalescePartitions\
3649-
\n Scan, index: default:1:[1], fields: [id, amount]\
3650-
\n Sort\
3651-
\n Empty"
3639+
\n Filter\
3640+
\n CoalescePartitions\
3641+
\n Scan, index: default:1:[1], fields: [id, amount]\
3642+
\n Sort\
3643+
\n Empty"
36523644
);
36533645

36543646
let p = service
@@ -3669,12 +3661,11 @@ async fn planning_simple(service: Box<dyn SqlClient>) {
36693661
pp_phys_plan(p.worker.as_ref()),
36703662
"Sort\
36713663
\n Worker\
3672-
\n CoalesceBatches\
3673-
\n Filter\
3674-
\n CoalescePartitions\
3675-
\n Scan, index: default:1:[1], fields: [id, amount]\
3676-
\n Sort\
3677-
\n Empty"
3664+
\n Filter\
3665+
\n CoalescePartitions\
3666+
\n Scan, index: default:1:[1], fields: [id, amount]\
3667+
\n Sort\
3668+
\n Empty"
36783669
);
36793670

36803671
let p = service
@@ -3695,12 +3686,11 @@ async fn planning_simple(service: Box<dyn SqlClient>) {
36953686
pp_phys_plan(p.worker.as_ref()),
36963687
"GlobalLimit, n: 10\
36973688
\n Worker\
3698-
\n CoalesceBatches\
3699-
\n Filter\
3700-
\n CoalescePartitions\
3701-
\n Scan, index: default:1:[1], fields: [id, amount]\
3702-
\n Sort\
3703-
\n Empty"
3689+
\n Filter\
3690+
\n CoalescePartitions\
3691+
\n Scan, index: default:1:[1], fields: [id, amount]\
3692+
\n Sort\
3693+
\n Empty"
37043694
);
37053695

37063696
let p = service
@@ -3788,11 +3778,10 @@ async fn planning_filter_index_selection(service: Box<dyn SqlClient>) {
37883778
"SortedFinalAggregate\
37893779
\n Worker\
37903780
\n SortedPartialAggregate\
3791-
\n CoalesceBatches\
3792-
\n Filter\
3793-
\n Scan, index: cb:2:[2]:sort_on[c, b], fields: [b, c, amount]\
3794-
\n Sort\
3795-
\n Empty"
3781+
\n Filter\
3782+
\n Scan, index: cb:2:[2]:sort_on[c, b], fields: [b, c, amount]\
3783+
\n Sort\
3784+
\n Empty"
37963785
);
37973786

37983787
let p = service
@@ -3812,11 +3801,10 @@ async fn planning_filter_index_selection(service: Box<dyn SqlClient>) {
38123801
\n Worker\
38133802
\n CoalescePartitions\
38143803
\n LinearPartialAggregate\
3815-
\n CoalesceBatches\
3816-
\n Filter\
3817-
\n Scan, index: cb:2:[2], fields: [b, c, amount]\
3818-
\n Sort\
3819-
\n Empty"
3804+
\n Filter\
3805+
\n Scan, index: cb:2:[2], fields: [b, c, amount]\
3806+
\n Sort\
3807+
\n Empty"
38203808
);
38213809

38223810
let p = service
@@ -3837,11 +3825,10 @@ async fn planning_filter_index_selection(service: Box<dyn SqlClient>) {
38373825
"SortedFinalAggregate\
38383826
\n Worker\
38393827
\n SortedPartialAggregate\
3840-
\n CoalesceBatches\
3841-
\n Filter\
3842-
\n Scan, index: cb:2:[2]:sort_on[c, b], fields: [a, b, c, amount]\
3843-
\n Sort\
3844-
\n Empty"
3828+
\n Filter\
3829+
\n Scan, index: cb:2:[2]:sort_on[c, b], fields: [a, b, c, amount]\
3830+
\n Sort\
3831+
\n Empty"
38453832
);
38463833
}
38473834

@@ -4007,19 +3994,17 @@ async fn planning_3_table_joins(service: Box<dyn SqlClient>) {
40073994
\n MergeJoin, on: [product_id@1 = product_id@0]\
40083995
\n Projection, [order_id, product_id, customer_name]\
40093996
\n MergeJoin, on: [customer_id@1 = customer_id@0]\
4010-
\n CoalesceBatches\
4011-
\n Filter, predicate: product_id@2 = 125\
4012-
\n Scan, index: by_product_customer:3:[3]:sort_on[product_id, customer_id], fields: [order_id, customer_id, product_id], predicate: product_id = Int64(125)\
4013-
\n Sort\
4014-
\n Empty\
3997+
\n Filter, predicate: product_id@2 = 125\
3998+
\n Scan, index: by_product_customer:3:[3]:sort_on[product_id, customer_id], fields: [order_id, customer_id, product_id], predicate: product_id = Int64(125)\
3999+
\n Sort\
4000+
\n Empty\
40154001
\n Scan, index: default:4:[4]:sort_on[customer_id], fields: *\
40164002
\n Sort\
40174003
\n Empty\
4018-
\n CoalesceBatches\
4019-
\n Filter, predicate: product_id@0 = 125\
4020-
\n Scan, index: default:5:[5]:sort_on[product_id], fields: *, predicate: product_id = Int64(125)\
4021-
\n Sort\
4022-
\n Empty",
4004+
\n Filter, predicate: product_id@0 = 125\
4005+
\n Scan, index: default:5:[5]:sort_on[product_id], fields: *, predicate: product_id = Int64(125)\
4006+
\n Sort\
4007+
\n Empty",
40234008
);
40244009
}
40254010

@@ -7633,11 +7618,10 @@ async fn planning_aggregate_index(service: Box<dyn SqlClient>) {
76337618
"SortedFinalAggregate\
76347619
\n Worker\
76357620
\n SortedPartialAggregate\
7636-
\n CoalesceBatches\
7637-
\n Filter\
7638-
\n Scan, index: default:3:[3]:sort_on[a, b, c], fields: *\
7639-
\n Sort\
7640-
\n Empty"
7621+
\n Filter\
7622+
\n Scan, index: default:3:[3]:sort_on[a, b, c], fields: *\
7623+
\n Sort\
7624+
\n Empty"
76417625
);
76427626

76437627
let p = service
@@ -7679,11 +7663,10 @@ async fn planning_aggregate_index(service: Box<dyn SqlClient>) {
76797663
"SortedFinalAggregate\
76807664
\n Worker\
76817665
\n SortedPartialAggregate\
7682-
\n CoalesceBatches\
7683-
\n Filter\
7684-
\n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: [a, b, a_sum]\
7685-
\n Sort\
7686-
\n Empty"
7666+
\n Filter\
7667+
\n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: [a, b, a_sum]\
7668+
\n Sort\
7669+
\n Empty"
76877670
);
76887671
}
76897672

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ use datafusion::physical_expr::{
5555
Distribution, EquivalenceProperties, LexRequirement, PhysicalSortExpr, PhysicalSortRequirement,
5656
};
5757
use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
58-
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
5958
use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
6059
use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
6160
use datafusion::physical_optimizer::join_selection::JoinSelection;
@@ -471,7 +470,8 @@ impl QueryExecutorImpl {
471470
Arc::new(EnforceSorting::new()),
472471
Arc::new(OptimizeAggregateOrder::new()),
473472
Arc::new(ProjectionPushdown::new()),
474-
Arc::new(CoalesceBatches::new()),
473+
// Also disabled before DF 46 upgrade; re-disabled because it uses too much memory.
474+
// Arc::new(CoalesceBatches::new()),
475475
Arc::new(OutputRequirements::new_remove_mode()),
476476
Arc::new(TopKAggregation::new()),
477477
Arc::new(ProjectionPushdown::new()),

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3344,16 +3344,15 @@ mod tests {
33443344
\n Worker\
33453345
\n CoalescePartitions\
33463346
\n LinearPartialAggregate\
3347-
\n CoalesceBatches\
3348-
\n Filter\
3349-
\n MergeSort\
3350-
\n Scan, index: default:1:[1]:sort_on[num], fields: *\
3351-
\n FilterByKeyRange\
3352-
\n CheckMemoryExec\
3353-
\n ParquetScan\
3354-
\n FilterByKeyRange\
3355-
\n CheckMemoryExec\
3356-
\n ParquetScan";
3347+
\n Filter\
3348+
\n MergeSort\
3349+
\n Scan, index: default:1:[1]:sort_on[num], fields: *\
3350+
\n FilterByKeyRange\
3351+
\n CheckMemoryExec\
3352+
\n ParquetScan\
3353+
\n FilterByKeyRange\
3354+
\n CheckMemoryExec\
3355+
\n ParquetScan";
33573356
let plan = pp_phys_plan_ext(plans.worker.as_ref(), &opts);
33583357
let p = plan_regexp.replace_all(&plan, "ParquetScan");
33593358
println!("pp {}", p);
@@ -4413,8 +4412,9 @@ mod tests {
44134412
match &worker_row
44144413
.values()[2] {
44154414
TableValue::String(pp_plan) => {
4415+
// CoalesceBatches is disabled; if reenabled, it is expected above Filter.
44164416
let regex = Regex::new(
4417-
r"LinearPartialAggregate\s+CoalesceBatches\s+Filter\s+Scan, index: default:1:\[1\], fields: \[platform, age, amount\]\s+ParquetScan, files: \S*\.chunk\.parquet"
4417+
r"LinearPartialAggregate\s+Filter\s+Scan, index: default:1:\[1\], fields: \[platform, age, amount\]\s+ParquetScan, files: \S*\.chunk\.parquet"
44184418
).unwrap();
44194419
let matches = regex.captures_iter(&pp_plan).count();
44204420
assert_eq!(matches, 1, "pp_plan = {}", pp_plan);

0 commit comments

Comments
 (0)