diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock index be9fa40418..999cc026dc 100644 --- a/apps/hermes/server/Cargo.lock +++ b/apps/hermes/server/Cargo.lock @@ -1880,7 +1880,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.10.0-alpha" +version = "0.10.1-alpha" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index 1dfc6ca65f..5fa2b31813 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.10.0-alpha" +version = "0.10.1-alpha" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/apps/hermes/server/src/api/rest.rs b/apps/hermes/server/src/api/rest.rs index 3fb1208e95..60c2bf31ac 100644 --- a/apps/hermes/server/src/api/rest.rs +++ b/apps/hermes/server/src/api/rest.rs @@ -182,7 +182,7 @@ mod tests { unimplemented!("Not needed for this test") } - async fn store_update(&self, _update: Update) -> Result<()> { + async fn store_update(&self, _update: Update) -> Result { unimplemented!("Not needed for this test") } diff --git a/apps/hermes/server/src/network/pythnet.rs b/apps/hermes/server/src/network/pythnet.rs index 099452edab..906e129039 100644 --- a/apps/hermes/server/src/network/pythnet.rs +++ b/apps/hermes/server/src/network/pythnet.rs @@ -511,8 +511,10 @@ where tokio::spawn({ let state = state.clone(); async move { - if let Err(e) = state.process_message(vaa_bytes).await { - tracing::debug!(error = ?e, "Skipped VAA."); + // We always want to verify the VAA, even if it has been seen before. + // This ensures that VAAs from the quorum are valid, and allows us to alert and log an error if they are not. + if let Err(e) = state.process_message(vaa_bytes, true).await { + tracing::error!(error = ?e, "Received an invalid VAA from PythNet quorum."); } } }); diff --git a/apps/hermes/server/src/network/wormhole.rs b/apps/hermes/server/src/network/wormhole.rs index 623a23d276..85747000fe 100644 --- a/apps/hermes/server/src/network/wormhole.rs +++ b/apps/hermes/server/src/network/wormhole.rs @@ -160,7 +160,10 @@ where while let Some(Ok(message)) = stream.next().await { let state = state.clone(); tokio::spawn(async move { - if let Err(e) = state.process_message(message.vaa_bytes).await { + // We do not want to verify the VAA if it has already been seen. + // This improves performance, since the beacon may send the same body + // multiple times with different signatures. + if let Err(e) = state.process_message(message.vaa_bytes, false).await { tracing::debug!(error = ?e, "Skipped VAA."); } }); diff --git a/apps/hermes/server/src/state/aggregate.rs b/apps/hermes/server/src/state/aggregate.rs index 237b0c0fa5..9622fe621b 100644 --- a/apps/hermes/server/src/state/aggregate.rs +++ b/apps/hermes/server/src/state/aggregate.rs @@ -233,7 +233,7 @@ where { fn subscribe(&self) -> Receiver; async fn is_ready(&self) -> (bool, ReadinessMetadata); - async fn store_update(&self, update: Update) -> Result<()>; + async fn store_update(&self, update: Update) -> Result; async fn get_price_feed_ids(&self) -> HashSet; async fn get_price_feeds_with_update_data( &self, @@ -274,7 +274,7 @@ where /// Stores the update data in the store #[tracing::instrument(skip(self, update))] - async fn store_update(&self, update: Update) -> Result<()> { + async fn store_update(&self, update: Update) -> Result { // The slot that the update is originating from. It should be available // in all the updates. let slot = match update { @@ -300,7 +300,7 @@ where slot = proof.slot, "VAA Merkle Proof already stored, skipping." ); - return Ok(()); + return Ok(false); } self.into() @@ -331,7 +331,7 @@ where slot = slot, "Accumulator Messages already stored, skipping." ); - return Ok(()); + return Ok(false); } self.into() @@ -358,7 +358,7 @@ where (Some(accumulator_messages), Some(wormhole_merkle_state)) => { (accumulator_messages, wormhole_merkle_state) } - _ => return Ok(()), + _ => return Ok(true), }; tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update."); @@ -408,7 +408,7 @@ where .metrics .observe(slot, metrics::Event::CompletedUpdate); - Ok(()) + Ok(true) } async fn get_twaps_with_update_data( diff --git a/apps/hermes/server/src/state/wormhole.rs b/apps/hermes/server/src/state/wormhole.rs index 845e695d28..a593d71899 100644 --- a/apps/hermes/server/src/state/wormhole.rs +++ b/apps/hermes/server/src/state/wormhole.rs @@ -60,8 +60,13 @@ impl<'a> From<&'a State> for &'a WormholeState { #[async_trait::async_trait] pub trait Wormhole: Aggregates { - async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec); - async fn process_message(&self, vaa_bytes: Vec) -> Result<()>; + async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec) -> bool; + /// Process a Wormhole message, extracting the VAA and storing it in the state. + /// If `always_verify` is false, it will check if the VAA has been seen before verifying it. + /// If true, it will verify the VAA even if it has been seen before. + /// Returns true if the message was processed successfully, false if it was already seen. + /// Throws an error if the VAA is invalid or cannot be processed. + async fn process_message(&self, vaa_bytes: Vec, always_verify: bool) -> Result; async fn update_guardian_set(&self, id: u32, guardian_set: GuardianSet); } @@ -80,13 +85,13 @@ where } #[tracing::instrument(skip(self, vaa_bytes))] - async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec) { + async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec) -> bool { // Check VAA hasn't already been seen, this may have been checked previously // but due to async nature it's possible other threads have mutated the state // since this VAA started processing. let mut observed_vaa_seqs = self.into().observed_vaa_seqs.write().await; if observed_vaa_seqs.contains(&sequence) { - return; + return false; } // Clear old cached VAA sequences. @@ -94,13 +99,21 @@ where observed_vaa_seqs.pop_first(); } + observed_vaa_seqs.insert(sequence); + // Drop the lock to allow other threads to access the state. + drop(observed_vaa_seqs); + // Hand the VAA to the aggregate store. - if let Err(e) = Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await { - tracing::error!(error = ?e, "Failed to store VAA in aggregate store."); + match Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await { + Ok(is_stored) => is_stored, + Err(e) => { + tracing::error!(error = ?e, "Failed to store VAA in aggregate store."); + false + } } } - async fn process_message(&self, vaa_bytes: Vec) -> Result<()> { + async fn process_message(&self, vaa_bytes: Vec, always_verify: bool) -> Result { let vaa = serde_wormhole::from_slice::>(&vaa_bytes)?; // Log VAA Processing. @@ -114,17 +127,19 @@ where }; tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA"); - // Check VAA hasn't already been seen. - ensure!( - !self - .into() - .observed_vaa_seqs - .read() - .await - .contains(&vaa.sequence), - "Previously observed VAA: {}", - vaa.sequence - ); + if !always_verify { + // Check VAA hasn't already been seen. + ensure!( + !self + .into() + .observed_vaa_seqs + .read() + .await + .contains(&vaa.sequence), + "Previously observed VAA: {}", + vaa.sequence + ); + } // Check VAA source is valid, we don't want to process other protocols VAAs. validate_vaa_source(&vaa)?; @@ -140,9 +155,7 @@ where vaa, )?; - // Finally, store the resulting VAA in Hermes. - self.store_vaa(vaa.sequence, vaa_bytes).await; - Ok(()) + Ok(self.store_vaa(vaa.sequence, vaa_bytes).await) } } // Rejects VAAs from invalid sources.