Skip to content

Commit 9d821cc

Browse files
KSDaemonmarianore-muttdata
authored andcommitted
fix(schema-compiler): Fix BigQuery DATE_ADD push down template for years/quarters/months (cube-js#9432)
* fix(schema-compiler): Fix BigQuery DATE_ADD push down template for years/quarters/months * add tests for BQ date_add * fix tests * add interval + date_part for date_add udf * fix template * add/fix tests
1 parent 729b21b commit 9d821cc

File tree

3 files changed

+187
-25
lines changed

3 files changed

+187
-25
lines changed

packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,21 @@ export class BigqueryQuery extends BaseQuery {
186186
}
187187

188188
public subtractTimestampInterval(date, interval) {
189-
return `TIMESTAMP_SUB(${date}, INTERVAL ${this.formatInterval(interval)[0]})`;
189+
const [intervalFormatted, timeUnit] = this.formatInterval(interval);
190+
if (['YEAR', 'MONTH', 'QUARTER'].includes(timeUnit)) {
191+
return this.timeStampCast(`DATETIME_SUB(DATETIME(${date}), INTERVAL ${intervalFormatted})`);
192+
}
193+
194+
return `TIMESTAMP_SUB(${date}, INTERVAL ${intervalFormatted})`;
190195
}
191196

192197
public addTimestampInterval(date, interval) {
193-
return `TIMESTAMP_ADD(${date}, INTERVAL ${this.formatInterval(interval)[0]})`;
198+
const [intervalFormatted, timeUnit] = this.formatInterval(interval);
199+
if (['YEAR', 'MONTH', 'QUARTER'].includes(timeUnit)) {
200+
return this.timeStampCast(`DATETIME_ADD(DATETIME(${date}), INTERVAL ${intervalFormatted})`);
201+
}
202+
203+
return `TIMESTAMP_ADD(${date}, INTERVAL ${intervalFormatted})`;
194204
}
195205

196206
public nowTimestampSql() {
@@ -242,7 +252,7 @@ export class BigqueryQuery extends BaseQuery {
242252
templates.functions.STRPOS = 'STRPOS({{ args_concat }})';
243253
templates.functions.DATEDIFF = 'DATETIME_DIFF(CAST({{ args[2] }} AS DATETIME), CAST({{ args[1] }} AS DATETIME), {{ date_part }})';
244254
// DATEADD is being rewritten to DATE_ADD
245-
// templates.functions.DATEADD = 'DATETIME_ADD(CAST({{ args[2] }} AS DATETTIME), INTERVAL {{ interval }} {{ date_part }})';
255+
templates.functions.DATE_ADD = '{% if date_part|upper in [\'YEAR\', \'MONTH\', \'QUARTER\'] %}TIMESTAMP(DATETIME_ADD(DATETIME({{ args[0] }}), INTERVAL {{ interval }} {{ date_part }})){% else %}TIMESTAMP_ADD({{ args[0] }}, INTERVAL {{ interval }} {{ date_part }}){% endif %}';
246256
templates.functions.CURRENTDATE = 'CURRENT_DATE';
247257
delete templates.functions.TO_CHAR;
248258
templates.expressions.binary = '{% if op == \'%\' %}MOD({{ left }}, {{ right }}){% else %}({{ left }} {{ op }} {{ right }}){% endif %}';

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2491,10 +2491,47 @@ impl CubeScanWrapperNode {
24912491
if DATE_PART_REGEX.is_match(date_part) {
24922492
Ok(Some(date_part.to_string()))
24932493
} else {
2494-
Err(date_part_err(date_part))
2494+
Err(date_part_err(date_part.to_string()))
24952495
}
24962496
}
2497-
_ => Err(date_part_err(&args[0].to_string())),
2497+
_ => Err(date_part_err(args[0].to_string())),
2498+
},
2499+
"date_add" => match &args[1] {
2500+
Expr::Literal(ScalarValue::IntervalDayTime(Some(interval))) => {
2501+
let days = (*interval >> 32) as i32;
2502+
let ms = (*interval & 0xFFFF_FFFF) as i32;
2503+
2504+
if days != 0 && ms == 0 {
2505+
Ok(Some("DAY".to_string()))
2506+
} else if ms != 0 && days == 0 {
2507+
Ok(Some("MILLISECOND".to_string()))
2508+
} else {
2509+
Err(DataFusionError::Internal(format!(
2510+
"Unsupported mixed IntervalDayTime: days = {days}, ms = {ms}"
2511+
)))
2512+
}
2513+
}
2514+
Expr::Literal(ScalarValue::IntervalYearMonth(Some(_months))) => {
2515+
Ok(Some("MONTH".to_string()))
2516+
}
2517+
Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(interval))) => {
2518+
let months = (interval >> 96) as i32;
2519+
let days = ((interval >> 64) & 0xFFFF_FFFF) as i32;
2520+
let nanos = *interval as i64;
2521+
2522+
if months != 0 && days == 0 && nanos == 0 {
2523+
Ok(Some("MONTH".to_string()))
2524+
} else if days != 0 && months == 0 && nanos == 0 {
2525+
Ok(Some("DAY".to_string()))
2526+
} else if nanos != 0 && months == 0 && days == 0 {
2527+
Ok(Some("NANOSECOND".to_string()))
2528+
} else {
2529+
Err(DataFusionError::Internal(format!(
2530+
"Unsupported mixed IntervalMonthDayNano: months = {months}, days = {days}, nanos = {nanos}"
2531+
)))
2532+
}
2533+
}
2534+
_ => Err(date_part_err(args[1].to_string())),
24982535
},
24992536
_ => Ok(None),
25002537
}?;
@@ -2507,6 +2544,43 @@ impl CubeScanWrapperNode {
25072544
"Can't generate SQL for scalar function: interval must be Int64"
25082545
))),
25092546
},
2547+
"date_add" => match &args[1] {
2548+
Expr::Literal(ScalarValue::IntervalDayTime(Some(interval))) => {
2549+
let days = (*interval >> 32) as i32;
2550+
let ms = (*interval & 0xFFFF_FFFF) as i32;
2551+
2552+
if days != 0 && ms == 0 {
2553+
Ok(Some(days.to_string()))
2554+
} else if ms != 0 && days == 0 {
2555+
Ok(Some(ms.to_string()))
2556+
} else {
2557+
Err(DataFusionError::Internal(format!(
2558+
"Unsupported mixed IntervalDayTime: days = {days}, ms = {ms}"
2559+
)))
2560+
}
2561+
}
2562+
Expr::Literal(ScalarValue::IntervalYearMonth(Some(months))) => {
2563+
Ok(Some(months.to_string()))
2564+
}
2565+
Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(interval))) => {
2566+
let months = (interval >> 96) as i32;
2567+
let days = ((interval >> 64) & 0xFFFF_FFFF) as i32;
2568+
let nanos = *interval as i64;
2569+
2570+
if months != 0 && days == 0 && nanos == 0 {
2571+
Ok(Some(months.to_string()))
2572+
} else if days != 0 && months == 0 && nanos == 0 {
2573+
Ok(Some(days.to_string()))
2574+
} else if nanos != 0 && months == 0 && days == 0 {
2575+
Ok(Some(nanos.to_string()))
2576+
} else {
2577+
Err(DataFusionError::Internal(format!(
2578+
"Unsupported mixed IntervalMonthDayNano: months = {months}, days = {days}, nanos = {nanos}"
2579+
)))
2580+
}
2581+
}
2582+
_ => Err(date_part_err(args[1].to_string())),
2583+
},
25102584
_ => Ok(None),
25112585
}?;
25122586
let mut sql_args = Vec::new();

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14624,19 +14624,19 @@ ORDER BY "source"."str0" ASC
1462414624
assert!(sql.contains("EXTRACT(EPOCH FROM"));
1462514625
}
1462614626

14627-
// redshift-dateadd-[literal-date32-]to-interval rewrites DATEADD to DATE_ADD
1462814627
#[tokio::test]
14629-
#[ignore]
1463014628
async fn test_dateadd_push_down() {
1463114629
if !Rewriter::sql_push_down_enabled() {
1463214630
return;
1463314631
}
1463414632
init_testing_logger();
1463514633

14634+
// Redshift function DATEADD
1463614635
let query_plan = convert_select_to_query_plan(
1463714636
"
1463814637
SELECT DATEADD(DAY, 7, order_date) AS d
1463914638
FROM KibanaSampleDataEcommerce AS k
14639+
WHERE LOWER(customer_gender) = 'test'
1464014640
GROUP BY 1
1464114641
ORDER BY 1 DESC
1464214642
"
@@ -14652,25 +14652,24 @@ ORDER BY "source"."str0" ASC
1465214652
);
1465314653

1465414654
let logical_plan = query_plan.as_logical_plan();
14655-
assert!(logical_plan
14656-
.find_cube_scan_wrapped_sql()
14657-
.wrapped_sql
14658-
.sql
14659-
.contains("DATEADD(day, 7,"));
14655+
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
14656+
// redshift-dateadd-[literal-date32-]to-interval rewrites DATEADD to DATE_ADD
14657+
assert!(sql.contains("DATE_ADD("));
14658+
assert!(sql.contains("INTERVAL '7 DAY')"));
1466014659

14661-
// BigQuery
14660+
// BigQuery + Postgres DATE_ADD + DAYS
14661+
let bq_templates = vec![("functions/DATE_ADD".to_string(), "{% if date_part|upper in ['YEAR', 'MONTH', 'QUARTER'] %}TIMESTAMP(DATETIME_ADD(DATETIME({{ args[0] }}), INTERVAL {{ interval }} {{ date_part }})){% else %}TIMESTAMP_ADD({{ args[0] }}, INTERVAL {{ interval }} {{ date_part }}){% endif %}".to_string())];
1466214662
let query_plan = convert_select_to_query_plan_customized(
1466314663
"
14664-
SELECT DATEADD(DAY, 7, order_date) AS d
14664+
SELECT DATE_ADD(order_date, INTERVAL '7 DAYS') AS d
1466514665
FROM KibanaSampleDataEcommerce AS k
14666+
WHERE LOWER(customer_gender) = 'test'
1466614667
GROUP BY 1
1466714668
ORDER BY 1 DESC
1466814669
"
1466914670
.to_string(),
1467014671
DatabaseProtocol::PostgreSQL,
14671-
vec![
14672-
("functions/DATEADD".to_string(), "DATETIME_ADD(CAST({{ args[2] }} AS DATETTIME), INTERVAL {{ interval }} {{ date_part }})".to_string()),
14673-
],
14672+
bq_templates.clone(),
1467414673
)
1467514674
.await;
1467614675

@@ -14682,23 +14681,101 @@ ORDER BY "source"."str0" ASC
1468214681

1468314682
let logical_plan = query_plan.as_logical_plan();
1468414683
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
14685-
assert!(sql.contains("DATETIME_ADD(CAST("));
14686-
assert!(sql.contains("INTERVAL 7 day)"));
14684+
assert!(sql.contains("TIMESTAMP_ADD("));
14685+
assert!(sql.contains("INTERVAL 7 DAY)"));
1468714686

14688-
// Postgres
14687+
// BigQuery + Redshift DATEADD + DAYS
14688+
let bq_templates = vec![("functions/DATE_ADD".to_string(), "{% if date_part|upper in ['YEAR', 'MONTH', 'QUARTER'] %}TIMESTAMP(DATETIME_ADD(DATETIME({{ args[0] }}), INTERVAL {{ interval }} {{ date_part }})){% else %}TIMESTAMP_ADD({{ args[0] }}, INTERVAL {{ interval }} {{ date_part }}){% endif %}".to_string())];
1468914689
let query_plan = convert_select_to_query_plan_customized(
1469014690
"
1469114691
SELECT DATEADD(DAY, 7, order_date) AS d
1469214692
FROM KibanaSampleDataEcommerce AS k
14693+
WHERE LOWER(customer_gender) = 'test'
14694+
GROUP BY 1
14695+
ORDER BY 1 DESC
14696+
"
14697+
.to_string(),
14698+
DatabaseProtocol::PostgreSQL,
14699+
bq_templates.clone(),
14700+
)
14701+
.await;
14702+
14703+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
14704+
println!(
14705+
"Physical plan: {}",
14706+
displayable(physical_plan.as_ref()).indent()
14707+
);
14708+
14709+
let logical_plan = query_plan.as_logical_plan();
14710+
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
14711+
assert!(sql.contains("TIMESTAMP_ADD("));
14712+
assert!(sql.contains("INTERVAL 7 DAY)"));
14713+
14714+
// BigQuery + Postgres DATE_ADD + MONTHS
14715+
let query_plan = convert_select_to_query_plan_customized(
14716+
"
14717+
SELECT DATE_ADD(order_date, INTERVAL '7 MONTHS') AS d
14718+
FROM KibanaSampleDataEcommerce AS k
14719+
WHERE LOWER(customer_gender) = 'test'
14720+
GROUP BY 1
14721+
ORDER BY 1 DESC
14722+
"
14723+
.to_string(),
14724+
DatabaseProtocol::PostgreSQL,
14725+
bq_templates,
14726+
)
14727+
.await;
14728+
14729+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
14730+
println!(
14731+
"Physical plan: {}",
14732+
displayable(physical_plan.as_ref()).indent()
14733+
);
14734+
14735+
let logical_plan = query_plan.as_logical_plan();
14736+
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
14737+
assert!(sql.contains("TIMESTAMP(DATETIME_ADD(DATETIME("));
14738+
assert!(sql.contains("INTERVAL 7 MONTH)"));
14739+
14740+
// BigQuery + Redshift DATEADD + MONTHS
14741+
let bq_templates = vec![("functions/DATE_ADD".to_string(), "{% if date_part|upper in ['YEAR', 'MONTH', 'QUARTER'] %}TIMESTAMP(DATETIME_ADD(DATETIME({{ args[0] }}), INTERVAL {{ interval }} {{ date_part }})){% else %}TIMESTAMP_ADD({{ args[0] }}, INTERVAL {{ interval }} {{ date_part }}){% endif %}".to_string())];
14742+
let query_plan = convert_select_to_query_plan_customized(
14743+
"
14744+
SELECT DATEADD(MONTH, 7, order_date) AS d
14745+
FROM KibanaSampleDataEcommerce AS k
14746+
WHERE LOWER(customer_gender) = 'test'
14747+
GROUP BY 1
14748+
ORDER BY 1 DESC
14749+
"
14750+
.to_string(),
14751+
DatabaseProtocol::PostgreSQL,
14752+
bq_templates.clone(),
14753+
)
14754+
.await;
14755+
14756+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
14757+
println!(
14758+
"Physical plan: {}",
14759+
displayable(physical_plan.as_ref()).indent()
14760+
);
14761+
14762+
let logical_plan = query_plan.as_logical_plan();
14763+
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
14764+
assert!(sql.contains("TIMESTAMP(DATETIME_ADD(DATETIME("));
14765+
assert!(sql.contains("INTERVAL 7 MONTH)"));
14766+
14767+
// Postgres DATE_ADD
14768+
let query_plan = convert_select_to_query_plan_customized(
14769+
"
14770+
SELECT DATE_ADD(order_date, INTERVAL '7 DAYS') AS d
14771+
FROM KibanaSampleDataEcommerce AS k
14772+
WHERE LOWER(customer_gender) = 'test'
1469314773
GROUP BY 1
1469414774
ORDER BY 1 DESC
1469514775
"
1469614776
.to_string(),
1469714777
DatabaseProtocol::PostgreSQL,
14698-
vec![(
14699-
"functions/DATEADD".to_string(),
14700-
"({{ args[2] }} + \'{{ interval }} {{ date_part }}\'::interval)".to_string(),
14701-
)],
14778+
vec![],
1470214779
)
1470314780
.await;
1470414781

@@ -14710,7 +14787,8 @@ ORDER BY "source"."str0" ASC
1471014787

1471114788
let logical_plan = query_plan.as_logical_plan();
1471214789
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
14713-
assert!(sql.contains("+ '7 day'::interval"));
14790+
assert!(sql.contains("DATE_ADD("));
14791+
assert!(sql.contains("INTERVAL '7 DAY'"));
1471414792
}
1471514793

1471614794
#[tokio::test]

0 commit comments

Comments
 (0)