2 changes: 1 addition & 1 deletion Cargo.toml
@@ -8,7 +8,7 @@ datafusion-proto = { version = "49.0.0" }
 [package]
 name = "datafusion-distributed"
 version = "0.1.0"
-edition = "2021"
+edition = "2024"
 
 [dependencies]
 chrono = { version = "0.4.42" }
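Most of the churn in the files below is mechanical fallout from this edition bump: the 2024 style edition version-sorts imports so that uppercase-leading path segments come before lowercase ones, which is why so many `use` lines in this diff simply swap places. A minimal illustration of the new ordering (my own sketch, not code from this repo):

```rust
// Under the 2024 style edition, rustfmt sorts `Arc` (uppercase) ahead of
// `atomic` (lowercase); the removed lines in the hunks below show the old,
// opposite ordering.
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let counter = Arc::new(AtomicU64::new(0));
    println!("{}", counter.load(Ordering::Relaxed));
}
```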
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "datafusion-distributed-benchmarks"
 version = "0.1.0"
-edition = "2021"
+edition = "2024"
 default-run = "dfbench"
 
 [dependencies]
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/convert.rs
@@ -22,8 +22,8 @@ use std::path::{Path, PathBuf};
 
 use datafusion::common::not_impl_err;
 
-use super::get_tbl_tpch_table_schema;
 use super::TPCH_TABLES;
+use super::get_tbl_tpch_table_schema;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use parquet::basic::Compression;
20 changes: 12 additions & 8 deletions benchmarks/src/tpch/run.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use super::{
-    get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_QUERY_END_ID,
-    TPCH_QUERY_START_ID, TPCH_TABLES,
+    TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql, get_tbl_tpch_table_schema,
+    get_tpch_table_schema,
 };
 use crate::util::{
     BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryIter,
@@ -29,21 +29,21 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
 use datafusion::common::instant::Instant;
 use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::common::{exec_err, DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
+use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION, exec_err};
+use datafusion::datasource::TableProvider;
+use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
-use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::{SessionState, SessionStateBuilder};
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
 use datafusion_distributed::test_utils::localhost::{
-    spawn_flight_service, LocalHostChannelResolver,
+    LocalHostChannelResolver, spawn_flight_service,
 };
 use datafusion_distributed::{
     DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
@@ -354,11 +354,15 @@ impl RunOpt {
         let data_path = crate_path.join("data");
         let entries = fs::read_dir(&data_path)?.collect::<Result<Vec<_>, _>>()?;
         if entries.is_empty() {
-            exec_err!("No TPCH dataset present in '{data_path:?}'. Generate one with ./benchmarks/gen-tpch.sh")
+            exec_err!(
+                "No TPCH dataset present in '{data_path:?}'. Generate one with ./benchmarks/gen-tpch.sh"
+            )
         } else if entries.len() == 1 {
             Ok(entries[0].path())
         } else {
-            exec_err!("Multiple TPCH datasets present in '{data_path:?}'. One must be selected with --path")
+            exec_err!(
+                "Multiple TPCH datasets present in '{data_path:?}'. One must be selected with --path"
+            )
         }
     }
 
2 changes: 1 addition & 1 deletion benchmarks/src/util/memory.rs
@@ -8,7 +8,7 @@ use datafusion::execution::{FunctionRegistry, SendableRecordBatchStream, TaskCon
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, displayable,
 };
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use futures::{FutureExt, StreamExt};
2 changes: 1 addition & 1 deletion benchmarks/src/util/options.rs
@@ -105,7 +105,7 @@ impl CommonOpt {
                 return Err(DataFusionError::Configuration(format!(
                     "Invalid memory pool type: {}",
                     self.mem_pool_type
-                )))
+                )));
             }
         };
         rt_builder = rt_builder
2 changes: 1 addition & 1 deletion benchmarks/src/util/run.rs
@@ -17,7 +17,7 @@
 
 use chrono::{DateTime, Utc};
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::{error::Result, DATAFUSION_VERSION};
+use datafusion::{DATAFUSION_VERSION, error::Result};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use std::{
     path::Path,
4 changes: 2 additions & 2 deletions src/common/ttl_map.rs
@@ -27,10 +27,10 @@ use dashmap::{DashMap, Entry};
 use datafusion::error::DataFusionError;
 use std::collections::HashSet;
 use std::hash::Hash;
+use std::sync::Arc;
 use std::sync::atomic::AtomicU64;
 #[cfg(test)]
 use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
-use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 
@@ -289,7 +289,7 @@ where
 mod tests {
     use super::*;
     use std::sync::atomic::Ordering;
-    use tokio::time::{sleep, Duration};
+    use tokio::time::{Duration, sleep};
 
     #[tokio::test]
     async fn test_basic_insert_and_get() {
6 changes: 3 additions & 3 deletions src/config_extension_ext.rs
@@ -1,4 +1,4 @@
-use datafusion::common::{internal_datafusion_err, DataFusionError};
+use datafusion::common::{DataFusionError, internal_datafusion_err};
 use datafusion::config::ConfigExtension;
 use datafusion::execution::TaskContext;
 use datafusion::prelude::SessionConfig;
@@ -94,8 +94,8 @@ impl ContextGrpcMetadata {
 #[cfg(test)]
 mod tests {
     use crate::config_extension_ext::{
-        set_distributed_option_extension, set_distributed_option_extension_from_headers,
-        ContextGrpcMetadata,
+        ContextGrpcMetadata, set_distributed_option_extension,
+        set_distributed_option_extension_from_headers,
     };
     use datafusion::common::extensions_options;
     use datafusion::config::ConfigExtension;
2 changes: 1 addition & 1 deletion src/distributed_ext.rs
@@ -1,9 +1,9 @@
+use crate::ChannelResolver;
 use crate::channel_resolver_ext::set_distributed_channel_resolver;
 use crate::config_extension_ext::{
     set_distributed_option_extension, set_distributed_option_extension_from_headers,
 };
 use crate::protobuf::set_distributed_user_codec;
-use crate::ChannelResolver;
 use datafusion::common::DataFusionError;
 use datafusion::config::ConfigExtension;
 use datafusion::execution::{SessionState, SessionStateBuilder};
2 changes: 1 addition & 1 deletion src/distributed_physical_optimizer_rule.rs
@@ -13,7 +13,7 @@ use datafusion::{
     config::ConfigOptions,
     error::Result,
     physical_optimizer::PhysicalOptimizerRule,
-    physical_plan::{repartition::RepartitionExec, ExecutionPlan},
+    physical_plan::{ExecutionPlan, repartition::RepartitionExec},
 };
 use std::error::Error;
 use std::fmt::{Display, Formatter};
2 changes: 1 addition & 1 deletion src/errors/schema_error.rs
@@ -168,7 +168,7 @@ impl TableReferenceProto {
 impl SchemaErrorProto {
     pub fn from_schema_error(err: &SchemaError, backtrace: Option<&String>) -> Self {
         match err {
-            SchemaError::AmbiguousReference { ref field } => SchemaErrorProto {
+            SchemaError::AmbiguousReference { field } => SchemaErrorProto {
                 inner: Some(SchemaErrorInnerProto::AmbiguousReference(
                     AmbiguousReferenceProto {
                         field: Some(ColumnProto::from_column(field)),
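The `ref` removal above is also edition-motivated: when matching on a reference, match ergonomics already bind `field` by reference, and Rust 2024 tightens the rules around writing an explicit `ref` in that position. A standalone sketch of the same pattern (hypothetical types, not this crate's protobuf code):

```rust
enum SchemaError {
    AmbiguousReference { field: String },
}

fn describe(err: &SchemaError) -> &str {
    // `err` is a reference, so match ergonomics bind `field` as `&String`
    // with or without `ref`; the edition-2024 cleanup drops the redundant keyword.
    match err {
        SchemaError::AmbiguousReference { field } => field,
    }
}

fn main() {
    let err = SchemaError::AmbiguousReference { field: "a.b".into() };
    println!("{}", describe(&err));
}
```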
16 changes: 10 additions & 6 deletions src/execution_plans/metrics.rs
@@ -1,13 +1,13 @@
 use crate::execution_plans::{NetworkCoalesceExec, NetworkShuffleExec, StageExec};
-use crate::metrics::proto::{metrics_set_proto_to_df, MetricsSetProto};
+use crate::metrics::proto::{MetricsSetProto, metrics_set_proto_to_df};
 use crate::protobuf::StageKey;
 use datafusion::common::internal_err;
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::execution::TaskContext;
-use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
 use std::any::Any;
 use std::collections::HashMap;
@@ -152,7 +152,11 @@ impl TaskMetricsRewriter {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let transformed = plan.rewrite(&mut self)?;
         if self.idx != self.metrics.len() {
-            return internal_err!("too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan", self.metrics.len(), self.idx);
+            return internal_err!(
+                "too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan",
+                self.metrics.len(),
+                self.idx
+            );
         }
         Ok(transformed.data)
     }
@@ -276,11 +280,11 @@ mod tests {
     use datafusion::arrow::array::{Int32Array, StringArray};
     use datafusion::arrow::record_batch::RecordBatch;
 
-    use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
-    use crate::test_utils::session_context::register_temp_parquet_table;
     use crate::DistributedExt;
     use crate::DistributedPhysicalOptimizerRule;
-    use datafusion::execution::{context::SessionContext, SessionStateBuilder};
+    use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
+    use crate::test_utils::session_context::register_temp_parquet_table;
+    use datafusion::execution::{SessionStateBuilder, context::SessionContext};
     use datafusion::physical_plan::metrics::MetricValue;
     use datafusion::prelude::SessionConfig;
     use datafusion::{
12 changes: 6 additions & 6 deletions src/execution_plans/network_coalesce.rs
@@ -1,17 +1,17 @@
+use crate::ChannelResolver;
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::common::scale_partitioning_props;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::distributed_physical_optimizer_rule::{limit_tasks_err, NetworkBoundary};
+use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
 use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
-use crate::ChannelResolver;
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_client::FlightServiceClient;
-use arrow_flight::Ticket;
 use dashmap::DashMap;
 use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
 use datafusion::error::DataFusionError;
@@ -26,8 +26,8 @@ use prost::Message;
 use std::any::Any;
 use std::fmt::Formatter;
 use std::sync::Arc;
-use tonic::metadata::MetadataMap;
 use tonic::Request;
+use tonic::metadata::MetadataMap;
 
 /// [ExecutionPlan] that coalesces partitions from multiple tasks into a single task without
 /// performing any repartition, and maintaining the same partitioning scheme.
@@ -137,7 +137,7 @@ impl NetworkBoundary for NetworkCoalesceExec {
         &self,
         n_tasks: usize,
     ) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
-        let Self::Pending(ref pending) = self else {
+        let Self::Pending(pending) = self else {
            return plan_err!("can only return wrapped child if on Pending state");
         };
 
10 changes: 5 additions & 5 deletions src/execution_plans/network_shuffle.rs
@@ -1,3 +1,4 @@
+use crate::ChannelResolver;
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::common::scale_partitioning;
 use crate::config_extension_ext::ContextGrpcMetadata;
@@ -6,12 +7,11 @@ use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_err
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::DoGet;
 use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
-use crate::ChannelResolver;
+use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
+use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_client::FlightServiceClient;
-use arrow_flight::Ticket;
 use dashmap::DashMap;
 use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
 use datafusion::error::DataFusionError;
@@ -28,8 +28,8 @@ use prost::Message;
 use std::any::Any;
 use std::fmt::Formatter;
 use std::sync::Arc;
-use tonic::metadata::MetadataMap;
 use tonic::Request;
+use tonic::metadata::MetadataMap;
 
 /// [ExecutionPlan] implementation that shuffles data across the network in a distributed context.
 ///
@@ -178,7 +178,7 @@ impl NetworkBoundary for NetworkShuffleExec {
         &self,
         n_tasks: usize,
     ) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
-        let Self::Pending(ref pending) = self else {
+        let Self::Pending(pending) = self else {
             return plan_err!("cannot only return wrapped child if on Pending state");
         };
 
2 changes: 1 addition & 1 deletion src/execution_plans/partition_isolator.rs
@@ -1,6 +1,6 @@
+use crate::StageExec;
 use crate::distributed_physical_optimizer_rule::limit_tasks_err;
 use crate::execution_plans::DistributedTaskContext;
-use crate::StageExec;
 use datafusion::common::{exec_err, plan_err};
 use datafusion::error::DataFusionError;
 use datafusion::execution::TaskContext;
2 changes: 1 addition & 1 deletion src/execution_plans/stage.rs
@@ -5,7 +5,7 @@ use datafusion::common::{internal_datafusion_err, internal_err};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::TaskContext;
 use datafusion::physical_plan::{
-    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, displayable,
 };
 use datafusion::prelude::SessionContext;
 use itertools::Itertools;
12 changes: 6 additions & 6 deletions src/flight_service/do_get.rs
@@ -3,17 +3,17 @@ use crate::errors::datafusion_error_to_tonic_status;
 use crate::execution_plans::{DistributedTaskContext, StageExec};
 use crate::flight_service::service::ArrowFlightEndpoint;
 use crate::flight_service::session_builder::DistributedSessionBuilderContext;
-use crate::protobuf::{stage_from_proto, DistributedCodec, StageExecProto, StageKey};
+use crate::protobuf::{DistributedCodec, StageExecProto, StageKey, stage_from_proto};
+use arrow_flight::Ticket;
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_server::FlightService;
-use arrow_flight::Ticket;
 use datafusion::common::exec_datafusion_err;
 use datafusion::execution::SendableRecordBatchStream;
 use futures::TryStreamExt;
 use prost::Message;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use tonic::{Request, Response, Status};
 
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -152,17 +152,17 @@ fn record_batch_stream_to_response(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::ExecutionTask;
     use crate::flight_service::session_builder::DefaultSessionBuilder;
     use crate::protobuf::proto_from_stage;
-    use crate::ExecutionTask;
     use arrow::datatypes::{Schema, SchemaRef};
     use arrow_flight::Ticket;
     use datafusion::physical_expr::Partitioning;
+    use datafusion::physical_plan::ExecutionPlan;
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::repartition::RepartitionExec;
-    use datafusion::physical_plan::ExecutionPlan;
     use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
-    use prost::{bytes::Bytes, Message};
+    use prost::{Message, bytes::Bytes};
     use tonic::Request;
     use uuid::Uuid;
 
2 changes: 1 addition & 1 deletion src/flight_service/service.rs
@@ -1,6 +1,6 @@
 use crate::common::ttl_map::{TTLMap, TTLMapConfig};
-use crate::flight_service::do_get::TaskData;
 use crate::flight_service::DistributedSessionBuilder;
+use crate::flight_service::do_get::TaskData;
 use crate::protobuf::StageKey;
 use arrow_flight::flight_service_server::FlightService;
 use arrow_flight::{
8 changes: 4 additions & 4 deletions src/protobuf/distributed_codec.rs
@@ -9,9 +9,9 @@ use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::{ExecutionPlan, Partitioning, PlanProperties};
 use datafusion::prelude::SessionConfig;
+use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
 use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
-use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_proto::protobuf;
 use datafusion_proto::protobuf::proto_error;
 use prost::Message;
@@ -23,7 +23,7 @@ use std::sync::Arc;
 pub struct DistributedCodec;
 
 impl DistributedCodec {
-    pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec {
+    pub fn new_combined_with_user(cfg: &SessionConfig) -> impl PhysicalExtensionCodec + use<> {
Collaborator:

👀 use<>. Is this a thing in Rust? No idea what this means.

Contributor Author:

Yeah, it means that the lifetime of the returned impl doesn't depend on the &SessionConfig.

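To make the author's point concrete, here is a minimal, self-contained sketch of the edition-2024 behavior that `+ use<>` opts out of (illustrative code, not from this PR):

```rust
struct Config {
    verbose: bool,
}

// In Rust 2024, `-> impl Trait` captures all in-scope lifetimes by default,
// so without `+ use<>` the returned opaque type would be tied to the
// `&Config` borrow. `+ use<>` declares that it captures nothing.
fn make_logger(cfg: &Config) -> impl Fn(&str) + use<> {
    let verbose = cfg.verbose; // copy what we need out of the borrow
    move |msg| {
        if verbose {
            println!("{msg}");
        }
    }
}

fn main() {
    let logger = {
        let cfg = Config { verbose: true };
        make_logger(&cfg) // `cfg` is dropped at the end of this block...
    };
    logger("still usable"); // ...but the returned impl outlives the borrow
}
```

Same idea in `new_combined_with_user` below: the codec only reads from the `&SessionConfig` while it is being built, so the returned `impl PhysicalExtensionCodec` can outlive that borrow.
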
         let mut combined_codec = ComposedPhysicalExtensionCodec::default();
         combined_codec.push(DistributedCodec {});
         if let Some(ref user_codec) = get_distributed_user_codec(cfg) {
@@ -279,8 +279,8 @@ mod tests {
     use datafusion::physical_expr::LexOrdering;
     use datafusion::{
         execution::registry::MemoryFunctionRegistry,
-        physical_expr::{expressions::col, expressions::Column, Partitioning, PhysicalSortExpr},
-        physical_plan::{displayable, sorts::sort::SortExec, union::UnionExec, ExecutionPlan},
+        physical_expr::{Partitioning, PhysicalSortExpr, expressions::Column, expressions::col},
+        physical_plan::{ExecutionPlan, displayable, sorts::sort::SortExec, union::UnionExec},
     };
 
     fn schema_i32(name: &str) -> Arc<Schema> {