8 changes: 0 additions & 8 deletions src/mito2/src/read.rs
@@ -1058,10 +1058,6 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
/// Local metrics for scanners.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
/// Duration to prepare the scan task.
prepare_scan_cost: Duration,
/// Duration to build the (merge) reader.
build_reader_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration while waiting for `yield`.
@@ -1070,10 +1066,6 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Number of mem ranges scanned.
num_mem_ranges: usize,
/// Number of file ranges scanned.
num_file_ranges: usize,
}

#[cfg(test)]
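With `prepare_scan_cost`, `build_reader_cost`, `num_mem_ranges`, and `num_file_ranges` dropped, `ScannerMetrics` keeps only per-poll counters; the scan-level costs appear to be tracked solely by `ScanMetricsSet`/`PartitionMetrics` in `scan_util.rs` now. A sketch of the resulting struct, assuming the elided lines are unchanged:

```rust
use std::time::Duration;

/// Local metrics for scanners.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
}
```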
1 change: 1 addition & 0 deletions src/mito2/src/read/scan_region.rs
@@ -1418,6 +1418,7 @@ impl StreamContext {
.entries(self.input.files.iter().map(|file| FileWrapper { file }))
.finish()?;
}
write!(f, ", \"flat_format\": {}", self.input.flat_format)?;

#[cfg(feature = "enterprise")]
self.format_extension_ranges(f)?;
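The new line appends the flat-format flag to `StreamContext`'s explain/debug output. A minimal, self-contained reproduction of the pattern (`StreamCtx` and its field values are invented for illustration):

```rust
use std::fmt;

struct StreamCtx {
    flat_format: bool,
}

impl fmt::Debug for StreamCtx {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Mirrors the change above: append the flag after the file list.
        write!(f, "{{\"files\": []")?;
        write!(f, ", \"flat_format\": {}", self.flat_format)?;
        write!(f, "}}")
    }
}

fn main() {
    // Prints: {"files": [], "flat_format": true}
    println!("{:?}", StreamCtx { flat_format: true });
}
```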
118 changes: 101 additions & 17 deletions src/mito2/src/read/scan_util.rs
@@ -115,6 +115,8 @@ pub(crate) struct ScanMetricsSet {
scan_cost: Duration,
/// Duration while waiting for `yield`.
yield_cost: Duration,
/// Duration to convert [`Batch`]es.
convert_cost: Option<Time>,
/// Duration of the scan.
total_cost: Duration,
/// Number of rows returned.
Expand Down Expand Up @@ -161,6 +163,18 @@ pub(crate) struct ScanMetricsSet {
rows_bloom_filtered: usize,
/// Number of rows filtered by precise filter.
rows_precise_filtered: usize,
/// Number of index result cache hits for fulltext index.
fulltext_index_cache_hit: usize,
/// Number of index result cache misses for fulltext index.
fulltext_index_cache_miss: usize,
/// Number of index result cache hits for inverted index.
inverted_index_cache_hit: usize,
/// Number of index result cache misses for inverted index.
inverted_index_cache_miss: usize,
/// Number of index result cache hits for bloom filter index.
bloom_filter_cache_hit: usize,
/// Number of index result cache misses for bloom filter index.
bloom_filter_cache_miss: usize,
/// Number of record batches read from SST.
num_sst_record_batches: usize,
/// Number of batches decoded from SST.
@@ -183,6 +197,8 @@ pub(crate) struct ScanMetricsSet {
distributor_scan_cost: Duration,
/// Duration of the series distributor to yield.
distributor_yield_cost: Duration,
/// Duration spent in divider operations.
distributor_divider_cost: Duration,

/// Merge metrics.
merge_metrics: MergeMetrics,
Expand Down Expand Up @@ -243,6 +259,7 @@ impl fmt::Debug for ScanMetricsSet {
build_reader_cost,
scan_cost,
yield_cost,
convert_cost,
total_cost,
num_rows,
num_batches,
@@ -260,6 +277,12 @@
rows_inverted_filtered,
rows_bloom_filtered,
rows_precise_filtered,
fulltext_index_cache_hit,
fulltext_index_cache_miss,
inverted_index_cache_hit,
inverted_index_cache_miss,
bloom_filter_cache_hit,
bloom_filter_cache_miss,
num_sst_record_batches,
num_sst_batches,
num_sst_rows,
@@ -270,6 +293,7 @@
num_distributor_batches,
distributor_scan_cost,
distributor_yield_cost,
distributor_divider_cost,
merge_metrics,
dedup_metrics,
stream_eof,
@@ -307,6 +331,12 @@ impl fmt::Debug for ScanMetricsSet {
\"first_poll\":\"{first_poll:?}\""
)?;

// Write convert_cost if present
if let Some(time) = convert_cost {
let duration = Duration::from_nanos(time.value() as u64);
write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
}

// Write non-zero filter counters
if *rg_fulltext_filtered > 0 {
write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
@@ -332,6 +362,36 @@
if *rows_precise_filtered > 0 {
write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
}
if *fulltext_index_cache_hit > 0 {
write!(
f,
", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
)?;
}
if *fulltext_index_cache_miss > 0 {
write!(
f,
", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
)?;
}
if *inverted_index_cache_hit > 0 {
write!(
f,
", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
)?;
}
if *inverted_index_cache_miss > 0 {
write!(
f,
", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
)?;
}
if *bloom_filter_cache_hit > 0 {
write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
}
if *bloom_filter_cache_miss > 0 {
write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
}

// Write non-zero distributor metrics
if *num_series_send_timeout > 0 {
@@ -358,6 +418,12 @@
", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
)?;
}
if !distributor_divider_cost.is_zero() {
write!(
f,
", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
)?;
}

// Write non-zero memtable metrics
if *mem_rows > 0 {
@@ -466,27 +532,25 @@ impl ScanMetricsSet {
self
}

/// Attaches the `convert_cost` to the metrics set.
fn with_convert_cost(mut self, time: Time) -> Self {
self.convert_cost = Some(time);
self
}

/// Merges the local scanner metrics.
fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
let ScannerMetrics {
prepare_scan_cost,
build_reader_cost,
scan_cost,
yield_cost,
num_batches,
num_rows,
num_mem_ranges,
num_file_ranges,
} = other;

self.prepare_scan_cost += *prepare_scan_cost;
self.build_reader_cost += *build_reader_cost;
self.scan_cost += *scan_cost;
self.yield_cost += *yield_cost;
self.num_rows += *num_rows;
self.num_batches += *num_batches;
self.num_mem_ranges += *num_mem_ranges;
self.num_file_ranges += *num_file_ranges;
}

/// Merges the local reader metrics.
@@ -505,6 +569,12 @@ impl ScanMetricsSet {
rows_inverted_filtered,
rows_bloom_filtered,
rows_precise_filtered,
fulltext_index_cache_hit,
fulltext_index_cache_miss,
inverted_index_cache_hit,
inverted_index_cache_miss,
bloom_filter_cache_hit,
bloom_filter_cache_miss,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
fulltext_index_apply_metrics,
@@ -532,6 +602,13 @@ impl ScanMetricsSet {
self.rows_bloom_filtered += *rows_bloom_filtered;
self.rows_precise_filtered += *rows_precise_filtered;

self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
self.inverted_index_cache_hit += *inverted_index_cache_hit;
self.inverted_index_cache_miss += *inverted_index_cache_miss;
self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
self.bloom_filter_cache_miss += *bloom_filter_cache_miss;

self.num_sst_record_batches += *num_record_batches;
self.num_sst_batches += *num_batches;
self.num_sst_rows += *num_rows;
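The merged hit/miss counters make it easy to derive per-index cache hit ratios downstream. A hypothetical helper (not part of this PR) that guards against an empty sample:

```rust
/// Hypothetical helper: hit ratio from a merged hit/miss counter pair.
fn cache_hit_ratio(hits: usize, misses: usize) -> Option<f64> {
    let total = hits + misses;
    (total > 0).then(|| hits as f64 / total as f64)
}
```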
@@ -582,6 +659,7 @@ impl ScanMetricsSet {
num_batches,
scan_cost,
yield_cost,
divider_cost,
} = distributor_metrics;

self.num_series_send_timeout += *num_series_send_timeout;
@@ -590,6 +668,7 @@
self.num_distributor_batches += *num_batches;
self.distributor_scan_cost += *scan_cost;
self.distributor_yield_cost += *yield_cost;
self.distributor_divider_cost += *divider_cost;
}

/// Observes metrics.
@@ -606,6 +685,11 @@
READ_STAGE_ELAPSED
.with_label_values(&["yield"])
.observe(self.yield_cost.as_secs_f64());
if let Some(time) = &self.convert_cost {
READ_STAGE_ELAPSED
.with_label_values(&["convert"])
.observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
}
READ_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(self.total_cost.as_secs_f64());
@@ -722,21 +806,19 @@ impl Drop for PartitionMetricsInner {

if self.explain_verbose {
common_telemetry::info!(
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
self.scanner_type,
self.region_id,
self.partition,
metrics,
self.convert_cost,
);
} else {
common_telemetry::debug!(
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
self.scanner_type,
self.region_id,
self.partition,
metrics,
self.convert_cost,
);
}
}
@@ -783,7 +865,10 @@ impl PartitionMetrics {
let partition_str = partition.to_string();
let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
in_progress_scan.inc();
let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
let metrics = ScanMetricsSet::default()
.with_prepare_scan_cost(query_start.elapsed())
.with_convert_cost(convert_cost.clone());
let inner = PartitionMetricsInner {
region_id,
partition,
Expand All @@ -798,7 +883,7 @@ impl PartitionMetrics {
.subset_time("build_reader_cost", partition),
scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
convert_cost,
elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
};
Self(Arc::new(inner))
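Note the refactor here: `convert_cost` is created once via DataFusion's `MetricBuilder`, handed to `ScanMetricsSet::with_convert_cost`, and a clone is stored in `PartitionMetricsInner`, so both observe the same underlying counter. A sketch of the registration call, assuming DataFusion's `ExecutionPlanMetricsSet`:

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

fn register_convert_cost(metrics_set: &ExecutionPlanMetricsSet, partition: usize) -> Time {
    // Registers a `Time` metric labeled "convert_cost" for this partition and
    // returns a cheap, cloneable handle backed by the same counter.
    MetricBuilder::new(metrics_set).subset_time("convert_cost", partition)
}
```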
Expand Down Expand Up @@ -850,9 +935,6 @@ impl PartitionMetrics {

/// Merges [ScannerMetrics]'s `scan_cost` and `yield_cost`.
pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
self.0
.build_reader_cost
.add_duration(metrics.build_reader_cost);
self.0.scan_cost.add_duration(metrics.scan_cost);
self.record_elapsed_compute(metrics.scan_cost);
self.0.yield_cost.add_duration(metrics.yield_cost);
Expand Down Expand Up @@ -932,6 +1014,8 @@ pub(crate) struct SeriesDistributorMetrics {
pub(crate) scan_cost: Duration,
/// Duration of the series distributor to yield.
pub(crate) yield_cost: Duration,
/// Duration spent in divider operations.
pub(crate) divider_cost: Duration,
}

/// Scans memtable ranges at `index`.
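`SeriesDistributorMetrics` gains `divider_cost` alongside the existing scan and yield timings. A minimal sketch of how such a cost might be accumulated, assuming the distributor times its divider work with a monotonic clock (the helper below is hypothetical):

```rust
use std::time::{Duration, Instant};

#[derive(Default)]
struct DistributorTimings {
    scan_cost: Duration,
    yield_cost: Duration,
    divider_cost: Duration,
}

/// Hypothetical helper: time one divider operation and fold the elapsed
/// time into `divider_cost`.
fn time_divider<T>(timings: &mut DistributorTimings, divide: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let out = divide();
    timings.divider_cost += start.elapsed();
    out
}
```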