Skip to content

Commit 11f56d7

Browse files
authored
feat: add metrics partial block metrics (#850)
2 parents 91b4f4e + e53631c commit 11f56d7

File tree

2 files changed

+46
-11
lines changed

2 files changed

+46
-11
lines changed

tycho-indexer/src/extractor/protocol_extractor.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,7 +1073,7 @@ where
10731073
})
10741074
}
10751075

1076-
#[instrument(skip_all, fields(target_hash, target_number))]
1076+
#[instrument(skip_all, fields(current_block, target_hash, target_number))]
10771077
#[allow(clippy::mutable_key_type)] // Clippy thinks that tuple with Bytes are a mutable type.
10781078
async fn handle_revert(
10791079
&self,
@@ -1090,18 +1090,31 @@ where
10901090
))
10911091
})?;
10921092

1093-
tracing::Span::current().record("target_hash", format!("{block_hash:x}"));
1094-
tracing::Span::current().record("target_number", block_ref.number);
1095-
10961093
let last_processed_block_number = self
10971094
.get_last_processed_block()
10981095
.await
1099-
.map_or(String::new(), |block| block.number.to_string());
1096+
.map_or("None".to_string(), |b| b.number.to_string());
1097+
let current_partial_block_index = self
1098+
.partial_block_buffer
1099+
.lock()
1100+
.await
1101+
.as_ref()
1102+
.and_then(|b| b.partial_block_index)
1103+
.map_or("None".to_string(), |i| i.to_string());
1104+
1105+
tracing::Span::current().record("current_block", &last_processed_block_number);
1106+
tracing::Span::current()
1107+
.record("current_partial_block_index", &current_partial_block_index);
1108+
tracing::Span::current().record("target_hash", format!("{block_hash:x}"));
1109+
tracing::Span::current().record("target_number", block_ref.number);
11001110

1111+
// Perf: consider optimizing this to avoid having a unique counter for every revert, which
1112+
// can blow up metrics memory usage in case of frequent reverts.
11011113
counter!(
11021114
"extractor_revert",
11031115
"extractor" => self.name.clone(),
11041116
"current_block" => last_processed_block_number,
1117+
"current_partial_block_index" => current_partial_block_index,
11051118
"target_block" => block_ref.number.to_string()
11061119
)
11071120
.increment(1);

tycho-indexer/src/extractor/runner.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ impl ExtractorRunner {
208208

209209
runtime.spawn(async move {
210210
let id = self.extractor.get_id();
211+
// Track the number of partials received for the current block != partial_index.
212+
let mut partials_in_block: u32 = 0;
211213
loop {
212214
// this is the main info span of an extractor
213215
let loop_span = info_span!(
@@ -248,6 +250,20 @@ impl ExtractorRunner {
248250
"extractor" => id.name.to_string()
249251
).set(block_number as f64);
250252

253+
if data.is_partial {
254+
partials_in_block += 1;
255+
}
256+
257+
if data.is_last_partial == Some(true) || data.partial_index.is_none() {
258+
gauge!(
259+
"extractor_partials_per_block",
260+
"chain" => id.chain.to_string(),
261+
"extractor" => id.name.to_string()
262+
)
263+
.set(partials_in_block as f64);
264+
partials_in_block = 0;
265+
}
266+
251267
// Start measuring block processing time
252268
let start_time = std::time::Instant::now();
253269

@@ -267,16 +283,22 @@ impl ExtractorRunner {
267283
Self::propagate_msg(&self.subscriptions, msg).await
268284
}
269285

270-
let duration = start_time.elapsed();
271-
let partial_block_index = data.partial_index.unwrap_or(0);
286+
let duration_ms = start_time.elapsed().as_millis() as f64;
287+
let block_type = match (data.is_partial, data.is_last_partial) {
288+
(false, _) => "full",
289+
(true, Some(true)) => "final_partial",
290+
(true, _) => "partial",
291+
};
292+
272293
gauge!(
273294
"block_processing_time_ms",
274295
"chain" => id.chain.to_string(),
275296
"extractor" => id.name.to_string(),
276-
"partial_block_index" => partial_block_index.to_string()
277-
).set(duration.as_millis() as f64);
297+
"block_type" => block_type
298+
).set(duration_ms);
278299
}
279300
Some(Ok(BlockResponse::Undo(undo_signal))) => {
301+
partials_in_block = 0;
280302
info!(block=?&undo_signal.last_valid_block, "Revert requested!");
281303
match self.extractor.handle_revert(undo_signal.clone()).await {
282304
Ok(Some(msg)) => {
@@ -309,8 +331,8 @@ impl ExtractorRunner {
309331
tracing::Span::current().record("otel.status_code", "ok");
310332
Ok(true) // Continue the loop
311333
}
312-
.instrument(loop_span)
313-
.await?;
334+
.instrument(loop_span)
335+
.await?;
314336

315337
if !should_continue {
316338
break Ok(());

0 commit comments

Comments
 (0)