Skip to content

Commit 2c2645c

Browse files
wip
1 parent 1f29ca6 commit 2c2645c

File tree

10 files changed

+941
-2137
lines changed

10 files changed

+941
-2137
lines changed

src/execution_plans/distributed.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
33
use crate::execution_plans::common::require_one_child;
4+
use crate::protobuf::DistributedCodec;
5+
use crate::stage::{DisplayCtx, MaybeEncodedPlan};
46
use crate::stage::{ExecutionTask, Stage};
5-
use crate::protobuf::{DistributedCodec, StageKey};
6-
use crate::stage::DisplayCtx;
7-
use bytes::Bytes;
87
use datafusion::common::exec_err;
98
use datafusion::common::tree_node::{Transformed, TreeNode};
109
use datafusion::error::DataFusionError;
@@ -43,11 +42,12 @@ impl DistributedExec {
4342

4443
/// Returns a special stage key used to identify the root "stage" of the distributed plan.
4544
/// TODO: reconcile this with display_plan_graphviz
46-
pub(crate) fn to_stage_key(&self) -> StageKey {
47-
StageKey {
48-
query_id: Bytes::new(),
49-
stage_id: 0_u64,
50-
task_number: 0,
45+
pub(crate) fn synthetic_stage(&self) -> Stage {
46+
Stage {
47+
query_id: uuid::Uuid::nil(),
48+
num: 0,
49+
plan: MaybeEncodedPlan::Decoded(self.plan.clone()),
50+
tasks: vec![ExecutionTask { url: None }],
5151
}
5252
}
5353

src/execution_plans/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl ExecutionPlan for MetricsWrapperExec {
5353
}
5454

5555
fn properties(&self) -> &PlanProperties {
56-
unimplemented!("MetricsWrapperExec does not implement properties")
56+
self.inner.properties()
5757
}
5858

5959
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {

src/explain.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use crate::metrics::MetricsCollectorResult;
44
use crate::metrics::TaskMetricsCollector;
55
use crate::metrics::proto::MetricsSetProto;
66
use crate::metrics::proto::df_metrics_set_to_proto;
7+
use crate::metrics::rewrite_distributed_plan_with_metrics;
8+
use crate::protobuf::StageKey;
79
use crate::stage::DisplayCtx;
10+
use bytes::Bytes;
811
use datafusion::error::DataFusionError;
912
use datafusion::physical_plan::ExecutionPlan;
1013
use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -25,15 +28,28 @@ pub fn explain_analyze(executed: Arc<dyn ExecutionPlan>) -> Result<String, DataF
2528
mut input_task_metrics,
2629
} = TaskMetricsCollector::new().collect(dist_exec.pepared_plan()?)?;
2730

31+
// let input_stage = dist_exec.pepared_plan()?
32+
let fake_stage = dist_exec.synthetic_stage();
33+
let stage_key = StageKey {
34+
query_id: Bytes::from(fake_stage.query_id.as_bytes().to_vec()),
35+
stage_id: fake_stage.num as u64,
36+
task_number: 0,
37+
};
2838
input_task_metrics.insert(
29-
dist_exec.to_stage_key(),
39+
stage_key,
3040
task_metrics
3141
.into_iter()
3242
.map(|metrics| df_metrics_set_to_proto(&metrics))
3343
.collect::<Result<Vec<MetricsSetProto>, DataFusionError>>()?,
3444
);
3545

36-
let display_ctx = DisplayCtx::new(input_task_metrics);
46+
let executed = rewrite_distributed_plan_with_metrics(
47+
executed.clone(),
48+
Arc::new(input_task_metrics),
49+
)?;
50+
let dist_exec = executed.as_any().downcast_ref::<DistributedExec>().unwrap();
51+
52+
let display_ctx = DisplayCtx::new().with_metrics();
3753
Ok(display_plan_ascii(&dist_exec.with_display_ctx(display_ctx)))
3854
}
3955
}

src/flight_service/do_get.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ fn collect_and_create_metrics_flight_data(
165165
plan: Arc<dyn ExecutionPlan>,
166166
incoming: FlightData,
167167
) -> Result<FlightData, FlightError> {
168-
// Get the metrics for the task executed on this worker. Separately, collect metrics for child tasks.
168+
// Get the metrics for the task executed on this worker + child tasks.
169169
let mut result = TaskMetricsCollector::new()
170170
.collect(plan)
171171
.map_err(|err| FlightError::ProtocolError(err.to_string()))?;

src/metrics/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ mod task_metrics_collector;
44
mod task_metrics_rewriter;
55
pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
66
pub(crate) use task_metrics_collector::{MetricsCollectorResult, TaskMetricsCollector};
7-
pub(crate) use task_metrics_rewriter::TaskMetricsRewriter;
7+
#[allow(unused_imports)]
8+
pub use task_metrics_rewriter::StageMetricsRewriter;
9+
pub(crate) use task_metrics_rewriter::rewrite_distributed_plan_with_metrics;

src/metrics/proto.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ pub struct MetricsSetProto {
2626
pub metrics: Vec<MetricProto>,
2727
}
2828

29+
impl MetricsSetProto {
30+
pub fn new() -> Self {
31+
Self {
32+
metrics: Vec::new(),
33+
}
34+
}
35+
36+
pub fn push(&mut self, metric: MetricProto) {
37+
self.metrics.push(metric)
38+
}
39+
}
40+
2941
/// MetricValueProto is a protobuf mirror of the [datafusion::physical_plan::metrics::MetricValue] enum.
3042
#[derive(Clone, PartialEq, Eq, ::prost::Oneof)]
3143
pub enum MetricValueProto {

0 commit comments

Comments
 (0)