Skip to content
Merged
52 changes: 27 additions & 25 deletions apps/hermes/server/src/api/rest/v2/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,34 +93,36 @@ where
// Convert the broadcast receiver into a Stream
let stream = BroadcastStream::new(update_rx);

let sse_stream = stream.then(move |message| {
let state_clone = state.clone(); // Clone again to use inside the async block
let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
async move {
match message {
Ok(event) => {
match handle_aggregation_event(
event,
state_clone,
price_ids_clone,
params.encoding,
params.parsed,
params.benchmarks_only,
params.allow_unordered,
)
.await
{
Ok(Some(update)) => Ok(Event::default()
.json_data(update)
.unwrap_or_else(error_event)),
Ok(None) => Ok(Event::default().comment("No update available")),
Err(e) => Ok(error_event(e)),
let sse_stream = stream
.then(move |message| {
let state_clone = state.clone(); // Clone again to use inside the async block
let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
async move {
match message {
Ok(event) => {
match handle_aggregation_event(
event,
state_clone,
price_ids_clone,
params.encoding,
params.parsed,
params.benchmarks_only,
params.allow_unordered,
)
.await
{
Ok(Some(update)) => Some(Ok(Event::default()
.json_data(update)
.unwrap_or_else(error_event))),
Ok(None) => None,
Err(e) => Some(Ok(error_event(e))),
}
}
Err(e) => Some(Ok(error_event(e))),
}
Err(e) => Ok(error_event(e)),
}
}
});
})
.filter_map(|x| x);

Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}
Expand Down
161 changes: 144 additions & 17 deletions apps/hermes/server/src/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,23 @@ where
WormholePayload::Merkle(proof) => {
tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");

store_wormhole_merkle_verified_message(
// Store the wormhole merkle verified message and check if it was already stored
let is_new = store_wormhole_merkle_verified_message(
self,
proof.clone(),
update_vaa.to_owned(),
)
.await?;

// If the message was already stored, return early
if !is_new {
tracing::info!(
slot = proof.slot,
"VAA Merkle Proof already stored, skipping."
);
return Ok(());
}

self.into()
.data
.write()
Expand All @@ -304,9 +314,22 @@ where
let slot = accumulator_messages.slot;
tracing::info!(slot = slot, "Storing Accumulator Messages.");

self.store_accumulator_messages(accumulator_messages)
// Store the accumulator messages and check if they were already stored in a single operation
// This avoids the race condition where multiple threads could check and find nothing
// but then both store the same messages
let is_new = self
.store_accumulator_messages(accumulator_messages)
.await?;

// If the messages were already stored, return early
if !is_new {
tracing::info!(
slot = slot,
"Accumulator Messages already stored, skipping."
);
return Ok(());
}

self.into()
.data
.write()
Expand Down Expand Up @@ -351,28 +374,23 @@ where
// Update the aggregate state
let mut aggregate_state = self.into().data.write().await;

// Send update event to subscribers. We are purposefully ignoring the result
// because there might be no subscribers.
let _ = match aggregate_state.latest_completed_slot {
// Atomic check and update
let event = match aggregate_state.latest_completed_slot {
None => {
aggregate_state.latest_completed_slot.replace(slot);
self.into()
.api_update_tx
.send(AggregationEvent::New { slot })
aggregate_state.latest_completed_slot = Some(slot);
AggregationEvent::New { slot }
}
Some(latest) if slot > latest => {
self.prune_removed_keys(message_state_keys).await;
aggregate_state.latest_completed_slot.replace(slot);
self.into()
.api_update_tx
.send(AggregationEvent::New { slot })
aggregate_state.latest_completed_slot = Some(slot);
AggregationEvent::New { slot }
}
_ => self
.into()
.api_update_tx
.send(AggregationEvent::OutOfOrder { slot }),
_ => AggregationEvent::OutOfOrder { slot },
};

// Only send the event after the state has been updated
let _ = self.into().api_update_tx.send(event);

aggregate_state.latest_completed_slot = aggregate_state
.latest_completed_slot
.map(|latest| latest.max(slot))
Expand Down Expand Up @@ -1374,6 +1392,115 @@ mod test {

assert_eq!(result.unwrap_err().to_string(), "Message not found");
}

/// Test that verifies only one event is sent per slot, even when updates arrive out of order
/// or when a slot is processed multiple times.
#[tokio::test]
pub async fn test_out_of_order_updates_send_single_event_per_slot() {
let (state, mut update_rx) = setup_state(10).await;

// Create price feed messages
let price_feed_100 = create_dummy_price_feed_message(100, 10, 9);
let price_feed_101 = create_dummy_price_feed_message(100, 11, 10);

// First, process slot 100
store_multiple_concurrent_valid_updates(
state.clone(),
generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 20),
)
.await;

// Check that we received the New event for slot 100
assert_eq!(
update_rx.recv().await,
Ok(AggregationEvent::New { slot: 100 })
);

// Next, process slot 101
store_multiple_concurrent_valid_updates(
state.clone(),
generate_update(vec![Message::PriceFeedMessage(price_feed_101)], 101, 21),
)
.await;

// Check that we received the New event for slot 101
assert_eq!(
update_rx.recv().await,
Ok(AggregationEvent::New { slot: 101 })
);

// Now, process slot 100 again
store_multiple_concurrent_valid_updates(
state.clone(),
generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 22),
)
.await;

// Try to receive another event with a timeout to ensure no more events were sent
// We should not receive an OutOfOrder event for slot 100 since we've already sent an event for it
let timeout_result =
tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;

// The timeout should occur, indicating no more events were received
assert!(
timeout_result.is_err(),
"Received unexpected additional event"
);

// Verify that both price feeds were stored correctly
let price_feed_ids = (*state).get_price_feed_ids().await;
assert_eq!(price_feed_ids.len(), 1);
assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
}

/// Test that verifies only one event is sent when multiple concurrent updates
/// for the same slot are processed.
#[tokio::test]
pub async fn test_concurrent_updates_same_slot_sends_single_event() {
let (state, mut update_rx) = setup_state(10).await;

// Create a single price feed message
let price_feed = create_dummy_price_feed_message(100, 10, 9);

// Generate 100 identical updates for the same slot but with different sequence numbers
let mut all_updates = Vec::new();
for seq in 0..100 {
let updates = generate_update(vec![Message::PriceFeedMessage(price_feed)], 10, seq);
all_updates.extend(updates);
}

// Process updates concurrently - we don't care if some fail due to the race condition
// The important thing is that only one event is sent
let state_arc = Arc::clone(&state);
let futures = all_updates.into_iter().map(move |u| {
let state_clone = Arc::clone(&state_arc);
async move {
let _ = state_clone.store_update(u).await;
}
});
futures::future::join_all(futures).await;

// Check that only one AggregationEvent::New is received
assert_eq!(
update_rx.recv().await,
Ok(AggregationEvent::New { slot: 10 })
);

// Try to receive another event with a timeout to ensure no more events were sent
let timeout_result =
tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;

// The timeout should occur, indicating no more events were received
assert!(
timeout_result.is_err(),
"Received unexpected additional event"
);

// Verify that the price feed was stored correctly
let price_feed_ids = (*state).get_price_feed_ids().await;
assert_eq!(price_feed_ids.len(), 1);
assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
}
}
#[cfg(test)]
/// Unit tests for the core TWAP calculation logic in `calculate_twap`
Expand Down
8 changes: 5 additions & 3 deletions apps/hermes/server/src/state/aggregate/wormhole_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ pub async fn store_wormhole_merkle_verified_message<S>(
state: &S,
root: WormholeMerkleRoot,
vaa: VaaBytes,
) -> Result<()>
) -> Result<bool>
where
S: Cache,
{
// Store the state and check if it was already stored in a single operation
// This avoids the race condition where multiple threads could check and find nothing
// but then both store the same state
state
.store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
.await?;
Ok(())
.await
}

pub fn construct_message_states_proofs(
Expand Down
64 changes: 28 additions & 36 deletions apps/hermes/server/src/state/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ pub trait Cache {
async fn store_accumulator_messages(
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()>;
) -> Result<bool>;
async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>>;
async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()>;
) -> Result<bool>;
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn fetch_message_states(
Expand Down Expand Up @@ -226,13 +226,22 @@ where
async fn store_accumulator_messages(
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()> {
) -> Result<bool> {
let mut cache = self.into().accumulator_messages_cache.write().await;
cache.insert(accumulator_messages.slot, accumulator_messages);
let slot = accumulator_messages.slot;

// Check if we already have messages for this slot while holding the lock
if cache.contains_key(&slot) {
// Messages already exist, return false to indicate no insertion happened
return Ok(false);
}

// Messages don't exist, store them
cache.insert(slot, accumulator_messages);
while cache.len() > self.into().cache_size as usize {
cache.pop_first();
}
Ok(())
Ok(true)
}

async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>> {
Expand All @@ -243,13 +252,22 @@ where
async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()> {
) -> Result<bool> {
let mut cache = self.into().wormhole_merkle_state_cache.write().await;
cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
let slot = wormhole_merkle_state.root.slot;

// Check if we already have a state for this slot while holding the lock
if cache.contains_key(&slot) {
// State already exists, return false to indicate no insertion happened
return Ok(false);
}

// State doesn't exist, store it
cache.insert(slot, wormhole_merkle_state);
while cache.len() > self.into().cache_size as usize {
cache.pop_first();
}
Ok(())
Ok(true)
}

async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>> {
Expand Down Expand Up @@ -702,18 +720,7 @@ mod test {
let (state, _) = setup_state(2).await;

// Make sure the retrieved accumulator messages is what we store.
let mut accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10);
state
.store_accumulator_messages(accumulator_messages_at_10.clone())
.await
.unwrap();
assert_eq!(
state.fetch_accumulator_messages(10).await.unwrap().unwrap(),
accumulator_messages_at_10
);

// Make sure overwriting the accumulator messages works.
accumulator_messages_at_10.ring_size = 5; // Change the ring size from 3 to 5.
let accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10);
state
.store_accumulator_messages(accumulator_messages_at_10.clone())
.await
Expand Down Expand Up @@ -764,22 +771,7 @@ mod test {
let (state, _) = setup_state(2).await;

// Make sure the retrieved wormhole merkle state is what we store
let mut wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10);
state
.store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone())
.await
.unwrap();
assert_eq!(
state
.fetch_wormhole_merkle_state(10)
.await
.unwrap()
.unwrap(),
wormhole_merkle_state_at_10
);

// Make sure overwriting the wormhole merkle state works.
wormhole_merkle_state_at_10.root.ring_size = 5; // Change the ring size from 3 to 5.
let wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10);
state
.store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone())
.await
Expand Down
Loading