Skip to content

Commit 0e030ec

Browse files
committed
update to df 50
1 parent e428e7f commit 0e030ec

File tree

11 files changed

+440
-421
lines changed

11 files changed

+440
-421
lines changed

Cargo.lock

Lines changed: 391 additions & 384 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,27 @@
22
members = ["benchmarks"]
33

44
[workspace.dependencies]
5-
datafusion = { version = "49.0.0" }
6-
datafusion-proto = { version = "49.0.0" }
5+
datafusion = { version = "50.0.0" }
6+
datafusion-proto = { version = "50.0.0" }
77

88
[package]
99
name = "datafusion-distributed"
10-
version = "0.1.0"
11-
edition = "2021"
10+
version = "0.1.1"
11+
edition = "2024"
12+
13+
[workspace.package]
14+
edition = "2024"
15+
rust-version = "1.86.0"
1216

1317
[dependencies]
1418
chrono = { version = "0.4.42" }
1519
datafusion = { workspace = true }
1620
datafusion-proto = { workspace = true }
17-
arrow-flight = "55.2.0"
21+
arrow-flight = "56.1.0"
1822
async-trait = "0.1.88"
1923
tokio = { version = "1.46.1", features = ["full"] }
20-
# Fixed to 0.12.3 because of arrow-flight
21-
tonic = { version = "0.12.3", features = ["transport"] }
24+
# Updated to 0.13.1 to match arrow-flight 56.1.0
25+
tonic = { version = "0.13.1", features = ["transport"] }
2226
tower = "0.5.2"
2327
http = "1.3.1"
2428
itertools = "0.14.0"
@@ -30,13 +34,14 @@ dashmap = "6.1.0"
3034
prost = "0.13.5"
3135
rand = "0.8.5"
3236
object_store = "0.12.3"
37+
chrono = { version = "0.4.42" }
3338

3439
# integration_tests deps
3540
insta = { version = "1.43.1", features = ["filters"], optional = true }
36-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
37-
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
38-
parquet = { version = "55.2.0", optional = true }
39-
arrow = { version = "55.2.0", optional = true }
41+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
42+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
43+
parquet = { version = "56.1.0", optional = true }
44+
arrow = { version = "56.1.0", optional = true }
4045
tokio-stream = { version = "0.1.17", optional = true }
4146
hyper-util = { version = "0.1.16", optional = true }
4247

@@ -56,9 +61,9 @@ tpch = ["integration"]
5661
[dev-dependencies]
5762
structopt = "0.3"
5863
insta = { version = "1.43.1", features = ["filters"] }
59-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
60-
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
61-
parquet = "55.2.0"
62-
arrow = "55.2.0"
64+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
65+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
66+
parquet = "56.1.0"
67+
arrow = "56.1.0"
6368
tokio-stream = "0.1.17"
6469
hyper-util = "0.1.16"

benchmarks/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ datafusion = { workspace = true }
99
datafusion-proto = { workspace = true }
1010
datafusion-distributed = { path = "..", features = ["integration"] }
1111
tokio = { version = "1.46.1", features = ["full"] }
12-
parquet = { version = "55.2.0" }
12+
parquet = { version = "56.1.0" }
1313
structopt = { version = "0.3.26" }
1414
log = "0.4.27"
1515
serde = "1.0.219"

rust-toolchain.toml

Lines changed: 0 additions & 3 deletions
This file was deleted.

src/channel_resolver_ext.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use datafusion::common::exec_datafusion_err;
33
use datafusion::error::DataFusionError;
44
use datafusion::prelude::SessionConfig;
55
use std::sync::Arc;
6-
use tonic::body::BoxBody;
6+
use tonic::body::Body;
77
use url::Url;
88

99
pub(crate) fn set_distributed_channel_resolver(
@@ -27,8 +27,8 @@ pub(crate) fn get_distributed_channel_resolver(
2727
struct ChannelResolverExtension(Arc<dyn ChannelResolver + Send + Sync>);
2828

2929
pub type BoxCloneSyncChannel = tower::util::BoxCloneSyncService<
30-
http::Request<BoxBody>,
31-
http::Response<BoxBody>,
30+
http::Request<Body>,
31+
http::Response<Body>,
3232
tonic::transport::Error,
3333
>;
3434

src/errors/arrow_error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ impl ArrowErrorProto {
129129
inner: Some(ArrowErrorInnerProto::RunEndIndexOverflowError(true)),
130130
ctx: ctx.cloned(),
131131
},
132+
ArrowError::OffsetOverflowError(size) => ArrowErrorProto {
133+
inner: Some(ArrowErrorInnerProto::ExternalError(format!("Offset overflow error: {}", size))),
134+
ctx: ctx.cloned(),
135+
},
132136
}
133137
}
134138

src/errors/schema_error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl TableReferenceProto {
168168
impl SchemaErrorProto {
169169
pub fn from_schema_error(err: &SchemaError, backtrace: Option<&String>) -> Self {
170170
match err {
171-
SchemaError::AmbiguousReference { ref field } => SchemaErrorProto {
171+
SchemaError::AmbiguousReference { field } => SchemaErrorProto {
172172
inner: Some(SchemaErrorInnerProto::AmbiguousReference(
173173
AmbiguousReferenceProto {
174174
field: Some(ColumnProto::from_column(field)),

src/metrics/proto.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,10 +381,10 @@ pub fn metric_proto_to_df(metric: MetricProto) -> Result<Arc<Metric>, DataFusion
381381
)))
382382
}
383383
Some(MetricValueProto::StartTimestamp(start_ts)) => match start_ts.value {
384-
Some(value) => {
384+
Some(_value) => {
385385
let timestamp = Timestamp::new();
386386
#[cfg(feature = "integration")]
387-
timestamp.set(DateTime::from_timestamp_nanos(value));
387+
timestamp.set(DateTime::from_timestamp_nanos(_value));
388388
#[cfg(not(feature = "integration"))]
389389
{ /* DateTime not available without integration feature */ }
390390
Ok(Arc::new(Metric::new_with_labels(
@@ -400,10 +400,10 @@ pub fn metric_proto_to_df(metric: MetricProto) -> Result<Arc<Metric>, DataFusion
400400
))),
401401
},
402402
Some(MetricValueProto::EndTimestamp(end_ts)) => match end_ts.value {
403-
Some(value) => {
403+
Some(_value) => {
404404
let timestamp = Timestamp::new();
405405
#[cfg(feature = "integration")]
406-
timestamp.set(DateTime::from_timestamp_nanos(value));
406+
timestamp.set(DateTime::from_timestamp_nanos(_value));
407407
#[cfg(not(feature = "integration"))]
408408
{ /* DateTime not available without integration feature */ }
409409
Ok(Arc::new(Metric::new_with_labels(

src/protobuf/distributed_codec.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::get_distributed_user_codec;
22
use crate::common::ComposedPhysicalExtensionCodec;
33
use crate::{ArrowFlightReadExec, PartitionIsolatorExec};
44
use datafusion::arrow::datatypes::Schema;
5-
use datafusion::execution::FunctionRegistry;
5+
use datafusion::execution::{FunctionRegistry, context::SessionContext};
66
use datafusion::physical_plan::ExecutionPlan;
77
use datafusion::prelude::SessionConfig;
88
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
@@ -19,7 +19,7 @@ use std::sync::Arc;
1919
pub struct DistributedCodec;
2020

2121
impl DistributedCodec {
22-
pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec {
22+
pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec + use<> {
2323
let mut combined_codec = ComposedPhysicalExtensionCodec::default();
2424
combined_codec.push(DistributedCodec {});
2525
if let Some(ref user_codec) = get_distributed_user_codec(cfg) {
@@ -34,7 +34,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
3434
&self,
3535
buf: &[u8],
3636
inputs: &[Arc<dyn ExecutionPlan>],
37-
registry: &dyn FunctionRegistry,
37+
_registry: &dyn FunctionRegistry,
3838
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
3939
let DistributedExecProto {
4040
node: Some(distributed_exec_node),
@@ -56,9 +56,12 @@ impl PhysicalExtensionCodec for DistributedCodec {
5656
.map(|s| s.try_into())
5757
.ok_or(proto_error("ArrowFlightReadExec is missing schema"))??;
5858

59+
// Create a default SessionContext for the protobuf parsing
60+
// TODO: This loses the original function registry, but DataFusion 50.0.0 requires SessionContext
61+
let session_ctx = SessionContext::new();
5962
let partioning = parse_protobuf_partitioning(
6063
partitioning.as_ref(),
61-
registry,
64+
&session_ctx,
6265
&schema,
6366
&DistributedCodec {},
6467
)?

src/protobuf/stage_proto.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::execution_plans::{ExecutionTask, StageExec};
22
use datafusion::{
33
common::internal_datafusion_err,
44
error::{DataFusionError, Result},
5-
execution::{runtime_env::RuntimeEnv, FunctionRegistry},
5+
execution::{runtime_env::RuntimeEnv, SessionState, context::SessionContext},
66
physical_plan::ExecutionPlan,
77
};
88
use datafusion_proto::{
@@ -100,21 +100,22 @@ pub fn proto_from_stage(
100100

101101
pub fn stage_from_proto(
102102
msg: StageExecProto,
103-
registry: &dyn FunctionRegistry,
103+
session_state: &SessionState,
104104
runtime: &RuntimeEnv,
105105
codec: &dyn PhysicalExtensionCodec,
106106
) -> Result<StageExec> {
107107
let plan_node = msg.plan.ok_or(internal_datafusion_err!(
108108
"ExecutionStageMsg is missing the plan"
109109
))?;
110110

111-
let plan = plan_node.try_into_physical_plan(registry, runtime, codec)?;
111+
let session_ctx = SessionContext::new_with_state(session_state.clone());
112+
let plan = plan_node.try_into_physical_plan(&session_ctx, runtime, codec)?;
112113

113114
let inputs = msg
114115
.inputs
115116
.into_iter()
116117
.map(|s| {
117-
stage_from_proto(s, registry, runtime, codec)
118+
stage_from_proto(s, session_state, runtime, codec)
118119
.map(|s| Arc::new(s) as Arc<dyn ExecutionPlan>)
119120
})
120121
.collect::<Result<Vec<_>>>()?;
@@ -229,7 +230,7 @@ mod tests {
229230
// Convert back to ExecutionStage
230231
let round_trip_stage = stage_from_proto(
231232
decoded_msg,
232-
&ctx,
233+
&ctx.state(),
233234
ctx.runtime_env().as_ref(),
234235
&DefaultPhysicalExtensionCodec {},
235236
)?;

0 commit comments

Comments (0)