Skip to content

Commit 8a5a74e

Browse files
ali-behjatiReisen
andauthored
[hermes] Add WS (#773)
* [hermes] Add WS * Address Jayant comments * Update hermes/src/network/rpc/ws.rs Co-authored-by: Reisen <[email protected]> * Address David comments --------- Co-authored-by: Reisen <[email protected]>
1 parent facc2b6 commit 8a5a74e

File tree

7 files changed

+416
-82
lines changed

7 files changed

+416
-82
lines changed

hermes/src/network/rpc.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use {
2+
self::ws::dispatch_updates,
23
crate::{
34
network::p2p::OBSERVATIONS,
45
store::{
@@ -11,19 +12,25 @@ use {
1112
routing::get,
1213
Router,
1314
},
15+
std::sync::Arc,
1416
};
1517

1618
mod rest;
17-
mod rpc_price_feed;
19+
mod types;
20+
mod ws;
1821

1922
#[derive(Clone)]
2023
pub struct State {
2124
pub store: Store,
25+
pub ws: Arc<ws::WsState>,
2226
}
2327

2428
impl State {
2529
pub fn new(store: Store) -> Self {
26-
Self { store }
30+
Self {
31+
store,
32+
ws: Arc::new(ws::WsState::new()),
33+
}
2734
}
2835
}
2936

@@ -40,6 +47,7 @@ pub async fn spawn(rpc_addr: String, store: Store) -> Result<()> {
4047
let app = app
4148
.route("/", get(rest::index))
4249
.route("/live", get(rest::live))
50+
.route("/ws", get(ws::ws_route_handler))
4351
.route("/api/latest_price_feeds", get(rest::latest_price_feeds))
4452
.route("/api/latest_vaas", get(rest::latest_vaas))
4553
.route("/api/get_vaa", get(rest::get_vaa))
@@ -51,8 +59,11 @@ pub async fn spawn(rpc_addr: String, store: Store) -> Result<()> {
5159
tokio::spawn(async move {
5260
loop {
5361
if let Ok(observation) = OBSERVATIONS.1.lock().unwrap().recv() {
54-
if let Err(e) = state.store.store_update(Update::Vaa(observation)) {
55-
log::error!("Failed to process VAA: {:?}", e);
62+
match state.store.store_update(Update::Vaa(observation)) {
63+
Ok(updated_feed_ids) => {
64+
tokio::spawn(dispatch_updates(updated_feed_ids, state.clone()));
65+
}
66+
Err(e) => log::error!("Failed to process VAA: {:?}", e),
5667
}
5768
}
5869
}

hermes/src/network/rpc/rest.rs

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1+
use super::types::PriceIdInput;
12
use {
2-
super::rpc_price_feed::{
3-
RpcPriceFeed,
4-
RpcPriceFeedMetadata,
5-
},
3+
super::types::RpcPriceFeed,
64
crate::store::RequestTime,
75
crate::{
86
impl_deserialize_for_hex_string_wrapper,
@@ -30,21 +28,6 @@ use {
3028
pyth_sdk::PriceIdentifier,
3129
};
3230

33-
/// PriceIdInput is a wrapper around a 32-byte hex string.
34-
/// that supports a flexible deserialization from a hex string.
35-
/// It supports both 0x-prefixed and non-prefixed hex strings,
36-
/// and also supports both lower and upper case characters.
37-
#[derive(Debug, Clone, Deref, DerefMut)]
38-
pub struct PriceIdInput([u8; 32]);
39-
// TODO: Use const generics instead of macro.
40-
impl_deserialize_for_hex_string_wrapper!(PriceIdInput, 32);
41-
42-
impl From<PriceIdInput> for PriceIdentifier {
43-
fn from(id: PriceIdInput) -> Self {
44-
Self::new(*id)
45-
}
46-
}
47-
4831
pub enum RestError {
4932
UpdateDataNotFound,
5033
CcipUpdateDataNotFound,
@@ -127,19 +110,7 @@ pub async fn latest_price_feeds(
127110
.price_infos
128111
.into_values()
129112
.map(|price_info| {
130-
let mut rpc_price_feed: RpcPriceFeed = price_info.price_feed.into();
131-
rpc_price_feed.metadata = params.verbose.then_some(RpcPriceFeedMetadata {
132-
emitter_chain: price_info.emitter_chain,
133-
sequence_number: price_info.sequence_number,
134-
attestation_time: price_info.attestation_time,
135-
price_service_receive_time: price_info.receive_time,
136-
});
137-
138-
rpc_price_feed.vaa = params
139-
.binary
140-
.then_some(base64_standard_engine.encode(&price_info.vaa_bytes));
141-
142-
rpc_price_feed
113+
RpcPriceFeed::from_price_info(price_info, params.verbose, params.binary)
143114
})
144115
.collect(),
145116
))

hermes/src/network/rpc/rpc_price_feed.rs

Lines changed: 0 additions & 40 deletions
This file was deleted.

hermes/src/network/rpc/types.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use {
2+
crate::{
3+
impl_deserialize_for_hex_string_wrapper,
4+
store::{
5+
proof::batch_vaa::PriceInfo,
6+
UnixTimestamp,
7+
},
8+
},
9+
base64::{
10+
engine::general_purpose::STANDARD as base64_standard_engine,
11+
Engine as _,
12+
},
13+
derive_more::{
14+
Deref,
15+
DerefMut,
16+
},
17+
pyth_sdk::{
18+
Price,
19+
PriceIdentifier,
20+
},
21+
};
22+
23+
24+
/// PriceIdInput is a wrapper around a 32-byte hex string.
25+
/// that supports a flexible deserialization from a hex string.
26+
/// It supports both 0x-prefixed and non-prefixed hex strings,
27+
/// and also supports both lower and upper case characters.
28+
#[derive(Debug, Clone, Deref, DerefMut)]
29+
pub struct PriceIdInput([u8; 32]);
30+
// TODO: Use const generics instead of macro.
31+
impl_deserialize_for_hex_string_wrapper!(PriceIdInput, 32);
32+
33+
impl From<PriceIdInput> for PriceIdentifier {
34+
fn from(id: PriceIdInput) -> Self {
35+
Self::new(*id)
36+
}
37+
}
38+
39+
type Base64String = String;
40+
41+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42+
pub struct RpcPriceFeedMetadata {
43+
pub emitter_chain: u16,
44+
pub attestation_time: UnixTimestamp,
45+
pub sequence_number: u64,
46+
pub price_service_receive_time: UnixTimestamp,
47+
}
48+
49+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
50+
pub struct RpcPriceFeed {
51+
pub id: PriceIdentifier,
52+
pub price: Price,
53+
pub ema_price: Price,
54+
pub metadata: Option<RpcPriceFeedMetadata>,
55+
/// Vaa binary represented in base64.
56+
pub vaa: Option<Base64String>,
57+
}
58+
59+
impl RpcPriceFeed {
60+
// TODO: Use a Encoding type to have None, Base64, and Hex variants instead of binary flag.
61+
// TODO: Use a Verbosity type to define None, or Full instead of verbose flag.
62+
pub fn from_price_info(price_info: PriceInfo, verbose: bool, binary: bool) -> Self {
63+
Self {
64+
id: price_info.price_feed.id,
65+
price: price_info.price_feed.get_price_unchecked(),
66+
ema_price: price_info.price_feed.get_ema_price_unchecked(),
67+
metadata: verbose.then_some(RpcPriceFeedMetadata {
68+
emitter_chain: price_info.emitter_chain,
69+
attestation_time: price_info.attestation_time,
70+
sequence_number: price_info.sequence_number,
71+
price_service_receive_time: price_info.receive_time,
72+
}),
73+
vaa: binary.then_some(base64_standard_engine.encode(price_info.vaa_bytes)),
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)