22 changes: 11 additions & 11 deletions src/distributed_physical_optimizer_rule.rs
@@ -498,14 +498,14 @@ mod tests {
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ CoalesceBatchesExec: target_batch_size=8192
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
@@ -526,14 +526,14 @@ mod tests {
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ CoalesceBatchesExec: target_batch_size=8192
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
@@ -598,13 +598,13 @@ mod tests {
│ CoalesceBatchesExec: target_batch_size=8192
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│ CoalesceBatchesExec: target_batch_size=8192
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│ CoalesceBatchesExec: target_batch_size=8192
@@ -613,7 +613,7 @@ mod tests {
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
└──────────────────────────────────────────────────
┌───── Stage 3 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3]
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
│ CoalesceBatchesExec: target_batch_size=8192
@@ -634,7 +634,7 @@ mod tests {
│ SortPreservingMergeExec: [MinTemp@0 DESC]
│ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0,p1] t1:[p2,p3]
┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
│ SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[true]
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
@@ -651,12 +651,12 @@ mod tests {
│ CoalescePartitionsExec
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ CoalesceBatchesExec: target_batch_size=8192
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
@@ -710,6 +710,6 @@ mod tests {
let df = ctx.sql(query).await?;

let physical_plan = df.create_physical_plan().await?;
Ok(display_plan_ascii(physical_plan.as_ref()))
Ok(display_plan_ascii(physical_plan.as_ref(), false))
}
}
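
Note on the expected plans above: stage headers now render each task's partition list as a compact range (for example t0:[p0..p3] instead of t0:[p0,p1,p2,p3]), while the per-node PartitionIsolatorExec lines keep the explicit comma form. As a rough, hypothetical sketch of that rendering — the crate's real display code may differ, and this helper name is made up — a contiguous ascending partition list collapses like so:

// Hypothetical helper, not the crate's actual display code: collapse a contiguous,
// ascending partition list into the "p0..p3" form used in the stage headers above.
fn format_partition_group(partitions: &[usize]) -> String {
    match (partitions.first(), partitions.last()) {
        (Some(first), Some(last)) if first != last => format!("p{first}..p{last}"),
        (Some(only), _) => format!("p{only}"),
        _ => String::new(),
    }
}

fn main() {
    assert_eq!(format_partition_group(&[0, 1, 2, 3]), "p0..p3");
    assert_eq!(format_partition_group(&[2, 3]), "p2..p3");
    assert_eq!(format_partition_group(&[0]), "p0");
    assert_eq!(format_partition_group(&[]), "");
}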
38 changes: 33 additions & 5 deletions src/execution_plans/distributed.rs
@@ -4,14 +4,17 @@ use crate::execution_plans::common::require_one_child;
use crate::protobuf::DistributedCodec;
use crate::stage::{ExecutionTask, Stage};
use datafusion::common::exec_err;
use datafusion::common::internal_datafusion_err;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use rand::Rng;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::Mutex;
use url::Url;

/// [ExecutionPlan] that executes the inner plan in distributed mode.
@@ -20,14 +20,30 @@ use url::Url;
/// channel resolver and assigned to each task in each stage.
/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them
/// over the wire.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct DistributedExec {
pub plan: Arc<dyn ExecutionPlan>,
pub prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
}

impl DistributedExec {
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
Self { plan }
Self {
plan,
prepared_plan: Arc::new(Mutex::new(None)),
}
}

/// Returns the plan which is lazily prepared on execute() and actually gets executed.
/// It is updated on every call to execute(). Returns an error if .execute() has not been called.
pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
self.prepared_plan
.lock()
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
.clone()
.ok_or_else(|| {
internal_datafusion_err!("No prepared plan found. Was execute() called?")
})
}

fn prepare_plan(
@@ -98,7 +117,8 @@ impl ExecutionPlan for DistributedExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(DistributedExec {
plan: require_one_child(children)?,
plan: require_one_child(&children)?,
prepared_plan: self.prepared_plan.clone(),
}))
}

@@ -120,7 +140,15 @@ impl ExecutionPlan for DistributedExec {
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
let codec = DistributedCodec::new_combined_with_user(context.session_config());

let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
plan.execute(partition, context)
let prepared = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
{
let mut guard = self
.prepared_plan
.lock()
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?;
*guard = Some(prepared.clone());
}

prepared.execute(partition, context)
}
}
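
The core change in distributed.rs is that DistributedExec now caches the plan it prepares on each execute() call behind an Arc<Mutex<Option<..>>>, so callers (for example, metrics collection) can later retrieve the tree that actually ran via prepared_plan(). A minimal, self-contained sketch of that caching pattern, with a placeholder type standing in for Arc<dyn ExecutionPlan>:

use std::sync::{Arc, Mutex};

// Placeholder standing in for Arc<dyn ExecutionPlan>; the pattern, not the types,
// is what mirrors DistributedExec above.
type Plan = Arc<String>;

struct LazyPrepared {
    prepared: Arc<Mutex<Option<Plan>>>,
}

impl LazyPrepared {
    fn new() -> Self {
        Self { prepared: Arc::new(Mutex::new(None)) }
    }

    // Mirrors execute(): prepare the plan, cache it, then hand it back for execution.
    fn execute(&self) -> Result<Plan, String> {
        let plan: Plan = Arc::new("prepared plan".to_string());
        let mut guard = self
            .prepared
            .lock()
            .map_err(|e| format!("failed to lock prepared plan: {e}"))?;
        *guard = Some(plan.clone());
        Ok(plan)
    }

    // Mirrors prepared_plan(): error out if execute() was never called.
    fn prepared_plan(&self) -> Result<Plan, String> {
        self.prepared
            .lock()
            .map_err(|e| format!("failed to lock prepared plan: {e}"))?
            .clone()
            .ok_or_else(|| "no prepared plan found; was execute() called?".to_string())
    }
}

fn main() {
    let exec = LazyPrepared::new();
    assert!(exec.prepared_plan().is_err());
    exec.execute().unwrap();
    assert!(exec.prepared_plan().is_ok());
}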
4 changes: 2 additions & 2 deletions src/execution_plans/metrics.rs
@@ -11,7 +11,7 @@ use std::fmt::{Debug, Formatter};
/// A transparent wrapper that delegates all execution to its child but returns custom metrics. This node is invisible during display.
/// The structure of a plan tree is closely tied to the [TaskMetricsRewriter].
pub struct MetricsWrapperExec {
inner: Arc<dyn ExecutionPlan>,
pub(crate) inner: Arc<dyn ExecutionPlan>,
/// metrics for this plan node.
metrics: MetricsSet,
/// children is initially None. When used by the [TaskMetricsRewriter], the children will be updated
@@ -53,7 +53,7 @@ impl ExecutionPlan for MetricsWrapperExec {
}

fn properties(&self) -> &PlanProperties {
unimplemented!("MetricsWrapperExec does not implement properties")
self.inner.properties()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
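
The metrics.rs change makes MetricsWrapperExec forward properties() to the node it wraps instead of panicking with unimplemented!(), keeping the wrapper transparent to code that inspects the executed plan. A self-contained sketch of that delegation idea using plain placeholder traits (not DataFusion's real ones):

// Placeholder trait, not DataFusion's ExecutionPlan: shows the "transparent wrapper"
// idea behind MetricsWrapperExec — structural queries are forwarded to the wrapped
// node, while the wrapper layers its own metrics on top.
trait Node {
    fn properties(&self) -> &str;
    fn metrics(&self) -> Option<String>;
}

struct Leaf;

impl Node for Leaf {
    fn properties(&self) -> &str { "partitioning=Hash(4)" }
    fn metrics(&self) -> Option<String> { None }
}

struct MetricsWrapper<N: Node> {
    inner: N,
    metrics: String,
}

impl<N: Node> Node for MetricsWrapper<N> {
    // Forward to the wrapped node instead of panicking with unimplemented!().
    fn properties(&self) -> &str { self.inner.properties() }
    // ...but report the wrapper's own metrics.
    fn metrics(&self) -> Option<String> { Some(self.metrics.clone()) }
}

fn main() {
    let wrapped = MetricsWrapper { inner: Leaf, metrics: "output_rows=42".to_string() };
    assert_eq!(wrapped.properties(), "partitioning=Hash(4)");
    assert_eq!(wrapped.metrics().as_deref(), Some("output_rows=42"));
}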
5 changes: 3 additions & 2 deletions src/flight_service/do_get.rs
@@ -143,8 +143,9 @@ impl ArrowFlightEndpoint {
let stream = map_last_stream(stream, move |last| {
if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
task_data_entries.remove(key.clone());
return last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el));
}
last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el))
last
});

Ok(Response::new(Box::pin(stream.map_err(|err| match err {
Expand All @@ -164,7 +165,7 @@ fn collect_and_create_metrics_flight_data(
plan: Arc<dyn ExecutionPlan>,
incoming: FlightData,
) -> Result<FlightData, FlightError> {
// Get the metrics for the task executed on this worker. Separately, collect metrics for child tasks.
// Get the metrics for the task executed on this worker + child tasks.
let mut result = TaskMetricsCollector::new()
.collect(plan)
.map_err(|err| FlightError::ProtocolError(err.to_string()))?;
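
In do_get.rs, the metrics flight data is now appended only to the final message of the last partition stream to finish: each stream decrements a shared counter, and only the call that observes the counter at 1 (driving it to 0) attaches the metrics and evicts the task entry. A small, illustrative sketch of that countdown check:

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative only: returns true exactly once, for the last partition to complete,
// which is the point where the diff above appends the collected task metrics.
fn is_last_partition_to_finish(remaining: &AtomicUsize) -> bool {
    remaining.fetch_sub(1, Ordering::SeqCst) == 1
}

fn main() {
    let remaining = AtomicUsize::new(3);
    assert!(!is_last_partition_to_finish(&remaining)); // 3 -> 2
    assert!(!is_last_partition_to_finish(&remaining)); // 2 -> 1
    assert!(is_last_partition_to_finish(&remaining));  // 1 -> 0: attach metrics here
}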
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -27,6 +27,8 @@ pub use flight_service::{
DistributedSessionBuilderContext, MappedDistributedSessionBuilder,
MappedDistributedSessionBuilderExt,
};
pub use metrics::rewrite_distributed_plan_with_metrics;
pub use stage::{
DistributedTaskContext, ExecutionTask, Stage, display_plan_ascii, display_plan_graphviz,
explain_analyze,
};
9 changes: 6 additions & 3 deletions src/metrics/metrics_collecting_stream.rs
@@ -155,7 +155,10 @@ mod tests {
tasks: vec![TaskMetrics {
stage_key: Some(stage_key.clone()),
// use the task number to seed the test metrics set for convenience
metrics: vec![make_test_metrics_set_proto_from_seed(stage_key.task_number)],
metrics: vec![make_test_metrics_set_proto_from_seed(
stage_key.task_number,
4,
)],
}],
})),
})
@@ -200,7 +203,7 @@ mod tests {
assert_eq!(collected_metrics.len(), 1);
assert_eq!(
collected_metrics[0],
make_test_metrics_set_proto_from_seed(stage_key.task_number)
make_test_metrics_set_proto_from_seed(stage_key.task_number, 4)
);
}
}
@@ -209,7 +212,7 @@ mod tests {
async fn test_metrics_collecting_stream_error_missing_stage_key() {
let task_metrics_with_no_stage_key = TaskMetrics {
stage_key: None,
metrics: vec![make_test_metrics_set_proto_from_seed(1)],
metrics: vec![make_test_metrics_set_proto_from_seed(1, 4)],
};

let invalid_app_metadata = FlightAppMetadata {
3 changes: 2 additions & 1 deletion src/metrics/mod.rs
@@ -3,4 +3,5 @@ pub(crate) mod proto;
mod task_metrics_collector;
mod task_metrics_rewriter;
pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
pub(crate) use task_metrics_collector::TaskMetricsCollector;
pub(crate) use task_metrics_collector::{MetricsCollectorResult, TaskMetricsCollector};
pub use task_metrics_rewriter::rewrite_distributed_plan_with_metrics;
12 changes: 12 additions & 0 deletions src/metrics/proto.rs
@@ -26,6 +26,18 @@ pub struct MetricsSetProto {
pub metrics: Vec<MetricProto>,
}

impl MetricsSetProto {
pub fn new() -> Self {
Self {
metrics: Vec::new(),
}
}

pub fn push(&mut self, metric: MetricProto) {
self.metrics.push(metric)
}
}

/// MetricValueProto is a protobuf mirror of the [datafusion::physical_plan::metrics::MetricValue] enum.
#[derive(Clone, PartialEq, Eq, ::prost::Oneof)]
pub enum MetricValueProto {
36 changes: 12 additions & 24 deletions src/metrics/task_metrics_collector.rs
@@ -232,25 +232,7 @@ mod tests {
ctx
}

/// runs a sql query and returns the coordinator StageExec
async fn plan_sql(ctx: &SessionContext, sql: &str) -> DistributedExec {
let df = ctx.sql(sql).await.unwrap();
let physical_distributed = df.create_physical_plan().await.unwrap();

let stage_exec = match physical_distributed
.as_any()
.downcast_ref::<DistributedExec>()
{
Some(stage_exec) => stage_exec.clone(),
None => panic!(
"expected StageExec from distributed optimization, got: {}",
physical_distributed.name()
),
};
stage_exec
}

async fn execute_plan(stage_exec: &DistributedExec, ctx: &SessionContext) {
async fn execute_plan(stage_exec: Arc<dyn ExecutionPlan>, ctx: &SessionContext) {
let task_ctx = ctx.task_ctx();
let stream = stage_exec.execute(0, task_ctx).unwrap();

@@ -270,21 +252,27 @@ mod tests {
async fn run_metrics_collection_e2e_test(sql: &str) {
// Plan and execute the query
let ctx = make_test_ctx().await;
let stage_exec = plan_sql(&ctx, sql).await;
execute_plan(&stage_exec, &ctx).await;
let df = ctx.sql(sql).await.unwrap();
let plan = df.create_physical_plan().await.unwrap();
execute_plan(plan.clone(), &ctx).await;

let dist_exec = plan
.as_any()
.downcast_ref::<DistributedExec>()
.expect("expected DistributedExec");

// Assert to ensure the distributed test case is sufficiently complex.
let (stages, expected_stage_keys) = get_stages_and_stage_keys(&stage_exec);
let (stages, expected_stage_keys) = get_stages_and_stage_keys(dist_exec);
assert!(
expected_stage_keys.len() > 1,
"expected more than 1 stage key in test. the plan was not distributed):\n{}",
DisplayableExecutionPlan::new(&stage_exec).indent(true)
DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
);

// Collect metrics for all tasks from the root StageExec.
let collector = TaskMetricsCollector::new();

let result = collector.collect(stage_exec.plan.clone()).unwrap();
let result = collector.collect(dist_exec.plan.clone()).unwrap();

// Ensure that there's metrics for each node for each task for each stage.
for expected_stage_key in expected_stage_keys {