diff --git a/Cargo.lock b/Cargo.lock
index 2b09c3c39..6f9563250 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3666,6 +3666,7 @@ version = "0.6.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "chrono",
  "datafusion",
  "expect-test",
  "futures",
diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index b005f7ab4..f3f86044b 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -60,6 +60,23 @@ const INT_MIN: i32 = -2147483648;
 const LONG_MAX: i64 = 9223372036854775807;
 const LONG_MIN: i64 = -9223372036854775808;
 
+const MICROS_PER_DAY: i64 = 24 * 60 * 60 * 1_000_000;
+const NANOS_PER_MICRO: i64 = 1000;
+const NANOS_PER_DAY: i64 = MICROS_PER_DAY * NANOS_PER_MICRO;
+
+/// Multiplies a temporal `value` by a unit `factor`.
+///
+/// Returns a `DataInvalid` error when the product does not fit in an `i64`, rather than
+/// panicking (debug builds) or silently wrapping (release builds).
+fn checked_scale(value: i64, factor: i64) -> std::result::Result<i64, Error> {
+    value.checked_mul(factor).ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Scaling temporal value {value} by {factor} overflows i64"),
+        )
+    })
+}
+
 /// Values present in iceberg type
 #[derive(Clone, Debug, PartialOrd, PartialEq, Hash, Eq)]
 pub enum PrimitiveLiteral {
@@ -1196,14 +1213,77 @@ impl Datum {
                 (PrimitiveLiteral::Int(val), _, PrimitiveType::Int) => Ok(Datum::int(*val)),
                 (PrimitiveLiteral::Int(val), _, PrimitiveType::Date) => Ok(Datum::date(*val)),
                 (PrimitiveLiteral::Int(val), _, PrimitiveType::Long) => Ok(Datum::long(*val)),
+                // A date widens to midnight of that day. The scaling is checked because distant
+                // dates (e.g. 9999-12-31 at nanosecond precision) do not fit in an i64.
+                (PrimitiveLiteral::Int(val), PrimitiveType::Date, PrimitiveType::Timestamp) => {
+                    checked_scale(i64::from(*val), MICROS_PER_DAY).map(Datum::timestamp_micros)
+                }
+                (PrimitiveLiteral::Int(val), PrimitiveType::Date, PrimitiveType::Timestamptz) => {
+                    checked_scale(i64::from(*val), MICROS_PER_DAY).map(Datum::timestamptz_micros)
+                }
+                (PrimitiveLiteral::Int(val), PrimitiveType::Date, PrimitiveType::TimestampNs) => {
+                    checked_scale(i64::from(*val), NANOS_PER_DAY).map(Datum::timestamp_nanos)
+                }
+                (PrimitiveLiteral::Int(val), PrimitiveType::Date, PrimitiveType::TimestamptzNs) => {
+                    checked_scale(i64::from(*val), NANOS_PER_DAY).map(Datum::timestamptz_nanos)
+                }
                 (PrimitiveLiteral::Long(val), _, PrimitiveType::Int) => {
                     Ok(Datum::i64_to_i32(*val))
                 }
-                (PrimitiveLiteral::Long(val), _, PrimitiveType::Timestamp) => {
-                    Ok(Datum::timestamp_micros(*val))
-                }
-                (PrimitiveLiteral::Long(val), _, PrimitiveType::Timestamptz) => {
-                    Ok(Datum::timestamptz_micros(*val))
+                (PrimitiveLiteral::Long(val), source_type, target_type) => {
+                    match (source_type, target_type) {
+                        (_, PrimitiveType::Long) => Ok(Datum::long(*val)),
+                        (
+                            PrimitiveType::Long
+                            | PrimitiveType::Timestamp
+                            | PrimitiveType::Timestamptz,
+                            PrimitiveType::Timestamp,
+                        ) => Ok(Datum::timestamp_micros(*val)),
+                        (
+                            PrimitiveType::Long
+                            | PrimitiveType::Timestamp
+                            | PrimitiveType::Timestamptz,
+                            PrimitiveType::Timestamptz,
+                        ) => Ok(Datum::timestamptz_micros(*val)),
+                        (
+                            PrimitiveType::Long
+                            | PrimitiveType::TimestampNs
+                            | PrimitiveType::TimestamptzNs,
+                            PrimitiveType::TimestampNs,
+                        ) => Ok(Datum::timestamp_nanos(*val)),
+                        (
+                            PrimitiveType::Long
+                            | PrimitiveType::TimestampNs
+                            | PrimitiveType::TimestamptzNs,
+                            PrimitiveType::TimestamptzNs,
+                        ) => Ok(Datum::timestamptz_nanos(*val)),
+                        // Floor (not truncate toward zero) so that pre-epoch values round the
+                        // same way as the reference implementation's `Math.floorDiv`.
+                        (
+                            PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs,
+                            PrimitiveType::Timestamp,
+                        ) => Ok(Datum::timestamp_micros(val.div_euclid(NANOS_PER_MICRO))),
+                        (
+                            PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs,
+                            PrimitiveType::Timestamptz,
+                        ) => Ok(Datum::timestamptz_micros(val.div_euclid(NANOS_PER_MICRO))),
+                        // Micros -> nanos can exceed i64; report that instead of overflowing.
+                        (
+                            PrimitiveType::Timestamp | PrimitiveType::Timestamptz,
+                            PrimitiveType::TimestampNs,
+                        ) => checked_scale(*val, NANOS_PER_MICRO).map(Datum::timestamp_nanos),
+                        (
+                            PrimitiveType::Timestamp | PrimitiveType::Timestamptz,
+                            PrimitiveType::TimestamptzNs,
+                        ) => checked_scale(*val, NANOS_PER_MICRO).map(Datum::timestamptz_nanos),
+                        _ => Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Can't convert datum from {} type to {} type.",
+                                self.r#type, target_primitive_type
+                            ),
+                        )),
+                    }
                 }
                 // Let's wait with nano's until this clears up: https://github.com/apache/iceberg/pull/11775
                 (PrimitiveLiteral::Int128(val), _, PrimitiveType::Long) => {
@@ -3943,4 +4023,283 @@ mod tests {
 
         assert_eq!(double_sorted, double_expected);
     }
+
+    #[test]
+    fn test_datum_timestamp_nanos_convert_to_timestamp_micros() {
+        let datum = Datum::timestamp_nanos(12345000);
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamp)).unwrap();
+
+        let expected = Datum::timestamp_micros(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamp_nanos_convert_to_timestamptz_micros() {
+        let datum = Datum::timestamp_nanos(12345000);
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamptz)).unwrap();
+
+        let expected = Datum::timestamptz_micros(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamptz_nanos_convert_to_timestamp_micros() {
+        let datum = Datum::timestamptz_nanos(12345000);
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamp)).unwrap();
+
+        let expected = Datum::timestamp_micros(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamptz_nanos_convert_to_timestamptz_micros() {
+        let datum = Datum::timestamptz_nanos(12345000);
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamptz)).unwrap();
+
+        let expected = Datum::timestamptz_micros(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamp_micros_convert_to_timestamp_nanos() {
+        let datum = Datum::timestamp_micros(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestampNs)).unwrap();
+
+        let expected = Datum::timestamp_nanos(12345000);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamp_micros_convert_to_timestamptz_nanos() {
+        let datum = Datum::timestamp_micros(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestamptzNs)).unwrap();
+
+        let expected = Datum::timestamptz_nanos(12345000);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamptz_micros_convert_to_timestamp_nanos() {
+        let datum = Datum::timestamptz_micros(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestampNs)).unwrap();
+
+        let expected = Datum::timestamp_nanos(12345000);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamptz_micros_convert_to_timestamptz_nanos() {
+        let datum = Datum::timestamptz_micros(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestamptzNs)).unwrap();
+
+        let expected = Datum::timestamptz_nanos(12345000);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamp_nanos_convert_to_timestamp_nanos() {
+        let datum = Datum::timestamp_nanos(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestampNs)).unwrap();
+
+        let expected = Datum::timestamp_nanos(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamp_nanos_convert_to_timestamptz_nanos() {
+        let datum = Datum::timestamp_nanos(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestamptzNs)).unwrap();
+
+        let expected = Datum::timestamptz_nanos(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamptz_nanos_convert_to_timestamp_nanos() {
+        let datum = Datum::timestamptz_nanos(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestampNs)).unwrap();
+
+        let expected = Datum::timestamp_nanos(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamptz_nanos_convert_to_timestamptz_nanos() {
+        let datum = Datum::timestamptz_nanos(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestamptzNs)).unwrap();
+
+        let expected = Datum::timestamptz_nanos(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_long_convert_to_timestamp_nanos() {
+        let datum = Datum::long(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestampNs)).unwrap();
+
+        let expected = Datum::timestamp_nanos(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_long_convert_to_timestamptz_nanos() {
+        let datum = Datum::long(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestamptzNs)).unwrap();
+
+        let expected = Datum::timestamptz_nanos(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamp_nanos_to_micros() {
+        let datum = Datum::timestamp_nanos(12345678);
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamp)).unwrap();
+
+        let expected = Datum::timestamp_micros(12345);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamp_micros_to_nanos() {
+        let datum = Datum::timestamp_micros(12345);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestampNs)).unwrap();
+
+        let expected = Datum::timestamp_nanos(12345000);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_date_convert_to_timestamp() {
+        let datum = Datum::date(1); // 1970-01-02
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamp)).unwrap();
+
+        let expected = Datum::timestamp_from_datetime(
+            DateTime::parse_from_rfc3339("1970-01-02T00:00:00Z")
+                .unwrap()
+                .naive_utc(),
+        );
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_date_convert_to_timestamptz() {
+        let datum = Datum::date(1);
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamptz)).unwrap();
+
+        let expected = Datum::timestamptz_from_str("1970-01-02T00:00:00Z").unwrap();
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_date_convert_to_timestamp_nanos() {
+        let datum = Datum::date(1);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestampNs)).unwrap();
+
+        let expected = Datum::timestamp_from_datetime(
+            DateTime::parse_from_rfc3339("1970-01-02T00:00:00Z")
+                .unwrap()
+                .naive_utc(),
+        )
+        .to(&Primitive(PrimitiveType::TimestampNs))
+        .unwrap();
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_date_convert_to_timestamptz_nanos() {
+        let datum = Datum::date(1);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestamptzNs)).unwrap();
+
+        let expected = Datum::timestamptz_from_datetime(
+            DateTime::parse_from_rfc3339("1970-01-02T00:00:00Z").unwrap(),
+        )
+        .to(&Primitive(PrimitiveType::TimestamptzNs))
+        .unwrap();
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_date_negative_convert_to_timestamp() {
+        let datum = Datum::date(-1);
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamp)).unwrap();
+
+        let expected = Datum::timestamp_from_datetime(
+            DateTime::parse_from_rfc3339("1969-12-31T00:00:00Z")
+                .unwrap()
+                .naive_utc(),
+        );
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_negative_timestamp_nanos_to_micros_rounds_down() {
+        // -1ns lies in the microsecond [-1us, 0), so it must floor to -1us rather than 0.
+        let datum = Datum::timestamp_nanos(-1);
+
+        let result = datum.to(&Primitive(PrimitiveType::Timestamp)).unwrap();
+
+        let expected = Datum::timestamp_micros(-1);
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_datum_timestamp_micros_to_nanos_overflow_is_error() {
+        let datum = Datum::timestamp_micros(i64::MAX);
+
+        let result = datum.to(&Primitive(PrimitiveType::TimestampNs));
+
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_datum_date_to_timestamp_nanos_overflow_is_error() {
+        // 2_932_896 days is 9999-12-31: representable in micros but not in nanos.
+        let as_micros = Datum::date(2_932_896).to(&Primitive(PrimitiveType::Timestamp));
+        let as_nanos = Datum::date(2_932_896).to(&Primitive(PrimitiveType::TimestampNs));
+
+        assert!(as_micros.is_ok());
+        assert!(as_nanos.is_err());
+    }
 }
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 6954950b0..f015f30b3 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -31,6 +31,7 @@ repository = { workspace = true }
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
+chrono = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 iceberg = { workspace = true }
diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
index 10b92d54b..1dccc4b45 100644
--- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
+++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
@@ -201,6 +201,9 @@ fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator {
 }
 
 const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
+const MICROS_PER_SECOND: i64 = 1_000_000;
+const MICROS_PER_MILLISECOND: i64 = 1_000;
+
 /// Convert a scalar value to an iceberg datum.
 fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
     match value {
@@ -214,18 +217,47 @@ fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
         ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
         ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
         ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
+        // Seconds and milliseconds are widened to microseconds. A value too large to widen
+        // yields `None`, like any other unsupported scalar, instead of overflowing.
+        ScalarValue::TimestampSecond(Some(v), tz) => v
+            .checked_mul(MICROS_PER_SECOND)
+            .and_then(|micros| interpret_timestamptz_micros(micros, tz.as_deref())),
+        ScalarValue::TimestampMillisecond(Some(v), tz) => v
+            .checked_mul(MICROS_PER_MILLISECOND)
+            .and_then(|micros| interpret_timestamptz_micros(micros, tz.as_deref())),
+        ScalarValue::TimestampMicrosecond(Some(v), tz) => {
+            interpret_timestamptz_micros(*v, tz.as_deref())
+        }
+        ScalarValue::TimestampNanosecond(Some(v), Some(_)) => Some(Datum::timestamptz_nanos(*v)),
+        ScalarValue::TimestampNanosecond(Some(v), None) => Some(Datum::timestamp_nanos(*v)),
         _ => None,
     }
 }
 
+/// Builds a microsecond-precision datum from a timestamp scalar's value and optional zone.
+///
+/// A scalar without a time zone becomes a `timestamp`; one with any time zone (a fixed offset
+/// such as `+01:00` or a name such as `UTC`) becomes a `timestamptz`, matching how the
+/// nanosecond scalars are handled. The zone string is not parsed: the scalar's value is already
+/// relative to the UTC epoch, so `micros` is the instant whichever zone is attached.
+fn interpret_timestamptz_micros(micros: i64, tz: Option<&str>) -> Option<Datum> {
+    Some(match tz {
+        Some(_) => Datum::timestamptz_micros(micros),
+        None => Datum::timestamp_micros(micros),
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
 
+    use chrono::DateTime;
     use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion::common::DFSchema;
     use datafusion::logical_expr::utils::split_conjunction;
+    use datafusion::logical_expr::{Operator, binary_expr, col, lit};
     use datafusion::prelude::{Expr, SessionContext};
+    use datafusion::scalar::ScalarValue;
     use iceberg::expr::{Predicate, Reference};
     use iceberg::spec::Datum;
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
@@ -245,6 +277,42 @@ mod tests {
             Field::new("ts", DataType::Timestamp(TimeUnit::Second, None), true).with_metadata(
                 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
             ),
+            Field::new(
+                "ts_ms",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "4".to_string(),
+            )])),
+            Field::new(
+                "ts_us",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                true,
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "5".to_string(),
+            )])),
+            Field::new(
+                "ts_ns",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                true,
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "6".to_string(),
+            )])),
+            Field::new(
+                "ts_tz",
+                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+                true,
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "7".to_string(),
+            )])),
         ]);
         DFSchema::try_from_qualified_schema("my_table", &arrow_schema).unwrap()
     }
@@ -429,4 +497,97 @@ mod tests {
         let predicate = convert_to_iceberg_predicate(sql);
         assert_eq!(predicate, None);
     }
+
+    #[test]
+    fn test_predicate_conversion_with_timestamp() {
+        // 2023-01-01 12:00:00 UTC
+        let timestamp_scalar = ScalarValue::TimestampSecond(Some(1672574400), None);
+        let dt = DateTime::parse_from_rfc3339("2023-01-01T12:00:00+00:00").unwrap();
+
+        let expr = binary_expr(col("ts"), Operator::Gt, lit(timestamp_scalar));
+        let exprs: Vec<Expr> = split_conjunction(&expr).into_iter().cloned().collect();
+        let predicate = convert_filters_to_predicate(&exprs[..]).unwrap();
+        let expected_predicate =
+            Reference::new("ts").greater_than(Datum::timestamp_from_datetime(dt.naive_utc()));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    fn test_predicate_conversion_with_timestamp_milliseconds() {
+        // 2023-01-01 12:00:00 UTC
+        let timestamp_scalar = ScalarValue::TimestampMillisecond(Some(1672574400000), None);
+        let dt = DateTime::parse_from_rfc3339("2023-01-01T12:00:00+00:00").unwrap();
+
+        let expr = binary_expr(col("ts_ms"), Operator::LtEq, lit(timestamp_scalar));
+        let exprs: Vec<Expr> = split_conjunction(&expr).into_iter().cloned().collect();
+        let predicate = convert_filters_to_predicate(&exprs[..]).unwrap();
+        let expected_predicate = Reference::new("ts_ms")
+            .less_than_or_equal_to(Datum::timestamp_from_datetime(dt.naive_utc()));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    fn test_predicate_conversion_with_timestamp_nanoseconds() {
+        // 2023-01-01 12:00:00 UTC
+        let timestamp_scalar = ScalarValue::TimestampNanosecond(Some(1672574400000000000), None);
+        let dt = DateTime::parse_from_rfc3339("2023-01-01T12:00:00+00:00").unwrap();
+
+        let expr = binary_expr(col("ts_ns"), Operator::NotEq, lit(timestamp_scalar));
+        let exprs: Vec<Expr> = split_conjunction(&expr).into_iter().cloned().collect();
+        let predicate = convert_filters_to_predicate(&exprs[..]).unwrap();
+        let expected_predicate = Reference::new("ts_ns")
+            .not_equal_to(Datum::timestamp_nanos(dt.timestamp_nanos_opt().unwrap()));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    fn test_predicate_conversion_with_timestamp_with_timezone() {
+        // 2023-01-01 13:00:00 +01:00
+        let timestamp_scalar =
+            ScalarValue::TimestampSecond(Some(1672574400), Some("+01:00".into()));
+        let dt = DateTime::parse_from_rfc3339("2023-01-01T13:00:00+01:00").unwrap();
+
+        let expr = binary_expr(col("ts_tz"), Operator::GtEq, lit(timestamp_scalar));
+        let exprs: Vec<Expr> = split_conjunction(&expr).into_iter().cloned().collect();
+        let predicate = convert_filters_to_predicate(&exprs[..]).unwrap();
+        let expected_predicate =
+            Reference::new("ts_tz").greater_than_or_equal_to(Datum::timestamptz_from_datetime(dt));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    fn test_predicate_conversion_with_timestamp_nanoseconds_with_timezone() {
+        // 2023-01-01 13:00:00 +01:00
+        let timestamp_scalar =
+            ScalarValue::TimestampNanosecond(Some(1672574400000000000), Some("+01:00".into()));
+        let dt = DateTime::parse_from_rfc3339("2023-01-01T13:00:00+01:00").unwrap();
+
+        let expr = binary_expr(col("ts_tz"), Operator::GtEq, lit(timestamp_scalar));
+        let exprs: Vec<Expr> = split_conjunction(&expr).into_iter().cloned().collect();
+        let predicate = convert_filters_to_predicate(&exprs[..]).unwrap();
+        let expected_predicate = Reference::new("ts_tz")
+            .greater_than_or_equal_to(Datum::timestamptz_nanos(dt.timestamp_nanos_opt().unwrap()));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    fn test_predicate_conversion_with_timestamp_with_named_timezone() {
+        // 2023-01-01 12:00:00 UTC; a zone name is not a fixed offset but is still zoned.
+        let timestamp_scalar =
+            ScalarValue::TimestampMicrosecond(Some(1672574400000000), Some("UTC".into()));
+
+        let expr = binary_expr(col("ts_tz"), Operator::Gt, lit(timestamp_scalar));
+        let exprs: Vec<Expr> = split_conjunction(&expr).into_iter().cloned().collect();
+        let predicate = convert_filters_to_predicate(&exprs[..]).unwrap();
+        let expected_predicate =
+            Reference::new("ts_tz").greater_than(Datum::timestamptz_micros(1672574400000000));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    fn test_scalar_timestamp_seconds_overflow_is_not_converted() {
+        let timestamp_scalar = ScalarValue::TimestampSecond(Some(i64::MAX), None);
+
+        assert_eq!(super::scalar_value_to_datum(&timestamp_scalar), None);
+    }
 }