Skip to content

Commit 1bb3711

Browse files
wip
1 parent 1f29ca6 commit 1bb3711

File tree

12 files changed

+1031
-2191
lines changed

12 files changed

+1031
-2191
lines changed

src/execution_plans/distributed.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
33
use crate::execution_plans::common::require_one_child;
4-
use crate::stage::{ExecutionTask, Stage};
5-
use crate::protobuf::{DistributedCodec, StageKey};
4+
use crate::protobuf::DistributedCodec;
65
use crate::stage::DisplayCtx;
7-
use bytes::Bytes;
6+
use crate::stage::{ExecutionTask, Stage};
87
use datafusion::common::exec_err;
98
use datafusion::common::tree_node::{Transformed, TreeNode};
109
use datafusion::error::DataFusionError;
@@ -41,25 +40,15 @@ impl DistributedExec {
4140
}
4241
}
4342

44-
/// Returns a special stage key used to identify the root "stage" of the distributed plan.
45-
/// TODO: reconcile this with display_plan_graphviz
46-
pub(crate) fn to_stage_key(&self) -> StageKey {
47-
StageKey {
48-
query_id: Bytes::new(),
49-
stage_id: 0_u64,
50-
task_number: 0,
51-
}
52-
}
53-
5443
pub(crate) fn with_display_ctx(&self, display_ctx: DisplayCtx) -> Self {
5544
Self {
5645
display_ctx: Some(display_ctx),
5746
..self.clone()
5847
}
5948
}
6049

61-
/// Returns the prepared plan which is lazily prepared on execute(). It is updated on every
62-
/// call. Returns an error if .execute() has not been called.
50+
/// Returns the plan which is lazily prepared on execute() and actually gets executed.
51+
/// It is updated on every call to execute(). Returns an error if .execute() has not been called.
6352
pub(crate) fn pepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
6453
self.prepared_plan
6554
.lock()

src/execution_plans/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl ExecutionPlan for MetricsWrapperExec {
5353
}
5454

5555
fn properties(&self) -> &PlanProperties {
56-
unimplemented!("MetricsWrapperExec does not implement properties")
56+
self.inner.properties()
5757
}
5858

5959
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {

src/explain.rs

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,20 @@
11
use crate::display_plan_ascii;
22
use crate::execution_plans::DistributedExec;
3-
use crate::metrics::MetricsCollectorResult;
4-
use crate::metrics::TaskMetricsCollector;
5-
use crate::metrics::proto::MetricsSetProto;
6-
use crate::metrics::proto::df_metrics_set_to_proto;
7-
use crate::stage::DisplayCtx;
3+
use crate::metrics::rewrite_distributed_plan_with_metrics;
84
use datafusion::error::DataFusionError;
95
use datafusion::physical_plan::ExecutionPlan;
106
use datafusion::physical_plan::display::DisplayableExecutionPlan;
117
use std::sync::Arc;
128

139
/// explain_analyze renders an [ExecutionPlan] with metrics.
1410
pub fn explain_analyze(executed: Arc<dyn ExecutionPlan>) -> Result<String, DataFusionError> {
15-
// Check if the plan is distributed by looking for a root DistributedExec.
1611
match executed.as_any().downcast_ref::<DistributedExec>() {
1712
None => Ok(DisplayableExecutionPlan::with_metrics(executed.as_ref())
1813
.indent(true)
1914
.to_string()),
20-
Some(dist_exec) => {
21-
// If the plan was distributed, collect metrics from the coordinating stage exec.
22-
// TODO: Should we move this into the DistributedExec itself or a new ExplainAnalyzeExec?
23-
let MetricsCollectorResult {
24-
task_metrics,
25-
mut input_task_metrics,
26-
} = TaskMetricsCollector::new().collect(dist_exec.pepared_plan()?)?;
27-
28-
input_task_metrics.insert(
29-
dist_exec.to_stage_key(),
30-
task_metrics
31-
.into_iter()
32-
.map(|metrics| df_metrics_set_to_proto(&metrics))
33-
.collect::<Result<Vec<MetricsSetProto>, DataFusionError>>()?,
34-
);
35-
36-
let display_ctx = DisplayCtx::new(input_task_metrics);
37-
Ok(display_plan_ascii(&dist_exec.with_display_ctx(display_ctx)))
15+
Some(_) => {
16+
let executed = rewrite_distributed_plan_with_metrics(executed.clone())?;
17+
Ok(display_plan_ascii(executed.as_ref()))
3818
}
3919
}
4020
}

src/flight_service/do_get.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ fn collect_and_create_metrics_flight_data(
165165
plan: Arc<dyn ExecutionPlan>,
166166
incoming: FlightData,
167167
) -> Result<FlightData, FlightError> {
168-
// Get the metrics for the task executed on this worker. Separately, collect metrics for child tasks.
168+
// Collect the metrics for the task executed on this worker, together with the metrics of its child tasks.
169169
let mut result = TaskMetricsCollector::new()
170170
.collect(plan)
171171
.map_err(|err| FlightError::ProtocolError(err.to_string()))?;

src/metrics/metrics_collecting_stream.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,10 @@ mod tests {
155155
tasks: vec![TaskMetrics {
156156
stage_key: Some(stage_key.clone()),
157157
// use the task number to seed the test metrics set for convenience
158-
metrics: vec![make_test_metrics_set_proto_from_seed(stage_key.task_number)],
158+
metrics: vec![make_test_metrics_set_proto_from_seed(
159+
stage_key.task_number,
160+
4,
161+
)],
159162
}],
160163
})),
161164
})
@@ -200,7 +203,7 @@ mod tests {
200203
assert_eq!(collected_metrics.len(), 1);
201204
assert_eq!(
202205
collected_metrics[0],
203-
make_test_metrics_set_proto_from_seed(stage_key.task_number)
206+
make_test_metrics_set_proto_from_seed(stage_key.task_number, 4)
204207
);
205208
}
206209
}
@@ -209,7 +212,7 @@ mod tests {
209212
async fn test_metrics_collecting_stream_error_missing_stage_key() {
210213
let task_metrics_with_no_stage_key = TaskMetrics {
211214
stage_key: None,
212-
metrics: vec![make_test_metrics_set_proto_from_seed(1)],
215+
metrics: vec![make_test_metrics_set_proto_from_seed(1, 4)],
213216
};
214217

215218
let invalid_app_metadata = FlightAppMetadata {

src/metrics/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ mod task_metrics_collector;
44
mod task_metrics_rewriter;
55
pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
66
pub(crate) use task_metrics_collector::{MetricsCollectorResult, TaskMetricsCollector};
7-
pub(crate) use task_metrics_rewriter::TaskMetricsRewriter;
7+
#[allow(unused_imports)]
8+
pub use task_metrics_rewriter::StageMetricsRewriter;
9+
pub(crate) use task_metrics_rewriter::rewrite_distributed_plan_with_metrics;

src/metrics/proto.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ pub struct MetricsSetProto {
2626
pub metrics: Vec<MetricProto>,
2727
}
2828

29+
impl MetricsSetProto {
30+
pub fn new() -> Self {
31+
Self {
32+
metrics: Vec::new(),
33+
}
34+
}
35+
36+
pub fn push(&mut self, metric: MetricProto) {
37+
self.metrics.push(metric)
38+
}
39+
}
40+
2941
/// MetricValueProto is a protobuf mirror of the [datafusion::physical_plan::metrics::MetricValue] enum.
3042
#[derive(Clone, PartialEq, Eq, ::prost::Oneof)]
3143
pub enum MetricValueProto {

0 commit comments

Comments
 (0)