Skip to content

Commit a247014

Browse files
committed
chore(cubestore): Upgrade DF: upgrade HllCardinality ScalarUDF implementation
1 parent b42374c commit a247014

File tree

1 file changed

+67
-53
lines changed
  • rust/cubestore/cubestore/src/queryplanner

1 file changed

+67
-53
lines changed

rust/cubestore/cubestore/src/queryplanner/udfs.rs

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ use datafusion::arrow::array::{
66
Array, ArrayRef, BinaryArray, TimestampNanosecondArray, UInt64Builder,
77
};
88
use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
9+
use tokio_tungstenite::tungstenite::protocol::frame::coding::Data;
910
use std::any::Any;
1011
// use datafusion::cube_ext::datetime::{date_addsub_array, date_addsub_scalar};
1112
use datafusion::error::DataFusionError;
1213
use datafusion::logical_expr::function::AccumulatorArgs;
1314
use datafusion::logical_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
1415
use datafusion::logical_expr::{
15-
AggregateUDF, AggregateUDFImpl, Expr, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
16+
AggregateUDF, AggregateUDFImpl, Expr, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility
1617
};
1718
use datafusion::physical_plan::{Accumulator, ColumnarValue};
1819
use datafusion::scalar::ScalarValue;
@@ -32,15 +33,9 @@ pub enum CubeScalarUDFKind {
3233
DateBin,
3334
}
3435

35-
pub trait CubeScalarUDF {
36-
fn kind(&self) -> CubeScalarUDFKind;
37-
fn name(&self) -> &str;
38-
fn descriptor(&self) -> ScalarUDF;
39-
}
40-
4136
pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Arc<ScalarUDF> {
4237
match k {
43-
CubeScalarUDFKind::HllCardinality => todo!(), // Box::new(HllCardinality {}),
38+
CubeScalarUDFKind::HllCardinality => Arc::new(HllCardinality::descriptor()),
4439
// CubeScalarUDFKind::Coalesce => Box::new(Coalesce {}),
4540
// CubeScalarUDFKind::Now => Box::new(Now {}),
4641
CubeScalarUDFKind::UnixTimestamp => {
@@ -557,47 +552,66 @@ impl ScalarUDFImpl for UnixTimestamp {
557552
// }
558553
// }
559554
//
560-
// struct HllCardinality {}
561-
// impl CubeScalarUDF for HllCardinality {
562-
// fn kind(&self) -> CubeScalarUDFKind {
563-
// return CubeScalarUDFKind::HllCardinality;
564-
// }
565-
//
566-
// fn name(&self) -> &str {
567-
// return "CARDINALITY";
568-
// }
569-
//
570-
// fn descriptor(&self) -> ScalarUDF {
571-
// return ScalarUDF {
572-
// name: self.name().to_string(),
573-
// signature: Signature::Exact(vec![DataType::Binary]),
574-
// return_type: Arc::new(|_| Ok(Arc::new(DataType::UInt64))),
575-
// fun: Arc::new(|a| {
576-
// assert_eq!(a.len(), 1);
577-
// let sketches = a[0].clone().into_array(1);
578-
// let sketches = sketches
579-
// .as_any()
580-
// .downcast_ref::<BinaryArray>()
581-
// .expect("expected binary data");
582-
//
583-
// let mut r = UInt64Builder::new(sketches.len());
584-
// for s in sketches {
585-
// match s {
586-
// None => r.append_null()?,
587-
// Some(d) => {
588-
// if d.len() == 0 {
589-
// r.append_value(0)?
590-
// } else {
591-
// r.append_value(read_sketch(d)?.cardinality())?
592-
// }
593-
// }
594-
// }
595-
// }
596-
// return Ok(ColumnarValue::Array(Arc::new(r.finish())));
597-
// }),
598-
// };
599-
// }
600-
// }
555+
556+
#[derive(Debug)]
557+
struct HllCardinality {
558+
signature: Signature,
559+
}
560+
impl HllCardinality {
561+
pub fn new() -> HllCardinality {
562+
// TODO upgrade DF: Is it Volatile or Immutable?
563+
let signature = Signature::new(TypeSignature::Exact(vec![DataType::Binary]), Volatility::Volatile);
564+
565+
HllCardinality{
566+
signature
567+
}
568+
}
569+
fn descriptor() -> ScalarUDF {
570+
return ScalarUDF::new_from_impl(HllCardinality::new());
571+
}
572+
}
573+
574+
impl ScalarUDFImpl for HllCardinality {
575+
fn as_any(&self) -> &dyn Any {
576+
self
577+
}
578+
fn name(&self) -> &str {
579+
"CARDINALITY"
580+
}
581+
fn signature(&self) -> &Signature {
582+
&self.signature
583+
}
584+
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
585+
Ok(DataType::UInt64)
586+
}
587+
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
588+
assert_eq!(args.len(), 1);
589+
let sketches = args[0].clone().into_array(1)?;
590+
let sketches = sketches
591+
.as_any()
592+
.downcast_ref::<BinaryArray>()
593+
.expect("expected binary data");
594+
595+
let mut r = UInt64Builder::with_capacity(sketches.len());
596+
for s in sketches {
597+
match s {
598+
None => r.append_null(),
599+
Some(d) => {
600+
if d.len() == 0 {
601+
r.append_value(0)
602+
} else {
603+
r.append_value(read_sketch(d)?.cardinality())
604+
}
605+
}
606+
}
607+
}
608+
return Ok(ColumnarValue::Array(Arc::new(r.finish())));
609+
}
610+
fn aliases(&self) -> &[String] {
611+
&[]
612+
}
613+
}
614+
601615
//
602616
// #[derive(Debug)]
603617
// struct HllMergeUDF {}
@@ -712,7 +726,7 @@ impl ScalarUDFImpl for UnixTimestamp {
712726
// return Ok(());
713727
// }
714728
// }
715-
//
716-
// pub fn read_sketch(data: &[u8]) -> Result<Hll, DataFusionError> {
717-
// return Hll::read(&data).map_err(|e| DataFusionError::Execution(e.message));
718-
// }
729+
730+
pub fn read_sketch(data: &[u8]) -> Result<Hll, DataFusionError> {
731+
return Hll::read(&data).map_err(|e| DataFusionError::Execution(e.message));
732+
}

0 commit comments

Comments
 (0)