Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/quorum/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/quorum/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "quorum"
version = "0.2.0"
version = "0.3.0"
edition = "2021"

[dependencies]
Expand All @@ -24,3 +24,4 @@ serde_json = "1.0.140"
futures = "0.3.31"
serde_wormhole = "0.1.0"
axum-prometheus = "0.8.0"
metrics = "0.24.2"
20 changes: 18 additions & 2 deletions apps/quorum/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use secp256k1::{
use serde::Deserialize;
use serde_wormhole::RawMessage;
use sha3::{Digest, Keccak256};
use std::{net::SocketAddr, time::Duration};
use std::{
net::SocketAddr,
time::{Duration, Instant},
};
use wormhole_sdk::{
vaa::{Body, Header, Signature},
GuardianAddress, GuardianSetInfo, Vaa,
Expand Down Expand Up @@ -139,6 +142,11 @@ async fn handle_observation(
state.guardian_set.clone(),
state.observation_lifetime,
)?;
metrics::counter!(
"verified_observations_total",
&[("gaurdian_index", verifier_index.to_string())]
)
.increment(1);
let new_signature = Signature {
signature: params.signature,
index: verifier_index.try_into()?,
Expand Down Expand Up @@ -169,6 +177,7 @@ async fn handle_observation(
body,
)
.into();
metrics::counter!("new_vaa_total").increment(1);
if let Err(e) = state
.ws
.broadcast_sender
Expand All @@ -193,9 +202,14 @@ async fn post_observation(
tokio::spawn({
let state = state.clone();
async move {
let start = Instant::now();
let mut status = "success";
if let Err(e) = handle_observation(state, params).await {
status = "error";
tracing::warn!(error = ?e, "Failed to handle observation");
}
metrics::histogram!("handle_observation_duration_seconds", &[("status", status)])
.record(start.elapsed().as_secs_f64());
}
});
Json(())
Expand Down Expand Up @@ -580,7 +594,9 @@ mod test {
let update = subscriber
.try_recv()
.expect("Failed to receive update from subscriber");
let UpdateEvent::NewVaa(vaa) = update;
let UpdateEvent::NewVaa(vaa) = update else {
panic!("Expected NewVaa event, got {:?}", update);
};
let vaa: Vaa<&RawMessage> =
serde_wormhole::from_slice(&vaa).expect("Failed to deserialize VAA");
// Check if the vaa signatures are sorted
Expand Down
23 changes: 23 additions & 0 deletions apps/quorum/src/metrics_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{future::Future, time::Duration};

use axum::{routing::get, Router};
use axum_prometheus::{
metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle},
Expand All @@ -18,6 +20,27 @@ pub fn setup_metrics_recorder() -> anyhow::Result<PrometheusHandle> {
.map_err(|err| anyhow::anyhow!("Failed to set up metrics recorder: {:?}", err))
}

const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(1);
pub async fn metric_collector<F, Fut>(service_name: String, update_metrics: F)
where
F: Fn() -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let mut metric_interval = tokio::time::interval(METRIC_COLLECTION_INTERVAL);
loop {
tokio::select! {
_ = metric_interval.tick() => {
update_metrics().await;
}
_ = wait_for_exit() => {
tracing::info!("Received exit signal, stopping metric collector for {}...", service_name);
break;
}
}
}
tracing::info!("Shutting down metric collector for {}...", service_name);
}

pub async fn run(run_options: RunOptions, state: State) -> anyhow::Result<()> {
tracing::info!("Starting Metrics Server...");

Expand Down
11 changes: 10 additions & 1 deletion apps/quorum/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use wormhole_sdk::{vaa::Signature, GuardianSetInfo};

use crate::{
api::{self},
metrics_server::{self, setup_metrics_recorder},
metrics_server::{self, metric_collector, setup_metrics_recorder},
pythnet::fetch_guardian_set,
ws::WsState,
};
Expand Down Expand Up @@ -181,6 +181,15 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
run_options.clone(),
state.clone()
)),
metric_collector("state".to_string(), || {
let state = state.clone();
async move {
let verification = state.verification.read().await;
metrics::gauge!("pending_vaas").set(verification.len() as f64);
metrics::gauge!("pending_verified_observations")
.set(verification.values().flatten().count() as f64);
}
}),
);

Ok(())
Expand Down
70 changes: 58 additions & 12 deletions apps/quorum/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn websocket_handler(state: axum::extract::State<State>, stream: WebSocket
#[derive(Clone, PartialEq, Debug)]
pub enum UpdateEvent {
NewVaa(Vec<u8>),
Ping,
}

pub type SubscriberId = usize;
Expand Down Expand Up @@ -117,8 +118,7 @@ impl Subscriber {
return Err(anyhow!("Subscriber did not respond to ping. Closing connection."));
}
self.responded_to_ping = false;
self.sender.send(Message::Ping(vec![].into())).await?;
Ok(())
self.handle_update(UpdateEvent::Ping).await
},
_ = wait_for_exit() => {
self.sender.close().await?;
Expand All @@ -134,13 +134,35 @@ impl Subscriber {
}

async fn handle_update(&mut self, event: UpdateEvent) -> Result<()> {
match event.clone() {
UpdateEvent::NewVaa(vaa) => self.handle_new_vaa(vaa).await,
}
let start = std::time::Instant::now();
let update_name;
let result = match event.clone() {
UpdateEvent::NewVaa(vaa) => {
update_name = "new_vaa";
self.handle_new_vaa(vaa).await
}
UpdateEvent::Ping => {
update_name = "ping";
self.sender.send(Message::Ping(vec![].into())).await?;
Ok(())
}
};
let status = match &result {
Ok(_) => "success",
Err(_) => "error",
};
let label = [("status", status), ("name", update_name)];
metrics::counter!("ws_server_update_total", &label).increment(1);
metrics::histogram!("ws_server_update_duration_seconds", &label,)
.record(start.elapsed().as_secs_f64());
result
}

async fn handle_client_message(&mut self, message: Message) -> Result<()> {
match message {
let start = std::time::Instant::now();
let message_type;

let result: anyhow::Result<()> = match message {
Message::Close(_) => {
// Closing the connection. We don't remove it from the subscribers
// list, instead when the Subscriber struct is dropped the channel
Expand All @@ -149,15 +171,39 @@ impl Subscriber {
// Send the close message to gracefully shut down the connection
// Otherwise the client might get an abnormal Websocket closure
// error.
message_type = "close";
self.sender.close().await?;
self.closed = true;
return Ok(());
Ok(())
}
Message::Text(_) => {
message_type = "text";
Ok(())
}
Message::Binary(_) => {
message_type = "binary";
Ok(())
}
Message::Ping(_) => {
message_type = "ping";
Ok(())
}
Message::Pong(_) => {
message_type = "pong";
self.responded_to_ping = true;
Ok(())
}
Message::Text(_) => {}
Message::Binary(_) => {}
Message::Ping(_) => {}
Message::Pong(_) => self.responded_to_ping = true,
};
Ok(())

let status = match &result {
Ok(_) => "success",
Err(_) => "error",
};
let label = [("status", status), ("message_type", message_type)];
metrics::counter!("ws_client_message_total", &label).increment(1);
metrics::histogram!("ws_client_message_duration_seconds", &label,)
.record(start.elapsed().as_secs_f64());

result
}
}
Loading