Skip to content

Commit 14a4d3a

Browse files
committed
cleanup
1 parent e722513 commit 14a4d3a

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

src/flight_service/do_get.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use arrow_flight::encode::FlightDataEncoderBuilder;
99
use arrow_flight::error::FlightError;
1010
use arrow_flight::flight_service_server::FlightService;
1111
use datafusion::execution::{SendableRecordBatchStream, SessionState};
12+
use datafusion::prelude::SessionContext;
1213
use futures::TryStreamExt;
1314
use http::HeaderMap;
1415
use prost::Message;
@@ -115,10 +116,11 @@ impl ArrowFlightEndpoint {
115116
})
116117
.await
117118
.map_err(|err| datafusion_error_to_tonic_status(&err))?;
119+
let ctx = SessionContext::new_with_state(session_state.clone());
118120

119121
let codec = DistributedCodec::new_combined_with_user(session_state.config());
120122

121-
let stage = stage_from_proto(stage_proto, &session_state, &self.runtime, &codec)
123+
let stage = stage_from_proto(stage_proto, &ctx, &self.runtime, &codec)
122124
.map_err(|err| {
123125
Status::invalid_argument(format!("Cannot decode stage proto: {err}"))
124126
})?;

src/protobuf/stage_proto.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::execution_plans::{ExecutionTask, StageExec};
22
use datafusion::{
33
common::internal_datafusion_err,
44
error::{DataFusionError, Result},
5-
execution::{SessionState, context::SessionContext, runtime_env::RuntimeEnv},
5+
execution::{runtime_env::RuntimeEnv, context::SessionContext},
66
physical_plan::ExecutionPlan,
77
};
88
use datafusion_proto::{
@@ -100,22 +100,21 @@ pub fn proto_from_stage(
100100

101101
pub fn stage_from_proto(
102102
msg: StageExecProto,
103-
session_state: &SessionState,
103+
ctx: &SessionContext,
104104
runtime: &RuntimeEnv,
105105
codec: &dyn PhysicalExtensionCodec,
106106
) -> Result<StageExec> {
107107
let plan_node = msg.plan.ok_or(internal_datafusion_err!(
108108
"ExecutionStageMsg is missing the plan"
109109
))?;
110110

111-
let session_ctx = SessionContext::new_with_state(session_state.clone());
112-
let plan = plan_node.try_into_physical_plan(&session_ctx, runtime, codec)?;
111+
let plan = plan_node.try_into_physical_plan(&ctx, runtime, codec)?;
113112

114113
let inputs = msg
115114
.inputs
116115
.into_iter()
117116
.map(|s| {
118-
stage_from_proto(s, session_state, runtime, codec)
117+
stage_from_proto(s, ctx, runtime, codec)
119118
.map(|s| Arc::new(s) as Arc<dyn ExecutionPlan>)
120119
})
121120
.collect::<Result<Vec<_>>>()?;
@@ -153,9 +152,9 @@ pub fn stage_from_proto(
153152
mod tests {
154153
use std::sync::Arc;
155154

156-
use crate::StageExec;
157155
use crate::protobuf::stage_proto::StageExecProto;
158156
use crate::protobuf::{proto_from_stage, stage_from_proto};
157+
use crate::StageExec;
159158
use datafusion::{
160159
arrow::{
161160
array::{RecordBatch, StringArray, UInt8Array},
@@ -230,7 +229,7 @@ mod tests {
230229
// Convert back to ExecutionStage
231230
let round_trip_stage = stage_from_proto(
232231
decoded_msg,
233-
&ctx.state(),
232+
&ctx,
234233
ctx.runtime_env().as_ref(),
235234
&DefaultPhysicalExtensionCodec {},
236235
)?;

0 commit comments

Comments
 (0)