Skip to content

Commit 7cb59e5

Browse files
committed
update to df 50
1 parent fe7161e commit 7cb59e5

File tree

10 files changed

+340
-312
lines changed

10 files changed

+340
-312
lines changed

Cargo.lock

Lines changed: 236 additions & 240 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
members = ["benchmarks"]
33

44
[workspace.dependencies]
5-
datafusion = { version = "49.0.0", default-features = false }
6-
datafusion-proto = { version = "49.0.0" }
5+
datafusion = { version = "50.0.0", default-features = false }
6+
datafusion-proto = { version = "50.0.0" }
77

88
[package]
99
name = "datafusion-distributed"
@@ -14,11 +14,11 @@ edition = "2024"
1414
chrono = { version = "0.4.42" }
1515
datafusion = { workspace = true }
1616
datafusion-proto = { workspace = true }
17-
arrow-flight = "55.2.0"
17+
arrow-flight = "56.1.0"
1818
async-trait = "0.1.88"
1919
tokio = { version = "1.46.1", features = ["full"] }
20-
# Fixed to 0.12.3 because of arrow-flight
21-
tonic = { version = "0.12.3", features = ["transport"] }
20+
# Updated to 0.13.1 to match arrow-flight 56.1.0
21+
tonic = { version = "0.13.1", features = ["transport"] }
2222
tower = "0.5.2"
2323
http = "1.3.1"
2424
itertools = "0.14.0"
@@ -33,10 +33,10 @@ object_store = "0.12.3"
3333

3434
# integration_tests deps
3535
insta = { version = "1.43.1", features = ["filters"], optional = true }
36-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
37-
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
38-
parquet = { version = "55.2.0", optional = true }
39-
arrow = { version = "55.2.0", optional = true }
36+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
37+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
38+
parquet = { version = "56.1.0", optional = true }
39+
arrow = { version = "56.1.0", optional = true }
4040
tokio-stream = { version = "0.1.17", optional = true }
4141
hyper-util = { version = "0.1.16", optional = true }
4242

@@ -56,9 +56,9 @@ tpch = ["integration"]
5656
[dev-dependencies]
5757
structopt = "0.3"
5858
insta = { version = "1.43.1", features = ["filters"] }
59-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
60-
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
61-
parquet = "55.2.0"
62-
arrow = "55.2.0"
59+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
60+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
61+
parquet = "56.1.0"
62+
arrow = "56.1.0"
6363
tokio-stream = "0.1.17"
6464
hyper-util = "0.1.16"

benchmarks/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ datafusion = { workspace = true }
99
datafusion-proto = { workspace = true }
1010
datafusion-distributed = { path = "..", features = ["integration"] }
1111
tokio = { version = "1.46.1", features = ["full"] }
12-
parquet = { version = "55.2.0" }
12+
parquet = { version = "56.1.0" }
1313
structopt = { version = "0.3.26" }
1414
log = "0.4.27"
1515
serde = "1.0.219"

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[toolchain]
2-
channel = "1.85.1"
2+
channel = "1.86.0"
33
profile = "default"

src/channel_resolver_ext.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use datafusion::common::exec_datafusion_err;
33
use datafusion::error::DataFusionError;
44
use datafusion::prelude::SessionConfig;
55
use std::sync::Arc;
6-
use tonic::body::BoxBody;
6+
use tonic::body::Body;
77
use url::Url;
88

99
pub(crate) fn set_distributed_channel_resolver(
@@ -27,8 +27,8 @@ pub(crate) fn get_distributed_channel_resolver(
2727
struct ChannelResolverExtension(Arc<dyn ChannelResolver + Send + Sync>);
2828

2929
pub type BoxCloneSyncChannel = tower::util::BoxCloneSyncService<
30-
http::Request<BoxBody>,
31-
http::Response<BoxBody>,
30+
http::Request<Body>,
31+
http::Response<Body>,
3232
tonic::transport::Error,
3333
>;
3434

src/errors/arrow_error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ impl ArrowErrorProto {
129129
inner: Some(ArrowErrorInnerProto::RunEndIndexOverflowError(true)),
130130
ctx: ctx.cloned(),
131131
},
132+
ArrowError::OffsetOverflowError(offset) => ArrowErrorProto {
133+
inner: Some(ArrowErrorInnerProto::ParseError(format!(
134+
"Offset overflow error: {offset}"
135+
))),
136+
ctx: ctx.cloned(),
137+
},
132138
}
133139
}
134140

src/flight_service/do_get.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use arrow_flight::error::FlightError;
1010
use arrow_flight::flight_service_server::FlightService;
1111
use datafusion::common::exec_datafusion_err;
1212
use datafusion::execution::SendableRecordBatchStream;
13+
use datafusion::prelude::SessionContext;
1314
use futures::TryStreamExt;
1415
use prost::Message;
1516
use std::sync::Arc;
@@ -67,6 +68,8 @@ impl ArrowFlightEndpoint {
6768
.await
6869
.map_err(|err| datafusion_error_to_tonic_status(&err))?;
6970

71+
let ctx = SessionContext::new();
72+
7073
let codec = DistributedCodec::new_combined_with_user(session_state.config());
7174

7275
// There's only 1 `StageExec` responsible for all requests that share the same `stage_key`,
@@ -79,8 +82,8 @@ impl ArrowFlightEndpoint {
7982
let stage_data = once
8083
.get_or_try_init(|| async {
8184
let stage_proto = doget.stage_proto.ok_or_else(missing("stage_proto"))?;
82-
let stage = stage_from_proto(stage_proto, &session_state, &self.runtime, &codec)
83-
.map_err(|err| {
85+
let stage =
86+
stage_from_proto(stage_proto, &ctx, &self.runtime, &codec).map_err(|err| {
8487
Status::invalid_argument(format!("Cannot decode stage proto: {err}"))
8588
})?;
8689

src/protobuf/distributed_codec.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use datafusion::execution::FunctionRegistry;
88
use datafusion::physical_expr::EquivalenceProperties;
99
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
1010
use datafusion::physical_plan::{ExecutionPlan, Partitioning, PlanProperties};
11-
use datafusion::prelude::SessionConfig;
11+
use datafusion::prelude::{SessionConfig, SessionContext};
1212
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
1313
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
1414
use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
@@ -38,7 +38,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
3838
&self,
3939
buf: &[u8],
4040
inputs: &[Arc<dyn ExecutionPlan>],
41-
registry: &dyn FunctionRegistry,
41+
_registry: &dyn FunctionRegistry,
4242
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
4343
let DistributedExecProto {
4444
node: Some(distributed_exec_node),
@@ -49,6 +49,8 @@ impl PhysicalExtensionCodec for DistributedCodec {
4949
));
5050
};
5151

52+
let ctx = SessionContext::new();
53+
5254
match distributed_exec_node {
5355
DistributedExecNode::NetworkHashShuffle(NetworkShuffleExecProto {
5456
schema,
@@ -62,7 +64,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
6264

6365
let partitioning = parse_protobuf_partitioning(
6466
partitioning.as_ref(),
65-
registry,
67+
&ctx,
6668
&schema,
6769
&DistributedCodec {},
6870
)?
@@ -87,7 +89,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
8789

8890
let partitioning = parse_protobuf_partitioning(
8991
partitioning.as_ref(),
90-
registry,
92+
&ctx,
9193
&schema,
9294
&DistributedCodec {},
9395
)?

src/protobuf/stage_proto.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use crate::execution_plans::{ExecutionTask, StageExec};
22
use datafusion::{
33
common::internal_datafusion_err,
44
error::{DataFusionError, Result},
5-
execution::{FunctionRegistry, runtime_env::RuntimeEnv},
5+
execution::runtime_env::RuntimeEnv,
66
physical_plan::ExecutionPlan,
7+
prelude::SessionContext,
78
};
89
use datafusion_proto::{
910
physical_plan::{AsExecutionPlan, PhysicalExtensionCodec},
@@ -96,22 +97,21 @@ pub fn proto_from_stage(
9697

9798
pub fn stage_from_proto(
9899
msg: StageExecProto,
99-
registry: &dyn FunctionRegistry,
100+
ctx: &SessionContext,
100101
runtime: &RuntimeEnv,
101102
codec: &dyn PhysicalExtensionCodec,
102103
) -> Result<StageExec> {
103104
let plan_node = msg.plan.ok_or(internal_datafusion_err!(
104105
"ExecutionStageMsg is missing the plan"
105106
))?;
106107

107-
let plan = plan_node.try_into_physical_plan(registry, runtime, codec)?;
108+
let plan = plan_node.try_into_physical_plan(ctx, runtime, codec)?;
108109

109110
let inputs = msg
110111
.inputs
111112
.into_iter()
112113
.map(|s| {
113-
stage_from_proto(s, registry, runtime, codec)
114-
.map(|s| Arc::new(s) as Arc<dyn ExecutionPlan>)
114+
stage_from_proto(s, ctx, runtime, codec).map(|s| Arc::new(s) as Arc<dyn ExecutionPlan>)
115115
})
116116
.collect::<Result<Vec<_>>>()?;
117117

0 commit comments

Comments (0)