Skip to content

Commit c9ee201

Browse files
authored
chore(cubestore): Implement execution for now() & unix_timestamp() (#9868)
1 parent 78ec2a5 commit c9ee201

File tree

1 file changed

+50
-12
lines changed
  • rust/cubestore/cubestore/src/queryplanner

1 file changed

+50
-12
lines changed

rust/cubestore/cubestore/src/queryplanner/udfs.rs

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::udf_xirr::XirrAccumulator;
12
use crate::queryplanner::coalesce::{coalesce, SUPPORTED_COALESCE_TYPES};
23
use crate::queryplanner::hll::{Hll, HllUnion};
34
use crate::queryplanner::udf_xirr::create_xirr_udaf;
@@ -18,8 +19,7 @@ use serde_derive::{Deserialize, Serialize};
1819
use smallvec::smallvec;
1920
use smallvec::SmallVec;
2021
use std::sync::Arc;
21-
22-
use super::udf_xirr::XirrAccumulator;
22+
use std::time::SystemTime;
2323

2424
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
2525
pub enum CubeScalarUDFKind {
@@ -159,19 +159,39 @@ impl CubeScalarUDF for Now {
159159
}
160160

161161
fn descriptor(&self) -> ScalarUDF {
162-
return ScalarUDF {
162+
ScalarUDF {
163163
name: self.name().to_string(),
164164
signature: Self::signature(),
165165
return_type: Arc::new(|inputs| {
166166
assert!(inputs.is_empty());
167167
Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))
168168
}),
169169
fun: Arc::new(|_| {
170-
Err(DataFusionError::Internal(
171-
"NOW() was not optimized away".to_string(),
172-
))
170+
let t = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
171+
Ok(t) => t,
172+
Err(e) => {
173+
return Err(DataFusionError::Internal(format!(
174+
"Failed to get current timestamp: {}",
175+
e
176+
)))
177+
}
178+
};
179+
180+
let nanos = match i64::try_from(t.as_nanos()) {
181+
Ok(t) => t,
182+
Err(e) => {
183+
return Err(DataFusionError::Internal(format!(
184+
"Failed to convert timestamp to i64: {}",
185+
e
186+
)))
187+
}
188+
};
189+
190+
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
191+
Some(nanos),
192+
)))
173193
}),
174-
};
194+
}
175195
}
176196
}
177197

@@ -191,19 +211,37 @@ impl CubeScalarUDF for UnixTimestamp {
191211
}
192212

193213
fn descriptor(&self) -> ScalarUDF {
194-
return ScalarUDF {
214+
ScalarUDF {
195215
name: self.name().to_string(),
196216
signature: Self::signature(),
197217
return_type: Arc::new(|inputs| {
198218
assert!(inputs.is_empty());
199219
Ok(Arc::new(DataType::Int64))
200220
}),
201221
fun: Arc::new(|_| {
202-
Err(DataFusionError::Internal(
203-
"UNIX_TIMESTAMP() was not optimized away".to_string(),
204-
))
222+
let t = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
223+
Ok(t) => t,
224+
Err(e) => {
225+
return Err(DataFusionError::Internal(format!(
226+
"Failed to get current timestamp: {}",
227+
e
228+
)))
229+
}
230+
};
231+
232+
let seconds = match i64::try_from(t.as_secs()) {
233+
Ok(t) => t,
234+
Err(e) => {
235+
return Err(DataFusionError::Internal(format!(
236+
"Failed to convert timestamp to i64: {}",
237+
e
238+
)))
239+
}
240+
};
241+
242+
Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(seconds))))
205243
}),
206-
};
244+
}
207245
}
208246
}
209247

0 commit comments

Comments
 (0)