Skip to content

Commit b11e05a

Browse files
authored
Indexer processor metrics improvements. (#469)
1 parent 312280b commit b11e05a

File tree

2 files changed

+14
-15
lines changed

2 files changed

+14
-15
lines changed

rust/processor/src/utils/counters.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
use once_cell::sync::Lazy;
55
use prometheus::{
6-
register_gauge_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec,
7-
GaugeVec, IntCounter, IntCounterVec, IntGaugeVec,
6+
register_gauge_vec, register_histogram_vec, register_int_counter, register_int_counter_vec,
7+
register_int_gauge_vec, GaugeVec, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec,
88
};
99

1010
pub enum ProcessorStep {
@@ -188,8 +188,8 @@ pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
188188
});
189189

190190
/// Overall processing time for a single batch of transactions (per task)
191-
pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
192-
register_gauge_vec!(
191+
pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
192+
register_histogram_vec!(
193193
"indexer_processor_single_batch_processing_time_in_secs",
194194
"Time taken to process a single batch of transactions",
195195
&["processor_name", "task_index"]
@@ -198,8 +198,8 @@ pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
198198
});
199199

200200
/// Parsing time for a single batch of transactions
201-
pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
202-
register_gauge_vec!(
201+
pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
202+
register_histogram_vec!(
203203
"indexer_processor_single_batch_parsing_time_in_secs",
204204
"Time taken to parse a single batch of transactions",
205205
&["processor_name", "task_index"]
@@ -208,8 +208,8 @@ pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
208208
});
209209

210210
/// DB insertion time for a single batch of transactions
211-
pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
212-
register_gauge_vec!(
211+
pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
212+
register_histogram_vec!(
213213
"indexer_processor_single_batch_db_insertion_time_in_secs",
214214
"Time taken to insert to DB for a single batch of transactions",
215215
&["processor_name", "task_index"]
@@ -246,8 +246,8 @@ pub static PARQUET_PROCESSOR_DATA_GAP_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
246246
});
247247

248248
/// GRPC latency.
249-
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
250-
register_gauge_vec!(
249+
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy<HistogramVec> = Lazy::new(|| {
250+
register_histogram_vec!(
251251
"indexer_processor_grpc_latency_in_secs",
252252
"GRPC latency observed by processor",
253253
&["processor_name", "task_index"]

rust/processor/src/worker.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ use std::{
6060
use tokio::task::JoinHandle;
6161
use tracing::{debug, error, info};
6262
use url::Url;
63-
6463
// this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch
6564
// of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision
6665
// machines accordingly.
@@ -588,7 +587,7 @@ impl Worker {
588587
// TODO: For these three, do an atomic thing, or ideally move to an async metrics collector!
589588
GRPC_LATENCY_BY_PROCESSOR_IN_SECS
590589
.with_label_values(&[processor_name, &task_index_str])
591-
.set(time_diff_since_pb_timestamp_in_secs(
590+
.observe(time_diff_since_pb_timestamp_in_secs(
592591
end_txn_timestamp.as_ref().unwrap(),
593592
));
594593
LATEST_PROCESSED_VERSION
@@ -628,13 +627,13 @@ impl Worker {
628627

629628
SINGLE_BATCH_PROCESSING_TIME_IN_SECS
630629
.with_label_values(&[processor_name, &task_index_str])
631-
.set(processing_time);
630+
.observe(processing_time);
632631
SINGLE_BATCH_PARSING_TIME_IN_SECS
633632
.with_label_values(&[processor_name, &task_index_str])
634-
.set(processing_result.processing_duration_in_secs);
633+
.observe(processing_result.processing_duration_in_secs);
635634
SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
636635
.with_label_values(&[processor_name, &task_index_str])
637-
.set(processing_result.db_insertion_duration_in_secs);
636+
.observe(processing_result.db_insertion_duration_in_secs);
638637

639638
gap_detector_sender
640639
.send(ProcessingResult::DefaultProcessingResult(

0 commit comments

Comments
 (0)