Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ datafusion-proto = { version = "49.0.0" }
[package]
name = "datafusion-distributed"
version = "0.1.0"
edition = "2021"
edition = "2024"

[dependencies]
chrono = { version = "0.4.42" }
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "datafusion-distributed-benchmarks"
version = "0.1.0"
edition = "2021"
edition = "2024"
default-run = "dfbench"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use std::path::{Path, PathBuf};

use datafusion::common::not_impl_err;

use super::get_tbl_tpch_table_schema;
use super::TPCH_TABLES;
use super::get_tbl_tpch_table_schema;
use datafusion::error::Result;
use datafusion::prelude::*;
use parquet::basic::Compression;
Expand Down
20 changes: 12 additions & 8 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
// under the License.

use super::{
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_QUERY_END_ID,
TPCH_QUERY_START_ID, TPCH_TABLES,
TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql, get_tbl_tpch_table_schema,
get_tpch_table_schema,
};
use crate::util::{
BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryIter,
Expand All @@ -29,21 +29,21 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::common::instant::Instant;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::utils::get_available_parallelism;
use datafusion::common::{exec_err, DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION, exec_err};
use datafusion::datasource::TableProvider;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_distributed::test_utils::localhost::{
spawn_flight_service, LocalHostChannelResolver,
LocalHostChannelResolver, spawn_flight_service,
};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
Expand Down Expand Up @@ -354,11 +354,15 @@ impl RunOpt {
let data_path = crate_path.join("data");
let entries = fs::read_dir(&data_path)?.collect::<Result<Vec<_>, _>>()?;
if entries.is_empty() {
exec_err!("No TPCH dataset present in '{data_path:?}'. Generate one with ./benchmarks/gen-tpch.sh")
exec_err!(
"No TPCH dataset present in '{data_path:?}'. Generate one with ./benchmarks/gen-tpch.sh"
)
} else if entries.len() == 1 {
Ok(entries[0].path())
} else {
exec_err!("Multiple TPCH datasets present in '{data_path:?}'. One must be selected with --path")
exec_err!(
"Multiple TPCH datasets present in '{data_path:?}'. One must be selected with --path"
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/util/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use datafusion::execution::{FunctionRegistry, SendableRecordBatchStream, TaskCon
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
displayable, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, displayable,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use futures::{FutureExt, StreamExt};
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl CommonOpt {
return Err(DataFusionError::Configuration(format!(
"Invalid memory pool type: {}",
self.mem_pool_type
)))
)));
}
};
rt_builder = rt_builder
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/util/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use chrono::{DateTime, Utc};
use datafusion::common::utils::get_available_parallelism;
use datafusion::{error::Result, DATAFUSION_VERSION};
use datafusion::{DATAFUSION_VERSION, error::Result};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::{
path::Path,
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.85.1"
channel = "1.86.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 Now that I think about it, it probably makes sense to leave this at 1.85.1 in this PR: the project is still running on DataFusion 49, so we need to make sure that both the CI pipelines and developers' local toolchains keep supporting at least Rust 1.85.1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay done

profile = "default"
4 changes: 2 additions & 2 deletions src/common/ttl_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use dashmap::{DashMap, Entry};
use datafusion::error::DataFusionError;
use std::collections::HashSet;
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

Expand Down Expand Up @@ -289,7 +289,7 @@ where
mod tests {
use super::*;
use std::sync::atomic::Ordering;
use tokio::time::{sleep, Duration};
use tokio::time::{Duration, sleep};

#[tokio::test]
async fn test_basic_insert_and_get() {
Expand Down
6 changes: 3 additions & 3 deletions src/config_extension_ext.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use datafusion::common::{internal_datafusion_err, DataFusionError};
use datafusion::common::{DataFusionError, internal_datafusion_err};
use datafusion::config::ConfigExtension;
use datafusion::execution::TaskContext;
use datafusion::prelude::SessionConfig;
Expand Down Expand Up @@ -94,8 +94,8 @@ impl ContextGrpcMetadata {
#[cfg(test)]
mod tests {
use crate::config_extension_ext::{
set_distributed_option_extension, set_distributed_option_extension_from_headers,
ContextGrpcMetadata,
ContextGrpcMetadata, set_distributed_option_extension,
set_distributed_option_extension_from_headers,
};
use datafusion::common::extensions_options;
use datafusion::config::ConfigExtension;
Expand Down
2 changes: 1 addition & 1 deletion src/distributed_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::ChannelResolver;
use crate::channel_resolver_ext::set_distributed_channel_resolver;
use crate::config_extension_ext::{
set_distributed_option_extension, set_distributed_option_extension_from_headers,
};
use crate::protobuf::set_distributed_user_codec;
use crate::ChannelResolver;
use datafusion::common::DataFusionError;
use datafusion::config::ConfigExtension;
use datafusion::execution::{SessionState, SessionStateBuilder};
Expand Down
2 changes: 1 addition & 1 deletion src/distributed_physical_optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use datafusion::{
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{repartition::RepartitionExec, ExecutionPlan},
physical_plan::{ExecutionPlan, repartition::RepartitionExec},
};
use std::error::Error;
use std::fmt::{Display, Formatter};
Expand Down
2 changes: 1 addition & 1 deletion src/errors/schema_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl TableReferenceProto {
impl SchemaErrorProto {
pub fn from_schema_error(err: &SchemaError, backtrace: Option<&String>) -> Self {
match err {
SchemaError::AmbiguousReference { ref field } => SchemaErrorProto {
SchemaError::AmbiguousReference { field } => SchemaErrorProto {
inner: Some(SchemaErrorInnerProto::AmbiguousReference(
AmbiguousReferenceProto {
field: Some(ColumnProto::from_column(field)),
Expand Down
16 changes: 10 additions & 6 deletions src/execution_plans/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::execution_plans::{NetworkCoalesceExec, NetworkShuffleExec, StageExec};
use crate::metrics::proto::{metrics_set_proto_to_df, MetricsSetProto};
use crate::metrics::proto::{MetricsSetProto, metrics_set_proto_to_df};
use crate::protobuf::StageKey;
use datafusion::common::internal_err;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
use std::any::Any;
use std::collections::HashMap;
Expand Down Expand Up @@ -152,7 +152,11 @@ impl TaskMetricsRewriter {
) -> Result<Arc<dyn ExecutionPlan>> {
let transformed = plan.rewrite(&mut self)?;
if self.idx != self.metrics.len() {
return internal_err!("too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan", self.metrics.len(), self.idx);
return internal_err!(
"too many metrics sets provided to rewrite task: {} metrics sets provided, {} nodes in plan",
self.metrics.len(),
self.idx
);
}
Ok(transformed.data)
}
Expand Down Expand Up @@ -276,11 +280,11 @@ mod tests {
use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::record_batch::RecordBatch;

use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
use crate::test_utils::session_context::register_temp_parquet_table;
use crate::DistributedExt;
use crate::DistributedPhysicalOptimizerRule;
use datafusion::execution::{context::SessionContext, SessionStateBuilder};
use crate::test_utils::in_memory_channel_resolver::InMemoryChannelResolver;
use crate::test_utils::session_context::register_temp_parquet_table;
use datafusion::execution::{SessionStateBuilder, context::SessionContext};
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::prelude::SessionConfig;
use datafusion::{
Expand Down
12 changes: 6 additions & 6 deletions src/execution_plans/network_coalesce.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::ChannelResolver;
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::common::scale_partitioning_props;
use crate::config_extension_ext::ContextGrpcMetadata;
use crate::distributed_physical_optimizer_rule::{limit_tasks_err, NetworkBoundary};
use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
use crate::execution_plans::{DistributedTaskContext, StageExec};
use crate::flight_service::DoGet;
use crate::metrics::proto::MetricsSetProto;
use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
use crate::ChannelResolver;
use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
use arrow_flight::Ticket;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;
use dashmap::DashMap;
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
use datafusion::error::DataFusionError;
Expand All @@ -26,8 +26,8 @@ use prost::Message;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use tonic::metadata::MetadataMap;
use tonic::Request;
use tonic::metadata::MetadataMap;

/// [ExecutionPlan] that coalesces partitions from multiple tasks into a single task without
/// performing any repartition, and maintaining the same partitioning scheme.
Expand Down Expand Up @@ -137,7 +137,7 @@ impl NetworkBoundary for NetworkCoalesceExec {
&self,
n_tasks: usize,
) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
let Self::Pending(ref pending) = self else {
let Self::Pending(pending) = self else {
return plan_err!("can only return wrapped child if on Pending state");
};

Expand Down
10 changes: 5 additions & 5 deletions src/execution_plans/network_shuffle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::ChannelResolver;
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::common::scale_partitioning;
use crate::config_extension_ext::ContextGrpcMetadata;
Expand All @@ -6,12 +7,11 @@ use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_err
use crate::execution_plans::{DistributedTaskContext, StageExec};
use crate::flight_service::DoGet;
use crate::metrics::proto::MetricsSetProto;
use crate::protobuf::{proto_from_stage, DistributedCodec, StageKey};
use crate::ChannelResolver;
use crate::protobuf::{DistributedCodec, StageKey, proto_from_stage};
use arrow_flight::Ticket;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;
use dashmap::DashMap;
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
use datafusion::error::DataFusionError;
Expand All @@ -28,8 +28,8 @@ use prost::Message;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use tonic::metadata::MetadataMap;
use tonic::Request;
use tonic::metadata::MetadataMap;

/// [ExecutionPlan] implementation that shuffles data across the network in a distributed context.
///
Expand Down Expand Up @@ -178,7 +178,7 @@ impl NetworkBoundary for NetworkShuffleExec {
&self,
n_tasks: usize,
) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
let Self::Pending(ref pending) = self else {
let Self::Pending(pending) = self else {
return plan_err!("cannot only return wrapped child if on Pending state");
};

Expand Down
2 changes: 1 addition & 1 deletion src/execution_plans/partition_isolator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::StageExec;
use crate::distributed_physical_optimizer_rule::limit_tasks_err;
use crate::execution_plans::DistributedTaskContext;
use crate::StageExec;
use datafusion::common::{exec_err, plan_err};
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
Expand Down
2 changes: 1 addition & 1 deletion src/execution_plans/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use datafusion::common::{internal_datafusion_err, internal_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{
displayable, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, displayable,
};
use datafusion::prelude::SessionContext;
use itertools::Itertools;
Expand Down
12 changes: 6 additions & 6 deletions src/flight_service/do_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ use crate::errors::datafusion_error_to_tonic_status;
use crate::execution_plans::{DistributedTaskContext, StageExec};
use crate::flight_service::service::ArrowFlightEndpoint;
use crate::flight_service::session_builder::DistributedSessionBuilderContext;
use crate::protobuf::{stage_from_proto, DistributedCodec, StageExecProto, StageKey};
use crate::protobuf::{DistributedCodec, StageExecProto, StageKey, stage_from_proto};
use arrow_flight::Ticket;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::Ticket;
use datafusion::common::exec_datafusion_err;
use datafusion::execution::SendableRecordBatchStream;
use futures::TryStreamExt;
use prost::Message;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tonic::{Request, Response, Status};

#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -152,17 +152,17 @@ fn record_batch_stream_to_response(
#[cfg(test)]
mod tests {
use super::*;
use crate::ExecutionTask;
use crate::flight_service::session_builder::DefaultSessionBuilder;
use crate::protobuf::proto_from_stage;
use crate::ExecutionTask;
use arrow::datatypes::{Schema, SchemaRef};
use arrow_flight::Ticket;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use prost::{bytes::Bytes, Message};
use prost::{Message, bytes::Bytes};
use tonic::Request;
use uuid::Uuid;

Expand Down
2 changes: 1 addition & 1 deletion src/flight_service/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::common::ttl_map::{TTLMap, TTLMapConfig};
use crate::flight_service::do_get::TaskData;
use crate::flight_service::DistributedSessionBuilder;
use crate::flight_service::do_get::TaskData;
use crate::protobuf::StageKey;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::{
Expand Down
Loading
Loading