Skip to content

Commit e6522b6

Browse files
kariyclaude
andcommitted
feat(pipeline): add comprehensive sync metrics
Added comprehensive metrics collection for the sync pipeline enabling visualization of both individual stage progression and overall pipeline progress. Metrics include blocks processed, execution time, checkpoints, and iteration tracking. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 92196ed commit e6522b6

File tree

4 files changed

+213
-0
lines changed

4 files changed

+213
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sync/pipeline/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ repository.workspace = true
66
version.workspace = true
77

88
[dependencies]
9+
katana-metrics.workspace = true
910
katana-primitives.workspace = true
1011
katana-provider-api.workspace = true
1112
katana-stage.workspace = true

crates/sync/pipeline/src/lib.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ use tokio::sync::watch;
8585
use tokio::task::yield_now;
8686
use tracing::{debug, error, info, info_span, Instrument};
8787

88+
pub mod metrics;
89+
pub use metrics::PipelineMetrics;
90+
8891
/// The result of a pipeline execution.
8992
pub type PipelineResult<T> = Result<T, Error>;
9093

@@ -221,6 +224,7 @@ pub struct Pipeline<P> {
221224
command_tx: watch::Sender<Option<PipelineCommand>>,
222225
block_tx: watch::Sender<Option<BlockNumber>>,
223226
tip: Option<BlockNumber>,
227+
metrics: PipelineMetrics,
224228
}
225229

226230
impl<P> Pipeline<P> {
@@ -246,6 +250,7 @@ impl<P> Pipeline<P> {
246250
provider,
247251
chunk_size,
248252
tip: None,
253+
metrics: PipelineMetrics::new(),
249254
};
250255
(pipeline, handle)
251256
}
@@ -271,6 +276,11 @@ impl<P> Pipeline<P> {
271276
pub fn handle(&self) -> PipelineHandle {
272277
PipelineHandle { tx: self.command_tx.clone(), block_tx: self.block_tx.clone() }
273278
}
279+
280+
/// Returns a reference to the pipeline metrics.
281+
pub fn metrics(&self) -> &PipelineMetrics {
282+
&self.metrics
283+
}
274284
}
275285

276286
impl<P: StageCheckpointProvider> Pipeline<P> {
@@ -305,6 +315,7 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
305315
Some(PipelineCommand::SetTip(new_tip)) => {
306316
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
307317
self.tip = Some(new_tip);
318+
self.metrics.set_tip(new_tip);
308319
}
309320
None => {}
310321
}
@@ -359,6 +370,7 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
359370

360371
for stage in self.stages.iter_mut() {
361372
let id = stage.id();
373+
let stage_metrics = self.metrics.stage(id);
362374

363375
// Get the checkpoint for the stage, otherwise default to block number 0
364376
let checkpoint = self.provider.checkpoint(id)?;
@@ -368,11 +380,13 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
368380

369381
let from = if let Some(checkpoint) = checkpoint {
370382
debug!(target: "pipeline", %checkpoint, "Found checkpoint.");
383+
stage_metrics.set_checkpoint(checkpoint);
371384

372385
// Skip the stage if the checkpoint is greater than or equal to the target block
373386
// number
374387
if checkpoint >= to {
375388
info!(target: "pipeline", %checkpoint, "Skipping stage - target already reached.");
389+
stage_metrics.record_skipped();
376390
last_block_processed_list.push(checkpoint);
377391
continue;
378392
}
@@ -381,13 +395,15 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
381395
// from the next block
382396
checkpoint + 1
383397
} else {
398+
stage_metrics.set_checkpoint(0);
384399
0
385400
};
386401

387402
let input = StageExecutionInput::new(from, to);
388403
info!(target: "pipeline", %from, %to, "Executing stage.");
389404

390405
let span = enter.exit();
406+
let _guard = stage_metrics.execution_started();
391407
let StageExecutionOutput { last_block_processed } = stage
392408
.execute(&input)
393409
.instrument(span.clone())
@@ -397,11 +413,24 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
397413
let _enter = span.enter();
398414
info!(target: "pipeline", %from, %to, "Stage execution completed.");
399415

416+
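// Record blocks processed by this stage in this execution.
// NOTE(review): for `from == 0` the expression below evaluates to `last_block_processed`,
// not `last_block_processed + 1`; if block 0 is itself executed (inclusive range), the very
// first run of a stage undercounts by one - confirm and consider
// `last_block_processed.saturating_add(1).saturating_sub(from)`.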
417+
let blocks_processed = last_block_processed.saturating_sub(from.saturating_sub(1));
418+
stage_metrics.record_blocks_processed(blocks_processed);
419+
400420
self.provider.set_checkpoint(id, last_block_processed)?;
421+
stage_metrics.set_checkpoint(last_block_processed);
401422
last_block_processed_list.push(last_block_processed);
402423
info!(target: "pipeline", checkpoint = %last_block_processed, "New checkpoint set.");
403424
}
404425

426+
// Update overall pipeline checkpoint metrics
427+
if let Some(&min_checkpoint) = last_block_processed_list.iter().min() {
428+
self.metrics.set_lowest_checkpoint(min_checkpoint);
429+
}
430+
if let Some(&max_checkpoint) = last_block_processed_list.iter().max() {
431+
self.metrics.set_highest_checkpoint(max_checkpoint);
432+
}
433+
405434
Ok(last_block_processed_list.into_iter().min().unwrap_or(to))
406435
}
407436

@@ -413,13 +442,22 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
413442
// Process blocks if we have a tip
414443
if let Some(tip) = self.tip {
415444
let to = current_chunk_tip.min(tip);
445+
let iteration_start = std::time::Instant::now();
416446
let last_block_processed = self.run_once(to).await?;
447+
let iteration_duration = iteration_start.elapsed().as_secs_f64();
448+
449+
// Record pipeline metrics for this iteration
450+
self.metrics.record_iteration_time(iteration_duration);
451+
let blocks_in_iteration =
452+
to.saturating_sub(current_chunk_tip.saturating_sub(self.chunk_size));
453+
self.metrics.record_chunk(blocks_in_iteration);
417454

418455
// Notify subscribers about the newly processed block
419456
let _ = self.block_tx.send(Some(last_block_processed));
420457

421458
if last_block_processed >= tip {
422459
info!(target: "pipeline", %tip, "Finished syncing until tip.");
460+
self.metrics.record_run_complete();
423461
self.tip = None;
424462
current_chunk_tip = last_block_processed;
425463
} else {
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
//! Metrics for the sync pipeline.
2+
//!
3+
//! This module provides comprehensive metrics collection for the synchronization pipeline,
4+
//! enabling monitoring and visualization of both individual stages and overall pipeline progress.
5+
//!
6+
//! ## Pipeline Metrics
7+
//!
8+
//! Pipeline-level metrics track the overall synchronization process:
9+
//!
10+
//! - Total chunks processed by the pipeline (one per pipeline iteration)
11+
//! - Total blocks processed across all pipeline runs
12+
//! - Time taken by each pipeline iteration
13+
//! - Current tip block being synced to
14+
//! - Pipeline runs completed
15+
//!
16+
//! ## Stage Metrics
17+
//!
18+
//! Stage-level metrics are collected per stage and include:
19+
//!
20+
//! - Number of executions for each stage
21+
//! - Total blocks processed by each stage
22+
//! - Execution time for each stage execution
23+
//! - Checkpoint updates for each stage
24+
25+
use std::collections::HashMap;
26+
use std::sync::Arc;
27+
use std::time::Instant;
28+
29+
use katana_metrics::metrics::{self, Counter, Gauge, Histogram};
30+
use katana_metrics::Metrics;
31+
32+
/// Metrics for the sync pipeline.
33+
#[derive(Clone)]
34+
pub struct PipelineMetrics {
35+
inner: Arc<PipelineMetricsInner>,
36+
}
37+
38+
impl PipelineMetrics {
39+
/// Creates a new instance of `PipelineMetrics`.
40+
pub fn new() -> Self {
41+
Self {
42+
inner: Arc::new(PipelineMetricsInner {
43+
pipeline: PipelineOverallMetrics::default(),
44+
stages: Default::default(),
45+
}),
46+
}
47+
}
48+
49+
/// Get or create metrics for a specific stage.
50+
pub fn stage(&self, stage_id: &'static str) -> StageMetrics {
51+
let mut stages = self.inner.stages.lock().unwrap();
52+
stages
53+
.entry(stage_id)
54+
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id)]))
55+
.clone()
56+
}
57+
58+
/// Record a chunk being processed by the pipeline.
59+
pub fn record_chunk(&self, blocks_in_chunk: u64) {
60+
self.inner.pipeline.chunks_processed_total.increment(1);
61+
self.inner.pipeline.blocks_processed_total.increment(blocks_in_chunk);
62+
}
63+
64+
/// Update the current tip being synced to.
65+
pub fn set_tip(&self, tip: u64) {
66+
self.inner.pipeline.current_tip.set(tip as f64);
67+
}
68+
69+
/// Record a pipeline run completing.
70+
pub fn record_run_complete(&self) {
71+
self.inner.pipeline.runs_completed_total.increment(1);
72+
}
73+
74+
/// Record the time taken for a pipeline iteration.
75+
pub fn record_iteration_time(&self, duration_seconds: f64) {
76+
self.inner.pipeline.iteration_time_seconds.record(duration_seconds);
77+
}
78+
79+
/// Update the lowest checkpoint across all stages.
80+
pub fn set_lowest_checkpoint(&self, checkpoint: u64) {
81+
self.inner.pipeline.lowest_checkpoint.set(checkpoint as f64);
82+
}
83+
84+
/// Update the highest checkpoint across all stages.
85+
pub fn set_highest_checkpoint(&self, checkpoint: u64) {
86+
self.inner.pipeline.highest_checkpoint.set(checkpoint as f64);
87+
}
88+
}
89+
90+
impl Default for PipelineMetrics {
91+
fn default() -> Self {
92+
Self::new()
93+
}
94+
}
95+
96+
struct PipelineMetricsInner {
97+
/// Overall pipeline metrics
98+
pipeline: PipelineOverallMetrics,
99+
/// Per-stage metrics
100+
stages: std::sync::Mutex<HashMap<&'static str, StageMetrics>>,
101+
}
102+
103+
/// Metrics for the overall pipeline execution.
104+
#[derive(Metrics, Clone)]
105+
#[metrics(scope = "sync.pipeline")]
106+
struct PipelineOverallMetrics {
107+
/// Total number of chunks processed by the pipeline
108+
chunks_processed_total: Counter,
109+
/// Total number of blocks processed by the pipeline
110+
blocks_processed_total: Counter,
111+
/// Total number of pipeline runs completed
112+
runs_completed_total: Counter,
113+
/// Current tip block being synced to
114+
current_tip: Gauge,
115+
/// Lowest checkpoint across all stages
116+
lowest_checkpoint: Gauge,
117+
/// Highest checkpoint across all stages
118+
highest_checkpoint: Gauge,
119+
/// Time taken for each pipeline iteration
120+
iteration_time_seconds: Histogram,
121+
}
122+
123+
/// Metrics for individual stage execution.
124+
#[derive(Metrics, Clone)]
125+
#[metrics(scope = "sync.stage")]
126+
pub struct StageMetrics {
127+
/// Number of times the stage has been executed
128+
executions_total: Counter,
129+
/// Total number of blocks processed by this stage
130+
blocks_processed_total: Counter,
131+
/// Number of times the stage was skipped (checkpoint >= target)
132+
skipped_total: Counter,
133+
/// Time taken for each stage execution
134+
execution_time_seconds: Histogram,
135+
/// Current checkpoint for this stage
136+
checkpoint: Gauge,
137+
}
138+
139+
impl StageMetrics {
140+
/// Record a stage execution starting.
141+
pub fn execution_started(&self) -> StageExecutionGuard {
142+
self.executions_total.increment(1);
143+
StageExecutionGuard { metrics: self.clone(), started_at: Instant::now() }
144+
}
145+
146+
/// Record blocks processed by this stage.
147+
pub fn record_blocks_processed(&self, count: u64) {
148+
self.blocks_processed_total.increment(count);
149+
}
150+
151+
/// Record a stage being skipped.
152+
pub fn record_skipped(&self) {
153+
self.skipped_total.increment(1);
154+
}
155+
156+
/// Update the checkpoint for this stage.
157+
pub fn set_checkpoint(&self, checkpoint: u64) {
158+
self.checkpoint.set(checkpoint as f64);
159+
}
160+
}
161+
162+
/// Guard that records the execution time when dropped.
163+
pub struct StageExecutionGuard {
164+
metrics: StageMetrics,
165+
started_at: Instant,
166+
}
167+
168+
impl Drop for StageExecutionGuard {
169+
fn drop(&mut self) {
170+
let duration = self.started_at.elapsed().as_secs_f64();
171+
self.metrics.execution_time_seconds.record(duration);
172+
}
173+
}

0 commit comments

Comments
 (0)