1,065 changes: 580 additions & 485 deletions Cargo.lock

Large diffs are not rendered by default.

38 changes: 21 additions & 17 deletions Cargo.toml
@@ -2,8 +2,8 @@
members = ["benchmarks"]

[workspace.dependencies]
datafusion = { version = "50.0.0", default-features = false }
datafusion-proto = { version = "50.0.0" }
datafusion = { version = "51.0.0", default-features = false }
datafusion-proto = { version = "51.0.0" }

[package]
name = "datafusion-distributed"
@@ -12,14 +12,18 @@ edition = "2024"

[dependencies]
chrono = { version = "0.4.42" }
datafusion = { workspace = true }
datafusion = { workspace = true, features = [
"parquet",
"sql",
"unicode_expressions",
"datetime_expressions",
] }
datafusion-proto = { workspace = true }
arrow-flight = "56.1.0"
arrow-select = "56.1.0"
arrow-flight = "57.0.0"
arrow-select = "57.0.0"
async-trait = "0.1.88"
tokio = { version = "1.46.1", features = ["full"] }
# Updated to 0.13.1 to match arrow-flight 56.1.0
tonic = { version = "0.13.1", features = ["transport"] }
tonic = { version = "0.14.2", features = ["transport"] }
Comment on lines -22 to +26

Collaborator:
🤔 any chance to leave this as 0.13?


Collaborator:
Actually, I checked and nope. The best thing we can do is leave it at 0.14.1

tower = "0.5.2"
http = "1.3.1"
itertools = "0.14.0"
@@ -28,18 +32,18 @@ url = "2.5.4"
uuid = "1.17.0"
delegate = "0.13.4"
dashmap = "6.1.0"
prost = "0.13.5"
prost = "0.14.0"
rand = "0.8.5"
object_store = "0.12.3"
bytes = "1.10.1"
pin-project = "1.1.10"

# integration_tests deps
insta = { version = "1.43.1", features = ["filters"], optional = true }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68", optional = true }
parquet = { version = "56.1.0", optional = true }
arrow = { version = "56.1.0", optional = true }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
parquet = { version = "57.0.0", optional = true }
arrow = { version = "57.0.0", optional = true }
tokio-stream = { version = "0.1.17", optional = true }
hyper-util = { version = "0.1.16", optional = true }
pretty_assertions = { version = "1.4", optional = true }
@@ -53,18 +57,18 @@ integration = [
"arrow",
"tokio-stream",
"hyper-util",
"pretty_assertions"
"pretty_assertions",
]

tpch = ["integration"]

[dev-dependencies]
structopt = "0.3"
insta = { version = "1.43.1", features = ["filters"] }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "482ee68" }
parquet = "56.1.0"
arrow = "56.1.0"
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
parquet = "57.0.0"
arrow = "57.0.0"
tokio-stream = "0.1.17"
hyper-util = "0.1.16"
pretty_assertions = "1.4"
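
For context on the tonic bump discussed in the review thread above: arrow-flight 57 is built against tonic 0.14 (and prost 0.14), so the two have to move together, which is why pinning tonic back to 0.13 was not an option. A minimal sketch of opening a Flight client against the upgraded crates; the endpoint address is a placeholder, not taken from this repository:

```rust
// Minimal sketch: an Arrow Flight client under arrow-flight 57 / tonic 0.14.
use arrow_flight::flight_service_client::FlightServiceClient;
use tonic::transport::Channel;

async fn connect_flight() -> Result<FlightServiceClient<Channel>, tonic::transport::Error> {
    // Placeholder address; requires the tonic "transport" feature enabled above.
    let channel = Channel::from_static("http://localhost:50051")
        .connect()
        .await?;
    Ok(FlightServiceClient::new(channel))
}
```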
2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.86.0"
channel = "1.88.0"
profile = "default"
8 changes: 4 additions & 4 deletions src/common/ttl_map.rs
@@ -409,7 +409,7 @@ mod tests {
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
panic!("Assertion failed within {:?}", timeout);
panic!("Assertion failed within {timeout:?}");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
@@ -543,9 +543,9 @@ mod tests {
let elapsed = start_time.elapsed();

println!("\n=== TTLMap Lock Contention Benchmark ===");
println!("Tasks: {}", task_count);
println!("Total time: {:.2?}", elapsed);
println!("Average latency: {:.2} ns per operation", avg_time);
println!("Tasks: {task_count}");
println!("Total time: {elapsed:.2?}");
println!("Average latency: {avg_time:.2} ns per operation");
println!("Entries remaining: {}", ttl_map.data.len());
println!(
"DashMap Lock contention time: {}ms",
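
The ttl_map.rs changes are purely stylistic: they switch to Rust's inlined format arguments (stable since 1.58 and what clippy's `uninlined_format_args` lint suggests), in line with the toolchain bump above. A small illustration of the equivalent forms:

```rust
fn main() {
    let task_count = 8;
    // Positional style, as before the change:
    println!("Tasks: {}", task_count);
    // Inlined capture, as used in the updated tests:
    println!("Tasks: {task_count}");
    // Format specifiers still apply to captured identifiers:
    let avg_time = 42.5_f64;
    println!("Average latency: {avg_time:.2} ns per operation");
}
```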
4 changes: 2 additions & 2 deletions src/distributed_ext.rs
@@ -137,7 +137,7 @@ pub trait DistributedExt: Sized {
/// ```
/// # use std::sync::Arc;
/// # use datafusion::common::DataFusionError;
/// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder};
/// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder, TaskContext};
/// # use datafusion::physical_plan::ExecutionPlan;
/// # use datafusion::prelude::SessionConfig;
/// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
@@ -147,7 +147,7 @@ pub trait DistributedExt: Sized {
/// struct CustomExecCodec;
///
/// impl PhysicalExtensionCodec for CustomExecCodec {
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
/// todo!()
/// }
///
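
The doc-comment update reflects the datafusion-proto 51 API, where `PhysicalExtensionCodec::try_decode` receives a `&TaskContext` instead of a `&dyn FunctionRegistry`. A minimal sketch of a codec under that signature; the decode/encode bodies are placeholders, and `try_encode` is assumed to keep its existing shape:

```rust
use std::sync::Arc;
use datafusion::common::{internal_err, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

#[derive(Debug)]
struct CustomExecCodec;

impl PhysicalExtensionCodec for CustomExecCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Placeholder: deserialize `_buf` into a custom ExecutionPlan here.
        internal_err!("decoding is not implemented in this sketch")
    }

    fn try_encode(&self, _node: Arc<dyn ExecutionPlan>, _buf: &mut Vec<u8>) -> Result<()> {
        // Placeholder: serialize the custom node into `_buf` here.
        internal_err!("encoding is not implemented in this sketch")
    }
}
```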
1 change: 1 addition & 0 deletions src/execution_plans/network_shuffle.rs
@@ -112,6 +112,7 @@ use tonic::metadata::MetadataMap;
/// 2. Ready: runs within a distributed stage and queries the next input stage over the network
/// using Arrow Flight.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum NetworkShuffleExec {
Pending(NetworkShufflePendingExec),
Ready(NetworkShuffleReadyExec),
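
`clippy::large_enum_variant` fires when one variant is much larger than the rest, because every value of the enum reserves space for the biggest variant. The PR silences it with an `#[allow]`; the usual alternative is boxing the oversized variant. A generic illustration with made-up types, not the ones in this file:

```rust
// Hypothetical payloads, sized only to make the lint's point.
struct SmallState {
    id: u64,
}

struct LargeState {
    buffer: [u8; 4096],
}

// Keeping the large variant inline and silencing the lint,
// as done for NetworkShuffleExec and PartitionIsolatorExec:
#[allow(clippy::large_enum_variant)]
enum ExecStateAllowed {
    Pending(SmallState),
    Ready(LargeState),
}

// Alternative that keeps the enum itself small: box the big variant.
enum ExecStateBoxed {
    Pending(SmallState),
    Ready(Box<LargeState>),
}
```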
6 changes: 3 additions & 3 deletions src/execution_plans/partition_isolator.rs
@@ -50,6 +50,7 @@ use std::{fmt::Formatter, sync::Arc};
/// └───────────────────────────┘ └───────────────────────────┘ └───────────────────────────┘ ■
/// ```
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum PartitionIsolatorExec {
Pending(PartitionIsolatorPendingExec),
Ready(PartitionIsolatorReadyExec),
@@ -223,7 +224,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
// then look up that index in our group and execute that partition, in this
// example partition 8

let output_stream = match partition_group.get(partition) {
match partition_group.get(partition) {
Some(actual_partition_number) => {
if *actual_partition_number >= input_partitions {
//trace!("{} returning empty stream", ctx_name);
Expand All @@ -239,8 +240,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
Box::pin(EmptyRecordBatchStream::new(self_ready.input.schema()))
as SendableRecordBatchStream,
),
};
output_stream
}
}
}

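
The `execute()` change in this file is a small cleanup: `match` is an expression in Rust, so the stream can be returned directly rather than bound to a temporary and returned on the next line. In miniature:

```rust
fn describe(partition: Option<usize>) -> String {
    // Before: `let output = match ... ; output`
    // After: the match expression is the function's return value.
    match partition {
        Some(p) => format!("partition {p}"),
        None => String::from("empty stream"),
    }
}
```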
2 changes: 1 addition & 1 deletion src/flight_service/do_get.rs
@@ -96,7 +96,7 @@ impl ArrowFlightEndpoint {
let stage_data = once
.get_or_try_init(|| async {
let proto_node = PhysicalPlanNode::try_decode(doget.plan_proto.as_ref())?;
let mut plan = proto_node.try_into_physical_plan(&ctx, &self.runtime, &codec)?;
let mut plan = proto_node.try_into_physical_plan(&ctx.task_ctx(), &codec)?;
for hook in self.hooks.on_plan.iter() {
plan = hook(plan)
}
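
The call-site change mirrors the datafusion-proto 51 signature: `try_into_physical_plan` now takes a task context (obtained here via `ctx.task_ctx()`) plus the extension codec, and no longer takes a separate runtime argument. A minimal sketch of decoding a serialized plan with that call shape; the argument order is assumed from the diff above, and the plan bytes are a placeholder:

```rust
use std::sync::Arc;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
use datafusion_proto::protobuf::PhysicalPlanNode;

fn decode_plan(plan_bytes: &[u8]) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    let task_ctx = Arc::new(TaskContext::default());
    let codec = DefaultPhysicalExtensionCodec {};
    // Decode the protobuf message, then turn it into an ExecutionPlan.
    let node = PhysicalPlanNode::try_decode(plan_bytes)?;
    node.try_into_physical_plan(&task_ctx, &codec)
}
```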
4 changes: 2 additions & 2 deletions src/flight_service/session_builder.rs
@@ -25,7 +25,7 @@ pub trait DistributedSessionBuilder {
/// # use std::sync::Arc;
/// # use async_trait::async_trait;
/// # use datafusion::error::DataFusionError;
/// # use datafusion::execution::{FunctionRegistry, SessionState, SessionStateBuilder};
/// # use datafusion::execution::{FunctionRegistry, SessionState, SessionStateBuilder, TaskContext};
/// # use datafusion::physical_plan::ExecutionPlan;
/// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
/// # use datafusion_distributed::{DistributedExt, DistributedSessionBuilder, DistributedSessionBuilderContext};
@@ -34,7 +34,7 @@ pub trait DistributedSessionBuilder {
/// struct CustomExecCodec;
///
/// impl PhysicalExtensionCodec for CustomExecCodec {
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], registry: &dyn FunctionRegistry) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
/// fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
/// todo!()
/// }
///
2 changes: 1 addition & 1 deletion src/metrics/metrics_collecting_stream.rs
@@ -47,7 +47,7 @@ where

let metadata =
FlightAppMetadata::decode(flight_data.app_metadata.as_ref()).map_err(|e| {
FlightError::ProtocolError(format!("failed to decode app_metadata: {}", e))
FlightError::ProtocolError(format!("failed to decode app_metadata: {e}"))
})?;

let Some(content) = metadata.content else {
26 changes: 14 additions & 12 deletions src/metrics/proto.rs
@@ -160,8 +160,8 @@ pub fn df_metrics_set_to_proto(
Err(err) => {
// Check if this is the specific custom metrics error we want to filter out
if let DataFusionError::Internal(msg) = &err {
if msg == CUSTOM_METRICS_NOT_SUPPORTED {
// Filter out custom metrics error - continue processing other metrics
if msg == CUSTOM_METRICS_NOT_SUPPORTED || msg == UNSUPPORTED_METRICS {
// Filter out custom/unsupported metrics error - continue processing other metrics
continue;
}
}
@@ -191,6 +191,9 @@ pub fn metrics_set_proto_to_df(
const CUSTOM_METRICS_NOT_SUPPORTED: &str =
"custom metrics are not supported in metrics proto conversion";

/// New DataFusion metrics that are not yet supported in proto conversion.
const UNSUPPORTED_METRICS: &str = "metric type not supported in proto conversion";

/// df_metric_to_proto converts a `datafusion::physical_plan::metrics::Metric` to a `MetricProto`. It does not consume the Arc<Metric>.
pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<MetricProto, DataFusionError> {
let partition = metric.partition().map(|p| p as u64);
@@ -285,6 +288,10 @@ pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<MetricProto, DataFusionError> {
labels,
}),
MetricValue::Custom { .. } => internal_err!("{}", CUSTOM_METRICS_NOT_SUPPORTED),
MetricValue::OutputBytes(_) | MetricValue::PruningMetrics { .. } | MetricValue::Ratio { .. } => {
// TODO: Support these metrics
internal_err!("{}", UNSUPPORTED_METRICS)
Comment on lines +292 to +293

Collaborator:
Probably worth opening an issue for supporting these.

}
}
}

@@ -433,38 +440,33 @@ mod tests {
let roundtrip_count = roundtrip_metrics_set.iter().count();
assert_eq!(
original_count, roundtrip_count,
"roundtrip should preserve metrics count for {}",
test_name
"roundtrip should preserve metrics count for {test_name}"
);

// Verify equivalence of each metric.
for (original, roundtrip) in metrics_set.iter().zip(roundtrip_metrics_set.iter()) {
assert_eq!(
original.partition(),
roundtrip.partition(),
"partition mismatch in {}",
test_name
"partition mismatch in {test_name}"
);

assert_eq!(
original.labels().len(),
roundtrip.labels().len(),
"label count mismatch in {}",
test_name
"label count mismatch in {test_name}"
);

for (orig_label, rt_label) in original.labels().iter().zip(roundtrip.labels().iter()) {
assert_eq!(
orig_label.name(),
rt_label.name(),
"label name mismatch in {}",
test_name
"label name mismatch in {test_name}"
);
assert_eq!(
orig_label.value(),
rt_label.value(),
"label value mismatch in {}",
test_name
"label value mismatch in {test_name}"
);
}

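
For context on the filtering in `df_metrics_set_to_proto` above: both constants act as sentinel messages, letting the set-level conversion skip metrics it cannot represent yet while still propagating every other error. A condensed sketch of that pattern, using the constants from the diff but not the repository's exact code:

```rust
use datafusion::common::DataFusionError;

const CUSTOM_METRICS_NOT_SUPPORTED: &str =
    "custom metrics are not supported in metrics proto conversion";
const UNSUPPORTED_METRICS: &str = "metric type not supported in proto conversion";

/// Converts each item, skipping the two "not supported" sentinels and
/// returning early on any other error.
fn convert_all<T, U>(
    items: Vec<T>,
    convert: impl Fn(T) -> Result<U, DataFusionError>,
) -> Result<Vec<U>, DataFusionError> {
    let mut out = Vec::new();
    for item in items {
        match convert(item) {
            Ok(converted) => out.push(converted),
            Err(DataFusionError::Internal(msg))
                if msg == CUSTOM_METRICS_NOT_SUPPORTED || msg == UNSUPPORTED_METRICS =>
            {
                // Known-unsupported metric: drop it and keep going.
                continue;
            }
            Err(other) => return Err(other),
        }
    }
    Ok(out)
}
```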