Skip to content

Commit 7a771f2

Browse files
committed
chore(cubestore): Upgrade DF: Implement DATE_ADD and DATE_SUB by invoking DF arithmetic operator behavior
1 parent eb95f89 commit 7a771f2

File tree

1 file changed

+79
-95
lines changed
  • rust/cubestore/cubestore/src/queryplanner

1 file changed

+79
-95
lines changed

rust/cubestore/cubestore/src/queryplanner/udfs.rs

Lines changed: 79 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Arc<ScalarUDF> {
4242
CubeScalarUDFKind::UnixTimestamp => {
4343
Arc::new(ScalarUDF::new_from_impl(UnixTimestamp::new()))
4444
}
45-
CubeScalarUDFKind::DateAdd => todo!(), // Box::new(DateAddSub { is_add: true }),
46-
CubeScalarUDFKind::DateSub => todo!(), // Box::new(DateAddSub { is_add: false }),
45+
CubeScalarUDFKind::DateAdd => Arc::new(ScalarUDF::new_from_impl(DateAddSub::new_add())),
46+
CubeScalarUDFKind::DateSub => Arc::new(ScalarUDF::new_from_impl(DateAddSub::new_sub())),
4747
CubeScalarUDFKind::DateBin => Arc::new(ScalarUDF::new_from_impl(DateBin::new())),
4848
}
4949
}
@@ -52,6 +52,8 @@ pub fn registerable_scalar_udfs() -> Vec<ScalarUDF> {
5252
vec![
5353
HllCardinality::descriptor(),
5454
ScalarUDF::new_from_impl(DateBin::new()),
55+
ScalarUDF::new_from_impl(DateAddSub::new_add()),
56+
ScalarUDF::new_from_impl(DateAddSub::new_sub()),
5557
]
5658
}
5759

@@ -549,99 +551,81 @@ impl ScalarUDFImpl for DateBin {
549551
}
550552
}
551553

552-
// struct DateAddSub {
553-
// is_add: bool,
554-
// }
555-
//
556-
// impl DateAddSub {
557-
// fn signature() -> Signature {
558-
// Signature::OneOf(vec![
559-
// Signature::Exact(vec![
560-
// DataType::Timestamp(TimeUnit::Nanosecond, None),
561-
// DataType::Interval(IntervalUnit::YearMonth),
562-
// ]),
563-
// Signature::Exact(vec![
564-
// DataType::Timestamp(TimeUnit::Nanosecond, None),
565-
// DataType::Interval(IntervalUnit::DayTime),
566-
// ]),
567-
// ])
568-
// }
569-
// }
570-
//
571-
// impl DateAddSub {
572-
// fn name_static(&self) -> &'static str {
573-
// match self.is_add {
574-
// true => "DATE_ADD",
575-
// false => "DATE_SUB",
576-
// }
577-
// }
578-
// }
579-
//
580-
// impl CubeScalarUDF for DateAddSub {
581-
// fn kind(&self) -> CubeScalarUDFKind {
582-
// match self.is_add {
583-
// true => CubeScalarUDFKind::DateAdd,
584-
// false => CubeScalarUDFKind::DateSub,
585-
// }
586-
// }
587-
//
588-
// fn name(&self) -> &str {
589-
// self.name_static()
590-
// }
591-
//
592-
// fn descriptor(&self) -> ScalarUDF {
593-
// let name = self.name_static();
594-
// let is_add = self.is_add;
595-
// return ScalarUDF {
596-
// name: self.name().to_string(),
597-
// signature: Self::signature(),
598-
// return_type: Arc::new(|_| {
599-
// Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))
600-
// }),
601-
// fun: Arc::new(move |inputs| {
602-
// assert_eq!(inputs.len(), 2);
603-
// let interval = match &inputs[1] {
604-
// ColumnarValue::Scalar(i) => i.clone(),
605-
// _ => {
606-
// // We leave this case out for simplicity.
607-
// // CubeStore does not allow intervals inside tables, so this is super rare.
608-
// return Err(DataFusionError::Execution(format!(
609-
// "Only scalar intervals are supported in `{}`",
610-
// name
611-
// )));
612-
// }
613-
// };
614-
// match &inputs[0] {
615-
// ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None)) => Ok(
616-
// ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None)),
617-
// ),
618-
// ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(t))) => {
619-
// let r = date_addsub_scalar(Utc.timestamp_nanos(*t), interval, is_add)?;
620-
// Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
621-
// Some(r.timestamp_nanos()),
622-
// )))
623-
// }
624-
// ColumnarValue::Array(t) if t.as_any().is::<TimestampNanosecondArray>() => {
625-
// let t = t
626-
// .as_any()
627-
// .downcast_ref::<TimestampNanosecondArray>()
628-
// .unwrap();
629-
// Ok(ColumnarValue::Array(Arc::new(date_addsub_array(
630-
// &t, interval, is_add,
631-
// )?)))
632-
// }
633-
// _ => {
634-
// return Err(DataFusionError::Execution(format!(
635-
// "First argument of `{}` must be a non-null timestamp",
636-
// name
637-
// )))
638-
// }
639-
// }
640-
// }),
641-
// };
642-
// }
643-
// }
644-
//
554+
#[derive(Debug)]
555+
struct DateAddSub {
556+
is_add: bool,
557+
signature: Signature,
558+
}
559+
560+
impl DateAddSub {
561+
pub fn new(is_add: bool) -> DateAddSub {
562+
DateAddSub {
563+
is_add,
564+
signature: Signature {
565+
type_signature: TypeSignature::OneOf(vec![
566+
TypeSignature::Exact(vec![
567+
DataType::Timestamp(TimeUnit::Nanosecond, None),
568+
DataType::Interval(IntervalUnit::YearMonth),
569+
]),
570+
TypeSignature::Exact(vec![
571+
DataType::Timestamp(TimeUnit::Nanosecond, None),
572+
DataType::Interval(IntervalUnit::DayTime),
573+
]),
574+
TypeSignature::Exact(vec![
575+
DataType::Timestamp(TimeUnit::Nanosecond, None),
576+
DataType::Interval(IntervalUnit::MonthDayNano),
577+
]),
578+
]),
579+
volatility: Volatility::Immutable,
580+
},
581+
}
582+
}
583+
pub fn new_add() -> DateAddSub {
584+
Self::new(true)
585+
}
586+
pub fn new_sub() -> DateAddSub {
587+
Self::new(false)
588+
}
589+
}
590+
591+
impl DateAddSub {
592+
fn name_static(&self) -> &'static str {
593+
match self.is_add {
594+
true => "DATE_ADD",
595+
false => "DATE_SUB",
596+
}
597+
}
598+
}
599+
600+
impl ScalarUDFImpl for DateAddSub {
601+
fn as_any(&self) -> &dyn Any {
602+
self
603+
}
604+
fn name(&self) -> &str {
605+
self.name_static()
606+
}
607+
fn signature(&self) -> &Signature {
608+
&self.signature
609+
}
610+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
611+
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
612+
}
613+
fn invoke(&self, inputs: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
614+
use datafusion::arrow::compute::kernels::numeric::add;
615+
use datafusion::arrow::compute::kernels::numeric::sub;
616+
assert_eq!(inputs.len(), 2);
617+
// DF 42.2.0 already has date + interval or date - interval. Note that `add` and `sub` are
618+
// public (defined in arrow_arith), while timestamp-specific functions they invoke,
619+
// `arithmetic_op` and then `timestamp_op::<TimestampNanosecondType>`, are not.
620+
//
621+
// TODO upgrade DF: Double-check that the TypeSignature is actually enforced.
622+
datafusion::physical_expr_common::datum::apply(
623+
&inputs[0],
624+
&inputs[1],
625+
if self.is_add { add } else { sub },
626+
)
627+
}
628+
}
645629

646630
#[derive(Debug)]
647631
struct HllCardinality {

0 commit comments

Comments
 (0)