diff --git a/rust/cubesql/cubesql/src/compile/engine/udf/common.rs b/rust/cubesql/cubesql/src/compile/engine/udf/common.rs index 494641bbcac35..4380c05c79408 100644 --- a/rust/cubesql/cubesql/src/compile/engine/udf/common.rs +++ b/rust/cubesql/cubesql/src/compile/engine/udf/common.rs @@ -1535,6 +1535,46 @@ fn postgres_datetime_format_to_iso(format: String) -> String { .replace(".MS", "%.3f") } +pub fn create_epoch_to_timestamp_udf() -> ScalarUDF { + let fun: Arc Result + Send + Sync> = + Arc::new(move |args: &[ColumnarValue]| match args { + [ColumnarValue::Scalar(ScalarValue::Int64(Some(value)))] => Ok(ColumnarValue::Scalar( + ScalarValue::TimestampNanosecond(Some(value.clone() * 1_000_000_000), None), + )), + [ColumnarValue::Scalar(ScalarValue::Float64(Some(value)))] => { + let seconds = value.round() as i64; + Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(seconds * 1_000_000_000), + None, + ))) + } + [ColumnarValue::Scalar(ScalarValue::Decimal128(Some(value), _, _))] => { + let seconds = (*value) as i64; + Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(seconds * 1_000_000_000), + None, + ))) + } + _ => Err(DataFusionError::Execution( + "Unsupported arguments for to_timestamp".to_string(), + )), + }); + + let return_type: ReturnTypeFunction = + Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))); + + let signature = Signature::one_of( + vec![ + TypeSignature::Exact(vec![DataType::Int64]), + TypeSignature::Exact(vec![DataType::Float64]), + TypeSignature::Exact(vec![DataType::Decimal(10, 0)]), + ], + Volatility::Immutable, + ); + + ScalarUDF::new("epoch_to_timestamp", &signature, &return_type, &fun) +} + pub fn create_str_to_date_udf() -> ScalarUDF { let fun: Arc Result + Send + Sync> = Arc::new(move |args: &[ColumnarValue]| { diff --git a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index ed5d604436a6e..d835ae612b1b2 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -16299,6 +16299,22 @@ LIMIT {{ limit }}{% endif %}"#.to_string(), Ok(()) } + #[tokio::test] + async fn test_to_timestamp() -> Result<(), CubeError> { + let query = r#" + SELECT to_timestamp(1618449331) AS result + UNION ALL + SELECT to_timestamp('2021-08-31 11:05:10.400000', '%Y-%m-%d %H:%i:%s.%f') AS result + "#; + + insta::assert_snapshot!( + "to_timestamp", + execute_query(query.to_string(), DatabaseProtocol::PostgreSQL).await? + ); + + Ok(()) + } + #[tokio::test] async fn test_mysql_nulls_last() { if !Rewriter::sql_push_down_enabled() { diff --git a/rust/cubesql/cubesql/src/compile/query_engine.rs b/rust/cubesql/cubesql/src/compile/query_engine.rs index afc7f40cafb61..5a0f5d16f4803 100644 --- a/rust/cubesql/cubesql/src/compile/query_engine.rs +++ b/rust/cubesql/cubesql/src/compile/query_engine.rs @@ -460,6 +460,7 @@ impl QueryEngine for SqlQueryEngine { ctx.register_udf(create_dayofyear_udf()); ctx.register_udf(create_date_sub_udf()); ctx.register_udf(create_date_add_udf()); + ctx.register_udf(create_epoch_to_timestamp_udf()); ctx.register_udf(create_str_to_date_udf()); ctx.register_udf(create_current_timestamp_udf("current_timestamp")); ctx.register_udf(create_current_timestamp_udf("localtimestamp")); diff --git a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs index a5df8018f19e4..17af8700b2cb0 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs @@ -1039,6 +1039,7 @@ impl LogicalPlanAnalysis { || &fun.name == "date_sub" || &fun.name == "date" || &fun.name == "date_to_timestamp" + || &fun.name == "epoch_to_timestamp" { Self::eval_constant_expr(&egraph, &expr) } else { diff --git a/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__to_timestamp.snap b/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__to_timestamp.snap new file mode 100644 index 0000000000000..2e465fbd6042c --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__to_timestamp.snap @@ -0,0 +1,11 @@ +--- +source: cubesql/src/compile/mod.rs +assertion_line: 16263 +expression: "execute_query(query.to_string(), DatabaseProtocol::PostgreSQL).await?" +--- ++-------------------------+ +| result | ++-------------------------+ +| 2021-04-15T01:15:31.000 | +| 2021-08-31T11:05:10.400 | ++-------------------------+ diff --git a/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__epoch_to_timestamp_1.snap b/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__epoch_to_timestamp_1.snap new file mode 100644 index 0000000000000..6acb52786075e --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__epoch_to_timestamp_1.snap @@ -0,0 +1,10 @@ +--- +source: cubesql/src/compile/test/test_udfs.rs +assertion_line: 307 +expression: "execute_query(\"SELECT epoch_to_timestamp(1621123456)\".to_string(),\nDatabaseProtocol::PostgreSQL).await?" +--- ++---------------------------------------+ +| epoch_to_timestamp(Int64(1621123456)) | ++---------------------------------------+ +| 2021-05-16T00:04:16.000 | ++---------------------------------------+ diff --git a/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__epoch_to_timestamp_2.snap b/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__epoch_to_timestamp_2.snap new file mode 100644 index 0000000000000..78e0cb698749a --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/test/snapshots/cubesql__compile__test__test_udfs__epoch_to_timestamp_2.snap @@ -0,0 +1,12 @@ +--- +source: cubesql/src/compile/test/test_udfs.rs +assertion_line: 316 +expression: "execute_query(\"\n SELECT epoch_to_timestamp(1621123456)\n UNION ALL\n SELECT epoch_to_timestamp(1621123456.789)\n UNION ALL\n SELECT epoch_to_timestamp(cast(1621123456 as numeric(10)))\n \".to_string(),\nDatabaseProtocol::PostgreSQL).await?" +--- ++---------------------------------------+ +| epoch_to_timestamp(Int64(1621123456)) | ++---------------------------------------+ +| 2021-05-16T00:04:16.000 | +| 2021-05-16T00:04:17.000 | +| 2021-05-16T00:04:16.000 | ++---------------------------------------+ diff --git a/rust/cubesql/cubesql/src/compile/test/test_udfs.rs b/rust/cubesql/cubesql/src/compile/test/test_udfs.rs index 46bf3f8faeedf..ea796f5505a9b 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_udfs.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_udfs.rs @@ -302,6 +302,36 @@ async fn test_pg_backend_pid() -> Result<(), CubeError> { Ok(()) } +#[tokio::test] +async fn test_epoch_to_timestamp() -> Result<(), CubeError> { + insta::assert_snapshot!( + "epoch_to_timestamp_1", + execute_query( + "SELECT epoch_to_timestamp(1621123456)".to_string(), + DatabaseProtocol::PostgreSQL + ) + .await? + ); + + insta::assert_snapshot!( + "epoch_to_timestamp_2", + execute_query( + " + SELECT epoch_to_timestamp(1621123456) + UNION ALL + SELECT epoch_to_timestamp(1621123456.789) + UNION ALL + SELECT epoch_to_timestamp(cast(1621123456 as numeric(10))) + " + .to_string(), + DatabaseProtocol::PostgreSQL + ) + .await? + ); + + Ok(()) +} + #[tokio::test] async fn test_to_char_udf() -> Result<(), CubeError> { insta::assert_snapshot!( diff --git a/rust/cubesql/cubesql/src/sql/statement.rs b/rust/cubesql/cubesql/src/sql/statement.rs index 74ac31e4b16f2..0790e304e8689 100644 --- a/rust/cubesql/cubesql/src/sql/statement.rs +++ b/rust/cubesql/cubesql/src/sql/statement.rs @@ -921,7 +921,7 @@ impl<'ast> Visitor<'ast, ConnectionError> for RedshiftDatePartReplacer { } } -/// Postgres to_timestamp clashes with Datafusion to_timestamp so we replace it with str_to_date +/// Postgres to_timestamp clashes with Datafusion to_timestamp so we replace it with str_to_date/epoch_to_timestamp #[derive(Debug)] pub struct ToTimestampReplacer {} @@ -940,11 +940,31 @@ impl ToTimestampReplacer { } impl<'ast> Visitor<'ast, ConnectionError> for ToTimestampReplacer { - fn visit_identifier(&mut self, identifier: &mut Ident) -> Result<(), ConnectionError> { - if identifier.value.to_lowercase() == "to_timestamp" { - identifier.value = "str_to_date".to_string() - }; + fn visit_function(&mut self, fun: &mut Function) -> Result<(), ConnectionError> { + if fun.name.to_string().to_lowercase() == "to_timestamp" { + if fun.args.len() == 1 { + fun.name = ObjectName(vec![Ident { + value: "epoch_to_timestamp".to_string(), + quote_style: None, + }]); + } else { + fun.name = ObjectName(vec![Ident { + value: "str_to_date".to_string(), + quote_style: None, + }]); + } + } + // Continue visiting function arguments + self.visit_function_args(&mut fun.args)?; + if let Some(over) = &mut fun.over { + for res in over.partition_by.iter_mut() { + self.visit_expr(res)?; + } + for order_expr in over.order_by.iter_mut() { + self.visit_expr(&mut order_expr.expr)?; + } + } Ok(()) } }