Skip to content

Commit 97b8aa3

Browse files
authored
chore: update to datafusion v50 (#1320)
* update to datafusion 50 * minor comment * sort join test moved to supported * fix cargo.toml formatting * update ballista version * update protos * update MSRV * dependency cleanup * update MSRV to 1.86 * cargo update * update datafusion to 50.1 * update datafusion to 50.1 and cargo lock as well * update code after df50.1 update * improve test
1 parent fb400de commit 97b8aa3

File tree

20 files changed

+1039
-865
lines changed

20 files changed

+1039
-865
lines changed

Cargo.lock

Lines changed: 728 additions & 710 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,28 @@ resolver = "2"
2323
[workspace.package]
2424
# edition to be changed to 2024 when we update
2525
# Minimum Supported Rust Version (MSRV) to 1.85.0
26-
# which is datafusion 49s
26+
# which is datafusion 49
2727
#
2828
edition = "2021"
2929
# we should try to follow datafusion version
30-
rust-version = "1.82.0"
30+
rust-version = "1.86.0"
3131

3232
[workspace.dependencies]
33-
arrow = { version = "55", features = ["ipc_compression"] }
34-
arrow-flight = { version = "55", features = ["flight-sql-experimental"] }
33+
arrow = { version = "56", features = ["ipc_compression"] }
34+
arrow-flight = { version = "56", features = ["flight-sql-experimental"] }
3535
clap = { version = "4.5", features = ["derive", "cargo"] }
3636
configure_me = { version = "0.4.0" }
3737
configure_me_codegen = { version = "0.4.4" }
38-
datafusion = "49.0.2"
39-
datafusion-cli = "49.0.2"
40-
datafusion-proto = "49.0.2"
41-
datafusion-proto-common = "49.0.2"
38+
datafusion = "50.1.0"
39+
datafusion-cli = "50.1.0"
40+
datafusion-proto = "50.1.0"
41+
datafusion-proto-common = "50.1.0"
4242
object_store = "0.12"
4343
prost = "0.13"
4444
prost-types = "0.13"
45-
tonic = { version = "0.12" }
46-
tonic-build = { version = "0.12", default-features = false, features = [
45+
rstest = { version = "0.26" }
46+
tonic = { version = "0.13" }
47+
tonic-build = { version = "0.13", default-features = false, features = [
4748
"transport",
4849
"prost"
4950
] }

ballista-cli/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
[package]
1919
name = "ballista-cli"
2020
description = "Command Line Client for Ballista distributed query engine."
21-
version = "49.0.0"
21+
version = "50.0.0"
2222
authors = ["Apache DataFusion <[email protected]>"]
2323
edition = { workspace = true }
2424
rust-version = { workspace = true }
@@ -29,14 +29,14 @@ repository = "https://github.com/apache/datafusion-ballista"
2929
readme = "README.md"
3030

3131
[dependencies]
32-
ballista = { path = "../ballista/client", version = "49.0.0", features = ["standalone"] }
32+
ballista = { path = "../ballista/client", version = "50.0.0", features = ["standalone"] }
3333
clap = { workspace = true, features = ["derive", "cargo"] }
3434
datafusion = { workspace = true }
3535
datafusion-cli = { workspace = true }
3636
dirs = "6.0"
3737
env_logger = { workspace = true }
3838
mimalloc = { workspace = true }
39-
rustyline = "16.0.0"
39+
rustyline = "17.0.1"
4040
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
4141

4242
[features]

ballista/client/Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
name = "ballista"
2020
description = "Ballista Distributed Compute"
2121
license = "Apache-2.0"
22-
version = "49.0.0"
22+
version = "50.0.0"
2323
homepage = "https://datafusion.apache.org/ballista/"
2424
repository = "https://github.com/apache/datafusion-ballista"
2525
readme = "README.md"
@@ -29,22 +29,22 @@ rust-version = { workspace = true }
2929

3030
[dependencies]
3131
async-trait = { workspace = true }
32-
ballista-core = { path = "../core", version = "49.0.0" }
33-
ballista-executor = { path = "../executor", version = "49.0.0", optional = true }
34-
ballista-scheduler = { path = "../scheduler", version = "49.0.0", optional = true }
32+
ballista-core = { path = "../core", version = "50.0.0" }
33+
ballista-executor = { path = "../executor", version = "50.0.0", optional = true }
34+
ballista-scheduler = { path = "../scheduler", version = "50.0.0", optional = true }
3535
datafusion = { workspace = true }
3636
log = { workspace = true }
3737

3838
tokio = { workspace = true }
3939
url = { workspace = true }
4040

4141
[dev-dependencies]
42-
ballista-executor = { path = "../executor", version = "49.0.0" }
43-
ballista-scheduler = { path = "../scheduler", version = "49.0.0" }
42+
ballista-executor = { path = "../executor", version = "50.0.0" }
43+
ballista-scheduler = { path = "../scheduler", version = "50.0.0" }
4444
ctor = { workspace = true }
4545
datafusion-proto = { workspace = true }
4646
env_logger = { workspace = true }
47-
rstest = { version = "0.25" }
47+
rstest = { workspace = true }
4848
tempfile = { workspace = true }
4949
tonic = { workspace = true }
5050

ballista/client/tests/context_checks.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -935,4 +935,49 @@ mod supported {
935935

936936
Ok(())
937937
}
938+
939+
// Sort Merge Join is supported since DF.v50
940+
// testing if it will work in ballista
941+
#[rstest]
942+
#[case::standalone(standalone_context())]
943+
#[case::remote(remote_context())]
944+
#[tokio::test]
945+
async fn should_support_sort_merge_join(
946+
#[future(awt)]
947+
#[case]
948+
ctx: SessionContext,
949+
test_data: String,
950+
) -> datafusion::error::Result<()> {
951+
ctx.register_parquet(
952+
"t0",
953+
&format!("{test_data}/alltypes_plain.parquet"),
954+
Default::default(),
955+
)
956+
.await?;
957+
958+
ctx.register_parquet(
959+
"t1",
960+
&format!("{test_data}/alltypes_plain.parquet"),
961+
Default::default(),
962+
)
963+
.await?;
964+
ctx.sql("SET datafusion.optimizer.prefer_hash_join = false")
965+
.await?
966+
.show()
967+
.await?;
968+
let result = ctx.sql(
969+
"select t0.id from t0 join t1 on t0.id = t1.id order by t0.id desc limit 5",
970+
)
971+
.await?
972+
.collect()
973+
.await?;
974+
975+
let expected = [
976+
"+----+", "| id |", "+----+", "| 7 |", "| 6 |", "| 5 |", "| 4 |",
977+
"| 3 |", "+----+",
978+
];
979+
assert_batches_eq!(expected, &result);
980+
981+
Ok(())
982+
}
938983
}

ballista/client/tests/context_unsupported.rs

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -146,46 +146,4 @@ mod unsupported {
146146

147147
assert_batches_eq!(expected, &result);
148148
}
149-
150-
// at the moment sort merge join is not supported due to
151-
// serde issues. it should be supported with DF.50
152-
#[rstest]
153-
#[case::standalone(standalone_context())]
154-
#[case::remote(remote_context())]
155-
#[tokio::test]
156-
#[should_panic]
157-
async fn should_support_sort_merge_join(
158-
#[future(awt)]
159-
#[case]
160-
ctx: SessionContext,
161-
test_data: String,
162-
) {
163-
ctx.register_parquet(
164-
"t0",
165-
&format!("{test_data}/alltypes_plain.parquet"),
166-
Default::default(),
167-
)
168-
.await
169-
.unwrap();
170-
171-
ctx.register_parquet(
172-
"t1",
173-
&format!("{test_data}/alltypes_plain.parquet"),
174-
Default::default(),
175-
)
176-
.await
177-
.unwrap();
178-
ctx.sql("SET datafusion.optimizer.prefer_hash_join = false")
179-
.await
180-
.unwrap()
181-
.show()
182-
.await
183-
.unwrap();
184-
ctx.sql("select t0.id from t0 join t1 on t0.id = t1.id")
185-
.await
186-
.unwrap()
187-
.show()
188-
.await
189-
.unwrap();
190-
}
191149
}

ballista/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
name = "ballista-core"
2020
description = "Ballista Distributed Compute"
2121
license = "Apache-2.0"
22-
version = "49.0.0"
22+
version = "50.0.0"
2323
homepage = "https://datafusion.apache.org/ballista/"
2424
repository = "https://github.com/apache/datafusion-ballista"
2525
readme = "README.md"

ballista/core/proto/datafusion.proto

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,9 @@ message PhysicalPlanNode {
728728
UnnestExecNode unnest = 30;
729729
JsonScanExecNode json_scan = 31;
730730
CooperativeExecNode cooperative = 32;
731+
GenerateSeriesNode generate_series = 33;
732+
SortMergeJoinExecNode sort_merge_join = 34;
733+
MemoryScanExecNode memory_scan = 35;
731734
}
732735
}
733736

@@ -887,6 +890,8 @@ message PhysicalWindowExprNode {
887890
WindowFrame window_frame = 7;
888891
string name = 8;
889892
optional bytes fun_definition = 9;
893+
bool ignore_nulls = 11;
894+
bool distinct = 12;
890895
}
891896

892897
message PhysicalIsNull {
@@ -1037,6 +1042,15 @@ message AvroScanExecNode {
10371042
FileScanExecConf base_conf = 1;
10381043
}
10391044

1045+
message MemoryScanExecNode {
1046+
repeated bytes partitions = 1;
1047+
datafusion_common.Schema schema = 2;
1048+
repeated uint32 projection = 3;
1049+
repeated PhysicalSortExprNodeCollection sort_information = 4;
1050+
bool show_sizes = 5;
1051+
optional uint32 fetch = 6;
1052+
}
1053+
10401054
message CooperativeExecNode {
10411055
PhysicalPlanNode input = 1;
10421056
}
@@ -1293,4 +1307,60 @@ message RecursiveQueryNode {
12931307
message CteWorkTableScanNode {
12941308
string name = 1;
12951309
datafusion_common.Schema schema = 2;
1310+
}
1311+
1312+
enum GenerateSeriesName {
1313+
GS_GENERATE_SERIES = 0;
1314+
GS_RANGE = 1;
1315+
}
1316+
1317+
message GenerateSeriesArgsContainsNull {
1318+
GenerateSeriesName name = 1;
1319+
}
1320+
1321+
message GenerateSeriesArgsInt64 {
1322+
int64 start = 1;
1323+
int64 end = 2;
1324+
int64 step = 3;
1325+
bool include_end = 4;
1326+
GenerateSeriesName name = 5;
1327+
}
1328+
1329+
message GenerateSeriesArgsTimestamp {
1330+
int64 start = 1;
1331+
int64 end = 2;
1332+
datafusion_common.IntervalMonthDayNanoValue step = 3;
1333+
optional string tz = 4;
1334+
bool include_end = 5;
1335+
GenerateSeriesName name = 6;
1336+
}
1337+
1338+
message GenerateSeriesArgsDate {
1339+
int64 start = 1;
1340+
int64 end = 2;
1341+
datafusion_common.IntervalMonthDayNanoValue step = 3;
1342+
bool include_end = 4;
1343+
GenerateSeriesName name = 5;
1344+
}
1345+
1346+
message GenerateSeriesNode {
1347+
datafusion_common.Schema schema = 1;
1348+
uint32 target_batch_size = 2;
1349+
1350+
oneof args {
1351+
GenerateSeriesArgsContainsNull contains_null = 3;
1352+
GenerateSeriesArgsInt64 int64_args = 4;
1353+
GenerateSeriesArgsTimestamp timestamp_args = 5;
1354+
GenerateSeriesArgsDate date_args = 6;
1355+
}
1356+
}
1357+
1358+
message SortMergeJoinExecNode {
1359+
PhysicalPlanNode left = 1;
1360+
PhysicalPlanNode right = 2;
1361+
repeated JoinOn on = 3;
1362+
datafusion_common.JoinType join_type = 4;
1363+
JoinFilter filter = 5;
1364+
repeated SortExprNode sort_options = 6;
1365+
datafusion_common.NullEquality null_equality = 7;
12961366
}

ballista/core/proto/datafusion_common.proto

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,19 @@ enum IntervalUnit{
136136
MonthDayNano = 2;
137137
}
138138

139-
message Decimal{
139+
message Decimal32Type {
140+
reserved 1, 2;
141+
uint32 precision = 3;
142+
int32 scale = 4;
143+
}
144+
145+
message Decimal64Type {
146+
reserved 1, 2;
147+
uint32 precision = 3;
148+
int32 scale = 4;
149+
}
150+
151+
message Decimal128Type {
140152
reserved 1, 2;
141153
uint32 precision = 3;
142154
int32 scale = 4;
@@ -286,6 +298,8 @@ message ScalarValue{
286298
ScalarNestedValue struct_value = 32;
287299
ScalarNestedValue map_value = 41;
288300

301+
Decimal32 decimal32_value = 43;
302+
Decimal64 decimal64_value = 44;
289303
Decimal128 decimal128_value = 20;
290304
Decimal256 decimal256_value = 39;
291305

@@ -310,6 +324,18 @@ message ScalarValue{
310324
}
311325
}
312326

327+
message Decimal32{
328+
bytes value = 1;
329+
int64 p = 2;
330+
int64 s = 3;
331+
}
332+
333+
message Decimal64{
334+
bytes value = 1;
335+
int64 p = 2;
336+
int64 s = 3;
337+
}
338+
313339
message Decimal128{
314340
bytes value = 1;
315341
int64 p = 2;
@@ -352,7 +378,9 @@ message ArrowType{
352378
TimeUnit TIME32 = 21 ;
353379
TimeUnit TIME64 = 22 ;
354380
IntervalUnit INTERVAL = 23 ;
355-
Decimal DECIMAL = 24 ;
381+
Decimal32Type DECIMAL32 = 40;
382+
Decimal64Type DECIMAL64 = 41;
383+
Decimal128Type DECIMAL128 = 24;
356384
Decimal256Type DECIMAL256 = 36;
357385
List LIST = 25;
358386
List LARGE_LIST = 26;
@@ -480,9 +508,7 @@ message ParquetColumnOptions {
480508
uint64 bloom_filter_ndv = 7;
481509
}
482510

483-
oneof max_statistics_size_opt {
484-
uint32 max_statistics_size = 8;
485-
}
511+
reserved 8; // used to be uint32 max_statistics_size = 8;
486512
}
487513

488514
message ParquetOptions {
@@ -521,9 +547,7 @@ message ParquetOptions {
521547
string statistics_enabled = 13;
522548
}
523549

524-
oneof max_statistics_size_opt {
525-
uint64 max_statistics_size = 14;
526-
}
550+
reserved 14; // used to be uint32 max_statistics_size = 20;
527551

528552
oneof column_index_truncate_length_opt {
529553
uint64 column_index_truncate_length = 17;

0 commit comments

Comments
 (0)