Skip to content

Commit 4195eab

Browse files
committed
cargo fix and fmt
1 parent 6b1c747 commit 4195eab

File tree

12 files changed

+22
-70
lines changed

12 files changed

+22
-70
lines changed

src/flight_service/do_get.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,19 @@
1-
use crate::composed_extension_codec::ComposedPhysicalExtensionCodec;
21
use crate::errors::datafusion_error_to_tonic_status;
32
use crate::flight_service::service::ArrowFlightEndpoint;
43
use crate::plan::DistributedCodec;
5-
use crate::stage::{stage_from_proto, ExecutionStage, ExecutionStageProto};
4+
use crate::stage::{stage_from_proto, ExecutionStageProto};
65
use arrow_flight::encode::FlightDataEncoderBuilder;
76
use arrow_flight::error::FlightError;
87
use arrow_flight::flight_service_server::FlightService;
98
use arrow_flight::Ticket;
10-
use datafusion::error::DataFusionError;
119
use datafusion::execution::SessionStateBuilder;
1210
use datafusion::optimizer::OptimizerConfig;
13-
use datafusion::physical_expr::{Partitioning, PhysicalExpr};
1411
use datafusion::physical_plan::ExecutionPlan;
15-
use datafusion_proto::physical_plan::from_proto::parse_physical_exprs;
16-
use datafusion_proto::physical_plan::to_proto::serialize_physical_exprs;
17-
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
18-
use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
12+
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
1913
use futures::TryStreamExt;
2014
use prost::Message;
2115
use std::sync::Arc;
2216
use tonic::{Request, Response, Status};
23-
use uuid::Uuid;
2417

2518
#[derive(Clone, PartialEq, ::prost::Message)]
2619
pub struct DoGet {

src/physical_optimizer.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
use std::sync::Arc;
22

33
use datafusion::{
4-
catalog::memory::DataSourceExec,
54
common::{
6-
internal_datafusion_err, internal_err,
7-
tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor},
8-
DataFusionError,
5+
internal_datafusion_err,
6+
tree_node::{Transformed, TreeNode, TreeNodeRewriter},
97
},
108
config::ConfigOptions,
11-
datasource::physical_plan::{FileScanConfig, FileSource},
9+
datasource::physical_plan::FileSource,
1210
error::Result,
1311
physical_optimizer::PhysicalOptimizerRule,
1412
physical_plan::{
15-
displayable, execution_plan::need_data_exchange, repartition::RepartitionExec,
16-
ExecutionPlan, ExecutionPlanProperties,
13+
displayable, repartition::RepartitionExec, ExecutionPlan, ExecutionPlanProperties,
1714
},
1815
};
1916
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

src/plan/arrow_flight_read.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,21 @@
11
use crate::channel_manager::ChannelManager;
2-
use crate::composed_extension_codec::ComposedPhysicalExtensionCodec;
3-
use crate::errors::tonic_status_to_datafusion_error;
42
use crate::flight_service::DoGet;
5-
use crate::plan::codec::DistributedCodec;
63
use crate::stage::{ExecutionStage, ExecutionStageProto};
7-
use arrow_flight::decode::FlightRecordBatchStream;
8-
use arrow_flight::error::FlightError;
9-
use arrow_flight::{FlightClient, FlightData, Ticket};
4+
use arrow_flight::{FlightClient, Ticket};
105
use datafusion::arrow::datatypes::SchemaRef;
11-
use datafusion::common::runtime::JoinSet;
12-
use datafusion::common::{internal_datafusion_err, internal_err, plan_err};
13-
use datafusion::error::DataFusionError;
6+
use datafusion::common::{internal_datafusion_err, plan_err};
147
use datafusion::error::Result;
158
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
169
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
1710
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
1811
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1912
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
20-
use futures::channel::oneshot;
2113
use futures::{future, TryFutureExt, TryStreamExt};
2214
use prost::Message;
2315
use std::any::Any;
2416
use std::fmt::Formatter;
25-
use std::future::Future;
26-
use std::io::Bytes;
27-
use std::sync::{self, Arc};
17+
use std::sync::Arc;
2818
use tonic::transport::Channel;
29-
use tonic::{IntoRequest, Streaming};
3019
use url::Url;
3120

3221
use super::combined::CombinedRecordBatchStream;

src/plan/codec.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
use crate::plan::arrow_flight_read::ArrowFlightReadExec;
2-
use datafusion::arrow::datatypes::{Schema, SchemaRef};
3-
use datafusion::error::DataFusionError;
2+
use datafusion::arrow::datatypes::Schema;
43
use datafusion::execution::FunctionRegistry;
54
use datafusion::physical_plan::ExecutionPlan;
65
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
76
use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
8-
use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
7+
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
98
use datafusion_proto::protobuf;
109
use datafusion_proto::protobuf::proto_error;
1110
use prost::Message;
1211
use std::sync::Arc;
13-
use url::Url;
14-
use uuid::Uuid;
1512

1613
use super::PartitionIsolatorExec;
1714

src/plan/combined.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ use std::{
44
task::{Context, Poll},
55
};
66

7+
use datafusion::error::Result;
78
use datafusion::{
89
arrow::{array::RecordBatch, datatypes::SchemaRef},
910
common::internal_err,
1011
error::DataFusionError,
1112
execution::{RecordBatchStream, SendableRecordBatchStream},
1213
};
13-
use datafusion::{common::internal_datafusion_err, error::Result};
1414
use futures::Stream;
1515

1616
pub(crate) struct CombinedRecordBatchStream {

src/plan/isolator.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{fmt::Formatter, sync::Arc};
22

33
use datafusion::{
4-
common::internal_datafusion_err,
54
error::Result,
65
execution::SendableRecordBatchStream,
76
physical_plan::{

src/plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ mod isolator;
55

66
pub use arrow_flight_read::ArrowFlightReadExec;
77
pub use codec::DistributedCodec;
8-
pub use isolator::{PartitionGroup, PartitionIsolatorExec};
8+
pub use isolator::PartitionIsolatorExec;

src/stage/display.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,11 @@
1010
///
1111
/// In the meantime, we can make a dummy ExecutionPlan that will let us render
1212
/// the Stage tree.
13-
use std::{
14-
fmt::{Display, Formatter, Write},
15-
sync::Arc,
16-
};
13+
use std::fmt::Write;
1714

1815
use datafusion::{
19-
common::{
20-
internal_datafusion_err,
21-
tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor},
22-
},
2316
error::Result,
24-
physical_expr::EquivalenceProperties,
25-
physical_plan::{
26-
displayable,
27-
execution_plan::{Boundedness, EmissionType},
28-
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
29-
},
17+
physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan},
3018
};
3119

3220
use crate::{

src/stage/proto.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use std::sync::Arc;
22

33
use datafusion::{
4-
common::{internal_datafusion_err, internal_err},
4+
common::internal_datafusion_err,
55
error::{DataFusionError, Result},
66
execution::{runtime_env::RuntimeEnv, FunctionRegistry},
77
physical_plan::ExecutionPlan,
88
};
99
use datafusion_proto::{
10-
physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec},
10+
physical_plan::{AsExecutionPlan, PhysicalExtensionCodec},
1111
protobuf::PhysicalPlanNode,
1212
};
1313
use prost::Message;

src/stage/stage.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
1-
use std::any::Any;
2-
use std::cell::RefCell;
31
use std::sync::atomic::{AtomicU64, Ordering};
42
use std::sync::Arc;
53

64
use datafusion::common::internal_err;
75
use datafusion::error::{DataFusionError, Result};
86
use datafusion::execution::TaskContext;
9-
use datafusion::physical_plan::{displayable, DisplayAs, DisplayFormatType, ExecutionPlan};
7+
use datafusion::physical_plan::{displayable, ExecutionPlan};
108
use datafusion::prelude::SessionContext;
119
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
1210

1311
use itertools::Itertools;
1412
use rand::Rng;
15-
use tokio::sync::RwLock;
1613
use url::Url;
1714

1815
use crate::task::ExecutionTask;
@@ -150,7 +147,7 @@ impl ExecutionStage {
150147
}
151148

152149
pub fn try_assign(
153-
mut self,
150+
self,
154151
channel_manager: impl TryInto<ChannelManager, Error = DataFusionError>,
155152
) -> Result<Self> {
156153
let urls: Vec<Url> = channel_manager.try_into()?.get_urls()?;

0 commit comments

Comments
 (0)