Skip to content
Merged
1,438 changes: 728 additions & 710 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,28 @@ resolver = "2"
[workspace.package]
# edition to be changed to 2024 when we update
# Minimum Supported Rust Version (MSRV) to 1.85.0
# which is datafusion 49s
# which is datafusion 49
#
edition = "2021"
# we should try to follow datafusion version
rust-version = "1.82.0"
rust-version = "1.86.0"

[workspace.dependencies]
arrow = { version = "55", features = ["ipc_compression"] }
arrow-flight = { version = "55", features = ["flight-sql-experimental"] }
arrow = { version = "56", features = ["ipc_compression"] }
arrow-flight = { version = "56", features = ["flight-sql-experimental"] }
clap = { version = "4.5", features = ["derive", "cargo"] }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
datafusion = "49.0.2"
datafusion-cli = "49.0.2"
datafusion-proto = "49.0.2"
datafusion-proto-common = "49.0.2"
datafusion = "50.1.0"
datafusion-cli = "50.1.0"
datafusion-proto = "50.1.0"
datafusion-proto-common = "50.1.0"
object_store = "0.12"
prost = "0.13"
prost-types = "0.13"
tonic = { version = "0.12" }
tonic-build = { version = "0.12", default-features = false, features = [
rstest = { version = "0.26" }
tonic = { version = "0.13" }
tonic-build = { version = "0.13", default-features = false, features = [
"transport",
"prost"
] }
Expand Down
6 changes: 3 additions & 3 deletions ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
[package]
name = "ballista-cli"
description = "Command Line Client for Ballista distributed query engine."
version = "49.0.0"
version = "50.0.0"
authors = ["Apache DataFusion <[email protected]>"]
edition = { workspace = true }
rust-version = { workspace = true }
Expand All @@ -29,14 +29,14 @@ repository = "https://github.com/apache/datafusion-ballista"
readme = "README.md"

[dependencies]
ballista = { path = "../ballista/client", version = "49.0.0", features = ["standalone"] }
ballista = { path = "../ballista/client", version = "50.0.0", features = ["standalone"] }
clap = { workspace = true, features = ["derive", "cargo"] }
datafusion = { workspace = true }
datafusion-cli = { workspace = true }
dirs = "6.0"
env_logger = { workspace = true }
mimalloc = { workspace = true }
rustyline = "16.0.0"
rustyline = "17.0.1"
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }

[features]
Expand Down
14 changes: 7 additions & 7 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista"
description = "Ballista Distributed Compute"
license = "Apache-2.0"
version = "49.0.0"
version = "50.0.0"
homepage = "https://datafusion.apache.org/ballista/"
repository = "https://github.com/apache/datafusion-ballista"
readme = "README.md"
Expand All @@ -29,22 +29,22 @@ rust-version = { workspace = true }

[dependencies]
async-trait = { workspace = true }
ballista-core = { path = "../core", version = "49.0.0" }
ballista-executor = { path = "../executor", version = "49.0.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "49.0.0", optional = true }
ballista-core = { path = "../core", version = "50.0.0" }
ballista-executor = { path = "../executor", version = "50.0.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "50.0.0", optional = true }
datafusion = { workspace = true }
log = { workspace = true }

tokio = { workspace = true }
url = { workspace = true }

[dev-dependencies]
ballista-executor = { path = "../executor", version = "49.0.0" }
ballista-scheduler = { path = "../scheduler", version = "49.0.0" }
ballista-executor = { path = "../executor", version = "50.0.0" }
ballista-scheduler = { path = "../scheduler", version = "50.0.0" }
ctor = { workspace = true }
datafusion-proto = { workspace = true }
env_logger = { workspace = true }
rstest = { version = "0.25" }
rstest = { workspace = true }
tempfile = { workspace = true }
tonic = { workspace = true }

Expand Down
45 changes: 45 additions & 0 deletions ballista/client/tests/context_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,4 +935,49 @@ mod supported {

Ok(())
}

// Sort merge join is supported as of DataFusion 50; this test checks that
// it also works when the query is executed through Ballista, for both the
// standalone and the remote context.
#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
async fn should_support_sort_merge_join(
    #[future(awt)]
    #[case]
    ctx: SessionContext,
    test_data: String,
) -> datafusion::error::Result<()> {
    // Both sides of the self-join are backed by the same parquet file.
    let parquet_path = format!("{test_data}/alltypes_plain.parquet");
    for table in ["t0", "t1"] {
        ctx.register_parquet(table, &parquet_path, Default::default())
            .await?;
    }

    // Turn off the optimizer's hash-join preference before running the join.
    ctx.sql("SET datafusion.optimizer.prefer_hash_join = false")
        .await?
        .show()
        .await?;

    let result = ctx
        .sql("select t0.id from t0 join t1 on t0.id = t1.id order by t0.id desc limit 5")
        .await?
        .collect()
        .await?;

    // Every row must be as wide as the border and header lines: the table
    // printer pads each cell to the column width (two characters for `id`).
    let expected = [
        "+----+",
        "| id |",
        "+----+",
        "| 7  |",
        "| 6  |",
        "| 5  |",
        "| 4  |",
        "| 3  |",
        "+----+",
    ];
    assert_batches_eq!(expected, &result);

    Ok(())
}
}
42 changes: 0 additions & 42 deletions ballista/client/tests/context_unsupported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,46 +146,4 @@ mod unsupported {

assert_batches_eq!(expected, &result);
}

// At the moment sort merge join is not supported due to serde issues,
// so this test is marked `#[should_panic]`: it passes only while one of
// the `unwrap()` calls below panics. Sort merge join should be supported
// with DataFusion 50, at which point this test is expected to start failing.
#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
#[should_panic]
async fn should_support_sort_merge_join(
#[future(awt)]
#[case]
ctx: SessionContext,
test_data: String,
) {
// Register the same parquet file twice so it can be joined with itself.
ctx.register_parquet(
"t0",
&format!("{test_data}/alltypes_plain.parquet"),
Default::default(),
)
.await
.unwrap();

ctx.register_parquet(
"t1",
&format!("{test_data}/alltypes_plain.parquet"),
Default::default(),
)
.await
.unwrap();
// Disable the hash-join preference; presumably this makes the planner pick
// a sort merge join for the query below (confirm against the optimizer).
ctx.sql("SET datafusion.optimizer.prefer_hash_join = false")
.await
.unwrap()
.show()
.await
.unwrap();
// Run the self-join; the result is only printed, never asserted on.
ctx.sql("select t0.id from t0 join t1 on t0.id = t1.id")
.await
.unwrap()
.show()
.await
.unwrap();
}
}
2 changes: 1 addition & 1 deletion ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista-core"
description = "Ballista Distributed Compute"
license = "Apache-2.0"
version = "49.0.0"
version = "50.0.0"
homepage = "https://datafusion.apache.org/ballista/"
repository = "https://github.com/apache/datafusion-ballista"
readme = "README.md"
Expand Down
70 changes: 70 additions & 0 deletions ballista/core/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,9 @@ message PhysicalPlanNode {
UnnestExecNode unnest = 30;
JsonScanExecNode json_scan = 31;
CooperativeExecNode cooperative = 32;
GenerateSeriesNode generate_series = 33;
SortMergeJoinExecNode sort_merge_join = 34;
MemoryScanExecNode memory_scan = 35;
}
}

Expand Down Expand Up @@ -887,6 +890,8 @@ message PhysicalWindowExprNode {
WindowFrame window_frame = 7;
string name = 8;
optional bytes fun_definition = 9;
bool ignore_nulls = 11;
bool distinct = 12;
}

message PhysicalIsNull {
Expand Down Expand Up @@ -1037,6 +1042,15 @@ message AvroScanExecNode {
FileScanExecConf base_conf = 1;
}

// Serialized form of a memory scan plan node. It is carried as the
// `memory_scan` variant of PhysicalPlanNode.
message MemoryScanExecNode {
// One opaque byte blob per entry.
// NOTE(review): presumably each entry is an encoded partition of record
// batches; confirm against the serde implementation.
repeated bytes partitions = 1;
// Schema associated with the scanned data.
datafusion_common.Schema schema = 2;
// NOTE(review): presumably column indices into `schema`; confirm.
repeated uint32 projection = 3;
// Zero or more collections of physical sort expressions.
repeated PhysicalSortExprNodeCollection sort_information = 4;
bool show_sizes = 5;
// Optional; absent when no fetch value is set.
// NOTE(review): presumably a row limit; confirm.
optional uint32 fetch = 6;
}

message CooperativeExecNode {
PhysicalPlanNode input = 1;
}
Expand Down Expand Up @@ -1293,4 +1307,60 @@ message RecursiveQueryNode {
message CteWorkTableScanNode {
string name = 1;
datafusion_common.Schema schema = 2;
}

// Name tag stored in every GenerateSeries*Args message.
// NOTE(review): the value names suggest it distinguishes `generate_series`
// from `range`; confirm against the serde code.
enum GenerateSeriesName {
// Zero value, i.e. the default when the field is not set.
GS_GENERATE_SERIES = 0;
GS_RANGE = 1;
}

// `contains_null` variant of GenerateSeriesNode.args. It carries only the
// function name, with no start/end/step values.
message GenerateSeriesArgsContainsNull {
GenerateSeriesName name = 1;
}

// `int64_args` variant of GenerateSeriesNode.args: a series described by
// 64-bit integer start, end and step values.
message GenerateSeriesArgsInt64 {
int64 start = 1;
int64 end = 2;
int64 step = 3;
// NOTE(review): presumably whether `end` itself is part of the series; confirm.
bool include_end = 4;
GenerateSeriesName name = 5;
}

// `timestamp_args` variant of GenerateSeriesNode.args.
message GenerateSeriesArgsTimestamp {
// Start and end are plain 64-bit integers; their time unit is not visible
// in this schema (TODO confirm against the serde code).
int64 start = 1;
int64 end = 2;
// Step is expressed as a month/day/nanosecond interval value.
datafusion_common.IntervalMonthDayNanoValue step = 3;
// Optional time zone string; absent when no time zone is set.
optional string tz = 4;
// NOTE(review): presumably whether `end` itself is part of the series; confirm.
bool include_end = 5;
GenerateSeriesName name = 6;
}

// `date_args` variant of GenerateSeriesNode.args. It has the same fields as
// the timestamp variant, minus the time zone.
message GenerateSeriesArgsDate {
// Start and end are plain 64-bit integers; their unit is not visible in
// this schema (TODO confirm against the serde code).
int64 start = 1;
int64 end = 2;
// Step is expressed as a month/day/nanosecond interval value.
datafusion_common.IntervalMonthDayNanoValue step = 3;
// NOTE(review): presumably whether `end` itself is part of the series; confirm.
bool include_end = 4;
GenerateSeriesName name = 5;
}

// Serialized form of a generate-series plan node. It is carried as the
// `generate_series` variant of PhysicalPlanNode.
message GenerateSeriesNode {
datafusion_common.Schema schema = 1;
uint32 target_batch_size = 2;

// Exactly one argument flavour is set, chosen by the argument type.
oneof args {
GenerateSeriesArgsContainsNull contains_null = 3;
GenerateSeriesArgsInt64 int64_args = 4;
GenerateSeriesArgsTimestamp timestamp_args = 5;
GenerateSeriesArgsDate date_args = 6;
}
}

// Serialized form of a sort merge join plan node. It is carried as the
// `sort_merge_join` variant of PhysicalPlanNode.
message SortMergeJoinExecNode {
// The two child plans being joined.
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
// Join key pairs.
repeated JoinOn on = 3;
datafusion_common.JoinType join_type = 4;
// Join filter message.
// NOTE(review): presumably left unset when the join has no extra filter; confirm.
JoinFilter filter = 5;
// NOTE(review): presumably one entry per join key describing its sort
// order; confirm against the serde code.
repeated SortExprNode sort_options = 6;
datafusion_common.NullEquality null_equality = 7;
}
40 changes: 32 additions & 8 deletions ballista/core/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,19 @@ enum IntervalUnit{
MonthDayNano = 2;
}

message Decimal{
// Parameters of the `DECIMAL32` variant of ArrowType.
message Decimal32Type {
// Field numbers 1 and 2 are retired and must not be reused.
reserved 1, 2;
uint32 precision = 3;
int32 scale = 4;
}

// Parameters of the `DECIMAL64` variant of ArrowType.
message Decimal64Type {
// Field numbers 1 and 2 are retired and must not be reused.
reserved 1, 2;
uint32 precision = 3;
int32 scale = 4;
}

message Decimal128Type {
reserved 1, 2;
uint32 precision = 3;
int32 scale = 4;
Expand Down Expand Up @@ -286,6 +298,8 @@ message ScalarValue{
ScalarNestedValue struct_value = 32;
ScalarNestedValue map_value = 41;

Decimal32 decimal32_value = 43;
Decimal64 decimal64_value = 44;
Decimal128 decimal128_value = 20;
Decimal256 decimal256_value = 39;

Expand All @@ -310,6 +324,18 @@ message ScalarValue{
}
}

// Payload of the `decimal32_value` variant of ScalarValue.
message Decimal32{
// Raw bytes of the decimal value; the byte order is not visible in this
// schema (TODO confirm against the serde code).
bytes value = 1;
// NOTE(review): `p` and `s` are presumably precision and scale; confirm.
int64 p = 2;
int64 s = 3;
}

// Payload of the `decimal64_value` variant of ScalarValue.
message Decimal64{
// Raw bytes of the decimal value; the byte order is not visible in this
// schema (TODO confirm against the serde code).
bytes value = 1;
// NOTE(review): `p` and `s` are presumably precision and scale; confirm.
int64 p = 2;
int64 s = 3;
}

message Decimal128{
bytes value = 1;
int64 p = 2;
Expand Down Expand Up @@ -352,7 +378,9 @@ message ArrowType{
TimeUnit TIME32 = 21 ;
TimeUnit TIME64 = 22 ;
IntervalUnit INTERVAL = 23 ;
Decimal DECIMAL = 24 ;
Decimal32Type DECIMAL32 = 40;
Decimal64Type DECIMAL64 = 41;
Decimal128Type DECIMAL128 = 24;
Decimal256Type DECIMAL256 = 36;
List LIST = 25;
List LARGE_LIST = 26;
Expand Down Expand Up @@ -480,9 +508,7 @@ message ParquetColumnOptions {
uint64 bloom_filter_ndv = 7;
}

oneof max_statistics_size_opt {
uint32 max_statistics_size = 8;
}
reserved 8; // used to be uint32 max_statistics_size = 8;
}

message ParquetOptions {
Expand Down Expand Up @@ -521,9 +547,7 @@ message ParquetOptions {
string statistics_enabled = 13;
}

oneof max_statistics_size_opt {
uint64 max_statistics_size = 14;
}
reserved 14; // used to be uint64 max_statistics_size = 14;

oneof column_index_truncate_length_opt {
uint64 column_index_truncate_length = 17;
Expand Down
Loading