diff --git a/Cargo.toml b/Cargo.toml
index a5db2cf..97ca8fd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ datafusion-proto = { version = "49.0.0" }
 [package]
 name = "datafusion-distributed"
 version = "0.1.0"
-edition = "2021"
+edition = "2024"
 
 [dependencies]
 chrono = { version = "0.4.42" }
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 8cab583..1c1fa63 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "datafusion-distributed-benchmarks"
 version = "0.1.0"
-edition = "2021"
+edition = "2024"
 default-run = "dfbench"
 
 [dependencies]
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
index 4d036e4..2371cdd 100644
--- a/benchmarks/src/tpch/convert.rs
+++ b/benchmarks/src/tpch/convert.rs
@@ -22,8 +22,8 @@ use std::path::{Path, PathBuf};
 
 use datafusion::common::not_impl_err;
 
-use super::get_tbl_tpch_table_schema;
 use super::TPCH_TABLES;
+use super::get_tbl_tpch_table_schema;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use parquet::basic::Compression;
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 7df2b19..618f299 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use super::{
-    get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_QUERY_END_ID,
-    TPCH_QUERY_START_ID, TPCH_TABLES,
+    TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql, get_tbl_tpch_table_schema,
+    get_tpch_table_schema,
 };
 use crate::util::{
     BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryIter,
@@ -29,21 +29,21 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
 use datafusion::common::instant::Instant;
 use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::common::{exec_err, DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
+use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION, exec_err};
+use datafusion::datasource::TableProvider;
+use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
-use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::{SessionState, SessionStateBuilder};
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
 use datafusion_distributed::test_utils::localhost::{
-    spawn_flight_service, LocalHostChannelResolver,
+    LocalHostChannelResolver, spawn_flight_service,
 };
 use datafusion_distributed::{
     DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
@@ -354,11 +354,15 @@ impl RunOpt {
         let data_path = crate_path.join("data");
         let entries = fs::read_dir(&data_path)?.collect::<Result<Vec<_>, _>>()?;
         if entries.is_empty() {
-            exec_err!("No TPCH dataset present in '{data_path:?}'. Generate one with ./benchmarks/gen-tpch.sh")
+            exec_err!(
+                "No TPCH dataset present in '{data_path:?}'. Generate one with ./benchmarks/gen-tpch.sh"
+            )
         } else if entries.len() == 1 {
             Ok(entries[0].path())
         } else {
-            exec_err!("Multiple TPCH datasets present in '{data_path:?}'. One must be selected with --path")
+            exec_err!(
+                "Multiple TPCH datasets present in '{data_path:?}'. One must be selected with --path"
+            )
         }
     }
 
diff --git a/benchmarks/src/util/memory.rs b/benchmarks/src/util/memory.rs
index 8d7edf8..61f7e45 100644
--- a/benchmarks/src/util/memory.rs
+++ b/benchmarks/src/util/memory.rs
@@ -8,7 +8,7 @@ use datafusion::execution::{FunctionRegistry, SendableRecordBatchStream, TaskCon
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, displayable,
 };
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use futures::{FutureExt, StreamExt};
diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index f08ee7a..0d83868 100644
--- a/benchmarks/src/util/options.rs
+++ b/benchmarks/src/util/options.rs
@@ -105,7 +105,7 @@ impl CommonOpt {
                 return Err(DataFusionError::Configuration(format!(
                     "Invalid memory pool type: {}",
                     self.mem_pool_type
-                )))
+                )));
             }
         };
         rt_builder = rt_builder
diff --git a/benchmarks/src/util/run.rs b/benchmarks/src/util/run.rs
index 0ec6a6e..48569e8 100644
--- a/benchmarks/src/util/run.rs
+++ b/benchmarks/src/util/run.rs
@@ -17,7 +17,7 @@
 
 use chrono::{DateTime, Utc};
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::{error::Result, DATAFUSION_VERSION};
+use datafusion::{DATAFUSION_VERSION, error::Result};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use std::{
     path::Path,
diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs
index 4e5cf9a..56e4fbc 100644
--- a/src/common/ttl_map.rs
+++ b/src/common/ttl_map.rs
@@ -27,10 +27,10 @@ use dashmap::{DashMap, Entry};
 use datafusion::error::DataFusionError;
 use std::collections::HashSet;
 use std::hash::Hash;
+use std::sync::Arc;
 use std::sync::atomic::AtomicU64;
 #[cfg(test)]
 use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
-use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 
@@ -289,7 +289,7 @@ where
 mod tests {
     use super::*;
     use std::sync::atomic::Ordering;
-    use tokio::time::{sleep, Duration};
+    use tokio::time::{Duration, sleep};
 
     #[tokio::test]
     async fn test_basic_insert_and_get() {
diff --git a/src/config_extension_ext.rs b/src/config_extension_ext.rs
index b3c7faa..a287bf2 100644
--- a/src/config_extension_ext.rs
+++ b/src/config_extension_ext.rs
@@ -1,4 +1,4 @@
-use datafusion::common::{internal_datafusion_err, DataFusionError};
+use datafusion::common::{DataFusionError, internal_datafusion_err};
 use datafusion::config::ConfigExtension;
 use datafusion::execution::TaskContext;
 use datafusion::prelude::SessionConfig;
@@ -94,8 +94,8 @@ impl ContextGrpcMetadata {
 #[cfg(test)]
 mod tests {
     use crate::config_extension_ext::{
-        set_distributed_option_extension, set_distributed_option_extension_from_headers,
-        ContextGrpcMetadata,
+        ContextGrpcMetadata, set_distributed_option_extension,
+        set_distributed_option_extension_from_headers,
     };
     use datafusion::common::extensions_options;
     use datafusion::config::ConfigExtension;
diff --git a/src/distributed_ext.rs b/src/distributed_ext.rs
index b83f852..868053d 100644
--- a/src/distributed_ext.rs
+++ b/src/distributed_ext.rs
@@ -1,9 +1,9 @@
+use crate::ChannelResolver;
 use crate::channel_resolver_ext::set_distributed_channel_resolver;
 use crate::config_extension_ext::{
     set_distributed_option_extension, set_distributed_option_extension_from_headers,
 };
 use crate::protobuf::set_distributed_user_codec;
-use crate::ChannelResolver;
 use datafusion::common::DataFusionError;
 use datafusion::config::ConfigExtension;
 use datafusion::execution::{SessionState, SessionStateBuilder};
diff --git a/src/distributed_physical_optimizer_rule.rs b/src/distributed_physical_optimizer_rule.rs
index 6c2d204..17935bf 100644
--- a/src/distributed_physical_optimizer_rule.rs
+++ b/src/distributed_physical_optimizer_rule.rs
@@ -13,7 +13,7 @@ use datafusion::{
     config::ConfigOptions,
     error::Result,
     physical_optimizer::PhysicalOptimizerRule,
-    physical_plan::{repartition::RepartitionExec, ExecutionPlan},
+    physical_plan::{ExecutionPlan, repartition::RepartitionExec},
 };
 use std::error::Error;
 use std::fmt::{Display, Formatter};
diff --git a/src/errors/schema_error.rs b/src/errors/schema_error.rs
index e55709a..4aa3c69 100644
--- a/src/errors/schema_error.rs
+++ b/src/errors/schema_error.rs
@@ -168,7 +168,7 @@ impl TableReferenceProto {
 impl SchemaErrorProto {
     pub fn from_schema_error(err: &SchemaError, backtrace: Option<&String>) -> Self {
         match err {
-            SchemaError::AmbiguousReference { ref field } => SchemaErrorProto {
+            SchemaError::AmbiguousReference { field } => SchemaErrorProto {
                 inner: Some(SchemaErrorInnerProto::AmbiguousReference(
                     AmbiguousReferenceProto {
                         field: Some(ColumnProto::from_column(field)),
diff --git a/src/execution_plans/metrics.rs b/src/execution_plans/metrics.rs
index 7e5c341..f772881 100644
--- a/src/execution_plans/metrics.rs
+++ b/src/execution_plans/metrics.rs
@@ -1,13 +1,13 @@
 use crate::execution_plans::{NetworkCoalesceExec, NetworkShuffleExec, StageExec};
-use crate::metrics::proto::{metrics_set_proto_to_df, MetricsSetProto};
+use crate::metrics::proto::{MetricsSetProto, metrics_set_proto_to_df};
 use crate::protobuf::StageKey;
 use datafusion::common::internal_err;
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::execution::TaskContext;
-use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
 use std::any::Any;
 use std::collections::HashMap;
@@ -152,7 +152,11 @@ impl TaskMetricsRewriter {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let transformed = plan.rewrite(&mut self)?;
         if self.idx != self.metrics.len() {
-            return internal_err!("too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan", self.metrics.len(), self.idx);
+            return internal_err!(
+                "too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan",
+                self.metrics.len(),
+                self.idx
+            );
         }
         Ok(transformed.data)
     }
@@ -276,11 +280,11 @@ mod tests {
     use datafusion::arrow::array::{Int32Array, StringArray};
     use datafusion::arrow::record_batch::RecordBatch;
 
-    use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
-    use crate::test_utils::session_context::register_temp_parquet_table;
     use crate::DistributedExt;
     use crate::DistributedPhysicalOptimizerRule;
-    use datafusion::execution::{context::SessionContext, SessionStateBuilder};
+    use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
+    use crate::test_utils::session_context::register_temp_parquet_table;
+    use datafusion::execution::{SessionStateBuilder, context::SessionContext};
     use datafusion::physical_plan::metrics::MetricValue;
     use datafusion::prelude::SessionConfig;
     use datafusion::{
diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs
index f2e047b..d9175ba 100644
--- a/src/execution_plans/network_coalesce.rs
+++ b/src/execution_plans/network_coalesce.rs
@@ -1,17 +1,17 @@
+use crate::ChannelResolver;
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::common::scale_partitioning_props;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::distributed_physical_optimizer_rule::{limit_tasks_err, NetworkBoundary};
+use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
 use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
-use crate::ChannelResolver;
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_client::FlightServiceClient;
-use arrow_flight::Ticket;
 use dashmap::DashMap;
 use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
 use datafusion::error::DataFusionError;
@@ -26,8 +26,8 @@ use prost::Message;
 use std::any::Any;
 use std::fmt::Formatter;
 use std::sync::Arc;
-use tonic::metadata::MetadataMap;
 use tonic::Request;
+use tonic::metadata::MetadataMap;
 
 /// [ExecutionPlan] that coalesces partitions from multiple tasks into a single task without
 /// performing any repartition, and maintaining the same partitioning scheme.
@@ -137,7 +137,7 @@ impl NetworkBoundary for NetworkCoalesceExec {
         &self,
         n_tasks: usize,
     ) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
-        let Self::Pending(ref pending) = self else {
+        let Self::Pending(pending) = self else {
             return plan_err!("can only return wrapped child if on Pending state");
         };
 
diff --git a/src/execution_plans/network_shuffle.rs b/src/execution_plans/network_shuffle.rs
index 36659f7..2958a50 100644
--- a/src/execution_plans/network_shuffle.rs
+++ b/src/execution_plans/network_shuffle.rs
@@ -1,3 +1,4 @@
+use crate::ChannelResolver;
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::common::scale_partitioning;
 use crate::config_extension_ext::ContextGrpcMetadata;
@@ -6,12 +7,11 @@ use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_err
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
-use crate::ChannelResolver;
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_client::FlightServiceClient;
-use arrow_flight::Ticket;
 use dashmap::DashMap;
 use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
 use datafusion::error::DataFusionError;
@@ -28,8 +28,8 @@ use prost::Message;
 use std::any::Any;
 use std::fmt::Formatter;
 use std::sync::Arc;
-use tonic::metadata::MetadataMap;
 use tonic::Request;
+use tonic::metadata::MetadataMap;
 
 /// [ExecutionPlan] implementation that shuffles data across the network in a distributed context.
 ///
@@ -178,7 +178,7 @@ impl NetworkBoundary for NetworkShuffleExec {
         &self,
         n_tasks: usize,
     ) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
-        let Self::Pending(ref pending) = self else {
+        let Self::Pending(pending) = self else {
             return plan_err!("cannot only return wrapped child if on Pending state");
         };
 
diff --git a/src/execution_plans/partition_isolator.rs b/src/execution_plans/partition_isolator.rs
index 342107b..987f5f5 100644
--- a/src/execution_plans/partition_isolator.rs
+++ b/src/execution_plans/partition_isolator.rs
@@ -1,6 +1,6 @@
+use crate::StageExec;
 use crate::distributed_physical_optimizer_rule::limit_tasks_err;
 use crate::execution_plans::DistributedTaskContext;
-use crate::StageExec;
 use datafusion::common::{exec_err, plan_err};
 use datafusion::error::DataFusionError;
 use datafusion::execution::TaskContext;
diff --git a/src/execution_plans/stage.rs b/src/execution_plans/stage.rs
index 5e948f8..a57d637 100644
--- a/src/execution_plans/stage.rs
+++ b/src/execution_plans/stage.rs
@@ -5,7 +5,7 @@ use datafusion::common::{internal_datafusion_err, internal_err};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::TaskContext;
 use datafusion::physical_plan::{
-    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, displayable,
 };
 use datafusion::prelude::SessionContext;
 use itertools::Itertools;
diff --git a/src/flight_service/do_get.rs b/src/flight_service/do_get.rs
index 68b2188..bd0345f 100644
--- a/src/flight_service/do_get.rs
+++ b/src/flight_service/do_get.rs
@@ -3,17 +3,17 @@ use crate::errors::datafusion_error_to_tonic_status;
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::service::ArrowFlightEndpoint;
 use crate::flight_service::session_builder::DistributedSessionBuilderContext;
-use crate::protobuf::{stage_from_proto, DistributedCodec, StageExecProto, StageKey};
+use crate::protobuf::{DistributedCodec, StageExecProto, StageKey, stage_from_proto};
+use arrow_flight::Ticket;
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_server::FlightService;
-use arrow_flight::Ticket;
 use datafusion::common::exec_datafusion_err;
 use datafusion::execution::SendableRecordBatchStream;
 use futures::TryStreamExt;
 use prost::Message;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use tonic::{Request, Response, Status};
 
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -152,17 +152,17 @@ fn record_batch_stream_to_response(
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::ExecutionTask;
     use crate::flight_service::session_builder::DefaultSessionBuilder;
     use crate::protobuf::proto_from_stage;
-    use crate::ExecutionTask;
     use arrow::datatypes::{Schema, SchemaRef};
     use arrow_flight::Ticket;
     use datafusion::physical_expr::Partitioning;
+    use datafusion::physical_plan::ExecutionPlan;
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::repartition::RepartitionExec;
-    use datafusion::physical_plan::ExecutionPlan;
     use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
-    use prost::{bytes::Bytes, Message};
+    use prost::{Message, bytes::Bytes};
     use tonic::Request;
     use uuid::Uuid;
diff --git a/src/flight_service/service.rs b/src/flight_service/service.rs
index ed23f0e..675cd51 100644
--- a/src/flight_service/service.rs
+++ b/src/flight_service/service.rs
@@ -1,6 +1,6 @@
 use crate::common::ttl_map::{TTLMap, TTLMapConfig};
-use crate::flight_service::do_get::TaskData;
 use crate::flight_service::DistributedSessionBuilder;
+use crate::flight_service::do_get::TaskData;
 use crate::protobuf::StageKey;
 use arrow_flight::flight_service_server::FlightService;
 use arrow_flight::{
diff --git a/src/protobuf/distributed_codec.rs b/src/protobuf/distributed_codec.rs
index 21d40b5..67c8fbd 100644
--- a/src/protobuf/distributed_codec.rs
+++ b/src/protobuf/distributed_codec.rs
@@ -9,9 +9,9 @@ use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::{ExecutionPlan, Partitioning, PlanProperties};
 use datafusion::prelude::SessionConfig;
+use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
 use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
-use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_proto::protobuf;
 use datafusion_proto::protobuf::proto_error;
 use prost::Message;
@@ -23,7 +23,7 @@ use std::sync::Arc;
 pub struct DistributedCodec;
 
 impl DistributedCodec {
-    pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec {
+    pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec + use<> {
         let mut combined_codec = ComposedPhysicalExtensionCodec::default();
         combined_codec.push(DistributedCodec {});
         if let Some(ref user_codec) = get_distributed_user_codec(cfg) {
@@ -279,8 +279,8 @@ mod tests {
     use datafusion::physical_expr::LexOrdering;
     use datafusion::{
         execution::registry::MemoryFunctionRegistry,
-        physical_expr::{expressions::col, expressions::Column, Partitioning, PhysicalSortExpr},
-        physical_plan::{displayable, sorts::sort::SortExec, union::UnionExec, ExecutionPlan},
+        physical_expr::{Partitioning, PhysicalSortExpr, expressions::Column, expressions::col},
+        physical_plan::{ExecutionPlan, displayable, sorts::sort::SortExec, union::UnionExec},
     };
 
     fn schema_i32(name: &str) -> Arc<Schema> {
diff --git a/src/protobuf/mod.rs b/src/protobuf/mod.rs
index d01d2dc..5c0870e 100644
--- a/src/protobuf/mod.rs
+++ b/src/protobuf/mod.rs
@@ -3,5 +3,5 @@ mod stage_proto;
 mod user_codec;
 
 pub(crate) use distributed_codec::DistributedCodec;
-pub(crate) use stage_proto::{proto_from_stage, stage_from_proto, StageExecProto, StageKey};
+pub(crate) use stage_proto::{StageExecProto, StageKey, proto_from_stage, stage_from_proto};
 pub(crate) use user_codec::{get_distributed_user_codec, set_distributed_user_codec};
diff --git a/src/protobuf/stage_proto.rs b/src/protobuf/stage_proto.rs
index 1e1598f..6b03d45 100644
--- a/src/protobuf/stage_proto.rs
+++ b/src/protobuf/stage_proto.rs
@@ -2,7 +2,7 @@ use crate::execution_plans::{ExecutionTask, StageExec};
 use datafusion::{
     common::internal_datafusion_err,
     error::{DataFusionError, Result},
-    execution::{runtime_env::RuntimeEnv, FunctionRegistry},
+    execution::{FunctionRegistry, runtime_env::RuntimeEnv},
     physical_plan::ExecutionPlan,
 };
 use datafusion_proto::{
@@ -147,9 +147,9 @@ pub fn stage_from_proto(
 mod tests {
     use std::sync::Arc;
 
+    use crate::StageExec;
     use crate::protobuf::stage_proto::StageExecProto;
     use crate::protobuf::{proto_from_stage, stage_from_proto};
-    use crate::StageExec;
     use datafusion::{
         arrow::{
             array::{RecordBatch, StringArray, UInt8Array},
diff --git a/src/test_utils/insta.rs b/src/test_utils/insta.rs
index ad7b6c4..5333a33 100644
--- a/src/test_utils/insta.rs
+++ b/src/test_utils/insta.rs
@@ -12,7 +12,8 @@ macro_rules! assert_snapshot {
 }
 
 pub fn settings() -> insta::Settings {
-    env::set_var("INSTA_WORKSPACE_ROOT", env!("CARGO_MANIFEST_DIR"));
+    // Safety: this is only used in tests, it may panic if used in parallel with other tests.
+ unsafe { env::set_var("INSTA_WORKSPACE_ROOT", env!("CARGO_MANIFEST_DIR")) }; let mut settings = insta::Settings::clone_current(); let cwd = env::current_dir().unwrap(); let cwd = cwd.to_str().unwrap(); diff --git a/src/test_utils/localhost.rs b/src/test_utils/localhost.rs index 5cdca71..e28edee 100644 --- a/src/test_utils/localhost.rs +++ b/src/test_utils/localhost.rs @@ -5,10 +5,10 @@ use crate::{ }; use arrow_flight::flight_service_server::FlightServiceServer; use async_trait::async_trait; -use datafusion::common::runtime::JoinSet; use datafusion::common::DataFusionError; -use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::common::runtime::JoinSet; use datafusion::execution::SessionStateBuilder; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::prelude::SessionContext; use std::error::Error; use std::sync::Arc; diff --git a/tests/custom_config_extension.rs b/tests/custom_config_extension.rs index ba2cb77..4caf949 100644 --- a/tests/custom_config_extension.rs +++ b/tests/custom_config_extension.rs @@ -11,7 +11,7 @@ mod tests { use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_stream, }; use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::{DistributedExt, DistributedSessionBuilderContext}; diff --git a/tests/custom_extension_codec.rs b/tests/custom_extension_codec.rs index 1d6e58d..b07a2cd 100644 --- a/tests/custom_extension_codec.rs +++ b/tests/custom_extension_codec.rs @@ -10,7 +10,7 @@ mod tests { FunctionRegistry, SendableRecordBatchStream, SessionState, SessionStateBuilder, TaskContext, }; use datafusion::logical_expr::Operator; - use datafusion::physical_expr::expressions::{col, lit, BinaryExpr}; + use datafusion::physical_expr::expressions::{BinaryExpr, col, lit}; use datafusion::physical_expr::{ EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr, }; @@ -20,16 +20,16 @@ mod tests { use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - displayable, execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, displayable, execute_stream, }; use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::{ - assert_snapshot, DistributedExt, DistributedSessionBuilderContext, PartitionIsolatorExec, + DistributedExt, DistributedSessionBuilderContext, PartitionIsolatorExec, assert_snapshot, }; use datafusion_distributed::{DistributedPhysicalOptimizerRule, NetworkShuffleExec}; use datafusion_proto::physical_plan::PhysicalExtensionCodec; use datafusion_proto::protobuf::proto_error; - use futures::{stream, TryStreamExt}; + use futures::{TryStreamExt, stream}; use prost::Message; use std::any::Any; use std::fmt::Formatter; diff --git a/tests/distributed_aggregation.rs b/tests/distributed_aggregation.rs index 6d92b60..6243225 100644 --- a/tests/distributed_aggregation.rs +++ b/tests/distributed_aggregation.rs @@ -6,7 +6,7 @@ mod tests { use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::test_utils::parquet::register_parquet_tables; use 
datafusion_distributed::{ - assert_snapshot, DefaultSessionBuilder, DistributedPhysicalOptimizerRule, + DefaultSessionBuilder, DistributedPhysicalOptimizerRule, assert_snapshot, }; use futures::TryStreamExt; use std::error::Error; diff --git a/tests/error_propagation.rs b/tests/error_propagation.rs index 27e2606..95cf0d0 100644 --- a/tests/error_propagation.rs +++ b/tests/error_propagation.rs @@ -9,7 +9,7 @@ mod tests { use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_stream, }; use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::{ @@ -18,7 +18,7 @@ mod tests { }; use datafusion_proto::physical_plan::PhysicalExtensionCodec; use datafusion_proto::protobuf::proto_error; - use futures::{stream, TryStreamExt}; + use futures::{TryStreamExt, stream}; use prost::Message; use std::any::Any; use std::error::Error; diff --git a/tests/highly_distributed_query.rs b/tests/highly_distributed_query.rs index 87b1f39..54b595a 100644 --- a/tests/highly_distributed_query.rs +++ b/tests/highly_distributed_query.rs @@ -5,8 +5,8 @@ mod tests { use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::test_utils::parquet::register_parquet_tables; use datafusion_distributed::{ - assert_snapshot, DefaultSessionBuilder, DistributedPhysicalOptimizerRule, - NetworkShuffleExec, + DefaultSessionBuilder, DistributedPhysicalOptimizerRule, NetworkShuffleExec, + assert_snapshot, }; use futures::TryStreamExt; use std::error::Error; diff --git a/tests/tpch_validation_test.rs b/tests/tpch_validation_test.rs index 39161d4..179542a 100644 --- a/tests/tpch_validation_test.rs +++ b/tests/tpch_validation_test.rs @@ -7,7 +7,7 @@ mod tests { use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::test_utils::tpch; use datafusion_distributed::{ - assert_snapshot, DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext, + DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext, assert_snapshot, }; use futures::TryStreamExt; use std::error::Error;
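
Two of the edits above are language-level consequences of the edition bump rather than rustfmt's new import ordering: `std::env::set_var` is an `unsafe fn` under edition 2024 (hence the new `unsafe` block and Safety comment in `src/test_utils/insta.rs`), and `impl Trait` return types now capture in-scope lifetimes by default, which the added `+ use<>` bound on `DistributedCodec::new_combined_with_user` opts back out of, presumably so the returned codec is not tied to the lifetime of `cfg`. Below is a minimal, self-contained sketch of both idioms on edition 2024 (Rust 1.85+); `shout` and the environment-variable name are invented for illustration and are not part of the patch.

use std::env;
use std::fmt::Display;

// Edition 2024 would capture the lifetime of `s` in the opaque return type by
// default; `use<>` states that nothing is captured, which is allowed because
// the returned String owns its data.
fn shout(s: &str) -> impl Display + use<> {
    s.to_uppercase()
}

fn main() {
    let banner = {
        let name = String::from("tpch");
        // `name` is dropped at the end of this block; without `use<>` the
        // opaque type would still borrow it and this would not compile.
        shout(&name)
    };
    println!("{banner}");

    // Edition 2024 marks `env::set_var` as `unsafe` because mutating the
    // process environment can race with other threads reading it; this
    // single-threaded example (like the test-only insta.rs helper) is fine.
    unsafe { env::set_var("EXAMPLE_ONLY_VAR", "/tmp") };
    assert_eq!(env::var("EXAMPLE_ONLY_VAR").as_deref(), Ok("/tmp"));
}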