
Commit 94a0288

chore(cubestore): Upgrade DF: remove coalesce UDF, make coalesce test fixes, handle DataType::Null in batches_to_dataframe
1 parent 0b4ad1b commit 94a0288

File tree: 5 files changed (+20, -212 lines)

rust/cubestore/cubestore-sql-tests/src/tests.rs
Lines changed: 4 additions & 14 deletions

@@ -1729,12 +1729,11 @@ async fn coalesce(service: Box<dyn SqlClient>) {
         .await
         .unwrap();
     assert_eq!(to_rows(&r), vec![vec![TableValue::Int(1)]]);
-    // TODO: the type should be 'int' here. Hopefully not a problem in practice.
     let r = service
         .exec_query("SELECT coalesce(NULL, 2, 3)")
         .await
         .unwrap();
-    assert_eq!(to_rows(&r), vec![vec![TableValue::String("2".to_string())]]);
+    assert_eq!(to_rows(&r), vec![vec![TableValue::Int(2)]]);
     let r = service
         .exec_query("SELECT coalesce(NULL, NULL, NULL)")
         .await
@@ -1753,20 +1752,11 @@ async fn coalesce(service: Box<dyn SqlClient>) {
             vec![TableValue::Null],
         ]
     );
-    // Coerces all args to text.
-    let r = service
+    // Type mismatch
+    service
         .exec_query("SELECT coalesce(n, v, s) FROM s.Data ORDER BY 1")
         .await
-        .unwrap();
-    assert_eq!(
-        to_rows(&r),
-        vec![
-            vec![TableValue::String("1".to_string())],
-            vec![TableValue::String("3".to_string())],
-            vec![TableValue::String("baz".to_string())],
-            vec![TableValue::Null],
-        ]
-    );
+        .unwrap_err();

     let r = service
         .exec_query("SELECT coalesce(n+1,v+1,0) FROM s.Data ORDER BY 1")

rust/cubestore/cubestore/src/queryplanner/coalesce.rs
Lines changed: 0 additions & 152 deletions

This file was deleted.

rust/cubestore/cubestore/src/queryplanner/mod.rs
Lines changed: 0 additions & 2 deletions

@@ -15,7 +15,6 @@ mod topk;
 pub mod trace_data_loaded;
 pub use topk::MIN_TOPK_STREAM_ROWS;
 use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scalar_udfs};
-mod coalesce;
 mod filter_by_key_range;
 mod flatten_union;
 pub mod info_schema;
@@ -488,7 +487,6 @@ impl ContextProvider for MetaStoreSchemaProvider {
         // TODO upgrade DF
         let kind = match name {
             "cardinality" | "CARDINALITY" => CubeScalarUDFKind::HllCardinality,
-            // "coalesce" | "COALESCE" => CubeScalarUDFKind::Coalesce,
             "unix_timestamp" | "UNIX_TIMESTAMP" => CubeScalarUDFKind::UnixTimestamp,
             "date_add" | "DATE_ADD" => CubeScalarUDFKind::DateAdd,
             "date_sub" | "DATE_SUB" => CubeScalarUDFKind::DateSub,

rust/cubestore/cubestore/src/queryplanner/query_executor.rs
Lines changed: 15 additions & 3 deletions

@@ -24,8 +24,8 @@ use async_trait::async_trait;
 use core::fmt;
 use datafusion::arrow::array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Float64Array,
-    Int16Array, Int32Array, Int64Array, MutableArrayData, StringArray, TimestampMicrosecondArray,
-    TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array,
+    Int16Array, Int32Array, Int64Array, MutableArrayData, NullArray, StringArray,
+    TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array,
 };
 use datafusion::arrow::compute::SortOptions;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
@@ -92,7 +92,10 @@ use std::sync::Arc;
 use std::time::SystemTime;
 use tracing::{instrument, Instrument};

-use super::udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs, registerable_arc_scalar_udfs, CubeAggregateUDFKind};
+use super::udfs::{
+    aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs,
+    registerable_arc_scalar_udfs, CubeAggregateUDFKind,
+};

 #[automock]
 #[async_trait]
@@ -1926,6 +1929,13 @@ pub fn batches_to_dataframe(batches: Vec<RecordBatch>) -> Result<DataFrame, CubeError> {
                     });
                 }
             }
+            DataType::Null => {
+                // Force the cast, just because.
+                let _ = array.as_any().downcast_ref::<NullArray>().unwrap();
+                for i in 0..num_rows {
+                    rows[i].push(TableValue::Null);
+                }
+            }
             x => panic!("Unsupported data type: {:?}", x),
         }
     }
@@ -1962,6 +1972,8 @@ pub fn arrow_to_column_type(arrow_type: DataType) -> Result<ColumnType, CubeError> {
         | DataType::UInt16
         | DataType::UInt32
         | DataType::UInt64 => Ok(ColumnType::Int),
+        // This fn is only used for converting to DataFrame, and cubesql does this (as if that's a reason)
+        DataType::Null => Ok(ColumnType::String),
         x => Err(CubeError::internal(format!("unsupported type {:?}", x))),
     }
 }
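
The new DataType::Null arm follows the same shape as the existing arms: downcast the column to its concrete Arrow array type, then push one value per row. Below is a self-contained sketch of that pattern (not CubeStore code), assuming the datafusion crate's re-exported Arrow API and using Option<String> in place of CubeStore's TableValue:

    use std::sync::Arc;
    use datafusion::arrow::array::{Array, ArrayRef, NullArray};
    use datafusion::arrow::datatypes::DataType;

    // A Null-typed column downcasts to NullArray, and every one of its rows
    // is represented as a NULL value (None here, TableValue::Null in the commit).
    fn null_column_to_rows(array: &ArrayRef, num_rows: usize) -> Vec<Option<String>> {
        assert_eq!(array.data_type(), &DataType::Null);
        let _ = array.as_any().downcast_ref::<NullArray>().expect("NullArray");
        vec![None; num_rows]
    }

    fn main() {
        let col: ArrayRef = Arc::new(NullArray::new(3));
        assert_eq!(null_column_to_rows(&col, col.len()), vec![None, None, None]);
    }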

rust/cubestore/cubestore/src/queryplanner/udfs.rs
Lines changed: 1 addition & 41 deletions

@@ -1,4 +1,3 @@
-use crate::queryplanner::coalesce::SUPPORTED_COALESCE_TYPES;
 use crate::queryplanner::hll::{Hll, HllUnion};
 use crate::CubeError;
 use chrono::{Datelike, Duration, Months, NaiveDateTime, TimeZone, Utc};
@@ -26,7 +25,6 @@ use std::sync::Arc;
 #[derive(Copy, Clone, Debug, Serialize, Deserialize)]
 pub enum CubeScalarUDFKind {
     HllCardinality, // cardinality(), accepting the HyperLogLog sketches.
-    // Coalesce,
     UnixTimestamp,
     DateAdd,
     DateSub,
@@ -36,7 +34,6 @@ pub enum CubeScalarUDFKind {
 pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Arc<ScalarUDF> {
     match k {
         CubeScalarUDFKind::HllCardinality => Arc::new(HllCardinality::descriptor()),
-        // CubeScalarUDFKind::Coalesce => Box::new(Coalesce {}),
         CubeScalarUDFKind::UnixTimestamp => {
             Arc::new(ScalarUDF::new_from_impl(UnixTimestamp::new()))
         }
@@ -67,9 +64,6 @@ pub fn scalar_kind_by_name(n: &str) -> Option<CubeScalarUDFKind> {
     if n == "CARDINALITY" {
         return Some(CubeScalarUDFKind::HllCardinality);
     }
-    // if n == "COALESCE" {
-    //     return Some(CubeScalarUDFKind::Coalesce);
-    // }
     if n == "UNIX_TIMESTAMP" {
         return Some(CubeScalarUDFKind::UnixTimestamp);
     }
@@ -83,7 +77,7 @@ pub fn scalar_kind_by_name(n: &str) -> Option<CubeScalarUDFKind> {
         return Some(CubeScalarUDFKind::DateBin);
     }
     // TODO upgrade DF: Remove this (once we are no longer in flux about naming casing of UDFs and UDAFs).
-    if ["CARDINALITY", /* "COALESCE", */ "UNIX_TIMESTAMP", "DATE_ADD", "DATE_SUB", "DATE_BIN"].contains(&(&n.to_ascii_uppercase() as &str)) {
+    if ["CARDINALITY", "UNIX_TIMESTAMP", "DATE_ADD", "DATE_SUB", "DATE_BIN"].contains(&(&n.to_ascii_uppercase() as &str)) {
         panic!("scalar_kind_by_name failing on '{}' due to uppercase/lowercase mixup", n);
     }
     return None;
@@ -126,40 +120,6 @@ pub fn aggregate_kind_by_name(n: &str) -> Option<CubeAggregateUDFKind> {
 // The rest of the file are implementations of the various functions that we have.
 // TODO: add custom type and use it instead of `Binary` for HLL columns.

-// TODO upgrade DF - remove?
-// struct Coalesce {}
-// impl Coalesce {
-//     fn signature() -> Signature {
-//         Signature::Variadic(SUPPORTED_COALESCE_TYPES.to_vec())
-//     }
-// }
-// impl CubeScalarUDF for Coalesce {
-//     fn kind(&self) -> CubeScalarUDFKind {
-//         CubeScalarUDFKind::Coalesce
-//     }
-//
-//     fn name(&self) -> &str {
-//         "COALESCE"
-//     }
-//
-//     fn descriptor(&self) -> ScalarUDF {
-//         return ScalarUDF {
-//             name: self.name().to_string(),
-//             signature: Self::signature(),
-//             return_type: Arc::new(|inputs| {
-//                 if inputs.is_empty() {
-//                     return Err(DataFusionError::Plan(
-//                         "COALESCE requires at least 1 argument".to_string(),
-//                     ));
-//                 }
-//                 let ts = type_coercion::data_types(inputs, &Self::signature())?;
-//                 Ok(Arc::new(ts[0].clone()))
-//             }),
-//             fun: Arc::new(coalesce),
-//         };
-//     }
-// }
-

 #[derive(Debug)]
 struct UnixTimestamp {
     signature: Signature,
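
With both the wrapper and its commented-out registration removed, COALESCE is no longer a Cube-specific UDF at all: planning and argument coercion are left to DataFusion's built-in implementation, which is what the updated test expectations above reflect. A minimal sketch of that behaviour against DataFusion directly (not CubeStore code; assumes a recent datafusion crate and a tokio runtime):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    // DataFusion's native coalesce coerces NULL, 2, 3 to one integer type,
    // so the result column should come back as Int64 rather than a string.
    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let batches = ctx
            .sql("SELECT coalesce(NULL, 2, 3) AS v")
            .await?
            .collect()
            .await?;
        println!("{:?}", batches[0].schema().field(0).data_type()); // expected: Int64
        Ok(())
    }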
