@@ -351,28 +351,27 @@ where
351
351
// Update the aggregate state
352
352
let mut aggregate_state = self . into ( ) . data . write ( ) . await ;
353
353
354
- // Send update event to subscribers. We are purposefully ignoring the result
355
- // because there might be no subscribers.
356
- let _ = match aggregate_state. latest_completed_slot {
354
+ // Atomic check and update
355
+ let event = match aggregate_state. latest_completed_slot {
357
356
None => {
358
- aggregate_state. latest_completed_slot . replace ( slot) ;
359
- self . into ( )
360
- . api_update_tx
361
- . send ( AggregationEvent :: New { slot } )
357
+ aggregate_state. latest_completed_slot = Some ( slot) ;
358
+ AggregationEvent :: New { slot }
362
359
}
363
360
Some ( latest) if slot > latest => {
364
361
self . prune_removed_keys ( message_state_keys) . await ;
365
- aggregate_state. latest_completed_slot . replace ( slot) ;
366
- self . into ( )
367
- . api_update_tx
368
- . send ( AggregationEvent :: New { slot } )
362
+ aggregate_state. latest_completed_slot = Some ( slot) ;
363
+ AggregationEvent :: New { slot }
369
364
}
370
- _ => self
371
- . into ( )
372
- . api_update_tx
373
- . send ( AggregationEvent :: OutOfOrder { slot } ) ,
365
+ Some ( latest) if slot == latest => {
366
+ // Don't send duplicate events for the same slot
367
+ return Ok ( ( ) ) ;
368
+ }
369
+ _ => AggregationEvent :: OutOfOrder { slot } ,
374
370
} ;
375
371
372
+ // Only send the event after the state has been updated
373
+ let _ = self . into ( ) . api_update_tx . send ( event) ;
374
+
376
375
aggregate_state. latest_completed_slot = aggregate_state
377
376
. latest_completed_slot
378
377
. map ( |latest| latest. max ( slot) )
0 commit comments