
Commit be1524b

feat: Implement native operator registry (#2875)
1 parent bf1f3a2 commit be1524b

File tree: 7 files changed (+359, -155 lines)


native/core/src/execution/expressions/arithmetic.rs

Lines changed: 3 additions & 2 deletions

@@ -23,7 +23,7 @@ macro_rules! arithmetic_expr_builder {
     ($builder_name:ident, $expr_type:ident, $operator:expr) => {
         pub struct $builder_name;
 
-        impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name {
+        impl $crate::execution::planner::expression_registry::ExpressionBuilder for $builder_name {
             fn build(
                 &self,
                 spark_expr: &datafusion_comet_proto::spark_expression::Expr,
@@ -61,7 +61,8 @@ use crate::execution::{
     expressions::extract_expr,
     operators::ExecutionError,
     planner::{
-        from_protobuf_eval_mode, traits::ExpressionBuilder, BinaryExprOptions, PhysicalPlanner,
+        expression_registry::ExpressionBuilder, from_protobuf_eval_mode, BinaryExprOptions,
+        PhysicalPlanner,
     },
 };

native/core/src/execution/operators/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -31,6 +31,7 @@ pub use expand::ExpandExec;
 mod iceberg_scan;
 mod parquet_writer;
 pub use parquet_writer::ParquetWriterExec;
+pub mod projection;
 mod scan;
 
 /// Error returned during executing operators.
native/core/src/execution/operators/projection.rs (new file)

Lines changed: 74 additions & 0 deletions

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Projection operator builder

use std::sync::Arc;

use datafusion::physical_plan::projection::ProjectionExec;
use datafusion_comet_proto::spark_operator::Operator;
use jni::objects::GlobalRef;

use crate::{
    execution::{
        operators::{ExecutionError, ScanExec},
        planner::{operator_registry::OperatorBuilder, PhysicalPlanner},
        spark_plan::SparkPlan,
    },
    extract_op,
};

/// Builder for Projection operators
pub struct ProjectionBuilder;

impl OperatorBuilder for ProjectionBuilder {
    fn build(
        &self,
        spark_plan: &Operator,
        inputs: &mut Vec<Arc<GlobalRef>>,
        partition_count: usize,
        planner: &PhysicalPlanner,
    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
        let project = extract_op!(spark_plan, Projection);
        let children = &spark_plan.children;

        assert_eq!(children.len(), 1);
        let (scans, child) = planner.create_plan(&children[0], inputs, partition_count)?;

        // Create projection expressions
        let exprs: Result<Vec<_>, _> = project
            .project_list
            .iter()
            .enumerate()
            .map(|(idx, expr)| {
                planner
                    .create_expr(expr, child.schema())
                    .map(|r| (r, format!("col_{idx}")))
            })
            .collect();

        let projection = Arc::new(ProjectionExec::try_new(
            exprs?,
            Arc::clone(&child.native_plan),
        )?);

        Ok((
            scans,
            Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])),
        ))
    }
}
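The extract_op! macro used by ProjectionBuilder::build is imported from the crate root but is not part of this view; the commit also adds a planner::macros module, which is presumably where it lives. Judging from the call site, it has to pull the named variant out of the operator's protobuf op_struct oneof and return early with an ExecutionError otherwise. A hypothetical sketch of that shape, not the actual definition:

// Hypothetical sketch only; the real extract_op! is defined elsewhere in the
// crate and may differ in structure and error message.
#[macro_export]
macro_rules! extract_op {
    ($plan:expr, $variant:ident) => {
        match $plan.op_struct.as_ref() {
            // Pull the inner message out of the protobuf oneof, e.g. OpStruct::Projection.
            Some(datafusion_comet_proto::spark_operator::operator::OpStruct::$variant(op)) => op,
            other => {
                return Err($crate::execution::operators::ExecutionError::GeneralError(
                    format!("Expected {} operator, got {:?}", stringify!($variant), other),
                ))
            }
        }
    };
}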

native/core/src/execution/planner.rs

Lines changed: 18 additions & 22 deletions

@@ -18,7 +18,8 @@
 //! Converts Spark physical plan to DataFusion physical plan
 
 pub mod expression_registry;
-pub mod traits;
+pub mod macros;
+pub mod operator_registry;
 
 use crate::execution::operators::IcebergScanExec;
 use crate::{
@@ -27,6 +28,7 @@ use crate::{
         expressions::subquery::Subquery,
         operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec},
         planner::expression_registry::ExpressionRegistry,
+        planner::operator_registry::OperatorRegistry,
         serde::to_arrow_datatype,
         shuffle::ShuffleWriterExec,
     },
@@ -861,29 +863,19 @@ impl PhysicalPlanner {
         inputs: &mut Vec<Arc<GlobalRef>>,
         partition_count: usize,
     ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
+        // Try to use the modular registry first - this automatically handles any registered operator types
+        if OperatorRegistry::global().can_handle(spark_plan) {
+            return OperatorRegistry::global().create_plan(
+                spark_plan,
+                inputs,
+                partition_count,
+                self,
+            );
+        }
+
+        // Fall back to the original monolithic match for other operators
         let children = &spark_plan.children;
         match spark_plan.op_struct.as_ref().unwrap() {
-            OpStruct::Projection(project) => {
-                assert_eq!(children.len(), 1);
-                let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
-                let exprs: PhyExprResult = project
-                    .project_list
-                    .iter()
-                    .enumerate()
-                    .map(|(idx, expr)| {
-                        self.create_expr(expr, child.schema())
-                            .map(|r| (r, format!("col_{idx}")))
-                    })
-                    .collect();
-                let projection = Arc::new(ProjectionExec::try_new(
-                    exprs?,
-                    Arc::clone(&child.native_plan),
-                )?);
-                Ok((
-                    scans,
-                    Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])),
-                ))
-            }
             OpStruct::Filter(filter) => {
                 assert_eq!(children.len(), 1);
                 let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
@@ -1634,6 +1626,10 @@ impl PhysicalPlanner {
                     Arc::new(SparkPlan::new(spark_plan.plan_id, window_agg, vec![child])),
                 ))
             }
+            _ => Err(GeneralError(format!(
+                "Unsupported or unregistered operator type: {:?}",
+                spark_plan.op_struct
+            ))),
         }
     }
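The operator_registry module itself does not appear in this commit view, but the call sites above imply its surface: a process-wide registry consulted before the legacy match, mapping operator variants to OperatorBuilder implementations such as ProjectionBuilder. The following is a simplified, self-contained sketch of that dispatch pattern; the stand-in Operator and Planner types, the string key, and the builders field are illustrative assumptions, not the module's actual contents.

use std::collections::HashMap;
use std::sync::OnceLock;

// Stand-ins for the protobuf Operator and the planner; the real registry works
// with datafusion_comet_proto::spark_operator::Operator and PhysicalPlanner.
struct Operator {
    kind: &'static str,
}
struct Planner;

trait OperatorBuilder: Send + Sync {
    fn build(&self, plan: &Operator, planner: &Planner) -> Result<String, String>;
}

struct ProjectionBuilder;
impl OperatorBuilder for ProjectionBuilder {
    fn build(&self, _plan: &Operator, _planner: &Planner) -> Result<String, String> {
        Ok("ProjectionExec".to_string())
    }
}

struct OperatorRegistry {
    builders: HashMap<&'static str, Box<dyn OperatorBuilder>>,
}

impl OperatorRegistry {
    fn new() -> Self {
        let mut builders: HashMap<&'static str, Box<dyn OperatorBuilder>> = HashMap::new();
        // Each supported operator registers its builder once.
        builders.insert("Projection", Box::new(ProjectionBuilder));
        Self { builders }
    }

    // Mirrors OperatorRegistry::global() in the diff: a lazily initialized singleton.
    fn global() -> &'static OperatorRegistry {
        static REGISTRY: OnceLock<OperatorRegistry> = OnceLock::new();
        REGISTRY.get_or_init(OperatorRegistry::new)
    }

    // Mirrors can_handle(): true if a builder is registered for this operator kind.
    fn can_handle(&self, plan: &Operator) -> bool {
        self.builders.contains_key(plan.kind)
    }

    // Mirrors create_plan(): dispatch to the registered builder.
    fn create_plan(&self, plan: &Operator, planner: &Planner) -> Result<String, String> {
        self.builders
            .get(plan.kind)
            .ok_or_else(|| format!("no builder registered for {}", plan.kind))?
            .build(plan, planner)
    }
}

fn main() {
    let plan = Operator { kind: "Projection" };
    if OperatorRegistry::global().can_handle(&plan) {
        println!("{:?}", OperatorRegistry::global().create_plan(&plan, &Planner));
    }
}

In the real module the key is presumably derived from the protobuf OpStruct variant rather than a string, and build returns (Vec<ScanExec>, Arc<SparkPlan>) exactly as ProjectionBuilder does above.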

native/core/src/execution/planner/expression_registry.rs

Lines changed: 84 additions & 1 deletion

@@ -25,7 +25,90 @@ use datafusion::physical_expr::PhysicalExpr;
 use datafusion_comet_proto::spark_expression::{expr::ExprStruct, Expr};
 
 use crate::execution::operators::ExecutionError;
-use crate::execution::planner::traits::{ExpressionBuilder, ExpressionType};
+
+/// Trait for building physical expressions from Spark protobuf expressions
+pub trait ExpressionBuilder: Send + Sync {
+    /// Build a DataFusion physical expression from a Spark protobuf expression
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &super::PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError>;
+}
+
+/// Enum to identify different expression types for registry dispatch
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ExpressionType {
+    // Arithmetic expressions
+    Add,
+    Subtract,
+    Multiply,
+    Divide,
+    IntegralDivide,
+    Remainder,
+    UnaryMinus,
+
+    // Comparison expressions
+    Eq,
+    Neq,
+    Lt,
+    LtEq,
+    Gt,
+    GtEq,
+    EqNullSafe,
+    NeqNullSafe,
+
+    // Logical expressions
+    And,
+    Or,
+    Not,
+
+    // Null checks
+    IsNull,
+    IsNotNull,
+
+    // Bitwise operations
+    BitwiseAnd,
+    BitwiseOr,
+    BitwiseXor,
+    BitwiseShiftLeft,
+    BitwiseShiftRight,
+
+    // Other expressions
+    Bound,
+    Unbound,
+    Literal,
+    Cast,
+    CaseWhen,
+    In,
+    If,
+    Substring,
+    Like,
+    Rlike,
+    CheckOverflow,
+    ScalarFunc,
+    NormalizeNanAndZero,
+    Subquery,
+    BloomFilterMightContain,
+    CreateNamedStruct,
+    GetStructField,
+    ToJson,
+    ToPrettyString,
+    ListExtract,
+    GetArrayStructFields,
+    ArrayInsert,
+    Rand,
+    Randn,
+    SparkPartitionId,
+    MonotonicallyIncreasingId,
+
+    // Time functions
+    Hour,
+    Minute,
+    Second,
+    TruncTimestamp,
+}
 
 /// Registry for expression builders
 pub struct ExpressionRegistry {
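The view is truncated at the ExpressionRegistry struct, so its fields and methods are not visible here. The derives on ExpressionType (Copy, PartialEq, Eq, Hash) are exactly the bounds required to use it as a HashMap key, which points at a registry keyed by ExpressionType and holding boxed ExpressionBuilder trait objects. A small self-contained sketch of that lookup shape, with an abbreviated enum and a stand-in builder trait; the builders field, constructor, and builder_for method are assumptions:

use std::collections::HashMap;

// Abbreviated copy of the enum added in this commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ExpressionType {
    Add,
    Subtract,
    Eq,
    IsNull,
}

// Stand-in for the ExpressionBuilder trait; the real one builds an
// Arc<dyn PhysicalExpr> from a Spark protobuf Expr.
trait ExpressionBuilder: Send + Sync {
    fn name(&self) -> &'static str;
}

struct AddExprBuilder;
impl ExpressionBuilder for AddExprBuilder {
    fn name(&self) -> &'static str {
        "add"
    }
}

struct ExpressionRegistry {
    builders: HashMap<ExpressionType, Box<dyn ExpressionBuilder>>,
}

impl ExpressionRegistry {
    fn new() -> Self {
        let mut builders: HashMap<ExpressionType, Box<dyn ExpressionBuilder>> = HashMap::new();
        builders.insert(ExpressionType::Add, Box::new(AddExprBuilder));
        Self { builders }
    }

    // Lookup is a plain HashMap get keyed by the ExpressionType discriminant.
    fn builder_for(&self, ty: ExpressionType) -> Option<&dyn ExpressionBuilder> {
        self.builders.get(&ty).map(|b| b.as_ref())
    }
}

fn main() {
    let registry = ExpressionRegistry::new();
    if let Some(builder) = registry.builder_for(ExpressionType::Add) {
        println!("dispatching to {}", builder.name());
    }
}

Under that reading, builders such as the macro-generated arithmetic ones from arithmetic.rs above would be registered under their corresponding ExpressionType variant and looked up during expression planning.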
