diff --git a/.github/workflows/ci-hermes-client-rust.yml b/.github/workflows/ci-hermes-client-rust.yml new file mode 100644 index 0000000000..38544c1db9 --- /dev/null +++ b/.github/workflows/ci-hermes-client-rust.yml @@ -0,0 +1,35 @@ +name: "Hermes Client Rust Test Suite" +on: + push: + branches: + - main + pull_request: + paths: + - .github/workflows/ci-hermes-client-rust.yml + - apps/hermes/client/rust/** + +jobs: + lazer-rust-test-suite: + name: Hermes Client Rust Test Suite + runs-on: ubuntu-22.04 + defaults: + run: + working-directory: apps/hermes/client/rust/ + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + - uses: actions-rust-lang/setup-rust-toolchain@v1 + - name: install taplo + run: cargo install --locked taplo-cli@0.9.3 + - name: check Cargo.toml formatting + run: find . -name Cargo.toml -exec taplo fmt --check --diff {} \; + - name: Format check + run: cargo fmt --all -- --check + if: success() || failure() + - name: Clippy check + run: cargo clippy -p pyth-hermes-client --all-targets -- --deny warnings + if: success() || failure() + - name: test + run: cargo test -p pyth-hermes-client + if: success() || failure() diff --git a/.github/workflows/publish-rust-hermes-client.yml b/.github/workflows/publish-rust-hermes-client.yml new file mode 100644 index 0000000000..6a2b124b9f --- /dev/null +++ b/.github/workflows/publish-rust-hermes-client.yml @@ -0,0 +1,18 @@ +name: Publish Rust package pyth-lazer-client to crates.io + +on: + push: + tags: + - rust-pyth-hermes-client-v* +jobs: + publish-pyth-hermes-client: + name: Publish Rust package pyth-hermes-client to crates.io + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v2 + + - run: cargo publish --token ${CARGO_REGISTRY_TOKEN} + env: + CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} + working-directory: "apps/hermes/client/rust" diff --git a/Cargo.lock b/Cargo.lock index 84102dcb41..c4a2eb09b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5606,6 +5606,31 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pyth-hermes-client" +version = "0.0.1" +dependencies = [ + "alloy-primitives 0.8.25", + "anyhow", + "backoff", + "bincode 1.3.3", + "bs58", + "derive_more 1.0.0", + "ed25519-dalek 2.1.1", + "futures-util", + "hex", + "libsecp256k1 0.7.2", + "pyth-sdk 0.8.0", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite 0.20.1", + "tracing", + "tracing-subscriber", + "ttl_cache", + "url", +] + [[package]] name = "pyth-lazer-agent" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 32bb8f766d..b330eecd89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "apps/fortuna", "apps/pyth-lazer-agent", "apps/quorum", + "apps/hermes/client/rust", "lazer/publisher_sdk/rust", "lazer/sdk/rust/client", "lazer/sdk/rust/protocol", diff --git a/apps/hermes/client/rust/Cargo.toml b/apps/hermes/client/rust/Cargo.toml new file mode 100644 index 0000000000..d2f073cad9 --- /dev/null +++ b/apps/hermes/client/rust/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "pyth-hermes-client" +version = "0.0.1" +edition = "2021" +description = "A Rust client for Pyth Hermes" +license = "Apache-2.0" + +[dependencies] +pyth-sdk = { version = "0.8.0" } +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = { version = "0.20", features = ["native-tls"] } +futures-util = "0.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +anyhow = "1.0" +tracing = "0.1" +url = "2.4" +derive_more = { version = "1.0.0", features = ["from"] } +backoff = { version = "0.4.0", features = ["futures", "tokio"] } +ttl_cache = "0.5.1" + + +[dev-dependencies] +bincode = "1.3.3" +ed25519-dalek = { version = "2.1.1", features = ["rand_core"] } +hex = "0.4.3" +libsecp256k1 = "0.7.1" +bs58 = "0.5.1" +alloy-primitives = "0.8.19" +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] } diff --git a/apps/hermes/client/rust/examples/subscribe_price_feeds.rs b/apps/hermes/client/rust/examples/subscribe_price_feeds.rs new file mode 100644 index 0000000000..4513e87409 --- /dev/null +++ b/apps/hermes/client/rust/examples/subscribe_price_feeds.rs @@ -0,0 +1,78 @@ +use std::time::Duration; + +use pyth_hermes_client::{ + backoff::HermesExponentialBackoffBuilder, + client::HermesClientBuilder, + ws_connection::{HermesClientMessageSubscribe, HermesClientMessageUnsubscribe}, +}; +use tokio::pin; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env()?, + ) + .json() + .init(); + + // Create and start the client + let mut client = HermesClientBuilder::default() + // Optionally override the default endpoints + .with_endpoints(vec!["wss://hermes.pyth.network/ws".parse()?]) + // Optionally set the number of connections + .with_num_connections(4) + // Optionally set the backoff strategy + .with_backoff(HermesExponentialBackoffBuilder::default().build()) + // Optionally set the timeout for each connection + .with_timeout(Duration::from_secs(5)) + // Optionally set the channel capacity for responses + .with_channel_capacity(1000) + .build()?; + + let stream = client.start().await?; + pin!(stream); + + let subscribe_request = HermesClientMessageSubscribe { + ids: vec!["2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b".to_string()], + verbose: true, + binary: true, + allow_out_of_order: false, + ignore_invalid_price_ids: false, + }; + + client.subscribe(subscribe_request).await?; + + println!("Subscribed to price feeds. Waiting for updates..."); + + // Process the first few updates + let mut count = 0; + while let Some(msg) = stream.recv().await { + // The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them. + + println!("Received message: {msg:#?}"); + println!(); + + count += 1; + if count >= 50 { + break; + } + } + + // Unsubscribe example + + client + .unsubscribe(HermesClientMessageUnsubscribe { + ids: vec![ + "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b".to_string(), + ], + }) + .await?; + println!("Unsubscribed from price feeds."); + + Ok(()) +} diff --git a/apps/hermes/client/rust/src/backoff.rs b/apps/hermes/client/rust/src/backoff.rs new file mode 100644 index 0000000000..02a8529904 --- /dev/null +++ b/apps/hermes/client/rust/src/backoff.rs @@ -0,0 +1,118 @@ +//! Exponential backoff implementation for Pyth Lazer client. +//! +//! This module provides a wrapper around the [`backoff`] crate's exponential backoff functionality, +//! offering a simplified interface tailored for Pyth Lazer client operations. + +use std::time::Duration; + +use backoff::{ + default::{INITIAL_INTERVAL_MILLIS, MAX_INTERVAL_MILLIS, MULTIPLIER, RANDOMIZATION_FACTOR}, + ExponentialBackoff, ExponentialBackoffBuilder, +}; + +/// A wrapper around the backoff crate's exponential backoff configuration. +/// +/// This struct encapsulates the parameters needed to configure exponential backoff +/// behavior and can be converted into the backoff crate's [`ExponentialBackoff`] type. +#[derive(Debug)] +pub struct HermesExponentialBackoff { + /// The initial retry interval. + initial_interval: Duration, + /// The randomization factor to use for creating a range around the retry interval. + /// + /// A randomization factor of 0.5 results in a random period ranging between 50% below and 50% + /// above the retry interval. + randomization_factor: f64, + /// The value to multiply the current interval with for each retry attempt. + multiplier: f64, + /// The maximum value of the back off period. Once the retry interval reaches this + /// value it stops increasing. + max_interval: Duration, +} + +impl From for ExponentialBackoff { + fn from(val: HermesExponentialBackoff) -> Self { + ExponentialBackoffBuilder::default() + .with_initial_interval(val.initial_interval) + .with_randomization_factor(val.randomization_factor) + .with_multiplier(val.multiplier) + .with_max_interval(val.max_interval) + .with_max_elapsed_time(None) + .build() + } +} + +/// Builder for [`PythLazerExponentialBackoff`]. +/// +/// Provides a fluent interface for configuring exponential backoff parameters +/// with sensible defaults from the backoff crate. +#[derive(Debug)] +pub struct HermesExponentialBackoffBuilder { + initial_interval: Duration, + randomization_factor: f64, + multiplier: f64, + max_interval: Duration, +} + +impl Default for HermesExponentialBackoffBuilder { + fn default() -> Self { + Self { + initial_interval: Duration::from_millis(INITIAL_INTERVAL_MILLIS), + randomization_factor: RANDOMIZATION_FACTOR, + multiplier: MULTIPLIER, + max_interval: Duration::from_millis(MAX_INTERVAL_MILLIS), + } + } +} + +impl HermesExponentialBackoffBuilder { + /// Creates a new builder with default values. + pub fn new() -> Self { + Default::default() + } + + /// Sets the initial retry interval. + /// + /// This is the starting interval for the first retry attempt. + pub fn with_initial_interval(&mut self, initial_interval: Duration) -> &mut Self { + self.initial_interval = initial_interval; + self + } + + /// Sets the randomization factor to use for creating a range around the retry interval. + /// + /// A randomization factor of 0.5 results in a random period ranging between 50% below and 50% + /// above the retry interval. This helps avoid the "thundering herd" problem when multiple + /// clients retry at the same time. + pub fn with_randomization_factor(&mut self, randomization_factor: f64) -> &mut Self { + self.randomization_factor = randomization_factor; + self + } + + /// Sets the value to multiply the current interval with for each retry attempt. + /// + /// A multiplier of 2.0 means each retry interval will be double the previous one. + pub fn with_multiplier(&mut self, multiplier: f64) -> &mut Self { + self.multiplier = multiplier; + self + } + + /// Sets the maximum value of the back off period. + /// + /// Once the retry interval reaches this value it stops increasing, providing + /// an upper bound on the wait time between retries. + pub fn with_max_interval(&mut self, max_interval: Duration) -> &mut Self { + self.max_interval = max_interval; + self + } + + /// Builds the [`PythLazerExponentialBackoff`] configuration. + pub fn build(&self) -> HermesExponentialBackoff { + HermesExponentialBackoff { + initial_interval: self.initial_interval, + randomization_factor: self.randomization_factor, + multiplier: self.multiplier, + max_interval: self.max_interval, + } + } +} diff --git a/apps/hermes/client/rust/src/client.rs b/apps/hermes/client/rust/src/client.rs new file mode 100644 index 0000000000..103976e85d --- /dev/null +++ b/apps/hermes/client/rust/src/client.rs @@ -0,0 +1,391 @@ +//! # Hermes Client +//! +//! This module provides a high-level client for connecting to Hermes data streams. +//! The client maintains multiple WebSocket connections for redundancy and provides +//! automatic deduplication of messages. +//! +//! ## Features +//! +//! - Multiple redundant WebSocket connections +//! - Automatic message deduplication +//! - Exponential backoff for reconnections +//! - Configurable timeouts and channel capacities +//! - Builder pattern for easy configuration +//! +//! ## Basic Usage +//! +//! ```rust,ignore +//! use hermes_client::{HermesClientBuilder, HermesClientMessageSubscribe}; +//! +//! #[tokio::main] +//! async fn main() -> anyhow::Result<()> { +//! let mut client = HermesClientBuilder::default() +//! .with_num_connections(2) +//! .build()?; +//! +//! let mut receiver = client.start().await?; +//! +//! // Subscribe to price feeds +//! let subscribe_request = HermesClientMessageSubscribe { +//! // ... configure subscription +//! }; +//! client.subscribe(subscribe_request).await?; +//! +//! // Process incoming messages +//! while let Some(response) = receiver.recv().await { +//! println!("Received: {:?}", response); +//! } +//! +//! Ok(()) +//! } +//! ``` + +use std::time::Duration; + +use crate::{ + backoff::{HermesExponentialBackoff, HermesExponentialBackoffBuilder}, + resilient_ws_connection::HermesResilientWSConnection, + ws_connection::{ + HermesClientMessage, HermesClientMessageSubscribe, HermesClientMessageUnsubscribe, + HermesServerMessage, + }, + CHANNEL_CAPACITY, +}; +use anyhow::{bail, Result}; +use backoff::ExponentialBackoff; +use tokio::sync::mpsc::{self, error::TrySendError}; +use tracing::{error, warn}; +use ttl_cache::TtlCache; +use url::Url; + +const DEDUP_CACHE_SIZE: usize = 100_000; +const DEDUP_TTL: Duration = Duration::from_secs(10); + +const DEFAULT_ENDPOINTS: [&str; 1] = ["wss://hermes.pyth.network/ws"]; +const DEFAULT_NUM_CONNECTIONS: usize = 3; +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); + +/// A high-performance client for connecting to Hermes data streams. +/// +/// The `HermesClient` maintains multiple WebSocket connections to Hermes endpoints +/// for redundancy. It automatically handles connection management, +/// message deduplication, and provides a unified stream of price updates. +/// +/// ## Architecture +/// +/// - Maintains multiple WebSocket connections to different endpoints +/// - Uses a TTL cache for deduplicating messages across connections +/// - Provides a single channel for consuming deduplicated messages +/// - Handles connection failures with exponential backoff +pub struct HermesClient { + endpoints: Vec, + num_connections: usize, + ws_connections: Vec, + backoff: ExponentialBackoff, + timeout: Duration, + channel_capacity: usize, +} + +impl HermesClient { + /// Creates a new Hermes client instance. + /// + /// This is a low-level constructor. Consider using [`HermesClientBuilder`] for a more + /// convenient way to create clients with sensible defaults. + /// + /// # Arguments + /// + /// * `endpoints` - A vector of WebSocket endpoint URLs to connect to. Must not be empty. + /// * `num_connections` - The number of WebSocket connections to maintain for redundancy + /// * `backoff` - The exponential backoff configuration for connection retries + /// * `timeout` - The timeout duration for WebSocket operations + /// * `channel_capacity` - The capacity of the message channel + /// + /// # Returns + /// + /// Returns `Ok(HermesClient)` on success, or an error if the configuration is invalid. + /// + /// # Errors + /// + /// Returns an error if: + /// - The `endpoints` vector is empty + /// + pub fn new( + endpoints: Vec, + num_connections: usize, + backoff: HermesExponentialBackoff, + timeout: Duration, + channel_capacity: usize, + ) -> Result { + if endpoints.is_empty() { + bail!("At least one endpoint must be provided"); + } + Ok(Self { + endpoints, + num_connections, + ws_connections: Vec::with_capacity(num_connections), + backoff: backoff.into(), + timeout, + channel_capacity, + }) + } + + /// Starts the client and begins establishing WebSocket connections. + /// + /// This method initializes all WebSocket connections and starts the message processing + /// loop. It returns a receiver channel that will yield deduplicated messages from + /// all connections. + /// + /// # Returns + /// + /// Returns a `Receiver` that yields deduplicated messages from all + /// WebSocket connections. The receiver will continue to yield messages until + /// all connections are closed or the client is dropped. + /// + /// # Errors + /// + /// This method itself doesn't return errors, but individual connection failures + /// are handled internally with automatic reconnection using the configured backoff + /// strategy. + /// + /// # Message Deduplication + /// + /// Messages are deduplicated using a TTL cache with a 10-second window. This ensures + /// that identical messages received from multiple connections are only delivered once. + /// + pub async fn start(&mut self) -> Result> { + let (sender, receiver) = mpsc::channel::(self.channel_capacity); + let (ws_connection_sender, mut ws_connection_receiver) = + mpsc::channel::(CHANNEL_CAPACITY); + + for i in 0..self.num_connections { + let endpoint = self.endpoints[i % self.endpoints.len()].clone(); + let connection = HermesResilientWSConnection::new( + endpoint, + self.backoff.clone(), + self.timeout, + ws_connection_sender.clone(), + ); + self.ws_connections.push(connection); + } + + let mut seen_updates = TtlCache::new(DEDUP_CACHE_SIZE); + + tokio::spawn(async move { + while let Some(response) = ws_connection_receiver.recv().await { + let cache_key = response.cache_key(); + if seen_updates.contains_key(&cache_key) { + continue; + } + seen_updates.insert(cache_key, response.clone(), DEDUP_TTL); + + match sender.try_send(response) { + Ok(_) => (), + Err(TrySendError::Full(r)) => { + warn!("Sender channel is full, responses will be delayed"); + if sender.send(r).await.is_err() { + error!("Sender channel is closed, stopping client"); + } + } + Err(TrySendError::Closed(_)) => { + error!("Sender channel is closed, stopping client"); + } + } + } + }); + + Ok(receiver) + } + + /// Subscribes to data streams across all WebSocket connections. + /// + /// This method sends the subscription request to all active WebSocket connections, + /// ensuring redundancy. If any connection fails to subscribe, + /// an error is returned, but other connections may still be subscribed. + /// + /// # Arguments + /// + /// * `subscribe_request` - The subscription request specifying which data streams to subscribe to + /// + /// # Returns + /// + /// Returns `Ok(())` if the subscription was successfully sent to all connections, + /// or an error if any connection failed to process the subscription. + /// + pub async fn subscribe( + &mut self, + subscribe_request: HermesClientMessageSubscribe, + ) -> Result<()> { + for connection in &mut self.ws_connections { + connection + .send_request(HermesClientMessage::Subscribe(subscribe_request.clone())) + .await?; + } + Ok(()) + } + + /// Unsubscribes from a specific data stream across all WebSocket connections. + /// + /// This method sends an unsubscribe request for the specified subscription ID + /// to all active WebSocket connections. + /// + /// # Arguments + /// + /// * `unsubscribe_request` - The unsubscribe request specifying which data streams to unsubscribe from + /// + /// # Returns + /// + /// Returns `Ok(())` if the unsubscribe request was successfully sent to all connections, + /// or an error if any connection failed to process the request. + /// + pub async fn unsubscribe( + &mut self, + unsubscribe_request: HermesClientMessageUnsubscribe, + ) -> Result<()> { + for connection in &mut self.ws_connections { + connection + .send_request(HermesClientMessage::Unsubscribe( + unsubscribe_request.clone(), + )) + .await?; + } + Ok(()) + } +} + +/// A builder for creating [`HermesClient`] instances with customizable configuration. +/// +/// The builder provides a convenient way to configure a Hermes client with sensible +/// defaults while allowing customization of all parameters. It follows the builder pattern +/// for a fluent API. +/// +/// ## Default Configuration +/// +/// - **Endpoints**: Uses Hermes's default production endpoints +/// - **Connections**: 3 concurrent WebSocket connections +/// - **Timeout**: 5 seconds for WebSocket operations +/// - **Backoff**: Exponential backoff with default settings +/// - **Channel Capacity**: Uses the default 1000 +/// +pub struct HermesClientBuilder { + endpoints: Vec, + num_connections: usize, + backoff: HermesExponentialBackoff, + timeout: Duration, + channel_capacity: usize, +} + +impl Default for HermesClientBuilder { + fn default() -> Self { + Self { + endpoints: DEFAULT_ENDPOINTS + .iter() + .map(|&s| s.parse().unwrap()) + .collect(), + num_connections: DEFAULT_NUM_CONNECTIONS, + backoff: HermesExponentialBackoffBuilder::default().build(), + timeout: DEFAULT_TIMEOUT, + channel_capacity: CHANNEL_CAPACITY, + } + } +} + +impl HermesClientBuilder { + /// Sets custom WebSocket endpoints for the client. + /// + /// By default, the client uses Hermes's production endpoints. Use this method + /// to connect to different environments (staging, local development) or to use + /// custom endpoint configurations. + /// + /// # Arguments + /// + /// * `endpoints` - A vector of WebSocket endpoint URLs. Must not be empty. + /// + pub fn with_endpoints(mut self, endpoints: Vec) -> Self { + self.endpoints = endpoints; + self + } + + /// Sets the number of concurrent WebSocket connections to maintain. + /// + /// More connections provide better redundancy and can improve throughput, + /// but also consume more resources. + /// + /// # Arguments + /// + /// * `num_connections` - The number of WebSocket connections (must be > 0) + /// + pub fn with_num_connections(mut self, num_connections: usize) -> Self { + self.num_connections = num_connections; + self + } + + /// Sets the exponential backoff configuration for connection retries. + /// + /// The backoff strategy determines how the client handles connection failures + /// and retries. + /// + /// # Arguments + /// + /// * `backoff` - The exponential backoff configuration + /// + pub fn with_backoff(mut self, backoff: HermesExponentialBackoff) -> Self { + self.backoff = backoff; + self + } + + /// Sets the timeout duration for WebSocket operations. + /// + /// This timeout applies to each WebSocket connection, + /// if no response is received within this duration, + /// the connection will be considered failed and retried. + /// + /// # Arguments + /// + /// * `timeout` - The timeout duration for each WebSocket + /// + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + /// Sets the capacity of the internal message channel. + /// + /// This determines how many messages can be buffered internally before + /// the client starts applying backpressure. + /// + /// # Arguments + /// + /// * `channel_capacity` - The channel capacity (number of messages) + /// + pub fn with_channel_capacity(mut self, channel_capacity: usize) -> Self { + self.channel_capacity = channel_capacity; + self + } + + /// Builds the configured [`HermesClient`] instance. + /// + /// This consumes the builder and creates a new client with the specified + /// configuration. The client is ready to use but connections are not + /// established until [`HermesClient::start`] is called. + /// + /// # Returns + /// + /// Returns `Ok(HermesClient)` on success, or an error if the configuration + /// is invalid. + /// + /// # Errors + /// + /// Returns an error if: + /// - No endpoints are configured + /// - Any configuration parameter is invalid + /// + pub fn build(self) -> Result { + HermesClient::new( + self.endpoints, + self.num_connections, + self.backoff, + self.timeout, + self.channel_capacity, + ) + } +} diff --git a/apps/hermes/client/rust/src/lib.rs b/apps/hermes/client/rust/src/lib.rs new file mode 100644 index 0000000000..c62eab1ff4 --- /dev/null +++ b/apps/hermes/client/rust/src/lib.rs @@ -0,0 +1,6 @@ +const CHANNEL_CAPACITY: usize = 1000; + +pub mod backoff; +pub mod client; +pub mod resilient_ws_connection; +pub mod ws_connection; diff --git a/apps/hermes/client/rust/src/resilient_ws_connection.rs b/apps/hermes/client/rust/src/resilient_ws_connection.rs new file mode 100644 index 0000000000..b689d9c5e4 --- /dev/null +++ b/apps/hermes/client/rust/src/resilient_ws_connection.rs @@ -0,0 +1,181 @@ +use std::time::Duration; + +use backoff::{backoff::Backoff, ExponentialBackoff}; +use futures_util::StreamExt; + +use tokio::{pin, select, sync::mpsc, time::Instant}; +use tracing::{error, info}; +use url::Url; + +use crate::{ + ws_connection::{ + HermesClientMessage, HermesClientMessageSubscribe, HermesServerMessage, HermesWSConnection, + }, + CHANNEL_CAPACITY, +}; +use anyhow::{bail, Context, Result}; + +const BACKOFF_RESET_DURATION: Duration = Duration::from_secs(10); + +pub struct HermesResilientWSConnection { + request_sender: mpsc::Sender, +} + +impl HermesResilientWSConnection { + /// Creates a new resilient WebSocket client instance + /// + /// # Arguments + /// * `endpoint` - The WebSocket URL of the Lazer service + /// * `sender` - A sender to send responses back to the client + /// + /// # Returns + /// Returns a new client instance (not yet connected) + pub fn new( + endpoint: Url, + backoff: ExponentialBackoff, + timeout: Duration, + sender: mpsc::Sender, + ) -> Self { + let (request_sender, mut request_receiver) = mpsc::channel(CHANNEL_CAPACITY); + let mut task = HermesWSConnectionTask::new(endpoint, backoff, timeout); + + tokio::spawn(async move { + if let Err(e) = task.run(sender, &mut request_receiver).await { + error!("Resilient WebSocket connection task failed: {}", e); + } + }); + + Self { request_sender } + } + + pub async fn send_request(&mut self, request: HermesClientMessage) -> Result<()> { + self.request_sender + .send(request) + .await + .context("Failed to send request")?; + Ok(()) + } +} + +struct HermesWSConnectionTask { + endpoint: Url, + subscribe_message: Option, + backoff: ExponentialBackoff, + timeout: Duration, +} + +impl HermesWSConnectionTask { + pub fn new(endpoint: Url, backoff: ExponentialBackoff, timeout: Duration) -> Self { + Self { + endpoint, + subscribe_message: None, + backoff, + timeout, + } + } + + pub async fn run( + &mut self, + response_sender: mpsc::Sender, + request_receiver: &mut mpsc::Receiver, + ) -> Result<()> { + loop { + let start_time = Instant::now(); + if let Err(e) = self.start(response_sender.clone(), request_receiver).await { + // If a connection was working for BACKOFF_RESET_DURATION + // and timeout + 1sec, it was considered successful therefore reset the backoff + if start_time.elapsed() > BACKOFF_RESET_DURATION + && start_time.elapsed() > self.timeout + Duration::from_secs(1) + { + self.backoff.reset(); + } + + let delay = self.backoff.next_backoff(); + match delay { + Some(d) => { + info!("WebSocket connection failed: {}. Retrying in {:?}", e, d); + tokio::time::sleep(d).await; + } + None => { + bail!( + "Max retries reached for WebSocket connection to {}, this should never happen, please contact developers", + self.endpoint + ); + } + } + } + } + } + + pub async fn start( + &mut self, + sender: mpsc::Sender, + request_receiver: &mut mpsc::Receiver, + ) -> Result<()> { + let mut ws_connection = HermesWSConnection::new(self.endpoint.clone())?; + let stream = ws_connection.start().await?; + pin!(stream); + + if let Some(subscribe_message) = self.subscribe_message.clone() { + ws_connection + .send_request(HermesClientMessage::Subscribe(subscribe_message)) + .await?; + } + + loop { + let timeout_response = tokio::time::timeout(self.timeout, stream.next()); + + select! { + response = timeout_response => { + match response { + Ok(Some(response)) => match response { + Ok(response) => { + sender + .send(response) + .await + .context("Failed to send response")?; + } + Err(e) => { + bail!("WebSocket stream error: {}", e); + } + }, + Ok(None) => { + bail!("WebSocket stream ended unexpectedly"); + } + Err(_elapsed) => { + bail!("WebSocket stream timed out"); + } + } + } + Some(request) = request_receiver.recv() => { + self.handle_request(&mut ws_connection,request).await?; + } + } + } + } + + pub async fn handle_request( + &mut self, + ws_connection: &mut HermesWSConnection, + request: HermesClientMessage, + ) -> Result<()> { + match request.clone() { + HermesClientMessage::Subscribe(subscribe_message) => { + self.subscribe_message = Some(subscribe_message); + } + HermesClientMessage::Unsubscribe(unsubscribe_message) => { + if let Some(mut subscribe_message) = self.subscribe_message.clone() { + subscribe_message + .ids + .retain(|id| !unsubscribe_message.ids.contains(id)); + if subscribe_message.ids.is_empty() { + self.subscribe_message = None; + } else { + self.subscribe_message = Some(subscribe_message); + } + } + } + } + ws_connection.send_request(request).await + } +} diff --git a/apps/hermes/client/rust/src/ws_connection.rs b/apps/hermes/client/rust/src/ws_connection.rs new file mode 100644 index 0000000000..ec84363600 --- /dev/null +++ b/apps/hermes/client/rust/src/ws_connection.rs @@ -0,0 +1,186 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + +use anyhow::anyhow; +use anyhow::Result; +use derive_more::From; +use futures_util::{SinkExt, StreamExt, TryStreamExt}; +use serde::{Deserialize, Serialize}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tracing::warn; +use url::Url; + +#[derive(Serialize, Debug, Clone)] +#[serde(tag = "type")] +pub enum HermesClientMessage { + #[serde(rename = "subscribe")] + Subscribe(HermesClientMessageSubscribe), + #[serde(rename = "unsubscribe")] + Unsubscribe(HermesClientMessageUnsubscribe), +} + +#[derive(Serialize, Debug, Clone)] +pub struct HermesClientMessageSubscribe { + pub ids: Vec, + #[serde(default)] + pub verbose: bool, + #[serde(default)] + pub binary: bool, + #[serde(default)] + pub allow_out_of_order: bool, + #[serde(default)] + pub ignore_invalid_price_ids: bool, +} + +#[derive(Serialize, Debug, Clone)] +pub struct HermesClientMessageUnsubscribe { + pub ids: Vec, +} + +#[derive(Deserialize, Debug, Clone, Hash)] +#[serde(tag = "type")] +pub enum HermesServerMessage { + #[serde(rename = "response")] + Response(HermesServerResponseMessage), + #[serde(rename = "price_update")] + PriceUpdate { price_feed: HermesPriceFeed }, +} + +impl HermesServerMessage { + pub fn cache_key(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + self.hash(&mut hasher); + hasher.finish() + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Hash)] +#[serde(tag = "status")] +pub enum HermesServerResponseMessage { + #[serde(rename = "success")] + Success, + #[serde(rename = "error")] + Err { error: String }, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Hash)] +pub struct HermesPriceFeed { + pub id: String, + pub price: HermesPrice, + pub ema_price: HermesPrice, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub vaa: Option, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Hash)] +pub struct HermesPrice { + #[serde(with = "pyth_sdk::utils::as_string")] + pub price: i64, + #[serde(with = "pyth_sdk::utils::as_string")] + pub conf: u64, + /// Exponent. + pub expo: i32, + /// Publish time. + pub publish_time: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +pub struct HermesPriceFeedMetadata { + pub slot: Option, + pub emitter_chain: u16, + pub price_service_receive_time: Option, + pub prev_publish_time: Option, +} + +/// A WebSocket client for consuming Pyth Hermes price feed updates +/// +/// This client provides a simple interface to: +/// - Connect to a Hermes WebSocket endpoint +/// - Subscribe to price feed updates +/// - Receive updates as a stream of messages +/// +pub struct HermesWSConnection { + endpoint: Url, + ws_sender: Option< + futures_util::stream::SplitSink< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + Message, + >, + >, +} + +impl HermesWSConnection { + /// Creates a new Hermes client instance + /// + /// # Arguments + /// * `endpoint` - The WebSocket URL of the Hermes service + /// + /// # Returns + /// Returns a new client instance (not yet connected) + pub fn new(endpoint: Url) -> Result { + Ok(Self { + endpoint, + ws_sender: None, + }) + } + + /// Starts the WebSocket connection + /// + /// # Returns + /// Returns a stream of responses from the server + pub async fn start( + &mut self, + ) -> Result>> { + let url = self.endpoint.clone(); + + let (ws_stream, _) = connect_async(url).await?; + let (ws_sender, ws_receiver) = ws_stream.split(); + + self.ws_sender = Some(ws_sender); + let response_stream = + ws_receiver + .map_err(anyhow::Error::from) + .try_filter_map(|msg| async { + let r: Result> = match msg { + Message::Text(text) => { + Ok(Some(serde_json::from_str::(&text)?)) + } + Message::Binary(_) => { + warn!("Received unexpected binary message"); + Ok(None) + } + Message::Close(_) => { + Err(anyhow!("WebSocket connection closed unexpectedly")) + } + _ => Ok(None), + }; + r + }); + + Ok(response_stream) + } + + pub async fn send_request(&mut self, request: HermesClientMessage) -> Result<()> { + if let Some(sender) = &mut self.ws_sender { + let msg = serde_json::to_string(&request)?; + sender.send(Message::Text(msg)).await?; + Ok(()) + } else { + anyhow::bail!("WebSocket connection not started") + } + } + + /// Closes the WebSocket connection + pub async fn close(&mut self) -> Result<()> { + if let Some(sender) = &mut self.ws_sender { + sender.send(Message::Close(None)).await?; + self.ws_sender = None; + Ok(()) + } else { + anyhow::bail!("WebSocket connection not started") + } + } +}