Skip to content

Commit bc2ac60

Browse files
committed
Add
1 parent 6af833b commit bc2ac60

File tree

1 file changed

+162
-0
lines changed

1 file changed

+162
-0
lines changed
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Temporal expression builders
19+
20+
use std::sync::Arc;
21+
22+
use arrow::datatypes::{DataType, Field, SchemaRef};
23+
use datafusion::config::ConfigOptions;
24+
use datafusion::logical_expr::ScalarUDF;
25+
use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr};
26+
use datafusion_comet_proto::spark_expression::Expr;
27+
use datafusion_comet_spark_expr::{
28+
SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
29+
};
30+
31+
use crate::execution::{
32+
expressions::extract_expr,
33+
operators::ExecutionError,
34+
planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
35+
};
36+
37+
pub struct HourBuilder;
38+
39+
impl ExpressionBuilder for HourBuilder {
40+
fn build(
41+
&self,
42+
spark_expr: &Expr,
43+
input_schema: SchemaRef,
44+
planner: &PhysicalPlanner,
45+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
46+
let expr = extract_expr!(spark_expr, Hour);
47+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
48+
let timezone = expr.timezone.clone();
49+
let args = vec![child];
50+
let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
51+
let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
52+
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
53+
"hour",
54+
comet_hour,
55+
args,
56+
field_ref,
57+
Arc::new(ConfigOptions::default()),
58+
);
59+
60+
Ok(Arc::new(expr))
61+
}
62+
}
63+
64+
pub struct MinuteBuilder;
65+
66+
impl ExpressionBuilder for MinuteBuilder {
67+
fn build(
68+
&self,
69+
spark_expr: &Expr,
70+
input_schema: SchemaRef,
71+
planner: &PhysicalPlanner,
72+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
73+
let expr = extract_expr!(spark_expr, Minute);
74+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
75+
let timezone = expr.timezone.clone();
76+
let args = vec![child];
77+
let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
78+
let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
79+
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
80+
"minute",
81+
comet_minute,
82+
args,
83+
field_ref,
84+
Arc::new(ConfigOptions::default()),
85+
);
86+
87+
Ok(Arc::new(expr))
88+
}
89+
}
90+
91+
pub struct SecondBuilder;
92+
93+
impl ExpressionBuilder for SecondBuilder {
94+
fn build(
95+
&self,
96+
spark_expr: &Expr,
97+
input_schema: SchemaRef,
98+
planner: &PhysicalPlanner,
99+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
100+
let expr = extract_expr!(spark_expr, Second);
101+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
102+
let timezone = expr.timezone.clone();
103+
let args = vec![child];
104+
let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
105+
let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
106+
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
107+
"second",
108+
comet_second,
109+
args,
110+
field_ref,
111+
Arc::new(ConfigOptions::default()),
112+
);
113+
114+
Ok(Arc::new(expr))
115+
}
116+
}
117+
118+
pub struct UnixTimestampBuilder;
119+
120+
impl ExpressionBuilder for UnixTimestampBuilder {
121+
fn build(
122+
&self,
123+
spark_expr: &Expr,
124+
input_schema: SchemaRef,
125+
planner: &PhysicalPlanner,
126+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
127+
let expr = extract_expr!(spark_expr, UnixTimestamp);
128+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
129+
let timezone = expr.timezone.clone();
130+
let args = vec![child];
131+
let comet_unix_timestamp =
132+
Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone)));
133+
let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
134+
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
135+
"unix_timestamp",
136+
comet_unix_timestamp,
137+
args,
138+
field_ref,
139+
Arc::new(ConfigOptions::default()),
140+
);
141+
142+
Ok(Arc::new(expr))
143+
}
144+
}
145+
146+
pub struct TruncTimestampBuilder;
147+
148+
impl ExpressionBuilder for TruncTimestampBuilder {
149+
fn build(
150+
&self,
151+
spark_expr: &Expr,
152+
input_schema: SchemaRef,
153+
planner: &PhysicalPlanner,
154+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
155+
let expr = extract_expr!(spark_expr, TruncTimestamp);
156+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
157+
let format = planner.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
158+
let timezone = expr.timezone.clone();
159+
160+
Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
161+
}
162+
}

0 commit comments

Comments
 (0)