diff --git a/Cargo.lock b/Cargo.lock index 54dfa25..f2da978 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -844,6 +844,80 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" +dependencies = [ + "bytes", + "futures-core", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-prometheus" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb15221c30bbb32e99873d348d89d7bc2138d6199520aa473a1bdb0d8e5721e8" +dependencies = [ + "axum", + "bytes", + "futures-core", + "http 1.3.1", + "http-body 1.0.1", + "matchit", + "metrics", + "metrics-exporter-prometheus", + "pin-project-lite", + "tokio", + "tower", + "tower-http", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -2049,6 +2123,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -2348,6 +2428,9 @@ name = "hashbrown" version = "0.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" +dependencies = [ + "foldhash", +] [[package]] name = "heck" @@ -2524,7 +2607,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -2544,6 +2627,7 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -2619,7 +2703,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -3121,6 +3205,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "memchr" version = "2.7.5" @@ -3163,6 +3253,46 @@ dependencies = [ "zeroize", ] +[[package]] +name = "metrics" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5" +dependencies = [ + "ahash 0.8.12", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" +dependencies = [ + "base64 0.22.1", + "indexmap", + "metrics", + "metrics-util", + "quanta", + "thiserror 2.0.12", +] + +[[package]] +name = "metrics-util" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe8db7a05415d0f919ffb905afa37784f71901c9a773188876984b4f769ab986" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.4", + "metrics", + "quanta", + "rand 0.9.1", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -3801,19 +3931,23 @@ dependencies = [ [[package]] name = "pythnet-watcher" -version = "1.0.0" +version = "1.1.0" dependencies = [ "anyhow", "async-trait", "aws-arn", "aws-config", "aws-sdk-kms", + "axum", + "axum-prometheus", "base64 0.22.1", "borsh 0.9.3", "clap", "der", "futures", "hex", + "lazy_static", + "metrics", "mockall", "prost", "reqwest", @@ -3870,7 +4004,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.1.1", "rustls 0.23.28", - "socket2", + "socket2 0.5.10", "thiserror 2.0.12", "tokio", "tracing", @@ -3909,7 +4043,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.5.10", "tracing", "windows-sys 0.59.0", ] @@ -4029,6 +4163,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.3", +] + [[package]] name = "raw-cpuid" version = "11.5.0" @@ -4616,6 +4759,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59fab13f937fa393d08645bf3a84bdfe86e296747b506ada67bb15f10f218b2a" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4771,6 +4924,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.10" @@ -4793,6 +4952,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "solana-account" version = "2.2.1" @@ -5591,7 +5760,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_derive", - "socket2", + "socket2 0.5.10", "solana-serde", "tokio", "url", @@ -6472,7 +6641,7 @@ dependencies = [ "rand 0.8.5", "rustls 0.23.28", "smallvec", - "socket2", + "socket2 0.5.10", "solana-keypair", "solana-measure", "solana-metrics", @@ -7470,9 +7639,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.46.1" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", @@ -7483,9 +7652,9 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", - "socket2", + "socket2 0.6.0", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7607,6 +7776,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b7ba8a9..383e144 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pythnet-watcher" -version = "1.0.0" +version = "1.1.0" edition = "2021" [dependencies] @@ -29,6 +29,10 @@ tokio-stream = "0.1.17" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] } wormhole-vaas-serde = "0.1.0" +axum = "0.8.4" +metrics = "0.24.2" +lazy_static = "1.5.0" +axum-prometheus = "0.9.0" [dev-dependencies] serde_json = "1.0.140" diff --git a/README.md b/README.md index a2a47b0..07b73ec 100644 --- a/README.md +++ b/README.md @@ -84,3 +84,10 @@ This will save the key in raw byte format to the file named `.secret`. ### ๐Ÿงช Testing Locally To test in a non-production environment (e.g. with devnet or a local Pythnet fork), just provide a different `--pythnet-url`, and `--server-url`, and optionally use custom `--wormhole-pid`. + +--- + +### ๐Ÿ“Š Metrics + +By default, running the watcher exposes metrics at `http://127.0.0.1:9001`. +You can change the metrics endpoint by passing the `--metrics-addr` flag or setting the `METRICS_ADDR` environment variable. diff --git a/src/api_client.rs b/src/api_client.rs index 1b5d698..73646fa 100644 --- a/src/api_client.rs +++ b/src/api_client.rs @@ -3,6 +3,7 @@ use { reqwest::{Client, Url}, serde::Serialize, std::{sync::Arc, time::Duration}, + tokio::time::Instant, wormhole_sdk::vaa::Body, }; @@ -83,11 +84,13 @@ impl ApiClient { &self, observation: Observation

, ) -> Result<(), anyhow::Error> { + let started = Instant::now(); let url = self .inner .base_url .join("observation") .map_err(|e| anyhow::anyhow!("Failed to construct URL: {}", e))?; + let response = self .inner .client @@ -95,7 +98,28 @@ impl ApiClient { .json(&observation) .send() .await - .map_err(|e| anyhow::anyhow!("Failed to post observation: {}", e))?; + .map_err(|e| anyhow::anyhow!("Failed to post observation: {}", e)); + + let base_url = self.inner.base_url.clone(); + let duration = started.elapsed(); + metrics::histogram!( + "post_observation_duration", + &[("url", base_url.to_string())] + ) + .record(duration.as_secs_f64()); + let status = match &response { + Ok(resp) => resp.status().as_u16(), + Err(_) => 0, + }; + metrics::counter!( + "post_observation", + &[ + ("status", status.to_string()), + ("url", base_url.to_string()), + ] + ) + .increment(1); + let response = response?; if response.status().is_success() { Ok(()) diff --git a/src/config.rs b/src/config.rs index 05d00e5..910eba9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,6 +26,12 @@ pub struct RunOptions { /// https://github.com/wormhole-foundation/wormhole/blob/main/docs/guardian_signer.md #[arg(long = "signer-uri", env = "SIGNER_URI")] pub signer_uri: String, + #[arg( + long = "metrics-addr", + env = "METRICS_ADDR", + default_value = "127.0.0.1:9001" + )] + pub metrics_addr: String, } #[derive(Parser, Clone, Debug)] diff --git a/src/main.rs b/src/main.rs index a390131..b920415 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,15 @@ use { crate::{ config::Command, + metrics_server::setup_metrics_recorder, signer::{GuardianKey, Signer, GUARDIAN_KEY_ARMORED_BLOCK, STANDARD_ARMOR_LINE_HEADER}, }, - anyhow::Context, + anyhow::{bail, Context}, api_client::{ApiClient, Observation}, borsh::BorshDeserialize, clap::Parser, futures::future::join_all, + lazy_static::lazy_static, posted_message::PostedMessageUnreliableData, prost::Message, reqwest::Url, @@ -17,7 +19,6 @@ use { solana_account_decoder::UiAccountEncoding, solana_client::{ nonblocking::pubsub_client::PubsubClient, - pubsub_client::PubsubClientError, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_filter::{Memcmp, RpcFilterType}, rpc_response::{Response, RpcKeyedAccount}, @@ -25,18 +26,23 @@ use { solana_sdk::pubkey::Pubkey, std::{ fs, + future::Future, io::{IsTerminal, Write}, str::FromStr, sync::Arc, time::Duration, }, - tokio::time::sleep, + tokio::{ + sync::watch, + time::{sleep, Instant}, + }, tokio_stream::StreamExt, wormhole_sdk::{vaa::Body, Address, Chain}, }; mod api_client; mod config; +mod metrics_server; mod posted_message; mod signer; @@ -128,7 +134,7 @@ fn message_data_to_body(unreliable_data: &PostedMessageUnreliableData) -> Body<& } } -async fn run_listener(input: RunListenerInput) -> Result<(), PubsubClientError> { +async fn run_listener(input: RunListenerInput) -> anyhow::Result<()> { let client = PubsubClient::new(input.ws_url.as_str()).await?; let (mut stream, unsubscribe) = client .program_subscribe( @@ -150,43 +156,69 @@ async fn run_listener(input: RunListenerInput) -> Result<(), PubsubClientError> ) .await?; - while let Some(update) = stream.next().await { - let unreliable_data = - match decode_and_verify_update(&input.wormhole_pid, &input.accumulator_address, update) - { - Ok(data) => data, - Err(_) => continue, + tokio::select! { + update = stream.next() => { + let Some(update) = update else { + tracing::error!("Failed to receive update"); + tokio::spawn(async move { unsubscribe().await }); + bail!("Stream ended"); }; - - tokio::spawn({ - let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone()); - async move { - let body = message_data_to_body(&unreliable_data); - match Observation::try_new(body.clone(), signer.clone()).await { - Ok(observation) => { - join_all(api_clients.iter().map(|api_client| { - let observation = observation.clone(); - let api_client = api_client.clone(); - async move { - if let Err(e) = api_client.post_observation(observation).await { - tracing::warn!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); - } else { - tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); - } + let started = Instant::now(); + let unreliable_data = decode_and_verify_update(&input.wormhole_pid, &input.accumulator_address, update); + let status = if unreliable_data.is_err() { + "error" + } else { + "success" + }; + let duration = started.elapsed(); + metrics::histogram!("decode_and_verify_observed_messages_duration").record( + duration.as_secs_f64(), + ); + metrics::counter!("decode_and_verify_observed_messages", &[("status", status)]).increment(1); + if let Ok(unreliable_data) = unreliable_data { + tokio::spawn({ + let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone()); + async move { + let started = Instant::now(); + let body = message_data_to_body(&unreliable_data); + let status = match Observation::try_new(body.clone(), signer.clone()).await { + Ok(observation) => { + join_all(api_clients.iter().map(|api_client| { + let observation = observation.clone(); + let api_client = api_client.clone(); + async move { + if let Err(e) = api_client.post_observation(observation).await { + tracing::warn!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); + } else { + tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); + } + } + })).await; + "success" } - })).await; + Err(e) => { + tracing::error!(error = ?e, "Failed to create observation"); + "error" + } + }; + let duration = started.elapsed(); + metrics::histogram!("create_and_post_observation_duration").record( + duration.as_secs_f64(), + ); + metrics::counter!("create_and_post_observation", &[("status", status)]).increment(1); } - Err(e) => tracing::error!(error = ?e, "Failed to create observation"), - } + }); } - }); + } + _ = wait_for_exit() => { + tracing::info!("Received exit signal, stopping pythnet watcher"); + return Ok(()) + } } tokio::spawn(async move { unsubscribe().await }); - Err(PubsubClientError::ConnectionClosed( - "Stream ended".to_string(), - )) + bail!("Stream ended") } async fn get_signer(run_options: config::RunOptions) -> anyhow::Result> { @@ -216,6 +248,54 @@ async fn get_signer(run_options: config::RunOptions) -> anyhow::Result = watch::channel(false).0; +} + +pub async fn wait_for_exit() { + let mut rx = EXIT.subscribe(); + // Check if the exit flag is already set, if so, we don't need to wait. + if !(*rx.borrow()) { + // Wait until the exit flag is set. + let _ = rx.changed().await; + } +} + +async fn fault_tolerant_handler(name: String, f: F) +where + F: Fn() -> Fut, + Fut: Future> + Send + 'static, + Fut::Output: Send + 'static, +{ + loop { + let res = tokio::spawn(f()).await; + match res { + Ok(result) => match result { + Ok(_) => break, // This will happen on graceful shutdown + Err(err) => { + tracing::error!("{} returned error: {:?}", name, err); + sleep(Duration::from_millis(500)).await; + } + }, + Err(err) => { + tracing::error!("{} is panicked or canceled: {:?}", name, err); + EXIT.send_modify(|exit| *exit = true); + break; + } + } + } +} + async fn run(run_options: config::RunOptions) -> anyhow::Result<()> { let signer = get_signer(run_options.clone()) .await @@ -230,6 +310,7 @@ async fn run(run_options: config::RunOptions) -> anyhow::Result<()> { Pubkey::from_str(&run_options.wormhole_pid).context("Invalid Wormhole program ID")?; let api_clients: Vec = run_options .server_urls + .clone() .into_iter() .map(|server_url| ApiClient::try_new(server_url, None)) .collect::>>()?; @@ -244,20 +325,33 @@ async fn run(run_options: config::RunOptions) -> anyhow::Result<()> { "Running listener...", ); - loop { - if let Err(e) = run_listener(RunListenerInput { - ws_url: run_options.pythnet_url.clone(), - signer: signer.clone(), - wormhole_pid, - accumulator_address, - api_clients: api_clients.clone(), - }) - .await - { - tracing::error!(error = ?e, "Error listening to messages"); - sleep(Duration::from_millis(200)).await; // Wait before retrying + // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. + tokio::spawn(async move { + tracing::info!("Registered shutdown signal handler..."); + if tokio::signal::ctrl_c().await.is_ok() { + tracing::info!("Shut down signal received, waiting for tasks..."); + EXIT.send_modify(|exit| *exit = true); } - } + }); + + let metrics_recorder = setup_metrics_recorder()?; + tokio::join!( + fault_tolerant_handler("Metrics Server".to_string(), { + let run_options = run_options.clone(); + move || metrics_server::run(run_options.clone(), metrics_recorder.clone()) + }), + fault_tolerant_handler("Listener".to_string(), move || run_listener( + RunListenerInput { + ws_url: run_options.pythnet_url.clone(), + signer: signer.clone(), + wormhole_pid, + accumulator_address, + api_clients: api_clients.clone(), + } + )), + ); + + Ok(()) } #[tokio::main] diff --git a/src/metrics_server.rs b/src/metrics_server.rs new file mode 100644 index 0000000..edc1c5e --- /dev/null +++ b/src/metrics_server.rs @@ -0,0 +1,41 @@ +use axum::{routing::get, Router}; +use axum_prometheus::{ + metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}, + PrometheusMetricLayerBuilder, +}; + +use crate::{config::RunOptions, wait_for_exit}; + +pub const DEFAULT_METRICS_BUCKET: &[f64; 20] = &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.25, 1.5, 2.0, + 3.0, 5.0, 10.0, +]; + +pub fn setup_metrics_recorder() -> anyhow::Result { + PrometheusBuilder::new() + .set_buckets(DEFAULT_METRICS_BUCKET)? + .install_recorder() + .map_err(|err| anyhow::anyhow!("Failed to set up metrics recorder: {:?}", err)) +} + +pub async fn run( + run_options: RunOptions, + metrics_recorder: PrometheusHandle, +) -> anyhow::Result<()> { + tracing::info!("Starting Metrics Server..."); + + let (_, metric_handle) = PrometheusMetricLayerBuilder::new() + .with_metrics_from_fn(|| metrics_recorder) + .build_pair(); + let app = Router::new(); + let app = app.route("/metrics", get(|| async move { metric_handle.render() })); + + let listener = tokio::net::TcpListener::bind(&run_options.metrics_addr).await?; + axum::serve(listener, app) + .with_graceful_shutdown(async { + let _ = wait_for_exit().await; + tracing::info!("Shutting down metrics server..."); + }) + .await?; + Ok(()) +}