Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/cubesql/cubesql/e2e/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl TestsRunner {

pub fn register_suite(&mut self, result: AsyncTestConstructorResult) {
match result {
AsyncTestConstructorResult::Sucess(suite) => self.suites.push(suite),
AsyncTestConstructorResult::Success(suite) => self.suites.push(suite),
AsyncTestConstructorResult::Skipped(message) => {
println!("Skipped: {}", message)
}
Expand Down
2 changes: 1 addition & 1 deletion rust/cubesql/cubesql/e2e/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ pub trait AsyncTestSuite: Debug {
}

pub enum AsyncTestConstructorResult {
Sucess(Box<dyn AsyncTestSuite>),
Success(Box<dyn AsyncTestSuite>),
Skipped(String),
}
2 changes: 1 addition & 1 deletion rust/cubesql/cubesql/e2e/tests/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl PostgresIntegrationTestSuite {
)
.await;

AsyncTestConstructorResult::Sucess(Box::new(PostgresIntegrationTestSuite { client, port }))
AsyncTestConstructorResult::Success(Box::new(PostgresIntegrationTestSuite { client, port }))
}

async fn create_client(config: tokio_postgres::Config) -> Client {
Expand Down
3 changes: 3 additions & 0 deletions rust/cubesql/cubesql/src/sql/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,9 @@ impl<'ast> Visitor<'ast, ConnectionError> for PostgresStatementParamsBinder {
BindValue::Float64(v) => {
*value = ast::Value::Number(v.to_string(), *v < 0_f64);
}
BindValue::Timestamp(v) => {
*value = ast::Value::SingleQuotedString(v.to_string());
}
BindValue::Null => {
*value = ast::Value::Null;
}
Expand Down
Empty file removed rust/cubesql/cubesql/test.sql
Empty file.
13 changes: 13 additions & 0 deletions rust/cubesql/pg-srv/src/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ mod tests {
use crate::*;

use crate::protocol::Format;
use crate::values::timestamp::TimestampValue;
use bytes::BytesMut;

fn assert_test_decode<T: ToProtocolValue + FromProtocolValue + std::cmp::PartialEq>(
Expand Down Expand Up @@ -155,6 +156,9 @@ mod tests {
assert_test_decode(std::f64::consts::PI, Format::Text)?;
assert_test_decode(-std::f64::consts::E, Format::Text)?;
assert_test_decode(0.0_f64, Format::Text)?;
assert_test_decode(TimestampValue::new(1650890322000000000, None), Format::Text)?;
assert_test_decode(TimestampValue::new(0, None), Format::Text)?;
assert_test_decode(TimestampValue::new(1234567890123456000, None), Format::Text)?;

Ok(())
}
Expand All @@ -169,6 +173,15 @@ mod tests {
assert_test_decode(std::f64::consts::PI, Format::Binary)?;
assert_test_decode(-std::f64::consts::E, Format::Binary)?;
assert_test_decode(0.0_f64, Format::Binary)?;
assert_test_decode(
TimestampValue::new(1650890322000000000, None),
Format::Binary,
)?;
assert_test_decode(TimestampValue::new(0, None), Format::Binary)?;
assert_test_decode(
TimestampValue::new(1234567890123456000, None),
Format::Binary,
)?;

Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions rust/cubesql/pg-srv/src/extended.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
//! Implementation for Extended Query

#[cfg(feature = "with-chrono")]
use crate::TimestampValue;

#[derive(Debug, PartialEq)]
pub enum BindValue {
String(String),
Int64(i64),
Float64(f64),
Bool(bool),
#[cfg(feature = "with-chrono")]
Timestamp(TimestampValue),
Null,
}
7 changes: 7 additions & 0 deletions rust/cubesql/pg-srv/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use async_trait::async_trait;
use bytes::BufMut;
use tokio::io::AsyncReadExt;

#[cfg(feature = "with-chrono")]
use crate::TimestampValue;
use crate::{buffer, BindValue, FromProtocolValue, PgType, PgTypeId, ProtocolError};

const DEFAULT_CAPACITY: usize = 64;
Expand Down Expand Up @@ -786,6 +788,11 @@ impl Bind {
PgTypeId::FLOAT8 => {
BindValue::Float64(f64::from_protocol(raw_value, param_format)?)
}
#[cfg(feature = "with-chrono")]
PgTypeId::TIMESTAMP => BindValue::Timestamp(TimestampValue::from_protocol(
raw_value,
param_format,
)?),
_ => {
return Err(ErrorResponse::error(
ErrorCode::FeatureNotSupported,
Expand Down
136 changes: 135 additions & 1 deletion rust/cubesql/pg-srv/src/values/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
//! Timestamp value representation for PostgreSQL protocol

use crate::{ProtocolError, ToProtocolValue};
use crate::{
protocol::{ErrorCode, ErrorResponse},
FromProtocolValue, ProtocolError, ToProtocolValue,
};
use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};
use chrono::{
format::{
Expand All @@ -11,6 +15,7 @@ use chrono::{
prelude::*,
};
use chrono_tz::Tz;
use std::backtrace::Backtrace;
use std::io::Error;
use std::{
fmt::{self, Debug, Display, Formatter},
Expand Down Expand Up @@ -95,6 +100,7 @@ impl Display for TimestampValue {
}

// POSTGRES_EPOCH_JDATE
// https://github.com/postgres/postgres/blob/REL_14_4/src/include/datatype/timestamp.h#L163
fn pg_base_date_epoch() -> NaiveDateTime {
NaiveDate::from_ymd_opt(2000, 1, 1)
.unwrap()
Expand All @@ -118,6 +124,7 @@ impl ToProtocolValue for TimestampValue {
}
}

// https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/timestamp.c#L267
fn to_binary(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> {
let ndt = match self.tz_ref() {
None => self.to_naive_datetime(),
Expand All @@ -139,6 +146,80 @@ impl ToProtocolValue for TimestampValue {
}
}

impl FromProtocolValue for TimestampValue {
fn from_text(raw: &[u8]) -> Result<Self, ProtocolError> {
let as_str = std::str::from_utf8(raw).map_err(|err| ProtocolError::ErrorResponse {
source: ErrorResponse::error(ErrorCode::ProtocolViolation, err.to_string()),
backtrace: Backtrace::capture(),
})?;

// Parse timestamp string in format "YYYY-MM-DD HH:MM:SS[.fff]", but PostgreSQL supports
// more formats, so let's align this with parse_date_str function from cubesql crate.
let parsed_datetime = NaiveDateTime::parse_from_str(as_str, "%Y-%m-%d %H:%M:%S")
.or_else(|_| NaiveDateTime::parse_from_str(as_str, "%Y-%m-%d %H:%M:%S%.f"))
.or_else(|_| NaiveDateTime::parse_from_str(as_str, "%Y-%m-%dT%H:%M:%S"))
.or_else(|_| NaiveDateTime::parse_from_str(as_str, "%Y-%m-%dT%H:%M:%S%.f"))
.or_else(|_| NaiveDateTime::parse_from_str(as_str, "%Y-%m-%dT%H:%M:%S%.fZ"))
.or_else(|_| {
NaiveDate::parse_from_str(as_str, "%Y-%m-%d").map(|date| {
date.and_hms_opt(0, 0, 0)
.expect("Unable to set time to 00:00:00")
})
})
.map_err(|err| ProtocolError::ErrorResponse {
source: ErrorResponse::error(
ErrorCode::ProtocolViolation,
format!(
"Unable to parse timestamp from text: '{}', error: {}",
as_str, err
),
),
backtrace: Backtrace::capture(),
})?;

// Convert to Unix nanoseconds
let unix_nano = parsed_datetime
.and_utc()
.timestamp_nanos_opt()
.ok_or_else(|| ProtocolError::ErrorResponse {
source: ErrorResponse::error(
ErrorCode::ProtocolViolation,
format!("Timestamp out of range: '{}'", as_str),
),
backtrace: Backtrace::capture(),
})?;

Ok(TimestampValue::new(unix_nano, None))
}

// https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/timestamp.c#L234
fn from_binary(raw: &[u8]) -> Result<Self, ProtocolError> {
if raw.len() != 8 {
return Err(ProtocolError::ErrorResponse {
source: ErrorResponse::error(
ErrorCode::ProtocolViolation,
format!(
"Invalid binary timestamp length: expected 8 bytes, got {}",
raw.len()
),
),
backtrace: Backtrace::capture(),
});
}

let pg_microseconds = BigEndian::read_i64(raw);

// Convert PostgreSQL microseconds to Unix nanoseconds
let unix_nano = pg_base_date_epoch()
.and_utc()
.timestamp_nanos_opt()
.expect("Unable to get timestamp nanos for pg_base_date_epoch")
+ (pg_microseconds * 1_000);

Ok(TimestampValue::new(unix_nano, None))
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -170,4 +251,57 @@ mod tests {
let ts = TimestampValue::new(1650890322123456789, None);
assert_eq!(ts.get_time_stamp(), 1650890322123456000);
}

#[test]
fn test_invalid_timestamp_text() {
// Test that invalid text formats return errors
assert!(TimestampValue::from_text(b"invalid-date").is_err());
assert!(TimestampValue::from_text(b"2025-13-45 25:70:99").is_err());
assert!(TimestampValue::from_text(b"").is_err());
}

#[test]
fn test_timestamp_from_text_various_formats() {
// Test basic format without fractional seconds
let ts1 = TimestampValue::from_text(b"2025-08-04 20:15:47").unwrap();
assert_eq!(ts1.to_naive_datetime().to_string(), "2025-08-04 20:15:47");

// Test PostgreSQL format with 6-digit fractional seconds
let ts2 = TimestampValue::from_text(b"2025-08-04 20:16:54.853660").unwrap();
assert_eq!(
ts2.to_naive_datetime()
.format("%Y-%m-%d %H:%M:%S%.6f")
.to_string(),
"2025-08-04 20:16:54.853660"
);

// Test format with 3 fractional seconds
let ts3 = TimestampValue::from_text(b"2025-08-04 20:15:47.953").unwrap();
assert_eq!(
ts3.to_naive_datetime()
.format("%Y-%m-%d %H:%M:%S%.3f")
.to_string(),
"2025-08-04 20:15:47.953"
);

// Test ISO format with T separator
let ts4 = TimestampValue::from_text(b"2025-08-04T20:15:47").unwrap();
assert_eq!(ts4.to_naive_datetime().to_string(), "2025-08-04 20:15:47");

// Test ISO format with T separator and fractional seconds
let ts5 = TimestampValue::from_text(b"2025-08-04T20:15:47.953116").unwrap();
assert_eq!(
ts5.to_naive_datetime()
.format("%Y-%m-%d %H:%M:%S%.6f")
.to_string(),
"2025-08-04 20:15:47.953116"
);
}

#[test]
fn test_invalid_timestamp_binary() {
// Test that invalid binary data returns errors
assert!(TimestampValue::from_binary(&[1, 2, 3]).is_err()); // Wrong length
assert!(TimestampValue::from_binary(&[]).is_err()); // Empty
}
}
Loading