@@ -105,6 +105,10 @@ pub struct AggregateStateData {
105105
106106 /// Aggregate Specific Metrics
107107 pub metrics : metrics:: Metrics ,
108+
109+ /// Set of slots for which events have already been sent.
110+ /// This prevents sending multiple events for the same slot.
111+ pub slots_with_sent_events : HashSet < Slot > ,
108112}
109113
110114impl AggregateStateData {
@@ -120,6 +124,7 @@ impl AggregateStateData {
120124 metrics : metrics:: Metrics :: new ( metrics_registry) ,
121125 readiness_staleness_threshold,
122126 readiness_max_allowed_slot_lag,
127+ slots_with_sent_events : HashSet :: new ( ) ,
123128 }
124129 }
125130}
@@ -348,42 +353,32 @@ where
348353 tracing:: info!( len = message_states. len( ) , "Storing Message States." ) ;
349354 self . store_message_states ( message_states) . await ?;
350355
351- // First, get the data we need without holding the lock
352- let message_state_keys_clone = message_state_keys. clone ( ) ;
353- let should_prune = {
354- let aggregate_state = self . into ( ) . data . read ( ) . await ;
355- match aggregate_state. latest_completed_slot {
356- None => false ,
357- Some ( latest) if slot > latest => true ,
358- _ => false ,
359- }
360- } ;
356+ // Update the aggregate state
357+ let mut aggregate_state = self . into ( ) . data . write ( ) . await ;
361358
362- // Do the pruning outside the critical section if needed
363- if should_prune {
364- self . prune_removed_keys ( message_state_keys_clone) . await ;
359+ // Check if we've already sent an event for this slot
360+ if aggregate_state. slots_with_sent_events . contains ( & slot) {
361+ // We've already sent an event for this slot, don't send another one
362+ return Ok ( ( ) ) ;
365363 }
366364
367- // Now acquire the write lock for the state update and event sending
368- let mut aggregate_state = self . into ( ) . data . write ( ) . await ;
369-
370365 // Atomic check and update
371366 let event = match aggregate_state. latest_completed_slot {
372367 None => {
373368 aggregate_state. latest_completed_slot = Some ( slot) ;
374369 AggregationEvent :: New { slot }
375370 }
376371 Some ( latest) if slot > latest => {
372+ self . prune_removed_keys ( message_state_keys) . await ;
377373 aggregate_state. latest_completed_slot = Some ( slot) ;
378374 AggregationEvent :: New { slot }
379375 }
380- Some ( latest) if slot == latest => {
381- // Don't send duplicate events for the same slot
382- return Ok ( ( ) ) ;
383- }
384376 _ => AggregationEvent :: OutOfOrder { slot } ,
385377 } ;
386378
379+ // Mark this slot as having sent an event
380+ aggregate_state. slots_with_sent_events . insert ( slot) ;
381+
387382 // Only send the event after the state has been updated
388383 let _ = self . into ( ) . api_update_tx . send ( event) ;
389384
@@ -1388,6 +1383,115 @@ mod test {
13881383
13891384 assert_eq ! ( result. unwrap_err( ) . to_string( ) , "Message not found" ) ;
13901385 }
1386+
1387+ /// Test that verifies only one event is sent per slot, even when updates arrive out of order
1388+ /// or when a slot is processed multiple times.
1389+ #[ tokio:: test]
1390+ pub async fn test_out_of_order_updates_send_single_event_per_slot ( ) {
1391+ let ( state, mut update_rx) = setup_state ( 10 ) . await ;
1392+
1393+ // Create price feed messages
1394+ let price_feed_100 = create_dummy_price_feed_message ( 100 , 10 , 9 ) ;
1395+ let price_feed_101 = create_dummy_price_feed_message ( 100 , 11 , 10 ) ;
1396+
1397+ // First, process slot 100
1398+ store_multiple_concurrent_valid_updates (
1399+ state. clone ( ) ,
1400+ generate_update ( vec ! [ Message :: PriceFeedMessage ( price_feed_100) ] , 100 , 20 ) ,
1401+ )
1402+ . await ;
1403+
1404+ // Check that we received the New event for slot 100
1405+ assert_eq ! (
1406+ update_rx. recv( ) . await ,
1407+ Ok ( AggregationEvent :: New { slot: 100 } )
1408+ ) ;
1409+
1410+ // Next, process slot 101
1411+ store_multiple_concurrent_valid_updates (
1412+ state. clone ( ) ,
1413+ generate_update ( vec ! [ Message :: PriceFeedMessage ( price_feed_101) ] , 101 , 21 ) ,
1414+ )
1415+ . await ;
1416+
1417+ // Check that we received the New event for slot 101
1418+ assert_eq ! (
1419+ update_rx. recv( ) . await ,
1420+ Ok ( AggregationEvent :: New { slot: 101 } )
1421+ ) ;
1422+
1423+ // Now, process slot 100 again
1424+ store_multiple_concurrent_valid_updates (
1425+ state. clone ( ) ,
1426+ generate_update ( vec ! [ Message :: PriceFeedMessage ( price_feed_100) ] , 100 , 22 ) ,
1427+ )
1428+ . await ;
1429+
1430+ // Try to receive another event with a timeout to ensure no more events were sent
1431+ // We should not receive an OutOfOrder event for slot 100 since we've already sent an event for it
1432+ let timeout_result =
1433+ tokio:: time:: timeout ( std:: time:: Duration :: from_millis ( 100 ) , update_rx. recv ( ) ) . await ;
1434+
1435+ // The timeout should occur, indicating no more events were received
1436+ assert ! (
1437+ timeout_result. is_err( ) ,
1438+ "Received unexpected additional event"
1439+ ) ;
1440+
1441+ // Verify that both price feeds were stored correctly
1442+ let price_feed_ids = ( * state) . get_price_feed_ids ( ) . await ;
1443+ assert_eq ! ( price_feed_ids. len( ) , 1 ) ;
1444+ assert ! ( price_feed_ids. contains( & PriceIdentifier :: new( [ 100 ; 32 ] ) ) ) ;
1445+ }
1446+
1447+ /// Test that verifies only one event is sent when multiple concurrent updates
1448+ /// for the same slot are processed.
1449+ #[ tokio:: test]
1450+ pub async fn test_concurrent_updates_same_slot_sends_single_event ( ) {
1451+ let ( state, mut update_rx) = setup_state ( 10 ) . await ;
1452+
1453+ // Create a single price feed message
1454+ let price_feed = create_dummy_price_feed_message ( 100 , 10 , 9 ) ;
1455+
1456+ // Generate 100 identical updates for the same slot but with different sequence numbers
1457+ let mut all_updates = Vec :: new ( ) ;
1458+ for seq in 0 ..100 {
1459+ let updates = generate_update ( vec ! [ Message :: PriceFeedMessage ( price_feed) ] , 10 , seq) ;
1460+ all_updates. extend ( updates) ;
1461+ }
1462+
1463+ // Process updates concurrently - we don't care if some fail due to the race condition
1464+ // The important thing is that only one event is sent
1465+ let state_arc = Arc :: clone ( & state) ;
1466+ let futures = all_updates. into_iter ( ) . map ( move |u| {
1467+ let state_clone = Arc :: clone ( & state_arc) ;
1468+ async move {
1469+ let _ = state_clone. store_update ( u) . await ;
1470+ }
1471+ } ) ;
1472+ futures:: future:: join_all ( futures) . await ;
1473+
1474+ // Check that only one AggregationEvent::New is received
1475+ assert_eq ! (
1476+ update_rx. recv( ) . await ,
1477+ Ok ( AggregationEvent :: New { slot: 10 } )
1478+ ) ;
1479+
1480+ // Try to receive another event with a timeout to ensure no more events were sent
1481+ let timeout_result =
1482+ tokio:: time:: timeout ( std:: time:: Duration :: from_millis ( 100 ) , update_rx. recv ( ) ) . await ;
1483+
1484+ // The timeout should occur, indicating no more events were received
1485+ assert ! (
1486+ timeout_result. is_err( ) ,
1487+ "Received unexpected additional event"
1488+ ) ;
1489+
1490+ // Verify that the price feed was stored correctly
1491+ let price_feed_ids = ( * state) . get_price_feed_ids ( ) . await ;
1492+ assert_eq ! ( price_feed_ids. len( ) , 1 ) ;
1493+ assert ! ( price_feed_ids. contains( & PriceIdentifier :: new( [ 100 ; 32 ] ) ) ) ;
1494+ }
13911495}
13921496#[ cfg( test) ]
13931497/// Unit tests for the core TWAP calculation logic in `calculate_twap`
@@ -1434,7 +1538,7 @@ mod calculate_twap_unit_tests {
14341538
14351539 #[ test]
14361540 fn test_invalid_timestamps ( ) {
1437- let start = create_basic_twap_message ( 100 , 100 , 110 , 1000 ) ;
1541+ let start = create_basic_twap_message ( 100 , 100 , 90 , 1000 ) ;
14381542 let end = create_basic_twap_message ( 300 , 200 , 180 , 1100 ) ;
14391543
14401544 let err = calculate_twap ( & start, & end) . unwrap_err ( ) ;
0 commit comments