Skip to content

Commit d5967af

Browse files
committed
Hermes: Add error log when receiving invalid quorum VAA
1 parent 6ba707b commit d5967af

File tree

6 files changed

+25
-32
lines changed

6 files changed

+25
-32
lines changed

apps/hermes/server/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/hermes/server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.10.0-alpha"
3+
version = "0.10.1-alpha"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

apps/hermes/server/src/api/rest.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ mod tests {
182182
unimplemented!("Not needed for this test")
183183
}
184184

185-
async fn store_update(&self, _update: Update) -> Result<()> {
185+
async fn store_update(&self, _update: Update) -> Result<bool> {
186186
unimplemented!("Not needed for this test")
187187
}
188188

apps/hermes/server/src/network/pythnet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ where
512512
let state = state.clone();
513513
async move {
514514
if let Err(e) = state.process_message(vaa_bytes).await {
515-
tracing::debug!(error = ?e, "Skipped VAA.");
515+
tracing::error!(error = ?e, "Received an invalid VAA from PythNet quorum.");
516516
}
517517
}
518518
});

apps/hermes/server/src/state/aggregate.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ where
233233
{
234234
fn subscribe(&self) -> Receiver<AggregationEvent>;
235235
async fn is_ready(&self) -> (bool, ReadinessMetadata);
236-
async fn store_update(&self, update: Update) -> Result<()>;
236+
async fn store_update(&self, update: Update) -> Result<bool>;
237237
async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier>;
238238
async fn get_price_feeds_with_update_data(
239239
&self,
@@ -274,7 +274,7 @@ where
274274

275275
/// Stores the update data in the store
276276
#[tracing::instrument(skip(self, update))]
277-
async fn store_update(&self, update: Update) -> Result<()> {
277+
async fn store_update(&self, update: Update) -> Result<bool> {
278278
// The slot that the update is originating from. It should be available
279279
// in all the updates.
280280
let slot = match update {
@@ -300,7 +300,7 @@ where
300300
slot = proof.slot,
301301
"VAA Merkle Proof already stored, skipping."
302302
);
303-
return Ok(());
303+
return Ok(false);
304304
}
305305

306306
self.into()
@@ -331,7 +331,7 @@ where
331331
slot = slot,
332332
"Accumulator Messages already stored, skipping."
333333
);
334-
return Ok(());
334+
return Ok(false);
335335
}
336336

337337
self.into()
@@ -358,7 +358,7 @@ where
358358
(Some(accumulator_messages), Some(wormhole_merkle_state)) => {
359359
(accumulator_messages, wormhole_merkle_state)
360360
}
361-
_ => return Ok(()),
361+
_ => return Ok(true),
362362
};
363363

364364
tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update.");
@@ -408,7 +408,7 @@ where
408408
.metrics
409409
.observe(slot, metrics::Event::CompletedUpdate);
410410

411-
Ok(())
411+
Ok(true)
412412
}
413413

414414
async fn get_twaps_with_update_data(

apps/hermes/server/src/state/wormhole.rs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,11 @@ impl<'a> From<&'a State> for &'a WormholeState {
6060

6161
#[async_trait::async_trait]
6262
pub trait Wormhole: Aggregates {
63-
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>);
64-
async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<()>;
63+
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) -> bool;
64+
/// Process a Wormhole message, extracting the VAA and storing it in the state.
65+
/// Returns true if the message was processed successfully, false if it was already seen.
66+
/// Throws an error if the VAA is invalid or cannot be processed.
67+
async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<bool>;
6568
async fn update_guardian_set(&self, id: u32, guardian_set: GuardianSet);
6669
}
6770

@@ -80,13 +83,13 @@ where
8083
}
8184

8285
#[tracing::instrument(skip(self, vaa_bytes))]
83-
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) {
86+
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) -> bool {
8487
// Check VAA hasn't already been seen, this may have been checked previously
8588
// but due to async nature it's possible other threads have mutated the state
8689
// since this VAA started processing.
8790
let mut observed_vaa_seqs = self.into().observed_vaa_seqs.write().await;
8891
if observed_vaa_seqs.contains(&sequence) {
89-
return;
92+
return false;
9093
}
9194

9295
// Clear old cached VAA sequences.
@@ -95,12 +98,16 @@ where
9598
}
9699

97100
// Hand the VAA to the aggregate store.
98-
if let Err(e) = Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await {
99-
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
101+
match Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await {
102+
Ok(is_stored) => is_stored,
103+
Err(e) => {
104+
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
105+
false
106+
}
100107
}
101108
}
102109

103-
async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<()> {
110+
async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<bool> {
104111
let vaa = serde_wormhole::from_slice::<Vaa<&RawMessage>>(&vaa_bytes)?;
105112

106113
// Log VAA Processing.
@@ -114,18 +121,6 @@ where
114121
};
115122
tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA");
116123

117-
// Check VAA hasn't already been seen.
118-
ensure!(
119-
!self
120-
.into()
121-
.observed_vaa_seqs
122-
.read()
123-
.await
124-
.contains(&vaa.sequence),
125-
"Previously observed VAA: {}",
126-
vaa.sequence
127-
);
128-
129124
// Check VAA source is valid, we don't want to process other protocols VAAs.
130125
validate_vaa_source(&vaa)?;
131126

@@ -140,9 +135,7 @@ where
140135
vaa,
141136
)?;
142137

143-
// Finally, store the resulting VAA in Hermes.
144-
self.store_vaa(vaa.sequence, vaa_bytes).await;
145-
Ok(())
138+
Ok(self.store_vaa(vaa.sequence, vaa_bytes).await)
146139
}
147140
}
148141
// Rejects VAAs from invalid sources.

0 commit comments

Comments
 (0)