Skip to content

Commit fa8b1c2

Browse files
authored
Upgrade to DF v51 (#236)
1 parent a8952c7 commit fa8b1c2

27 files changed

+1070
-956
lines changed

Cargo.lock

Lines changed: 580 additions & 485 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 21 additions & 17 deletions
Original file line number · Diff line number · Diff line change
@@ -2,8 +2,8 @@
22
members = ["benchmarks"]
33

44
[workspace.dependencies]
5-
datafusion = { version = "50.0.0", default-features = false }
6-
datafusion-proto = { version = "50.0.0" }
5+
datafusion = { version = "51.0.0", default-features = false }
6+
datafusion-proto = { version = "51.0.0" }
77

88
[package]
99
name = "datafusion-distributed"
@@ -12,14 +12,18 @@ edition = "2024"
1212

1313
[dependencies]
1414
chrono = { version = "0.4.42" }
15-
datafusion = { workspace = true }
15+
datafusion = { workspace = true, features = [
16+
"parquet",
17+
"sql",
18+
"unicode_expressions",
19+
"datetime_expressions",
20+
] }
1621
datafusion-proto = { workspace = true }
17-
arrow-flight = "56.1.0"
18-
arrow-select = "56.1.0"
22+
arrow-flight = "57.0.0"
23+
arrow-select = "57.0.0"
1924
async-trait = "0.1.88"
2025
tokio = { version = "1.46.1", features = ["full"] }
21-
# Updated to 0.13.1 to match arrow-flight 56.1.0
22-
tonic = { version = "0.13.1", features = ["transport"] }
26+
tonic = { version = "0.14.2", features = ["transport"] }
2327
tower = "0.5.2"
2428
http = "1.3.1"
2529
itertools = "0.14.0"
@@ -28,18 +32,18 @@ url = "2.5.4"
2832
uuid = "1.17.0"
2933
delegate = "0.13.4"
3034
dashmap = "6.1.0"
31-
prost = "0.13.5"
35+
prost = "0.14.0"
3236
rand = "0.8.5"
3337
object_store = "0.12.3"
3438
bytes = "1.10.1"
3539
pin-project = "1.1.10"
3640

3741
# integration_tests deps
3842
insta = { version = "1.43.1", features = ["filters"], optional = true }
39-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
40-
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
41-
parquet = { version = "56.1.0", optional = true }
42-
arrow = { version = "56.1.0", optional = true }
43+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
44+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
45+
parquet = { version = "57.0.0", optional = true }
46+
arrow = { version = "57.0.0", optional = true }
4347
tokio-stream = { version = "0.1.17", optional = true }
4448
hyper-util = { version = "0.1.16", optional = true }
4549
pretty_assertions = { version = "1.4", optional = true }
@@ -53,18 +57,18 @@ integration = [
5357
"arrow",
5458
"tokio-stream",
5559
"hyper-util",
56-
"pretty_assertions"
60+
"pretty_assertions",
5761
]
5862

5963
tpch = ["integration"]
6064

6165
[dev-dependencies]
6266
structopt = "0.3"
6367
insta = { version = "1.43.1", features = ["filters"] }
64-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
65-
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
66-
parquet = "56.1.0"
67-
arrow = "56.1.0"
68+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
69+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
70+
parquet = "57.0.0"
71+
arrow = "57.0.0"
6872
tokio-stream = "0.1.17"
6973
hyper-util = "0.1.16"
7074
pretty_assertions = "1.4"

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -1,3 +1,3 @@
11
[toolchain]
2-
channel = "1.86.0"
2+
channel = "1.88.0"
33
profile = "default"

src/common/ttl_map.rs

Lines changed: 4 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -409,7 +409,7 @@ mod tests {
409409
}
410410
tokio::time::sleep(Duration::from_millis(10)).await;
411411
}
412-
panic!("Assertion failed within {:?}", timeout);
412+
panic!("Assertion failed within {timeout:?}");
413413
}
414414

415415
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
@@ -543,9 +543,9 @@ mod tests {
543543
let elapsed = start_time.elapsed();
544544

545545
println!("\n=== TTLMap Lock Contention Benchmark ===");
546-
println!("Tasks: {}", task_count);
547-
println!("Total time: {:.2?}", elapsed);
548-
println!("Average latency: {:.2} ns per operation", avg_time);
546+
println!("Tasks: {task_count}");
547+
println!("Total time: {elapsed:.2?}");
548+
println!("Average latency: {avg_time:.2} ns per operation");
549549
println!("Entries remaining: {}", ttl_map.data.len());
550550
println!(
551551
"DashMap Lock contention time: {}ms",

src/distributed_ext.rs

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -137,7 +137,7 @@ pub trait DistributedExt: Sized {
137137
/// ```
138138
/// # use std::sync::Arc;
139139
/// # use datafusion::common::DataFusionError;
140-
/// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder};
140+
/// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder, TaskContext};
141141
/// # use datafusion::physical_plan::ExecutionPlan;
142142
/// # use datafusion::prelude::SessionConfig;
143143
/// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
@@ -147,7 +147,7 @@ pub trait DistributedExt: Sized {
147147
/// struct CustomExecCodec;
148148
///
149149
/// impl PhysicalExtensionCodec for CustomExecCodec {
150-
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
150+
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
151151
/// todo!()
152152
/// }
153153
///

src/execution_plans/network_shuffle.rs

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -112,6 +112,7 @@ use tonic::metadata::MetadataMap;
112112
/// 2. Ready: runs within a distributed stage and queries the next input stage over the network
113113
/// using Arrow Flight.
114114
#[derive(Debug, Clone)]
115+
#[allow(clippy::large_enum_variant)]
115116
pub enum NetworkShuffleExec {
116117
Pending(NetworkShufflePendingExec),
117118
Ready(NetworkShuffleReadyExec),

src/execution_plans/partition_isolator.rs

Lines changed: 3 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -50,6 +50,7 @@ use std::{fmt::Formatter, sync::Arc};
5050
/// └───────────────────────────┘ └───────────────────────────┘ └───────────────────────────┘ ■
5151
/// ```
5252
#[derive(Debug)]
53+
#[allow(clippy::large_enum_variant)]
5354
pub enum PartitionIsolatorExec {
5455
Pending(PartitionIsolatorPendingExec),
5556
Ready(PartitionIsolatorReadyExec),
@@ -223,7 +224,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
223224
// then look up that index in our group and execute that partition, in this
224225
// example partition 8
225226

226-
let output_stream = match partition_group.get(partition) {
227+
match partition_group.get(partition) {
227228
Some(actual_partition_number) => {
228229
if *actual_partition_number >= input_partitions {
229230
//trace!("{} returning empty stream", ctx_name);
@@ -239,8 +240,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
239240
Box::pin(EmptyRecordBatchStream::new(self_ready.input.schema()))
240241
as SendableRecordBatchStream,
241242
),
242-
};
243-
output_stream
243+
}
244244
}
245245
}
246246

src/flight_service/do_get.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -96,7 +96,7 @@ impl ArrowFlightEndpoint {
9696
let stage_data = once
9797
.get_or_try_init(|| async {
9898
let proto_node = PhysicalPlanNode::try_decode(doget.plan_proto.as_ref())?;
99-
let mut plan = proto_node.try_into_physical_plan(&ctx, &self.runtime, &codec)?;
99+
let mut plan = proto_node.try_into_physical_plan(&ctx.task_ctx(), &codec)?;
100100
for hook in self.hooks.on_plan.iter() {
101101
plan = hook(plan)
102102
}

src/flight_service/session_builder.rs

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -25,7 +25,7 @@ pub trait DistributedSessionBuilder {
2525
/// # use std::sync::Arc;
2626
/// # use async_trait::async_trait;
2727
/// # use datafusion::error::DataFusionError;
28-
/// # use datafusion::execution::{FunctionRegistry, SessionState, SessionStateBuilder};
28+
/// # use datafusion::execution::{FunctionRegistry, SessionState, SessionStateBuilder, TaskContext};
2929
/// # use datafusion::physical_plan::ExecutionPlan;
3030
/// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
3131
/// # use datafusion_distributed::{DistributedExt, DistributedSessionBuilder, DistributedSessionBuilderContext};
@@ -34,7 +34,7 @@ pub trait DistributedSessionBuilder {
3434
/// struct CustomExecCodec;
3535
///
3636
/// impl PhysicalExtensionCodec for CustomExecCodec {
37-
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
37+
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
3838
/// todo!()
3939
/// }
4040
///

src/metrics/metrics_collecting_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -47,7 +47,7 @@ where
4747

4848
let metadata =
4949
FlightAppMetadata::decode(flight_data.app_metadata.as_ref()).map_err(|e| {
50-
FlightError::ProtocolError(format!("failed to decode app_metadata: {}", e))
50+
FlightError::ProtocolError(format!("failed to decode app_metadata: {e}"))
5151
})?;
5252

5353
let Some(content) = metadata.content else {

0 commit comments

Comments (0)