Skip to content

Commit e44ed3d

Browse files
wip
1 parent 95af4da commit e44ed3d

File tree

4 files changed

+1396
-1415
lines changed

4 files changed

+1396
-1415
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/.idea
22
/target
33
/benchmarks/data/
4-
testdata/tpch/data/
4+
testdata/tpch/data/
5+
*/**/*.pending-snap

src/explain.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,17 @@ use datafusion::physical_plan::ExecutionPlan;
1212
use datafusion::physical_plan::display::DisplayableExecutionPlan;
1313
use std::sync::Arc;
1414

15+
/// Rewriter that injects the display context into the [StageExec] nodes in the plan.
1516
pub struct DisplayCtxReWriter {
1617
display_ctx: DisplayCtx,
1718
}
1819

1920
impl DisplayCtxReWriter {
20-
/// Create a new TaskMetricsRewriter. The provided metrics will be used to enrich the plan.
2121
pub fn new(display_ctx: DisplayCtx) -> Self {
2222
Self { display_ctx }
2323
}
2424

25-
/// populate injects the display context into the [StageExec] nodes in the plan.
25+
/// Rewrites the plan to inject the display context into the [StageExec] nodes in the plan.
2626
pub fn rewrite(
2727
mut self,
2828
plan: Arc<dyn ExecutionPlan>,
@@ -52,9 +52,11 @@ impl TreeNodeRewriter for DisplayCtxReWriter {
5252
}
5353

5454
pub fn explain_analyze(executed: Arc<dyn ExecutionPlan>) -> Result<String, DataFusionError> {
55+
// Check if the plan is distributed by looking for a root [StageExec].
5556
let plan = match executed.as_any().downcast_ref::<StageExec>() {
5657
None => executed,
5758
Some(stage_exec) => {
59+
// If the plan was distributed, collect metrics from the coordinating stage exec.
5860
let MetricsCollectorResult {
5961
task_metrics,
6062
mut input_task_metrics,
@@ -72,6 +74,7 @@ pub fn explain_analyze(executed: Arc<dyn ExecutionPlan>) -> Result<String, DataF
7274
);
7375

7476
let display_ctx = DisplayCtx::new(input_task_metrics);
77+
// Inject the display context into the plan.
7578
DisplayCtxReWriter::new(display_ctx).rewrite(executed.clone())?
7679
}
7780
};

src/test_utils/insta.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,56 @@ pub fn settings() -> insta::Settings {
2323
"UUID",
2424
);
2525
settings.add_filter(r"\d+\.\.\d+", "<int>..<int>");
26+
27+
// Metric filters - only replace known metric names to avoid false positives
28+
settings.add_filter(r"output_rows=\d+", "output_rows=<metric>");
29+
settings.add_filter(r"elapsed_compute=[\d.]+[a-zA-Zµnms]+", "elapsed_compute=<metric>");
30+
settings.add_filter(r"spill_count=\d+", "spill_count=<metric>");
31+
settings.add_filter(r"spilled_bytes=[\d.]+\s*[KMGTPE]?B?", "spilled_bytes=<metric>");
32+
settings.add_filter(r"spilled_rows=\d+", "spilled_rows=<metric>");
33+
settings.add_filter(r"current_memory_usage=[\d.]+\s*[KMGTPE]?B?", "current_memory_usage=<metric>");
34+
settings.add_filter(r"start_timestamp=[\d.]+[a-zA-Zµnms]*", "start_timestamp=<metric>");
35+
settings.add_filter(r"end_timestamp=[\d.]+[a-zA-Zµnms]*", "end_timestamp=<metric>");
36+
37+
// Common custom metric patterns
38+
settings.add_filter(r"fetch_time=[\d.]+[a-zA-Zµnms]+", "fetch_time=<metric>");
39+
settings.add_filter(r"repartition_time=[\d.]+[a-zA-Zµnms]+", "repartition_time=<metric>");
40+
settings.add_filter(r"send_time=[\d.]+[a-zA-Zµnms]+", "send_time=<metric>");
41+
settings.add_filter(r"peak_mem_used=\d+", "peak_mem_used=<metric>");
42+
settings.add_filter(r"batches_splitted=\d+", "batches_splitted=<metric>");
43+
settings.add_filter(r"batches_split=\d+", "batches_split=<metric>");
44+
settings.add_filter(r"bytes_scanned=\d+", "bytes_scanned=<metric>");
45+
settings.add_filter(r"file_open_errors=\d+", "file_open_errors=<metric>");
46+
settings.add_filter(r"file_scan_errors=\d+", "file_scan_errors=<metric>");
47+
settings.add_filter(r"files_ranges_pruned_statistics=\d+", "files_ranges_pruned_statistics=<metric>");
48+
settings.add_filter(r"num_predicate_creation_errors=\d+", "num_predicate_creation_errors=<metric>");
49+
settings.add_filter(r"page_index_rows_matched=\d+", "page_index_rows_matched=<metric>");
50+
settings.add_filter(r"page_index_rows_pruned=\d+", "page_index_rows_pruned=<metric>");
51+
settings.add_filter(r"predicate_evaluation_errors=\d+", "predicate_evaluation_errors=<metric>");
52+
settings.add_filter(r"pushdown_rows_matched=\d+", "pushdown_rows_matched=<metric>");
53+
settings.add_filter(r"pushdown_rows_pruned=\d+", "pushdown_rows_pruned=<metric>");
54+
settings.add_filter(r"row_groups_matched_bloom_filter=\d+", "row_groups_matched_bloom_filter=<metric>");
55+
settings.add_filter(r"row_groups_matched_statistics=\d+", "row_groups_matched_statistics=<metric>");
56+
settings.add_filter(r"row_groups_pruned_bloom_filter=\d+", "row_groups_pruned_bloom_filter=<metric>");
57+
settings.add_filter(r"row_groups_pruned_statistics=\d+", "row_groups_pruned_statistics=<metric>");
58+
settings.add_filter(r"bloom_filter_eval_time=[\d.]+[a-zA-Zµnms]+", "bloom_filter_eval_time=<metric>");
59+
settings.add_filter(r"metadata_load_time=[\d.]+[a-zA-Zµnms]+", "metadata_load_time=<metric>");
60+
settings.add_filter(r"page_index_eval_time=[\d.]+[a-zA-Zµnms]+", "page_index_eval_time=<metric>");
61+
settings.add_filter(r"row_pushdown_eval_time=[\d.]+[a-zA-Zµnms]+", "row_pushdown_eval_time=<metric>");
62+
settings.add_filter(r"statistics_eval_time=[\d.]+[a-zA-Zµnms]+", "statistics_eval_time=<metric>");
63+
settings.add_filter(r"time_elapsed_opening=[\d.]+[a-zA-Zµnms]+", "time_elapsed_opening=<metric>");
64+
settings.add_filter(r"time_elapsed_processing=[\d.]+[a-zA-Zµnms]+", "time_elapsed_processing=<metric>");
65+
settings.add_filter(r"time_elapsed_scanning_total=[\d.]+[a-zA-Zµnms]+", "time_elapsed_scanning_total=<metric>");
66+
settings.add_filter(r"time_elapsed_scanning_until_data=[\d.]+[a-zA-Zµnms]+", "time_elapsed_scanning_until_data=<metric>");
67+
settings.add_filter(r"skipped_aggregation_rows=\d+", "skipped_aggregation_rows=<metric>");
68+
settings.add_filter(r"build_input_batches=\d+", "build_input_batches=<metric>");
69+
settings.add_filter(r"build_input_rows=\d+", "build_input_rows=<metric>");
70+
settings.add_filter(r"input_batches=\d+", "input_batches=<metric>");
71+
settings.add_filter(r"input_rows=\d+", "input_rows=<metric>");
72+
settings.add_filter(r"output_batches=\d+", "output_batches=<metric>");
73+
settings.add_filter(r"build_mem_used=\d+", "build_mem_used=<metric>");
74+
settings.add_filter(r"build_time=[\d.]+[a-zA-Zµnms]+", "build_time=<metric>");
75+
settings.add_filter(r"join_time=[\d.]+[a-zA-Zµnms]+", "join_time=<metric>");
2676

2777
settings
2878
}

0 commit comments

Comments
 (0)