Skip to content

Commit 866eace

Browse files
ali-behjatiReisen
authored andcommitted
Add initial version
1 parent 925f606 commit 866eace

File tree

21 files changed

+4670
-2230
lines changed

21 files changed

+4670
-2230
lines changed

hermes/Cargo.lock

Lines changed: 3612 additions & 1670 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/Cargo.toml

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ edition = "2021"
55

66
[dependencies]
77
axum = { version = "0.6.9", features = ["json", "ws", "macros"] }
8-
axum-extra = { version = "0.7.2", features = ["query"] }
98
axum-macros = { version = "0.3.4" }
109
anyhow = { version = "1.0.69" }
1110
base64 = { version = "0.21.0" }
@@ -15,7 +14,7 @@ dashmap = { version = "5.4.0" }
1514
der = { version = "0.7.0" }
1615
derive_more = { version = "0.99.17" }
1716
env_logger = { version = "0.10.0" }
18-
futures = { version = "0.3.26" }
17+
futures = { version = "0.3.28" }
1918
hex = { version = "0.4.3" }
2019
rand = { version = "0.8.5" }
2120
reqwest = { version = "0.11.14", features = ["blocking", "json"] }
@@ -39,24 +38,30 @@ log = { version = "0.4.17" }
3938
wormhole-core = { git = "https://github.com/guibescos/wormhole", branch = "reisen/sdk-solana"}
4039

4140
# Parse Wormhole attester price attestations.
42-
pyth-wormhole-attester-sdk = { path = "../wormhole_attester/sdk/rust/", version = "0.1.2" }
41+
pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "=1.13.6" }
4342

4443
# Setup LibP2P. Unfortunately the dependencies required by libp2p are shared
4544
# with the dependencies required by solana's geyser plugin. This means that we
4645
# would have to use the same version of libp2p as solana. Luckily we don't need
4746
# to do this yet but it's something to keep in mind.
48-
libp2p = { version = "0.51.1", features = [
49-
"dns",
47+
libp2p = { version = "0.42.2", features = [
5048
"gossipsub",
5149
"identify",
52-
"macros",
5350
"mplex",
5451
"noise",
55-
"quic",
5652
"secp256k1",
57-
"tcp",
58-
"tls",
59-
"tokio",
6053
"websocket",
6154
"yamux",
6255
]}
56+
57+
async-trait = "0.1.68"
58+
solana-client = "=1.15.2"
59+
solana-sdk = "=1.15.2"
60+
solana-account-decoder = "=1.15.2"
61+
moka = { version = "0.11.0", features = ["future"] }
62+
derive_builder = "0.12.0"
63+
byteorder = "1.4.3"
64+
serde_qs = { version = "0.12.0", features = ["axum"] }
65+
66+
[patch.crates-io]
67+
serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole" }

hermes/build.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ fn main() {
7171
// Tell Rust to link our Go library at compile time.
7272
println!("cargo:rustc-link-search=native={out_var}");
7373
println!("cargo:rustc-link-lib=static=pythnet");
74-
75-
#[cfg(target_arch = "aarch64")]
7674
println!("cargo:rustc-link-lib=resolv");
7775

7876
let status = cmd.status().unwrap();

hermes/src/network/rpc.rs renamed to hermes/src/api.rs

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
use {
22
self::ws::dispatch_updates,
3-
crate::{
4-
network::p2p::OBSERVATIONS,
5-
store::{
6-
Store,
7-
Update,
8-
},
9-
},
3+
crate::store::Store,
104
anyhow::Result,
115
axum::{
126
routing::get,
@@ -36,7 +30,7 @@ impl State {
3630

3731
/// This method provides a background service that responds to REST requests
3832
///
39-
/// Currently this is based on Axum due to the simplicity and strong ecosyjtem support for the
33+
/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
4034
/// packages they are based on (tokio & hyper).
4135
pub async fn spawn(rpc_addr: String, store: Store) -> Result<()> {
4236
let state = State::new(store);
@@ -55,25 +49,25 @@ pub async fn spawn(rpc_addr: String, store: Store) -> Result<()> {
5549
.route("/api/price_feed_ids", get(rest::price_feed_ids))
5650
.with_state(state.clone());
5751

58-
// Listen in the background for new VAA's from the Wormhole RPC.
52+
53+
// Binds the axum's server to the configured address and port. This is a blocking call and will
54+
// not return until the server is shutdown.
55+
tokio::spawn(async move {
56+
// FIXME handle errors properly
57+
axum::Server::bind(&rpc_addr.parse().unwrap())
58+
.serve(app.into_make_service())
59+
.await
60+
.unwrap();
61+
});
62+
63+
// Call dispatch updates to websocket every 1 seconds
64+
// FIXME use a channel to get updates from the store
5965
tokio::spawn(async move {
6066
loop {
61-
if let Ok(observation) = OBSERVATIONS.1.lock().unwrap().recv() {
62-
match state.store.store_update(Update::Vaa(observation)) {
63-
Ok(updated_feed_ids) => {
64-
tokio::spawn(dispatch_updates(updated_feed_ids, state.clone()));
65-
}
66-
Err(e) => log::error!("Failed to process VAA: {:?}", e),
67-
}
68-
}
67+
dispatch_updates(state.store.get_price_feed_ids(), state.clone()).await;
68+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
6969
}
7070
});
7171

72-
// Binds the axum's server to the configured address and port. This is a blocking call and will
73-
// not return until the server is shutdown.
74-
axum::Server::bind(&rpc_addr.parse()?)
75-
.serve(app.into_make_service())
76-
.await?;
77-
7872
Ok(())
7973
}

hermes/src/network/rpc/rest.rs renamed to hermes/src/api/rest.rs

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
use super::types::PriceIdInput;
21
use {
3-
super::types::RpcPriceFeed,
4-
crate::store::RequestTime,
2+
super::types::{
3+
PriceIdInput,
4+
RpcPriceFeed,
5+
},
56
crate::{
67
impl_deserialize_for_hex_string_wrapper,
7-
store::UnixTimestamp,
8+
store::types::{
9+
RequestTime,
10+
UnixTimestamp,
11+
},
812
},
913
anyhow::Result,
1014
axum::{
@@ -16,7 +20,6 @@ use {
1620
},
1721
Json,
1822
},
19-
axum_extra::extract::Query, // Axum extra Query allows us to parse multi-value query parameters.
2023
base64::{
2124
engine::general_purpose::STANDARD as base64_standard_engine,
2225
Engine as _,
@@ -26,6 +29,7 @@ use {
2629
DerefMut,
2730
},
2831
pyth_sdk::PriceIdentifier,
32+
serde_qs::axum::QsQuery,
2933
};
3034

3135
pub enum RestError {
@@ -68,7 +72,7 @@ pub struct LatestVaasQueryParams {
6872

6973
pub async fn latest_vaas(
7074
State(state): State<super::State>,
71-
Query(params): Query<LatestVaasQueryParams>,
75+
QsQuery(params): QsQuery<LatestVaasQueryParams>,
7276
) -> Result<Json<Vec<String>>, RestError> {
7377
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
7478
let price_feeds_with_update_data = state
@@ -77,10 +81,9 @@ pub async fn latest_vaas(
7781
.map_err(|_| RestError::UpdateDataNotFound)?;
7882
Ok(Json(
7983
price_feeds_with_update_data
80-
.batch_vaa
81-
.update_data
84+
.wormhole_merkle_update_data
8285
.iter()
83-
.map(|vaa_bytes| base64_standard_engine.encode(vaa_bytes)) // TODO: Support multiple
86+
.map(|bytes| base64_standard_engine.encode(bytes)) // TODO: Support multiple
8487
// encoding formats
8588
.collect(),
8689
))
@@ -97,7 +100,7 @@ pub struct LatestPriceFeedsQueryParams {
97100

98101
pub async fn latest_price_feeds(
99102
State(state): State<super::State>,
100-
Query(params): Query<LatestPriceFeedsQueryParams>,
103+
QsQuery(params): QsQuery<LatestPriceFeedsQueryParams>,
101104
) -> Result<Json<Vec<RpcPriceFeed>>, RestError> {
102105
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
103106
let price_feeds_with_update_data = state
@@ -106,11 +109,10 @@ pub async fn latest_price_feeds(
106109
.map_err(|_| RestError::UpdateDataNotFound)?;
107110
Ok(Json(
108111
price_feeds_with_update_data
109-
.batch_vaa
110-
.price_infos
111-
.into_values()
112-
.map(|price_info| {
113-
RpcPriceFeed::from_price_info(price_info, params.verbose, params.binary)
112+
.price_feeds
113+
.into_iter()
114+
.map(|price_feed| {
115+
RpcPriceFeed::from_price_feed_message(price_feed, params.verbose, params.binary)
114116
})
115117
.collect(),
116118
))
@@ -131,7 +133,7 @@ pub struct GetVaaResponse {
131133

132134
pub async fn get_vaa(
133135
State(state): State<super::State>,
134-
Query(params): Query<GetVaaQueryParams>,
136+
QsQuery(params): QsQuery<GetVaaQueryParams>,
135137
) -> Result<Json<GetVaaResponse>, RestError> {
136138
let price_id: PriceIdentifier = params.id.into();
137139

@@ -144,18 +146,16 @@ pub async fn get_vaa(
144146
.map_err(|_| RestError::UpdateDataNotFound)?;
145147

146148
let vaa = price_feeds_with_update_data
147-
.batch_vaa
148-
.update_data
149+
.wormhole_merkle_update_data
149150
.get(0)
150-
.map(|vaa_bytes| base64_standard_engine.encode(vaa_bytes))
151+
.map(|bytes| base64_standard_engine.encode(bytes))
151152
.ok_or(RestError::UpdateDataNotFound)?;
152153

153154
let publish_time = price_feeds_with_update_data
154-
.batch_vaa
155-
.price_infos
156-
.get(&price_id)
157-
.map(|price_info| price_info.publish_time)
158-
.ok_or(RestError::UpdateDataNotFound)?;
155+
.price_feeds
156+
.get(0)
157+
.ok_or(RestError::UpdateDataNotFound)?
158+
.publish_time; // TODO: This should never happen.
159159

160160
Ok(Json(GetVaaResponse { vaa, publish_time }))
161161
}
@@ -176,7 +176,7 @@ pub struct GetVaaCcipResponse {
176176

177177
pub async fn get_vaa_ccip(
178178
State(state): State<super::State>,
179-
Query(params): Query<GetVaaCcipQueryParams>,
179+
QsQuery(params): QsQuery<GetVaaCcipQueryParams>,
180180
) -> Result<Json<GetVaaCcipResponse>, RestError> {
181181
let price_id: PriceIdentifier = PriceIdentifier::new(params.data[0..32].try_into().unwrap());
182182
let publish_time = UnixTimestamp::from_be_bytes(params.data[32..40].try_into().unwrap());
@@ -186,14 +186,13 @@ pub async fn get_vaa_ccip(
186186
.get_price_feeds_with_update_data(vec![price_id], RequestTime::FirstAfter(publish_time))
187187
.map_err(|_| RestError::CcipUpdateDataNotFound)?;
188188

189-
let vaa = price_feeds_with_update_data
190-
.batch_vaa
191-
.update_data
189+
let bytes = price_feeds_with_update_data
190+
.wormhole_merkle_update_data
192191
.get(0) // One price feed has only a single VAA as proof.
193192
.ok_or(RestError::UpdateDataNotFound)?;
194193

195194
Ok(Json(GetVaaCcipResponse {
196-
data: format!("0x{}", hex::encode(vaa)),
195+
data: format!("0x{}", hex::encode(bytes)),
197196
}))
198197
}
199198

hermes/src/network/rpc/types.rs renamed to hermes/src/api/types.rs

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
use {
22
crate::{
33
impl_deserialize_for_hex_string_wrapper,
4-
store::{
5-
proof::batch_vaa::PriceInfo,
4+
store::types::{
5+
PriceFeedMessage,
66
UnixTimestamp,
77
},
88
},
9-
base64::{
10-
engine::general_purpose::STANDARD as base64_standard_engine,
11-
Engine as _,
12-
},
139
derive_more::{
1410
Deref,
1511
DerefMut,
@@ -41,7 +37,6 @@ type Base64String = String;
4137
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4238
pub struct RpcPriceFeedMetadata {
4339
pub emitter_chain: u16,
44-
pub attestation_time: UnixTimestamp,
4540
pub sequence_number: u64,
4641
pub price_service_receive_time: UnixTimestamp,
4742
}
@@ -51,26 +46,45 @@ pub struct RpcPriceFeed {
5146
pub id: PriceIdentifier,
5247
pub price: Price,
5348
pub ema_price: Price,
49+
#[serde(skip_serializing_if = "Option::is_none")]
5450
pub metadata: Option<RpcPriceFeedMetadata>,
5551
/// Vaa binary represented in base64.
52+
#[serde(skip_serializing_if = "Option::is_none")]
5653
pub vaa: Option<Base64String>,
5754
}
5855

5956
impl RpcPriceFeed {
6057
// TODO: Use a Encoding type to have None, Base64, and Hex variants instead of binary flag.
6158
// TODO: Use a Verbosity type to define None, or Full instead of verbose flag.
62-
pub fn from_price_info(price_info: PriceInfo, verbose: bool, binary: bool) -> Self {
59+
pub fn from_price_feed_message(
60+
price_feed_message: PriceFeedMessage,
61+
_verbose: bool,
62+
_binary: bool,
63+
) -> Self {
6364
Self {
64-
id: price_info.price_feed.id,
65-
price: price_info.price_feed.get_price_unchecked(),
66-
ema_price: price_info.price_feed.get_ema_price_unchecked(),
67-
metadata: verbose.then_some(RpcPriceFeedMetadata {
68-
emitter_chain: price_info.emitter_chain,
69-
attestation_time: price_info.attestation_time,
70-
sequence_number: price_info.sequence_number,
71-
price_service_receive_time: price_info.receive_time,
72-
}),
73-
vaa: binary.then_some(base64_standard_engine.encode(price_info.vaa_bytes)),
65+
id: PriceIdentifier::new(price_feed_message.id),
66+
price: Price {
67+
price: price_feed_message.price,
68+
conf: price_feed_message.conf,
69+
expo: price_feed_message.exponent,
70+
publish_time: price_feed_message.publish_time,
71+
},
72+
ema_price: Price {
73+
price: price_feed_message.ema_price,
74+
conf: price_feed_message.ema_conf,
75+
expo: price_feed_message.exponent,
76+
publish_time: price_feed_message.publish_time,
77+
},
78+
// FIXME: Handle verbose flag properly.
79+
// metadata: verbose.then_some(RpcPriceFeedMetadata {
80+
// emitter_chain: price_feed_message.emitter_chain,
81+
// sequence_number: price_feed_message.sequence_number,
82+
// price_service_receive_time: price_feed_message.receive_time,
83+
// }),
84+
metadata: None,
85+
// FIXME: The vaa is wrong, fix it
86+
// vaa: binary.then_some(base64_standard_engine.encode(message_state.proof_set.wormhole_merkle_proof.vaa)),
87+
vaa: None,
7488
}
7589
}
7690
}

0 commit comments

Comments
 (0)