Skip to content

Commit 67ce291

Browse files
committed
chore(cubestore): Upgrade DF: Test MergeSort node present when ClusterSend has multiple partitions with sorted aggregate
1 parent 3c64eb2 commit 67ce291

File tree

2 files changed: +91 additions, −48 deletions

rust/cubestore/cubestore-sql-tests/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub fn run_sql_tests(
3939
extra_args: Vec<String>,
4040
runner: impl Fn(/*test_name*/ &str, TestFn) + RefUnwindSafe + Send + Sync + Clone + 'static,
4141
) {
42-
let tests = sql_tests()
42+
let tests = sql_tests(prefix)
4343
.into_iter()
4444
.map(|(name, test_fn)| {
4545
let runner = runner.clone();

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 90 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub type TestFn = Box<
3232
+ Sync
3333
+ RefUnwindSafe,
3434
>;
35-
pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
35+
pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> {
3636
return vec![
3737
t("insert", insert),
3838
t("select_test", select_test),
@@ -217,9 +217,9 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
217217
"unique_key_and_multi_measures_for_stream_table",
218218
unique_key_and_multi_measures_for_stream_table,
219219
),
220-
t(
220+
(
221221
"unique_key_and_multi_partitions",
222-
unique_key_and_multi_partitions,
222+
{ let prefix = prefix.to_owned(); Box::new(move |service| { Box::pin(unique_key_and_multi_partitions(prefix.clone(), service)) }) },
223223
),
224224
t(
225225
"unique_key_and_multi_partitions_hash_aggregate",
@@ -2904,41 +2904,43 @@ async fn planning_inplace_aggregate(service: Box<dyn SqlClient>) {
29042904
.plan_query("SELECT url, SUM(hits) FROM s.Data GROUP BY 1")
29052905
.await
29062906
.unwrap();
2907+
let pp_opts = PPOptions { show_partitions: true, ..PPOptions::none()};
29072908
assert_eq!(
2908-
pp_phys_plan(p.router.as_ref()),
2909-
"SortedFinalAggregate\
2909+
pp_phys_plan_ext(p.router.as_ref(), &pp_opts),
2910+
"SortedFinalAggregate, partitions: 1\
29102911
\n ClusterSend, partitions: [[1]]"
29112912
);
29122913
assert_eq!(
2913-
pp_phys_plan(p.worker.as_ref()),
2914-
"SortedFinalAggregate\
2915-
\n Worker\
2916-
\n SortedPartialAggregate\
2917-
\n Scan, index: default:1:[1]:sort_on[url], fields: [url, hits]\
2918-
\n Sort\
2919-
\n Empty"
2914+
pp_phys_plan_ext(p.worker.as_ref(), &pp_opts),
2915+
"SortedFinalAggregate, partitions: 1\
2916+
\n Worker, partitions: 1\
2917+
\n SortedPartialAggregate, partitions: 1\
2918+
\n Scan, index: default:1:[1]:sort_on[url], fields: [url, hits], partitions: 1\
2919+
\n Sort, partitions: 1\
2920+
\n Empty, partitions: 1"
29202921
);
29212922

29222923
// When there is no index, we fallback to inplace aggregates.
29232924
let p = service
29242925
.plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1")
29252926
.await
29262927
.unwrap();
2928+
// TODO: Can we not have CoalescePartitions? We don't want.
29272929
assert_eq!(
2928-
pp_phys_plan(p.router.as_ref()),
2929-
"LinearFinalAggregate\
2930-
\n CoalescePartitions\
2930+
pp_phys_plan_ext(p.router.as_ref(), &pp_opts),
2931+
"LinearFinalAggregate, partitions: 1\
2932+
\n CoalescePartitions, partitions: 1\
29312933
\n ClusterSend, partitions: [[1]]"
29322934
);
29332935
assert_eq!(
2934-
pp_phys_plan(p.worker.as_ref()),
2935-
"LinearFinalAggregate\
2936-
\n CoalescePartitions\
2937-
\n Worker\
2938-
\n CoalescePartitions\
2939-
\n LinearPartialAggregate\
2940-
\n Scan, index: default:1:[1], fields: [day, hits]\
2941-
\n Empty"
2936+
pp_phys_plan_ext(p.worker.as_ref(), &pp_opts),
2937+
"LinearFinalAggregate, partitions: 1\
2938+
\n CoalescePartitions, partitions: 1\
2939+
\n Worker, partitions: 1\
2940+
\n CoalescePartitions, partitions: 1\
2941+
\n LinearPartialAggregate, partitions: 1\
2942+
\n Scan, index: default:1:[1], fields: [day, hits], partitions: 1\
2943+
\n Empty, partitions: 1"
29422944
);
29432945

29442946
service
@@ -2952,35 +2954,35 @@ async fn planning_inplace_aggregate(service: Box<dyn SqlClient>) {
29522954
)
29532955
.await
29542956
.unwrap();
2955-
let phys_plan = pp_phys_plan(p.worker.as_ref());
2957+
let phys_plan = pp_phys_plan_ext(p.worker.as_ref(), &pp_opts);
29562958
assert_eq!(
29572959
phys_plan,
2958-
"PartiallySortedFinalAggregate\
2959-
\n Worker\
2960-
\n PartiallySortedPartialAggregate\
2961-
\n CoalesceBatchesExec\
2962-
\n Filter\
2963-
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *\
2964-
\n Sort\
2965-
\n Empty"
2960+
"PartiallySortedFinalAggregate, partitions: 1\
2961+
\n Worker, partitions: 1\
2962+
\n PartiallySortedPartialAggregate, partitions: 1\
2963+
\n CoalesceBatchesExec, partitions: 1\
2964+
\n Filter, partitions: 1\
2965+
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *, partitions: 1\
2966+
\n Sort, partitions: 1\
2967+
\n Empty, partitions: 1"
29662968
);
29672969
let p = service
29682970
.plan_query(
29692971
"SELECT url, day, SUM(hits) FROM s.DataBool where segment = false GROUP BY 1, 2",
29702972
)
29712973
.await
29722974
.unwrap();
2973-
let phys_plan = pp_phys_plan(p.worker.as_ref());
2975+
let phys_plan = pp_phys_plan_ext(p.worker.as_ref(), &pp_opts);
29742976
assert_eq!(
29752977
phys_plan,
2976-
"PartiallySortedFinalAggregate\
2977-
\n Worker\
2978-
\n PartiallySortedPartialAggregate\
2979-
\n CoalesceBatchesExec\
2980-
\n Filter\
2981-
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *\
2982-
\n Sort\
2983-
\n Empty"
2978+
"PartiallySortedFinalAggregate, partitions: 1\
2979+
\n Worker, partitions: 1\
2980+
\n PartiallySortedPartialAggregate, partitions: 1\
2981+
\n CoalesceBatchesExec, partitions: 1\
2982+
\n Filter, partitions: 1\
2983+
\n Scan, index: default:2:[2]:sort_on[url, segment, day], fields: *, partitions: 1\
2984+
\n Sort, partitions: 1\
2985+
\n Empty, partitions: 1"
29842986
);
29852987
}
29862988

@@ -3503,7 +3505,6 @@ async fn planning_simple(service: Box<dyn SqlClient>) {
35033505
)
35043506
.await
35053507
.unwrap();
3506-
// TODO: test MergeSort node is present if ClusterSend has multiple partitions.
35073508
assert_eq!(
35083509
pp_phys_plan(p.router.as_ref()),
35093510
"SortedFinalAggregate\
@@ -7124,7 +7125,7 @@ async fn unique_key_and_multi_measures_for_stream_table(service: Box<dyn SqlClie
71247125
);
71257126
}
71267127

7127-
async fn unique_key_and_multi_partitions(service: Box<dyn SqlClient>) {
7128+
async fn unique_key_and_multi_partitions(prefix: String, service: Box<dyn SqlClient>) {
71287129
service.exec_query("CREATE SCHEMA test").await.unwrap();
71297130
service.exec_query("CREATE TABLE test.unique_parts1 (a int, b int, c int, e int, val int) unique key (a, b, c, e) ").await.unwrap();
71307131
service.exec_query("CREATE TABLE test.unique_parts2 (a int, b int, c int, e int, val int) unique key (a, b, c, e) ").await.unwrap();
@@ -7167,13 +7168,15 @@ async fn unique_key_and_multi_partitions(service: Box<dyn SqlClient>) {
71677168
.await
71687169
.unwrap();
71697170

7170-
let r = service
7171-
.exec_query(
7172-
"SELECT a, b FROM (
7171+
let query = "SELECT a, b FROM (
71737172
SELECT * FROM test.unique_parts1
71747173
UNION ALL
71757174
SELECT * FROM test.unique_parts2
7176-
) `tt` GROUP BY 1, 2 ORDER BY 1, 2 LIMIT 100",
7175+
) `tt` GROUP BY 1, 2 ORDER BY 1, 2 LIMIT 100";
7176+
7177+
let r = service
7178+
.exec_query(
7179+
query,
71777180
)
71787181
.await
71797182
.unwrap();
@@ -7182,6 +7185,46 @@ async fn unique_key_and_multi_partitions(service: Box<dyn SqlClient>) {
71827185
to_rows(&r),
71837186
rows(&[(1, 1), (2, 2), (3, 3), (4, 4), (11, 11), (22, 22)])
71847187
);
7188+
7189+
let test_multiple_partitions = match prefix.as_str() {
7190+
"cluster" => true,
7191+
"in_process" => false,
7192+
"multi_process" => false,
7193+
_ => false,
7194+
};
7195+
7196+
// Assert that we get a MergeSort node when there are multiple partitions.
7197+
if test_multiple_partitions {
7198+
let plan = service.plan_query(query).await.unwrap();
7199+
7200+
assert_eq!(pp_phys_plan_ext(plan.router.as_ref(), &PPOptions{ show_partitions: true, ..PPOptions::none()}),
7201+
"Sort, fetch: 100, partitions: 1\
7202+
\n SortedFinalAggregate, partitions: 1\
7203+
\n MergeSort, partitions: 1\
7204+
\n ClusterSend, partitions: [[2], [1]]");
7205+
assert_eq!(pp_phys_plan_ext(plan.worker.as_ref(), &PPOptions{ show_partitions: true, ..PPOptions::none()}),
7206+
"Sort, fetch: 100, partitions: 1\
7207+
\n SortedFinalAggregate, partitions: 1\
7208+
\n MergeSort, partitions: 1\
7209+
\n Worker, partitions: 2\
7210+
\n GlobalLimit, n: 100, partitions: 1\
7211+
\n SortedPartialAggregate, partitions: 1\
7212+
\n MergeSort, partitions: 1\
7213+
\n Union, partitions: 2\
7214+
\n Projection, [a, b], partitions: 1\
7215+
\n LastRowByUniqueKey, partitions: 1\
7216+
\n MergeSort, partitions: 1\
7217+
\n Scan, index: default:1:[1]:sort_on[a, b], fields: [a, b, c, e, __seq], partitions: 2\
7218+
\n FilterByKeyRange, partitions: 1\
7219+
\n MemoryScan, partitions: 1\
7220+
\n FilterByKeyRange, partitions: 1\
7221+
\n MemoryScan, partitions: 1\
7222+
\n Projection, [a, b], partitions: 1\
7223+
\n LastRowByUniqueKey, partitions: 1\
7224+
\n Scan, index: default:2:[2]:sort_on[a, b], fields: [a, b, c, e, __seq], partitions: 1\
7225+
\n FilterByKeyRange, partitions: 1\
7226+
\n MemoryScan, partitions: 1");
7227+
}
71857228
}
71867229

71877230
async fn unique_key_and_multi_partitions_hash_aggregate(service: Box<dyn SqlClient>) {

Comments: 0