Skip to content

Commit f979bf3

Browse files
committed
fix: merge volatile totals for deltas of the same tx to avoid overcounting txs
Signed-off-by: William Hankins <[email protected]>
1 parent 1db69bc commit f979bf3

File tree

3 files changed

+90
-14
lines changed

3 files changed

+90
-14
lines changed

common/src/types.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ pub type PolicyId = [u8; 28];
254254
pub type NativeAssets = Vec<(PolicyId, Vec<NativeAsset>)>;
255255
pub type NativeAssetsDelta = Vec<(PolicyId, Vec<NativeAssetDelta>)>;
256256
pub type NativeAssetsMap = HashMap<PolicyId, HashMap<AssetName, u64>>;
257+
pub type NativeAssetsDeltaMap = HashMap<PolicyId, HashMap<AssetName, i64>>;
257258

258259
#[derive(
259260
Debug,
@@ -401,6 +402,63 @@ impl AddAssign for ValueMap {
401402
}
402403
}
403404

405+
/// Hashmap representation of ValueDelta (lovelace + multiasset)
406+
pub struct ValueDeltaMap {
407+
pub lovelace: i64,
408+
pub assets: NativeAssetsDeltaMap,
409+
}
410+
411+
impl From<ValueDelta> for ValueDeltaMap {
412+
fn from(value: ValueDelta) -> Self {
413+
let mut assets = HashMap::new();
414+
415+
for (policy, asset_list) in value.assets {
416+
let policy_entry = assets.entry(policy).or_insert_with(HashMap::new);
417+
for asset in asset_list {
418+
*policy_entry.entry(asset.name).or_insert(0) += asset.amount;
419+
}
420+
}
421+
422+
ValueDeltaMap {
423+
lovelace: value.lovelace,
424+
assets,
425+
}
426+
}
427+
}
428+
429+
impl AddAssign<ValueDelta> for ValueDeltaMap {
430+
fn add_assign(&mut self, delta: ValueDelta) {
431+
self.lovelace += delta.lovelace;
432+
433+
for (policy, assets) in delta.assets {
434+
let policy_entry = self.assets.entry(policy).or_default();
435+
for asset in assets {
436+
*policy_entry.entry(asset.name).or_insert(0) += asset.amount;
437+
}
438+
}
439+
}
440+
}
441+
442+
impl From<ValueDeltaMap> for ValueDelta {
443+
fn from(map: ValueDeltaMap) -> Self {
444+
let mut assets_vec = Vec::with_capacity(map.assets.len());
445+
446+
for (policy, asset_map) in map.assets {
447+
let inner_assets = asset_map
448+
.into_iter()
449+
.map(|(name, amount)| NativeAssetDelta { name, amount })
450+
.collect();
451+
452+
assets_vec.push((policy, inner_assets));
453+
}
454+
455+
ValueDelta {
456+
lovelace: map.lovelace,
457+
assets: assets_vec,
458+
}
459+
}
460+
}
461+
404462
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
405463
pub struct ValueDelta {
406464
pub lovelace: i64,

modules/address_state/src/address_state.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,7 @@ impl AddressState {
117117
}
118118

119119
// Add deltas to volatile
120-
if let Err(e) = state.apply_address_deltas(&address_deltas_msg.deltas) {
121-
error!("address deltas handling error: {e:#}");
122-
}
120+
state.apply_address_deltas(&address_deltas_msg.deltas);
123121

124122
store = state.immutable.clone();
125123
config = state.config.clone();

modules/address_state/src/state.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66

77
use acropolis_common::{
88
Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, UTxOIdentifier,
9-
ValueDelta,
9+
ValueDelta, ValueDeltaMap,
1010
};
1111
use anyhow::Result;
1212

@@ -172,12 +172,19 @@ impl State {
172172
&& block_info.number > self.volatile.epoch_start_block + self.volatile.security_param_k
173173
}
174174

175-
pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) -> Result<()> {
175+
pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) {
176176
let addresses = self.volatile.window.back_mut().expect("window should never be empty");
177177

178+
// Keeps track of previous tx to avoid duplicating tx hashes or overcounting totals tx count
179+
let mut last_block: Option<u32> = None;
180+
let mut last_tx_index: Option<u16> = None;
181+
178182
for delta in deltas {
179183
let entry = addresses.entry(delta.address.clone()).or_default();
180184

185+
let same_tx = last_block == Some(delta.utxo.block_number())
186+
&& last_tx_index == Some(delta.utxo.tx_index());
187+
181188
if self.config.store_info {
182189
let utxos = entry.utxos.get_or_insert(Vec::new());
183190
if delta.value.lovelace > 0 {
@@ -189,16 +196,29 @@ impl State {
189196

190197
if self.config.store_transactions {
191198
let txs = entry.transactions.get_or_insert(Vec::new());
192-
txs.push(TxIdentifier::from(delta.utxo))
199+
200+
if !same_tx {
201+
txs.push(TxIdentifier::from(delta.utxo));
202+
}
193203
}
194204

195205
if self.config.store_totals {
196206
let totals = entry.totals.get_or_insert(Vec::new());
197-
totals.push(delta.value.clone());
207+
208+
if same_tx {
209+
if let Some(last_total) = totals.last_mut() {
210+
// Create temporary map for summing same tx deltas efficently
211+
let mut map = ValueDeltaMap::from(last_total.clone());
212+
map += delta.value.clone();
213+
*last_total = ValueDelta::from(map);
214+
}
215+
} else {
216+
totals.push(delta.value.clone());
217+
}
198218
}
219+
last_block = Some(delta.utxo.block_number());
220+
last_tx_index = Some(delta.utxo.tx_index());
199221
}
200-
201-
Ok(())
202222
}
203223

204224
pub async fn get_addresses_totals(
@@ -277,7 +297,7 @@ mod tests {
277297
let deltas = vec![delta(&addr, &utxo, 1)];
278298

279299
// Apply deltas
280-
state.apply_address_deltas(&deltas)?;
300+
state.apply_address_deltas(&deltas);
281301

282302
// Verify UTxO is retrievable when in volatile
283303
let utxos = state.get_address_utxos(&addr).await?;
@@ -319,7 +339,7 @@ mod tests {
319339
let created = vec![delta(&addr, &utxo, 1)];
320340

321341
// Apply delta to volatile
322-
state.apply_address_deltas(&created)?;
342+
state.apply_address_deltas(&created);
323343

324344
// Drain volatile to immutable pending
325345
state.volatile.epoch_start_block = 1;
@@ -333,7 +353,7 @@ mod tests {
333353
assert_eq!(after_persist.as_ref().unwrap(), &[utxo]);
334354

335355
state.volatile.next_block();
336-
state.apply_address_deltas(&[delta(&addr, &utxo, -1)])?;
356+
state.apply_address_deltas(&[delta(&addr, &utxo, -1)]);
337357

338358
// Verify UTxO was removed while in volatile
339359
let after_spend_volatile = state.get_address_utxos(&addr).await?;
@@ -368,9 +388,9 @@ mod tests {
368388

369389
state.volatile.epoch_start_block = 1;
370390

371-
state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)])?;
391+
state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)]);
372392
state.volatile.next_block();
373-
state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)])?;
393+
state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)]);
374394

375395
// Verify Create and spend both in volatile is not included in address utxos
376396
let volatile = state.get_address_utxos(&addr).await?;

0 commit comments

Comments
 (0)