Skip to content
8 changes: 7 additions & 1 deletion native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use datafusion::{
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, create_modulo_expr,
create_negate_expr, BinaryOutputStyle, BloomFilterAgg, BloomFilterMightContain, EvalMode,
SparkHour, SparkMinute, SparkSecond,
SparkHour, SparkMinute, SparkSecond, SumInteger,
};

use crate::execution::operators::ExecutionError::GeneralError;
Expand Down Expand Up @@ -1833,6 +1833,12 @@ impl PhysicalPlanner {
let func = AggregateUDF::new_from_impl(SumDecimal::try_new(datatype)?);
AggregateExprBuilder::new(Arc::new(func), vec![child])
}
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
let func =
AggregateUDF::new_from_impl(SumInteger::try_new(datatype, eval_mode)?);
AggregateExprBuilder::new(Arc::new(func), vec![child])
}
_ => {
// cast to the result data type of SUM if necessary, we should not expect
// a cast failure since it should have already been checked at Spark side
Expand Down
2 changes: 1 addition & 1 deletion native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ message Count {
message Sum {
Expr child = 1;
DataType datatype = 2;
bool fail_on_error = 3;
EvalMode eval_mode = 3;
}

message Min {
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/agg_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod correlation;
mod covariance;
mod stddev;
mod sum_decimal;
mod sum_int;
mod variance;

pub use avg::Avg;
Expand All @@ -29,4 +30,5 @@ pub use correlation::Correlation;
pub use covariance::Covariance;
pub use stddev::Stddev;
pub use sum_decimal::SumDecimal;
pub use sum_int::SumInteger;
pub use variance::Variance;
Loading
Loading