Skip to content

Commit eab22e2

Browse files
chore: update datafusion to 51.0 (#1345)
* cherry pick Marko's work * update code and test based on new DF apis * fmt toml and bump rust min version * update benchmark tests * fmt * Using task ctx directly * update protos * fmt * remove session context from few more places * remove unnecessary clone --------- Co-authored-by: Marko Milenković <[email protected]>
1 parent a9b0624 commit eab22e2

File tree

20 files changed

+1060
-968
lines changed

20 files changed

+1060
-968
lines changed

Cargo.lock

Lines changed: 536 additions & 490 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,28 @@ resolver = "2"
2727
#
2828
edition = "2021"
2929
# we should try to follow datafusion version
30-
rust-version = "1.86.0"
30+
rust-version = "1.88.0"
3131

3232
[workspace.dependencies]
33-
arrow = { version = "56", features = ["ipc_compression"] }
34-
arrow-flight = { version = "56", features = ["flight-sql-experimental"] }
33+
arrow = { version = "57", features = ["ipc_compression"] }
34+
arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
3535
clap = { version = "4.5", features = ["derive", "cargo"] }
3636
configure_me = { version = "0.4.0" }
3737
configure_me_codegen = { version = "0.4.4" }
38-
datafusion = "50.3.0"
39-
datafusion-cli = "50.3.0"
40-
datafusion-proto = "50.3.0"
41-
datafusion-proto-common = "50.3.0"
38+
datafusion = "51.0.0"
39+
datafusion-cli = "51.0.0"
40+
datafusion-proto = "51.0.0"
41+
datafusion-proto-common = "51.0.0"
4242
object_store = "0.12"
43-
prost = "0.13"
44-
prost-types = "0.13"
43+
prost = "0.14"
44+
prost-types = "0.14"
4545
rstest = { version = "0.26" }
46-
tonic = { version = "0.13" }
47-
tonic-build = { version = "0.13", default-features = false, features = [
46+
tonic = { version = "0.14" }
47+
tonic-build = { version = "0.14", default-features = false, features = [
4848
"transport",
49-
"prost"
5049
] }
50+
tonic-prost = { version = "0.14" }
51+
tonic-prost-build = { version = "0.14" }
5152
tracing = "0.1"
5253
tracing-appender = "0.2.2"
5354
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

ballista-cli/src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::env;
1918
use std::path::Path;
19+
use std::{env, sync::Arc};
2020

2121
use ballista::{extension::SessionConfigExt, prelude::SessionContextExt};
2222
use ballista_cli::{
@@ -28,7 +28,9 @@ use datafusion::{
2828
execution::SessionStateBuilder,
2929
prelude::{SessionConfig, SessionContext},
3030
};
31-
use datafusion_cli::print_options::MaxRows;
31+
use datafusion_cli::{
32+
object_storage::instrumented::InstrumentedObjectStoreRegistry, print_options::MaxRows,
33+
};
3234
use mimalloc::MiMalloc;
3335

3436
#[global_allocator]
@@ -151,6 +153,7 @@ pub async fn main() -> Result<()> {
151153
quiet: args.quiet,
152154
maxrows: MaxRows::Unlimited,
153155
color: args.color,
156+
instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
154157
};
155158

156159
let files = args.file;

ballista/client/tests/context_basic.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,14 @@ mod basic {
113113

114114
let result = df.unwrap().collect().await.unwrap();
115115

116-
let expected = ["+---------------+--------------+---------------------+-------------+-----------------------------+-------------+",
117-
"| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |",
118-
"+---------------+--------------+---------------------+-------------+-----------------------------+-------------+",
119-
"| datafusion | public | csv_with_timestamps | name | Utf8 | YES |",
120-
"| datafusion | public | csv_with_timestamps | ts | Timestamp(Nanosecond, None) | YES |",
121-
"+---------------+--------------+---------------------+-------------+-----------------------------+-------------+"];
116+
let expected = [
117+
"+---------------+--------------+---------------------+-------------+---------------+-------------+",
118+
"| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |",
119+
"+---------------+--------------+---------------------+-------------+---------------+-------------+",
120+
"| datafusion | public | csv_with_timestamps | name | Utf8 | YES |",
121+
"| datafusion | public | csv_with_timestamps | ts | Timestamp(ns) | YES |",
122+
"+---------------+--------------+---------------------+-------------+---------------+-------------+",
123+
];
122124
datafusion::assert_batches_eq!(expected, &result);
123125
}
124126

@@ -359,7 +361,7 @@ mod basic {
359361
let res = df.collect().await.unwrap();
360362
let expected = vec![
361363
"+-------------------+",
362-
"| var(test.id) |",
364+
"| var_samp(test.id) |",
363365
"+-------------------+",
364366
"| 6.000000000000001 |",
365367
"+-------------------+",
@@ -390,11 +392,11 @@ mod basic {
390392
.unwrap();
391393
let res = df.collect().await.unwrap();
392394
let expected = vec![
393-
"+--------------------+",
394-
"| stddev(test.id) |",
395-
"+--------------------+",
396-
"| 2.4494897427831783 |",
397-
"+--------------------+",
395+
"+----------------------+",
396+
"| stddev_samp(test.id) |",
397+
"+----------------------+",
398+
"| 2.4494897427831783 |",
399+
"+----------------------+",
398400
];
399401
assert_result_eq(expected, &res);
400402
}
@@ -408,11 +410,11 @@ mod basic {
408410
.unwrap();
409411
let res = df.collect().await.unwrap();
410412
let expected = vec![
411-
"+--------------------------------------+",
412-
"| covar_samp(test.id,test.tinyint_col) |",
413-
"+--------------------------------------+",
414-
"| 0.28571428571428586 |",
415-
"+--------------------------------------+",
413+
"+---------------------------------+",
414+
"| covar(test.id,test.tinyint_col) |",
415+
"+---------------------------------+",
416+
"| 0.28571428571428586 |",
417+
"+---------------------------------+",
416418
];
417419
assert_result_eq(expected, &res);
418420
}

ballista/client/tests/context_setup.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ mod standalone {
110110
use datafusion::{
111111
assert_batches_eq,
112112
common::exec_err,
113-
execution::{context::QueryPlanner, SessionState, SessionStateBuilder},
113+
execution::{
114+
context::QueryPlanner, SessionState, SessionStateBuilder, TaskContext,
115+
},
114116
logical_expr::LogicalPlan,
115117
physical_plan::ExecutionPlan,
116118
prelude::{SessionConfig, SessionContext},
@@ -275,7 +277,7 @@ mod standalone {
275277
&self,
276278
_buf: &[u8],
277279
_inputs: &[datafusion::logical_expr::LogicalPlan],
278-
_ctx: &SessionContext,
280+
_ctx: &TaskContext,
279281
) -> datafusion::error::Result<datafusion::logical_expr::Extension> {
280282
self.invoked
281283
.store(true, std::sync::atomic::Ordering::Relaxed);
@@ -297,7 +299,7 @@ mod standalone {
297299
_buf: &[u8],
298300
_table_ref: &datafusion::sql::TableReference,
299301
_schema: datafusion::arrow::datatypes::SchemaRef,
300-
_ctx: &SessionContext,
302+
_ctx: &TaskContext,
301303
) -> datafusion::error::Result<
302304
std::sync::Arc<dyn datafusion::catalog::TableProvider>,
303305
> {
@@ -320,7 +322,7 @@ mod standalone {
320322
fn try_decode_file_format(
321323
&self,
322324
_buf: &[u8],
323-
_ctx: &SessionContext,
325+
_ctx: &TaskContext,
324326
) -> datafusion::error::Result<
325327
Arc<dyn datafusion::datasource::file_format::FileFormatFactory>,
326328
> {
@@ -361,12 +363,12 @@ mod standalone {
361363
&self,
362364
buf: &[u8],
363365
inputs: &[Arc<dyn datafusion::physical_plan::ExecutionPlan>],
364-
registry: &dyn datafusion::execution::FunctionRegistry,
366+
ctx: &TaskContext,
365367
) -> datafusion::error::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>>
366368
{
367369
self.invoked
368370
.store(true, std::sync::atomic::Ordering::Relaxed);
369-
self.codec.try_decode(buf, inputs, registry)
371+
self.codec.try_decode(buf, inputs, ctx)
370372
}
371373

372374
fn try_encode(

ballista/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ serde = { workspace = true, features = ["derive"] }
6363
tokio = { workspace = true }
6464
tokio-stream = { workspace = true, features = ["net"] }
6565
tonic = { workspace = true }
66+
tonic-prost = { workspace = true }
6667
url = { workspace = true }
6768
uuid = { workspace = true }
6869

@@ -72,3 +73,4 @@ tempfile = { workspace = true }
7273
[build-dependencies]
7374
rustc_version = "0.4.1"
7475
tonic-build = { workspace = true }
76+
tonic-prost-build = { workspace = true }

ballista/core/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ fn main() -> Result<(), String> {
4040
println!("cargo:rerun-if-changed=proto/datafusion_common.proto");
4141
println!("cargo:rerun-if-changed=proto/datafusion.proto");
4242
println!("cargo:rerun-if-changed=proto/ballista.proto");
43-
tonic_build::configure()
43+
tonic_prost_build::configure()
4444
.extern_path(".datafusion_common", "::datafusion_proto_common")
4545
.extern_path(".datafusion", "::datafusion_proto::protobuf")
4646
.protoc_arg("--experimental_allow_proto3_optional")

ballista/core/proto/ballista.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,18 @@ message NamedTime {
264264
uint64 value = 2;
265265
}
266266

267+
message NamedPruningMetrics {
268+
string name = 1;
269+
uint64 pruned = 2;
270+
uint64 matched = 3;
271+
}
272+
273+
message NamedRatio {
274+
string name = 1;
275+
uint64 part = 2;
276+
uint64 total = 3;
277+
}
278+
267279
message OperatorMetric {
268280
oneof metric {
269281
uint64 output_rows = 1;
@@ -277,6 +289,9 @@ message OperatorMetric {
277289
int64 start_timestamp = 9;
278290
int64 end_timestamp = 10;
279291
uint64 spilled_rows = 11;
292+
uint64 output_bytes = 12;
293+
NamedPruningMetrics pruning_metrics = 13;
294+
NamedRatio ratio = 14;
280295
}
281296
}
282297

ballista/core/proto/datafusion.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ message CreateExternalTableNode {
167167
datafusion_common.DfSchema schema = 4;
168168
repeated string table_partition_cols = 5;
169169
bool if_not_exists = 6;
170+
bool or_replace = 15;
170171
bool temporary = 14;
171172
string definition = 7;
172173
repeated SortExprNodeCollection order_exprs = 10;
@@ -180,6 +181,7 @@ message PrepareNode {
180181
string name = 1;
181182
repeated datafusion_common.ArrowType data_types = 2;
182183
LogicalPlanNode input = 3;
184+
repeated datafusion_common.Field fields = 4;
183185
}
184186

185187
message CreateCatalogSchemaNode {
@@ -412,6 +414,8 @@ message Wildcard {
412414
message PlaceholderNode {
413415
string id = 1;
414416
datafusion_common.ArrowType data_type = 2;
417+
optional bool nullable = 3;
418+
map<string, string> metadata = 4;
415419
}
416420

417421
message LogicalExprList {
@@ -517,6 +521,7 @@ message AggregateUDFExprNode {
517521
LogicalExprNode filter = 3;
518522
repeated SortExprNode order_by = 4;
519523
optional bytes fun_definition = 6;
524+
optional NullTreatment null_treatment = 7;
520525
}
521526

522527
message ScalarUDFExprNode {
@@ -537,6 +542,9 @@ message WindowExprNode {
537542
// repeated LogicalExprNode filter = 7;
538543
WindowFrame window_frame = 8;
539544
optional bytes fun_definition = 10;
545+
optional NullTreatment null_treatment = 11;
546+
bool distinct = 12;
547+
LogicalExprNode filter = 13;
540548
}
541549

542550
message BetweenNode {
@@ -621,6 +629,11 @@ message WindowFrameBound {
621629
datafusion_common.ScalarValue bound_value = 2;
622630
}
623631

632+
enum NullTreatment {
633+
RESPECT_NULLS = 0;
634+
IGNORE_NULLS = 1;
635+
}
636+
624637
///////////////////////////////////////////////////////////////////////////////////////////////////
625638
// Arrow Data Types
626639
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1032,6 +1045,7 @@ message CsvScanExecNode {
10321045
string comment = 6;
10331046
}
10341047
bool newlines_in_values = 7;
1048+
bool truncate_rows = 8;
10351049
}
10361050

10371051
message JsonScanExecNode {

ballista/core/proto/datafusion_common.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ message CsvOptions {
460460
bytes double_quote = 15; // Indicates if quotes are doubled
461461
bytes newlines_in_values = 16; // Indicates if newlines are supported in values
462462
bytes terminator = 17; // Optional terminator character as a byte
463+
bytes truncated_rows = 18; // Indicates if truncated rows are allowed
463464
}
464465

465466
// Options controlling CSV format
@@ -580,6 +581,10 @@ message ParquetOptions {
580581
oneof coerce_int96_opt {
581582
string coerce_int96 = 32;
582583
}
584+
585+
oneof max_predicate_cache_size_opt {
586+
uint64 max_predicate_cache_size = 33;
587+
}
583588
}
584589

585590
enum JoinSide {

0 commit comments

Comments
 (0)