Skip to content

Commit 4b3bd73

Browse files
authored
feat: add per-partition convert, result cache metrics (#7539)
* fix: show convert cost in explain analyze verbose
  Signed-off-by: evenyag <realevenyag@gmail.com>
* fix: increase puffin metadata cache metric
  Signed-off-by: evenyag <realevenyag@gmail.com>
* feat: add result cache hit/miss to filter metrics
  Signed-off-by: evenyag <realevenyag@gmail.com>
* feat: print flat format in debug
  Signed-off-by: evenyag <realevenyag@gmail.com>
* test: update sqlness test
  Signed-off-by: evenyag <realevenyag@gmail.com>
* feat: make scan cost contains part/reader build cost
  Signed-off-by: evenyag <realevenyag@gmail.com>
* feat: collect divider cost
  Signed-off-by: evenyag <realevenyag@gmail.com>
* refactor: remove unused field in ScannerMetrics
  Signed-off-by: evenyag <realevenyag@gmail.com>
* feat: collect metadata read bytes
  Signed-off-by: evenyag <realevenyag@gmail.com>
* chore: collect read metrics in get_parquet_meta_data
  Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
1 parent 2799089 commit 4b3bd73

File tree

17 files changed

+234
-42
lines changed

17 files changed

+234
-42
lines changed

src/mito2/src/cache.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -350,7 +350,10 @@ impl CacheManager {
350350
// Try to get metadata from write cache
351351
let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
352352
if let Some(write_cache) = &self.write_cache
353-
&& let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
353+
&& let Some(metadata) = write_cache
354+
.file_cache()
355+
.get_parquet_meta_data(key, metrics)
356+
.await
354357
{
355358
metrics.file_cache_hit += 1;
356359
let metadata = Arc::new(metadata);

src/mito2/src/cache/file_cache.rs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ use crate::metrics::{
4343
use crate::region::opener::RegionLoadCacheTask;
4444
use crate::sst::parquet::helper::fetch_byte_ranges;
4545
use crate::sst::parquet::metadata::MetadataLoader;
46+
use crate::sst::parquet::reader::MetadataCacheMetrics;
4647

4748
/// Subdirectory of cached files for write.
4849
///
@@ -566,7 +567,11 @@ impl FileCache {
566567

567568
/// Get the parquet metadata in file cache.
568569
/// If the file is not in the cache or fail to load metadata, return None.
569-
pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
570+
pub(crate) async fn get_parquet_meta_data(
571+
&self,
572+
key: IndexKey,
573+
cache_metrics: &mut MetadataCacheMetrics,
574+
) -> Option<ParquetMetaData> {
570575
// Check if file cache contains the key
571576
if let Some(index_value) = self.inner.parquet_index.get(&key).await {
572577
// Load metadata from file cache
@@ -575,7 +580,7 @@ impl FileCache {
575580
let file_size = index_value.file_size as u64;
576581
let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
577582

578-
match metadata_loader.load().await {
583+
match metadata_loader.load(cache_metrics).await {
579584
Ok(metadata) => {
580585
CACHE_HIT
581586
.with_label_values(&[key.file_type.metric_label()])

src/mito2/src/read.rs

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1058,10 +1058,6 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
10581058
/// Local metrics for scanners.
10591059
#[derive(Debug, Default)]
10601060
pub(crate) struct ScannerMetrics {
1061-
/// Duration to prepare the scan task.
1062-
prepare_scan_cost: Duration,
1063-
/// Duration to build the (merge) reader.
1064-
build_reader_cost: Duration,
10651061
/// Duration to scan data.
10661062
scan_cost: Duration,
10671063
/// Duration while waiting for `yield`.
@@ -1070,10 +1066,6 @@ pub(crate) struct ScannerMetrics {
10701066
num_batches: usize,
10711067
/// Number of rows returned.
10721068
num_rows: usize,
1073-
/// Number of mem ranges scanned.
1074-
num_mem_ranges: usize,
1075-
/// Number of file ranges scanned.
1076-
num_file_ranges: usize,
10771069
}
10781070

10791071
#[cfg(test)]

src/mito2/src/read/scan_region.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1498,6 +1498,7 @@ impl StreamContext {
14981498
.entries(self.input.files.iter().map(|file| FileWrapper { file }))
14991499
.finish()?;
15001500
}
1501+
write!(f, ", \"flat_format\": {}", self.input.flat_format)?;
15011502

15021503
#[cfg(feature = "enterprise")]
15031504
self.format_extension_ranges(f)?;

src/mito2/src/read/scan_util.rs

Lines changed: 101 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -115,6 +115,8 @@ pub(crate) struct ScanMetricsSet {
115115
scan_cost: Duration,
116116
/// Duration while waiting for `yield`.
117117
yield_cost: Duration,
118+
/// Duration to convert [`Batch`]es.
119+
convert_cost: Option<Time>,
118120
/// Duration of the scan.
119121
total_cost: Duration,
120122
/// Number of rows returned.
@@ -165,6 +167,18 @@ pub(crate) struct ScanMetricsSet {
165167
rows_vector_filtered: usize,
166168
/// Number of rows filtered by precise filter.
167169
rows_precise_filtered: usize,
170+
/// Number of index result cache hits for fulltext index.
171+
fulltext_index_cache_hit: usize,
172+
/// Number of index result cache misses for fulltext index.
173+
fulltext_index_cache_miss: usize,
174+
/// Number of index result cache hits for inverted index.
175+
inverted_index_cache_hit: usize,
176+
/// Number of index result cache misses for inverted index.
177+
inverted_index_cache_miss: usize,
178+
/// Number of index result cache hits for bloom filter index.
179+
bloom_filter_cache_hit: usize,
180+
/// Number of index result cache misses for bloom filter index.
181+
bloom_filter_cache_miss: usize,
168182
/// Number of record batches read from SST.
169183
num_sst_record_batches: usize,
170184
/// Number of batches decoded from SST.
@@ -187,6 +201,8 @@ pub(crate) struct ScanMetricsSet {
187201
distributor_scan_cost: Duration,
188202
/// Duration of the series distributor to yield.
189203
distributor_yield_cost: Duration,
204+
/// Duration spent in divider operations.
205+
distributor_divider_cost: Duration,
190206

191207
/// Merge metrics.
192208
merge_metrics: MergeMetrics,
@@ -247,6 +263,7 @@ impl fmt::Debug for ScanMetricsSet {
247263
build_reader_cost,
248264
scan_cost,
249265
yield_cost,
266+
convert_cost,
250267
total_cost,
251268
num_rows,
252269
num_batches,
@@ -266,6 +283,12 @@ impl fmt::Debug for ScanMetricsSet {
266283
rows_bloom_filtered,
267284
rows_vector_filtered,
268285
rows_precise_filtered,
286+
fulltext_index_cache_hit,
287+
fulltext_index_cache_miss,
288+
inverted_index_cache_hit,
289+
inverted_index_cache_miss,
290+
bloom_filter_cache_hit,
291+
bloom_filter_cache_miss,
269292
num_sst_record_batches,
270293
num_sst_batches,
271294
num_sst_rows,
@@ -276,6 +299,7 @@ impl fmt::Debug for ScanMetricsSet {
276299
num_distributor_batches,
277300
distributor_scan_cost,
278301
distributor_yield_cost,
302+
distributor_divider_cost,
279303
merge_metrics,
280304
dedup_metrics,
281305
stream_eof,
@@ -313,6 +337,12 @@ impl fmt::Debug for ScanMetricsSet {
313337
\"first_poll\":\"{first_poll:?}\""
314338
)?;
315339

340+
// Write convert_cost if present
341+
if let Some(time) = convert_cost {
342+
let duration = Duration::from_nanos(time.value() as u64);
343+
write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
344+
}
345+
316346
// Write non-zero filter counters
317347
if *rg_fulltext_filtered > 0 {
318348
write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
@@ -344,6 +374,36 @@ impl fmt::Debug for ScanMetricsSet {
344374
if *rows_precise_filtered > 0 {
345375
write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
346376
}
377+
if *fulltext_index_cache_hit > 0 {
378+
write!(
379+
f,
380+
", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
381+
)?;
382+
}
383+
if *fulltext_index_cache_miss > 0 {
384+
write!(
385+
f,
386+
", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
387+
)?;
388+
}
389+
if *inverted_index_cache_hit > 0 {
390+
write!(
391+
f,
392+
", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
393+
)?;
394+
}
395+
if *inverted_index_cache_miss > 0 {
396+
write!(
397+
f,
398+
", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
399+
)?;
400+
}
401+
if *bloom_filter_cache_hit > 0 {
402+
write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
403+
}
404+
if *bloom_filter_cache_miss > 0 {
405+
write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
406+
}
347407

348408
// Write non-zero distributor metrics
349409
if *num_series_send_timeout > 0 {
@@ -370,6 +430,12 @@ impl fmt::Debug for ScanMetricsSet {
370430
", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
371431
)?;
372432
}
433+
if !distributor_divider_cost.is_zero() {
434+
write!(
435+
f,
436+
", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
437+
)?;
438+
}
373439

374440
// Write non-zero memtable metrics
375441
if *mem_rows > 0 {
@@ -478,27 +544,25 @@ impl ScanMetricsSet {
478544
self
479545
}
480546

547+
/// Attaches the `convert_cost` to the metrics set.
548+
fn with_convert_cost(mut self, time: Time) -> Self {
549+
self.convert_cost = Some(time);
550+
self
551+
}
552+
481553
/// Merges the local scanner metrics.
482554
fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
483555
let ScannerMetrics {
484-
prepare_scan_cost,
485-
build_reader_cost,
486556
scan_cost,
487557
yield_cost,
488558
num_batches,
489559
num_rows,
490-
num_mem_ranges,
491-
num_file_ranges,
492560
} = other;
493561

494-
self.prepare_scan_cost += *prepare_scan_cost;
495-
self.build_reader_cost += *build_reader_cost;
496562
self.scan_cost += *scan_cost;
497563
self.yield_cost += *yield_cost;
498564
self.num_rows += *num_rows;
499565
self.num_batches += *num_batches;
500-
self.num_mem_ranges += *num_mem_ranges;
501-
self.num_file_ranges += *num_file_ranges;
502566
}
503567

504568
/// Merges the local reader metrics.
@@ -519,6 +583,12 @@ impl ScanMetricsSet {
519583
rows_bloom_filtered,
520584
rows_vector_filtered,
521585
rows_precise_filtered,
586+
fulltext_index_cache_hit,
587+
fulltext_index_cache_miss,
588+
inverted_index_cache_hit,
589+
inverted_index_cache_miss,
590+
bloom_filter_cache_hit,
591+
bloom_filter_cache_miss,
522592
inverted_index_apply_metrics,
523593
bloom_filter_apply_metrics,
524594
fulltext_index_apply_metrics,
@@ -548,6 +618,13 @@ impl ScanMetricsSet {
548618
self.rows_vector_filtered += *rows_vector_filtered;
549619
self.rows_precise_filtered += *rows_precise_filtered;
550620

621+
self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
622+
self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
623+
self.inverted_index_cache_hit += *inverted_index_cache_hit;
624+
self.inverted_index_cache_miss += *inverted_index_cache_miss;
625+
self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
626+
self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
627+
551628
self.num_sst_record_batches += *num_record_batches;
552629
self.num_sst_batches += *num_batches;
553630
self.num_sst_rows += *num_rows;
@@ -598,6 +675,7 @@ impl ScanMetricsSet {
598675
num_batches,
599676
scan_cost,
600677
yield_cost,
678+
divider_cost,
601679
} = distributor_metrics;
602680

603681
self.num_series_send_timeout += *num_series_send_timeout;
@@ -606,6 +684,7 @@ impl ScanMetricsSet {
606684
self.num_distributor_batches += *num_batches;
607685
self.distributor_scan_cost += *scan_cost;
608686
self.distributor_yield_cost += *yield_cost;
687+
self.distributor_divider_cost += *divider_cost;
609688
}
610689

611690
/// Observes metrics.
@@ -622,6 +701,11 @@ impl ScanMetricsSet {
622701
READ_STAGE_ELAPSED
623702
.with_label_values(&["yield"])
624703
.observe(self.yield_cost.as_secs_f64());
704+
if let Some(time) = &self.convert_cost {
705+
READ_STAGE_ELAPSED
706+
.with_label_values(&["convert"])
707+
.observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
708+
}
625709
READ_STAGE_ELAPSED
626710
.with_label_values(&["total"])
627711
.observe(self.total_cost.as_secs_f64());
@@ -746,21 +830,19 @@ impl Drop for PartitionMetricsInner {
746830

747831
if self.explain_verbose {
748832
common_telemetry::info!(
749-
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
833+
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
750834
self.scanner_type,
751835
self.region_id,
752836
self.partition,
753837
metrics,
754-
self.convert_cost,
755838
);
756839
} else {
757840
common_telemetry::debug!(
758-
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
841+
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
759842
self.scanner_type,
760843
self.region_id,
761844
self.partition,
762845
metrics,
763-
self.convert_cost,
764846
);
765847
}
766848
}
@@ -807,7 +889,10 @@ impl PartitionMetrics {
807889
let partition_str = partition.to_string();
808890
let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
809891
in_progress_scan.inc();
810-
let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
892+
let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
893+
let metrics = ScanMetricsSet::default()
894+
.with_prepare_scan_cost(query_start.elapsed())
895+
.with_convert_cost(convert_cost.clone());
811896
let inner = PartitionMetricsInner {
812897
region_id,
813898
partition,
@@ -822,7 +907,7 @@ impl PartitionMetrics {
822907
.subset_time("build_reader_cost", partition),
823908
scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
824909
yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
825-
convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
910+
convert_cost,
826911
elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
827912
};
828913
Self(Arc::new(inner))
@@ -874,9 +959,6 @@ impl PartitionMetrics {
874959

875960
/// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
876961
pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
877-
self.0
878-
.build_reader_cost
879-
.add_duration(metrics.build_reader_cost);
880962
self.0.scan_cost.add_duration(metrics.scan_cost);
881963
self.record_elapsed_compute(metrics.scan_cost);
882964
self.0.yield_cost.add_duration(metrics.yield_cost);
@@ -956,6 +1038,8 @@ pub(crate) struct SeriesDistributorMetrics {
9561038
pub(crate) scan_cost: Duration,
9571039
/// Duration of the series distributor to yield.
9581040
pub(crate) yield_cost: Duration,
1041+
/// Duration spent in divider operations.
1042+
pub(crate) divider_cost: Duration,
9591043
}
9601044

9611045
/// Scans memtable ranges at `index`.

0 commit comments

Comments (0)