Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.10.0-alpha"
version = "0.10.1-alpha"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
2 changes: 1 addition & 1 deletion apps/hermes/server/src/api/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ mod tests {
unimplemented!("Not needed for this test")
}

async fn store_update(&self, _update: Update) -> Result<()> {
async fn store_update(&self, _update: Update) -> Result<bool> {
unimplemented!("Not needed for this test")
}

Expand Down
6 changes: 4 additions & 2 deletions apps/hermes/server/src/network/pythnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,10 @@ where
tokio::spawn({
let state = state.clone();
async move {
if let Err(e) = state.process_message(vaa_bytes).await {
tracing::debug!(error = ?e, "Skipped VAA.");
// We always want to verify the VAA, even if it has been seen before.
// This ensures that VAAs from the quorum are valid, and allows us to alert and log an error if they are not.
if let Err(e) = state.process_message(vaa_bytes, true).await {
tracing::error!(error = ?e, "Received an invalid VAA from PythNet quorum.");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't notice quorum is in Pythnet module. Would it be possible to move it out? (not necessary as this code is not gonna change but it's nicer).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me create a new PR for this

Where should I put it?

}
}
});
Expand Down
5 changes: 4 additions & 1 deletion apps/hermes/server/src/network/wormhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ where
while let Some(Ok(message)) = stream.next().await {
let state = state.clone();
tokio::spawn(async move {
if let Err(e) = state.process_message(message.vaa_bytes).await {
// We do not want to verify the VAA if it has already been seen.
// This improves performance, since the beacon may send the same body
// multiple times with different signatures.
if let Err(e) = state.process_message(message.vaa_bytes, false).await {
tracing::debug!(error = ?e, "Skipped VAA.");
}
});
Expand Down
12 changes: 6 additions & 6 deletions apps/hermes/server/src/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ where
{
fn subscribe(&self) -> Receiver<AggregationEvent>;
async fn is_ready(&self) -> (bool, ReadinessMetadata);
async fn store_update(&self, update: Update) -> Result<()>;
async fn store_update(&self, update: Update) -> Result<bool>;
async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier>;
async fn get_price_feeds_with_update_data(
&self,
Expand Down Expand Up @@ -274,7 +274,7 @@ where

/// Stores the update data in the store
#[tracing::instrument(skip(self, update))]
async fn store_update(&self, update: Update) -> Result<()> {
async fn store_update(&self, update: Update) -> Result<bool> {
// The slot that the update is originating from. It should be available
// in all the updates.
let slot = match update {
Expand All @@ -300,7 +300,7 @@ where
slot = proof.slot,
"VAA Merkle Proof already stored, skipping."
);
return Ok(());
return Ok(false);
}

self.into()
Expand Down Expand Up @@ -331,7 +331,7 @@ where
slot = slot,
"Accumulator Messages already stored, skipping."
);
return Ok(());
return Ok(false);
}

self.into()
Expand All @@ -358,7 +358,7 @@ where
(Some(accumulator_messages), Some(wormhole_merkle_state)) => {
(accumulator_messages, wormhole_merkle_state)
}
_ => return Ok(()),
_ => return Ok(true),
};

tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update.");
Expand Down Expand Up @@ -408,7 +408,7 @@ where
.metrics
.observe(slot, metrics::Event::CompletedUpdate);

Ok(())
Ok(true)
}

async fn get_twaps_with_update_data(
Expand Down
55 changes: 34 additions & 21 deletions apps/hermes/server/src/state/wormhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ impl<'a> From<&'a State> for &'a WormholeState {

#[async_trait::async_trait]
pub trait Wormhole: Aggregates {
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>);
async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<()>;
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) -> bool;
/// Process a Wormhole message, extracting the VAA and storing it in the state.
/// If `always_verify` is false, it will check if the VAA has been seen before verifying it.
/// If true, it will verify the VAA even if it has been seen before.
/// Returns true if the message was processed successfully, false if it was already seen.
/// Throws an error if the VAA is invalid or cannot be processed.
async fn process_message(&self, vaa_bytes: Vec<u8>, always_verify: bool) -> Result<bool>;
async fn update_guardian_set(&self, id: u32, guardian_set: GuardianSet);
}

Expand All @@ -80,27 +85,35 @@ where
}

#[tracing::instrument(skip(self, vaa_bytes))]
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) {
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) -> bool {
// Check VAA hasn't already been seen, this may have been checked previously
// but due to async nature it's possible other threads have mutated the state
// since this VAA started processing.
let mut observed_vaa_seqs = self.into().observed_vaa_seqs.write().await;
if observed_vaa_seqs.contains(&sequence) {
return;
return false;
}

// Clear old cached VAA sequences.
while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
observed_vaa_seqs.pop_first();
}

observed_vaa_seqs.insert(sequence);
// Drop the lock to allow other threads to access the state.
drop(observed_vaa_seqs);

// Hand the VAA to the aggregate store.
if let Err(e) = Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await {
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
match Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await {
Ok(is_stored) => is_stored,
Err(e) => {
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
false
}
}
}

async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<()> {
async fn process_message(&self, vaa_bytes: Vec<u8>, always_verify: bool) -> Result<bool> {
let vaa = serde_wormhole::from_slice::<Vaa<&RawMessage>>(&vaa_bytes)?;

// Log VAA Processing.
Expand All @@ -114,17 +127,19 @@ where
};
tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA");

// Check VAA hasn't already been seen.
ensure!(
!self
.into()
.observed_vaa_seqs
.read()
.await
.contains(&vaa.sequence),
"Previously observed VAA: {}",
vaa.sequence
);
if !always_verify {
// Check VAA hasn't already been seen.
ensure!(
!self
.into()
.observed_vaa_seqs
.read()
.await
.contains(&vaa.sequence),
"Previously observed VAA: {}",
vaa.sequence
);
}

// Check VAA source is valid, we don't want to process other protocols VAAs.
validate_vaa_source(&vaa)?;
Expand All @@ -140,9 +155,7 @@ where
vaa,
)?;

// Finally, store the resulting VAA in Hermes.
self.store_vaa(vaa.sequence, vaa_bytes).await;
Ok(())
Ok(self.store_vaa(vaa.sequence, vaa_bytes).await)
}
}
// Rejects VAAs from invalid sources.
Expand Down
Loading