Skip to content

Commit 97767c2

Browse files
authored
deps: upgrade DataFusion to 51, Arrow to 57, Iceberg to latest, MSRV to 1.88 (#2729)
1 parent be1524b commit 97767c2

File tree

16 files changed

+618
-373
lines changed

16 files changed

+618
-373
lines changed

native/Cargo.lock

Lines changed: 270 additions & 299 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ license = "Apache-2.0"
3131
edition = "2021"
3232

3333
# Comet uses the same minimum Rust version as DataFusion
34-
rust-version = "1.86"
34+
rust-version = "1.88"
3535

3636
[workspace.dependencies]
37-
arrow = { version = "56.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
37+
arrow = { version = "57.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
3838
async-trait = { version = "0.1" }
3939
bytes = { version = "1.10.0" }
40-
parquet = { version = "56.2.0", default-features = false, features = ["experimental"] }
41-
datafusion = { version = "50.3.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
42-
datafusion-datasource = { version = "50.3.0" }
43-
datafusion-spark = { version = "50.3.0" }
40+
parquet = { version = "57.0.0", default-features = false, features = ["experimental"] }
41+
datafusion = { version = "51.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
42+
datafusion-datasource = { version = "51.0.0" }
43+
datafusion-spark = { version = "51.0.0" }
4444
datafusion-comet-spark-expr = { path = "spark-expr" }
4545
datafusion-comet-proto = { path = "proto" }
4646
chrono = { version = "0.4", default-features = false, features = ["clock"] }
@@ -54,8 +54,7 @@ object_store = { version = "0.12.3", features = ["gcp", "azure", "aws", "http"]
5454
url = "2.2"
5555
aws-config = "1.8.10"
5656
aws-credential-types = "1.2.9"
57-
iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "a667539" }
58-
iceberg-datafusion = { git = "https://github.com/apache/iceberg-rust", rev = "a667539" }
57+
iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "16906c1" }
5958

6059
[profile.release]
6160
debug = true

native/core/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ bytes = { workspace = true }
5959
tempfile = "3.8.0"
6060
itertools = "0.14.0"
6161
paste = "1.0.14"
62-
datafusion = { workspace = true, features = ["parquet_encryption"] }
62+
datafusion = { workspace = true, features = ["parquet_encryption", "sql"] }
6363
datafusion-datasource = { workspace = true }
6464
datafusion-spark = { workspace = true }
6565
once_cell = "1.18.0"
@@ -95,7 +95,7 @@ jni = { version = "0.21", features = ["invocation"] }
9595
lazy_static = "1.4"
9696
assertables = "9"
9797
hex = "0.4.3"
98-
datafusion-functions-nested = { version = "50.3.0" }
98+
datafusion-functions-nested = { version = "51.0.0" }
9999

100100
[features]
101101
backtrace = ["datafusion/backtrace"]

native/core/src/execution/planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1442,7 +1442,7 @@ impl PhysicalPlanner {
14421442
vec![], // No struct columns to unnest
14431443
output_schema,
14441444
unnest_options,
1445-
));
1445+
)?);
14461446

14471447
Ok((
14481448
scans,
@@ -2230,7 +2230,7 @@ impl PhysicalPlanner {
22302230
partition_by,
22312231
sort_phy_exprs,
22322232
window_frame.into(),
2233-
input_schema.as_ref(),
2233+
input_schema,
22342234
false, // TODO: Ignore nulls
22352235
false, // TODO: Spark does not support DISTINCT ... OVER
22362236
None,

native/core/src/parquet/encryption_support.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl EncryptionFactory for CometEncryptionFactory {
5454
_options: &EncryptionFactoryOptions,
5555
_schema: &SchemaRef,
5656
_file_path: &Path,
57-
) -> Result<Option<FileEncryptionProperties>, DataFusionError> {
57+
) -> Result<Option<Arc<FileEncryptionProperties>>, DataFusionError> {
5858
Err(DataFusionError::NotImplemented(
5959
"Comet does not support Parquet encryption yet."
6060
.parse()
@@ -69,7 +69,7 @@ impl EncryptionFactory for CometEncryptionFactory {
6969
&self,
7070
options: &EncryptionFactoryOptions,
7171
file_path: &Path,
72-
) -> Result<Option<FileDecryptionProperties>, DataFusionError> {
72+
) -> Result<Option<Arc<FileDecryptionProperties>>, DataFusionError> {
7373
let config: CometEncryptionConfig = options.to_extension_options()?;
7474

7575
let full_path: String = config.uri_base + file_path.as_ref();

native/core/src/parquet/parquet_exec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ pub(crate) fn init_datasource_exec(
122122
object_store_url,
123123
file_source,
124124
)
125-
.with_projection(Some(projection_vector))
125+
.with_projection_indices(Some(projection_vector))
126126
.with_table_partition_cols(partition_fields)
127127
.build()
128128
}

native/core/src/parquet/read/column.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl ColumnReader {
124124
match desc.physical_type() {
125125
PhysicalType::BOOLEAN => typed_reader!(BoolColumnReader, Boolean),
126126
PhysicalType::INT32 => {
127-
if let Some(ref logical_type) = desc.logical_type() {
127+
if let Some(ref logical_type) = desc.logical_type_ref() {
128128
match logical_type {
129129
lt @ LogicalType::Integer {
130130
bit_width,
@@ -282,7 +282,7 @@ impl ColumnReader {
282282
}
283283
}
284284
PhysicalType::INT64 => {
285-
if let Some(ref logical_type) = desc.logical_type() {
285+
if let Some(ref logical_type) = desc.logical_type_ref() {
286286
match logical_type {
287287
lt @ LogicalType::Integer {
288288
bit_width,
@@ -331,19 +331,19 @@ impl ColumnReader {
331331
None
332332
};
333333
match unit {
334-
ParquetTimeUnit::MILLIS(_) => {
334+
ParquetTimeUnit::MILLIS => {
335335
typed_reader!(
336336
Int64TimestampMillisColumnReader,
337337
ArrowDataType::Timestamp(time_unit, time_zone)
338338
)
339339
}
340-
ParquetTimeUnit::MICROS(_) => {
340+
ParquetTimeUnit::MICROS => {
341341
typed_reader!(
342342
Int64TimestampMicrosColumnReader,
343343
ArrowDataType::Timestamp(time_unit, time_zone)
344344
)
345345
}
346-
ParquetTimeUnit::NANOS(_) => {
346+
ParquetTimeUnit::NANOS => {
347347
typed_reader!(
348348
Int64TimestampNanosColumnReader,
349349
ArrowDataType::Int64
@@ -390,7 +390,7 @@ impl ColumnReader {
390390

391391
PhysicalType::DOUBLE => typed_reader!(DoubleColumnReader, Float64),
392392
PhysicalType::BYTE_ARRAY => {
393-
if let Some(logical_type) = desc.logical_type() {
393+
if let Some(logical_type) = desc.logical_type_ref() {
394394
match logical_type {
395395
LogicalType::String => typed_reader!(StringColumnReader, Utf8),
396396
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
@@ -403,13 +403,13 @@ impl ColumnReader {
403403
}
404404
}
405405
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
406-
if let Some(logical_type) = desc.logical_type() {
406+
if let Some(logical_type) = desc.logical_type_ref() {
407407
match logical_type {
408408
LogicalType::Decimal {
409409
precision,
410410
scale: _,
411411
} => {
412-
if !use_decimal_128 && precision <= DECIMAL_MAX_INT_DIGITS {
412+
if !use_decimal_128 && precision <= &DECIMAL_MAX_INT_DIGITS {
413413
typed_reader!(FLBADecimal32ColumnReader, Int32)
414414
} else if !use_decimal_128
415415
&& promotion_info.precision <= DECIMAL_MAX_LONG_DIGITS

native/core/src/parquet/util/jni.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use datafusion::execution::object_store::ObjectStoreUrl;
3030
use object_store::path::Path;
3131
use parquet::{
3232
basic::{Encoding, LogicalType, TimeUnit, Type as PhysicalType},
33-
format::{MicroSeconds, MilliSeconds, NanoSeconds},
3433
schema::types::{ColumnDescriptor, ColumnPath, PrimitiveTypeBuilder},
3534
};
3635
use url::{ParseError, Url};
@@ -185,9 +184,9 @@ fn convert_logical_type(
185184

186185
fn convert_time_unit(time_unit: jint) -> TimeUnit {
187186
match time_unit {
188-
0 => TimeUnit::MILLIS(MilliSeconds::new()),
189-
1 => TimeUnit::MICROS(MicroSeconds::new()),
190-
2 => TimeUnit::NANOS(NanoSeconds::new()),
187+
0 => TimeUnit::MILLIS,
188+
1 => TimeUnit::MICROS,
189+
2 => TimeUnit::NANOS,
191190
_ => panic!("Invalid time unit id for Parquet: {time_unit}"),
192191
}
193192
}

native/spark-expr/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ arrow = {workspace = true}
4545
criterion = { version = "0.7", features = ["async", "async_tokio", "async_std"] }
4646
rand = { workspace = true}
4747
tokio = { version = "1", features = ["rt-multi-thread"] }
48+
datafusion = { workspace = true, features = ["sql"] }
4849

4950
[lib]
5051
name = "datafusion_comet_spark_expr"

native/spark-expr/src/agg_funcs/avg.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ use arrow::compute::sum;
2525
use arrow::datatypes::{DataType, Field, FieldRef};
2626
use datafusion::common::{not_impl_err, Result, ScalarValue};
2727
use datafusion::logical_expr::{
28-
type_coercion::aggregates::avg_return_type, Accumulator, AggregateUDFImpl, EmitTo,
29-
GroupsAccumulator, ReversedUDAF, Signature,
28+
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
3029
};
3130
use datafusion::physical_expr::expressions::format_state_name;
3231
use std::{any::Any, sync::Arc};
@@ -36,6 +35,13 @@ use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
3635
use datafusion::logical_expr::Volatility::Immutable;
3736
use DataType::*;
3837

38+
fn avg_return_type(_name: &str, data_type: &DataType) -> Result<DataType> {
39+
match data_type {
40+
Float64 => Ok(Float64),
41+
_ => not_impl_err!("Avg return type for {data_type}"),
42+
}
43+
}
44+
3945
/// AVG aggregate expression
4046
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
4147
pub struct Avg {

0 commit comments

Comments (0)