Skip to content

Commit d6ba2b6

Browse files
feat(hermes): create latest TWAP endpoint (#2131)
* feat(hermes): init latest twap endpoint * feat(hermes): update twaps rpc types * feat(hermes): update data model * refactor(hermes): seperate out domain & rpc models * feat(hermes): update twap data model * chore(hermes): remove commented code * feat(hermes): use Decimal type for down_slots_ratio * feat(hermes): add validation for first message in a timestamp, remove unnecesary struct * feat(hermes): address pr comments * test(hermes): add tests for twap
1 parent 83f4174 commit d6ba2b6

File tree

9 files changed

+977
-106
lines changed

9 files changed

+977
-106
lines changed

apps/hermes/server/Cargo.lock

Lines changed: 284 additions & 98 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/hermes/server/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.7.2"
3+
version = "0.8.0"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

@@ -34,6 +34,7 @@ pyth-sdk-solana = { version = "0.10.2" }
3434
pythnet-sdk = { path = "../../../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] }
3535
rand = { version = "0.8.5" }
3636
reqwest = { version = "0.11.14", features = ["blocking", "json"] }
37+
rust_decimal = { version = "1.36.0" }
3738
secp256k1 = { version = "0.27.0", features = ["rand", "recovery", "serde"] }
3839
serde = { version = "1.0.152", features = ["derive"] }
3940
serde_json = { version = "1.0.93" }
@@ -47,7 +48,7 @@ tonic = { version = "0.10.1", features = ["tls"] }
4748
tower-http = { version = "0.4.0", features = ["cors"] }
4849
tracing = { version = "0.1.37", features = ["log"] }
4950
tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] }
50-
utoipa = { version = "3.4.0", features = ["axum_extras"] }
51+
utoipa = { version = "3.4.0", features = ["axum_extras", "decimal"] }
5152
utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] }
5253
wormhole-sdk = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" }
5354

apps/hermes/server/src/api.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ where
101101
rest::latest_vaas,
102102
rest::price_feed_ids,
103103
rest::latest_price_updates,
104+
rest::latest_twaps,
104105
rest::latest_publisher_stake_caps,
105106
rest::timestamp_price_updates,
106107
rest::price_feeds_metadata,
@@ -126,6 +127,8 @@ where
126127
types::ParsedPublisherStakeCapsUpdate,
127128
types::ParsedPublisherStakeCap,
128129
types::AssetType,
130+
types::TwapsResponse,
131+
types::ParsedPriceFeedTwap,
129132
)
130133
),
131134
tags(
@@ -152,6 +155,15 @@ where
152155
get(rest::price_stream_sse_handler),
153156
)
154157
.route("/v2/updates/price/latest", get(rest::latest_price_updates))
158+
.route(
159+
"/v2/updates/twap/:window_seconds/latest",
160+
get(rest::latest_twaps),
161+
)
162+
// TODO(Tejas)
163+
// .route(
164+
// "/v2/updates/twap/:window_seconds/:publish_time",
165+
// get(rest::latest_twaps),
166+
// )
155167
.route(
156168
"/v2/updates/publisher_stake_caps/latest",
157169
get(rest::latest_publisher_stake_caps),

apps/hermes/server/src/api/rest.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ pub use {
3030
price_feed_ids::*,
3131
ready::*,
3232
v2::{
33-
latest_price_updates::*, latest_publisher_stake_caps::*, price_feeds_metadata::*, sse::*,
34-
timestamp_price_updates::*,
33+
latest_price_updates::*, latest_publisher_stake_caps::*, latest_twaps::*,
34+
price_feeds_metadata::*, sse::*, timestamp_price_updates::*,
3535
},
3636
};
3737

@@ -125,7 +125,7 @@ mod tests {
125125
crate::state::{
126126
aggregate::{
127127
AggregationEvent, PriceFeedsWithUpdateData, PublisherStakeCapsWithUpdateData,
128-
ReadinessMetadata, RequestTime, Update,
128+
ReadinessMetadata, RequestTime, TwapsWithUpdateData, Update,
129129
},
130130
benchmarks::BenchmarksState,
131131
cache::CacheState,
@@ -198,6 +198,14 @@ mod tests {
198198
) -> Result<PublisherStakeCapsWithUpdateData> {
199199
unimplemented!("Not needed for this test")
200200
}
201+
async fn get_twaps_with_update_data(
202+
&self,
203+
_price_ids: &[PriceIdentifier],
204+
_start_time: RequestTime,
205+
_end_time: RequestTime,
206+
) -> Result<TwapsWithUpdateData> {
207+
unimplemented!("Not needed for this test")
208+
}
201209
}
202210

203211
#[tokio::test]

apps/hermes/server/src/api/rest/index.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ pub async fn index() -> impl IntoResponse {
1313
"/api/get_price_feed?id=<price_feed_id>&publish_time=<publish_time_in_unix_timestamp>(&verbose=true)(&binary=true)",
1414
"/api/get_vaa?id=<price_feed_id>&publish_time=<publish_time_in_unix_timestamp>",
1515
"/api/get_vaa_ccip?data=<0x<price_feed_id_32_bytes>+<publish_time_unix_timestamp_be_8_bytes>>",
16+
1617
"/v2/updates/price/latest?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
1718
"/v2/updates/price/stream?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)(&allow_unordered=false)(&benchmarks_only=false)",
1819
"/v2/updates/price/<timestamp>?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
1920
"/v2/price_feeds?(query=btc)(&asset_type=crypto|equity|fx|metal|rates)",
21+
"/v2/updates/twap/<window_seconds>/latest?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
22+
"/v2/updates/twap/<window_seconds>/<timestamp>?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
2023
])
2124
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
use {
2+
crate::{
3+
api::{
4+
rest::{validate_price_ids, RestError},
5+
types::{BinaryUpdate, EncodingType, ParsedPriceFeedTwap, PriceIdInput, TwapsResponse},
6+
ApiState,
7+
},
8+
state::aggregate::{Aggregates, RequestTime},
9+
},
10+
anyhow::Result,
11+
axum::{
12+
extract::{Path, State},
13+
Json,
14+
},
15+
base64::{engine::general_purpose::STANDARD as base64_standard_engine, Engine as _},
16+
pyth_sdk::{DurationInSeconds, PriceIdentifier, UnixTimestamp},
17+
serde::Deserialize,
18+
serde_qs::axum::QsQuery,
19+
utoipa::IntoParams,
20+
};
21+
22+
#[derive(Debug, Deserialize, IntoParams)]
23+
#[into_params(parameter_in=Path)]
24+
pub struct LatestTwapsPathParams {
25+
/// The time window in seconds over which to calculate the TWAP, ending at the current time.
26+
/// For example, a value of 300 would return the most recent 5 minute TWAP.
27+
/// Must be greater than 0 and less than or equal to 600 seconds (10 minutes).
28+
#[param(example = "300")]
29+
#[serde(deserialize_with = "validate_twap_window")]
30+
window_seconds: u64,
31+
}
32+
33+
#[derive(Debug, Deserialize, IntoParams)]
34+
#[into_params(parameter_in=Query)]
35+
pub struct LatestTwapsQueryParams {
36+
/// Get the most recent TWAP (time weighted average price) for this set of price feed ids.
37+
/// The `binary` data contains the signed start & end cumulative price updates needed to calculate
38+
/// the TWAPs on-chain. The `parsed` data contains the calculated TWAPs.
39+
///
40+
/// This parameter can be provided multiple times to retrieve multiple price updates,
41+
/// for example see the following query string:
42+
///
43+
/// ```
44+
/// ?ids[]=a12...&ids[]=b4c...
45+
/// ```
46+
#[param(rename = "ids[]")]
47+
#[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")]
48+
ids: Vec<PriceIdInput>,
49+
50+
/// Optional encoding type. If true, return the cumulative price updates in the encoding specified by the encoding parameter. Default is `hex`.
51+
#[serde(default)]
52+
encoding: EncodingType,
53+
54+
/// If true, include the calculated TWAP in the `parsed` field of each returned feed. Default is `true`.
55+
#[serde(default = "default_true")]
56+
parsed: bool,
57+
58+
/// If true, invalid price IDs in the `ids` parameter are ignored. Only applicable to the v2 APIs. Default is `false`.
59+
#[serde(default)]
60+
ignore_invalid_price_ids: bool,
61+
}
62+
63+
fn validate_twap_window<'de, D>(deserializer: D) -> Result<DurationInSeconds, D::Error>
64+
where
65+
D: serde::Deserializer<'de>,
66+
{
67+
use serde::de::Error;
68+
let seconds = DurationInSeconds::deserialize(deserializer)?;
69+
if seconds == 0 || seconds > 600 {
70+
return Err(D::Error::custom(
71+
"twap_window_seconds must be in range (0, 600]",
72+
));
73+
}
74+
Ok(seconds)
75+
}
76+
fn default_true() -> bool {
77+
true
78+
}
79+
80+
/// Get the latest TWAP by price feed id with a custom time window.
81+
///
82+
/// Given a collection of price feed ids, retrieve the latest Pyth TWAP price for each price feed.
83+
#[utoipa::path(
84+
get,
85+
path = "/v2/updates/twap/{window_seconds}/latest",
86+
responses(
87+
(status = 200, description = "TWAPs retrieved successfully", body = TwapsResponse),
88+
(status = 404, description = "Price ids not found", body = String)
89+
),
90+
params(
91+
LatestTwapsPathParams,
92+
LatestTwapsQueryParams
93+
)
94+
)]
95+
pub async fn latest_twaps<S>(
96+
State(state): State<ApiState<S>>,
97+
Path(path_params): Path<LatestTwapsPathParams>,
98+
QsQuery(params): QsQuery<LatestTwapsQueryParams>,
99+
) -> Result<Json<TwapsResponse>, RestError>
100+
where
101+
S: Aggregates,
102+
{
103+
let price_id_inputs: Vec<PriceIdentifier> =
104+
params.ids.into_iter().map(|id| id.into()).collect();
105+
let price_ids: Vec<PriceIdentifier> =
106+
validate_price_ids(&state, &price_id_inputs, params.ignore_invalid_price_ids).await?;
107+
108+
// Collect start and end bounds for the TWAP window
109+
let window_seconds = path_params.window_seconds as i64;
110+
let current_time = std::time::SystemTime::now()
111+
.duration_since(std::time::UNIX_EPOCH)
112+
.unwrap()
113+
.as_secs() as UnixTimestamp;
114+
let start_time = current_time - window_seconds;
115+
116+
// Calculate the average
117+
let twaps_with_update_data = Aggregates::get_twaps_with_update_data(
118+
&*state.state,
119+
&price_ids,
120+
RequestTime::FirstAfter(start_time),
121+
RequestTime::Latest,
122+
)
123+
.await
124+
.map_err(|e| {
125+
tracing::warn!(
126+
"Error getting TWAPs for price IDs {:?} with update data: {:?}",
127+
price_ids,
128+
e
129+
);
130+
RestError::UpdateDataNotFound
131+
})?;
132+
133+
let twap_update_data = twaps_with_update_data.update_data;
134+
let binary: Vec<BinaryUpdate> = twap_update_data
135+
.into_iter()
136+
.map(|data_vec| {
137+
let encoded_data = data_vec
138+
.into_iter()
139+
.map(|data| match params.encoding {
140+
EncodingType::Base64 => base64_standard_engine.encode(data),
141+
EncodingType::Hex => hex::encode(data),
142+
})
143+
.collect();
144+
BinaryUpdate {
145+
encoding: params.encoding,
146+
data: encoded_data,
147+
}
148+
})
149+
.collect();
150+
151+
let parsed: Option<Vec<ParsedPriceFeedTwap>> = if params.parsed {
152+
Some(
153+
twaps_with_update_data
154+
.twaps
155+
.into_iter()
156+
.map(Into::into)
157+
.collect(),
158+
)
159+
} else {
160+
None
161+
};
162+
163+
let twap_resp = TwapsResponse { binary, parsed };
164+
Ok(Json(twap_resp))
165+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod latest_price_updates;
22
pub mod latest_publisher_stake_caps;
3+
pub mod latest_twaps;
34
pub mod price_feeds_metadata;
45
pub mod sse;
56
pub mod timestamp_price_updates;

apps/hermes/server/src/api/types.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
use {
22
super::doc_examples,
3-
crate::state::aggregate::{PriceFeedUpdate, PriceFeedsWithUpdateData, Slot, UnixTimestamp},
3+
crate::state::aggregate::{
4+
PriceFeedTwap, PriceFeedUpdate, PriceFeedsWithUpdateData, Slot, UnixTimestamp,
5+
},
46
anyhow::Result,
57
base64::{engine::general_purpose::STANDARD as base64_standard_engine, Engine as _},
68
borsh::{BorshDeserialize, BorshSerialize},
79
derive_more::{Deref, DerefMut},
810
pyth_sdk::{Price, PriceFeed, PriceIdentifier},
11+
rust_decimal::Decimal,
912
serde::{Deserialize, Serialize},
1013
std::{
1114
collections::BTreeMap,
@@ -140,7 +143,7 @@ pub struct RpcPrice {
140143
pub conf: u64,
141144
/// The exponent associated with both the price and confidence interval. Multiply those values
142145
/// by `10^expo` to get the real value.
143-
#[schema(example=-8)]
146+
#[schema(example = -8)]
144147
pub expo: i32,
145148
/// When the price was published. The `publish_time` is a unix timestamp, i.e., the number of
146149
/// seconds since the Unix epoch (00:00:00 UTC on 1 Jan 1970).
@@ -244,6 +247,48 @@ impl From<PriceFeedUpdate> for ParsedPriceUpdate {
244247
}
245248
}
246249
}
250+
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
251+
pub struct ParsedPriceFeedTwap {
252+
pub id: RpcPriceIdentifier,
253+
/// The start unix timestamp of the window
254+
pub start_timestamp: i64,
255+
/// The end unix timestamp of the window
256+
pub end_timestamp: i64,
257+
/// The calculated time weighted average price over the window
258+
pub twap: RpcPrice,
259+
/// The % of slots where the network was down over the TWAP window.
260+
/// A value of zero indicates no slots were missed over the window, and
261+
/// a value of one indicates that every slot was missed over the window.
262+
/// This is a float value stored as a string to avoid precision loss.
263+
pub down_slots_ratio: Decimal,
264+
}
265+
impl From<PriceFeedTwap> for ParsedPriceFeedTwap {
266+
fn from(pft: PriceFeedTwap) -> Self {
267+
Self {
268+
id: RpcPriceIdentifier::from(pft.id),
269+
start_timestamp: pft.start_timestamp,
270+
end_timestamp: pft.end_timestamp,
271+
twap: RpcPrice {
272+
price: pft.twap.price,
273+
conf: pft.twap.conf,
274+
expo: pft.twap.expo,
275+
publish_time: pft.twap.publish_time,
276+
},
277+
down_slots_ratio: pft.down_slots_ratio,
278+
}
279+
}
280+
}
281+
282+
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
283+
pub struct TwapsResponse {
284+
/// Each BinaryUpdate contains the start & end cumulative price updates used to
285+
/// calculate a given price feed's TWAP.
286+
pub binary: Vec<BinaryUpdate>,
287+
288+
/// The calculated TWAPs for each price ID
289+
#[serde(skip_serializing_if = "Option::is_none")]
290+
pub parsed: Option<Vec<ParsedPriceFeedTwap>>,
291+
}
247292

248293
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize, Clone, ToSchema)]
249294
pub struct ParsedPublisherStakeCapsUpdate {

0 commit comments

Comments
 (0)