Skip to content

Commit a4fa271

Browse files
committed
Run cargo fmt
1 parent a0dc5e5 commit a4fa271

File tree

2 files changed

+9
-10
lines changed

2 files changed

+9
-10
lines changed

src/plan/arrow_flight_read.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use super::combined::CombinedRecordBatchStream;
2+
use crate::channel_manager_ext::get_channel_resolver;
23
use crate::common::ComposedPhysicalExtensionCodec;
34
use crate::config_extension_ext::ContextGrpcMetadata;
45
use crate::errors::tonic_status_to_datafusion_error;
56
use crate::flight_service::{DoGet, StageKey};
67
use crate::plan::DistributedCodec;
78
use crate::stage::{proto_from_stage, ExecutionStage};
89
use crate::user_codec_ext::get_user_codec;
10+
use crate::ChannelResolver;
911
use arrow_flight::decode::FlightRecordBatchStream;
1012
use arrow_flight::error::FlightError;
1113
use arrow_flight::flight_service_client::FlightServiceClient;
@@ -27,8 +29,6 @@ use std::sync::Arc;
2729
use tonic::metadata::MetadataMap;
2830
use tonic::Request;
2931
use url::Url;
30-
use crate::channel_manager_ext::get_channel_resolver;
31-
use crate::ChannelResolver;
3232

3333
/// This node has two variants.
3434
/// 1. Pending: it acts as a placeholder for the distributed optimization step to mark it as ready.
@@ -160,7 +160,9 @@ impl ExecutionPlan for ArrowFlightReadExec {
160160

161161
// get the channel manager and current stage from our context
162162
let Some(channel_resolver) = get_channel_resolver(context.session_config()) else {
163-
return exec_err!("ArrowFlightReadExec requires a ChannelResolver in the session config");
163+
return exec_err!(
164+
"ArrowFlightReadExec requires a ChannelResolver in the session config"
165+
);
164166
};
165167

166168
let stage = context

src/stage/execution_stage.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use datafusion::execution::TaskContext;
66
use datafusion::physical_plan::ExecutionPlan;
77
use datafusion::prelude::SessionContext;
88

9+
use crate::channel_manager_ext::get_channel_resolver;
10+
use crate::task::ExecutionTask;
11+
use crate::ChannelResolver;
912
use itertools::Itertools;
1013
use rand::Rng;
1114
use url::Url;
1215
use uuid::Uuid;
13-
use crate::channel_manager_ext::get_channel_resolver;
14-
use crate::ChannelResolver;
15-
use crate::task::ExecutionTask;
1616

1717
/// A unit of isolation for a portion of a physical execution plan
1818
/// that can be executed independently and across a network boundary.
@@ -171,10 +171,7 @@ impl ExecutionStage {
171171
format!("Stage {:<3}{}", self.num, child_str)
172172
}
173173

174-
pub fn try_assign(
175-
self,
176-
channel_resolver: &impl ChannelResolver
177-
) -> Result<Self> {
174+
pub fn try_assign(self, channel_resolver: &impl ChannelResolver) -> Result<Self> {
178175
let urls: Vec<Url> = channel_resolver.get_urls()?;
179176
if urls.is_empty() {
180177
return internal_err!("No URLs found in ChannelManager");

0 commit comments

Comments
 (0)