Skip to content

Commit 60546c0

Browse files
authored
Tpch bench to report metrics (#2453)
1 parent 970285b commit 60546c0

File tree

9 files changed

+166
-45
lines changed

9 files changed

+166
-45
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench-vortex/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ url = { workspace = true }
7070
uuid = { workspace = true, features = ["v4"] }
7171
vortex = { workspace = true, features = ["object_store", "parquet"] }
7272
vortex-datafusion = { workspace = true }
73+
vortex-metrics = { workspace = true }
7374
xshell = { workspace = true }
7475

7576
[features]

bench-vortex/src/bin/tpch.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::{Duration, Instant};
33

44
use bench_vortex::display::{print_measurements_json, render_table, DisplayFormat, RatioMode};
55
use bench_vortex::measurements::QueryMeasurement;
6+
use bench_vortex::metrics::MetricsSetExt;
67
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
78
use bench_vortex::tpch::duckdb::{generate_tpch, DuckdbTpchOptions};
89
use bench_vortex::tpch::{
@@ -11,6 +12,7 @@ use bench_vortex::tpch::{
1112
};
1213
use bench_vortex::{default_env_filter, feature_flagged_allocator, setup_logger, Format};
1314
use clap::{Parser, ValueEnum};
15+
use datafusion_physical_plan::metrics::{Label, MetricsSet};
1416
use indicatif::ProgressBar;
1517
use itertools::Itertools;
1618
use log::{info, warn};
@@ -48,6 +50,8 @@ struct Args {
4850
emulate_object_store: bool,
4951
#[arg(long, default_value_t, value_enum)]
5052
data_generator: DataGenerator,
53+
#[arg(long)]
54+
all_metrics: bool,
5155
}
5256

5357
#[derive(ValueEnum, Default, Clone, Debug)]
@@ -132,6 +136,7 @@ fn main() -> ExitCode {
132136
args.emulate_object_store,
133137
args.scale_factor,
134138
url,
139+
args.all_metrics,
135140
))
136141
}
137142

@@ -145,6 +150,7 @@ async fn bench_main(
145150
emulate_object_store: bool,
146151
scale_factor: u8,
147152
url: Url,
153+
display_all_metrics: bool,
148154
) -> ExitCode {
149155
let expected_row_counts = if scale_factor == 1 {
150156
EXPECTED_ROW_COUNTS_SF1
@@ -170,6 +176,8 @@ async fn bench_main(
170176
let mut row_counts = Vec::new();
171177
let mut measurements = Vec::new();
172178

179+
let mut metrics = MetricsSet::new();
180+
173181
for format in formats.iter().copied() {
174182
// Load datasets
175183
let ctx = load_datasets(&url, format, emulate_object_store)
@@ -192,9 +200,18 @@ async fn bench_main(
192200
}
193201

194202
for i in 0..2 {
195-
let row_count = run_tpch_query(&ctx, &sql_queries, query_idx).await;
203+
let (row_count, new_metrics) = run_tpch_query(&ctx, &sql_queries, query_idx).await;
196204
if i == 0 {
197205
row_counts.push((query_idx, format, row_count));
206+
for (idx, m) in new_metrics.into_iter().enumerate() {
207+
metrics.merge_all_with_label(
208+
m,
209+
&[
210+
Label::new("query_idx", query_idx.to_string()),
211+
Label::new("vortex_exec_idx", idx.to_string()),
212+
],
213+
);
214+
}
198215
}
199216
}
200217

@@ -271,6 +288,12 @@ async fn bench_main(
271288

272289
match display_format {
273290
DisplayFormat::Table => {
291+
if !display_all_metrics {
292+
metrics = metrics.aggregate();
293+
}
294+
for m in metrics.timestamps_removed().sorted_for_display().iter() {
295+
println!("{}", m);
296+
}
274297
render_table(measurements, &formats, RatioMode::Time).unwrap();
275298
}
276299
DisplayFormat::GhJson => {

bench-vortex/src/lib.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use datafusion::execution::cache::cache_unit::{DefaultFileStatisticsCache, Defau
1717
use datafusion::execution::object_store::DefaultObjectStoreRegistry;
1818
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
1919
use datafusion::prelude::{SessionConfig, SessionContext};
20+
use datafusion_physical_plan::metrics::MetricsSet;
2021
use datafusion_physical_plan::{collect, ExecutionPlan};
2122
use rand::{Rng, SeedableRng as _};
2223
use tracing::level_filters::LevelFilter;
@@ -28,6 +29,7 @@ use vortex::error::VortexResult;
2829
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
2930
use vortex::validity::Validity;
3031
use vortex::{ContextRef, IntoArray};
32+
use vortex_datafusion::persistent::metrics::VortexMetricsFinder;
3133

3234
pub mod bench_run;
3335
pub mod blob;
@@ -36,6 +38,7 @@ pub mod compress;
3638
pub mod datasets;
3739
pub mod display;
3840
pub mod measurements;
41+
pub mod metrics;
3942
pub mod parquet_reader;
4043
pub mod random_access;
4144
pub mod tpch;
@@ -197,12 +200,18 @@ pub fn default_env_filter(is_verbose: bool) -> EnvFilter {
197200
}
198201
}
199202

200-
pub async fn execute_query(ctx: &SessionContext, query: &str) -> VortexResult<Vec<RecordBatch>> {
203+
pub async fn execute_query(
204+
ctx: &SessionContext,
205+
query: &str,
206+
) -> VortexResult<(Vec<RecordBatch>, Vec<MetricsSet>)> {
201207
let plan = ctx.sql(query).await?;
202208
let (state, plan) = plan.into_parts();
203209
let physical_plan = state.create_physical_plan(&plan).await?;
204210
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
205-
Ok(result)
211+
Ok((
212+
result,
213+
VortexMetricsFinder::find_all(physical_plan.as_ref()),
214+
))
206215
}
207216

208217
pub async fn execute_physical_plan(

bench-vortex/src/metrics.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use std::sync::Arc;
2+
3+
use datafusion_physical_plan::metrics::{Label, MetricValue, MetricsSet};
4+
use datafusion_physical_plan::Metric;
5+
use vortex::aliases::hash_map::HashMap;
6+
7+
pub trait MetricsSetExt {
8+
fn merge_all_with_label(&mut self, other: MetricsSet, labels: &[Label]);
9+
fn aggregate(&self) -> Self;
10+
}
11+
12+
impl MetricsSetExt for MetricsSet {
13+
fn merge_all_with_label(&mut self, other: MetricsSet, labels: &[Label]) {
14+
for m in other.iter() {
15+
let mut new_metric =
16+
Metric::new_with_labels(m.value().clone(), m.partition(), m.labels().into());
17+
for label in labels.iter() {
18+
new_metric = new_metric.with_label(label.clone());
19+
}
20+
self.push(Arc::new(new_metric));
21+
}
22+
}
23+
24+
fn aggregate(&self) -> Self {
25+
let mut map = HashMap::new();
26+
let filtered = self
27+
.iter()
28+
.filter(|m| !m.value().name().ends_with("p99")) // can't aggregate percentiles
29+
.filter(|m| !m.value().name().ends_with("p95"));
30+
for metric in filtered {
31+
let key = metric.value().name();
32+
map.entry(key)
33+
.and_modify(|accum: &mut Metric| {
34+
aggregate_metric(accum.value_mut(), metric.value())
35+
})
36+
.or_insert_with(|| {
37+
let mut accum = Metric::new(metric.value().new_empty(), None);
38+
aggregate_metric(accum.value_mut(), metric.value());
39+
accum
40+
});
41+
}
42+
43+
let mut res = MetricsSet::new();
44+
map.into_iter()
45+
.map(|(_k, v)| Arc::new(v))
46+
.for_each(|m| res.push(m));
47+
res
48+
}
49+
}
50+
51+
fn aggregate_metric(metric: &mut MetricValue, to_aggregate: &MetricValue) {
52+
match (metric, to_aggregate) {
53+
(
54+
MetricValue::Gauge { name, gauge },
55+
MetricValue::Gauge {
56+
gauge: other_gauge, ..
57+
},
58+
) => match name {
59+
_ if name.ends_with("max") => gauge.set_max(other_gauge.value()),
60+
_ if name.ends_with("min") => {
61+
gauge.set(gauge.value().min(other_gauge.value()));
62+
}
63+
_ => gauge.add(other_gauge.value()),
64+
},
65+
(metric, to_aggregate) => metric.aggregate(to_aggregate),
66+
};
67+
}

bench-vortex/src/tpch/execute.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,27 @@
11
use datafusion::prelude::SessionContext;
2+
use datafusion_physical_plan::metrics::MetricsSet;
23

34
use crate::execute_query;
45

5-
pub async fn run_tpch_query(ctx: &SessionContext, queries: &[String], idx: usize) -> usize {
6+
pub async fn run_tpch_query(
7+
ctx: &SessionContext,
8+
queries: &[String],
9+
idx: usize,
10+
) -> (usize, Vec<MetricsSet>) {
611
if idx == 15 {
712
let mut result = None;
813
for (i, q) in queries.iter().enumerate() {
914
if i == 1 {
10-
result = Some(
11-
execute_query(ctx, q)
12-
.await
13-
.unwrap()
14-
.iter()
15-
.map(|r| r.num_rows())
16-
.sum(),
17-
);
15+
let (record_batches, metrics) = execute_query(ctx, q).await.unwrap();
16+
result = Some((record_batches.iter().map(|r| r.num_rows()).sum(), metrics));
1817
} else {
1918
execute_query(ctx, q).await.unwrap();
2019
}
2120
}
2221
result.expect("Must have had a result in 2nd sql statement for query 15")
2322
} else {
2423
let q = &queries[0];
25-
execute_query(ctx, q)
26-
.await
27-
.unwrap()
28-
.iter()
29-
.map(|r| r.num_rows())
30-
.sum()
24+
let (record_batches, metrics) = execute_query(ctx, q).await.unwrap();
25+
(record_batches.iter().map(|r| r.num_rows()).sum(), metrics)
3126
}
3227
}

vortex-datafusion/src/persistent/metrics.rs

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,51 @@
1+
//! Vortex table provider metrics.
12
use std::sync::Arc;
2-
use std::time::Duration;
33

44
use datafusion_physical_plan::metrics::{
55
Count, ExecutionPlanMetricsSet, Gauge, Label as DatafusionLabel,
6-
MetricValue as DatafusionMetricValue, MetricsSet, Time,
6+
MetricValue as DatafusionMetricValue, MetricsSet,
7+
};
8+
use datafusion_physical_plan::{
9+
accept, ExecutionPlan, ExecutionPlanVisitor, Metric as DatafusionMetric,
710
};
8-
use datafusion_physical_plan::Metric as DatafusionMetric;
911
use vortex_metrics::{DefaultTags, Metric, MetricId, Tags, VortexMetrics};
1012

11-
pub static PARTITION_LABEL: &str = "partition";
13+
use super::execution::VortexExec;
14+
15+
pub(crate) static PARTITION_LABEL: &str = "partition";
16+
17+
/// Extracts datafusion metrics from all VortexExec instances in
18+
/// a given physical plan.
19+
#[derive(Default)]
20+
pub struct VortexMetricsFinder(Vec<MetricsSet>);
21+
22+
impl VortexMetricsFinder {
23+
/// find all metrics for VortexExec nodes.
24+
pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
25+
let mut finder = Self::default();
26+
match accept(plan, &mut finder) {
27+
Ok(()) => finder.0,
28+
Err(_) => Vec::new(),
29+
}
30+
}
31+
}
32+
33+
impl ExecutionPlanVisitor for VortexMetricsFinder {
34+
type Error = std::convert::Infallible;
35+
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
36+
if let Some(metrics) = plan
37+
.as_any()
38+
.downcast_ref::<VortexExec>()
39+
.and_then(|exec| exec.metrics())
40+
{
41+
self.0.push(metrics);
42+
}
43+
Ok(true)
44+
}
45+
}
1246

1347
#[derive(Clone, Debug, Default)]
14-
pub struct VortexExecMetrics {
48+
pub(crate) struct VortexExecMetrics {
1549
pub vortex: VortexMetrics,
1650
pub execution_plan: ExecutionPlanMetricsSet,
1751
}
@@ -91,16 +125,17 @@ fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec<DatafusionMetr
91125
}
92126
let snapshot = timer.snapshot();
93127
if let Ok(max) = snapshot.max().try_into() {
94-
res.push(df_time(format!("{name}_max"), max));
128+
// NOTE(os): unlike Time metrics, gauges allow custom aggregation
129+
res.push(df_gauge(format!("{name}_max"), max));
95130
}
96131
if let Ok(min) = snapshot.min().try_into() {
97-
res.push(df_time(format!("{name}_min"), min));
132+
res.push(df_gauge(format!("{name}_min"), min));
98133
}
99134
if let Some(p95) = f_to_u(snapshot.value(0.95)) {
100-
res.push(df_time(format!("{name}_p95"), p95 as u64));
135+
res.push(df_gauge(format!("{name}_p95"), p95));
101136
}
102137
if let Some(p99) = f_to_u(snapshot.value(0.99)) {
103-
res.push(df_time(format!("{name}_p99"), p99 as u64));
138+
res.push(df_gauge(format!("{name}_p99"), p99));
104139
}
105140
res
106141
}
@@ -127,15 +162,6 @@ fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
127162
}
128163
}
129164

130-
fn df_time(name: String, nanos: u64) -> DatafusionMetricValue {
131-
let time = Time::new();
132-
time.add_duration(Duration::from_nanos(nanos));
133-
DatafusionMetricValue::Time {
134-
name: name.into(),
135-
time,
136-
}
137-
}
138-
139165
/// Convert a float to `usize`, truncating the fractional part; returns `None`
/// for NaN, infinities, and values outside the `usize` range.
fn f_to_u(f: f64) -> Option<usize> {
    if f.is_finite() && ((usize::MIN as f64)..=(usize::MAX as f64)).contains(&f) {
        Some(f.trunc() as usize)
    } else {
        None
    }
}

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mod cache;
33
mod config;
44
mod execution;
55
mod format;
6-
mod metrics;
6+
pub mod metrics;
77
mod opener;
88
mod sink;
99

vortex-file/src/generic.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
1010
use vortex_io::VortexReadAt;
1111
use vortex_layout::scan::ScanDriver;
1212
use vortex_layout::segments::{AsyncSegmentReader, SegmentId};
13-
use vortex_metrics::{Counter, MetricId, VortexMetrics};
13+
use vortex_metrics::{Counter, VortexMetrics};
1414

1515
use crate::footer::{FileLayout, Segment};
1616
use crate::segments::channel::SegmentChannel;
@@ -364,14 +364,13 @@ struct CoalescingMetrics {
364364

365365
impl From<VortexMetrics> for CoalescingMetrics {
366366
fn from(metrics: VortexMetrics) -> Self {
367-
let byte_ranges = MetricId::new("vortex.scan.requests.bytes");
368-
let requests = MetricId::new("vortex.scan.requests.count");
367+
const BYTES: &str = "vortex.scan.requests.bytes";
368+
const COUNT: &str = "vortex.scan.requests.count";
369369
Self {
370-
bytes_uncoalesced: metrics.counter(byte_ranges.clone().with_tag("kind", "uncoalesced")),
371-
bytes_coalesced: metrics.counter(byte_ranges.with_tag("kind", "coalesced")),
372-
request_count_uncoalesced: metrics
373-
.counter(requests.clone().with_tag("kind", "uncoalesced")),
374-
request_count_coalesced: metrics.counter(requests.with_tag("kind", "coalesced")),
370+
bytes_uncoalesced: metrics.counter(format!("{BYTES}.uncoalesced")),
371+
bytes_coalesced: metrics.counter(format!("{BYTES}.coalesced")),
372+
request_count_uncoalesced: metrics.counter(format!("{COUNT}.uncoalesced")),
373+
request_count_coalesced: metrics.counter(format!("{COUNT}.coalesced")),
375374
}
376375
}
377376
}

0 commit comments

Comments
 (0)