Skip to content

Commit bcbe47b

Browse files
committed
fix(cubesql): Support CAST projection split
1 parent 23330b6 commit bcbe47b

File tree

6 files changed

+371
-24
lines changed

6 files changed

+371
-24
lines changed

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use itertools::Itertools;
2626
use regex::{Captures, Regex};
2727
use serde_derive::*;
2828
use std::{
29-
any::Any, collections::HashMap, convert::TryInto, fmt, future::Future, iter, pin::Pin, result,
30-
sync::Arc,
29+
any::Any, cmp::min, collections::HashMap, convert::TryInto, fmt, future::Future, iter,
30+
pin::Pin, result, sync::Arc,
3131
};
3232

3333
#[derive(Debug, Clone, Deserialize)]
@@ -373,13 +373,20 @@ impl CubeScanWrapperNode {
373373
.cloned();
374374
if let Some(node) = cube_scan_node {
375375
let mut new_node = node.clone();
376-
new_node.request.limit = Some(query_limit);
376+
new_node.request.limit = Some(
377+
new_node
378+
.request
379+
.limit
380+
.map_or(query_limit, |limit| min(limit, query_limit)),
381+
);
377382
Arc::new(LogicalPlan::Extension(Extension {
378383
node: Arc::new(new_node),
379384
}))
380385
} else if let Some(node) = wrapped_select_node {
381386
let mut new_node = node.clone();
382-
new_node.limit = Some(query_limit as usize);
387+
new_node.limit = Some(new_node.limit.map_or(query_limit as usize, |limit| {
388+
min(limit, query_limit as usize)
389+
}));
383390
Arc::new(LogicalPlan::Extension(Extension {
384391
node: Arc::new(new_node),
385392
}))

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 149 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20028,6 +20028,8 @@ ORDER BY "source"."str0" ASC
2002820028
)
2002920029
}
2003020030

20031+
// TODO: __cubeJoinField for WrappedSelect
20032+
#[ignore]
2003120033
#[tokio::test]
2003220034
async fn test_sigma_row_count_cross_join() {
2003320035
if !Rewriter::sql_push_down_enabled() {
@@ -21644,7 +21646,7 @@ ORDER BY "source"."str0" ASC
2164421646
}
2164521647
init_logger();
2164621648

21647-
let logical_plan = convert_select_to_query_plan(
21649+
let query_plan = convert_select_to_query_plan(
2164821650
r#"
2164921651
SELECT
2165021652
(CAST("ta_1"."order_date" AS date) - CAST((CAST(EXTRACT(YEAR FROM "ta_1"."order_date") || '-' || EXTRACT(MONTH FROM "ta_1"."order_date") || '-01' AS DATE) + ((EXTRACT(MONTH FROM "ta_1"."order_date") - 1) * -1) * INTERVAL '1 month') AS date) + 1) AS "ca_1",
@@ -21658,8 +21660,25 @@ ORDER BY "source"."str0" ASC
2165821660
.to_string(),
2165921661
DatabaseProtocol::PostgreSQL,
2166021662
)
21661-
.await
21662-
.as_logical_plan();
21663+
.await;
21664+
let logical_plan = query_plan.as_logical_plan();
21665+
21666+
if Rewriter::sql_push_down_enabled() {
21667+
let sql = logical_plan
21668+
.find_cube_scan_wrapper()
21669+
.wrapped_sql
21670+
.unwrap()
21671+
.sql;
21672+
assert!(sql.contains("EXTRACT(YEAR"));
21673+
assert!(sql.contains("EXTRACT(MONTH"));
21674+
21675+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
21676+
println!(
21677+
"Physical plan: {}",
21678+
displayable(physical_plan.as_ref()).indent()
21679+
);
21680+
return;
21681+
}
2166321682

2166421683
assert_eq!(
2166521684
logical_plan.find_cube_scan().request,
@@ -21739,7 +21758,7 @@ ORDER BY "source"."str0" ASC
2173921758
}
2174021759
init_logger();
2174121760

21742-
let logical_plan = convert_select_to_query_plan(
21761+
let query_plan = convert_select_to_query_plan(
2174321762
r#"
2174421763
SELECT
2174521764
(MOD(CAST((CAST("ta_1"."order_date" AS date) - CAST(DATE '1970-01-01' AS date) + 3) AS numeric), 7) + 1) AS "ca_1",
@@ -21753,8 +21772,27 @@ ORDER BY "source"."str0" ASC
2175321772
.to_string(),
2175421773
DatabaseProtocol::PostgreSQL,
2175521774
)
21756-
.await
21757-
.as_logical_plan();
21775+
.await;
21776+
let logical_plan = query_plan.as_logical_plan();
21777+
21778+
// TODO: split on complex expressions?
21779+
// CAST(CAST(ta_1.order_date AS Date32) - CAST(CAST(Utf8("1970-01-01") AS Date32) AS Date32) + Int64(3) AS Decimal(38, 10))
21780+
if Rewriter::sql_push_down_enabled() {
21781+
let sql = logical_plan
21782+
.find_cube_scan_wrapper()
21783+
.wrapped_sql
21784+
.unwrap()
21785+
.sql;
21786+
assert!(sql.contains("\"limit\":1000"));
21787+
assert!(sql.contains("% 7"));
21788+
21789+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
21790+
println!(
21791+
"Physical plan: {}",
21792+
displayable(physical_plan.as_ref()).indent()
21793+
);
21794+
return;
21795+
}
2175821796

2175921797
assert_eq!(
2176021798
logical_plan.find_cube_scan().request,
@@ -21990,6 +22028,8 @@ ORDER BY "source"."str0" ASC
2199022028
);
2199122029
}
2199222030

22031+
// TODO: Can't generate SQL for literal: IntervalMonthDayNano
22032+
#[ignore]
2199322033
#[tokio::test]
2199422034
async fn test_sigma_sunday_week_push_down() {
2199522035
if !Rewriter::sql_push_down_enabled() {
@@ -22122,9 +22162,27 @@ ORDER BY "source"."str0" ASC
2212222162
DatabaseProtocol::PostgreSQL,
2212322163
)
2212422164
.await;
22165+
let logical_plan = query_plan.as_logical_plan();
22166+
22167+
if Rewriter::sql_push_down_enabled() {
22168+
let sql = logical_plan
22169+
.find_cube_scan_wrapper()
22170+
.wrapped_sql
22171+
.unwrap()
22172+
.sql;
22173+
assert!(sql.contains("LIMIT 101"));
22174+
assert!(sql.contains("ORDER BY"));
22175+
22176+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
22177+
println!(
22178+
"Physical plan: {}",
22179+
displayable(physical_plan.as_ref()).indent()
22180+
);
22181+
return;
22182+
}
2212522183

2212622184
assert_eq!(
22127-
query_plan.as_logical_plan().find_cube_scan().request,
22185+
logical_plan.find_cube_scan().request,
2212822186
V1LoadRequestQuery {
2212922187
measures: Some(vec![]),
2213022188
dimensions: Some(vec!["KibanaSampleDataEcommerce.order_date".to_string()]),
@@ -23801,4 +23859,88 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),
2380123859
}
2380223860
)
2380323861
}
23862+
23863+
#[tokio::test]
23864+
async fn test_quicksight_cast_with_limit_no_sort() {
23865+
if !Rewriter::sql_push_down_enabled() {
23866+
return;
23867+
}
23868+
init_logger();
23869+
23870+
let logical_plan = convert_select_to_query_plan(
23871+
r#"
23872+
SELECT
23873+
"customer_gender" AS "customer_gender",
23874+
CAST("has_subscription" AS INTEGER) AS "has_subscription"
23875+
FROM "public"."KibanaSampleDataEcommerce"
23876+
GROUP BY
23877+
"customer_gender",
23878+
CAST("has_subscription" AS INTEGER)
23879+
LIMIT 500
23880+
"#
23881+
.to_string(),
23882+
DatabaseProtocol::PostgreSQL,
23883+
)
23884+
.await
23885+
.as_logical_plan();
23886+
23887+
assert_eq!(
23888+
logical_plan.find_cube_scan().request,
23889+
V1LoadRequestQuery {
23890+
measures: Some(vec![]),
23891+
dimensions: Some(vec![
23892+
"KibanaSampleDataEcommerce.customer_gender".to_string(),
23893+
"KibanaSampleDataEcommerce.has_subscription".to_string(),
23894+
]),
23895+
segments: Some(vec![]),
23896+
time_dimensions: None,
23897+
order: None,
23898+
limit: Some(500),
23899+
offset: None,
23900+
filters: None,
23901+
ungrouped: None,
23902+
}
23903+
)
23904+
}
23905+
23906+
#[tokio::test]
23907+
async fn test_quicksight_cast_with_limit_and_sort() {
23908+
if !Rewriter::sql_push_down_enabled() {
23909+
return;
23910+
}
23911+
init_logger();
23912+
23913+
let query_plan = convert_select_to_query_plan(
23914+
r#"
23915+
SELECT
23916+
"customer_gender" AS "customer_gender",
23917+
CAST("has_subscription" AS INTEGER) AS "has_subscription"
23918+
FROM "public"."KibanaSampleDataEcommerce"
23919+
GROUP BY
23920+
"customer_gender",
23921+
CAST("has_subscription" AS INTEGER)
23922+
ORDER BY
23923+
1 NULLS LAST,
23924+
2 NULLS LAST
23925+
LIMIT 250
23926+
"#
23927+
.to_string(),
23928+
DatabaseProtocol::PostgreSQL,
23929+
)
23930+
.await;
23931+
23932+
let logical_plan = query_plan.as_logical_plan();
23933+
assert!(logical_plan
23934+
.find_cube_scan_wrapper()
23935+
.wrapped_sql
23936+
.unwrap()
23937+
.sql
23938+
.contains("LIMIT 250"));
23939+
23940+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
23941+
println!(
23942+
"Physical plan: {}",
23943+
displayable(physical_plan.as_ref()).indent()
23944+
);
23945+
}
2380423946
}

rust/cubesql/cubesql/src/compile/rewrite/cost.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ impl BestCubePlan {
3232
/// - `member_errors` > `cube_members` - extra cube members may be required (e.g. CASE)
3333
/// - `member_errors` > `wrapper_nodes` - use SQL push down where possible if cube scan can't be detected
3434
/// - `non_pushed_down_window` > `wrapper_nodes` - prefer to always push down window functions
35+
/// - `non_pushed_down_limit_sort` > `wrapper_nodes` - prefer to always push down limit-sort expressions
3536
/// - match errors by priority - optimize for more specific errors
3637
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3738
pub struct CubePlanCost {
@@ -41,10 +42,11 @@ pub struct CubePlanCost {
4142
non_detected_cube_scans: i64,
4243
unwrapped_subqueries: usize,
4344
member_errors: i64,
45+
ungrouped_aggregates: usize,
4446
// TODO if pre-aggregation can be used for window functions, then it'd be suboptimal
4547
non_pushed_down_window: i64,
4648
non_pushed_down_grouping_sets: i64,
47-
ungrouped_aggregates: usize,
49+
non_pushed_down_limit_sort: i64,
4850
wrapper_nodes: i64,
4951
wrapped_select_ungrouped_scan: usize,
5052
ast_size_outside_wrapper: usize,
@@ -80,10 +82,28 @@ impl CubePlanState {
8082
}
8183
}
8284

85+
#[derive(Debug, Clone, Eq, PartialEq)]
86+
pub enum SortState {
87+
None,
88+
Current,
89+
DirectChild,
90+
}
91+
92+
impl SortState {
93+
pub fn add_child(&self, other: &Self) -> Self {
94+
match (self, other) {
95+
(Self::Current, _) => Self::Current,
96+
(_, Self::Current) | (Self::DirectChild, _) => Self::DirectChild,
97+
_ => Self::None,
98+
}
99+
}
100+
}
101+
83102
#[derive(Debug, Clone, Eq, PartialEq)]
84103
pub struct CubePlanCostAndState {
85104
pub cost: CubePlanCost,
86105
pub state: CubePlanState,
106+
pub sort_state: SortState,
87107
}
88108

89109
impl PartialOrd for CubePlanCostAndState {
@@ -100,17 +120,18 @@ impl Ord for CubePlanCostAndState {
100120

101121
impl CubePlanCostAndState {
102122
pub fn add_child(&self, other: &Self) -> Self {
103-
let state = self.state.add_child(&other.state);
104123
Self {
105124
cost: self.cost.add_child(&other.cost),
106-
state,
125+
state: self.state.add_child(&other.state),
126+
sort_state: self.sort_state.add_child(&other.sort_state),
107127
}
108128
}
109129

110130
pub fn finalize(&self, enode: &LogicalPlanLanguage) -> Self {
111131
Self {
112-
cost: self.cost.finalize(&self.state, enode),
132+
cost: self.cost.finalize(&self.state, &self.sort_state, enode),
113133
state: self.state.clone(),
134+
sort_state: self.sort_state.clone(),
114135
}
115136
}
116137
}
@@ -130,6 +151,8 @@ impl CubePlanCost {
130151
non_pushed_down_window: self.non_pushed_down_window + other.non_pushed_down_window,
131152
non_pushed_down_grouping_sets: self.non_pushed_down_grouping_sets
132153
+ other.non_pushed_down_grouping_sets,
154+
non_pushed_down_limit_sort: self.non_pushed_down_limit_sort
155+
+ other.non_pushed_down_limit_sort,
133156
member_errors: self.member_errors + other.member_errors,
134157
cube_members: self.cube_members + other.cube_members,
135158
errors: self.errors + other.errors,
@@ -155,7 +178,12 @@ impl CubePlanCost {
155178
}
156179
}
157180

158-
pub fn finalize(&self, state: &CubePlanState, enode: &LogicalPlanLanguage) -> Self {
181+
pub fn finalize(
182+
&self,
183+
state: &CubePlanState,
184+
sort_state: &SortState,
185+
enode: &LogicalPlanLanguage,
186+
) -> Self {
159187
Self {
160188
replacers: self.replacers,
161189
table_scans: self.table_scans,
@@ -173,6 +201,10 @@ impl CubePlanCost {
173201
CubePlanState::Unwrapped(_) => self.non_pushed_down_grouping_sets,
174202
CubePlanState::Wrapper => 0,
175203
},
204+
non_pushed_down_limit_sort: match sort_state {
205+
SortState::DirectChild => self.non_pushed_down_limit_sort,
206+
_ => 0,
207+
},
176208
cube_members: self.cube_members,
177209
errors: self.errors,
178210
structure_points: self.structure_points,
@@ -266,6 +298,11 @@ impl CostFunction<LogicalPlanLanguage> for BestCubePlan {
266298
_ => 0,
267299
};
268300

301+
let non_pushed_down_limit_sort = match enode {
302+
LogicalPlanLanguage::Limit(_) => 1,
303+
_ => 0,
304+
};
305+
269306
let ast_size_inside_wrapper = match enode {
270307
LogicalPlanLanguage::WrappedSelect(_) => 1,
271308
_ => 0,
@@ -409,6 +446,7 @@ impl CostFunction<LogicalPlanLanguage> for BestCubePlan {
409446
member_errors,
410447
non_pushed_down_window,
411448
non_pushed_down_grouping_sets,
449+
non_pushed_down_limit_sort,
412450
cube_members,
413451
errors: this_errors,
414452
time_dimensions_used_as_dimensions,
@@ -433,6 +471,10 @@ impl CostFunction<LogicalPlanLanguage> for BestCubePlan {
433471
LogicalPlanLanguage::CubeScanWrapper(_) => CubePlanState::Wrapper,
434472
_ => CubePlanState::Unwrapped(ast_size_outside_wrapper),
435473
},
474+
sort_state: match enode {
475+
LogicalPlanLanguage::Sort(_) => SortState::Current,
476+
_ => SortState::None,
477+
},
436478
};
437479
let res = enode
438480
.children()

0 commit comments

Comments
 (0)