Skip to content

Commit ac59ea3

Browse files
committed
chore(cubestore): Upgrade DF: Implement convert_tz
Includes scalar shift optimization
1 parent e3dc79d commit ac59ea3

File tree

1 file changed

+179
-1
lines changed
  • rust/cubestore/cubestore/src/queryplanner

1 file changed

+179
-1
lines changed

rust/cubestore/cubestore/src/queryplanner/udfs.rs

Lines changed: 179 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use crate::queryplanner::hll::{Hll, HllUnion};
22
use crate::CubeError;
33
use chrono::{Datelike, Duration, Months, NaiveDateTime};
44
use datafusion::arrow::array::{
5-
Array, ArrayRef, BinaryArray, TimestampNanosecondArray, UInt64Builder,
5+
Array, ArrayRef, BinaryArray, StringArray, TimestampNanosecondArray, UInt64Builder,
66
};
7+
use datafusion::arrow::buffer::ScalarBuffer;
78
use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
89
use datafusion::error::DataFusionError;
910
use datafusion::logical_expr::function::AccumulatorArgs;
@@ -25,6 +26,7 @@ pub enum CubeScalarUDFKind {
2526
DateAdd,
2627
DateSub,
2728
DateBin,
29+
ConvertTz,
2830
}
2931

3032
pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Arc<ScalarUDF> {
@@ -36,6 +38,7 @@ pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Arc<ScalarUDF> {
3638
CubeScalarUDFKind::DateAdd => Arc::new(ScalarUDF::new_from_impl(DateAddSub::new_add())),
3739
CubeScalarUDFKind::DateSub => Arc::new(ScalarUDF::new_from_impl(DateAddSub::new_sub())),
3840
CubeScalarUDFKind::DateBin => Arc::new(ScalarUDF::new_from_impl(DateBin::new())),
41+
CubeScalarUDFKind::ConvertTz => Arc::new(ScalarUDF::new_from_impl(ConvertTz::new())),
3942
}
4043
}
4144

@@ -46,6 +49,7 @@ pub fn registerable_scalar_udfs() -> Vec<ScalarUDF> {
4649
ScalarUDF::new_from_impl(DateAddSub::new_add()),
4750
ScalarUDF::new_from_impl(DateAddSub::new_sub()),
4851
ScalarUDF::new_from_impl(UnixTimestamp::new()),
52+
ScalarUDF::new_from_impl(ConvertTz::new()),
4953
]
5054
}
5155

@@ -716,3 +720,177 @@ impl HllMergeAccumulator {
716720
pub fn read_sketch(data: &[u8]) -> Result<Hll, DataFusionError> {
717721
return Hll::read(&data).map_err(|e| DataFusionError::Execution(e.message));
718722
}
723+
724+
#[derive(Debug)]
725+
struct ConvertTz {
726+
signature: Signature,
727+
}
728+
729+
impl ConvertTz {
730+
fn new() -> ConvertTz {
731+
ConvertTz {
732+
signature: Signature {
733+
type_signature: TypeSignature::Exact(vec![
734+
DataType::Timestamp(TimeUnit::Nanosecond, None),
735+
DataType::Utf8,
736+
]),
737+
volatility: Volatility::Immutable,
738+
},
739+
}
740+
}
741+
}
742+
743+
impl ScalarUDFImpl for ConvertTz {
744+
fn as_any(&self) -> &dyn Any {
745+
self
746+
}
747+
fn name(&self) -> &str {
748+
"convert_tz"
749+
}
750+
fn signature(&self) -> &Signature {
751+
&self.signature
752+
}
753+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
754+
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
755+
}
756+
fn invoke(&self, inputs: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
757+
match (&inputs[0], &inputs[1]) {
758+
(
759+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(t, _)),
760+
ColumnarValue::Scalar(ScalarValue::Utf8(shift)),
761+
) => {
762+
let t: Arc<TimestampNanosecondArray> =
763+
Arc::new(std::iter::repeat(t).take(1).collect());
764+
let shift: Arc<StringArray> = Arc::new(std::iter::repeat(shift).take(1).collect());
765+
let t: ArrayRef = t;
766+
let shift: ArrayRef = shift;
767+
let result = convert_tz(&t, &shift)?;
768+
let ts_array = result
769+
.as_any()
770+
.downcast_ref::<TimestampNanosecondArray>()
771+
.ok_or_else(|| {
772+
DataFusionError::Internal("Wrong type returned in convert_tz".to_string())
773+
})?;
774+
let ts_native = ts_array.value(0);
775+
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
776+
Some(ts_native),
777+
None,
778+
)))
779+
}
780+
(ColumnarValue::Array(t), ColumnarValue::Scalar(ScalarValue::Utf8(shift))) => {
781+
let shift =
782+
convert_tz_compute_shift_nanos(shift.as_ref().map_or("", |s| s.as_str()))?;
783+
784+
convert_tz_precomputed_shift(t, shift).map(|arr| ColumnarValue::Array(arr))
785+
}
786+
(
787+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(t, _)),
788+
ColumnarValue::Array(shift),
789+
) => {
790+
let t: Arc<TimestampNanosecondArray> =
791+
Arc::new(std::iter::repeat(t).take(shift.len()).collect());
792+
let t: ArrayRef = t;
793+
convert_tz(&t, shift).map(|arr| ColumnarValue::Array(arr))
794+
}
795+
(ColumnarValue::Array(t), ColumnarValue::Array(shift)) => {
796+
convert_tz(t, shift).map(|arr| ColumnarValue::Array(arr))
797+
}
798+
_ => Err(DataFusionError::Internal(
799+
"Unsupported input type in convert_tz".to_string(),
800+
)),
801+
}
802+
}
803+
}
804+
805+
fn convert_tz_compute_shift_nanos(shift: &str) -> Result<i64, DataFusionError> {
806+
let hour_min = shift.split(':').collect::<Vec<_>>();
807+
if hour_min.len() != 2 {
808+
return Err(DataFusionError::Execution(format!(
809+
"Can't parse timezone shift '{}'",
810+
shift
811+
)));
812+
}
813+
let hour = hour_min[0].parse::<i64>().map_err(|e| {
814+
DataFusionError::Execution(format!(
815+
"Can't parse hours of timezone shift '{}': {}",
816+
hour_min[0], e
817+
))
818+
})?;
819+
let minute = hour_min[1].parse::<i64>().map_err(|e| {
820+
DataFusionError::Execution(format!(
821+
"Can't parse minutes of timezone shift '{}': {}",
822+
hour_min[1], e
823+
))
824+
})?;
825+
let shift = (hour * 60 + hour.signum() * minute) * 60 * 1_000_000_000;
826+
Ok(shift)
827+
}
828+
829+
/// convert_tz SQL function
830+
pub fn convert_tz(args_0: &ArrayRef, args_1: &ArrayRef) -> Result<ArrayRef, DataFusionError> {
831+
let timestamps = args_0
832+
.as_any()
833+
.downcast_ref::<TimestampNanosecondArray>()
834+
.ok_or_else(|| {
835+
DataFusionError::Execution(
836+
"Could not cast convert_tz timestamp input to TimestampNanosecondArray".to_string(),
837+
)
838+
})?;
839+
840+
let shift = args_1
841+
.as_any()
842+
.downcast_ref::<StringArray>()
843+
.ok_or_else(|| {
844+
DataFusionError::Execution(
845+
"Could not cast convert_tz shift input to StringArray".to_string(),
846+
)
847+
})?;
848+
849+
let range = 0..timestamps.len();
850+
let result = range
851+
.map(|i| {
852+
if timestamps.is_null(i) {
853+
Ok(0_i64)
854+
} else {
855+
let shift: i64 = convert_tz_compute_shift_nanos(shift.value(i))?;
856+
Ok(timestamps.value(i) + shift)
857+
}
858+
})
859+
.collect::<Result<Vec<_>, DataFusionError>>()?;
860+
861+
Ok(Arc::new(TimestampNanosecondArray::new(
862+
ScalarBuffer::<i64>::from(result),
863+
timestamps.nulls().map(|null_buffer| null_buffer.clone()),
864+
)))
865+
}
866+
867+
pub fn convert_tz_precomputed_shift(
868+
args_0: &ArrayRef,
869+
shift: i64,
870+
) -> Result<ArrayRef, DataFusionError> {
871+
let timestamps = args_0
872+
.as_any()
873+
.downcast_ref::<TimestampNanosecondArray>()
874+
.ok_or_else(|| {
875+
DataFusionError::Execution(
876+
"Could not cast convert_tz timestamp input to TimestampNanosecondArray".to_string(),
877+
)
878+
})?;
879+
880+
// TODO: This could be faster.
881+
let range = 0..timestamps.len();
882+
let result = range
883+
.map(|i| {
884+
if timestamps.is_null(i) {
885+
Ok(0_i64)
886+
} else {
887+
Ok(timestamps.value(i) + shift)
888+
}
889+
})
890+
.collect::<Result<Vec<_>, DataFusionError>>()?;
891+
892+
Ok(Arc::new(TimestampNanosecondArray::new(
893+
ScalarBuffer::<i64>::from(result),
894+
timestamps.nulls().map(|null_buffer| null_buffer.clone()),
895+
)))
896+
}

0 commit comments

Comments
 (0)