
Commit 2a998ff

Upgrade to DF v51
1 parent a8952c7

File tree: 13 files changed, +657 -554 lines

Cargo.lock

Lines changed: 580 additions & 485 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 20 additions & 17 deletions
@@ -2,8 +2,8 @@
 members = ["benchmarks"]
 
 [workspace.dependencies]
-datafusion = { version = "50.0.0", default-features = false }
-datafusion-proto = { version = "50.0.0" }
+datafusion = { version = "51.0.0", default-features = false }
+datafusion-proto = { version = "51.0.0" }
 
 [package]
 name = "datafusion-distributed"
@@ -12,14 +12,17 @@ edition = "2024"
 
 [dependencies]
 chrono = { version = "0.4.42" }
-datafusion = { workspace = true }
+datafusion = { workspace = true, features = [
+    "parquet",
+    "sql",
+    "unicode_expressions",
+] }
 datafusion-proto = { workspace = true }
-arrow-flight = "56.1.0"
-arrow-select = "56.1.0"
+arrow-flight = "57.0.0"
+arrow-select = "57.0.0"
 async-trait = "0.1.88"
 tokio = { version = "1.46.1", features = ["full"] }
-# Updated to 0.13.1 to match arrow-flight 56.1.0
-tonic = { version = "0.13.1", features = ["transport"] }
+tonic = { version = "0.14.2", features = ["transport"] }
 tower = "0.5.2"
 http = "1.3.1"
 itertools = "0.14.0"
@@ -28,18 +31,18 @@ url = "2.5.4"
 uuid = "1.17.0"
 delegate = "0.13.4"
 dashmap = "6.1.0"
-prost = "0.13.5"
+prost = "0.14.0"
 rand = "0.8.5"
 object_store = "0.12.3"
 bytes = "1.10.1"
 pin-project = "1.1.10"
 
 # integration_tests deps
 insta = { version = "1.43.1", features = ["filters"], optional = true }
-tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
-tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
-parquet = { version = "56.1.0", optional = true }
-arrow = { version = "56.1.0", optional = true }
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
+parquet = { version = "57.0.0", optional = true }
+arrow = { version = "57.0.0", optional = true }
 tokio-stream = { version = "0.1.17", optional = true }
 hyper-util = { version = "0.1.16", optional = true }
 pretty_assertions = { version = "1.4", optional = true }
@@ -53,18 +56,18 @@ integration = [
     "arrow",
     "tokio-stream",
     "hyper-util",
-    "pretty_assertions"
+    "pretty_assertions",
 ]
 
 tpch = ["integration"]
 
 [dev-dependencies]
 structopt = "0.3"
 insta = { version = "1.43.1", features = ["filters"] }
-tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
-tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
-parquet = "56.1.0"
-arrow = "56.1.0"
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
+parquet = "57.0.0"
+arrow = "57.0.0"
 tokio-stream = "0.1.17"
 hyper-util = "0.1.16"
 pretty_assertions = "1.4"

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 [toolchain]
-channel = "1.86.0"
+channel = "1.88.0"
 profile = "default"

src/distributed_ext.rs

Lines changed: 2 additions & 2 deletions
@@ -137,7 +137,7 @@ pub trait DistributedExt: Sized {
 /// ```
 /// # use std::sync::Arc;
 /// # use datafusion::common::DataFusionError;
-/// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder};
+/// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder, TaskContext};
 /// # use datafusion::physical_plan::ExecutionPlan;
 /// # use datafusion::prelude::SessionConfig;
 /// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
@@ -147,7 +147,7 @@ pub trait DistributedExt: Sized {
 /// struct CustomExecCodec;
 ///
 /// impl PhysicalExtensionCodec for CustomExecCodec {
-///     fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+///     fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
 ///         todo!()
 ///     }
 ///
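Note: the doc-comment updates above track a DataFusion 51 breaking change: PhysicalExtensionCodec::try_decode now receives a &TaskContext instead of a &dyn FunctionRegistry. A minimal sketch of a codec written against the new signature (NoopCodec is an illustrative name, not part of this crate); since TaskContext itself implements FunctionRegistry, UDFs registered on the session remain reachable from inside the codec:

use std::sync::Arc;
use datafusion::common::internal_datafusion_err;
use datafusion::execution::{FunctionRegistry, TaskContext};
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

#[derive(Debug)]
struct NoopCodec;

impl PhysicalExtensionCodec for NoopCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
        // The TaskContext doubles as a FunctionRegistry, so session UDFs are
        // still visible here without rebuilding a SessionContext.
        let _udf_names = ctx.udfs();
        inputs
            .first()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("expected at least one input"))
    }

    fn try_encode(
        &self,
        _node: Arc<dyn ExecutionPlan>,
        _buf: &mut Vec<u8>,
    ) -> datafusion::common::Result<()> {
        Ok(())
    }
}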

src/flight_service/do_get.rs

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ impl ArrowFlightEndpoint {
         let stage_data = once
             .get_or_try_init(|| async {
                 let proto_node = PhysicalPlanNode::try_decode(doget.plan_proto.as_ref())?;
-                let mut plan = proto_node.try_into_physical_plan(&ctx, &self.runtime, &codec)?;
+                let mut plan = proto_node.try_into_physical_plan(&ctx.task_ctx(), &codec)?;
                 for hook in self.hooks.on_plan.iter() {
                     plan = hook(plan)
                 }
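Note: this hunk reflects the datafusion-proto 51 API, where try_into_physical_plan takes a single &TaskContext in place of the old registry-plus-RuntimeEnv pair. A standalone sketch of the new call shape, assuming the stock DefaultPhysicalExtensionCodec and a fresh SessionContext (decode_plan is an illustrative name):

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
use datafusion_proto::protobuf::PhysicalPlanNode;

fn decode_plan(bytes: &[u8]) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    let ctx = SessionContext::new();
    // Decode the protobuf envelope first...
    let node = PhysicalPlanNode::try_decode(bytes)?;
    // ...then materialize it. DataFusion 51 wants a &TaskContext here;
    // SessionContext::task_ctx() returns Arc<TaskContext>, which derefs to &TaskContext.
    node.try_into_physical_plan(&ctx.task_ctx(), &DefaultPhysicalExtensionCodec {})
}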

src/flight_service/session_builder.rs

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ pub trait DistributedSessionBuilder {
 /// # use std::sync::Arc;
 /// # use async_trait::async_trait;
 /// # use datafusion::error::DataFusionError;
-/// # use datafusion::execution::{FunctionRegistry, SessionState, SessionStateBuilder};
+/// # use datafusion::execution::{FunctionRegistry, SessionState, SessionStateBuilder, TaskContext};
 /// # use datafusion::physical_plan::ExecutionPlan;
 /// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 /// # use datafusion_distributed::{DistributedExt, DistributedSessionBuilder, DistributedSessionBuilderContext};
@@ -34,7 +34,7 @@ pub trait DistributedSessionBuilder {
 /// struct CustomExecCodec;
 ///
 /// impl PhysicalExtensionCodec for CustomExecCodec {
-///     fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+///     fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
 ///         todo!()
 ///     }
 ///

src/metrics/proto.rs

Lines changed: 9 additions & 2 deletions
@@ -160,8 +160,8 @@ pub fn df_metrics_set_to_proto(
             Err(err) => {
                 // Check if this is the specific custom metrics error we want to filter out
                 if let DataFusionError::Internal(msg) = &err {
-                    if msg == CUSTOM_METRICS_NOT_SUPPORTED {
-                        // Filter out custom metrics error - continue processing other metrics
+                    if msg == CUSTOM_METRICS_NOT_SUPPORTED || msg == UNSUPPORTED_METRICS {
+                        // Filter out custom/unsupported metrics error - continue processing other metrics
                         continue;
                     }
                 }
@@ -191,6 +191,9 @@ pub fn metrics_set_proto_to_df(
 const CUSTOM_METRICS_NOT_SUPPORTED: &str =
     "custom metrics are not supported in metrics proto conversion";
 
+/// New DataFusion metrics that are not yet supported in proto conversion.
+const UNSUPPORTED_METRICS: &str = "metric type not supported in proto conversion";
+
 /// df_metric_to_proto converts a `datafusion::physical_plan::metrics::Metric` to a `MetricProto`. It does not consume the Arc<Metric>.
 pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<MetricProto, DataFusionError> {
     let partition = metric.partition().map(|p| p as u64);
@@ -285,6 +288,10 @@ pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<MetricProto, DataFusion
             labels,
         }),
         MetricValue::Custom { .. } => internal_err!("{}", CUSTOM_METRICS_NOT_SUPPORTED),
+        MetricValue::OutputBytes(_) | MetricValue::PruningMetrics { .. } | MetricValue::Ratio { .. } => {
+            // TODO: Support these metrics
+            internal_err!("{}", UNSUPPORTED_METRICS)
+        }
     }
 }
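Note: the filter in the first hunk keys off a sentinel Internal error message rather than a dedicated error variant, so set-level conversion can skip unsupported metric types while still propagating real failures. A self-contained sketch of that pattern (SENTINEL, convert_one, and convert_all are illustrative names, not this crate's API):

use datafusion::error::DataFusionError;

const SENTINEL: &str = "metric type not supported in proto conversion";

fn convert_one(supported: bool) -> Result<u64, DataFusionError> {
    if supported {
        Ok(1)
    } else {
        // Unsupported variants fail with a well-known Internal message.
        Err(DataFusionError::Internal(SENTINEL.to_string()))
    }
}

fn convert_all(metrics: &[bool]) -> Result<Vec<u64>, DataFusionError> {
    let mut out = Vec::new();
    for &m in metrics {
        match convert_one(m) {
            Ok(v) => out.push(v),
            // Skip only the known sentinel; propagate anything else.
            Err(DataFusionError::Internal(msg)) if msg == SENTINEL => continue,
            Err(other) => return Err(other),
        }
    }
    Ok(out)
}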

src/protobuf/distributed_codec.rs

Lines changed: 27 additions & 36 deletions
@@ -8,11 +8,11 @@ use datafusion::arrow::datatypes::Schema
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::common::internal_datafusion_err;
 use datafusion::error::DataFusionError;
-use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
+use datafusion::execution::TaskContext;
 use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::{ExecutionPlan, Partitioning, PlanProperties};
-use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::prelude::SessionConfig;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
 use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
 use datafusion_proto::physical_plan::{ComposedPhysicalExtensionCodec, PhysicalExtensionCodec};
@@ -40,7 +40,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
         &self,
         buf: &[u8],
         inputs: &[Arc<dyn ExecutionPlan>],
-        registry: &dyn FunctionRegistry,
+        ctx: &TaskContext,
     ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
         let DistributedExecProto {
             node: Some(distributed_exec_node),
@@ -51,20 +51,6 @@ impl PhysicalExtensionCodec for DistributedCodec {
             ));
         };
 
-        // TODO: The PhysicalExtensionCodec trait doesn't provide access to session state,
-        // so we create a new SessionContext which loses any custom UDFs, UDAFs, and other
-        // user configurations. This is a limitation of the current trait design.
-        let state = SessionStateBuilder::new()
-            .with_scalar_functions(
-                registry
-                    .udfs()
-                    .iter()
-                    .map(|f| registry.udf(f))
-                    .collect::<Result<Vec<_>, _>>()?,
-            )
-            .build();
-        let ctx = SessionContext::from(state);
-
         fn parse_stage_proto(
             proto: Option<StageProto>,
             inputs: &[Arc<dyn ExecutionPlan>],
@@ -114,7 +100,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
 
         let partitioning = parse_protobuf_partitioning(
             partitioning.as_ref(),
-            &ctx,
+            ctx,
             &schema,
             &DistributedCodec {},
         )?
@@ -138,7 +124,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
 
         let partitioning = parse_protobuf_partitioning(
             partitioning.as_ref(),
-            &ctx,
+            ctx,
             &schema,
             &DistributedCodec {},
         )?
@@ -403,11 +389,12 @@
     use datafusion::physical_expr::LexOrdering;
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::{
-        execution::registry::MemoryFunctionRegistry,
         physical_expr::{Partitioning, PhysicalSortExpr, expressions::Column, expressions::col},
         physical_plan::{ExecutionPlan, displayable, sorts::sort::SortExec, union::UnionExec},
     };
 
+    use datafusion::prelude::SessionContext;
+
     fn empty_exec() -> Arc<dyn ExecutionPlan> {
         Arc::new(EmptyExec::new(SchemaRef::new(Schema::empty())))
     }
@@ -429,10 +416,14 @@
         displayable(plan.as_ref()).indent(true).to_string()
     }
 
+    fn create_context() -> Arc<TaskContext> {
+        SessionContext::new().task_ctx()
+    }
+
     #[test]
     fn test_roundtrip_single_flight() -> datafusion::common::Result<()> {
         let codec = DistributedCodec;
-        let registry = MemoryFunctionRegistry::new();
+        let ctx = create_context();
 
         let schema = schema_i32("a");
         let part = Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4);
@@ -442,7 +433,7 @@
         let mut buf = Vec::new();
         codec.try_encode(plan.clone(), &mut buf)?;
 
-        let decoded = codec.try_decode(&buf, &[empty_exec()], &registry)?;
+        let decoded = codec.try_decode(&buf, &[empty_exec()], &ctx)?;
         assert_eq!(repr(&plan), repr(&decoded));
 
         Ok(())
@@ -451,7 +442,7 @@
     #[test]
     fn test_roundtrip_isolator_flight() -> datafusion::common::Result<()> {
         let codec = DistributedCodec;
-        let registry = MemoryFunctionRegistry::new();
+        let ctx = create_context();
 
         let schema = schema_i32("b");
         let flight = Arc::new(new_network_hash_shuffle_exec(
@@ -466,7 +457,7 @@
         let mut buf = Vec::new();
         codec.try_encode(plan.clone(), &mut buf)?;
 
-        let decoded = codec.try_decode(&buf, &[flight], &registry)?;
+        let decoded = codec.try_decode(&buf, &[flight], &ctx)?;
         assert_eq!(repr(&plan), repr(&decoded));
 
         Ok(())
@@ -475,7 +466,7 @@
     #[test]
     fn test_roundtrip_isolator_union() -> datafusion::common::Result<()> {
         let codec = DistributedCodec;
-        let registry = MemoryFunctionRegistry::new();
+        let ctx = create_context();
 
         let schema = schema_i32("c");
         let left = Arc::new(new_network_hash_shuffle_exec(
@@ -489,14 +480,14 @@
             dummy_stage(),
         ));
 
-        let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
+        let union = UnionExec::try_new(vec![left.clone(), right.clone()])?;
         let plan: Arc<dyn ExecutionPlan> =
             Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 1)?);
 
         let mut buf = Vec::new();
         codec.try_encode(plan.clone(), &mut buf)?;
 
-        let decoded = codec.try_decode(&buf, &[union], &registry)?;
+        let decoded = codec.try_decode(&buf, &[union], &ctx)?;
         assert_eq!(repr(&plan), repr(&decoded));
 
         Ok(())
@@ -505,7 +496,7 @@
     #[test]
     fn test_roundtrip_isolator_sort_flight() -> datafusion::common::Result<()> {
         let codec = DistributedCodec;
-        let registry = MemoryFunctionRegistry::new();
+        let ctx = create_context();
 
         let schema = schema_i32("d");
         let flight = Arc::new(new_network_hash_shuffle_exec(
@@ -529,7 +520,7 @@
         let mut buf = Vec::new();
         codec.try_encode(plan.clone(), &mut buf)?;
 
-        let decoded = codec.try_decode(&buf, &[sort], &registry)?;
+        let decoded = codec.try_decode(&buf, &[sort], &ctx)?;
         assert_eq!(repr(&plan), repr(&decoded));
 
         Ok(())
@@ -538,7 +529,7 @@
     #[test]
     fn test_roundtrip_single_flight_coalesce() -> datafusion::common::Result<()> {
         let codec = DistributedCodec;
-        let registry = MemoryFunctionRegistry::new();
+        let ctx = create_context();
 
         let schema = schema_i32("e");
         let plan: Arc<dyn ExecutionPlan> = Arc::new(new_network_coalesce_tasks_exec(
@@ -550,7 +541,7 @@
         let mut buf = Vec::new();
         codec.try_encode(plan.clone(), &mut buf)?;
 
-        let decoded = codec.try_decode(&buf, &[empty_exec()], &registry)?;
+        let decoded = codec.try_decode(&buf, &[empty_exec()], &ctx)?;
         assert_eq!(repr(&plan), repr(&decoded));
 
         Ok(())
@@ -559,7 +550,7 @@
     #[test]
     fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> {
         let codec = DistributedCodec;
-        let registry = MemoryFunctionRegistry::new();
+        let ctx = create_context();
 
         let schema = schema_i32("f");
         let flight = Arc::new(new_network_coalesce_tasks_exec(
@@ -574,7 +565,7 @@
         let mut buf = Vec::new();
         codec.try_encode(plan.clone(), &mut buf)?;
 
-        let decoded = codec.try_decode(&buf, &[flight], &registry)?;
+        let decoded = codec.try_decode(&buf, &[flight], &ctx)?;
         assert_eq!(repr(&plan), repr(&decoded));
 
         Ok(())
@@ -583,7 +574,7 @@
     #[test]
     fn test_roundtrip_isolator_union_coalesce() -> datafusion::common::Result<()> {
         let codec = DistributedCodec;
-        let registry = MemoryFunctionRegistry::new();
+        let ctx = create_context();
 
         let schema = schema_i32("g");
         let left = Arc::new(new_network_coalesce_tasks_exec(
@@ -597,14 +588,14 @@
             dummy_stage(),
         ));
 
-        let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
+        let union = UnionExec::try_new(vec![left.clone(), right.clone()])?;
         let plan: Arc<dyn ExecutionPlan> =
             Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 3)?);
 
         let mut buf = Vec::new();
         codec.try_encode(plan.clone(), &mut buf)?;
 
-        let decoded = codec.try_decode(&buf, &[union], &registry)?;
+        let decoded = codec.try_decode(&buf, &[union], &ctx)?;
         assert_eq!(repr(&plan), repr(&decoded));
 
         Ok(())
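Note: every test above follows the same encode/decode round trip, with SessionContext::new().task_ctx() standing in for the old MemoryFunctionRegistry. The hunks also show UnionExec::try_new replacing UnionExec::new; per the usage here, try_new yields the union already wrapped as an Arc'd plan (note the dropped Arc::new). A minimal sketch of the shared round-trip shape (roundtrip is an illustrative helper, not part of the crate):

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

fn roundtrip(
    codec: &dyn PhysicalExtensionCodec,
    plan: Arc<dyn ExecutionPlan>,
    inputs: &[Arc<dyn ExecutionPlan>],
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    // A fresh TaskContext replaces the registry the tests used before DF 51.
    let ctx = SessionContext::new().task_ctx();
    let mut buf = Vec::new();
    codec.try_encode(Arc::clone(&plan), &mut buf)?;
    // &Arc<TaskContext> derefs to the &TaskContext that try_decode now expects.
    codec.try_decode(&buf, inputs, &ctx)
}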
