
Commit a52686e

update to df 50

1 parent acd974b

12 files changed (+441, -424 lines)

Cargo.lock

Lines changed: 391 additions & 384 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 20 additions & 17 deletions
@@ -2,22 +2,26 @@
 members = ["benchmarks"]
 
 [workspace.dependencies]
-datafusion = { version = "49.0.0" }
-datafusion-proto = { version = "49.0.0" }
+datafusion = { version = "50.0.0" }
+datafusion-proto = { version = "50.0.0" }
 
 [package]
 name = "datafusion-distributed"
-version = "0.1.0"
-edition = "2021"
+version = "0.1.1"
+edition = "2024"
+
+[workspace.package]
+edition = "2024"
+rust-version = "1.86.0"
 
 [dependencies]
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
-arrow-flight = "55.2.0"
+arrow-flight = "56.1.0"
 async-trait = "0.1.88"
 tokio = { version = "1.46.1", features = ["full"] }
-# Fixed to 0.12.3 because of arrow-flight
-tonic = { version = "0.12.3", features = ["transport"] }
+# Updated to 0.13.1 to match arrow-flight 56.1.0
+tonic = { version = "0.13.1", features = ["transport"] }
 tower = "0.5.2"
 http = "1.3.1"
 itertools = "0.14.0"
@@ -29,16 +33,16 @@ dashmap = "6.1.0"
 prost = "0.13.5"
 rand = "0.8.5"
 object_store = "0.12.3"
+chrono = { version = "0.4.42" }
 
 # integration_tests deps
 insta = { version = "1.43.1", features = ["filters"], optional = true }
-tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
-tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
-parquet = { version = "55.2.0", optional = true }
-arrow = { version = "55.2.0", optional = true }
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
+parquet = { version = "56.1.0", optional = true }
+arrow = { version = "56.1.0", optional = true }
 tokio-stream = { version = "0.1.17", optional = true }
 hyper-util = { version = "0.1.16", optional = true }
-chrono = { version = "0.4.42", optional = true }
 
 [features]
 integration = [
@@ -49,17 +53,16 @@ integration = [
     "arrow",
     "tokio-stream",
     "hyper-util",
-    "chrono"
 ]
 
 tpch = ["integration"]
 
 [dev-dependencies]
 structopt = "0.3"
 insta = { version = "1.43.1", features = ["filters"] }
-tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
-tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
-parquet = "55.2.0"
-arrow = "55.2.0"
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
+parquet = "56.1.0"
+arrow = "56.1.0"
 tokio-stream = "0.1.17"
 hyper-util = "0.1.16"
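
Note: `edition` and `rust-version` are now declared once under `[workspace.package]`, so member crates such as `benchmarks` can inherit them via `edition.workspace = true` instead of repeating the values. Two related moves in this diff: `chrono` becomes a regular (non-optional) dependency, which is why `"chrono"` drops out of the `integration` feature list, and the deleted rust-toolchain.toml below is consistent with declaring a minimum supported version through `rust-version` rather than pinning a toolchain.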

benchmarks/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
 datafusion-distributed = { path = "..", features = ["integration"] }
 tokio = { version = "1.46.1", features = ["full"] }
-parquet = { version = "55.2.0" }
+parquet = { version = "56.1.0" }
 structopt = { version = "0.3.26" }
 log = "0.4.27"
 serde = "1.0.219"

rust-toolchain.toml

Lines changed: 0 additions & 3 deletions
This file was deleted.

src/channel_resolver_ext.rs

Lines changed: 3 additions & 3 deletions
@@ -3,7 +3,7 @@ use datafusion::common::exec_datafusion_err;
 use datafusion::error::DataFusionError;
 use datafusion::prelude::SessionConfig;
 use std::sync::Arc;
-use tonic::body::BoxBody;
+use tonic::body::Body;
 use url::Url;
 
 pub(crate) fn set_distributed_channel_resolver(
@@ -27,8 +27,8 @@ pub(crate) fn get_distributed_channel_resolver(
 struct ChannelResolverExtension(Arc<dyn ChannelResolver + Send + Sync>);
 
 pub type BoxCloneSyncChannel = tower::util::BoxCloneSyncService<
-    http::Request<BoxBody>,
-    http::Response<BoxBody>,
+    http::Request<Body>,
+    http::Response<Body>,
     tonic::transport::Error,
 >;
 
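
tonic 0.13 renamed `tonic::body::BoxBody` to `tonic::body::Body`, which is all this file needs to track. For context, a minimal sketch of how such an alias can be wrapped around a lazily connecting channel; the `make_channel` helper is hypothetical, not part of this crate, and assumes `Channel`'s `Service` impl matches the alias's request, response, and error types:

use tonic::body::Body;
use tonic::transport::{Channel, Endpoint, Error};
use tower::util::BoxCloneSyncService;

// Same shape as the alias in the diff: a clonable, Sync, type-erased
// HTTP service using tonic 0.13's renamed body type.
pub type BoxCloneSyncChannel =
    BoxCloneSyncService<http::Request<Body>, http::Response<Body>, Error>;

// Hypothetical helper: erase a lazily connecting Channel into the alias.
pub fn make_channel(url: &'static str) -> BoxCloneSyncChannel {
    let channel: Channel = Endpoint::from_static(url).connect_lazy();
    BoxCloneSyncService::new(channel)
}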

src/errors/arrow_error.rs

Lines changed: 4 additions & 0 deletions
@@ -129,6 +129,10 @@ impl ArrowErrorProto {
                 inner: Some(ArrowErrorInnerProto::RunEndIndexOverflowError(true)),
                 ctx: ctx.cloned(),
             },
+            ArrowError::OffsetOverflowError(size) => ArrowErrorProto {
+                inner: Some(ArrowErrorInnerProto::ExternalError(format!("Offset overflow error: {}", size))),
+                ctx: ctx.cloned(),
+            },
         }
     }
 
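
The new arm covers `ArrowError::OffsetOverflowError`, which the match did not previously have to handle. With no dedicated proto message, the variant is folded into `ExternalError` as formatted text, so it will not survive a round trip: decoding yields a generic external error rather than the original variant. A minimal standalone sketch of that lossy mapping (the helper is illustrative, not this crate's API):

use datafusion::arrow::error::ArrowError;

// Flatten the variant to a string, mirroring the fallback in the diff.
fn offset_overflow_message(err: &ArrowError) -> Option<String> {
    match err {
        ArrowError::OffsetOverflowError(size) => {
            Some(format!("Offset overflow error: {}", size))
        }
        _ => None,
    }
}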

src/errors/schema_error.rs

Lines changed: 1 addition & 1 deletion
@@ -168,7 +168,7 @@ impl TableReferenceProto {
 impl SchemaErrorProto {
     pub fn from_schema_error(err: &SchemaError, backtrace: Option<&String>) -> Self {
         match err {
-            SchemaError::AmbiguousReference { ref field } => SchemaErrorProto {
+            SchemaError::AmbiguousReference { field } => SchemaErrorProto {
                 inner: Some(SchemaErrorInnerProto::AmbiguousReference(
                     AmbiguousReferenceProto {
                         field: Some(ColumnProto::from_column(field)),
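
Dropping `ref` is an edition-2024 requirement rather than a style tweak: the scrutinee is `&SchemaError`, so match ergonomics already bind `field` by reference, and edition 2024 rejects an explicit `ref` where the default binding mode is already by-reference. A standalone sketch with toy types (not this crate's):

enum SchemaError {
    AmbiguousReference { field: String },
}

// Matching on `&SchemaError` binds `field` as `&String` automatically;
// spelling it `ref field` here is an error on edition 2024.
fn ambiguous_field(err: &SchemaError) -> &String {
    match err {
        SchemaError::AmbiguousReference { field } => field,
    }
}

fn main() {
    let err = SchemaError::AmbiguousReference { field: "t.a".into() };
    assert_eq!(ambiguous_field(&err), "t.a");
}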

src/execution_plans/stage.rs

Lines changed: 1 addition & 1 deletion
@@ -530,7 +530,7 @@ pub fn display_plan(
     plan: &Arc<dyn ExecutionPlan>,
     partition_group: &[usize],
     stage_num: usize,
-    distributed: bool,
+    _distributed: bool,
 ) -> Result<String> {
     // draw all plans
     // we need to label the nodes including depth to uniquely identify them within this task

src/metrics/proto.rs

Lines changed: 4 additions & 4 deletions
@@ -381,10 +381,10 @@ pub fn metric_proto_to_df(metric: MetricProto) -> Result<Arc<Metric>, DataFusion
             )))
         }
         Some(MetricValueProto::StartTimestamp(start_ts)) => match start_ts.value {
-            Some(value) => {
+            Some(_value) => {
                 let timestamp = Timestamp::new();
                 #[cfg(feature = "integration")]
-                timestamp.set(DateTime::from_timestamp_nanos(value));
+                timestamp.set(DateTime::from_timestamp_nanos(_value));
                 #[cfg(not(feature = "integration"))]
                 { /* DateTime not available without integration feature */ }
                 Ok(Arc::new(Metric::new_with_labels(
@@ -400,10 +400,10 @@ pub fn metric_proto_to_df(metric: MetricProto) -> Result<Arc<Metric>, DataFusion
             ))),
         },
         Some(MetricValueProto::EndTimestamp(end_ts)) => match end_ts.value {
-            Some(value) => {
+            Some(_value) => {
                 let timestamp = Timestamp::new();
                 #[cfg(feature = "integration")]
-                timestamp.set(DateTime::from_timestamp_nanos(value));
+                timestamp.set(DateTime::from_timestamp_nanos(_value));
                 #[cfg(not(feature = "integration"))]
                 { /* DateTime not available without integration feature */ }
                 Ok(Arc::new(Metric::new_with_labels(
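
The `value` to `_value` renames keep the non-`integration` build warning-free: with the feature off, the cfg-gated `timestamp.set(...)` statement is compiled out and the binding would otherwise be unused. A self-contained sketch of the pattern, assuming a hypothetical `integration` feature:

// The only use of `_value` sits behind a cfg gate; the leading
// underscore silences the unused-variable warning when the gate is off.
fn set_timestamp(ts: Option<i64>) {
    if let Some(_value) = ts {
        #[cfg(feature = "integration")]
        println!("timestamp nanos: {_value}");
        #[cfg(not(feature = "integration"))]
        { /* value intentionally unused without the integration feature */ }
    }
}

fn main() {
    set_timestamp(Some(1_700_000_000_000_000_000));
}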

src/protobuf/distributed_codec.rs

Lines changed: 7 additions & 4 deletions
@@ -2,7 +2,7 @@ use super::get_distributed_user_codec;
 use crate::common::ComposedPhysicalExtensionCodec;
 use crate::{ArrowFlightReadExec, PartitionIsolatorExec};
 use datafusion::arrow::datatypes::Schema;
-use datafusion::execution::FunctionRegistry;
+use datafusion::execution::{FunctionRegistry, context::SessionContext};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionConfig;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
@@ -19,7 +19,7 @@ use std::sync::Arc;
 pub struct DistributedCodec;
 
 impl DistributedCodec {
-    pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec {
+    pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec + use<> {
         let mut combined_codec = ComposedPhysicalExtensionCodec::default();
         combined_codec.push(DistributedCodec {});
         if let Some(ref user_codec) = get_distributed_user_codec(cfg) {
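
The added `+ use<>` is edition-2024 precise-capturing syntax: on the new edition, return-position `impl Trait` captures all in-scope lifetimes by default, so without the opt-out the returned codec would stay borrow-tied to `cfg`. A standalone sketch with a toy trait:

trait Codec {
    fn name(&self) -> &'static str;
}

struct Combined;

impl Codec for Combined {
    fn name(&self) -> &'static str {
        "combined"
    }
}

// On edition 2024, `impl Codec` alone would capture the lifetime of
// `_cfg`; `use<>` declares that nothing from the arguments is captured.
fn make_codec(_cfg: &str) -> impl Codec + use<> {
    Combined
}

fn main() {
    let codec = {
        let cfg = String::from("session config");
        make_codec(&cfg) // fine: the codec does not borrow from `cfg`
    };
    println!("{}", codec.name());
}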
@@ -34,7 +34,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
         &self,
         buf: &[u8],
         inputs: &[Arc<dyn ExecutionPlan>],
-        registry: &dyn FunctionRegistry,
+        _registry: &dyn FunctionRegistry,
     ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
         let DistributedExecProto {
             node: Some(distributed_exec_node),
@@ -56,9 +56,12 @@ impl PhysicalExtensionCodec for DistributedCodec {
             .map(|s| s.try_into())
             .ok_or(proto_error("ArrowFlightReadExec is missing schema"))??;
 
+        // Create a default SessionContext for the protobuf parsing
+        // TODO: This loses the original function registry, but DataFusion 50.0.0 requires SessionContext
+        let session_ctx = SessionContext::new();
         let partioning = parse_protobuf_partitioning(
             partitioning.as_ref(),
-            registry,
+            &session_ctx,
             &schema,
             &DistributedCodec {},
         )?
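
In DataFusion 50, `parse_protobuf_partitioning` takes a `&SessionContext` where it previously accepted a `&dyn FunctionRegistry`, hence the fresh default context and the now-unused `_registry` parameter. As the TODO notes, a default context drops the caller's registered functions. A hedged sketch of one way to carry scalar UDFs across, assuming the `registry` parameter were kept instead of renamed:

use datafusion::execution::{FunctionRegistry, context::SessionContext};

// SessionContext itself implements FunctionRegistry, so the decoder
// could copy the caller's scalar UDFs into the fresh context before
// parsing instead of dropping them.
fn context_with_udfs(registry: &dyn FunctionRegistry) -> SessionContext {
    let ctx = SessionContext::new();
    for name in registry.udfs() {
        if let Ok(udf) = registry.udf(&name) {
            // register_udf takes ownership; clone the UDF out of its Arc.
            ctx.register_udf(udf.as_ref().clone());
        }
    }
    ctx
}

The same treatment would presumably apply to aggregate and window functions via `udafs()` and `udwfs()` if the plans being decoded can reference them.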
