diff --git a/rust/cubesql/cubesql/e2e/main.rs b/rust/cubesql/cubesql/e2e/main.rs index 8730b191a759b..0f72a2fd90292 100644 --- a/rust/cubesql/cubesql/e2e/main.rs +++ b/rust/cubesql/cubesql/e2e/main.rs @@ -22,7 +22,7 @@ impl TestsRunner { pub fn register_suite(&mut self, result: AsyncTestConstructorResult) { match result { - AsyncTestConstructorResult::Sucess(suite) => self.suites.push(suite), + AsyncTestConstructorResult::Success(suite) => self.suites.push(suite), AsyncTestConstructorResult::Skipped(message) => { println!("Skipped: {}", message) } diff --git a/rust/cubesql/cubesql/e2e/tests/basic.rs b/rust/cubesql/cubesql/e2e/tests/basic.rs index bce5e353b7bf4..f1dd74b18cd85 100644 --- a/rust/cubesql/cubesql/e2e/tests/basic.rs +++ b/rust/cubesql/cubesql/e2e/tests/basic.rs @@ -23,6 +23,6 @@ pub trait AsyncTestSuite: Debug { } pub enum AsyncTestConstructorResult { - Sucess(Box), + Success(Box), Skipped(String), } diff --git a/rust/cubesql/cubesql/e2e/tests/postgres.rs b/rust/cubesql/cubesql/e2e/tests/postgres.rs index 46987943f5e0f..aa8f403578c73 100644 --- a/rust/cubesql/cubesql/e2e/tests/postgres.rs +++ b/rust/cubesql/cubesql/e2e/tests/postgres.rs @@ -92,7 +92,7 @@ impl PostgresIntegrationTestSuite { ) .await; - AsyncTestConstructorResult::Sucess(Box::new(PostgresIntegrationTestSuite { client, port })) + AsyncTestConstructorResult::Success(Box::new(PostgresIntegrationTestSuite { client, port })) } async fn create_client(config: tokio_postgres::Config) -> Client { diff --git a/rust/cubesql/cubesql/src/sql/statement.rs b/rust/cubesql/cubesql/src/sql/statement.rs index e12a6ad9ecb74..3a5552914a4a5 100644 --- a/rust/cubesql/cubesql/src/sql/statement.rs +++ b/rust/cubesql/cubesql/src/sql/statement.rs @@ -624,6 +624,9 @@ impl<'ast> Visitor<'ast, ConnectionError> for PostgresStatementParamsBinder { BindValue::Float64(v) => { *value = ast::Value::Number(v.to_string(), *v < 0_f64); } + BindValue::Timestamp(v) => { + *value = ast::Value::SingleQuotedString(v.to_string()); + } BindValue::Null => { *value = ast::Value::Null; } diff --git a/rust/cubesql/cubesql/test.sql b/rust/cubesql/cubesql/test.sql deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/rust/cubesql/pg-srv/src/decoding.rs b/rust/cubesql/pg-srv/src/decoding.rs index afbbc5948ae03..7876bc0a413c6 100644 --- a/rust/cubesql/pg-srv/src/decoding.rs +++ b/rust/cubesql/pg-srv/src/decoding.rs @@ -127,6 +127,7 @@ mod tests { use crate::*; use crate::protocol::Format; + use crate::values::timestamp::TimestampValue; use bytes::BytesMut; fn assert_test_decode( @@ -155,6 +156,9 @@ mod tests { assert_test_decode(std::f64::consts::PI, Format::Text)?; assert_test_decode(-std::f64::consts::E, Format::Text)?; assert_test_decode(0.0_f64, Format::Text)?; + assert_test_decode(TimestampValue::new(1650890322000000000, None), Format::Text)?; + assert_test_decode(TimestampValue::new(0, None), Format::Text)?; + assert_test_decode(TimestampValue::new(1234567890123456000, None), Format::Text)?; Ok(()) } @@ -169,6 +173,15 @@ mod tests { assert_test_decode(std::f64::consts::PI, Format::Binary)?; assert_test_decode(-std::f64::consts::E, Format::Binary)?; assert_test_decode(0.0_f64, Format::Binary)?; + assert_test_decode( + TimestampValue::new(1650890322000000000, None), + Format::Binary, + )?; + assert_test_decode(TimestampValue::new(0, None), Format::Binary)?; + assert_test_decode( + TimestampValue::new(1234567890123456000, None), + Format::Binary, + )?; Ok(()) } diff --git a/rust/cubesql/pg-srv/src/extended.rs b/rust/cubesql/pg-srv/src/extended.rs index dbbf21a99dc3d..163af45857973 100644 --- a/rust/cubesql/pg-srv/src/extended.rs +++ b/rust/cubesql/pg-srv/src/extended.rs @@ -1,10 +1,15 @@ //! Implementation for Extended Query +#[cfg(feature = "with-chrono")] +use crate::TimestampValue; + #[derive(Debug, PartialEq)] pub enum BindValue { String(String), Int64(i64), Float64(f64), Bool(bool), + #[cfg(feature = "with-chrono")] + Timestamp(TimestampValue), Null, } diff --git a/rust/cubesql/pg-srv/src/protocol.rs b/rust/cubesql/pg-srv/src/protocol.rs index cd9ff1f609b01..401ab647cfe1a 100644 --- a/rust/cubesql/pg-srv/src/protocol.rs +++ b/rust/cubesql/pg-srv/src/protocol.rs @@ -16,6 +16,8 @@ use async_trait::async_trait; use bytes::BufMut; use tokio::io::AsyncReadExt; +#[cfg(feature = "with-chrono")] +use crate::TimestampValue; use crate::{buffer, BindValue, FromProtocolValue, PgType, PgTypeId, ProtocolError}; const DEFAULT_CAPACITY: usize = 64; @@ -786,6 +788,11 @@ impl Bind { PgTypeId::FLOAT8 => { BindValue::Float64(f64::from_protocol(raw_value, param_format)?) } + #[cfg(feature = "with-chrono")] + PgTypeId::TIMESTAMP => BindValue::Timestamp(TimestampValue::from_protocol( + raw_value, + param_format, + )?), _ => { return Err(ErrorResponse::error( ErrorCode::FeatureNotSupported, diff --git a/rust/cubesql/pg-srv/src/values/timestamp.rs b/rust/cubesql/pg-srv/src/values/timestamp.rs index c514d3ebbc298..ac495bb5d1178 100644 --- a/rust/cubesql/pg-srv/src/values/timestamp.rs +++ b/rust/cubesql/pg-srv/src/values/timestamp.rs @@ -1,6 +1,10 @@ //! Timestamp value representation for PostgreSQL protocol -use crate::{ProtocolError, ToProtocolValue}; +use crate::{ + protocol::{ErrorCode, ErrorResponse}, + FromProtocolValue, ProtocolError, ToProtocolValue, +}; +use byteorder::{BigEndian, ByteOrder}; use bytes::{BufMut, BytesMut}; use chrono::{ format::{ @@ -11,6 +15,7 @@ use chrono::{ prelude::*, }; use chrono_tz::Tz; +use std::backtrace::Backtrace; use std::io::Error; use std::{ fmt::{self, Debug, Display, Formatter}, @@ -95,6 +100,7 @@ impl Display for TimestampValue { } // POSTGRES_EPOCH_JDATE +// https://github.com/postgres/postgres/blob/REL_14_4/src/include/datatype/timestamp.h#L163 fn pg_base_date_epoch() -> NaiveDateTime { NaiveDate::from_ymd_opt(2000, 1, 1) .unwrap() @@ -118,6 +124,7 @@ impl ToProtocolValue for TimestampValue { } } + // https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/timestamp.c#L267 fn to_binary(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> { let ndt = match self.tz_ref() { None => self.to_naive_datetime(), @@ -139,6 +146,80 @@ impl ToProtocolValue for TimestampValue { } } +impl FromProtocolValue for TimestampValue { + fn from_text(raw: &[u8]) -> Result { + let as_str = std::str::from_utf8(raw).map_err(|err| ProtocolError::ErrorResponse { + source: ErrorResponse::error(ErrorCode::ProtocolViolation, err.to_string()), + backtrace: Backtrace::capture(), + })?; + + // Parse timestamp string in format "YYYY-MM-DD HH:MM:SS[.fff]", but PostgreSQL supports + // more formats, so let's align this with parse_date_str function from cubesql crate. + let parsed_datetime = NaiveDateTime::parse_from_str(as_str, "%Y-%m-%d %H:%M:%S") + .or_else(|_| NaiveDateTime::parse_from_str(as_str, "%Y-%m-%d %H:%M:%S%.f")) + .or_else(|_| NaiveDateTime::parse_from_str(as_str, "%Y-%m-%dT%H:%M:%S")) + .or_else(|_| NaiveDateTime::parse_from_str(as_str, "%Y-%m-%dT%H:%M:%S%.f")) + .or_else(|_| NaiveDateTime::parse_from_str(as_str, "%Y-%m-%dT%H:%M:%S%.fZ")) + .or_else(|_| { + NaiveDate::parse_from_str(as_str, "%Y-%m-%d").map(|date| { + date.and_hms_opt(0, 0, 0) + .expect("Unable to set time to 00:00:00") + }) + }) + .map_err(|err| ProtocolError::ErrorResponse { + source: ErrorResponse::error( + ErrorCode::ProtocolViolation, + format!( + "Unable to parse timestamp from text: '{}', error: {}", + as_str, err + ), + ), + backtrace: Backtrace::capture(), + })?; + + // Convert to Unix nanoseconds + let unix_nano = parsed_datetime + .and_utc() + .timestamp_nanos_opt() + .ok_or_else(|| ProtocolError::ErrorResponse { + source: ErrorResponse::error( + ErrorCode::ProtocolViolation, + format!("Timestamp out of range: '{}'", as_str), + ), + backtrace: Backtrace::capture(), + })?; + + Ok(TimestampValue::new(unix_nano, None)) + } + + // https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/timestamp.c#L234 + fn from_binary(raw: &[u8]) -> Result { + if raw.len() != 8 { + return Err(ProtocolError::ErrorResponse { + source: ErrorResponse::error( + ErrorCode::ProtocolViolation, + format!( + "Invalid binary timestamp length: expected 8 bytes, got {}", + raw.len() + ), + ), + backtrace: Backtrace::capture(), + }); + } + + let pg_microseconds = BigEndian::read_i64(raw); + + // Convert PostgreSQL microseconds to Unix nanoseconds + let unix_nano = pg_base_date_epoch() + .and_utc() + .timestamp_nanos_opt() + .expect("Unable to get timestamp nanos for pg_base_date_epoch") + + (pg_microseconds * 1_000); + + Ok(TimestampValue::new(unix_nano, None)) + } +} + #[cfg(test)] mod tests { use super::*; @@ -170,4 +251,57 @@ mod tests { let ts = TimestampValue::new(1650890322123456789, None); assert_eq!(ts.get_time_stamp(), 1650890322123456000); } + + #[test] + fn test_invalid_timestamp_text() { + // Test that invalid text formats return errors + assert!(TimestampValue::from_text(b"invalid-date").is_err()); + assert!(TimestampValue::from_text(b"2025-13-45 25:70:99").is_err()); + assert!(TimestampValue::from_text(b"").is_err()); + } + + #[test] + fn test_timestamp_from_text_various_formats() { + // Test basic format without fractional seconds + let ts1 = TimestampValue::from_text(b"2025-08-04 20:15:47").unwrap(); + assert_eq!(ts1.to_naive_datetime().to_string(), "2025-08-04 20:15:47"); + + // Test PostgreSQL format with 6-digit fractional seconds + let ts2 = TimestampValue::from_text(b"2025-08-04 20:16:54.853660").unwrap(); + assert_eq!( + ts2.to_naive_datetime() + .format("%Y-%m-%d %H:%M:%S%.6f") + .to_string(), + "2025-08-04 20:16:54.853660" + ); + + // Test format with 3 fractional seconds + let ts3 = TimestampValue::from_text(b"2025-08-04 20:15:47.953").unwrap(); + assert_eq!( + ts3.to_naive_datetime() + .format("%Y-%m-%d %H:%M:%S%.3f") + .to_string(), + "2025-08-04 20:15:47.953" + ); + + // Test ISO format with T separator + let ts4 = TimestampValue::from_text(b"2025-08-04T20:15:47").unwrap(); + assert_eq!(ts4.to_naive_datetime().to_string(), "2025-08-04 20:15:47"); + + // Test ISO format with T separator and fractional seconds + let ts5 = TimestampValue::from_text(b"2025-08-04T20:15:47.953116").unwrap(); + assert_eq!( + ts5.to_naive_datetime() + .format("%Y-%m-%d %H:%M:%S%.6f") + .to_string(), + "2025-08-04 20:15:47.953116" + ); + } + + #[test] + fn test_invalid_timestamp_binary() { + // Test that invalid binary data returns errors + assert!(TimestampValue::from_binary(&[1, 2, 3]).is_err()); // Wrong length + assert!(TimestampValue::from_binary(&[]).is_err()); // Empty + } }