Commit e722513

big fmt
1 parent 8351925 commit e722513

22 files changed (+65 -55 lines changed)

src/common/ttl_map.rs
Lines changed: 2 additions & 2 deletions

@@ -27,10 +27,10 @@ use dashmap::{DashMap, Entry};
 use datafusion::error::DataFusionError;
 use std::collections::HashSet;
 use std::hash::Hash;
+use std::sync::Arc;
 use std::sync::atomic::AtomicU64;
 #[cfg(test)]
 use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
-use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

@@ -289,7 +289,7 @@ where
 mod tests {
     use super::*;
     use std::sync::atomic::Ordering;
-    use tokio::time::{sleep, Duration};
+    use tokio::time::{Duration, sleep};

     #[tokio::test]
     async fn test_basic_insert_and_get() {
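Every hunk in this commit follows the same pattern seen here: within a brace group and across adjacent `use` lines, capitalized names now sort before lowercase module paths (`Duration` before `sleep`, `Arc` before `atomic`, `Ticket` before `decode` in later files). That ordering is consistent with a plain `cargo fmt` run under rustfmt's 2024 style edition, which sorts uppercase before lowercase; the attribution is an assumption, since the commit message only says "big fmt". A minimal standalone sketch of the resulting ordering (not code from the repository):

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

fn main() {
    // The uppercase `Arc` import sorts before the lowercase `atomic` path,
    // mirroring the reordering applied in ttl_map.rs above.
    let counter = Arc::new(AtomicU64::new(0));
    counter.fetch_add(1, Ordering::Relaxed);
    assert_eq!(counter.load(Ordering::Relaxed), 1);
    let _timeout = Duration::from_secs(1);
}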

src/config_extension_ext.rs
Lines changed: 3 additions & 3 deletions

@@ -1,4 +1,4 @@
-use datafusion::common::{internal_datafusion_err, DataFusionError};
+use datafusion::common::{DataFusionError, internal_datafusion_err};
 use datafusion::config::ConfigExtension;
 use datafusion::prelude::SessionConfig;
 use http::{HeaderMap, HeaderName};

@@ -84,8 +84,8 @@ impl ContextGrpcMetadata {
 #[cfg(test)]
 mod tests {
     use crate::config_extension_ext::{
-        set_distributed_option_extension, set_distributed_option_extension_from_headers,
-        ContextGrpcMetadata,
+        ContextGrpcMetadata, set_distributed_option_extension,
+        set_distributed_option_extension_from_headers,
     };
     use datafusion::common::extensions_options;
     use datafusion::config::ConfigExtension;

src/distributed_ext.rs
Lines changed: 1 addition & 1 deletion

@@ -1,9 +1,9 @@
+use crate::ChannelResolver;
 use crate::channel_resolver_ext::set_distributed_channel_resolver;
 use crate::config_extension_ext::{
     set_distributed_option_extension, set_distributed_option_extension_from_headers,
 };
 use crate::protobuf::set_distributed_user_codec;
-use crate::ChannelResolver;
 use datafusion::common::DataFusionError;
 use datafusion::config::ConfigExtension;
 use datafusion::execution::{SessionState, SessionStateBuilder};

src/distributed_physical_optimizer_rule.rs
Lines changed: 9 additions & 8 deletions

@@ -12,7 +12,7 @@ use datafusion::{
     config::ConfigOptions,
     error::Result,
     physical_optimizer::PhysicalOptimizerRule,
-    physical_plan::{repartition::RepartitionExec, ExecutionPlan},
+    physical_plan::{ExecutionPlan, repartition::RepartitionExec},
 };
 use uuid::Uuid;

@@ -83,13 +83,14 @@ impl DistributedPhysicalOptimizerRule {
             internal_datafusion_err!("Expected RepartitionExec to have a child"),
         )?);

-        let maybe_isolated_plan =
-            if let Some(ppt) = self.partitions_per_task && can_be_divided(&plan)? {
-                let isolated = Arc::new(PartitionIsolatorExec::new(child, ppt));
-                plan.with_new_children(vec![isolated])?
-            } else {
-                plan
-            };
+        let maybe_isolated_plan = if let Some(ppt) = self.partitions_per_task
+            && can_be_divided(&plan)?
+        {
+            let isolated = Arc::new(PartitionIsolatorExec::new(child, ppt));
+            plan.with_new_children(vec![isolated])?
+        } else {
+            plan
+        };

         return Ok(Transformed::yes(Arc::new(
             ArrowFlightReadExec::new_pending(Arc::clone(&maybe_isolated_plan)),
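The second hunk is more than indentation churn: rustfmt reflows the let-chain condition so the `Some(ppt)` binding and the `can_be_divided(&plan)?` check each sit on their own line, with the block's opening brace dropped to its own line. A compilable sketch of the same shape, using invented names (`plan_label`, `divisible`) rather than anything from the crate; it needs a toolchain where let chains are stable (Rust 2024 edition):

// Mirrors the `if let ... && ...` layout produced by rustfmt above.
fn plan_label(partitions_per_task: Option<usize>, divisible: bool) -> String {
    let label = if let Some(ppt) = partitions_per_task
        && divisible
    {
        format!("isolate plan into {ppt} partitions per task")
    } else {
        "leave plan unchanged".to_string()
    };
    label
}

fn main() {
    assert_eq!(
        plan_label(Some(4), true),
        "isolate plan into 4 partitions per task"
    );
    assert_eq!(plan_label(None, true), "leave plan unchanged");
}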

src/errors/arrow_error.rs
Lines changed: 4 additions & 1 deletion

@@ -130,7 +130,10 @@ impl ArrowErrorProto {
                 ctx: ctx.cloned(),
             },
             ArrowError::OffsetOverflowError(size) => ArrowErrorProto {
-                inner: Some(ArrowErrorInnerProto::ExternalError(format!("Offset overflow error: {}", size))),
+                inner: Some(ArrowErrorInnerProto::ExternalError(format!(
+                    "Offset overflow error: {}",
+                    size
+                ))),
                 ctx: ctx.cloned(),
             },
         }

src/execution_plans/arrow_flight_read.rs
Lines changed: 7 additions & 5 deletions

@@ -1,15 +1,15 @@
+use crate::ChannelResolver;
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
 use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use crate::execution_plans::StageExec;
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
-use crate::ChannelResolver;
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_client::FlightServiceClient;
-use arrow_flight::Ticket;
 use dashmap::DashMap;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};

@@ -27,8 +27,8 @@ use prost::Message;
 use std::any::Any;
 use std::fmt::Formatter;
 use std::sync::Arc;
-use tonic::metadata::MetadataMap;
 use tonic::Request;
+use tonic::metadata::MetadataMap;

 /// This node has two variants.
 /// 1. Pending: it acts as a placeholder for the distributed optimization step to mark it as ready.

@@ -158,7 +158,9 @@ impl ExecutionPlan for ArrowFlightReadExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream, DataFusionError> {
         let ArrowFlightReadExec::Ready(self_ready) = self else {
-            return exec_err!("ArrowFlightReadExec is not ready, was the distributed optimization step performed?");
+            return exec_err!(
+                "ArrowFlightReadExec is not ready, was the distributed optimization step performed?"
+            );
         };

         // get the channel manager and current stage from our context

src/execution_plans/metrics.rs
Lines changed: 9 additions & 5 deletions

@@ -4,7 +4,7 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use crate::execution_plans::{ArrowFlightReadExec, StageExec};
-use crate::metrics::proto::{metrics_set_proto_to_df, MetricsSetProto};
+use crate::metrics::proto::{MetricsSetProto, metrics_set_proto_to_df};
 use crate::protobuf::StageKey;
 use datafusion::common::internal_err;
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};

@@ -143,7 +143,11 @@ impl TaskMetricsRewriter {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let transformed = plan.rewrite(&mut self)?;
         if self.idx != self.metrics.len() {
-            return internal_err!("too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan", self.metrics.len(), self.idx);
+            return internal_err!(
+                "too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan",
+                self.metrics.len(),
+                self.idx
+            );
         }
         Ok(transformed.data)
     }

@@ -269,11 +273,11 @@ mod tests {
     use datafusion::arrow::array::{Int32Array, StringArray};
     use datafusion::arrow::record_batch::RecordBatch;

-    use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
-    use crate::test_utils::session_context::register_temp_parquet_table;
     use crate::DistributedExt;
     use crate::DistributedPhysicalOptimizerRule;
-    use datafusion::execution::{context::SessionContext, SessionStateBuilder};
+    use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
+    use crate::test_utils::session_context::register_temp_parquet_table;
+    use datafusion::execution::{SessionStateBuilder, context::SessionContext};
     use datafusion::physical_plan::metrics::MetricValue;
     use datafusion::prelude::SessionConfig;
     use datafusion::{
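The middle hunk is a pure line-width reflow of a multi-argument `internal_err!` call: once the invocation exceeds rustfmt's default width, each argument moves to its own line. A self-contained sketch of the same call shape, with an invented helper (`check_counts`) and invented counts standing in for the rewriter's real state:

use datafusion::common::internal_err;
use datafusion::error::Result;

// Returns an internal DataFusion error when the two counts disagree,
// using the same `return internal_err!(...)` pattern as the hunk above.
fn check_counts(provided: usize, consumed: usize) -> Result<()> {
    if provided != consumed {
        return internal_err!(
            "too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan",
            provided,
            consumed
        );
    }
    Ok(())
}

fn main() {
    assert!(check_counts(3, 3).is_ok());
    assert!(check_counts(3, 2).is_err());
}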

src/execution_plans/mod.rs
Lines changed: 1 addition & 1 deletion

@@ -5,4 +5,4 @@ mod stage;

 pub use arrow_flight_read::ArrowFlightReadExec;
 pub use partition_isolator::{PartitionGroup, PartitionIsolatorExec};
-pub use stage::{display_plan_graphviz, ExecutionTask, StageExec};
+pub use stage::{ExecutionTask, StageExec, display_plan_graphviz};

src/execution_plans/stage.rs
Lines changed: 1 addition & 1 deletion

@@ -7,7 +7,7 @@ use datafusion::common::{
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::TaskContext;
 use datafusion::physical_plan::{
-    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, displayable,
 };
 use datafusion::prelude::SessionContext;
 use itertools::Itertools;

src/flight_service/do_get.rs
Lines changed: 6 additions & 6 deletions

@@ -3,18 +3,18 @@ use crate::errors::datafusion_error_to_tonic_status;
 use crate::execution_plans::{PartitionGroup, StageExec};
 use crate::flight_service::service::ArrowFlightEndpoint;
 use crate::flight_service::session_builder::DistributedSessionBuilderContext;
-use crate::protobuf::{stage_from_proto, DistributedCodec, StageExecProto, StageKey};
+use crate::protobuf::{DistributedCodec, StageExecProto, StageKey, stage_from_proto};
+use arrow_flight::Ticket;
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_server::FlightService;
-use arrow_flight::Ticket;
 use datafusion::execution::{SendableRecordBatchStream, SessionState};
 use futures::TryStreamExt;
 use http::HeaderMap;
 use prost::Message;
 use std::fmt::Display;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use tokio::sync::OnceCell;
 use tonic::{Request, Response, Status};

@@ -172,17 +172,17 @@ fn record_batch_stream_to_response(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::ExecutionTask;
     use crate::flight_service::session_builder::DefaultSessionBuilder;
     use crate::protobuf::proto_from_stage;
-    use crate::ExecutionTask;
     use arrow::datatypes::{Schema, SchemaRef};
     use arrow_flight::Ticket;
     use datafusion::physical_expr::Partitioning;
+    use datafusion::physical_plan::ExecutionPlan;
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::repartition::RepartitionExec;
-    use datafusion::physical_plan::ExecutionPlan;
     use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
-    use prost::{bytes::Bytes, Message};
+    use prost::{Message, bytes::Bytes};
     use tonic::Request;
     use uuid::Uuid;
