Merged
apps/hermes/server/src/api/rest/v2/sse.rs (27 additions, 25 deletions)
@@ -93,34 +93,36 @@ where
     // Convert the broadcast receiver into a Stream
     let stream = BroadcastStream::new(update_rx);
 
-    let sse_stream = stream.then(move |message| {
-        let state_clone = state.clone(); // Clone again to use inside the async block
-        let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
-        async move {
-            match message {
-                Ok(event) => {
-                    match handle_aggregation_event(
-                        event,
-                        state_clone,
-                        price_ids_clone,
-                        params.encoding,
-                        params.parsed,
-                        params.benchmarks_only,
-                        params.allow_unordered,
-                    )
-                    .await
-                    {
-                        Ok(Some(update)) => Ok(Event::default()
-                            .json_data(update)
-                            .unwrap_or_else(error_event)),
-                        Ok(None) => Ok(Event::default().comment("No update available")),
-                        Err(e) => Ok(error_event(e)),
-                    }
-                }
-                Err(e) => Ok(error_event(e)),
-            }
-        }
-    });
+    let sse_stream = stream
+        .then(move |message| {
+            let state_clone = state.clone(); // Clone again to use inside the async block
+            let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
+            async move {
+                match message {
+                    Ok(event) => {
+                        match handle_aggregation_event(
+                            event,
+                            state_clone,
+                            price_ids_clone,
+                            params.encoding,
+                            params.parsed,
+                            params.benchmarks_only,
+                            params.allow_unordered,
+                        )
+                        .await
+                        {
+                            Ok(Some(update)) => Some(Ok(Event::default()
+                                .json_data(update)
+                                .unwrap_or_else(error_event))),
+                            Ok(None) => None,
+                            Err(e) => Some(Ok(error_event(e))),
+                        }
+                    }
+                    Err(e) => Some(Ok(error_event(e))),
+                }
+            }
+        })
+        .filter_map(|x| x);
 
     Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
 }
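With this change the `then` closure yields `Option<Result<Event, _>>`: slots with nothing to show map to `None`, and the trailing `.filter_map(|x| x)` drops them, so clients no longer receive "No update available" comment frames. A minimal sketch of the pattern (not part of the PR; assumes `tokio` and `tokio_stream` as dependencies), relying on the fact that `tokio_stream`'s `filter_map` takes a synchronous closure returning `Option`:

```rust
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // Items mapped to `None` simply vanish from the output stream.
    let updates = tokio_stream::iter(vec![Some("slot 100"), None, Some("slot 101")])
        .filter_map(|x| x); // identity closure: keep `Some`, drop `None`

    let sent: Vec<&str> = updates.collect().await;
    assert_eq!(sent, vec!["slot 100", "slot 101"]);
}
```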
apps/hermes/server/src/state/aggregate.rs (133 additions, 15 deletions)
@@ -105,6 +105,10 @@ pub struct AggregateStateData {
 
     /// Aggregate Specific Metrics
     pub metrics: metrics::Metrics,
+
+    /// Set of slots for which events have already been sent.
+    /// This prevents sending multiple events for the same slot.
+    pub slots_with_sent_events: HashSet<Slot>,
 }
 
 impl AggregateStateData {
@@ -120,6 +124,7 @@ impl AggregateStateData {
             metrics: metrics::Metrics::new(metrics_registry),
             readiness_staleness_threshold,
             readiness_max_allowed_slot_lag,
+            slots_with_sent_events: HashSet::new(),
         }
     }
 }
@@ -351,28 +356,32 @@
         // Update the aggregate state
         let mut aggregate_state = self.into().data.write().await;
 
-        // Send update event to subscribers. We are purposefully ignoring the result
-        // because there might be no subscribers.
-        let _ = match aggregate_state.latest_completed_slot {
+        // Check if we've already sent an event for this slot
+        if aggregate_state.slots_with_sent_events.contains(&slot) {
+            // We've already sent an event for this slot, don't send another one
+            return Ok(());
+        }
+
+        // Atomic check and update
+        let event = match aggregate_state.latest_completed_slot {
             None => {
-                aggregate_state.latest_completed_slot.replace(slot);
-                self.into()
-                    .api_update_tx
-                    .send(AggregationEvent::New { slot })
+                aggregate_state.latest_completed_slot = Some(slot);
+                AggregationEvent::New { slot }
             }
             Some(latest) if slot > latest => {
                 self.prune_removed_keys(message_state_keys).await;
-                aggregate_state.latest_completed_slot.replace(slot);
-                self.into()
-                    .api_update_tx
-                    .send(AggregationEvent::New { slot })
+                aggregate_state.latest_completed_slot = Some(slot);
+                AggregationEvent::New { slot }
             }
-            _ => self
-                .into()
-                .api_update_tx
-                .send(AggregationEvent::OutOfOrder { slot }),
+            _ => AggregationEvent::OutOfOrder { slot },
         };
 
+        // Mark this slot as having sent an event
+        aggregate_state.slots_with_sent_events.insert(slot);
+
+        // Only send the event after the state has been updated
+        let _ = self.into().api_update_tx.send(event);
+
         aggregate_state.latest_completed_slot = aggregate_state
            .latest_completed_slot
            .map(|latest| latest.max(slot))
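The rewritten block does the duplicate check, the `latest_completed_slot` update, and the recording of the sent slot all under the same write lock, and broadcasts only once the state is consistent, so concurrent `store_update` calls for one slot yield at most one event. A condensed, self-contained model of that decision logic (hypothetical simplified types; key pruning omitted):

```rust
use std::collections::HashSet;

type Slot = u64;

#[derive(Debug, PartialEq, Eq)]
enum AggregationEvent {
    New { slot: Slot },
    OutOfOrder { slot: Slot },
}

struct AggregateState {
    latest_completed_slot: Option<Slot>,
    slots_with_sent_events: HashSet<Slot>,
}

/// Returns the event to broadcast for `slot`, or `None` if one was already sent.
fn event_for_slot(state: &mut AggregateState, slot: Slot) -> Option<AggregationEvent> {
    // First arrival wins: later updates for an already-announced slot are dropped.
    if state.slots_with_sent_events.contains(&slot) {
        return None;
    }
    let event = match state.latest_completed_slot {
        // The slot is at or behind the frontier, so it arrived late.
        Some(latest) if slot <= latest => AggregationEvent::OutOfOrder { slot },
        // Otherwise it advances the frontier.
        _ => {
            state.latest_completed_slot = Some(slot);
            AggregationEvent::New { slot }
        }
    };
    state.slots_with_sent_events.insert(slot);
    Some(event)
}

fn main() {
    let mut state = AggregateState {
        latest_completed_slot: None,
        slots_with_sent_events: HashSet::new(),
    };
    assert_eq!(event_for_slot(&mut state, 100), Some(AggregationEvent::New { slot: 100 }));
    // Replaying slot 100 yields nothing instead of a second event.
    assert_eq!(event_for_slot(&mut state, 100), None);
    assert_eq!(event_for_slot(&mut state, 99), Some(AggregationEvent::OutOfOrder { slot: 99 }));
}
```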
@@ -1374,6 +1383,115 @@ mod test {
 
         assert_eq!(result.unwrap_err().to_string(), "Message not found");
     }
+
+    /// Test that verifies only one event is sent per slot, even when updates arrive out of order
+    /// or when a slot is processed multiple times.
+    #[tokio::test]
+    pub async fn test_out_of_order_updates_send_single_event_per_slot() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        // Create price feed messages
+        let price_feed_100 = create_dummy_price_feed_message(100, 10, 9);
+        let price_feed_101 = create_dummy_price_feed_message(100, 11, 10);
+
+        // First, process slot 100
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 20),
+        )
+        .await;
+
+        // Check that we received the New event for slot 100
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 100 })
+        );
+
+        // Next, process slot 101
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_101)], 101, 21),
+        )
+        .await;
+
+        // Check that we received the New event for slot 101
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 101 })
+        );
+
+        // Now, process slot 100 again
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 22),
+        )
+        .await;
+
+        // Try to receive another event with a timeout to ensure no more events were sent
+        // We should not receive an OutOfOrder event for slot 100 since we've already sent an event for it
+        let timeout_result =
+            tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;
+
+        // The timeout should occur, indicating no more events were received
+        assert!(
+            timeout_result.is_err(),
+            "Received unexpected additional event"
+        );
+
+        // Verify that both price feeds were stored correctly
+        let price_feed_ids = (*state).get_price_feed_ids().await;
+        assert_eq!(price_feed_ids.len(), 1);
+        assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
+    }
+
+    /// Test that verifies only one event is sent when multiple concurrent updates
+    /// for the same slot are processed.
+    #[tokio::test]
+    pub async fn test_concurrent_updates_same_slot_sends_single_event() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        // Create a single price feed message
+        let price_feed = create_dummy_price_feed_message(100, 10, 9);
+
+        // Generate 100 identical updates for the same slot but with different sequence numbers
+        let mut all_updates = Vec::new();
+        for seq in 0..100 {
+            let updates = generate_update(vec![Message::PriceFeedMessage(price_feed)], 10, seq);
+            all_updates.extend(updates);
+        }
+
+        // Process updates concurrently - we don't care if some fail due to the race condition
+        // The important thing is that only one event is sent
+        let state_arc = Arc::clone(&state);
+        let futures = all_updates.into_iter().map(move |u| {
+            let state_clone = Arc::clone(&state_arc);
+            async move {
+                let _ = state_clone.store_update(u).await;
+            }
+        });
+        futures::future::join_all(futures).await;
+
+        // Check that only one AggregationEvent::New is received
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 10 })
+        );
+
+        // Try to receive another event with a timeout to ensure no more events were sent
+        let timeout_result =
+            tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;
+
+        // The timeout should occur, indicating no more events were received
+        assert!(
+            timeout_result.is_err(),
+            "Received unexpected additional event"
+        );
+
+        // Verify that the price feed was stored correctly
+        let price_feed_ids = (*state).get_price_feed_ids().await;
+        assert_eq!(price_feed_ids.len(), 1);
+        assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
+    }
 }
 #[cfg(test)]
 /// Unit tests for the core TWAP calculation logic in `calculate_twap`
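For context, a hypothetical subscriber on the receiving end of `api_update_tx`; with the changes above, a loop like this observes exactly one event per slot. The `broadcast` channel type and the `Clone` derive on `AggregationEvent` are assumptions here, mirroring how the tests consume `update_rx`:

```rust
use tokio::sync::broadcast;

// Assumed shape of the event type, mirroring the diff.
#[derive(Clone, Debug)]
enum AggregationEvent {
    New { slot: u64 },
    OutOfOrder { slot: u64 },
}

async fn log_events(mut update_rx: broadcast::Receiver<AggregationEvent>) {
    // `recv` returns Err once every sender is dropped (or if this receiver lags).
    while let Ok(event) = update_rx.recv().await {
        match event {
            AggregationEvent::New { slot } => println!("new aggregate at slot {slot}"),
            AggregationEvent::OutOfOrder { slot } => println!("late update for slot {slot}"),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(16);
    tokio::spawn(async move {
        let _ = tx.send(AggregationEvent::New { slot: 100 });
        let _ = tx.send(AggregationEvent::OutOfOrder { slot: 99 });
    });
    log_events(rx).await; // returns once the sender task finishes and drops `tx`
}
```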