
Commit 73f1405

chore: Move more expressions from core crate to spark-expr crate (#1152)
* move aggregate expressions to spark-expr crate
* move more expressions
* move benchmark
* normalize_nan
* bitwise not
* comet scalar funcs
* update bench imports
1 parent 21503ca commit 73f1405
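The pattern behind these diffs: expression modules leave the core crate, and the datafusion-comet-spark-expr crate exposes their public types at its root (plus a utils module for the likely/unlikely helpers), so callers replace deep core-crate paths with one flat use. A minimal sketch of that re-export layout follows; the actual spark-expr module structure is not shown in this commit, so treat the layout as an assumption, with stub bodies so the sketch compiles standalone.

// Sketch of the re-export layout a crate like datafusion-comet-spark-expr
// uses after this commit. Module and type names (avg::Avg, utils::unlikely)
// come from the diffs below; the bodies are stubs so this compiles standalone.
mod avg {
    pub struct Avg; // stand-in for the real Spark-compatible Avg aggregate
}

pub mod utils {
    /// Stub for the branch hint that moved here from comet-core's lib.rs
    /// (the real implementation appears in the lib.rs diff below).
    #[inline]
    pub fn unlikely(b: bool) -> bool {
        b
    }
}

// Flat re-export: downstream crates write `use datafusion_comet_spark_expr::Avg;`
// instead of `use crate::execution::datafusion::expressions::avg::Avg;`.
pub use avg::Avg;

fn main() {
    let _agg = Avg;
    assert!(!utils::unlikely(false));
}

The planner.rs diff below shows the payoff: the long per-module import tree collapses into a single datafusion_comet_spark_expr::{...} block.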

File tree

24 files changed: +96 -116 lines

native/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

native/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
 datafusion-comet-proto = { path = "proto", version = "0.5.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.8" }
+futures = "0.3.28"
 num = "0.4"
 rand = "0.8"
 regex = "1.9.6"

native/core/Cargo.toml

Lines changed: 1 addition & 6 deletions
@@ -42,7 +42,7 @@ arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 parquet = { workspace = true, default-features = false, features = ["experimental"] }
 half = { version = "2.4.1", default-features = false }
-futures = "0.3.28"
+futures = { workspace = true }
 mimalloc = { version = "*", default-features = false, optional = true }
 tokio = { version = "1", features = ["rt-multi-thread"] }
 async-trait = "0.1"
@@ -88,7 +88,6 @@ hex = "0.4.3"
 
 [features]
 default = []
-nightly = []
 
 [lib]
 name = "comet"
@@ -123,10 +122,6 @@ harness = false
 name = "filter"
 harness = false
 
-[[bench]]
-name = "aggregate"
-harness = false
-
 [[bench]]
 name = "bloom_filter_agg"
 harness = false

native/core/src/common/bit.rs

Lines changed: 2 additions & 4 deletions
@@ -17,14 +17,12 @@
 
 use std::{cmp::min, mem::size_of};
 
-use arrow::buffer::Buffer;
-
 use crate::{
     errors::CometResult as Result,
-    likely,
     parquet::{data_type::AsBytes, util::bit_packing::unpack32},
-    unlikely,
 };
+use arrow::buffer::Buffer;
+use datafusion_comet_spark_expr::utils::{likely, unlikely};
 
 #[inline]
 pub fn from_ne_slice<T: FromBytes>(bs: &[u8]) -> T {

native/core/src/execution/datafusion/expressions/checkoverflow.rs

Lines changed: 1 addition & 14 deletions
@@ -27,8 +27,7 @@ use arrow::{
     datatypes::{Decimal128Type, DecimalType},
     record_batch::RecordBatch,
 };
-use arrow_data::decimal::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
-use arrow_schema::{DataType, Schema, DECIMAL128_MAX_PRECISION};
+use arrow_schema::{DataType, Schema};
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{DataFusionError, ScalarValue};
@@ -172,15 +171,3 @@ impl PhysicalExpr for CheckOverflow {
         self.hash(&mut s);
     }
 }
-
-/// Adapted from arrow-rs `validate_decimal_precision` but returns bool
-/// instead of Err to avoid the cost of formatting the error strings and is
-/// optimized to remove a memcpy that exists in the original function
-/// we can remove this code once we upgrade to a version of arrow-rs that
-/// includes https://github.com/apache/arrow-rs/pull/6419
-#[inline]
-pub fn is_valid_decimal_precision(value: i128, precision: u8) -> bool {
-    precision <= DECIMAL128_MAX_PRECISION
-        && value >= MIN_DECIMAL_FOR_EACH_PRECISION[precision as usize - 1]
-        && value <= MAX_DECIMAL_FOR_EACH_PRECISION[precision as usize - 1]
-}
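The helper deleted above checked whether an i128 fits a given decimal precision, returning bool instead of paying for arrow-rs's formatted error. A self-contained sketch of the same check, computing the bound as 10^precision - 1 instead of indexing arrow-data's MIN_DECIMAL_FOR_EACH_PRECISION/MAX_DECIMAL_FOR_EACH_PRECISION tables (equivalent but slower; where the function lives after this commit is not shown in this diff):

/// Sketch of the removed `is_valid_decimal_precision`: true iff `value`
/// is representable as a Decimal128 with the given precision.
fn is_valid_decimal_precision(value: i128, precision: u8) -> bool {
    const DECIMAL128_MAX_PRECISION: u8 = 38;
    if precision == 0 || precision > DECIMAL128_MAX_PRECISION {
        return false; // the original assumed 1..=38 and indexed the tables directly
    }
    // Largest magnitude at this precision: 10^precision - 1 (fits in i128 for p <= 38).
    let max = 10i128.pow(precision as u32) - 1;
    (-max..=max).contains(&value)
}

fn main() {
    assert!(is_valid_decimal_precision(999, 3));
    assert!(!is_valid_decimal_precision(1_000, 3));
    assert!(is_valid_decimal_precision(10i128.pow(38) - 1, 38));
}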

native/core/src/execution/datafusion/expressions/mod.rs

Lines changed: 0 additions & 11 deletions
@@ -17,26 +17,15 @@
 
 //! Native DataFusion expressions
 
-pub mod bitwise_not;
 pub mod checkoverflow;
-mod normalize_nan;
-pub use normalize_nan::NormalizeNaNAndZero;
 
 use crate::errors::CometError;
-pub mod avg;
-pub mod avg_decimal;
 pub mod bloom_filter_agg;
 pub mod bloom_filter_might_contain;
-pub mod comet_scalar_funcs;
-pub mod correlation;
-pub mod covariance;
 pub mod negative;
-pub mod stddev;
 pub mod strings;
 pub mod subquery;
-pub mod sum_decimal;
 pub mod unbound;
-pub mod variance;
 
 pub use datafusion_comet_spark_expr::{EvalMode, SparkError};

native/core/src/execution/datafusion/planner.rs

Lines changed: 5 additions & 13 deletions
@@ -18,29 +18,19 @@
 //! Converts Spark physical plan to DataFusion physical plan
 
 use super::expressions::EvalMode;
-use crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun;
 use crate::execution::operators::{CopyMode, FilterExec};
 use crate::{
     errors::ExpressionError,
     execution::{
         datafusion::{
             expressions::{
-                avg::Avg,
-                avg_decimal::AvgDecimal,
-                bitwise_not::BitwiseNotExpr,
                 bloom_filter_agg::BloomFilterAgg,
                 bloom_filter_might_contain::BloomFilterMightContain,
                 checkoverflow::CheckOverflow,
-                correlation::Correlation,
-                covariance::Covariance,
                 negative,
-                stddev::Stddev,
                 strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr},
                 subquery::Subquery,
-                sum_decimal::SumDecimal,
                 unbound::UnboundColumn,
-                variance::Variance,
-                NormalizeNaNAndZero,
             },
             operators::expand::CometExpandExec,
             shuffle_writer::ShuffleWriterExec,
@@ -82,6 +72,7 @@ use datafusion::{
     },
     prelude::SessionContext,
 };
+use datafusion_comet_spark_expr::create_comet_physical_fun;
 use datafusion_functions_nested::concat::ArrayAppend;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 
@@ -99,9 +90,10 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    ArrayInsert, Cast, CreateNamedStruct, DateTruncExpr, GetArrayStructFields, GetStructField,
-    HourExpr, IfExpr, ListExtract, MinuteExpr, RLike, SecondExpr, SparkCastOptions,
-    TimestampTruncExpr, ToJson,
+    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Correlation, Covariance, CreateNamedStruct,
+    DateTruncExpr, GetArrayStructFields, GetStructField, HourExpr, IfExpr, ListExtract, MinuteExpr,
+    NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, Stddev, SumDecimal,
+    TimestampTruncExpr, ToJson, Variance,
 };
 use datafusion_common::scalar::ScalarStructBuilder;
 use datafusion_common::{

native/core/src/lib.rs

Lines changed: 0 additions & 27 deletions
@@ -104,30 +104,3 @@ fn default_logger_config() -> CometResult<Config> {
         .build(root)
         .map_err(|err| CometError::Config(err.to_string()))
 }
-
-// These are borrowed from hashbrown crate:
-// https://github.com/rust-lang/hashbrown/blob/master/src/raw/mod.rs
-
-// On stable we can use #[cold] to get a equivalent effect: this attributes
-// suggests that the function is unlikely to be called
-#[cfg(not(feature = "nightly"))]
-#[inline]
-#[cold]
-fn cold() {}
-
-#[cfg(not(feature = "nightly"))]
-#[inline]
-fn likely(b: bool) -> bool {
-    if !b {
-        cold();
-    }
-    b
-}
-#[cfg(not(feature = "nightly"))]
-#[inline]
-fn unlikely(b: bool) -> bool {
-    if b {
-        cold();
-    }
-    b
-}
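The helpers deleted here now back the datafusion_comet_spark_expr::utils::{likely, unlikely} imports used by bit.rs and the parquet readers in this commit (which also explains the removal of the now-unused nightly feature from core's Cargo.toml above). Here is the hashbrown-derived implementation as removed, plus a toy caller; the main function is illustrative and not part of the commit:

// The branch-prediction helpers removed from comet-core's lib.rs. On stable
// Rust, #[cold] on an empty function approximates the nightly
// likely/unlikely intrinsics: calling it on the improbable side tells the
// optimizer which branch to favor.
#[inline]
#[cold]
fn cold() {}

#[inline]
fn likely(b: bool) -> bool {
    if !b {
        cold();
    }
    b
}

#[inline]
fn unlikely(b: bool) -> bool {
    if b {
        cold();
    }
    b
}

fn main() {
    // Illustrative usage, mirroring the parquet decoders in this diff:
    // an empty buffer is possible but rare, so hint the compiler.
    let buf: &[u8] = &[1, 2, 3];
    if unlikely(buf.is_empty()) {
        return;
    }
    assert!(likely(buf.len() > 0));
}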

native/core/src/parquet/read/levels.rs

Lines changed: 3 additions & 4 deletions
@@ -17,15 +17,14 @@
 
 use std::mem;
 
-use arrow::buffer::Buffer;
-use parquet::schema::types::ColumnDescPtr;
-
 use super::values::Decoder;
 use crate::{
     common::bit::{self, read_u32, BitReader},
     parquet::ParquetMutableVector,
-    unlikely,
 };
+use arrow::buffer::Buffer;
+use datafusion_comet_spark_expr::utils::unlikely;
+use parquet::schema::types::ColumnDescPtr;
 
 const INITIAL_BUF_LEN: usize = 16;

native/core/src/parquet/read/values.rs

Lines changed: 1 addition & 1 deletion
@@ -28,9 +28,9 @@ use crate::write_val_or_null;
 use crate::{
     common::bit::{self, BitReader},
     parquet::{data_type::*, ParquetMutableVector},
-    unlikely,
 };
 use arrow::datatypes::DataType as ArrowDataType;
+use datafusion_comet_spark_expr::utils::unlikely;
 
 pub fn get_decoder<T: DataType>(
     value_data: Buffer,
