Skip to content

Commit d72e54c

Browse files
authored
feat: rand expression support (#1199)
1 parent d9b4792 commit d72e54c

File tree

10 files changed

+492
-170
lines changed

10 files changed

+492
-170
lines changed

native/Cargo.lock

Lines changed: 149 additions & 162 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/src/execution/jni_api.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -408,8 +408,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
408408
// query plan, we need to defer stream initialization to first time execution.
409409
if exec_context.root_op.is_none() {
410410
let start = Instant::now();
411-
let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx))
412-
.with_exec_id(exec_context_id);
411+
let planner =
412+
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
413+
.with_exec_id(exec_context_id);
413414
let (scans, root_op) = planner.create_plan(
414415
&exec_context.spark_plan,
415416
&mut exec_context.input_sources.clone(),

native/core/src/execution/planner.rs

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ use datafusion_comet_proto::{
106106
use datafusion_comet_spark_expr::{
107107
ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Contains, Correlation, Covariance,
108108
CreateNamedStruct, EndsWith, GetArrayStructFields, GetStructField, IfExpr, Like, ListExtract,
109-
NormalizeNaNAndZero, RLike, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr,
109+
NormalizeNaNAndZero, RLike, RandExpr, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr,
110110
SubstringExpr, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, Variance,
111111
};
112112
use itertools::Itertools;
@@ -141,26 +141,29 @@ pub const TEST_EXEC_CONTEXT_ID: i64 = -1;
141141
pub struct PhysicalPlanner {
142142
// The execution context id of this planner.
143143
exec_context_id: i64,
144+
partition: i32,
144145
session_ctx: Arc<SessionContext>,
145146
}
146147

147148
impl Default for PhysicalPlanner {
148149
fn default() -> Self {
149-
Self::new(Arc::new(SessionContext::new()))
150+
Self::new(Arc::new(SessionContext::new()), 0)
150151
}
151152
}
152153

153154
impl PhysicalPlanner {
154-
pub fn new(session_ctx: Arc<SessionContext>) -> Self {
155+
pub fn new(session_ctx: Arc<SessionContext>, partition: i32) -> Self {
155156
Self {
156157
exec_context_id: TEST_EXEC_CONTEXT_ID,
157158
session_ctx,
159+
partition,
158160
}
159161
}
160162

161163
pub fn with_exec_id(self, exec_context_id: i64) -> Self {
162164
Self {
163165
exec_context_id,
166+
partition: self.partition,
164167
session_ctx: Arc::clone(&self.session_ctx),
165168
}
166169
}
@@ -801,6 +804,10 @@ impl PhysicalPlanner {
801804
expr.legacy_negative_index,
802805
)))
803806
}
807+
ExprStruct::Rand(expr) => {
808+
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
809+
Ok(Arc::new(RandExpr::new(child, self.partition)))
810+
}
804811
expr => Err(GeneralError(format!("Not implemented: {:?}", expr))),
805812
}
806813
}
@@ -2946,7 +2953,7 @@ mod tests {
29462953
datafusion_functions_nested::make_array::MakeArray::new(),
29472954
));
29482955
let task_ctx = session_ctx.task_ctx();
2949-
let planner = PhysicalPlanner::new(Arc::from(session_ctx));
2956+
let planner = PhysicalPlanner::new(Arc::from(session_ctx), 0);
29502957

29512958
// Create a plan for
29522959
// ProjectionExec: expr=[make_array(col_0@0) as col_0]
@@ -3062,7 +3069,7 @@ mod tests {
30623069
fn test_array_repeat() {
30633070
let session_ctx = SessionContext::new();
30643071
let task_ctx = session_ctx.task_ctx();
3065-
let planner = PhysicalPlanner::new(Arc::from(session_ctx));
3072+
let planner = PhysicalPlanner::new(Arc::from(session_ctx), 0);
30663073

30673074
// Mock scan operator with 3 INT32 columns
30683075
let op_scan = Operator {

native/core/src/parquet/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -687,7 +687,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
687687
try_unwrap_or_throw(&e, |mut env| unsafe {
688688
let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
689689
let planer =
690-
PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)));
690+
PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)), 0);
691691
let session_ctx = planer.session_ctx();
692692

693693
let path: String = env

native/proto/src/proto/expr.proto

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@ message Expr {
8383
ArrayInsert array_insert = 58;
8484
MathExpr integral_divide = 59;
8585
ToPrettyString to_pretty_string = 60;
86+
UnaryExpr rand = 61;
8687
}
8788
}
8889

native/spark-expr/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,11 +52,13 @@ pub use cast::{spark_cast, Cast, SparkCastOptions};
5252
mod conditional_funcs;
5353
mod conversion_funcs;
5454
mod math_funcs;
55+
mod nondetermenistic_funcs;
5556

5657
pub use array_funcs::*;
5758
pub use bitwise_funcs::*;
5859
pub use conditional_funcs::*;
5960
pub use conversion_funcs::*;
61+
pub use nondetermenistic_funcs::*;
6062

6163
pub use comet_scalar_funcs::{create_comet_physical_fun, register_all_comet_functions};
6264
pub use datetime_funcs::{
Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,20 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub mod rand;
19+
20+
pub use rand::RandExpr;

0 commit comments

Comments
 (0)