Skip to content

Commit 382ec1c

Browse files
committed
fix(cubesql): handle to_timestamp(expr -> epoch)
1 parent 8a650b7 commit 382ec1c

File tree

10 files changed

+125
-14
lines changed

10 files changed

+125
-14
lines changed

rust/cubesql/cubesql/src/compile/engine/udf/common.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,6 +1535,46 @@ fn postgres_datetime_format_to_iso(format: String) -> String {
15351535
.replace(".MS", "%.3f")
15361536
}
15371537

1538+
pub fn create_epoch_to_timestamp_udf() -> ScalarUDF {
1539+
let fun: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync> =
1540+
Arc::new(move |args: &[ColumnarValue]| match args {
1541+
[ColumnarValue::Scalar(ScalarValue::Int64(Some(value)))] => Ok(ColumnarValue::Scalar(
1542+
ScalarValue::TimestampNanosecond(Some(value.clone() * 1_000_000_000), None),
1543+
)),
1544+
[ColumnarValue::Scalar(ScalarValue::Float64(Some(value)))] => {
1545+
let seconds = value.round() as i64;
1546+
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1547+
Some(seconds * 1_000_000_000),
1548+
None,
1549+
)))
1550+
}
1551+
[ColumnarValue::Scalar(ScalarValue::Decimal128(Some(value), _, _))] => {
1552+
let seconds = (*value) as i64;
1553+
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
1554+
Some(seconds * 1_000_000_000),
1555+
None,
1556+
)))
1557+
}
1558+
_ => Err(DataFusionError::Execution(
1559+
"Unsupported arguments for to_timestamp".to_string(),
1560+
)),
1561+
});
1562+
1563+
let return_type: ReturnTypeFunction =
1564+
Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None))));
1565+
1566+
let signature = Signature::one_of(
1567+
vec![
1568+
TypeSignature::Exact(vec![DataType::Int64]),
1569+
TypeSignature::Exact(vec![DataType::Float64]),
1570+
TypeSignature::Exact(vec![DataType::Decimal(10, 0)]),
1571+
],
1572+
Volatility::Immutable,
1573+
);
1574+
1575+
ScalarUDF::new("epoch_to_timestamp", &signature, &return_type, &fun)
1576+
}
1577+
15381578
pub fn create_str_to_date_udf() -> ScalarUDF {
15391579
let fun: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync> =
15401580
Arc::new(move |args: &[ColumnarValue]| {

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16251,4 +16251,20 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),
1625116251
displayable(physical_plan.as_ref()).indent()
1625216252
);
1625316253
}
16254+
16255+
#[tokio::test]
16256+
async fn test_to_timestamp() -> Result<(), CubeError> {
16257+
let query = r#"
16258+
SELECT to_timestamp(1618449331) AS result
16259+
UNION ALL
16260+
SELECT to_timestamp('2021-08-31 11:05:10.400000', '%Y-%m-%d %H:%i:%s.%f') AS result
16261+
"#;
16262+
16263+
insta::assert_snapshot!(
16264+
"to_timestamp",
16265+
execute_query(query.to_string(), DatabaseProtocol::PostgreSQL).await?
16266+
);
16267+
16268+
Ok(())
16269+
}
1625416270
}

rust/cubesql/cubesql/src/compile/query_engine.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ impl QueryEngine for SqlQueryEngine {
411411
ctx.register_udf(create_dayofyear_udf());
412412
ctx.register_udf(create_date_sub_udf());
413413
ctx.register_udf(create_date_add_udf());
414+
ctx.register_udf(create_epoch_to_timestamp_udf());
414415
ctx.register_udf(create_str_to_date_udf());
415416
ctx.register_udf(create_current_timestamp_udf("current_timestamp"));
416417
ctx.register_udf(create_current_timestamp_udf("localtimestamp"));

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,6 +1045,7 @@ impl LogicalPlanAnalysis {
10451045
|| &fun.name == "date_sub"
10461046
|| &fun.name == "date"
10471047
|| &fun.name == "date_to_timestamp"
1048+
|| &fun.name == "epoch_to_timestamp"
10481049
{
10491050
Self::eval_constant_expr(&egraph, &expr)
10501051
} else {

rust/cubesql/cubesql/src/compile/rewrite/rules/filters.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use chrono::{
3939
use cubeclient::models::V1CubeMeta;
4040
use datafusion::{
4141
arrow::{
42-
array::{Date32Array, Date64Array, TimestampNanosecondArray, TimestampSecondArray},
42+
array::{Date32Array, Date64Array, TimestampNanosecondArray},
4343
datatypes::{DataType, IntervalDayTimeType},
4444
},
4545
logical_plan::{Column, Expr, Operator},
@@ -2804,7 +2804,6 @@ impl FilterRules {
28042804
vec![Decimal::new(*value).to_string(*scale)]
28052805
}
28062806
ScalarValue::TimestampNanosecond(_, _)
2807-
| ScalarValue::TimestampSecond(_, _)
28082807
| ScalarValue::Date32(_)
28092808
| ScalarValue::Date64(_) => {
28102809
if let Some(timestamp) =
@@ -3562,7 +3561,6 @@ impl FilterRules {
35623561
Decimal::new(*value).to_string(*scale)
35633562
}
35643563
ScalarValue::TimestampNanosecond(_, _)
3565-
| ScalarValue::TimestampSecond(_, _)
35663564
| ScalarValue::Date32(_)
35673565
| ScalarValue::Date64(_) => {
35683566
if let Some(timestamp) = Self::scalar_to_native_datetime(literal) {
@@ -3578,16 +3576,13 @@ impl FilterRules {
35783576
fn scalar_to_native_datetime(literal: &ScalarValue) -> Option<NaiveDateTime> {
35793577
match literal {
35803578
ScalarValue::TimestampNanosecond(_, _)
3581-
| ScalarValue::TimestampSecond(_, _)
35823579
| ScalarValue::Date32(_)
35833580
| ScalarValue::Date64(_) => {
35843581
let array = literal.to_array();
35853582
let timestamp = if let Some(array) =
35863583
array.as_any().downcast_ref::<TimestampNanosecondArray>()
35873584
{
35883585
array.value_as_datetime(0)
3589-
} else if let Some(array) = array.as_any().downcast_ref::<TimestampSecondArray>() {
3590-
array.value_as_datetime(0)
35913586
} else if let Some(array) = array.as_any().downcast_ref::<Date32Array>() {
35923587
array.value_as_datetime(0)
35933588
} else if let Some(array) = array.as_any().downcast_ref::<Date64Array>() {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
source: cubesql/src/compile/mod.rs
3+
assertion_line: 16263
4+
expression: "execute_query(query.to_string(), DatabaseProtocol::PostgreSQL).await?"
5+
---
6+
+-------------------------+
7+
| result |
8+
+-------------------------+
9+
| 2021-04-15T01:15:31.000 |
10+
| 2021-08-31T11:05:10.400 |
11+
+-------------------------+
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
source: cubesql/src/compile/test/test_udfs.rs
3+
assertion_line: 307
4+
expression: "execute_query(\"SELECT epoch_to_timestamp(1621123456)\".to_string(),\nDatabaseProtocol::PostgreSQL).await?"
5+
---
6+
+---------------------------------------+
7+
| epoch_to_timestamp(Int64(1621123456)) |
8+
+---------------------------------------+
9+
| 2021-05-16T00:04:16.000 |
10+
+---------------------------------------+
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
source: cubesql/src/compile/test/test_udfs.rs
3+
assertion_line: 316
4+
expression: "execute_query(\"\n SELECT epoch_to_timestamp(1621123456)\n UNION ALL\n SELECT epoch_to_timestamp(1621123456.789)\n UNION ALL\n SELECT epoch_to_timestamp(cast(1621123456 as numeric(10)))\n \".to_string(),\nDatabaseProtocol::PostgreSQL).await?"
5+
---
6+
+---------------------------------------+
7+
| epoch_to_timestamp(Int64(1621123456)) |
8+
+---------------------------------------+
9+
| 2021-05-16T00:04:16.000 |
10+
| 2021-05-16T00:04:17.000 |
11+
| 2021-05-16T00:04:16.000 |
12+
+---------------------------------------+

rust/cubesql/cubesql/src/compile/test/test_udfs.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,36 @@ async fn test_pg_backend_pid() -> Result<(), CubeError> {
302302
Ok(())
303303
}
304304

305+
#[tokio::test]
306+
async fn test_epoch_to_timestamp() -> Result<(), CubeError> {
307+
insta::assert_snapshot!(
308+
"epoch_to_timestamp_1",
309+
execute_query(
310+
"SELECT epoch_to_timestamp(1621123456)".to_string(),
311+
DatabaseProtocol::PostgreSQL
312+
)
313+
.await?
314+
);
315+
316+
insta::assert_snapshot!(
317+
"epoch_to_timestamp_2",
318+
execute_query(
319+
"
320+
SELECT epoch_to_timestamp(1621123456)
321+
UNION ALL
322+
SELECT epoch_to_timestamp(1621123456.789)
323+
UNION ALL
324+
SELECT epoch_to_timestamp(cast(1621123456 as numeric(10)))
325+
"
326+
.to_string(),
327+
DatabaseProtocol::PostgreSQL
328+
)
329+
.await?
330+
);
331+
332+
Ok(())
333+
}
334+
305335
#[tokio::test]
306336
async fn test_to_char_udf() -> Result<(), CubeError> {
307337
insta::assert_snapshot!(

rust/cubesql/cubesql/src/sql/statement.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,7 @@ impl<'ast> Visitor<'ast, ConnectionError> for RedshiftDatePartReplacer {
921921
}
922922
}
923923

924-
/// Postgres to_timestamp clashes with Datafusion to_timestamp so we replace it with str_to_date
924+
/// Postgres to_timestamp clashes with Datafusion to_timestamp so we replace it with str_to_date/epoch_to_timestamp
925925
#[derive(Debug)]
926926
pub struct ToTimestampReplacer {}
927927

@@ -942,14 +942,9 @@ impl ToTimestampReplacer {
942942
impl<'ast> Visitor<'ast, ConnectionError> for ToTimestampReplacer {
943943
fn visit_function(&mut self, fun: &mut Function) -> Result<(), ConnectionError> {
944944
if fun.name.to_string().to_lowercase() == "to_timestamp" {
945-
if fun.args.len() == 1
946-
&& matches!(
947-
&fun.args[0],
948-
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(Value::Number(_, _))))
949-
)
950-
{
945+
if fun.args.len() == 1 {
951946
fun.name = ObjectName(vec![Ident {
952-
value: "to_timestamp_seconds".to_string(),
947+
value: "epoch_to_timestamp".to_string(),
953948
quote_style: None,
954949
}]);
955950
} else {

0 commit comments

Comments
 (0)