diff --git a/packages/cubejs-backend-native/Cargo.lock b/packages/cubejs-backend-native/Cargo.lock index b4de3fae06139..adf22f0f95a47 100644 --- a/packages/cubejs-backend-native/Cargo.lock +++ b/packages/cubejs-backend-native/Cargo.lock @@ -2351,6 +2351,7 @@ dependencies = [ "byteorder", "bytes", "chrono", + "chrono-tz 0.8.6", "log", "thiserror 2.0.11", "tokio", diff --git a/rust/cubenativeutils/Cargo.lock b/rust/cubenativeutils/Cargo.lock index 931fb867f7e80..262729e204b01 100644 --- a/rust/cubenativeutils/Cargo.lock +++ b/rust/cubenativeutils/Cargo.lock @@ -445,7 +445,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29c39203181991a7dd4343b8005bd804e7a9a37afb8ac070e43771e8c820bbde" dependencies = [ "chrono", - "chrono-tz-build", + "chrono-tz-build 0.0.3", + "phf", +] + +[[package]] +name = "chrono-tz" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +dependencies = [ + "chrono", + "chrono-tz-build 0.2.1", "phf", ] @@ -460,6 +471,17 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "chrono-tz-build" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "colored" version = "1.9.4" @@ -680,7 +702,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "chrono", - "chrono-tz", + "chrono-tz 0.6.3", "comfy-table 7.1.1", "cubeclient", "datafusion", @@ -2077,6 +2099,7 @@ dependencies = [ "byteorder", "bytes", "chrono", + "chrono-tz 0.8.6", "log", "thiserror 2.0.11", "tokio", diff --git a/rust/cubesql/Cargo.lock b/rust/cubesql/Cargo.lock index 9fdd9795c99cf..e0c5d485be6ca 100644 --- a/rust/cubesql/Cargo.lock +++ b/rust/cubesql/Cargo.lock @@ -424,10 +424,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum 
= "58549f1842da3080ce63002102d5bc954c7bc843d4f47818e642abdc36253552" dependencies = [ "chrono", - "chrono-tz-build", + "chrono-tz-build 0.0.2", "phf 0.10.1", ] +[[package]] +name = "chrono-tz" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +dependencies = [ + "chrono", + "chrono-tz-build 0.2.1", + "phf 0.11.1", +] + [[package]] name = "chrono-tz-build" version = "0.0.2" @@ -436,7 +447,18 @@ checksum = "db058d493fb2f65f41861bfed7e3fe6335264a9f0f92710cab5bdf01fef09069" dependencies = [ "parse-zoneinfo", "phf 0.10.1", - "phf_codegen", + "phf_codegen 0.10.0", +] + +[[package]] +name = "chrono-tz-build" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +dependencies = [ + "parse-zoneinfo", + "phf 0.11.1", + "phf_codegen 0.11.3", ] [[package]] @@ -761,7 +783,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "chrono", - "chrono-tz", + "chrono-tz 0.6.1", "comfy-table 7.1.0", "criterion", "cubeclient", @@ -2263,6 +2285,7 @@ dependencies = [ "byteorder", "bytes", "chrono", + "chrono-tz 0.8.6", "hex", "log", "thiserror 2.0.11", @@ -2304,10 +2327,20 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fb1c3a8bc4dd4e5cfce29b44ffc14bedd2ee294559a294e2a4d4c9e9a6a13cd" dependencies = [ - "phf_generator", + "phf_generator 0.10.0", "phf_shared 0.10.0", ] +[[package]] +name = "phf_codegen" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a" +dependencies = [ + "phf_generator 0.11.1", + "phf_shared 0.11.1", +] + [[package]] name = "phf_generator" version = "0.10.0" @@ -2318,6 +2351,16 @@ dependencies = [ "rand", ] +[[package]] +name = "phf_generator" +version = "0.11.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf" +dependencies = [ + "phf_shared 0.11.1", + "rand", +] + [[package]] name = "phf_shared" version = "0.10.0" diff --git a/rust/cubesql/cubesql/src/sql/dataframe.rs b/rust/cubesql/cubesql/src/sql/dataframe.rs index d932fe75a6212..a595510a5a55c 100644 --- a/rust/cubesql/cubesql/src/sql/dataframe.rs +++ b/rust/cubesql/cubesql/src/sql/dataframe.rs @@ -1,12 +1,4 @@ -use chrono::{ - format::{ - Fixed, Item, - Numeric::{Day, Hour, Minute, Month, Second, Year}, - Pad::Zero, - }, - prelude::*, -}; -use chrono_tz::Tz; +use chrono::prelude::*; use comfy_table::{Cell, Table}; use datafusion::arrow::{ array::{ @@ -18,15 +10,10 @@ use datafusion::arrow::{ }, datatypes::{DataType, IntervalUnit, Schema, TimeUnit}, record_batch::RecordBatch, - temporal_conversions, }; -use pg_srv::IntervalValue; use rust_decimal::prelude::*; use serde::{Serialize, Serializer}; -use std::{ - fmt::{self, Debug, Formatter}, - io, -}; +use std::fmt::Debug; use super::{ColumnFlags, ColumnType}; use crate::CubeError; @@ -88,6 +75,10 @@ impl Row { } } +// Type aliases for compatibility - actual implementations are in pg-srv +pub type IntervalValue = pg_srv::IntervalValue; +pub type TimestampValue = pg_srv::TimestampValue; + #[derive(Debug)] pub enum TableValue { Null, @@ -207,83 +198,6 @@ impl DataFrame { } } -#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub struct TimestampValue { - unix_nano: i64, - tz: Option, -} - -impl TimestampValue { - pub fn new(mut unix_nano: i64, tz: Option) -> TimestampValue { - // This is a hack to workaround a mismatch between on-disk and in-memory representations. - // We use millisecond precision on-disk. 
- unix_nano -= unix_nano % 1000; - TimestampValue { unix_nano, tz } - } - - pub fn to_naive_datetime(&self) -> NaiveDateTime { - assert!(self.tz.is_none()); - - temporal_conversions::timestamp_ns_to_datetime(self.unix_nano) - } - - pub fn to_fixed_datetime(&self) -> io::Result> { - assert!(self.tz.is_some()); - - let tz = self - .tz - .as_ref() - .unwrap() - .parse::() - .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; - - let ndt = temporal_conversions::timestamp_ns_to_datetime(self.unix_nano); - Ok(tz.from_utc_datetime(&ndt)) - } - - pub fn tz_ref(&self) -> &Option { - &self.tz - } - - pub fn get_time_stamp(&self) -> i64 { - self.unix_nano - } -} - -impl Debug for TimestampValue { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("TimestampValue") - .field("unix_nano", &self.unix_nano) - .field("tz", &self.tz) - .field("str", &self.to_string()) - .finish() - } -} - -impl ToString for TimestampValue { - fn to_string(&self) -> String { - Utc.timestamp_nanos(self.unix_nano) - .format_with_items( - [ - Item::Numeric(Year, Zero), - Item::Literal("-"), - Item::Numeric(Month, Zero), - Item::Literal("-"), - Item::Numeric(Day, Zero), - Item::Literal("T"), - Item::Numeric(Hour, Zero), - Item::Literal(":"), - Item::Numeric(Minute, Zero), - Item::Literal(":"), - Item::Numeric(Second, Zero), - Item::Fixed(Fixed::Nanosecond3), - ] - .iter(), - ) - .to_string() - } -} - #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct Decimal128Value { n: i128, diff --git a/rust/cubesql/cubesql/src/sql/postgres/writer.rs b/rust/cubesql/cubesql/src/sql/postgres/writer.rs index b29a529eaa700..3d9c365d935b9 100644 --- a/rust/cubesql/cubesql/src/sql/postgres/writer.rs +++ b/rust/cubesql/cubesql/src/sql/postgres/writer.rs @@ -1,16 +1,8 @@ use crate::sql::{ - dataframe::{Decimal128Value, ListValue, TimestampValue}, + dataframe::{Decimal128Value, ListValue}, df_type_to_pg_tid, }; use bytes::{BufMut, BytesMut}; -use chrono::{ - 
format::{ - Fixed, Item, - Numeric::{Day, Hour, Minute, Month, Second, Year}, - Pad::Zero, - }, - prelude::*, -}; use datafusion::arrow::{ array::{ Array, BooleanArray, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, @@ -24,87 +16,7 @@ use pg_srv::{ PgTypeId, ProtocolError, ToProtocolValue, }; use postgres_types::{ToSql, Type}; -use std::{convert::TryFrom, io, io::Error}; - -// POSTGRES_EPOCH_JDATE -fn pg_base_date_epoch() -> NaiveDateTime { - NaiveDate::from_ymd_opt(2000, 1, 1) - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap() -} - -impl ToProtocolValue for TimestampValue { - fn to_text(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> { - let ndt = match self.tz_ref() { - None => self.to_naive_datetime(), - Some(_) => self.to_fixed_datetime()?.naive_utc(), - }; - - // 2022-04-25 15:36:49.39705+00 - let as_str = ndt - .format_with_items( - [ - Item::Numeric(Year, Zero), - Item::Literal("-"), - Item::Numeric(Month, Zero), - Item::Literal("-"), - Item::Numeric(Day, Zero), - Item::Literal(" "), - Item::Numeric(Hour, Zero), - Item::Literal(":"), - Item::Numeric(Minute, Zero), - Item::Literal(":"), - Item::Numeric(Second, Zero), - Item::Fixed(Fixed::Nanosecond6), - ] - .iter(), - ) - .to_string(); - - match self.tz_ref() { - None => as_str.to_text(buf), - Some(_) => (as_str + "+00").to_text(buf), - } - } - - fn to_binary(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> { - match self.tz_ref() { - None => { - let seconds = self - .to_naive_datetime() - .signed_duration_since(pg_base_date_epoch()) - .num_microseconds() - .ok_or(Error::new( - io::ErrorKind::Other, - "Unable to extract number of seconds from timestamp", - ))?; - - buf.put_i32(8_i32); - buf.put_i64(seconds) - } - Some(tz) => { - let seconds = self - .to_fixed_datetime()? 
- .naive_utc() - .signed_duration_since(pg_base_date_epoch()) - .num_microseconds() - .ok_or(Error::new( - io::ErrorKind::Other, - format!( - "Unable to extract number of seconds from timestamp with tz: {}", - tz - ), - ))?; - - buf.put_i32(8_i32); - buf.put_i64(seconds) - } - }; - - Ok(()) - } -} +use std::convert::TryFrom; /// https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/numeric.c#L1022 impl ToProtocolValue for Decimal128Value { @@ -324,13 +236,13 @@ impl Serialize for BatchWriter { #[cfg(test)] mod tests { use crate::sql::{ - dataframe::{Decimal128Value, ListValue, TimestampValue}, + dataframe::{Decimal128Value, ListValue}, shim::ConnectionError, writer::{BatchWriter, ToProtocolValue}, }; use bytes::BytesMut; use datafusion::arrow::array::{ArrayRef, Int64Builder}; - use pg_srv::{buffer, protocol::Format}; + use pg_srv::{buffer, protocol::Format, TimestampValue}; use std::{io::Cursor, sync::Arc}; fn assert_text_encode(value: T, expected: &[u8]) { diff --git a/rust/cubesql/pg-srv/Cargo.toml b/rust/cubesql/pg-srv/Cargo.toml index 4967ef2699d47..88fe2558bf50f 100644 --- a/rust/cubesql/pg-srv/Cargo.toml +++ b/rust/cubesql/pg-srv/Cargo.toml @@ -22,6 +22,7 @@ thiserror = "2" chrono = { version = "0.4", package = "chrono", default-features = false, features = [ "clock", ], optional = true } +chrono-tz = "0.8" [dev-dependencies] hex = "0.4.3" diff --git a/rust/cubesql/pg-srv/src/encoding.rs b/rust/cubesql/pg-srv/src/encoding.rs index 34dd67f3350ac..6de1b088cef91 100644 --- a/rust/cubesql/pg-srv/src/encoding.rs +++ b/rust/cubesql/pg-srv/src/encoding.rs @@ -4,10 +4,7 @@ use crate::{protocol::Format, ProtocolError}; use bytes::{BufMut, BytesMut}; #[cfg(feature = "with-chrono")] use chrono::{NaiveDate, NaiveDateTime}; -use std::{ - fmt::{Display, Formatter}, - io::{Error, ErrorKind}, -}; +use std::io::{Error, ErrorKind}; /// This trait explains how to encode values to the protocol format pub trait ToProtocolValue: std::fmt::Debug { @@ -149,146 
+146,6 @@ impl ToProtocolValue for NaiveDate { } } -#[derive(Debug, Clone, Default)] -pub struct IntervalValue { - pub months: i32, - pub days: i32, - pub hours: i32, - pub mins: i32, - pub secs: i32, - pub usecs: i32, -} - -impl IntervalValue { - pub fn new(months: i32, days: i32, hours: i32, mins: i32, secs: i32, usecs: i32) -> Self { - Self { - months, - days, - hours, - mins, - secs, - usecs, - } - } - - pub fn is_zeroed(&self) -> bool { - self.months == 0 - && self.days == 0 - && self.hours == 0 - && self.mins == 0 - && self.secs == 0 - && self.usecs == 0 - } - - pub fn extract_years_month(&self) -> (i32, i32) { - let years = (self.months as f64 / 12_f64).floor(); - let month = self.months as f64 - (years * 12_f64); - - (years as i32, month as i32) - } - - pub fn as_iso_str(&self) -> String { - if self.is_zeroed() { - return "00:00:00".to_owned(); - } - - let mut res = "".to_owned(); - let (years, months) = self.extract_years_month(); - - if years != 0 { - if years == 1 { - res.push_str(&format!("{:#?} year ", years)) - } else { - res.push_str(&format!("{:#?} years ", years)) - } - } - - if months != 0 { - if months == 1 { - res.push_str(&format!("{:#?} mon ", months)); - } else { - res.push_str(&format!("{:#?} mons ", months)); - } - } - - if self.hours != 0 || self.mins != 0 || self.secs != 0 || self.usecs != 0 { - if self.hours < 0 || self.mins < 0 || self.secs < 0 || self.usecs < 0 { - res.push('-') - }; - - res.push_str(&format!( - "{:02}:{:02}:{:02}", - self.hours.abs(), - self.mins.abs(), - self.secs.abs() - )); - - if self.usecs != 0 { - res.push_str(&format!(".{:06}", self.usecs.abs())) - } - } - - res.trim().to_string() - } - - pub fn as_postgresql_str(&self) -> String { - let (years, months) = self.extract_years_month(); - - // We manually format sign for the case where self.secs == 0, self.usecs < 0. - // We follow assumptions about consistency of hours/mins/secs/usecs signs as in - // as_iso_str here. 
- format!( - "{} years {} mons {} days {} hours {} mins {}{}.{} secs", - years, - months, - self.days, - self.hours, - self.mins, - if self.secs < 0 || self.usecs < 0 { - "-" - } else { - "" - }, - self.secs.abs(), - if self.usecs == 0 { - "00".to_string() - } else { - format!("{:06}", self.usecs.abs()) - } - ) - } -} - -impl Display for IntervalValue { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - // TODO lift formatter higher, to as_postgresql_str - // https://github.com/postgres/postgres/blob/REL_14_4/src/interfaces/ecpg/pgtypeslib/interval.c#L763 - f.write_str(&self.as_postgresql_str()) - } -} - -impl ToProtocolValue for IntervalValue { - // https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/timestamp.c#L958 - fn to_text(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> { - self.to_string().to_text(buf) - } - - // https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/timestamp.c#L1005 - fn to_binary(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> { - let usecs = self.hours as i64 * 60 * 60 * 1_000_000 - + self.mins as i64 * 60 * 1_000_000 - + self.secs as i64 * 1_000_000 - + self.usecs as i64; - - buf.put_i32(16); - buf.put_i64(usecs); - buf.put_i32(self.days); - buf.put_i32(self.months); - - Ok(()) - } -} - #[cfg(test)] mod tests { use crate::*; @@ -306,6 +163,36 @@ mod tests { assert_text_encode(true, &[0, 0, 0, 1, 116]); assert_text_encode(false, &[0, 0, 0, 1, 102]); assert_text_encode("str".to_string(), &[0, 0, 0, 3, 115, 116, 114]); + assert_text_encode( + IntervalValue::new(0, 0, 0, 0, 0, 0), + &[ + 0, 0, 0, 46, 48, 32, 121, 101, 97, 114, 115, 32, 48, 32, 109, 111, 110, 115, 32, + 48, 32, 100, 97, 121, 115, 32, 48, 32, 104, 111, 117, 114, 115, 32, 48, 32, 109, + 105, 110, 115, 32, 48, 46, 48, 48, 32, 115, 101, 99, 115, + ], + ); + assert_text_encode( + IntervalValue::new(1, 2, 3, 4, 5, 6), + &[ + 0, 0, 0, 50, 48, 32, 121, 101, 97, 114, 115, 32, 49, 32, 109, 111, 110, 115, 32, + 50, 
32, 100, 97, 121, 115, 32, 51, 32, 104, 111, 117, 114, 115, 32, 52, 32, 109, + 105, 110, 115, 32, 53, 46, 48, 48, 48, 48, 48, 54, 32, 115, 101, 99, 115, + ], + ); + assert_text_encode( + TimestampValue::new(0, None), + &[ + 0, 0, 0, 26, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, + 48, 48, 46, 48, 48, 48, 48, 48, 48, + ], + ); + assert_text_encode( + TimestampValue::new(1650890322000000000, None), + &[ + 0, 0, 0, 26, 50, 48, 50, 50, 45, 48, 52, 45, 50, 53, 32, 49, 50, 58, 51, 56, 58, + 52, 50, 46, 48, 48, 48, 48, 48, 48, + ], + ); Ok(()) } @@ -321,61 +208,23 @@ mod tests { fn test_binary_encoders() -> Result<(), ProtocolError> { assert_bind_encode(true, &[0, 0, 0, 1, 1]); assert_bind_encode(false, &[0, 0, 0, 1, 0]); - - Ok(()) - } - - #[test] - fn test_interval_to_iso() -> Result<(), ProtocolError> { - assert_eq!( - IntervalValue::new(1, 0, 0, 0, 0, 0).as_iso_str(), - "1 mon".to_string() - ); - assert_eq!( - IntervalValue::new(14, 0, 0, 0, 0, 0).as_iso_str(), - "1 year 2 mons".to_string() + assert_bind_encode( + IntervalValue::new(0, 0, 0, 0, 0, 0), + &[0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ); - assert_eq!( - IntervalValue::new(0, 1, 1, 1, 1, 1).as_iso_str(), - "01:01:01.000001".to_string() + assert_bind_encode( + IntervalValue::new(1, 2, 3, 4, 5, 6), + &[ + 0, 0, 0, 16, 0, 0, 0, 2, 146, 85, 83, 70, 0, 0, 0, 2, 0, 0, 0, 1, + ], ); - assert_eq!( - IntervalValue::new(0, 0, -1, 1, 1, 1).as_iso_str(), - "-01:01:01.000001".to_string() + assert_bind_encode( + TimestampValue::new(0, None), + &[0, 0, 0, 8, 255, 252, 162, 254, 196, 200, 32, 0], ); - assert_eq!( - IntervalValue::new(0, 0, 0, 0, 0, 0).as_iso_str(), - "00:00:00".to_string() - ); - - Ok(()) - } - - #[test] - fn test_interval_to_postgres() -> Result<(), ProtocolError> { - assert_eq!( - IntervalValue::new(0, 0, 0, 0, 0, 0).to_string(), - "0 years 0 mons 0 days 0 hours 0 mins 0.00 secs".to_string() - ); - - assert_eq!( - IntervalValue::new(0, 0, 0, 0, 1, 
23).to_string(), - "0 years 0 mons 0 days 0 hours 0 mins 1.000023 secs".to_string() - ); - - assert_eq!( - IntervalValue::new(0, 0, 0, 0, -1, -23).to_string(), - "0 years 0 mons 0 days 0 hours 0 mins -1.000023 secs".to_string() - ); - - assert_eq!( - IntervalValue::new(0, 0, 0, 0, -1, 0).to_string(), - "0 years 0 mons 0 days 0 hours 0 mins -1.00 secs".to_string() - ); - - assert_eq!( - IntervalValue::new(0, 0, -14, -5, -1, 0).to_string(), - "0 years 0 mons 0 days -14 hours -5 mins -1.00 secs".to_string() + assert_bind_encode( + TimestampValue::new(1650890322000000000, None), + &[0, 0, 0, 8, 0, 2, 128, 120, 159, 252, 216, 128], ); Ok(()) diff --git a/rust/cubesql/pg-srv/src/error.rs b/rust/cubesql/pg-srv/src/error.rs new file mode 100644 index 0000000000000..0b8506e2281d6 --- /dev/null +++ b/rust/cubesql/pg-srv/src/error.rs @@ -0,0 +1,68 @@ +//! Protocol error types for PostgreSQL wire protocol + +use crate::protocol; +use std::{backtrace::Backtrace, fmt::Formatter}; + +/// Protocol error abstract of handled/unhandled errors, it should not handle any kind of business logic errors +/// TODO: Migrate back to thiserror crate, when Rust will stabilize feature(error_generic_member_access) +#[derive(Debug)] +pub enum ProtocolError { + IO { + source: std::io::Error, + backtrace: Backtrace, + }, + ErrorResponse { + source: protocol::ErrorResponse, + backtrace: Backtrace, + }, +} + +impl std::fmt::Display for ProtocolError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + ProtocolError::IO { source, .. } => f.write_fmt(format_args!("IO error: {}", source)), + ProtocolError::ErrorResponse { source, .. 
} => { + f.write_fmt(format_args!("Error: {}", source.message)) + } + } + } +} + +impl From for ProtocolError { + fn from(source: std::io::Error) -> Self { + ProtocolError::IO { + source, + backtrace: Backtrace::capture(), + } + } +} + +impl From for ProtocolError { + fn from(source: protocol::ErrorResponse) -> Self { + ProtocolError::ErrorResponse { + source, + backtrace: Backtrace::capture(), + } + } +} + +impl ProtocolError { + /// Return Backtrace from any variant of Enum + pub fn backtrace(&self) -> Option<&Backtrace> { + match &self { + ProtocolError::IO { backtrace, .. } => Some(backtrace), + ProtocolError::ErrorResponse { backtrace, .. } => Some(backtrace), + } + } + + /// Converts Error to protocol::ErrorResponse which is usefully for writing response to the client + pub fn to_error_response(self) -> protocol::ErrorResponse { + match self { + ProtocolError::IO { source, .. } => protocol::ErrorResponse::error( + protocol::ErrorCode::InternalError, + source.to_string(), + ), + ProtocolError::ErrorResponse { source, .. 
} => source, + } + } +} diff --git a/rust/cubesql/pg-srv/src/lib.rs b/rust/cubesql/pg-srv/src/lib.rs index bc34adc813c20..8308e7bfa34d6 100644 --- a/rust/cubesql/pg-srv/src/lib.rs +++ b/rust/cubesql/pg-srv/src/lib.rs @@ -6,78 +6,16 @@ mod decoding; mod encoding; pub mod buffer; +pub mod error; pub mod extended; pub mod pg_type; pub mod protocol; +pub mod values; pub use buffer::*; pub use decoding::*; pub use encoding::*; +pub use error::*; pub use extended::*; pub use pg_type::*; - -use std::{backtrace::Backtrace, fmt::Formatter}; - -/// Protocol error abstract of handled/unhandled errors, it should not handle any kind of business logic errors -/// TODO: Migrate back to thiserror crate, when Rust will stabilize feature(error_generic_member_access) -#[derive(Debug)] -pub enum ProtocolError { - IO { - source: std::io::Error, - backtrace: Backtrace, - }, - ErrorResponse { - source: protocol::ErrorResponse, - backtrace: Backtrace, - }, -} - -impl std::fmt::Display for ProtocolError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - ProtocolError::IO { source, .. } => f.write_fmt(format_args!("IO error: {}", source)), - ProtocolError::ErrorResponse { source, .. } => { - f.write_fmt(format_args!("Error: {}", source.message)) - } - } - } -} - -impl From for ProtocolError { - fn from(source: std::io::Error) -> Self { - ProtocolError::IO { - source, - backtrace: Backtrace::capture(), - } - } -} - -impl From for ProtocolError { - fn from(source: protocol::ErrorResponse) -> Self { - ProtocolError::ErrorResponse { - source, - backtrace: Backtrace::capture(), - } - } -} - -impl ProtocolError { - /// Return Backtrace from any variant of Enum - pub fn backtrace(&self) -> Option<&Backtrace> { - match &self { - ProtocolError::IO { backtrace, .. } => Some(backtrace), - ProtocolError::ErrorResponse { backtrace, .. 
} => Some(backtrace), - } - } - - /// Converts Error to protocol::ErrorResponse which is usefully for writing response to the client - pub fn to_error_response(self) -> protocol::ErrorResponse { - match self { - ProtocolError::IO { source, .. } => protocol::ErrorResponse::error( - protocol::ErrorCode::InternalError, - source.to_string(), - ), - ProtocolError::ErrorResponse { source, .. } => source, - } - } -} +pub use values::*; diff --git a/rust/cubesql/pg-srv/src/values/interval.rs b/rust/cubesql/pg-srv/src/values/interval.rs new file mode 100644 index 0000000000000..7ae73736e8dc2 --- /dev/null +++ b/rust/cubesql/pg-srv/src/values/interval.rs @@ -0,0 +1,215 @@ +//! Interval value representation for PostgreSQL protocol + +use crate::{ProtocolError, ToProtocolValue}; +use bytes::{BufMut, BytesMut}; +use std::fmt::{Display, Formatter}; + +#[derive(Debug, Clone, Default)] +pub struct IntervalValue { + pub months: i32, + pub days: i32, + pub hours: i32, + pub mins: i32, + pub secs: i32, + pub usecs: i32, +} + +impl IntervalValue { + pub fn new(months: i32, days: i32, hours: i32, mins: i32, secs: i32, usecs: i32) -> Self { + Self { + months, + days, + hours, + mins, + secs, + usecs, + } + } + + pub fn is_zeroed(&self) -> bool { + self.months == 0 + && self.days == 0 + && self.hours == 0 + && self.mins == 0 + && self.secs == 0 + && self.usecs == 0 + } + + pub fn extract_years_month(&self) -> (i32, i32) { + let years = self.months / 12; + let month = self.months % 12; + + (years, month) + } + + pub fn as_iso_str(&self) -> String { + if self.is_zeroed() { + return "00:00:00".to_owned(); + } + + let mut res = "".to_owned(); + let (years, months) = self.extract_years_month(); + + if years != 0 { + if years == 1 { + res.push_str(&format!("{:#?} year ", years)) + } else { + res.push_str(&format!("{:#?} years ", years)) + } + } + + if months != 0 { + if months == 1 { + res.push_str(&format!("{:#?} mon ", months)); + } else { + res.push_str(&format!("{:#?} mons ", months)); + } 
+ } + + if self.days != 0 { + if self.days == 1 { + res.push_str(&format!("{:#?} day ", self.days)); + } else { + res.push_str(&format!("{:#?} days ", self.days)); + } + } + + if self.hours != 0 || self.mins != 0 || self.secs != 0 || self.usecs != 0 { + if self.hours < 0 || self.mins < 0 || self.secs < 0 || self.usecs < 0 { + res.push('-') + }; + + res.push_str(&format!( + "{:02}:{:02}:{:02}", + self.hours.abs(), + self.mins.abs(), + self.secs.abs() + )); + + if self.usecs != 0 { + res.push_str(&format!(".{:06}", self.usecs.abs())) + } + } + + res.trim().to_string() + } + + pub fn as_postgresql_str(&self) -> String { + let (years, months) = self.extract_years_month(); + + // We manually format sign for the case where self.secs == 0, self.usecs < 0. + // We follow assumptions about consistency of hours/mins/secs/usecs signs as in + // as_iso_str here. + format!( + "{} years {} mons {} days {} hours {} mins {}{}.{} secs", + years, + months, + self.days, + self.hours, + self.mins, + if self.secs < 0 || self.usecs < 0 { + "-" + } else { + "" + }, + self.secs.abs(), + if self.usecs == 0 { + "00".to_string() + } else { + format!("{:06}", self.usecs.abs()) + } + ) + } +} + +impl Display for IntervalValue { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + // TODO lift formatter higher, to as_postgresql_str + // https://github.com/postgres/postgres/blob/REL_14_4/src/interfaces/ecpg/pgtypeslib/interval.c#L763 + f.write_str(&self.as_postgresql_str()) + } +} + +impl ToProtocolValue for IntervalValue { + // https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/timestamp.c#L958 + fn to_text(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> { + self.to_string().to_text(buf) + } + + // https://github.com/postgres/postgres/blob/REL_14_4/src/backend/utils/adt/timestamp.c#L1005 + fn to_binary(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> { + let usecs = self.hours as i64 * 60 * 60 * 1_000_000 + + self.mins as i64 * 60 * 1_000_000 + + 
self.secs as i64 * 1_000_000 + + self.usecs as i64; + + buf.put_i32(16); + buf.put_i64(usecs); + buf.put_i32(self.days); + buf.put_i32(self.months); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ProtocolError; + + #[test] + fn test_interval_to_iso() -> Result<(), ProtocolError> { + assert_eq!( + IntervalValue::new(1, 0, 0, 0, 0, 0).as_iso_str(), + "1 mon".to_string() + ); + assert_eq!( + IntervalValue::new(14, 0, 0, 0, 0, 0).as_iso_str(), + "1 year 2 mons".to_string() + ); + assert_eq!( + IntervalValue::new(0, 1, 1, 1, 1, 1).as_iso_str(), + "1 day 01:01:01.000001".to_string() + ); + assert_eq!( + IntervalValue::new(0, 0, -1, -1, -1, -1).as_iso_str(), + "-01:01:01.000001".to_string() + ); + assert_eq!( + IntervalValue::new(0, 0, 0, 0, 0, 0).as_iso_str(), + "00:00:00".to_string() + ); + + Ok(()) + } + + #[test] + fn test_interval_to_postgres() -> Result<(), ProtocolError> { + assert_eq!( + IntervalValue::new(0, 0, 0, 0, 0, 0).to_string(), + "0 years 0 mons 0 days 0 hours 0 mins 0.00 secs".to_string() + ); + + assert_eq!( + IntervalValue::new(0, 0, 0, 0, 1, 23).to_string(), + "0 years 0 mons 0 days 0 hours 0 mins 1.000023 secs".to_string() + ); + + assert_eq!( + IntervalValue::new(0, 0, 0, 0, -1, -23).to_string(), + "0 years 0 mons 0 days 0 hours 0 mins -1.000023 secs".to_string() + ); + + assert_eq!( + IntervalValue::new(0, 0, 0, 0, -1, 0).to_string(), + "0 years 0 mons 0 days 0 hours 0 mins -1.00 secs".to_string() + ); + + assert_eq!( + IntervalValue::new(0, 0, -14, -5, -1, 0).to_string(), + "0 years 0 mons 0 days -14 hours -5 mins -1.00 secs".to_string() + ); + + Ok(()) + } +} diff --git a/rust/cubesql/pg-srv/src/values/mod.rs b/rust/cubesql/pg-srv/src/values/mod.rs new file mode 100644 index 0000000000000..003313654c590 --- /dev/null +++ b/rust/cubesql/pg-srv/src/values/mod.rs @@ -0,0 +1,9 @@ +//! 
PostgreSQL value types for wire protocol
+
+pub mod interval;
+#[cfg(feature = "with-chrono")]
+pub mod timestamp;
+
+pub use interval::*;
+#[cfg(feature = "with-chrono")]
+pub use timestamp::*;
diff --git a/rust/cubesql/pg-srv/src/values/timestamp.rs b/rust/cubesql/pg-srv/src/values/timestamp.rs
new file mode 100644
index 0000000000000..c514d3ebbc298
--- /dev/null
+++ b/rust/cubesql/pg-srv/src/values/timestamp.rs
@@ -0,0 +1,201 @@
+//! Timestamp value representation for PostgreSQL protocol
+
+use crate::{ProtocolError, ToProtocolValue};
+use bytes::{BufMut, BytesMut};
+use chrono::{
+    format::{
+        Fixed, Item,
+        Numeric::{Day, Hour, Minute, Month, Second, Year},
+        Pad::Zero,
+    },
+    prelude::*,
+};
+use chrono_tz::Tz;
+use std::io::Error;
+use std::{
+    fmt::{self, Debug, Display, Formatter},
+    io,
+};
+
+/// Instant stored as nanoseconds since the Unix epoch, with an optional time zone name.
+#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct TimestampValue {
+    unix_nano: i64,
+    tz: Option<String>,
+}
+
+impl TimestampValue {
+    /// Builds a value from nanoseconds since the Unix epoch, dropping the sub-microsecond part.
+    pub fn new(mut unix_nano: i64, tz: Option<String>) -> TimestampValue {
+        // This is a hack to workaround a mismatch between on-disk and in-memory representations.
+        // We use microsecond precision on-disk.
+        unix_nano -= unix_nano % 1000;
+        TimestampValue { unix_nano, tz }
+    }
+
+    /// Returns the stored instant as a naive UTC date-time.
+    pub fn to_naive_datetime(&self) -> NaiveDateTime {
+        // `timestamp_nanos` accepts any i64. Splitting with truncating `/` and `%` produced a
+        // negative nanosecond remainder for pre-1970 instants with a sub-second part, which
+        // wrapped when cast to u32 and made the conversion panic.
+        Utc.timestamp_nanos(self.unix_nano).naive_utc()
+    }
+
+    /// Returns the stored instant in the zone named by `tz`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `io::Error` when the zone name cannot be parsed.
+    ///
+    /// # Panics
+    ///
+    /// Panics when no time zone is set.
+    pub fn to_fixed_datetime(&self) -> io::Result<DateTime<Tz>> {
+        assert!(self.tz.is_some());
+        let tz = self
+            .tz
+            .as_ref()
+            .unwrap()
+            .parse::<Tz>()
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
+        let ndt = self.to_naive_datetime();
+        Ok(tz.from_utc_datetime(&ndt))
+    }
+
+    /// Returns the time zone name, if one is set.
+    pub fn tz_ref(&self) -> &Option<String> {
+        &self.tz
+    }
+
+    /// Returns the stored nanoseconds since the Unix epoch.
+    pub fn get_time_stamp(&self) -> i64 {
+        self.unix_nano
+    }
+}
+
+impl Debug for TimestampValue {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("TimestampValue")
+            .field("unix_nano", &self.unix_nano)
+            .field("tz", &self.tz)
+            .field("str", &self.to_string())
+            .finish()
+    }
+}
+
+impl Display for TimestampValue {
+    /// Formats the instant in UTC as `YYYY-MM-DDTHH:MM:SS.mmm`.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let formatted = Utc.timestamp_nanos(self.unix_nano).format_with_items(
+            [
+                Item::Numeric(Year, Zero),
+                Item::Literal("-"),
+                Item::Numeric(Month, Zero),
+                Item::Literal("-"),
+                Item::Numeric(Day, Zero),
+                Item::Literal("T"),
+                Item::Numeric(Hour, Zero),
+                Item::Literal(":"),
+                Item::Numeric(Minute, Zero),
+                Item::Literal(":"),
+                Item::Numeric(Second, Zero),
+                Item::Fixed(Fixed::Nanosecond3),
+            ]
+            .iter(),
+        );
+        write!(f, "{}", formatted)
+    }
+}
+
+// POSTGRES_EPOCH_JDATE
+fn pg_base_date_epoch() -> NaiveDateTime {
+    NaiveDate::from_ymd_opt(2000, 1, 1)
+        .unwrap()
+        .and_hms_opt(0, 0, 0)
+        .unwrap()
+}
+
+impl ToProtocolValue for TimestampValue {
+    /// Writes `YYYY-MM-DD HH:MM:SS.ffffff` in UTC, with a `+00` suffix when a zone is set.
+    fn to_text(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> {
+        let ndt = match self.tz_ref() {
+            None => self.to_naive_datetime(),
+            Some(_) => self.to_fixed_datetime()?.naive_utc(),
+        };
+
+        // 2022-04-25 15:36:49.39705+00
+        let as_str = ndt.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
+
+        match self.tz_ref() {
+            None => as_str.to_text(buf),
+            Some(_) => (as_str + "+00").to_text(buf),
+        }
+    }
+
+    /// Writes the length prefix `8` and the microseconds elapsed since 2000-01-01 00:00:00.
+    fn to_binary(&self, buf: &mut BytesMut) -> Result<(), ProtocolError> {
+        let ndt = match self.tz_ref() {
+            None => self.to_naive_datetime(),
+            Some(_) => self.to_fixed_datetime()?.naive_utc(),
+        };
+
+        let n = ndt
+            .signed_duration_since(pg_base_date_epoch())
+            .num_microseconds()
+            .ok_or(Error::new(
+                io::ErrorKind::Other,
+                "Unable to extract number of seconds from timestamp",
+            ))?;
+
+        buf.put_i32(8);
+        buf.put_i64(n);
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::ProtocolError;
+
+    #[test]
+    fn test_timestamp_creation() -> Result<(), ProtocolError> {
+        let ts = TimestampValue::new(1650890322000000000, None);
+        assert_eq!(ts.get_time_stamp(), 1650890322000000000);
+        assert_eq!(ts.tz_ref(), &None);
+
+        let ts_with_tz = TimestampValue::new(1650890322000000000, Some("UTC".to_string()));
+        assert_eq!(ts_with_tz.get_time_stamp(), 1650890322000000000);
+        assert_eq!(ts_with_tz.tz_ref(), &Some("UTC".to_string()));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_timestamp_to_string() {
+        let ts = TimestampValue::new(1650890322000000000, None);
+        // The string representation should match the expected format
+        assert!(!ts.to_string().is_empty());
+    }
+
+    #[test]
+    fn test_timestamp_precision_hack() {
+        // Test that nanoseconds are truncated to microseconds
+        let ts = TimestampValue::new(1650890322123456789, None);
+        assert_eq!(ts.get_time_stamp(), 1650890322123456000);
+    }
+
+    #[test]
+    fn test_timestamp_before_epoch() {
+        // 1969-12-31 23:59:59.500 UTC: a negative value with a sub-second part
+        let ts = TimestampValue::new(-500_000_000, None);
+        assert_eq!(
+            ts.to_naive_datetime(),
+            NaiveDate::from_ymd_opt(1969, 12, 31)
+                .unwrap()
+                .and_hms_milli_opt(23, 59, 59, 500)
+                .unwrap()
+        );
+    }
+}
diff --git a/rust/cubesqlplanner/Cargo.lock b/rust/cubesqlplanner/Cargo.lock
index 7a0f6784719b7..ed9fcfd2ef2c3 100644
--- a/rust/cubesqlplanner/Cargo.lock
+++ b/rust/cubesqlplanner/Cargo.lock
@@ -2152,6 +2152,7 @@ dependencies = [
  "byteorder",
  "bytes",
  "chrono",
+ "chrono-tz 0.8.6",
  "log",
  "thiserror 2.0.11",
  "tokio",