diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock index ef9b6e46aa..26bfc4fe6a 100644 --- a/apps/hermes/server/Cargo.lock +++ b/apps/hermes/server/Cargo.lock @@ -1796,7 +1796,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.7.2" +version = "0.8.0" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index 6df64185ec..59e369dc9c 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.7.2" +version = "0.8.0" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/apps/hermes/server/src/api.rs b/apps/hermes/server/src/api.rs index 0484576d49..f166685d29 100644 --- a/apps/hermes/server/src/api.rs +++ b/apps/hermes/server/src/api.rs @@ -101,6 +101,7 @@ where rest::latest_vaas, rest::price_feed_ids, rest::latest_price_updates, + rest::latest_twaps, rest::latest_publisher_stake_caps, rest::timestamp_price_updates, rest::price_feeds_metadata, @@ -126,6 +127,8 @@ where types::ParsedPublisherStakeCapsUpdate, types::ParsedPublisherStakeCap, types::AssetType, + types::TwapsResponse, + types::RpcPriceFeedTwap, ) ), tags( @@ -152,6 +155,15 @@ where get(rest::price_stream_sse_handler), ) .route("/v2/updates/price/latest", get(rest::latest_price_updates)) + .route( + "/v2/updates/twap/:window_seconds/latest", + get(rest::latest_twaps), + ) + // TODO(Tejas) + // .route( + // "/v2/updates/twap/:window_seconds/:publish_time", + // get(rest::latest_twaps), + // ) .route( "/v2/updates/publisher_stake_caps/latest", get(rest::latest_publisher_stake_caps), diff --git a/apps/hermes/server/src/api/rest.rs b/apps/hermes/server/src/api/rest.rs index d4a381a25c..44c8db77f7 100644 --- a/apps/hermes/server/src/api/rest.rs +++ b/apps/hermes/server/src/api/rest.rs @@ -30,8 +30,8 @@ pub use { price_feed_ids::*, ready::*, v2::{ - latest_price_updates::*, latest_publisher_stake_caps::*, price_feeds_metadata::*, sse::*, - timestamp_price_updates::*, + latest_price_updates::*, latest_publisher_stake_caps::*, latest_twaps::*, + price_feeds_metadata::*, sse::*, timestamp_price_updates::*, }, }; @@ -125,7 +125,7 @@ mod tests { crate::state::{ aggregate::{ AggregationEvent, PriceFeedsWithUpdateData, PublisherStakeCapsWithUpdateData, - ReadinessMetadata, RequestTime, Update, + ReadinessMetadata, RequestTime, TwapsWithUpdateData, Update, }, benchmarks::BenchmarksState, cache::CacheState, @@ -198,6 +198,14 @@ mod tests { ) -> Result { unimplemented!("Not needed for this test") } + async fn get_twaps_with_update_data( + &self, + _price_ids: &[PriceIdentifier], + _start_time: RequestTime, + _end_time: RequestTime, + ) -> Result { + unimplemented!("Not needed for this test") + } } #[tokio::test] diff --git a/apps/hermes/server/src/api/rest/index.rs b/apps/hermes/server/src/api/rest/index.rs index fee938984b..fc6d398f05 100644 --- a/apps/hermes/server/src/api/rest/index.rs +++ b/apps/hermes/server/src/api/rest/index.rs @@ -13,9 +13,12 @@ pub async fn index() -> impl IntoResponse { "/api/get_price_feed?id=&publish_time=(&verbose=true)(&binary=true)", "/api/get_vaa?id=&publish_time=", "/api/get_vaa_ccip?data=<0x+>", + "/v2/updates/price/latest?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)", "/v2/updates/price/stream?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)(&allow_unordered=false)(&benchmarks_only=false)", "/v2/updates/price/?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)", "/v2/price_feeds?(query=btc)(&asset_type=crypto|equity|fx|metal|rates)", + "/v2/updates/twap//latest?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)", + "/v2/updates/twap//?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)", ]) } diff --git a/apps/hermes/server/src/api/rest/v2/latest_twaps.rs b/apps/hermes/server/src/api/rest/v2/latest_twaps.rs new file mode 100644 index 0000000000..f86fc4741a --- /dev/null +++ b/apps/hermes/server/src/api/rest/v2/latest_twaps.rs @@ -0,0 +1,162 @@ +use { + crate::{ + api::{ + rest::{validate_price_ids, RestError}, + types::{BinaryUpdate, EncodingType, PriceIdInput, RpcPriceFeedTwap, TwapsResponse}, + ApiState, + }, + state::aggregate::{Aggregates, RequestTime}, + }, + anyhow::Result, + axum::{ + extract::{Path, State}, + Json, + }, + base64::{engine::general_purpose::STANDARD as base64_standard_engine, Engine as _}, + pyth_sdk::{DurationInSeconds, PriceIdentifier, UnixTimestamp}, + serde::Deserialize, + serde_qs::axum::QsQuery, + utoipa::IntoParams, +}; + +#[derive(Debug, Deserialize, IntoParams)] +#[into_params(parameter_in=Path)] +pub struct LatestTwapsPathParams { + /// The time window in seconds over which to calculate the TWAP, ending at the current time. + /// For example, a value of 300 would return the most recent 5 minute TWAP. + /// Must be greater than 0 and less than or equal to 600 seconds (10 minutes). + #[param(example = "300")] + #[serde(deserialize_with = "validate_twap_window")] + window_seconds: u64, +} + +#[derive(Debug, Deserialize, IntoParams)] +#[into_params(parameter_in=Query)] +pub struct LatestTwapsQueryParams { + /// Get the most recent TWAP (time weighted average price) for this set of price feed ids. + /// + /// This parameter can be provided multiple times to retrieve multiple price updates, + /// for example see the following query string: + /// + /// ``` + /// ?ids[]=a12...&ids[]=b4c... + /// ``` + #[param(rename = "ids[]")] + #[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")] + ids: Vec, + + /// Optional encoding type. If true, return the price update in the encoding specified by the encoding parameter. Default is `hex`. + #[serde(default)] + encoding: EncodingType, + + /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `true`. + #[serde(default = "default_true")] + parsed: bool, + + /// If true, invalid price IDs in the `ids` parameter are ignored. Only applicable to the v2 APIs. Default is `false`. + #[serde(default)] + ignore_invalid_price_ids: bool, +} + +fn validate_twap_window<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + use serde::de::Error; + let seconds = DurationInSeconds::deserialize(deserializer)?; + if seconds == 0 || seconds > 600 { + return Err(D::Error::custom( + "twap_window_seconds must be in range (0, 600]", + )); + } + Ok(seconds) +} +fn default_true() -> bool { + true +} + +/// Get the latest TWAP by price feed id. +/// +/// Given a collection of price feed ids, retrieve the latest Pyth price for each price feed. +#[utoipa::path( + get, + path = "/v2/updates/twap/{window_seconds}/latest", + responses( + (status = 200, description = "TWAPs retrieved successfully", body = TwapsResponse), + (status = 404, description = "Price ids not found", body = String) + ), + params( + LatestTwapsPathParams, + LatestTwapsQueryParams + ) +)] +pub async fn latest_twaps( + State(state): State>, + Path(path_params): Path, + QsQuery(params): QsQuery, +) -> Result, RestError> +where + S: Aggregates, +{ + let price_id_inputs: Vec = + params.ids.into_iter().map(|id| id.into()).collect(); + let price_ids: Vec = + validate_price_ids(&state, &price_id_inputs, params.ignore_invalid_price_ids).await?; + + // Collect start and end bounds for the TWAP window + let window_seconds = path_params.window_seconds as i64; + let current_time = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as UnixTimestamp; + let start_time = current_time - window_seconds; + + // Calculate the average + let twaps_with_update_data = Aggregates::get_twaps_with_update_data( + &*state.state, + &price_ids, + RequestTime::FirstAfter(start_time), + RequestTime::Latest, + ) + .await + .map_err(|e| { + tracing::warn!( + "Error getting TWAPs for price IDs {:?} with update data: {:?}", + price_ids, + e + ); + RestError::UpdateDataNotFound + })?; + + // Include binary update data for the messages used to calculate the TWAP + let twap_update_data = twaps_with_update_data.update_data; + let encoded_data: Vec = twap_update_data + .into_iter() + .map(|data| match params.encoding { + EncodingType::Base64 => base64_standard_engine.encode(data), + EncodingType::Hex => hex::encode(data), + }) + .collect(); + let binary_price_update = BinaryUpdate { + encoding: params.encoding, + data: encoded_data, + }; + let parsed_twaps: Option> = if params.parsed { + Some( + twaps_with_update_data + .twaps + .into_iter() + .map(Into::into) + .collect(), + ) + } else { + None + }; + + let twap_resp = TwapsResponse { + binary: binary_price_update, + parsed: parsed_twaps, + }; + + Ok(Json(twap_resp)) +} diff --git a/apps/hermes/server/src/api/rest/v2/mod.rs b/apps/hermes/server/src/api/rest/v2/mod.rs index 4777e0ebbc..3ec0ee1f20 100644 --- a/apps/hermes/server/src/api/rest/v2/mod.rs +++ b/apps/hermes/server/src/api/rest/v2/mod.rs @@ -1,5 +1,6 @@ pub mod latest_price_updates; pub mod latest_publisher_stake_caps; +pub mod latest_twaps; pub mod price_feeds_metadata; pub mod sse; pub mod timestamp_price_updates; diff --git a/apps/hermes/server/src/api/types.rs b/apps/hermes/server/src/api/types.rs index 9b1c134e7b..f5dcb34836 100644 --- a/apps/hermes/server/src/api/types.rs +++ b/apps/hermes/server/src/api/types.rs @@ -244,6 +244,31 @@ impl From for ParsedPriceUpdate { } } } +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct PriceFeedTwap { + pub id: PriceIdentifier, + pub twap_price: Price, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct RpcPriceFeedTwap { + pub id: RpcPriceIdentifier, + pub twap_price: RpcPrice, +} + +impl From for RpcPriceFeedTwap { + fn from(twap: PriceFeedTwap) -> Self { + Self { + id: RpcPriceIdentifier::from(twap.id), + twap_price: RpcPrice { + price: twap.twap_price.price, + conf: twap.twap_price.conf, + expo: twap.twap_price.expo, + publish_time: twap.twap_price.publish_time, + }, + } + } +} #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize, Clone, ToSchema)] pub struct ParsedPublisherStakeCapsUpdate { @@ -263,6 +288,13 @@ pub struct LatestPublisherStakeCapsUpdateDataResponse { pub parsed: Option>, } +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct TwapsResponse { + pub binary: BinaryUpdate, + #[serde(skip_serializing_if = "Option::is_none")] + pub parsed: Option>, +} + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct PriceUpdate { pub binary: BinaryUpdate, diff --git a/apps/hermes/server/src/state/aggregate.rs b/apps/hermes/server/src/state/aggregate.rs index 0232628a73..de5783617c 100644 --- a/apps/hermes/server/src/state/aggregate.rs +++ b/apps/hermes/server/src/state/aggregate.rs @@ -1,7 +1,10 @@ +use crate::api::types::PriceFeedTwap; #[cfg(test)] use mock_instant::{SystemTime, UNIX_EPOCH}; +use pythnet_sdk::messages::TwapMessage; #[cfg(not(test))] use std::time::{SystemTime, UNIX_EPOCH}; + use { self::wormhole_merkle::{ construct_message_states_proofs, construct_update_data, @@ -190,6 +193,12 @@ pub struct PublisherStakeCapsWithUpdateData { pub update_data: Vec>, } +#[derive(Debug)] +pub struct TwapsWithUpdateData { + pub twaps: Vec, + pub update_data: Vec>, +} + #[derive(Debug, Serialize)] pub struct ReadinessMetadata { pub has_completed_recently: bool, @@ -220,6 +229,12 @@ where async fn get_latest_publisher_stake_caps_with_update_data( &self, ) -> Result; + async fn get_twaps_with_update_data( + &self, + price_ids: &[PriceIdentifier], + start_time: RequestTime, + end_time: RequestTime, + ) -> Result; } /// Allow downcasting State into CacheState for functions that depend on the `Cache` service. @@ -364,6 +379,29 @@ where Ok(()) } + async fn get_twaps_with_update_data( + &self, + price_ids: &[PriceIdentifier], + start_time: RequestTime, + end_time: RequestTime, + ) -> Result { + match get_verified_twaps_with_update_data( + self, + price_ids, + start_time.clone(), + end_time.clone(), + ) + .await + { + Ok(twaps_with_update_data) => Ok(twaps_with_update_data), + Err(e) => { + // TODO: Hit benchmarks if data not found in the cache + tracing::debug!("Update data not found in cache, falling back to Benchmarks"); + Err(e) + } + } + } + async fn get_price_feeds_with_update_data( &self, price_ids: &[PriceIdentifier], @@ -568,6 +606,108 @@ where }) } +async fn get_verified_twaps_with_update_data( + state: &S, + price_ids: &[PriceIdentifier], + start_time: RequestTime, + end_time: RequestTime, +) -> Result +where + S: Cache, +{ + // TODO: what if some messages aren't in the cache? + let start_messages = state + .fetch_message_states( + price_ids + .iter() + .map(|price_id| price_id.to_bytes()) + .collect(), + start_time, + MessageStateFilter::Only(MessageType::TwapMessage), + ) + .await?; + + let end_messages = state + .fetch_message_states( + price_ids + .iter() + .map(|price_id| price_id.to_bytes()) + .collect(), + end_time, + MessageStateFilter::Only(MessageType::TwapMessage), + ) + .await?; + + if start_messages.len() != end_messages.len() { + return Err(anyhow!( + "Failed to fetch price updates for provided price IDs" + )); + } + + let twaps: Vec = start_messages + .iter() + .zip(end_messages.iter()) + .map( + |(start_message, end_message)| match (&start_message.message, &end_message.message) { + (Message::TwapMessage(start_twap), Message::TwapMessage(end_twap)) => { + Ok(PriceFeedTwap { + twap_price: calculate_twap(start_twap, end_twap)?, + id: PriceIdentifier::new(start_message.message.feed_id()), + }) + } + _ => Err(anyhow!("Invalid message type for TWAP calculation")), + }, + ) + .collect::>>()?; + + // TODO: does it make sense to include binary updates here? + // Currently this returns the binary update data for all the messages + // involved in creating the TWAPs. + let all_messages: Vec<_> = end_messages + .into_iter() + .chain(start_messages) + .map(|m| m.into()) + .collect(); + let update_data = construct_update_data(all_messages)?; + + Ok(TwapsWithUpdateData { twaps, update_data }) +} + +fn calculate_twap(start_message: &TwapMessage, end_message: &TwapMessage) -> Result { + if end_message.publish_slot <= start_message.publish_slot { + return Err(anyhow!( + "Cannot calculate TWAP - end slot must be greater than start slot" + )); + } + + let slot_diff = end_message + .publish_slot + .checked_sub(start_message.publish_slot) + .ok_or_else(|| anyhow!("Slot difference overflow"))?; + + let price_diff = end_message + .cumulative_price + .checked_sub(start_message.cumulative_price) + .ok_or_else(|| anyhow!("Price difference overflow"))?; + + let conf_diff = end_message + .cumulative_conf + .checked_sub(start_message.cumulative_conf) + .ok_or_else(|| anyhow!("Confidence difference overflow"))?; + + // Perform division before casting to maintain precision + // Cast slot_diff to the same type as price / conf diff before division + let price = (price_diff / slot_diff as i128) as i64; + let conf = (conf_diff / slot_diff as u128) as u64; + + Ok(Price { + price, + conf, + expo: end_message.exponent, + publish_time: end_message.publish_time, + }) +} + #[cfg(test)] mod test { use {