Skip to content

Commit a1dff0f

Browse files
committed
feat: add serializers for pyth formats
feat: use pythnet serialization in hermes Fix vaa validation Clippy Update config names Wrap Store with Arc Store works perfectly without Arc as all it's elements are behind an Arc or something similar to that, however a developer might make mistake to add a field and missing it. Improve error handling Update metadata struct Add metadata Update Eth listener Pin wormhole to a version Fix ws dispatcher fix: blocking in go recv corrupts tokio runtime Make network <> store message passing non-blocking Update logs and revert debug changes
1 parent 8aeef6e commit a1dff0f

30 files changed

+2057
-365
lines changed

hermes/.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ src/network/p2p.proto
77
tools/
88

99
# Ignore Wormhole cloned repo
10-
wormhole/
10+
wormhole*/

hermes/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", rev = "7d59
6969

7070
strum = { version = "0.24", features = ["derive"] }
7171
ethabi = { version = "18.0.0", features = ["serde"] }
72+
sha3 = "0.10.4"
73+
humantime = "2.1.0"
7274

7375
[patch.crates-io]
7476
serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" }

hermes/build.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,79 @@
11
use std::{
22
env,
33
path::PathBuf,
4-
process::Command,
4+
process::{
5+
Command,
6+
Stdio,
7+
},
58
};
69

710
fn main() {
811
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
912
let out_var = env::var("OUT_DIR").unwrap();
1013

11-
// Clone the Wormhole repository, which we need to access the protobuf definitions for Wormhole
12-
// P2P message types.
14+
// Download the Wormhole repository at a certain tag, which we need to access the protobuf definitions
15+
// for Wormhole P2P message types.
1316
//
14-
// TODO: This is ugly and costly, and requires git. Instead of this we should have our own tool
17+
// TODO: This is ugly. Instead of this we should have our own tool
1518
// build process that can generate protobuf definitions for this and other user cases. For now
1619
// this is easy and works and matches upstream Wormhole's `Makefile`.
17-
let _ = Command::new("git")
20+
21+
const WORMHOLE_VERSION: &str = "2.18.1";
22+
23+
let wh_curl = Command::new("curl")
1824
.args([
19-
"clone",
20-
"https://github.com/wormhole-foundation/wormhole",
21-
"wormhole",
25+
"-s",
26+
"-L",
27+
format!("https://github.com/wormhole-foundation/wormhole/archive/refs/tags/v{WORMHOLE_VERSION}.tar.gz").as_str(),
2228
])
29+
.stdout(Stdio::piped())
30+
.spawn()
31+
.expect("failed to download wormhole archive");
32+
33+
let _ = Command::new("tar")
34+
.args(["xvz"])
35+
.stdin(Stdio::from(wh_curl.stdout.unwrap()))
2336
.output()
24-
.expect("failed to execute process");
37+
.expect("failed to extract wormhole archive");
2538

2639
// Move the tools directory to the root of the repo because that's where the build script
2740
// expects it to be, paths get hardcoded into the binaries.
2841
let _ = Command::new("mv")
29-
.args(["wormhole/tools", "tools"])
42+
.args([
43+
format!("wormhole-{WORMHOLE_VERSION}/tools").as_str(),
44+
"tools",
45+
])
3046
.output()
31-
.expect("failed to execute process");
47+
.expect("failed to move wormhole tools directory");
3248

3349
// Move the protobuf definitions to the src/network directory, we don't have to do this
3450
// but it is more intuitive when debugging.
3551
let _ = Command::new("mv")
3652
.args([
37-
"wormhole/proto/gossip/v1/gossip.proto",
53+
format!("wormhole-{WORMHOLE_VERSION}/proto/gossip/v1/gossip.proto").as_str(),
3854
"src/network/p2p.proto",
3955
])
4056
.output()
41-
.expect("failed to execute process");
57+
.expect("failed to move wormhole protobuf definitions");
4258

4359
// Build the protobuf compiler.
4460
let _ = Command::new("./build.sh")
4561
.current_dir("tools")
4662
.output()
47-
.expect("failed to execute process");
63+
.expect("failed to run protobuf compiler build script");
4864

4965
// Make the protobuf compiler executable.
5066
let _ = Command::new("chmod")
5167
.args(["+x", "tools/bin/*"])
5268
.output()
53-
.expect("failed to execute process");
69+
.expect("failed to make protofuf compiler executable");
5470

5571
// Generate the protobuf definitions. See buf.gen.yaml to see how we rename the module for our
5672
// particular use case.
5773
let _ = Command::new("./tools/bin/buf")
5874
.args(["generate", "--path", "src"])
5975
.output()
60-
.expect("failed to execute process");
76+
.expect("failed to generate protobuf definitions");
6177

6278
// Build the Go library.
6379
let mut cmd = Command::new("go");

hermes/shell.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ with pkgs; mkShell {
77
clang
88
llvmPackages.libclang
99
nettle
10-
openssl
10+
openssl_1_1
1111
pkgconfig
1212
rustup
1313
systemd

hermes/src/api.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ use {
77
Router,
88
},
99
std::sync::Arc,
10+
tokio::{
11+
signal,
12+
sync::mpsc::Receiver,
13+
},
1014
};
1115

1216
mod rest;
@@ -15,12 +19,12 @@ mod ws;
1519

1620
#[derive(Clone)]
1721
pub struct State {
18-
pub store: Store,
22+
pub store: Arc<Store>,
1923
pub ws: Arc<ws::WsState>,
2024
}
2125

2226
impl State {
23-
pub fn new(store: Store) -> Self {
27+
pub fn new(store: Arc<Store>) -> Self {
2428
Self {
2529
store,
2630
ws: Arc::new(ws::WsState::new()),
@@ -32,7 +36,7 @@ impl State {
3236
///
3337
/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
3438
/// packages they are based on (tokio & hyper).
35-
pub async fn spawn(store: Store, rpc_addr: String) -> Result<()> {
39+
pub async fn run(store: Arc<Store>, mut update_rx: Receiver<()>, rpc_addr: String) -> Result<()> {
3640
let state = State::new(store);
3741

3842
// Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
@@ -50,28 +54,31 @@ pub async fn spawn(store: Store, rpc_addr: String) -> Result<()> {
5054
.with_state(state.clone());
5155

5256

53-
// Binds the axum's server to the configured address and port. This is a blocking call and will
54-
// not return until the server is shutdown.
55-
tokio::spawn(async move {
56-
// FIXME handle errors properly
57-
axum::Server::bind(&rpc_addr.parse().unwrap())
58-
.serve(app.into_make_service())
59-
.await
60-
.unwrap();
61-
});
62-
6357
// Call dispatch updates to websocket every 1 seconds
6458
// FIXME use a channel to get updates from the store
6559
tokio::spawn(async move {
6660
loop {
67-
dispatch_updates(
68-
state.store.get_price_feed_ids().into_iter().collect(),
69-
state.clone(),
70-
)
71-
.await;
72-
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
61+
// Panics if the update channel is closed, which should never happen.
62+
// If it happens we have no way to recover, so we just panic.
63+
update_rx
64+
.recv()
65+
.await
66+
.expect("state update channel is closed");
67+
68+
dispatch_updates(state.clone()).await;
7369
}
7470
});
7571

72+
// Binds the axum's server to the configured address and port. This is a blocking call and will
73+
// not return until the server is shutdown.
74+
axum::Server::try_bind(&rpc_addr.parse()?)?
75+
.serve(app.into_make_service())
76+
.with_graceful_shutdown(async {
77+
signal::ctrl_c()
78+
.await
79+
.expect("Ctrl-c signal handler failed.");
80+
})
81+
.await?;
82+
7683
Ok(())
7784
}

hermes/src/api/rest.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use {
3636
pub enum RestError {
3737
UpdateDataNotFound,
3838
CcipUpdateDataNotFound,
39+
InvalidCCIPInput,
3940
}
4041

4142
impl IntoResponse for RestError {
@@ -54,6 +55,9 @@ impl IntoResponse for RestError {
5455

5556
(StatusCode::BAD_GATEWAY, "CCIP update data not found").into_response()
5657
}
58+
RestError::InvalidCCIPInput => {
59+
(StatusCode::BAD_REQUEST, "Invalid CCIP input").into_response()
60+
}
5761
}
5862
}
5963
}
@@ -113,7 +117,7 @@ pub async fn latest_price_feeds(
113117
.price_feeds
114118
.into_iter()
115119
.map(|price_feed| {
116-
RpcPriceFeed::from_price_feed_message(price_feed, params.verbose, params.binary)
120+
RpcPriceFeed::from_price_feed_update(price_feed, params.verbose, params.binary)
117121
})
118122
.collect(),
119123
))
@@ -156,7 +160,8 @@ pub async fn get_vaa(
156160
.price_feeds
157161
.get(0)
158162
.ok_or(RestError::UpdateDataNotFound)?
159-
.publish_time; // TODO: This should never happen.
163+
.price_feed
164+
.publish_time;
160165

161166
Ok(Json(GetVaaResponse { vaa, publish_time }))
162167
}
@@ -179,8 +184,16 @@ pub async fn get_vaa_ccip(
179184
State(state): State<super::State>,
180185
QsQuery(params): QsQuery<GetVaaCcipQueryParams>,
181186
) -> Result<Json<GetVaaCcipResponse>, RestError> {
182-
let price_id: PriceIdentifier = PriceIdentifier::new(params.data[0..32].try_into().unwrap());
183-
let publish_time = UnixTimestamp::from_be_bytes(params.data[32..40].try_into().unwrap());
187+
let price_id: PriceIdentifier = PriceIdentifier::new(
188+
params.data[0..32]
189+
.try_into()
190+
.map_err(|_| RestError::InvalidCCIPInput)?,
191+
);
192+
let publish_time = UnixTimestamp::from_be_bytes(
193+
params.data[32..40]
194+
.try_into()
195+
.map_err(|_| RestError::InvalidCCIPInput)?,
196+
);
184197

185198
let price_feeds_with_update_data = state
186199
.store

hermes/src/api/types.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
use {
22
crate::{
33
impl_deserialize_for_hex_string_wrapper,
4-
store::types::UnixTimestamp,
4+
store::types::{
5+
PriceFeedUpdate,
6+
Slot,
7+
UnixTimestamp,
8+
},
9+
},
10+
base64::{
11+
engine::general_purpose::STANDARD as base64_standard_engine,
12+
Engine as _,
513
},
614
derive_more::{
715
Deref,
816
DerefMut,
917
},
10-
pyth_oracle::PriceFeedMessage,
1118
pyth_sdk::{
1219
Price,
1320
PriceIdentifier,
1421
},
22+
wormhole_sdk::Chain,
1523
};
1624

1725

@@ -34,8 +42,8 @@ type Base64String = String;
3442

3543
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
3644
pub struct RpcPriceFeedMetadata {
45+
pub slot: Slot,
3746
pub emitter_chain: u16,
38-
pub sequence_number: u64,
3947
pub price_service_receive_time: UnixTimestamp,
4048
}
4149

@@ -54,11 +62,13 @@ pub struct RpcPriceFeed {
5462
impl RpcPriceFeed {
5563
// TODO: Use a Encoding type to have None, Base64, and Hex variants instead of binary flag.
5664
// TODO: Use a Verbosity type to define None, or Full instead of verbose flag.
57-
pub fn from_price_feed_message(
58-
price_feed_message: PriceFeedMessage,
59-
_verbose: bool,
60-
_binary: bool,
65+
pub fn from_price_feed_update(
66+
price_feed_update: PriceFeedUpdate,
67+
verbose: bool,
68+
binary: bool,
6169
) -> Self {
70+
let price_feed_message = price_feed_update.price_feed;
71+
6272
Self {
6373
id: PriceIdentifier::new(price_feed_message.id),
6474
price: Price {
@@ -73,16 +83,14 @@ impl RpcPriceFeed {
7383
expo: price_feed_message.exponent,
7484
publish_time: price_feed_message.publish_time,
7585
},
76-
// FIXME: Handle verbose flag properly.
77-
// metadata: verbose.then_some(RpcPriceFeedMetadata {
78-
// emitter_chain: price_feed_message.emitter_chain,
79-
// sequence_number: price_feed_message.sequence_number,
80-
// price_service_receive_time: price_feed_message.receive_time,
81-
// }),
82-
metadata: None,
83-
// FIXME: The vaa is wrong, fix it
84-
// vaa: binary.then_some(base64_standard_engine.encode(message_state.proof_set.wormhole_merkle_proof.vaa)),
85-
vaa: None,
86+
metadata: verbose.then_some(RpcPriceFeedMetadata {
87+
emitter_chain: Chain::Pythnet.into(),
88+
price_service_receive_time: price_feed_update.received_at,
89+
slot: price_feed_update.slot,
90+
}),
91+
vaa: binary.then_some(
92+
base64_standard_engine.encode(price_feed_update.wormhole_merkle_update_data),
93+
),
8694
}
8795
}
8896
}

0 commit comments

Comments
 (0)