11use std:: {
2- collections:: HashSet ,
2+ collections:: { HashMap , HashSet } ,
33 path:: { Path , PathBuf } ,
44 sync:: Arc ,
55} ;
66
77use acropolis_common:: {
88 Address , AddressDelta , AddressTotals , BlockInfo , ShelleyAddress , TxIdentifier , UTxOIdentifier ,
9- ValueDelta ,
9+ ValueDelta , ValueDeltaMap ,
1010} ;
1111use anyhow:: Result ;
1212
@@ -172,10 +172,14 @@ impl State {
172172 && block_info. number > self . volatile . epoch_start_block + self . volatile . security_param_k
173173 }
174174
175- pub fn apply_address_deltas ( & mut self , deltas : & [ AddressDelta ] ) -> Result < ( ) > {
175+ pub fn apply_address_deltas ( & mut self , deltas : & [ AddressDelta ] ) {
176176 let addresses = self . volatile . window . back_mut ( ) . expect ( "window should never be empty" ) ;
177177
178+ // Keeps track seen txs to avoid overcounting totals tx count and duplicating tx identifiers
179+ let mut seen: HashMap < Address , HashSet < TxIdentifier > > = HashMap :: new ( ) ;
180+
178181 for delta in deltas {
182+ let tx_id = TxIdentifier :: from ( delta. utxo ) ;
179183 let entry = addresses. entry ( delta. address . clone ( ) ) . or_default ( ) ;
180184
181185 if self . config . store_info {
@@ -187,18 +191,33 @@ impl State {
187191 }
188192 }
189193
190- if self . config . store_transactions {
191- let txs = entry. transactions . get_or_insert ( Vec :: new ( ) ) ;
192- txs. push ( TxIdentifier :: from ( delta. utxo ) )
193- }
194+ if self . config . store_transactions || self . config . store_totals {
195+ let seen_for_addr = seen. entry ( delta. address . clone ( ) ) . or_default ( ) ;
194196
195- if self . config . store_totals {
196- let totals = entry. totals . get_or_insert ( Vec :: new ( ) ) ;
197- totals. push ( delta. value . clone ( ) ) ;
197+ if self . config . store_transactions {
198+ let txs = entry. transactions . get_or_insert ( Vec :: new ( ) ) ;
199+ if !seen_for_addr. contains ( & tx_id) {
200+ txs. push ( tx_id) ;
201+ }
202+ }
203+ if self . config . store_totals {
204+ let totals = entry. totals . get_or_insert ( Vec :: new ( ) ) ;
205+
206+ if seen_for_addr. contains ( & tx_id) {
207+ if let Some ( last_total) = totals. last_mut ( ) {
208+ // Create temporary map for summing same tx deltas efficiently
209+ // TODO: Potentially move upstream to address deltas publisher
210+ let mut map = ValueDeltaMap :: from ( last_total. clone ( ) ) ;
211+ map += delta. value . clone ( ) ;
212+ * last_total = ValueDelta :: from ( map) ;
213+ }
214+ } else {
215+ totals. push ( delta. value . clone ( ) ) ;
216+ }
217+ }
218+ seen_for_addr. insert ( tx_id) ;
198219 }
199220 }
200-
201- Ok ( ( ) )
202221 }
203222
204223 pub async fn get_addresses_totals (
@@ -277,7 +296,7 @@ mod tests {
277296 let deltas = vec ! [ delta( & addr, & utxo, 1 ) ] ;
278297
279298 // Apply deltas
280- state. apply_address_deltas ( & deltas) ? ;
299+ state. apply_address_deltas ( & deltas) ;
281300
282301 // Verify UTxO is retrievable when in volatile
283302 let utxos = state. get_address_utxos ( & addr) . await ?;
@@ -319,7 +338,7 @@ mod tests {
319338 let created = vec ! [ delta( & addr, & utxo, 1 ) ] ;
320339
321340 // Apply delta to volatile
322- state. apply_address_deltas ( & created) ? ;
341+ state. apply_address_deltas ( & created) ;
323342
324343 // Drain volatile to immutable pending
325344 state. volatile . epoch_start_block = 1 ;
@@ -333,7 +352,7 @@ mod tests {
333352 assert_eq ! ( after_persist. as_ref( ) . unwrap( ) , & [ utxo] ) ;
334353
335354 state. volatile . next_block ( ) ;
336- state. apply_address_deltas ( & [ delta ( & addr, & utxo, -1 ) ] ) ? ;
355+ state. apply_address_deltas ( & [ delta ( & addr, & utxo, -1 ) ] ) ;
337356
338357 // Verify UTxO was removed while in volatile
339358 let after_spend_volatile = state. get_address_utxos ( & addr) . await ?;
@@ -368,9 +387,9 @@ mod tests {
368387
369388 state. volatile . epoch_start_block = 1 ;
370389
371- state. apply_address_deltas ( & [ delta ( & addr, & utxo_old, 1 ) ] ) ? ;
390+ state. apply_address_deltas ( & [ delta ( & addr, & utxo_old, 1 ) ] ) ;
372391 state. volatile . next_block ( ) ;
373- state. apply_address_deltas ( & [ delta ( & addr, & utxo_old, -1 ) , delta ( & addr, & utxo_new, 1 ) ] ) ? ;
392+ state. apply_address_deltas ( & [ delta ( & addr, & utxo_old, -1 ) , delta ( & addr, & utxo_new, 1 ) ] ) ;
374393
375394 // Verify Create and spend both in volatile is not included in address utxos
376395 let volatile = state. get_address_utxos ( & addr) . await ?;
@@ -400,4 +419,67 @@ mod tests {
400419
401420 Ok ( ( ) )
402421 }
422+
423+ #[ tokio:: test]
424+ async fn test_same_tx_deltas_sums_totals_in_volatile ( ) -> Result < ( ) > {
425+ let _ = tracing_subscriber:: fmt:: try_init ( ) ;
426+
427+ let mut state = setup_state_and_store ( ) . await ?;
428+
429+ let addr = dummy_address ( ) ;
430+ let delta_1 = UTxOIdentifier :: new ( 0 , 1 , 0 ) ;
431+ let delta_2 = UTxOIdentifier :: new ( 0 , 1 , 1 ) ;
432+
433+ state. volatile . epoch_start_block = 1 ;
434+
435+ state. apply_address_deltas ( & [ delta ( & addr, & delta_1, 1 ) , delta ( & addr, & delta_2, 1 ) ] ) ;
436+
437+ // Verify only 1 totals entry with delta of 2
438+ let volatile = state
439+ . volatile
440+ . window
441+ . back ( )
442+ . expect ( "Window should have a delta" )
443+ . get ( & addr)
444+ . expect ( "Entry should be populated" )
445+ . totals
446+ . as_ref ( )
447+ . expect ( "Totals should be populated" ) ;
448+
449+ assert_eq ! ( volatile. len( ) , 1 ) ;
450+ assert_eq ! ( volatile. first( ) . expect( "Should be populated" ) . lovelace, 2 ) ;
451+
452+ Ok ( ( ) )
453+ }
454+
455+ #[ tokio:: test]
456+ async fn test_same_tx_deltas_prevents_duplicate_identifier_in_volatile ( ) -> Result < ( ) > {
457+ let _ = tracing_subscriber:: fmt:: try_init ( ) ;
458+
459+ let mut state = setup_state_and_store ( ) . await ?;
460+
461+ let addr = dummy_address ( ) ;
462+ let delta_1 = UTxOIdentifier :: new ( 0 , 1 , 0 ) ;
463+ let delta_2 = UTxOIdentifier :: new ( 0 , 1 , 1 ) ;
464+
465+ state. volatile . epoch_start_block = 1 ;
466+
467+ state. apply_address_deltas ( & [ delta ( & addr, & delta_1, 1 ) , delta ( & addr, & delta_2, 1 ) ] ) ;
468+
469+ // Verify only 1 transactions entry
470+ let volatile = state
471+ . volatile
472+ . window
473+ . back ( )
474+ . expect ( "Window should have a delta" )
475+ . get ( & addr)
476+ . expect ( "Entry should be populated" )
477+ . transactions
478+ . as_ref ( )
479+ . expect ( "Transactions should be populated" ) ;
480+
481+ assert_eq ! ( volatile. len( ) , 1 ) ;
482+
483+ Ok ( ( ) )
484+ }
403485}
0 commit comments