@@ -105,10 +105,6 @@ pub struct AggregateStateData {
105
105
106
106
/// Aggregate Specific Metrics
107
107
pub metrics : metrics:: Metrics ,
108
-
109
- /// Set of slots for which events have already been sent.
110
- /// This prevents sending multiple events for the same slot.
111
- pub slots_with_sent_events : HashSet < Slot > ,
112
108
}
113
109
114
110
impl AggregateStateData {
@@ -124,7 +120,6 @@ impl AggregateStateData {
124
120
metrics : metrics:: Metrics :: new ( metrics_registry) ,
125
121
readiness_staleness_threshold,
126
122
readiness_max_allowed_slot_lag,
127
- slots_with_sent_events : HashSet :: new ( ) ,
128
123
}
129
124
}
130
125
}
@@ -287,13 +282,23 @@ where
287
282
WormholePayload :: Merkle ( proof) => {
288
283
tracing:: info!( slot = proof. slot, "Storing VAA Merkle Proof." ) ;
289
284
290
- store_wormhole_merkle_verified_message (
285
+ // Store the wormhole merkle verified message and check if it was already stored
286
+ let is_new = store_wormhole_merkle_verified_message (
291
287
self ,
292
288
proof. clone ( ) ,
293
289
update_vaa. to_owned ( ) ,
294
290
)
295
291
. await ?;
296
292
293
+ // If the message was already stored, return early
294
+ if !is_new {
295
+ tracing:: info!(
296
+ slot = proof. slot,
297
+ "VAA Merkle Proof already stored, skipping."
298
+ ) ;
299
+ return Ok ( ( ) ) ;
300
+ }
301
+
297
302
self . into ( )
298
303
. data
299
304
. write ( )
@@ -309,6 +314,15 @@ where
309
314
let slot = accumulator_messages. slot ;
310
315
tracing:: info!( slot = slot, "Storing Accumulator Messages." ) ;
311
316
317
+ // Check if we already have accumulator messages for this slot
318
+ if ( self . fetch_accumulator_messages ( slot) . await ?) . is_some ( ) {
319
+ tracing:: info!(
320
+ slot = slot,
321
+ "Accumulator Messages already stored, skipping."
322
+ ) ;
323
+ return Ok ( ( ) ) ;
324
+ }
325
+
312
326
self . store_accumulator_messages ( accumulator_messages)
313
327
. await ?;
314
328
@@ -356,12 +370,6 @@ where
356
370
// Update the aggregate state
357
371
let mut aggregate_state = self . into ( ) . data . write ( ) . await ;
358
372
359
- // Check if we've already sent an event for this slot
360
- if aggregate_state. slots_with_sent_events . contains ( & slot) {
361
- // We've already sent an event for this slot, don't send another one
362
- return Ok ( ( ) ) ;
363
- }
364
-
365
373
// Atomic check and update
366
374
let event = match aggregate_state. latest_completed_slot {
367
375
None => {
@@ -376,9 +384,6 @@ where
376
384
_ => AggregationEvent :: OutOfOrder { slot } ,
377
385
} ;
378
386
379
- // Mark this slot as having sent an event
380
- aggregate_state. slots_with_sent_events . insert ( slot) ;
381
-
382
387
// Only send the event after the state has been updated
383
388
let _ = self . into ( ) . api_update_tx . send ( event) ;
384
389
@@ -1538,7 +1543,7 @@ mod calculate_twap_unit_tests {
1538
1543
1539
1544
#[ test]
1540
1545
fn test_invalid_timestamps ( ) {
1541
- let start = create_basic_twap_message ( 100 , 100 , 110 , 1000 ) ;
1546
+ let start = create_basic_twap_message ( 100 , 100 , 90 , 1000 ) ;
1542
1547
let end = create_basic_twap_message ( 300 , 200 , 180 , 1100 ) ;
1543
1548
1544
1549
let err = calculate_twap ( & start, & end) . unwrap_err ( ) ;
0 commit comments