From 826e08793262e9e37364407f8d2405b40225a44d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 4 Dec 2025 15:33:03 -0700 Subject: [PATCH 01/14] add new framework --- .../src/execution/expressions/arithmetic.rs | 232 ++++++++++++++++++ native/core/src/execution/expressions/mod.rs | 1 + native/core/src/execution/planner.rs | 124 +++------- .../execution/planner/expression_registry.rs | 183 ++++++++++++++ native/core/src/execution/planner/traits.rs | 143 +++++++++++ 5 files changed, 595 insertions(+), 88 deletions(-) create mode 100644 native/core/src/execution/expressions/arithmetic.rs create mode 100644 native/core/src/execution/planner/expression_registry.rs create mode 100644 native/core/src/execution/planner/traits.rs diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs new file mode 100644 index 0000000000..53a42bbe8c --- /dev/null +++ b/native/core/src/execution/expressions/arithmetic.rs @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Arithmetic expression builders + +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use datafusion::logical_expr::Operator as DataFusionOperator; +use datafusion::physical_expr::PhysicalExpr; +use datafusion_comet_proto::spark_expression::{expr::ExprStruct, Expr}; +use datafusion_comet_spark_expr::{create_modulo_expr, create_negate_expr, EvalMode}; + +use crate::execution::operators::ExecutionError; +use crate::execution::planner::traits::ExpressionBuilder; +use crate::execution::planner::{from_protobuf_eval_mode, BinaryExprOptions}; + +/// Builder for Add expressions +pub struct AddBuilder; + +impl ExpressionBuilder for AddBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &crate::execution::planner::PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Add(expr)) = &spark_expr.expr_struct { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Plus, + input_schema, + eval_mode, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected Add expression".to_string(), + )) + } + } +} + +/// Builder for Subtract expressions +pub struct SubtractBuilder; + +impl ExpressionBuilder for SubtractBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &crate::execution::planner::PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Subtract(expr)) = &spark_expr.expr_struct { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Minus, + input_schema, + eval_mode, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected Subtract expression".to_string(), + )) + } + } +} + +/// Builder for Multiply expressions +pub struct MultiplyBuilder; + +impl ExpressionBuilder for MultiplyBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &crate::execution::planner::PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Multiply(expr)) = &spark_expr.expr_struct { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Multiply, + input_schema, + eval_mode, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected Multiply expression".to_string(), + )) + } + } +} + +/// Builder for Divide expressions +pub struct DivideBuilder; + +impl ExpressionBuilder for DivideBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &crate::execution::planner::PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Divide(expr)) = &spark_expr.expr_struct { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Divide, + input_schema, + eval_mode, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected Divide expression".to_string(), + )) + } + } +} + +/// Builder for IntegralDivide expressions +pub struct IntegralDivideBuilder; + +impl ExpressionBuilder for IntegralDivideBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &crate::execution::planner::PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::IntegralDivide(expr)) = &spark_expr.expr_struct { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr_with_options( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Divide, + input_schema, + BinaryExprOptions { + is_integral_div: true, + }, + eval_mode, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected IntegralDivide expression".to_string(), + )) + } + } +} + +/// Builder for Remainder expressions +pub struct RemainderBuilder; + +impl ExpressionBuilder for RemainderBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &crate::execution::planner::PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Remainder(expr)) = &spark_expr.expr_struct { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + let left = + planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = + planner.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?; + + let result = create_modulo_expr( + left, + right, + expr.return_type + .as_ref() + .map(crate::execution::serde::to_arrow_datatype) + .unwrap(), + input_schema, + eval_mode == EvalMode::Ansi, + &planner.session_ctx().state(), + ); + result.map_err(|e| ExecutionError::GeneralError(e.to_string())) + } else { + Err(ExecutionError::GeneralError( + "Expected Remainder expression".to_string(), + )) + } + } +} + +/// Builder for UnaryMinus expressions +pub struct UnaryMinusBuilder; + +impl ExpressionBuilder for UnaryMinusBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &crate::execution::planner::PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::UnaryMinus(expr)) = &spark_expr.expr_struct { + let child = planner.create_expr(expr.child.as_ref().unwrap(), input_schema)?; + let result = create_negate_expr(child, expr.fail_on_error); + result.map_err(|e| ExecutionError::GeneralError(e.to_string())) + } else { + Err(ExecutionError::GeneralError( + "Expected UnaryMinus expression".to_string(), + )) + } + } +} diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs index 9bb8fad456..84b930d059 100644 --- a/native/core/src/execution/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -17,6 +17,7 @@ //! Native DataFusion expressions +pub mod arithmetic; pub mod subquery; pub use datafusion_comet_spark_expr::EvalMode; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ccbd0b2508..38f92b5e68 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -17,12 +17,16 @@ //! Converts Spark physical plan to DataFusion physical plan +pub mod expression_registry; +pub mod traits; + use crate::execution::operators::IcebergScanExec; use crate::{ errors::ExpressionError, execution::{ expressions::subquery::Subquery, operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec}, + planner::expression_registry::ExpressionRegistry, serde::to_arrow_datatype, shuffle::ShuffleWriterExec, }, @@ -62,8 +66,8 @@ use datafusion::{ prelude::SessionContext, }; use datafusion_comet_spark_expr::{ - create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, create_modulo_expr, - create_negate_expr, BinaryOutputStyle, BloomFilterAgg, BloomFilterMightContain, EvalMode, + create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, + BinaryOutputStyle, BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond, }; use iceberg::expr::Bind; @@ -142,7 +146,7 @@ struct JoinParameters { } #[derive(Default)] -struct BinaryExprOptions { +pub struct BinaryExprOptions { pub is_integral_div: bool, } @@ -154,6 +158,7 @@ pub struct PhysicalPlanner { exec_context_id: i64, partition: i32, session_ctx: Arc, + expression_registry: ExpressionRegistry, } impl Default for PhysicalPlanner { @@ -168,6 +173,7 @@ impl PhysicalPlanner { exec_context_id: TEST_EXEC_CONTEXT_ID, session_ctx, partition, + expression_registry: ExpressionRegistry::new(), } } @@ -176,6 +182,7 @@ impl PhysicalPlanner { exec_context_id, partition: self.partition, session_ctx: Arc::clone(&self.session_ctx), + expression_registry: self.expression_registry, } } @@ -184,6 +191,20 @@ impl PhysicalPlanner { &self.session_ctx } + /// Check if an expression is an arithmetic expression that should be handled by the registry + fn is_arithmetic_expression(expr_struct: &ExprStruct) -> bool { + matches!( + expr_struct, + ExprStruct::Add(_) + | ExprStruct::Subtract(_) + | ExprStruct::Multiply(_) + | ExprStruct::Divide(_) + | ExprStruct::IntegralDivide(_) + | ExprStruct::Remainder(_) + | ExprStruct::UnaryMinus(_) + ) + } + /// get DataFusion PartitionedFiles from a Spark FilePartition fn get_partitioned_files( &self, @@ -242,84 +263,17 @@ impl PhysicalPlanner { spark_expr: &Expr, input_schema: SchemaRef, ) -> Result, ExecutionError> { - match spark_expr.expr_struct.as_ref().unwrap() { - ExprStruct::Add(expr) => { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Plus, - input_schema, - eval_mode, - ) + // Try to use the modular registry for arithmetic expressions first + if let Some(expr_struct) = spark_expr.expr_struct.as_ref() { + if Self::is_arithmetic_expression(expr_struct) { + return self + .expression_registry + .create_expr(spark_expr, input_schema, self); } - ExprStruct::Subtract(expr) => { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Minus, - input_schema, - eval_mode, - ) - } - ExprStruct::Multiply(expr) => { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Multiply, - input_schema, - eval_mode, - ) - } - ExprStruct::Divide(expr) => { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - eval_mode, - ) - } - ExprStruct::IntegralDivide(expr) => { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - self.create_binary_expr_with_options( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - BinaryExprOptions { - is_integral_div: true, - }, - eval_mode, - ) - } - ExprStruct::Remainder(expr) => { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - // TODO add support for EvalMode::TRY - // https://github.com/apache/datafusion-comet/issues/2021 - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = - self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?; + } - let result = create_modulo_expr( - left, - right, - expr.return_type.as_ref().map(to_arrow_datatype).unwrap(), - input_schema, - eval_mode == EvalMode::Ansi, - &self.session_ctx.state(), - ); - result.map_err(|e| GeneralError(e.to_string())) - } + // Fall back to the original monolithic match for other expressions + match spark_expr.expr_struct.as_ref().unwrap() { ExprStruct::Eq(expr) => { let left = self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; @@ -728,12 +682,6 @@ impl PhysicalPlanner { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; Ok(Arc::new(NotExpr::new(child))) } - ExprStruct::UnaryMinus(expr) => { - let child: Arc = - self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; - let result = create_negate_expr(child, expr.fail_on_error); - result.map_err(|e| GeneralError(e.to_string())) - } ExprStruct::NormalizeNanAndZero(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap()); @@ -892,7 +840,7 @@ impl PhysicalPlanner { } } - fn create_binary_expr( + pub fn create_binary_expr( &self, left: &Expr, right: &Expr, @@ -913,7 +861,7 @@ impl PhysicalPlanner { } #[allow(clippy::too_many_arguments)] - fn create_binary_expr_with_options( + pub fn create_binary_expr_with_options( &self, left: &Expr, right: &Expr, @@ -2710,7 +2658,7 @@ fn rewrite_physical_expr( Ok(expr.rewrite(&mut rewriter).data()?) } -fn from_protobuf_eval_mode(value: i32) -> Result { +pub fn from_protobuf_eval_mode(value: i32) -> Result { match spark_expression::EvalMode::try_from(value)? { spark_expression::EvalMode::Legacy => Ok(EvalMode::Legacy), spark_expression::EvalMode::Try => Ok(EvalMode::Try), diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs new file mode 100644 index 0000000000..0c66cd6a88 --- /dev/null +++ b/native/core/src/execution/planner/expression_registry.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Expression registry for dispatching expression creation + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use datafusion::physical_expr::PhysicalExpr; +use datafusion_comet_proto::spark_expression::{expr::ExprStruct, Expr}; + +use crate::execution::operators::ExecutionError; +use crate::execution::planner::traits::{ExpressionBuilder, ExpressionType}; + +/// Registry for expression builders +pub struct ExpressionRegistry { + builders: HashMap>, +} + +impl ExpressionRegistry { + /// Create a new expression registry with all builders registered + pub fn new() -> Self { + let mut registry = Self { + builders: HashMap::new(), + }; + + registry.register_all_expressions(); + registry + } + + /// Create a physical expression from a Spark protobuf expression + pub fn create_expr( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &super::PhysicalPlanner, + ) -> Result, ExecutionError> { + let expr_type = Self::get_expression_type(spark_expr)?; + + if let Some(builder) = self.builders.get(&expr_type) { + builder.build(spark_expr, input_schema, planner) + } else { + Err(ExecutionError::GeneralError(format!( + "No builder registered for expression type: {:?}", + expr_type + ))) + } + } + + /// Register all expression builders + fn register_all_expressions(&mut self) { + // Register arithmetic expressions + self.register_arithmetic_expressions(); + + // TODO: Register other expression categories in future phases + // self.register_comparison_expressions(); + // self.register_string_expressions(); + // self.register_temporal_expressions(); + // etc. + } + + /// Register arithmetic expression builders + fn register_arithmetic_expressions(&mut self) { + use crate::execution::expressions::arithmetic::*; + + self.builders + .insert(ExpressionType::Add, Box::new(AddBuilder)); + self.builders + .insert(ExpressionType::Subtract, Box::new(SubtractBuilder)); + self.builders + .insert(ExpressionType::Multiply, Box::new(MultiplyBuilder)); + self.builders + .insert(ExpressionType::Divide, Box::new(DivideBuilder)); + self.builders.insert( + ExpressionType::IntegralDivide, + Box::new(IntegralDivideBuilder), + ); + self.builders + .insert(ExpressionType::Remainder, Box::new(RemainderBuilder)); + self.builders + .insert(ExpressionType::UnaryMinus, Box::new(UnaryMinusBuilder)); + } + + /// Extract expression type from Spark protobuf expression + fn get_expression_type(spark_expr: &Expr) -> Result { + match spark_expr.expr_struct.as_ref() { + Some(ExprStruct::Add(_)) => Ok(ExpressionType::Add), + Some(ExprStruct::Subtract(_)) => Ok(ExpressionType::Subtract), + Some(ExprStruct::Multiply(_)) => Ok(ExpressionType::Multiply), + Some(ExprStruct::Divide(_)) => Ok(ExpressionType::Divide), + Some(ExprStruct::IntegralDivide(_)) => Ok(ExpressionType::IntegralDivide), + Some(ExprStruct::Remainder(_)) => Ok(ExpressionType::Remainder), + Some(ExprStruct::UnaryMinus(_)) => Ok(ExpressionType::UnaryMinus), + + Some(ExprStruct::Eq(_)) => Ok(ExpressionType::Eq), + Some(ExprStruct::Neq(_)) => Ok(ExpressionType::Neq), + Some(ExprStruct::Lt(_)) => Ok(ExpressionType::Lt), + Some(ExprStruct::LtEq(_)) => Ok(ExpressionType::LtEq), + Some(ExprStruct::Gt(_)) => Ok(ExpressionType::Gt), + Some(ExprStruct::GtEq(_)) => Ok(ExpressionType::GtEq), + Some(ExprStruct::EqNullSafe(_)) => Ok(ExpressionType::EqNullSafe), + Some(ExprStruct::NeqNullSafe(_)) => Ok(ExpressionType::NeqNullSafe), + + Some(ExprStruct::And(_)) => Ok(ExpressionType::And), + Some(ExprStruct::Or(_)) => Ok(ExpressionType::Or), + Some(ExprStruct::Not(_)) => Ok(ExpressionType::Not), + + Some(ExprStruct::IsNull(_)) => Ok(ExpressionType::IsNull), + Some(ExprStruct::IsNotNull(_)) => Ok(ExpressionType::IsNotNull), + + Some(ExprStruct::BitwiseAnd(_)) => Ok(ExpressionType::BitwiseAnd), + Some(ExprStruct::BitwiseOr(_)) => Ok(ExpressionType::BitwiseOr), + Some(ExprStruct::BitwiseXor(_)) => Ok(ExpressionType::BitwiseXor), + Some(ExprStruct::BitwiseShiftLeft(_)) => Ok(ExpressionType::BitwiseShiftLeft), + Some(ExprStruct::BitwiseShiftRight(_)) => Ok(ExpressionType::BitwiseShiftRight), + + Some(ExprStruct::Bound(_)) => Ok(ExpressionType::Bound), + Some(ExprStruct::Unbound(_)) => Ok(ExpressionType::Unbound), + Some(ExprStruct::Literal(_)) => Ok(ExpressionType::Literal), + Some(ExprStruct::Cast(_)) => Ok(ExpressionType::Cast), + Some(ExprStruct::CaseWhen(_)) => Ok(ExpressionType::CaseWhen), + Some(ExprStruct::In(_)) => Ok(ExpressionType::In), + Some(ExprStruct::If(_)) => Ok(ExpressionType::If), + Some(ExprStruct::Substring(_)) => Ok(ExpressionType::Substring), + Some(ExprStruct::Like(_)) => Ok(ExpressionType::Like), + Some(ExprStruct::Rlike(_)) => Ok(ExpressionType::Rlike), + Some(ExprStruct::CheckOverflow(_)) => Ok(ExpressionType::CheckOverflow), + Some(ExprStruct::ScalarFunc(_)) => Ok(ExpressionType::ScalarFunc), + Some(ExprStruct::NormalizeNanAndZero(_)) => Ok(ExpressionType::NormalizeNanAndZero), + Some(ExprStruct::Subquery(_)) => Ok(ExpressionType::Subquery), + Some(ExprStruct::BloomFilterMightContain(_)) => { + Ok(ExpressionType::BloomFilterMightContain) + } + Some(ExprStruct::CreateNamedStruct(_)) => Ok(ExpressionType::CreateNamedStruct), + Some(ExprStruct::GetStructField(_)) => Ok(ExpressionType::GetStructField), + Some(ExprStruct::ToJson(_)) => Ok(ExpressionType::ToJson), + Some(ExprStruct::ToPrettyString(_)) => Ok(ExpressionType::ToPrettyString), + Some(ExprStruct::ListExtract(_)) => Ok(ExpressionType::ListExtract), + Some(ExprStruct::GetArrayStructFields(_)) => Ok(ExpressionType::GetArrayStructFields), + Some(ExprStruct::ArrayInsert(_)) => Ok(ExpressionType::ArrayInsert), + Some(ExprStruct::Rand(_)) => Ok(ExpressionType::Rand), + Some(ExprStruct::Randn(_)) => Ok(ExpressionType::Randn), + Some(ExprStruct::SparkPartitionId(_)) => Ok(ExpressionType::SparkPartitionId), + Some(ExprStruct::MonotonicallyIncreasingId(_)) => { + Ok(ExpressionType::MonotonicallyIncreasingId) + } + + Some(ExprStruct::Hour(_)) => Ok(ExpressionType::Hour), + Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute), + Some(ExprStruct::Second(_)) => Ok(ExpressionType::Second), + Some(ExprStruct::TruncTimestamp(_)) => Ok(ExpressionType::TruncTimestamp), + + Some(other) => Err(ExecutionError::GeneralError(format!( + "Unsupported expression type: {:?}", + other + ))), + None => Err(ExecutionError::GeneralError( + "Expression struct is None".to_string(), + )), + } + } +} + +impl Default for ExpressionRegistry { + fn default() -> Self { + Self::new() + } +} diff --git a/native/core/src/execution/planner/traits.rs b/native/core/src/execution/planner/traits.rs new file mode 100644 index 0000000000..2fb1df4cf9 --- /dev/null +++ b/native/core/src/execution/planner/traits.rs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Core traits for the modular planner framework + +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use datafusion::physical_expr::PhysicalExpr; +use datafusion_comet_proto::spark_expression::Expr; +use jni::objects::GlobalRef; + +use crate::execution::operators::ScanExec; +use crate::execution::{operators::ExecutionError, spark_plan::SparkPlan}; + +/// Trait for building physical expressions from Spark protobuf expressions +pub trait ExpressionBuilder: Send + Sync { + /// Build a DataFusion physical expression from a Spark protobuf expression + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &super::PhysicalPlanner, + ) -> Result, ExecutionError>; +} + +/// Trait for building physical operators from Spark protobuf operators +pub trait OperatorBuilder: Send + Sync { + /// Build a Spark plan from a protobuf operator + fn build( + &self, + spark_plan: &datafusion_comet_proto::spark_operator::Operator, + inputs: &mut Vec>, + partition_count: usize, + planner: &super::PhysicalPlanner, + ) -> Result<(Vec, Arc), ExecutionError>; +} + +/// Enum to identify different expression types for registry dispatch +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ExpressionType { + // Arithmetic expressions + Add, + Subtract, + Multiply, + Divide, + IntegralDivide, + Remainder, + UnaryMinus, + + // Comparison expressions + Eq, + Neq, + Lt, + LtEq, + Gt, + GtEq, + EqNullSafe, + NeqNullSafe, + + // Logical expressions + And, + Or, + Not, + + // Null checks + IsNull, + IsNotNull, + + // Bitwise operations + BitwiseAnd, + BitwiseOr, + BitwiseXor, + BitwiseShiftLeft, + BitwiseShiftRight, + + // Other expressions + Bound, + Unbound, + Literal, + Cast, + CaseWhen, + In, + If, + Substring, + Like, + Rlike, + CheckOverflow, + ScalarFunc, + NormalizeNanAndZero, + Subquery, + BloomFilterMightContain, + CreateNamedStruct, + GetStructField, + ToJson, + ToPrettyString, + ListExtract, + GetArrayStructFields, + ArrayInsert, + Rand, + Randn, + SparkPartitionId, + MonotonicallyIncreasingId, + + // Time functions + Hour, + Minute, + Second, + TruncTimestamp, +} + +/// Enum to identify different operator types for registry dispatch +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum OperatorType { + Scan, + NativeScan, + IcebergScan, + Projection, + Filter, + HashAgg, + Limit, + Sort, + ShuffleWriter, + ParquetWriter, + Expand, + SortMergeJoin, + HashJoin, + Window, +} From 5dc8f645a53e5b5530f915bdb85b36eee5adc1f6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 4 Dec 2025 15:35:21 -0700 Subject: [PATCH 02/14] format and clippy --- .../src/execution/expressions/arithmetic.rs | 23 +++++++++++-------- native/core/src/execution/planner.rs | 5 ++-- native/core/src/execution/planner/traits.rs | 2 ++ 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs index 53a42bbe8c..ecd8c97acb 100644 --- a/native/core/src/execution/expressions/arithmetic.rs +++ b/native/core/src/execution/expressions/arithmetic.rs @@ -25,9 +25,12 @@ use datafusion::physical_expr::PhysicalExpr; use datafusion_comet_proto::spark_expression::{expr::ExprStruct, Expr}; use datafusion_comet_spark_expr::{create_modulo_expr, create_negate_expr, EvalMode}; -use crate::execution::operators::ExecutionError; -use crate::execution::planner::traits::ExpressionBuilder; -use crate::execution::planner::{from_protobuf_eval_mode, BinaryExprOptions}; +use crate::execution::{ + operators::ExecutionError, + planner::{ + from_protobuf_eval_mode, traits::ExpressionBuilder, BinaryExprOptions, PhysicalPlanner, + }, +}; /// Builder for Add expressions pub struct AddBuilder; @@ -37,7 +40,7 @@ impl ExpressionBuilder for AddBuilder { &self, spark_expr: &Expr, input_schema: SchemaRef, - planner: &crate::execution::planner::PhysicalPlanner, + planner: &PhysicalPlanner, ) -> Result, ExecutionError> { if let Some(ExprStruct::Add(expr)) = &spark_expr.expr_struct { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; @@ -65,7 +68,7 @@ impl ExpressionBuilder for SubtractBuilder { &self, spark_expr: &Expr, input_schema: SchemaRef, - planner: &crate::execution::planner::PhysicalPlanner, + planner: &PhysicalPlanner, ) -> Result, ExecutionError> { if let Some(ExprStruct::Subtract(expr)) = &spark_expr.expr_struct { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; @@ -93,7 +96,7 @@ impl ExpressionBuilder for MultiplyBuilder { &self, spark_expr: &Expr, input_schema: SchemaRef, - planner: &crate::execution::planner::PhysicalPlanner, + planner: &PhysicalPlanner, ) -> Result, ExecutionError> { if let Some(ExprStruct::Multiply(expr)) = &spark_expr.expr_struct { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; @@ -121,7 +124,7 @@ impl ExpressionBuilder for DivideBuilder { &self, spark_expr: &Expr, input_schema: SchemaRef, - planner: &crate::execution::planner::PhysicalPlanner, + planner: &PhysicalPlanner, ) -> Result, ExecutionError> { if let Some(ExprStruct::Divide(expr)) = &spark_expr.expr_struct { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; @@ -149,7 +152,7 @@ impl ExpressionBuilder for IntegralDivideBuilder { &self, spark_expr: &Expr, input_schema: SchemaRef, - planner: &crate::execution::planner::PhysicalPlanner, + planner: &PhysicalPlanner, ) -> Result, ExecutionError> { if let Some(ExprStruct::IntegralDivide(expr)) = &spark_expr.expr_struct { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; @@ -180,7 +183,7 @@ impl ExpressionBuilder for RemainderBuilder { &self, spark_expr: &Expr, input_schema: SchemaRef, - planner: &crate::execution::planner::PhysicalPlanner, + planner: &PhysicalPlanner, ) -> Result, ExecutionError> { if let Some(ExprStruct::Remainder(expr)) = &spark_expr.expr_struct { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; @@ -217,7 +220,7 @@ impl ExpressionBuilder for UnaryMinusBuilder { &self, spark_expr: &Expr, input_schema: SchemaRef, - planner: &crate::execution::planner::PhysicalPlanner, + planner: &PhysicalPlanner, ) -> Result, ExecutionError> { if let Some(ExprStruct::UnaryMinus(expr)) = &spark_expr.expr_struct { let child = planner.create_expr(expr.child.as_ref().unwrap(), input_schema)?; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 38f92b5e68..61a5738eef 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -66,9 +66,8 @@ use datafusion::{ prelude::SessionContext, }; use datafusion_comet_spark_expr::{ - create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, - BinaryOutputStyle, BloomFilterAgg, BloomFilterMightContain, EvalMode, - SparkHour, SparkMinute, SparkSecond, + create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle, + BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond, }; use iceberg::expr::Bind; diff --git a/native/core/src/execution/planner/traits.rs b/native/core/src/execution/planner/traits.rs index 2fb1df4cf9..a49ba1b95d 100644 --- a/native/core/src/execution/planner/traits.rs +++ b/native/core/src/execution/planner/traits.rs @@ -39,6 +39,7 @@ pub trait ExpressionBuilder: Send + Sync { } /// Trait for building physical operators from Spark protobuf operators +#[allow(dead_code)] pub trait OperatorBuilder: Send + Sync { /// Build a Spark plan from a protobuf operator fn build( @@ -125,6 +126,7 @@ pub enum ExpressionType { /// Enum to identify different operator types for registry dispatch #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[allow(dead_code)] pub enum OperatorType { Scan, NativeScan, From c0e4bf609214ec35ffb2d8df6a779dd2309d804d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 4 Dec 2025 15:39:31 -0700 Subject: [PATCH 03/14] use dynamic check for registered expression handlers --- native/core/src/execution/planner.rs | 26 ++++--------------- .../execution/planner/expression_registry.rs | 9 +++++++ 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 61a5738eef..b0a68c4219 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -190,20 +190,6 @@ impl PhysicalPlanner { &self.session_ctx } - /// Check if an expression is an arithmetic expression that should be handled by the registry - fn is_arithmetic_expression(expr_struct: &ExprStruct) -> bool { - matches!( - expr_struct, - ExprStruct::Add(_) - | ExprStruct::Subtract(_) - | ExprStruct::Multiply(_) - | ExprStruct::Divide(_) - | ExprStruct::IntegralDivide(_) - | ExprStruct::Remainder(_) - | ExprStruct::UnaryMinus(_) - ) - } - /// get DataFusion PartitionedFiles from a Spark FilePartition fn get_partitioned_files( &self, @@ -262,13 +248,11 @@ impl PhysicalPlanner { spark_expr: &Expr, input_schema: SchemaRef, ) -> Result, ExecutionError> { - // Try to use the modular registry for arithmetic expressions first - if let Some(expr_struct) = spark_expr.expr_struct.as_ref() { - if Self::is_arithmetic_expression(expr_struct) { - return self - .expression_registry - .create_expr(spark_expr, input_schema, self); - } + // Try to use the modular registry first - this automatically handles any registered expression types + if self.expression_registry.can_handle(spark_expr) { + return self + .expression_registry + .create_expr(spark_expr, input_schema, self); } // Fall back to the original monolithic match for other expressions diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs index 0c66cd6a88..18d2040286 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -43,6 +43,15 @@ impl ExpressionRegistry { registry } + /// Check if the registry can handle a given expression type + pub fn can_handle(&self, spark_expr: &Expr) -> bool { + if let Ok(expr_type) = Self::get_expression_type(spark_expr) { + self.builders.contains_key(&expr_type) + } else { + false + } + } + /// Create a physical expression from a Spark protobuf expression pub fn create_expr( &self, From 0a3781a478bee3d7ce45ca819784ae1c5739a91c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 06:30:40 -0700 Subject: [PATCH 04/14] global registry --- native/core/src/execution/planner.rs | 9 ++------- .../src/execution/planner/expression_registry.rs | 14 +++++++------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b0a68c4219..f8da98a977 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -157,7 +157,6 @@ pub struct PhysicalPlanner { exec_context_id: i64, partition: i32, session_ctx: Arc, - expression_registry: ExpressionRegistry, } impl Default for PhysicalPlanner { @@ -172,7 +171,6 @@ impl PhysicalPlanner { exec_context_id: TEST_EXEC_CONTEXT_ID, session_ctx, partition, - expression_registry: ExpressionRegistry::new(), } } @@ -181,7 +179,6 @@ impl PhysicalPlanner { exec_context_id, partition: self.partition, session_ctx: Arc::clone(&self.session_ctx), - expression_registry: self.expression_registry, } } @@ -249,10 +246,8 @@ impl PhysicalPlanner { input_schema: SchemaRef, ) -> Result, ExecutionError> { // Try to use the modular registry first - this automatically handles any registered expression types - if self.expression_registry.can_handle(spark_expr) { - return self - .expression_registry - .create_expr(spark_expr, input_schema, self); + if ExpressionRegistry::global().can_handle(spark_expr) { + return ExpressionRegistry::global().create_expr(spark_expr, input_schema, self); } // Fall back to the original monolithic match for other expressions diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs index 18d2040286..4b0ef800ea 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -34,7 +34,7 @@ pub struct ExpressionRegistry { impl ExpressionRegistry { /// Create a new expression registry with all builders registered - pub fn new() -> Self { + fn new() -> Self { let mut registry = Self { builders: HashMap::new(), }; @@ -43,6 +43,12 @@ impl ExpressionRegistry { registry } + /// Get the global shared registry instance + pub fn global() -> &'static ExpressionRegistry { + static REGISTRY: std::sync::OnceLock = std::sync::OnceLock::new(); + REGISTRY.get_or_init(ExpressionRegistry::new) + } + /// Check if the registry can handle a given expression type pub fn can_handle(&self, spark_expr: &Expr) -> bool { if let Ok(expr_type) = Self::get_expression_type(spark_expr) { @@ -184,9 +190,3 @@ impl ExpressionRegistry { } } } - -impl Default for ExpressionRegistry { - fn default() -> Self { - Self::new() - } -} From 84003747a885d54b53e94163c60084b59d6c748b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 06:35:32 -0700 Subject: [PATCH 05/14] move comparison expressions to new framework --- .../src/execution/expressions/comparison.rs | 227 ++++++++++++++++++ native/core/src/execution/expressions/mod.rs | 1 + native/core/src/execution/planner.rs | 56 ----- .../execution/planner/expression_registry.rs | 26 +- 4 files changed, 253 insertions(+), 57 deletions(-) create mode 100644 native/core/src/execution/expressions/comparison.rs diff --git a/native/core/src/execution/expressions/comparison.rs b/native/core/src/execution/expressions/comparison.rs new file mode 100644 index 0000000000..78693f0d40 --- /dev/null +++ b/native/core/src/execution/expressions/comparison.rs @@ -0,0 +1,227 @@ +//! Comparison expression builders + +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use datafusion::logical_expr::Operator as DataFusionOperator; +use datafusion::physical_expr::{expressions::BinaryExpr, PhysicalExpr}; +use datafusion_comet_proto::spark_expression::{expr::ExprStruct, Expr}; + +use crate::execution::{ + operators::ExecutionError, + planner::{traits::ExpressionBuilder, PhysicalPlanner}, +}; + +/// Helper function to create binary comparison expressions +fn create_binary_comparison_expr( + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + operator: DataFusionOperator, +) -> Result, ExecutionError> { + // Extract left and right from the appropriate comparison expression + let (left_expr, right_expr) = match &spark_expr.expr_struct { + Some(ExprStruct::Eq(expr)) => (expr.left.as_ref(), expr.right.as_ref()), + Some(ExprStruct::Neq(expr)) => (expr.left.as_ref(), expr.right.as_ref()), + Some(ExprStruct::Lt(expr)) => (expr.left.as_ref(), expr.right.as_ref()), + Some(ExprStruct::LtEq(expr)) => (expr.left.as_ref(), expr.right.as_ref()), + Some(ExprStruct::Gt(expr)) => (expr.left.as_ref(), expr.right.as_ref()), + Some(ExprStruct::GtEq(expr)) => (expr.left.as_ref(), expr.right.as_ref()), + Some(ExprStruct::EqNullSafe(expr)) => (expr.left.as_ref(), expr.right.as_ref()), + Some(ExprStruct::NeqNullSafe(expr)) => (expr.left.as_ref(), expr.right.as_ref()), + _ => { + return Err(ExecutionError::GeneralError( + "Expected comparison expression".to_string(), + )) + } + }; + + let left = planner.create_expr(left_expr.unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(right_expr.unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new(left, operator, right))) +} + +/// Builder for Eq expressions +pub struct EqBuilder; + +impl ExpressionBuilder for EqBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Eq(_)) = &spark_expr.expr_struct { + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Eq) + } else { + Err(ExecutionError::GeneralError( + "Expected Eq expression".to_string(), + )) + } + } +} + +/// Builder for Neq expressions +pub struct NeqBuilder; + +impl ExpressionBuilder for NeqBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Neq(_)) = &spark_expr.expr_struct { + create_binary_comparison_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::NotEq, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected Neq expression".to_string(), + )) + } + } +} + +/// Builder for Lt expressions +pub struct LtBuilder; + +impl ExpressionBuilder for LtBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Lt(_)) = &spark_expr.expr_struct { + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Lt) + } else { + Err(ExecutionError::GeneralError( + "Expected Lt expression".to_string(), + )) + } + } +} + +/// Builder for LtEq expressions +pub struct LtEqBuilder; + +impl ExpressionBuilder for LtEqBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::LtEq(_)) = &spark_expr.expr_struct { + create_binary_comparison_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::LtEq, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected LtEq expression".to_string(), + )) + } + } +} + +/// Builder for Gt expressions +pub struct GtBuilder; + +impl ExpressionBuilder for GtBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::Gt(_)) = &spark_expr.expr_struct { + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Gt) + } else { + Err(ExecutionError::GeneralError( + "Expected Gt expression".to_string(), + )) + } + } +} + +/// Builder for GtEq expressions +pub struct GtEqBuilder; + +impl ExpressionBuilder for GtEqBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::GtEq(_)) = &spark_expr.expr_struct { + create_binary_comparison_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::GtEq, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected GtEq expression".to_string(), + )) + } + } +} + +/// Builder for EqNullSafe expressions +pub struct EqNullSafeBuilder; + +impl ExpressionBuilder for EqNullSafeBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::EqNullSafe(_)) = &spark_expr.expr_struct { + create_binary_comparison_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::IsNotDistinctFrom, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected EqNullSafe expression".to_string(), + )) + } + } +} + +/// Builder for NeqNullSafe expressions +pub struct NeqNullSafeBuilder; + +impl ExpressionBuilder for NeqNullSafeBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + if let Some(ExprStruct::NeqNullSafe(_)) = &spark_expr.expr_struct { + create_binary_comparison_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::IsDistinctFrom, + ) + } else { + Err(ExecutionError::GeneralError( + "Expected NeqNullSafe expression".to_string(), + )) + } + } +} diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs index 84b930d059..1c98987721 100644 --- a/native/core/src/execution/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -18,6 +18,7 @@ //! Native DataFusion expressions pub mod arithmetic; +pub mod comparison; pub mod subquery; pub use datafusion_comet_spark_expr::EvalMode; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f8da98a977..1dce4a1a57 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -252,48 +252,6 @@ impl PhysicalPlanner { // Fall back to the original monolithic match for other expressions match spark_expr.expr_struct.as_ref().unwrap() { - ExprStruct::Eq(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::Eq; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::Neq(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::NotEq; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::Gt(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::Gt; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::GtEq(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::GtEq; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::Lt(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::Lt; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::LtEq(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::LtEq; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } ExprStruct::Bound(bound) => { let idx = bound.index as usize; if idx >= input_schema.fields().len() { @@ -559,20 +517,6 @@ impl PhysicalPlanner { _ => func, } } - ExprStruct::EqNullSafe(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::IsNotDistinctFrom; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::NeqNullSafe(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::IsDistinctFrom; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } ExprStruct::BitwiseAnd(expr) => { let left = self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs index 4b0ef800ea..c89d517089 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -82,8 +82,10 @@ impl ExpressionRegistry { // Register arithmetic expressions self.register_arithmetic_expressions(); + // Register comparison expressions + self.register_comparison_expressions(); + // TODO: Register other expression categories in future phases - // self.register_comparison_expressions(); // self.register_string_expressions(); // self.register_temporal_expressions(); // etc. @@ -111,6 +113,28 @@ impl ExpressionRegistry { .insert(ExpressionType::UnaryMinus, Box::new(UnaryMinusBuilder)); } + /// Register comparison expression builders + fn register_comparison_expressions(&mut self) { + use crate::execution::expressions::comparison::*; + + self.builders + .insert(ExpressionType::Eq, Box::new(EqBuilder)); + self.builders + .insert(ExpressionType::Neq, Box::new(NeqBuilder)); + self.builders + .insert(ExpressionType::Lt, Box::new(LtBuilder)); + self.builders + .insert(ExpressionType::LtEq, Box::new(LtEqBuilder)); + self.builders + .insert(ExpressionType::Gt, Box::new(GtBuilder)); + self.builders + .insert(ExpressionType::GtEq, Box::new(GtEqBuilder)); + self.builders + .insert(ExpressionType::EqNullSafe, Box::new(EqNullSafeBuilder)); + self.builders + .insert(ExpressionType::NeqNullSafe, Box::new(NeqNullSafeBuilder)); + } + /// Extract expression type from Spark protobuf expression fn get_expression_type(spark_expr: &Expr) -> Result { match spark_expr.expr_struct.as_ref() { From 2e25fad4fe8366ee5ca8f69f17d9fcfbd65ee4e5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 06:45:10 -0700 Subject: [PATCH 06/14] move bitwise expr --- .../src/execution/expressions/arithmetic.rs | 188 +++++++----------- .../src/execution/expressions/comparison.rs | 103 ++-------- native/core/src/execution/expressions/mod.rs | 4 + native/core/src/execution/planner.rs | 35 ---- .../execution/planner/expression_registry.rs | 23 +++ native/core/src/execution/planner/traits.rs | 21 ++ 6 files changed, 143 insertions(+), 231 deletions(-) diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs index ecd8c97acb..4b3bb1a4c6 100644 --- a/native/core/src/execution/expressions/arithmetic.rs +++ b/native/core/src/execution/expressions/arithmetic.rs @@ -22,10 +22,11 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use datafusion::logical_expr::Operator as DataFusionOperator; use datafusion::physical_expr::PhysicalExpr; -use datafusion_comet_proto::spark_expression::{expr::ExprStruct, Expr}; +use datafusion_comet_proto::spark_expression::Expr; use datafusion_comet_spark_expr::{create_modulo_expr, create_negate_expr, EvalMode}; use crate::execution::{ + expressions::extract_expr, operators::ExecutionError, planner::{ from_protobuf_eval_mode, traits::ExpressionBuilder, BinaryExprOptions, PhysicalPlanner, @@ -42,21 +43,16 @@ impl ExpressionBuilder for AddBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Add(expr)) = &spark_expr.expr_struct { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Plus, - input_schema, - eval_mode, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected Add expression".to_string(), - )) - } + let expr = extract_expr!(spark_expr, Add); + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Plus, + input_schema, + eval_mode, + ) } } @@ -70,21 +66,16 @@ impl ExpressionBuilder for SubtractBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Subtract(expr)) = &spark_expr.expr_struct { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Minus, - input_schema, - eval_mode, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected Subtract expression".to_string(), - )) - } + let expr = extract_expr!(spark_expr, Subtract); + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Minus, + input_schema, + eval_mode, + ) } } @@ -98,21 +89,16 @@ impl ExpressionBuilder for MultiplyBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Multiply(expr)) = &spark_expr.expr_struct { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Multiply, - input_schema, - eval_mode, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected Multiply expression".to_string(), - )) - } + let expr = extract_expr!(spark_expr, Multiply); + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Multiply, + input_schema, + eval_mode, + ) } } @@ -126,21 +112,16 @@ impl ExpressionBuilder for DivideBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Divide(expr)) = &spark_expr.expr_struct { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - eval_mode, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected Divide expression".to_string(), - )) - } + let expr = extract_expr!(spark_expr, Divide); + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Divide, + input_schema, + eval_mode, + ) } } @@ -154,24 +135,19 @@ impl ExpressionBuilder for IntegralDivideBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::IntegralDivide(expr)) = &spark_expr.expr_struct { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr_with_options( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - BinaryExprOptions { - is_integral_div: true, - }, - eval_mode, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected IntegralDivide expression".to_string(), - )) - } + let expr = extract_expr!(spark_expr, IntegralDivide); + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr_with_options( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Divide, + input_schema, + BinaryExprOptions { + is_integral_div: true, + }, + eval_mode, + ) } } @@ -185,30 +161,23 @@ impl ExpressionBuilder for RemainderBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Remainder(expr)) = &spark_expr.expr_struct { - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - let left = - planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = - planner.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?; - - let result = create_modulo_expr( - left, - right, - expr.return_type - .as_ref() - .map(crate::execution::serde::to_arrow_datatype) - .unwrap(), - input_schema, - eval_mode == EvalMode::Ansi, - &planner.session_ctx().state(), - ); - result.map_err(|e| ExecutionError::GeneralError(e.to_string())) - } else { - Err(ExecutionError::GeneralError( - "Expected Remainder expression".to_string(), - )) - } + let expr = extract_expr!(spark_expr, Remainder); + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?; + + let result = create_modulo_expr( + left, + right, + expr.return_type + .as_ref() + .map(crate::execution::serde::to_arrow_datatype) + .unwrap(), + input_schema, + eval_mode == EvalMode::Ansi, + &planner.session_ctx().state(), + ); + result.map_err(|e| ExecutionError::GeneralError(e.to_string())) } } @@ -222,14 +191,9 @@ impl ExpressionBuilder for UnaryMinusBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::UnaryMinus(expr)) = &spark_expr.expr_struct { - let child = planner.create_expr(expr.child.as_ref().unwrap(), input_schema)?; - let result = create_negate_expr(child, expr.fail_on_error); - result.map_err(|e| ExecutionError::GeneralError(e.to_string())) - } else { - Err(ExecutionError::GeneralError( - "Expected UnaryMinus expression".to_string(), - )) - } + let expr = extract_expr!(spark_expr, UnaryMinus); + let child = planner.create_expr(expr.child.as_ref().unwrap(), input_schema)?; + let result = create_negate_expr(child, expr.fail_on_error); + result.map_err(|e| ExecutionError::GeneralError(e.to_string())) } } diff --git a/native/core/src/execution/expressions/comparison.rs b/native/core/src/execution/expressions/comparison.rs index 78693f0d40..0610d38cc9 100644 --- a/native/core/src/execution/expressions/comparison.rs +++ b/native/core/src/execution/expressions/comparison.rs @@ -30,9 +30,7 @@ fn create_binary_comparison_expr( Some(ExprStruct::EqNullSafe(expr)) => (expr.left.as_ref(), expr.right.as_ref()), Some(ExprStruct::NeqNullSafe(expr)) => (expr.left.as_ref(), expr.right.as_ref()), _ => { - return Err(ExecutionError::GeneralError( - "Expected comparison expression".to_string(), - )) + panic!("create_binary_comparison_expr called with non-comparison expression"); } }; @@ -51,13 +49,7 @@ impl ExpressionBuilder for EqBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Eq(_)) = &spark_expr.expr_struct { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Eq) - } else { - Err(ExecutionError::GeneralError( - "Expected Eq expression".to_string(), - )) - } + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Eq) } } @@ -71,18 +63,7 @@ impl ExpressionBuilder for NeqBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Neq(_)) = &spark_expr.expr_struct { - create_binary_comparison_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::NotEq, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected Neq expression".to_string(), - )) - } + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::NotEq) } } @@ -96,13 +77,7 @@ impl ExpressionBuilder for LtBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Lt(_)) = &spark_expr.expr_struct { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Lt) - } else { - Err(ExecutionError::GeneralError( - "Expected Lt expression".to_string(), - )) - } + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Lt) } } @@ -116,18 +91,7 @@ impl ExpressionBuilder for LtEqBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::LtEq(_)) = &spark_expr.expr_struct { - create_binary_comparison_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::LtEq, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected LtEq expression".to_string(), - )) - } + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::LtEq) } } @@ -141,13 +105,7 @@ impl ExpressionBuilder for GtBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::Gt(_)) = &spark_expr.expr_struct { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Gt) - } else { - Err(ExecutionError::GeneralError( - "Expected Gt expression".to_string(), - )) - } + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Gt) } } @@ -161,18 +119,7 @@ impl ExpressionBuilder for GtEqBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::GtEq(_)) = &spark_expr.expr_struct { - create_binary_comparison_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::GtEq, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected GtEq expression".to_string(), - )) - } + create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::GtEq) } } @@ -186,18 +133,12 @@ impl ExpressionBuilder for EqNullSafeBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::EqNullSafe(_)) = &spark_expr.expr_struct { - create_binary_comparison_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::IsNotDistinctFrom, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected EqNullSafe expression".to_string(), - )) - } + create_binary_comparison_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::IsNotDistinctFrom, + ) } } @@ -211,17 +152,11 @@ impl ExpressionBuilder for NeqNullSafeBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - if let Some(ExprStruct::NeqNullSafe(_)) = &spark_expr.expr_struct { - create_binary_comparison_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::IsDistinctFrom, - ) - } else { - Err(ExecutionError::GeneralError( - "Expected NeqNullSafe expression".to_string(), - )) - } + create_binary_comparison_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::IsDistinctFrom, + ) } } diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs index 1c98987721..5dc20c4b1a 100644 --- a/native/core/src/execution/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -18,7 +18,11 @@ //! Native DataFusion expressions pub mod arithmetic; +pub mod bitwise; pub mod comparison; pub mod subquery; pub use datafusion_comet_spark_expr::EvalMode; + +// Re-export the extract_expr macro for convenience in expression builders +pub use crate::extract_expr; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 1dce4a1a57..0637fd99a9 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -517,41 +517,6 @@ impl PhysicalPlanner { _ => func, } } - ExprStruct::BitwiseAnd(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseAnd; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::BitwiseOr(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseOr; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::BitwiseXor(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseXor; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::BitwiseShiftRight(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseShiftRight; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::BitwiseShiftLeft(expr) => { - let left = - self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseShiftLeft; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } ExprStruct::CaseWhen(case_when) => { let when_then_pairs = case_when .when diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs index c89d517089..aaf963c660 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -85,6 +85,9 @@ impl ExpressionRegistry { // Register comparison expressions self.register_comparison_expressions(); + // Register bitwise expressions + self.register_bitwise_expressions(); + // TODO: Register other expression categories in future phases // self.register_string_expressions(); // self.register_temporal_expressions(); @@ -135,6 +138,26 @@ impl ExpressionRegistry { .insert(ExpressionType::NeqNullSafe, Box::new(NeqNullSafeBuilder)); } + /// Register bitwise expression builders + fn register_bitwise_expressions(&mut self) { + use crate::execution::expressions::bitwise::*; + + self.builders + .insert(ExpressionType::BitwiseAnd, Box::new(BitwiseAndBuilder)); + self.builders + .insert(ExpressionType::BitwiseOr, Box::new(BitwiseOrBuilder)); + self.builders + .insert(ExpressionType::BitwiseXor, Box::new(BitwiseXorBuilder)); + self.builders.insert( + ExpressionType::BitwiseShiftLeft, + Box::new(BitwiseShiftLeftBuilder), + ); + self.builders.insert( + ExpressionType::BitwiseShiftRight, + Box::new(BitwiseShiftRightBuilder), + ); + } + /// Extract expression type from Spark protobuf expression fn get_expression_type(spark_expr: &Expr) -> Result { match spark_expr.expr_struct.as_ref() { diff --git a/native/core/src/execution/planner/traits.rs b/native/core/src/execution/planner/traits.rs index a49ba1b95d..11109fa2c2 100644 --- a/native/core/src/execution/planner/traits.rs +++ b/native/core/src/execution/planner/traits.rs @@ -27,6 +27,27 @@ use jni::objects::GlobalRef; use crate::execution::operators::ScanExec; use crate::execution::{operators::ExecutionError, spark_plan::SparkPlan}; +/// Macro to extract a specific expression variant, panicking if called with wrong type. +/// This should be used in expression builders where the registry guarantees the correct +/// expression type has been routed to the builder. +#[macro_export] +macro_rules! extract_expr { + ($spark_expr:expr, $variant:ident) => { + match $spark_expr + .expr_struct + .as_ref() + .expect("expression struct must be present") + { + datafusion_comet_proto::spark_expression::expr::ExprStruct::$variant(expr) => expr, + other => panic!( + "{} builder called with wrong expression type: {:?}", + stringify!($variant), + other + ), + } + }; +} + /// Trait for building physical expressions from Spark protobuf expressions pub trait ExpressionBuilder: Send + Sync { /// Build a DataFusion physical expression from a Spark protobuf expression From d04084ab9fba9236844827d8301f246bbd2b0865 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 06:45:24 -0700 Subject: [PATCH 07/14] move bitwise expr --- .../core/src/execution/expressions/bitwise.rs | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 native/core/src/execution/expressions/bitwise.rs diff --git a/native/core/src/execution/expressions/bitwise.rs b/native/core/src/execution/expressions/bitwise.rs new file mode 100644 index 0000000000..187cf385f5 --- /dev/null +++ b/native/core/src/execution/expressions/bitwise.rs @@ -0,0 +1,142 @@ +//! Bitwise expression builders + +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use datafusion::logical_expr::Operator as DataFusionOperator; +use datafusion::physical_expr::{expressions::BinaryExpr, PhysicalExpr}; +use datafusion_comet_proto::spark_expression::Expr; + +use crate::execution::{ + operators::ExecutionError, + planner::{traits::ExpressionBuilder, PhysicalPlanner}, +}; + +/// Helper function to create binary bitwise expressions +fn create_binary_bitwise_expr( + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + operator: DataFusionOperator, +) -> Result, ExecutionError> { + // Extract left and right from the appropriate bitwise expression + let (left_expr, right_expr) = match &spark_expr.expr_struct { + Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseAnd(expr)) => { + (expr.left.as_ref(), expr.right.as_ref()) + } + Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseOr(expr)) => { + (expr.left.as_ref(), expr.right.as_ref()) + } + Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseXor(expr)) => { + (expr.left.as_ref(), expr.right.as_ref()) + } + Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseShiftLeft( + expr, + )) => (expr.left.as_ref(), expr.right.as_ref()), + Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseShiftRight( + expr, + )) => (expr.left.as_ref(), expr.right.as_ref()), + _ => { + panic!("create_binary_bitwise_expr called with non-bitwise expression"); + } + }; + + let left = planner.create_expr(left_expr.unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(right_expr.unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new(left, operator, right))) +} + +/// Builder for BitwiseAnd expressions +pub struct BitwiseAndBuilder; + +impl ExpressionBuilder for BitwiseAndBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + create_binary_bitwise_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::BitwiseAnd, + ) + } +} + +/// Builder for BitwiseOr expressions +pub struct BitwiseOrBuilder; + +impl ExpressionBuilder for BitwiseOrBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + create_binary_bitwise_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::BitwiseOr, + ) + } +} + +/// Builder for BitwiseXor expressions +pub struct BitwiseXorBuilder; + +impl ExpressionBuilder for BitwiseXorBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + create_binary_bitwise_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::BitwiseXor, + ) + } +} + +/// Builder for BitwiseShiftLeft expressions +pub struct BitwiseShiftLeftBuilder; + +impl ExpressionBuilder for BitwiseShiftLeftBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + create_binary_bitwise_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::BitwiseShiftLeft, + ) + } +} + +/// Builder for BitwiseShiftRight expressions +pub struct BitwiseShiftRightBuilder; + +impl ExpressionBuilder for BitwiseShiftRightBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + create_binary_bitwise_expr( + spark_expr, + input_schema, + planner, + DataFusionOperator::BitwiseShiftRight, + ) + } +} From 79d58cc93ba7ceab84380ed4da87d05a58d9bffc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 06:52:22 -0700 Subject: [PATCH 08/14] move more expressions --- native/core/src/execution/expressions/mod.rs | 2 ++ native/core/src/execution/planner.rs | 30 ++----------------- .../execution/planner/expression_registry.rs | 28 +++++++++++++++++ 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs index 5dc20c4b1a..105afd5952 100644 --- a/native/core/src/execution/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -20,6 +20,8 @@ pub mod arithmetic; pub mod bitwise; pub mod comparison; +pub mod logical; +pub mod nullcheck; pub mod subquery; pub use datafusion_comet_spark_expr::EvalMode; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 0637fd99a9..23c6964abf 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -50,8 +50,8 @@ use datafusion::{ logical_expr::Operator as DataFusionOperator, physical_expr::{ expressions::{ - in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, - Literal as DataFusionLiteral, NotExpr, + in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNullExpr, LikeExpr, + Literal as DataFusionLiteral, }, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, }, @@ -269,28 +269,6 @@ impl PhysicalPlanner { data_type, ))) } - ExprStruct::IsNotNull(is_notnull) => { - let child = self.create_expr(is_notnull.child.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(IsNotNullExpr::new(child))) - } - ExprStruct::IsNull(is_null) => { - let child = self.create_expr(is_null.child.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(IsNullExpr::new(child))) - } - ExprStruct::And(and) => { - let left = - self.create_expr(and.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(and.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::And; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } - ExprStruct::Or(or) => { - let left = - self.create_expr(or.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = self.create_expr(or.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::Or; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } ExprStruct::Literal(literal) => { let data_type = to_arrow_datatype(literal.datatype.as_ref().unwrap()); let scalar_value = if literal.is_null { @@ -565,10 +543,6 @@ impl PhysicalPlanner { self.create_expr(expr.false_expr.as_ref().unwrap(), input_schema)?; Ok(Arc::new(IfExpr::new(if_expr, true_expr, false_expr))) } - ExprStruct::Not(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(NotExpr::new(child))) - } ExprStruct::NormalizeNanAndZero(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap()); diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs index aaf963c660..f97cb984b1 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -88,6 +88,12 @@ impl ExpressionRegistry { // Register bitwise expressions self.register_bitwise_expressions(); + // Register logical expressions + self.register_logical_expressions(); + + // Register null check expressions + self.register_null_check_expressions(); + // TODO: Register other expression categories in future phases // self.register_string_expressions(); // self.register_temporal_expressions(); @@ -158,6 +164,28 @@ impl ExpressionRegistry { ); } + /// Register logical expression builders + fn register_logical_expressions(&mut self) { + use crate::execution::expressions::logical::*; + + self.builders + .insert(ExpressionType::And, Box::new(AndBuilder)); + self.builders + .insert(ExpressionType::Or, Box::new(OrBuilder)); + self.builders + .insert(ExpressionType::Not, Box::new(NotBuilder)); + } + + /// Register null check expression builders + fn register_null_check_expressions(&mut self) { + use crate::execution::expressions::nullcheck::*; + + self.builders + .insert(ExpressionType::IsNull, Box::new(IsNullBuilder)); + self.builders + .insert(ExpressionType::IsNotNull, Box::new(IsNotNullBuilder)); + } + /// Extract expression type from Spark protobuf expression fn get_expression_type(spark_expr: &Expr) -> Result { match spark_expr.expr_struct.as_ref() { From decdd02aa7d291bb5f08822af9bd920f8ef95bd1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 06:55:58 -0700 Subject: [PATCH 09/14] Refactor --- .../core/src/execution/expressions/bitwise.rs | 90 +++++---------- .../src/execution/expressions/comparison.rs | 108 +++++++++++------- 2 files changed, 90 insertions(+), 108 deletions(-) diff --git a/native/core/src/execution/expressions/bitwise.rs b/native/core/src/execution/expressions/bitwise.rs index 187cf385f5..ffcf572c06 100644 --- a/native/core/src/execution/expressions/bitwise.rs +++ b/native/core/src/execution/expressions/bitwise.rs @@ -8,44 +8,11 @@ use datafusion::physical_expr::{expressions::BinaryExpr, PhysicalExpr}; use datafusion_comet_proto::spark_expression::Expr; use crate::execution::{ + expressions::extract_expr, operators::ExecutionError, planner::{traits::ExpressionBuilder, PhysicalPlanner}, }; -/// Helper function to create binary bitwise expressions -fn create_binary_bitwise_expr( - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - operator: DataFusionOperator, -) -> Result, ExecutionError> { - // Extract left and right from the appropriate bitwise expression - let (left_expr, right_expr) = match &spark_expr.expr_struct { - Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseAnd(expr)) => { - (expr.left.as_ref(), expr.right.as_ref()) - } - Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseOr(expr)) => { - (expr.left.as_ref(), expr.right.as_ref()) - } - Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseXor(expr)) => { - (expr.left.as_ref(), expr.right.as_ref()) - } - Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseShiftLeft( - expr, - )) => (expr.left.as_ref(), expr.right.as_ref()), - Some(datafusion_comet_proto::spark_expression::expr::ExprStruct::BitwiseShiftRight( - expr, - )) => (expr.left.as_ref(), expr.right.as_ref()), - _ => { - panic!("create_binary_bitwise_expr called with non-bitwise expression"); - } - }; - - let left = planner.create_expr(left_expr.unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(right_expr.unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new(left, operator, right))) -} - /// Builder for BitwiseAnd expressions pub struct BitwiseAndBuilder; @@ -56,12 +23,11 @@ impl ExpressionBuilder for BitwiseAndBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_bitwise_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::BitwiseAnd, - ) + let expr = extract_expr!(spark_expr, BitwiseAnd); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + let op = DataFusionOperator::BitwiseAnd; + Ok(Arc::new(BinaryExpr::new(left, op, right))) } } @@ -75,12 +41,11 @@ impl ExpressionBuilder for BitwiseOrBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_bitwise_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::BitwiseOr, - ) + let expr = extract_expr!(spark_expr, BitwiseOr); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + let op = DataFusionOperator::BitwiseOr; + Ok(Arc::new(BinaryExpr::new(left, op, right))) } } @@ -94,12 +59,11 @@ impl ExpressionBuilder for BitwiseXorBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_bitwise_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::BitwiseXor, - ) + let expr = extract_expr!(spark_expr, BitwiseXor); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + let op = DataFusionOperator::BitwiseXor; + Ok(Arc::new(BinaryExpr::new(left, op, right))) } } @@ -113,12 +77,11 @@ impl ExpressionBuilder for BitwiseShiftLeftBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_bitwise_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::BitwiseShiftLeft, - ) + let expr = extract_expr!(spark_expr, BitwiseShiftLeft); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + let op = DataFusionOperator::BitwiseShiftLeft; + Ok(Arc::new(BinaryExpr::new(left, op, right))) } } @@ -132,11 +95,10 @@ impl ExpressionBuilder for BitwiseShiftRightBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_bitwise_expr( - spark_expr, - input_schema, - planner, - DataFusionOperator::BitwiseShiftRight, - ) + let expr = extract_expr!(spark_expr, BitwiseShiftRight); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + let op = DataFusionOperator::BitwiseShiftRight; + Ok(Arc::new(BinaryExpr::new(left, op, right))) } } diff --git a/native/core/src/execution/expressions/comparison.rs b/native/core/src/execution/expressions/comparison.rs index 0610d38cc9..70bb3d4904 100644 --- a/native/core/src/execution/expressions/comparison.rs +++ b/native/core/src/execution/expressions/comparison.rs @@ -5,40 +5,14 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use datafusion::logical_expr::Operator as DataFusionOperator; use datafusion::physical_expr::{expressions::BinaryExpr, PhysicalExpr}; -use datafusion_comet_proto::spark_expression::{expr::ExprStruct, Expr}; +use datafusion_comet_proto::spark_expression::Expr; use crate::execution::{ + expressions::extract_expr, operators::ExecutionError, planner::{traits::ExpressionBuilder, PhysicalPlanner}, }; -/// Helper function to create binary comparison expressions -fn create_binary_comparison_expr( - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - operator: DataFusionOperator, -) -> Result, ExecutionError> { - // Extract left and right from the appropriate comparison expression - let (left_expr, right_expr) = match &spark_expr.expr_struct { - Some(ExprStruct::Eq(expr)) => (expr.left.as_ref(), expr.right.as_ref()), - Some(ExprStruct::Neq(expr)) => (expr.left.as_ref(), expr.right.as_ref()), - Some(ExprStruct::Lt(expr)) => (expr.left.as_ref(), expr.right.as_ref()), - Some(ExprStruct::LtEq(expr)) => (expr.left.as_ref(), expr.right.as_ref()), - Some(ExprStruct::Gt(expr)) => (expr.left.as_ref(), expr.right.as_ref()), - Some(ExprStruct::GtEq(expr)) => (expr.left.as_ref(), expr.right.as_ref()), - Some(ExprStruct::EqNullSafe(expr)) => (expr.left.as_ref(), expr.right.as_ref()), - Some(ExprStruct::NeqNullSafe(expr)) => (expr.left.as_ref(), expr.right.as_ref()), - _ => { - panic!("create_binary_comparison_expr called with non-comparison expression"); - } - }; - - let left = planner.create_expr(left_expr.unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(right_expr.unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new(left, operator, right))) -} - /// Builder for Eq expressions pub struct EqBuilder; @@ -49,7 +23,14 @@ impl ExpressionBuilder for EqBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Eq) + let expr = extract_expr!(spark_expr, Eq); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new( + left, + DataFusionOperator::Eq, + right, + ))) } } @@ -63,7 +44,14 @@ impl ExpressionBuilder for NeqBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::NotEq) + let expr = extract_expr!(spark_expr, Neq); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new( + left, + DataFusionOperator::NotEq, + right, + ))) } } @@ -77,7 +65,14 @@ impl ExpressionBuilder for LtBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Lt) + let expr = extract_expr!(spark_expr, Lt); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new( + left, + DataFusionOperator::Lt, + right, + ))) } } @@ -91,7 +86,14 @@ impl ExpressionBuilder for LtEqBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::LtEq) + let expr = extract_expr!(spark_expr, LtEq); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new( + left, + DataFusionOperator::LtEq, + right, + ))) } } @@ -105,7 +107,14 @@ impl ExpressionBuilder for GtBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::Gt) + let expr = extract_expr!(spark_expr, Gt); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new( + left, + DataFusionOperator::Gt, + right, + ))) } } @@ -119,7 +128,14 @@ impl ExpressionBuilder for GtEqBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_comparison_expr(spark_expr, input_schema, planner, DataFusionOperator::GtEq) + let expr = extract_expr!(spark_expr, GtEq); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new( + left, + DataFusionOperator::GtEq, + right, + ))) } } @@ -133,12 +149,14 @@ impl ExpressionBuilder for EqNullSafeBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_comparison_expr( - spark_expr, - input_schema, - planner, + let expr = extract_expr!(spark_expr, EqNullSafe); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new( + left, DataFusionOperator::IsNotDistinctFrom, - ) + right, + ))) } } @@ -152,11 +170,13 @@ impl ExpressionBuilder for NeqNullSafeBuilder { input_schema: SchemaRef, planner: &PhysicalPlanner, ) -> Result, ExecutionError> { - create_binary_comparison_expr( - spark_expr, - input_schema, - planner, + let expr = extract_expr!(spark_expr, NeqNullSafe); + let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(BinaryExpr::new( + left, DataFusionOperator::IsDistinctFrom, - ) + right, + ))) } } From d9e78de60302f6a214db8ff32c3d6dcc891d8f71 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 07:01:43 -0700 Subject: [PATCH 10/14] macros --- .../src/execution/expressions/arithmetic.rs | 125 ++--------- .../core/src/execution/expressions/bitwise.rs | 128 +++-------- .../src/execution/expressions/comparison.rs | 207 +++--------------- .../core/src/execution/expressions/logical.rs | 17 ++ .../src/execution/expressions/nullcheck.rs | 15 ++ native/core/src/execution/planner/traits.rs | 86 ++++++++ 6 files changed, 195 insertions(+), 383 deletions(-) create mode 100644 native/core/src/execution/expressions/logical.rs create mode 100644 native/core/src/execution/expressions/nullcheck.rs diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs index 4b3bb1a4c6..c4862c5507 100644 --- a/native/core/src/execution/expressions/arithmetic.rs +++ b/native/core/src/execution/expressions/arithmetic.rs @@ -1,20 +1,3 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - //! Arithmetic expression builders use std::sync::Arc; @@ -33,99 +16,25 @@ use crate::execution::{ }, }; -/// Builder for Add expressions -pub struct AddBuilder; - -impl ExpressionBuilder for AddBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, Add); - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Plus, - input_schema, - eval_mode, - ) - } -} - -/// Builder for Subtract expressions -pub struct SubtractBuilder; - -impl ExpressionBuilder for SubtractBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, Subtract); - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Minus, - input_schema, - eval_mode, - ) - } -} - -/// Builder for Multiply expressions -pub struct MultiplyBuilder; +use crate::arithmetic_expr_builder; -impl ExpressionBuilder for MultiplyBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, Multiply); - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Multiply, - input_schema, - eval_mode, - ) - } +/// Macro to define basic arithmetic builders that use eval_mode +macro_rules! define_basic_arithmetic_builders { + ($(($builder:ident, $expr_type:ident, $op:expr)),* $(,)?) => { + $( + arithmetic_expr_builder!($builder, $expr_type, $op); + )* + }; } -/// Builder for Divide expressions -pub struct DivideBuilder; - -impl ExpressionBuilder for DivideBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, Divide); - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - eval_mode, - ) - } -} +define_basic_arithmetic_builders![ + (AddBuilder, Add, DataFusionOperator::Plus), + (SubtractBuilder, Subtract, DataFusionOperator::Minus), + (MultiplyBuilder, Multiply, DataFusionOperator::Multiply), + (DivideBuilder, Divide, DataFusionOperator::Divide), +]; -/// Builder for IntegralDivide expressions +/// Builder for IntegralDivide expressions (requires special options) pub struct IntegralDivideBuilder; impl ExpressionBuilder for IntegralDivideBuilder { @@ -151,7 +60,7 @@ impl ExpressionBuilder for IntegralDivideBuilder { } } -/// Builder for Remainder expressions +/// Builder for Remainder expressions (uses special modulo function) pub struct RemainderBuilder; impl ExpressionBuilder for RemainderBuilder { @@ -181,7 +90,7 @@ impl ExpressionBuilder for RemainderBuilder { } } -/// Builder for UnaryMinus expressions +/// Builder for UnaryMinus expressions (uses special negate function) pub struct UnaryMinusBuilder; impl ExpressionBuilder for UnaryMinusBuilder { diff --git a/native/core/src/execution/expressions/bitwise.rs b/native/core/src/execution/expressions/bitwise.rs index ffcf572c06..4f64ab0b9a 100644 --- a/native/core/src/execution/expressions/bitwise.rs +++ b/native/core/src/execution/expressions/bitwise.rs @@ -1,104 +1,38 @@ //! Bitwise expression builders -use std::sync::Arc; - -use arrow::datatypes::SchemaRef; use datafusion::logical_expr::Operator as DataFusionOperator; -use datafusion::physical_expr::{expressions::BinaryExpr, PhysicalExpr}; -use datafusion_comet_proto::spark_expression::Expr; - -use crate::execution::{ - expressions::extract_expr, - operators::ExecutionError, - planner::{traits::ExpressionBuilder, PhysicalPlanner}, -}; -/// Builder for BitwiseAnd expressions -pub struct BitwiseAndBuilder; +use crate::binary_expr_builder; -impl ExpressionBuilder for BitwiseAndBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, BitwiseAnd); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseAnd; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } +/// Macro to define all bitwise builders at once +macro_rules! define_bitwise_builders { + ($(($builder:ident, $expr_type:ident, $op:expr)),* $(,)?) => { + $( + binary_expr_builder!($builder, $expr_type, $op); + )* + }; } -/// Builder for BitwiseOr expressions -pub struct BitwiseOrBuilder; - -impl ExpressionBuilder for BitwiseOrBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, BitwiseOr); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseOr; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } -} - -/// Builder for BitwiseXor expressions -pub struct BitwiseXorBuilder; - -impl ExpressionBuilder for BitwiseXorBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, BitwiseXor); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseXor; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } -} - -/// Builder for BitwiseShiftLeft expressions -pub struct BitwiseShiftLeftBuilder; - -impl ExpressionBuilder for BitwiseShiftLeftBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, BitwiseShiftLeft); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseShiftLeft; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } -} - -/// Builder for BitwiseShiftRight expressions -pub struct BitwiseShiftRightBuilder; - -impl ExpressionBuilder for BitwiseShiftRightBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, BitwiseShiftRight); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - let op = DataFusionOperator::BitwiseShiftRight; - Ok(Arc::new(BinaryExpr::new(left, op, right))) - } -} +define_bitwise_builders![ + ( + BitwiseAndBuilder, + BitwiseAnd, + DataFusionOperator::BitwiseAnd + ), + (BitwiseOrBuilder, BitwiseOr, DataFusionOperator::BitwiseOr), + ( + BitwiseXorBuilder, + BitwiseXor, + DataFusionOperator::BitwiseXor + ), + ( + BitwiseShiftLeftBuilder, + BitwiseShiftLeft, + DataFusionOperator::BitwiseShiftLeft + ), + ( + BitwiseShiftRightBuilder, + BitwiseShiftRight, + DataFusionOperator::BitwiseShiftRight + ), +]; diff --git a/native/core/src/execution/expressions/comparison.rs b/native/core/src/execution/expressions/comparison.rs index 70bb3d4904..e3dfce2bdf 100644 --- a/native/core/src/execution/expressions/comparison.rs +++ b/native/core/src/execution/expressions/comparison.rs @@ -1,182 +1,33 @@ //! Comparison expression builders -use std::sync::Arc; - -use arrow::datatypes::SchemaRef; use datafusion::logical_expr::Operator as DataFusionOperator; -use datafusion::physical_expr::{expressions::BinaryExpr, PhysicalExpr}; -use datafusion_comet_proto::spark_expression::Expr; - -use crate::execution::{ - expressions::extract_expr, - operators::ExecutionError, - planner::{traits::ExpressionBuilder, PhysicalPlanner}, -}; -/// Builder for Eq expressions -pub struct EqBuilder; - -impl ExpressionBuilder for EqBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, Eq); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new( - left, - DataFusionOperator::Eq, - right, - ))) - } -} - -/// Builder for Neq expressions -pub struct NeqBuilder; - -impl ExpressionBuilder for NeqBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, Neq); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new( - left, - DataFusionOperator::NotEq, - right, - ))) - } -} - -/// Builder for Lt expressions -pub struct LtBuilder; - -impl ExpressionBuilder for LtBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, Lt); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new( - left, - DataFusionOperator::Lt, - right, - ))) - } -} - -/// Builder for LtEq expressions -pub struct LtEqBuilder; - -impl ExpressionBuilder for LtEqBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, LtEq); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new( - left, - DataFusionOperator::LtEq, - right, - ))) - } -} - -/// Builder for Gt expressions -pub struct GtBuilder; - -impl ExpressionBuilder for GtBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, Gt); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new( - left, - DataFusionOperator::Gt, - right, - ))) - } -} - -/// Builder for GtEq expressions -pub struct GtEqBuilder; - -impl ExpressionBuilder for GtEqBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, GtEq); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new( - left, - DataFusionOperator::GtEq, - right, - ))) - } -} - -/// Builder for EqNullSafe expressions -pub struct EqNullSafeBuilder; - -impl ExpressionBuilder for EqNullSafeBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, EqNullSafe); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new( - left, - DataFusionOperator::IsNotDistinctFrom, - right, - ))) - } -} - -/// Builder for NeqNullSafe expressions -pub struct NeqNullSafeBuilder; - -impl ExpressionBuilder for NeqNullSafeBuilder { - fn build( - &self, - spark_expr: &Expr, - input_schema: SchemaRef, - planner: &PhysicalPlanner, - ) -> Result, ExecutionError> { - let expr = extract_expr!(spark_expr, NeqNullSafe); - let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; - let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BinaryExpr::new( - left, - DataFusionOperator::IsDistinctFrom, - right, - ))) - } -} +use crate::binary_expr_builder; + +/// Macro to define all comparison builders at once +macro_rules! define_comparison_builders { + ($(($builder:ident, $expr_type:ident, $op:expr)),* $(,)?) => { + $( + binary_expr_builder!($builder, $expr_type, $op); + )* + }; +} + +define_comparison_builders![ + (EqBuilder, Eq, DataFusionOperator::Eq), + (NeqBuilder, Neq, DataFusionOperator::NotEq), + (LtBuilder, Lt, DataFusionOperator::Lt), + (LtEqBuilder, LtEq, DataFusionOperator::LtEq), + (GtBuilder, Gt, DataFusionOperator::Gt), + (GtEqBuilder, GtEq, DataFusionOperator::GtEq), + ( + EqNullSafeBuilder, + EqNullSafe, + DataFusionOperator::IsNotDistinctFrom + ), + ( + NeqNullSafeBuilder, + NeqNullSafe, + DataFusionOperator::IsDistinctFrom + ), +]; diff --git a/native/core/src/execution/expressions/logical.rs b/native/core/src/execution/expressions/logical.rs new file mode 100644 index 0000000000..1cf4124e74 --- /dev/null +++ b/native/core/src/execution/expressions/logical.rs @@ -0,0 +1,17 @@ +//! Logical expression builders + +use datafusion::logical_expr::Operator as DataFusionOperator; +use datafusion::physical_expr::expressions::NotExpr; + +use crate::{binary_expr_builder, unary_expr_builder}; + +/// Macro to define all logical builders at once +macro_rules! define_logical_builders { + () => { + binary_expr_builder!(AndBuilder, And, DataFusionOperator::And); + binary_expr_builder!(OrBuilder, Or, DataFusionOperator::Or); + unary_expr_builder!(NotBuilder, Not, NotExpr::new); + }; +} + +define_logical_builders!(); diff --git a/native/core/src/execution/expressions/nullcheck.rs b/native/core/src/execution/expressions/nullcheck.rs new file mode 100644 index 0000000000..506e46ec3f --- /dev/null +++ b/native/core/src/execution/expressions/nullcheck.rs @@ -0,0 +1,15 @@ +//! Null check expression builders + +use datafusion::physical_expr::expressions::{IsNotNullExpr, IsNullExpr}; + +use crate::unary_expr_builder; + +/// Macro to define all null check builders at once +macro_rules! define_null_check_builders { + () => { + unary_expr_builder!(IsNullBuilder, IsNull, IsNullExpr::new); + unary_expr_builder!(IsNotNullBuilder, IsNotNull, IsNotNullExpr::new); + }; +} + +define_null_check_builders!(); diff --git a/native/core/src/execution/planner/traits.rs b/native/core/src/execution/planner/traits.rs index 11109fa2c2..05f0a0bc3c 100644 --- a/native/core/src/execution/planner/traits.rs +++ b/native/core/src/execution/planner/traits.rs @@ -48,6 +48,92 @@ macro_rules! extract_expr { }; } +/// Macro to generate binary expression builders with minimal boilerplate +#[macro_export] +macro_rules! binary_expr_builder { + ($builder_name:ident, $expr_type:ident, $operator:expr) => { + pub struct $builder_name; + + impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name { + fn build( + &self, + spark_expr: &datafusion_comet_proto::spark_expression::Expr, + input_schema: arrow::datatypes::SchemaRef, + planner: &$crate::execution::planner::PhysicalPlanner, + ) -> Result< + std::sync::Arc, + $crate::execution::operators::ExecutionError, + > { + let expr = $crate::extract_expr!(spark_expr, $expr_type); + let left = planner.create_expr( + expr.left.as_ref().unwrap(), + std::sync::Arc::clone(&input_schema), + )?; + let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + Ok(std::sync::Arc::new( + datafusion::physical_expr::expressions::BinaryExpr::new(left, $operator, right), + )) + } + } + }; +} + +/// Macro to generate unary expression builders +#[macro_export] +macro_rules! unary_expr_builder { + ($builder_name:ident, $expr_type:ident, $expr_constructor:expr) => { + pub struct $builder_name; + + impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name { + fn build( + &self, + spark_expr: &datafusion_comet_proto::spark_expression::Expr, + input_schema: arrow::datatypes::SchemaRef, + planner: &$crate::execution::planner::PhysicalPlanner, + ) -> Result< + std::sync::Arc, + $crate::execution::operators::ExecutionError, + > { + let expr = $crate::extract_expr!(spark_expr, $expr_type); + let child = planner.create_expr(expr.child.as_ref().unwrap(), input_schema)?; + Ok(std::sync::Arc::new($expr_constructor(child))) + } + } + }; +} + +/// Macro to generate arithmetic expression builders that need eval_mode handling +#[macro_export] +macro_rules! arithmetic_expr_builder { + ($builder_name:ident, $expr_type:ident, $operator:expr) => { + pub struct $builder_name; + + impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name { + fn build( + &self, + spark_expr: &datafusion_comet_proto::spark_expression::Expr, + input_schema: arrow::datatypes::SchemaRef, + planner: &$crate::execution::planner::PhysicalPlanner, + ) -> Result< + std::sync::Arc, + $crate::execution::operators::ExecutionError, + > { + let expr = $crate::extract_expr!(spark_expr, $expr_type); + let eval_mode = + $crate::execution::planner::from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + $operator, + input_schema, + eval_mode, + ) + } + } + }; +} + /// Trait for building physical expressions from Spark protobuf expressions pub trait ExpressionBuilder: Send + Sync { /// Build a DataFusion physical expression from a Spark protobuf expression From 809a117f68520df06273e47da75c5c48b0e5056c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 07:11:14 -0700 Subject: [PATCH 11/14] add ASF header --- .../src/execution/expressions/arithmetic.rs | 17 +++++++++++++++++ .../core/src/execution/expressions/bitwise.rs | 17 +++++++++++++++++ .../src/execution/expressions/comparison.rs | 17 +++++++++++++++++ .../core/src/execution/expressions/logical.rs | 17 +++++++++++++++++ .../core/src/execution/expressions/nullcheck.rs | 17 +++++++++++++++++ 5 files changed, 85 insertions(+) diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs index c4862c5507..22ddda7b69 100644 --- a/native/core/src/execution/expressions/arithmetic.rs +++ b/native/core/src/execution/expressions/arithmetic.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! Arithmetic expression builders use std::sync::Arc; diff --git a/native/core/src/execution/expressions/bitwise.rs b/native/core/src/execution/expressions/bitwise.rs index 4f64ab0b9a..2e39588b44 100644 --- a/native/core/src/execution/expressions/bitwise.rs +++ b/native/core/src/execution/expressions/bitwise.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! Bitwise expression builders use datafusion::logical_expr::Operator as DataFusionOperator; diff --git a/native/core/src/execution/expressions/comparison.rs b/native/core/src/execution/expressions/comparison.rs index e3dfce2bdf..8312059e90 100644 --- a/native/core/src/execution/expressions/comparison.rs +++ b/native/core/src/execution/expressions/comparison.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! Comparison expression builders use datafusion::logical_expr::Operator as DataFusionOperator; diff --git a/native/core/src/execution/expressions/logical.rs b/native/core/src/execution/expressions/logical.rs index 1cf4124e74..04d09bd660 100644 --- a/native/core/src/execution/expressions/logical.rs +++ b/native/core/src/execution/expressions/logical.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! Logical expression builders use datafusion::logical_expr::Operator as DataFusionOperator; diff --git a/native/core/src/execution/expressions/nullcheck.rs b/native/core/src/execution/expressions/nullcheck.rs index 506e46ec3f..3981ab5504 100644 --- a/native/core/src/execution/expressions/nullcheck.rs +++ b/native/core/src/execution/expressions/nullcheck.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! Null check expression builders use datafusion::physical_expr::expressions::{IsNotNullExpr, IsNullExpr}; From 86685dcb3981d466a2a87402ae99da3003159db5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Dec 2025 07:29:11 -0700 Subject: [PATCH 12/14] move macro --- .../src/execution/expressions/arithmetic.rs | 34 +++++++++++++++++-- native/core/src/execution/planner/traits.rs | 32 ----------------- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs index 22ddda7b69..71fe85ef52 100644 --- a/native/core/src/execution/expressions/arithmetic.rs +++ b/native/core/src/execution/expressions/arithmetic.rs @@ -17,6 +17,38 @@ //! Arithmetic expression builders +/// Macro to generate arithmetic expression builders that need eval_mode handling +#[macro_export] +macro_rules! arithmetic_expr_builder { + ($builder_name:ident, $expr_type:ident, $operator:expr) => { + pub struct $builder_name; + + impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name { + fn build( + &self, + spark_expr: &datafusion_comet_proto::spark_expression::Expr, + input_schema: arrow::datatypes::SchemaRef, + planner: &$crate::execution::planner::PhysicalPlanner, + ) -> Result< + std::sync::Arc, + $crate::execution::operators::ExecutionError, + > { + let expr = $crate::extract_expr!(spark_expr, $expr_type); + let eval_mode = + $crate::execution::planner::from_protobuf_eval_mode(expr.eval_mode)?; + planner.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + $operator, + input_schema, + eval_mode, + ) + } + } + }; +} + use std::sync::Arc; use arrow::datatypes::SchemaRef; @@ -33,8 +65,6 @@ use crate::execution::{ }, }; -use crate::arithmetic_expr_builder; - /// Macro to define basic arithmetic builders that use eval_mode macro_rules! define_basic_arithmetic_builders { ($(($builder:ident, $expr_type:ident, $op:expr)),* $(,)?) => { diff --git a/native/core/src/execution/planner/traits.rs b/native/core/src/execution/planner/traits.rs index 05f0a0bc3c..3f3467d0d0 100644 --- a/native/core/src/execution/planner/traits.rs +++ b/native/core/src/execution/planner/traits.rs @@ -102,38 +102,6 @@ macro_rules! unary_expr_builder { }; } -/// Macro to generate arithmetic expression builders that need eval_mode handling -#[macro_export] -macro_rules! arithmetic_expr_builder { - ($builder_name:ident, $expr_type:ident, $operator:expr) => { - pub struct $builder_name; - - impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name { - fn build( - &self, - spark_expr: &datafusion_comet_proto::spark_expression::Expr, - input_schema: arrow::datatypes::SchemaRef, - planner: &$crate::execution::planner::PhysicalPlanner, - ) -> Result< - std::sync::Arc, - $crate::execution::operators::ExecutionError, - > { - let expr = $crate::extract_expr!(spark_expr, $expr_type); - let eval_mode = - $crate::execution::planner::from_protobuf_eval_mode(expr.eval_mode)?; - planner.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - $operator, - input_schema, - eval_mode, - ) - } - } - }; -} - /// Trait for building physical expressions from Spark protobuf expressions pub trait ExpressionBuilder: Send + Sync { /// Build a DataFusion physical expression from a Spark protobuf expression From 68e2e7dc370c383227c83961bb1ffb773c7db89b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 9 Dec 2025 16:46:00 -0700 Subject: [PATCH 13/14] update contributor guide --- docs/source/contributor-guide/adding_a_new_expression.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md index 9ce9af8004..4e148eafd5 100644 --- a/docs/source/contributor-guide/adding_a_new_expression.md +++ b/docs/source/contributor-guide/adding_a_new_expression.md @@ -271,8 +271,9 @@ How this works is somewhat dependent on the type of expression you're adding. Ex If you're adding a new expression that requires custom protobuf serialization, you may need to: 1. Add a new message to the protobuf definition in `native/proto/src/proto/expr.proto` -2. Update the Rust deserialization code to handle the new protobuf message type - +2. Add a native expression handler in `expression_registry.rs` to deserialize the new protobuf message type and + create a native expression + For most expressions, you can skip this step if you're using the existing scalar function infrastructure. #### Adding a New Scalar Function Expression From 36fa8caeee09b3c0554f07017d890c13655f88c9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 9 Dec 2025 17:15:10 -0700 Subject: [PATCH 14/14] prettier --- docs/source/contributor-guide/adding_a_new_expression.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md index 4e148eafd5..74825f4301 100644 --- a/docs/source/contributor-guide/adding_a_new_expression.md +++ b/docs/source/contributor-guide/adding_a_new_expression.md @@ -273,7 +273,7 @@ If you're adding a new expression that requires custom protobuf serialization, y 1. Add a new message to the protobuf definition in `native/proto/src/proto/expr.proto` 2. Add a native expression handler in `expression_registry.rs` to deserialize the new protobuf message type and create a native expression - + For most expressions, you can skip this step if you're using the existing scalar function infrastructure. #### Adding a New Scalar Function Expression