diff --git a/apps/quorum/Cargo.lock b/apps/quorum/Cargo.lock index f21bb3f7fb..95d9b4d525 100644 --- a/apps/quorum/Cargo.lock +++ b/apps/quorum/Cargo.lock @@ -2760,7 +2760,7 @@ dependencies = [ [[package]] name = "quorum" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "axum", @@ -2770,6 +2770,7 @@ dependencies = [ "futures", "hex", "lazy_static", + "metrics", "secp256k1", "serde", "serde_json", diff --git a/apps/quorum/Cargo.toml b/apps/quorum/Cargo.toml index 8287d728b7..1308e362ed 100644 --- a/apps/quorum/Cargo.toml +++ b/apps/quorum/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quorum" -version = "0.2.0" +version = "0.2.1" edition = "2021" [dependencies] @@ -24,3 +24,4 @@ serde_json = "1.0.140" futures = "0.3.31" serde_wormhole = "0.1.0" axum-prometheus = "0.8.0" +metrics = "0.24.2" diff --git a/apps/quorum/src/api.rs b/apps/quorum/src/api.rs index f068252bfa..576a19afbd 100644 --- a/apps/quorum/src/api.rs +++ b/apps/quorum/src/api.rs @@ -11,7 +11,10 @@ use secp256k1::{ use serde::Deserialize; use serde_wormhole::RawMessage; use sha3::{Digest, Keccak256}; -use std::{net::SocketAddr, time::Duration}; +use std::{ + net::SocketAddr, + time::{Duration, Instant}, +}; use wormhole_sdk::{ vaa::{Body, Header, Signature}, GuardianAddress, GuardianSetInfo, Vaa, @@ -139,6 +142,11 @@ async fn handle_observation( state.guardian_set.clone(), state.observation_lifetime, )?; + metrics::counter!( + "verified_observations_total", + &[("gaurdian_index", verifier_index.to_string())] + ) + .increment(1); let new_signature = Signature { signature: params.signature, index: verifier_index.try_into()?, @@ -169,6 +177,7 @@ async fn handle_observation( body, ) .into(); + metrics::counter!("new_vaa_total").increment(1); if let Err(e) = state .ws .broadcast_sender @@ -193,9 +202,14 @@ async fn post_observation( tokio::spawn({ let state = state.clone(); async move { + let start = Instant::now(); + let mut status = "success"; if let Err(e) = handle_observation(state, params).await { + status = "error"; tracing::warn!(error = ?e, "Failed to handle observation"); } + metrics::histogram!("handle_observation_duration_seconds", &[("status", status)]) + .record(start.elapsed().as_secs_f64()); } }); Json(()) @@ -580,7 +594,9 @@ mod test { let update = subscriber .try_recv() .expect("Failed to receive update from subscriber"); - let UpdateEvent::NewVaa(vaa) = update; + let UpdateEvent::NewVaa(vaa) = update else { + panic!("Expected NewVaa event, got {:?}", update); + }; let vaa: Vaa<&RawMessage> = serde_wormhole::from_slice(&vaa).expect("Failed to deserialize VAA"); // Check if the vaa signatures are sorted diff --git a/apps/quorum/src/metrics_server.rs b/apps/quorum/src/metrics_server.rs index 2063996885..c10f7e4400 100644 --- a/apps/quorum/src/metrics_server.rs +++ b/apps/quorum/src/metrics_server.rs @@ -1,3 +1,5 @@ +use std::{future::Future, time::Duration}; + use axum::{routing::get, Router}; use axum_prometheus::{ metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}, @@ -18,6 +20,27 @@ pub fn setup_metrics_recorder() -> anyhow::Result { .map_err(|err| anyhow::anyhow!("Failed to set up metrics recorder: {:?}", err)) } +const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(1); +pub async fn metric_collector(service_name: String, update_metrics: F) +where + F: Fn() -> Fut, + Fut: Future + Send + 'static, +{ + let mut metric_interval = tokio::time::interval(METRIC_COLLECTION_INTERVAL); + loop { + tokio::select! { + _ = metric_interval.tick() => { + update_metrics().await; + } + _ = wait_for_exit() => { + tracing::info!("Received exit signal, stopping metric collector for {}...", service_name); + break; + } + } + } + tracing::info!("Shutting down metric collector for {}...", service_name); +} + pub async fn run(run_options: RunOptions, state: State) -> anyhow::Result<()> { tracing::info!("Starting Metrics Server..."); diff --git a/apps/quorum/src/server.rs b/apps/quorum/src/server.rs index 73fc82a159..1c2ba3708c 100644 --- a/apps/quorum/src/server.rs +++ b/apps/quorum/src/server.rs @@ -14,7 +14,7 @@ use wormhole_sdk::{vaa::Signature, GuardianSetInfo}; use crate::{ api::{self}, - metrics_server::{self, setup_metrics_recorder}, + metrics_server::{self, metric_collector, setup_metrics_recorder}, pythnet::fetch_guardian_set, ws::WsState, }; @@ -181,6 +181,15 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { run_options.clone(), state.clone() )), + metric_collector("state".to_string(), || { + let state = state.clone(); + async move { + let verification = state.verification.read().await; + metrics::gauge!("pending_vaas").set(verification.len() as f64); + metrics::gauge!("pending_verified_observations") + .set(verification.values().flatten().count() as f64); + } + }), ); Ok(()) diff --git a/apps/quorum/src/ws.rs b/apps/quorum/src/ws.rs index 90f492da6e..63130db019 100644 --- a/apps/quorum/src/ws.rs +++ b/apps/quorum/src/ws.rs @@ -54,6 +54,7 @@ async fn websocket_handler(state: axum::extract::State, stream: WebSocket #[derive(Clone, PartialEq, Debug)] pub enum UpdateEvent { NewVaa(Vec), + Ping, } pub type SubscriberId = usize; @@ -117,8 +118,7 @@ impl Subscriber { return Err(anyhow!("Subscriber did not respond to ping. Closing connection.")); } self.responded_to_ping = false; - self.sender.send(Message::Ping(vec![].into())).await?; - Ok(()) + self.handle_update(UpdateEvent::Ping).await }, _ = wait_for_exit() => { self.sender.close().await?; @@ -134,13 +134,35 @@ impl Subscriber { } async fn handle_update(&mut self, event: UpdateEvent) -> Result<()> { - match event.clone() { - UpdateEvent::NewVaa(vaa) => self.handle_new_vaa(vaa).await, - } + let start = std::time::Instant::now(); + let update_name; + let result = match event.clone() { + UpdateEvent::NewVaa(vaa) => { + update_name = "new_vaa"; + self.handle_new_vaa(vaa).await + } + UpdateEvent::Ping => { + update_name = "ping"; + self.sender.send(Message::Ping(vec![].into())).await?; + Ok(()) + } + }; + let status = match &result { + Ok(_) => "success", + Err(_) => "error", + }; + let label = [("status", status), ("name", update_name)]; + metrics::counter!("ws_server_update_total", &label).increment(1); + metrics::histogram!("ws_server_update_duration_seconds", &label,) + .record(start.elapsed().as_secs_f64()); + result } async fn handle_client_message(&mut self, message: Message) -> Result<()> { - match message { + let start = std::time::Instant::now(); + let message_type; + + let result: anyhow::Result<()> = match message { Message::Close(_) => { // Closing the connection. We don't remove it from the subscribers // list, instead when the Subscriber struct is dropped the channel @@ -149,15 +171,39 @@ impl Subscriber { // Send the close message to gracefully shut down the connection // Otherwise the client might get an abnormal Websocket closure // error. + message_type = "close"; self.sender.close().await?; self.closed = true; - return Ok(()); + Ok(()) + } + Message::Text(_) => { + message_type = "text"; + Ok(()) + } + Message::Binary(_) => { + message_type = "binary"; + Ok(()) + } + Message::Ping(_) => { + message_type = "ping"; + Ok(()) + } + Message::Pong(_) => { + message_type = "pong"; + self.responded_to_ping = true; + Ok(()) } - Message::Text(_) => {} - Message::Binary(_) => {} - Message::Ping(_) => {} - Message::Pong(_) => self.responded_to_ping = true, }; - Ok(()) + + let status = match &result { + Ok(_) => "success", + Err(_) => "error", + }; + let label = [("status", status), ("message_type", message_type)]; + metrics::counter!("ws_client_message_total", &label).increment(1); + metrics::histogram!("ws_client_message_duration_seconds", &label,) + .record(start.elapsed().as_secs_f64()); + + result } }