Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/quorum/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/quorum/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "quorum"
version = "0.2.0"
version = "0.2.1"
edition = "2021"

[dependencies]
Expand All @@ -24,3 +24,4 @@ serde_json = "1.0.140"
futures = "0.3.31"
serde_wormhole = "0.1.0"
axum-prometheus = "0.8.0"
metrics = "0.24.2"
20 changes: 18 additions & 2 deletions apps/quorum/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use secp256k1::{
use serde::Deserialize;
use serde_wormhole::RawMessage;
use sha3::{Digest, Keccak256};
use std::{net::SocketAddr, time::Duration};
use std::{
net::SocketAddr,
time::{Duration, Instant},
};
use wormhole_sdk::{
vaa::{Body, Header, Signature},
GuardianAddress, GuardianSetInfo, Vaa,
Expand Down Expand Up @@ -139,6 +142,11 @@ async fn handle_observation(
state.guardian_set.clone(),
state.observation_lifetime,
)?;
metrics::counter!(
"verified_observations_total",
&[("gaurdian_index", verifier_index.to_string())]
)
.increment(1);
let new_signature = Signature {
signature: params.signature,
index: verifier_index.try_into()?,
Expand Down Expand Up @@ -169,6 +177,7 @@ async fn handle_observation(
body,
)
.into();
metrics::counter!("new_vaa_total").increment(1);
if let Err(e) = state
.ws
.broadcast_sender
Expand All @@ -193,9 +202,14 @@ async fn post_observation(
tokio::spawn({
let state = state.clone();
async move {
let start = Instant::now();
let mut status = "success";
if let Err(e) = handle_observation(state, params).await {
status = "error";
tracing::warn!(error = ?e, "Failed to handle observation");
}
metrics::histogram!("handle_observation_duration_seconds", &[("status", status)])
.record(start.elapsed().as_secs_f64());
}
});
Json(())
Expand Down Expand Up @@ -580,7 +594,9 @@ mod test {
let update = subscriber
.try_recv()
.expect("Failed to receive update from subscriber");
let UpdateEvent::NewVaa(vaa) = update;
let UpdateEvent::NewVaa(vaa) = update else {
panic!("Expected NewVaa event, got {:?}", update);
};
let vaa: Vaa<&RawMessage> =
serde_wormhole::from_slice(&vaa).expect("Failed to deserialize VAA");
// Check if the vaa signatures are sorted
Expand Down
23 changes: 23 additions & 0 deletions apps/quorum/src/metrics_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{future::Future, time::Duration};

use axum::{routing::get, Router};
use axum_prometheus::{
metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle},
Expand All @@ -18,6 +20,27 @@ pub fn setup_metrics_recorder() -> anyhow::Result<PrometheusHandle> {
.map_err(|err| anyhow::anyhow!("Failed to set up metrics recorder: {:?}", err))
}

/// Period between two consecutive invocations of the metric update callback.
const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(1);

/// Repeatedly refreshes metrics until the service is asked to exit.
///
/// On every tick of a [`METRIC_COLLECTION_INTERVAL`] timer, `update_metrics` is
/// called and the future it returns is awaited to completion. The loop ends as
/// soon as the future returned by `wait_for_exit()` resolves.
///
/// `service_name` is only used to identify this collector in log messages.
pub async fn metric_collector<F, Fut>(service_name: String, update_metrics: F)
where
    F: Fn() -> Fut,
    Fut: Future<Output = ()> + Send + 'static,
{
    let mut ticker = tokio::time::interval(METRIC_COLLECTION_INTERVAL);
    loop {
        tokio::select! {
            // Timer fired: run one round of metric collection.
            _ = ticker.tick() => update_metrics().await,
            // Exit requested: log it and leave the loop.
            _ = wait_for_exit() => {
                tracing::info!(
                    "Received exit signal, stopping metric collector for {}...",
                    service_name
                );
                break;
            }
        }
    }
    tracing::info!("Shutting down metric collector for {}...", service_name);
}

pub async fn run(run_options: RunOptions, state: State) -> anyhow::Result<()> {
tracing::info!("Starting Metrics Server...");

Expand Down
11 changes: 10 additions & 1 deletion apps/quorum/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use wormhole_sdk::{vaa::Signature, GuardianSetInfo};

use crate::{
api::{self},
metrics_server::{self, setup_metrics_recorder},
metrics_server::{self, metric_collector, setup_metrics_recorder},
pythnet::fetch_guardian_set,
ws::WsState,
};
Expand Down Expand Up @@ -181,6 +181,15 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
run_options.clone(),
state.clone()
)),
metric_collector("state".to_string(), || {
let state = state.clone();
async move {
let verification = state.verification.read().await;
metrics::gauge!("pending_vaas").set(verification.len() as f64);
metrics::gauge!("pending_verified_observations")
.set(verification.values().flatten().count() as f64);
}
}),
);

Ok(())
Expand Down
70 changes: 58 additions & 12 deletions apps/quorum/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn websocket_handler(state: axum::extract::State<State>, stream: WebSocket
/// Events that a websocket `Subscriber` processes in `handle_update`.
#[derive(Clone, PartialEq, Debug)]
pub enum UpdateEvent {
/// A new VAA, carried as its serialized bytes (the tests decode this payload
/// with `serde_wormhole::from_slice`).
NewVaa(Vec<u8>),
/// Asks the subscriber to send an empty websocket ping frame to its client.
Ping,
}

pub type SubscriberId = usize;
Expand Down Expand Up @@ -117,8 +118,7 @@ impl Subscriber {
return Err(anyhow!("Subscriber did not respond to ping. Closing connection."));
}
self.responded_to_ping = false;
self.sender.send(Message::Ping(vec![].into())).await?;
Ok(())
self.handle_update(UpdateEvent::Ping).await
},
_ = wait_for_exit() => {
self.sender.close().await?;
Expand All @@ -134,13 +134,35 @@ impl Subscriber {
}

async fn handle_update(&mut self, event: UpdateEvent) -> Result<()> {
match event.clone() {
UpdateEvent::NewVaa(vaa) => self.handle_new_vaa(vaa).await,
}
let start = std::time::Instant::now();
let update_name;
let result = match event.clone() {
UpdateEvent::NewVaa(vaa) => {
update_name = "new_vaa";
self.handle_new_vaa(vaa).await
}
UpdateEvent::Ping => {
update_name = "ping";
self.sender.send(Message::Ping(vec![].into())).await?;
Ok(())
}
};
let status = match &result {
Ok(_) => "success",
Err(_) => "error",
};
let label = [("status", status), ("name", update_name)];
metrics::counter!("ws_server_update_total", &label).increment(1);
metrics::histogram!("ws_server_update_duration_seconds", &label,)
.record(start.elapsed().as_secs_f64());
result
}

async fn handle_client_message(&mut self, message: Message) -> Result<()> {
match message {
let start = std::time::Instant::now();
let message_type;

let result: anyhow::Result<()> = match message {
Message::Close(_) => {
// Closing the connection. We don't remove it from the subscribers
// list, instead when the Subscriber struct is dropped the channel
Expand All @@ -149,15 +171,39 @@ impl Subscriber {
// Send the close message to gracefully shut down the connection
// Otherwise the client might get an abnormal Websocket closure
// error.
message_type = "close";
self.sender.close().await?;
self.closed = true;
return Ok(());
Ok(())
}
Message::Text(_) => {
message_type = "text";
Ok(())
}
Message::Binary(_) => {
message_type = "binary";
Ok(())
}
Message::Ping(_) => {
message_type = "ping";
Ok(())
}
Message::Pong(_) => {
message_type = "pong";
self.responded_to_ping = true;
Ok(())
}
Message::Text(_) => {}
Message::Binary(_) => {}
Message::Ping(_) => {}
Message::Pong(_) => self.responded_to_ping = true,
};
Ok(())

let status = match &result {
Ok(_) => "success",
Err(_) => "error",
};
let label = [("status", status), ("message_type", message_type)];
metrics::counter!("ws_client_message_total", &label).increment(1);
metrics::histogram!("ws_client_message_duration_seconds", &label,)
.record(start.elapsed().as_secs_f64());

result
}
}
Loading