Skip to content

Commit 927ea2b

Browse files
committed
fix: cleanup
Signed-off-by: William Hankins <[email protected]>
1 parent 3075dcd commit 927ea2b

File tree

5 files changed

+69
-119
lines changed

5 files changed

+69
-119
lines changed

common/src/types.rs

Lines changed: 22 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -186,15 +186,31 @@ impl PartialOrd for BlockInfo {
186186
}
187187
}
188188

189+
// Individual transaction UTxO deltas
190+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
191+
pub struct TxUTxODeltas {
192+
// Transaction in which delta occured
193+
pub tx_identifier: TxIdentifier,
194+
195+
// Created and spent UTxOs
196+
pub inputs: Vec<UTxOIdentifier>,
197+
pub outputs: Vec<TxOutput>,
198+
}
199+
189200
/// Individual address balance change
190201
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
191202
pub struct AddressDelta {
203+
// Address involved in delta
192204
pub address: Address,
205+
206+
// Transaction in which delta occured
193207
pub tx_identifier: TxIdentifier,
194208

209+
// Address impacted UTxOs
195210
pub spent_utxos: Vec<UTxOIdentifier>,
196211
pub created_utxos: Vec<UTxOIdentifier>,
197212

213+
// Sums of spent and created UTxOs
198214
pub sent: ValueDelta,
199215
pub received: Value,
200216
}
@@ -405,63 +421,6 @@ impl AddAssign for ValueMap {
405421
}
406422
}
407423

408-
/// Hashmap representation of ValueDelta (lovelace + multiasset)
409-
pub struct ValueDeltaMap {
410-
pub lovelace: i64,
411-
pub assets: NativeAssetsDeltaMap,
412-
}
413-
414-
impl From<ValueDelta> for ValueDeltaMap {
415-
fn from(value: ValueDelta) -> Self {
416-
let mut assets = HashMap::new();
417-
418-
for (policy, asset_list) in value.assets {
419-
let policy_entry = assets.entry(policy).or_insert_with(HashMap::new);
420-
for asset in asset_list {
421-
*policy_entry.entry(asset.name).or_insert(0) += asset.amount;
422-
}
423-
}
424-
425-
ValueDeltaMap {
426-
lovelace: value.lovelace,
427-
assets,
428-
}
429-
}
430-
}
431-
432-
impl AddAssign<ValueDelta> for ValueDeltaMap {
433-
fn add_assign(&mut self, delta: ValueDelta) {
434-
self.lovelace += delta.lovelace;
435-
436-
for (policy, assets) in delta.assets {
437-
let policy_entry = self.assets.entry(policy).or_default();
438-
for asset in assets {
439-
*policy_entry.entry(asset.name).or_insert(0) += asset.amount;
440-
}
441-
}
442-
}
443-
}
444-
445-
impl From<ValueDeltaMap> for ValueDelta {
446-
fn from(map: ValueDeltaMap) -> Self {
447-
let mut assets_vec = Vec::with_capacity(map.assets.len());
448-
449-
for (policy, asset_map) in map.assets {
450-
let inner_assets = asset_map
451-
.into_iter()
452-
.map(|(name, amount)| NativeAssetDelta { name, amount })
453-
.collect();
454-
455-
assets_vec.push((policy, inner_assets));
456-
}
457-
458-
ValueDelta {
459-
lovelace: map.lovelace,
460-
assets: assets_vec,
461-
}
462-
}
463-
}
464-
465424
impl ValueMap {
466425
pub fn add_value(&mut self, other: &Value) {
467426
// Handle lovelace
@@ -619,23 +578,6 @@ pub struct TxOutput {
619578
pub reference_script: Option<ReferenceScript>,
620579
}
621580

622-
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
623-
pub struct TxUTxODeltas {
624-
pub tx_identifier: TxIdentifier,
625-
pub inputs: Vec<UTxOIdentifier>,
626-
pub outputs: Vec<TxOutput>,
627-
}
628-
629-
impl TxUTxODeltas {
630-
pub fn new(tx_identifier: TxIdentifier) -> Self {
631-
Self {
632-
tx_identifier,
633-
inputs: Vec::new(),
634-
outputs: Vec::new(),
635-
}
636-
}
637-
}
638-
639581
/// Key hash
640582
pub type KeyHash = Hash<28>;
641583

@@ -2185,6 +2127,12 @@ pub struct AssetAddressEntry {
21852127
pub quantity: u64,
21862128
}
21872129

2130+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2131+
pub struct TxTotals {
2132+
pub sent: ValueDelta,
2133+
pub received: Value,
2134+
}
2135+
21882136
#[derive(
21892137
Debug, Default, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode,
21902138
)]
@@ -2197,12 +2145,6 @@ pub struct AddressTotals {
21972145
pub tx_count: u64,
21982146
}
21992147

2200-
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2201-
pub struct TxTotals {
2202-
pub sent: ValueDelta,
2203-
pub received: Value,
2204-
}
2205-
22062148
impl AddAssign for AddressTotals {
22072149
fn add_assign(&mut self, other: Self) {
22082150
self.sent += other.sent;

modules/assets_state/src/state.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Acropolis AssetsState: State storage
22
3+
use std::collections::HashSet;
4+
35
use crate::asset_registry::{AssetId, AssetRegistry};
46
use acropolis_common::{
57
math::update_value_with_delta,
@@ -370,24 +372,27 @@ impl State {
370372
let store_cfg = self.config.store_transactions;
371373

372374
for tx in deltas {
375+
let mut tx_asset_ids = HashSet::new();
373376
for output in &tx.outputs {
374-
let tx_identifier = TxIdentifier::from(output.utxo_identifier);
375377
for (policy_id, assets) in &output.value.assets {
376378
for asset in assets {
377379
if let Some(asset_id) = registry.lookup_id(policy_id, &asset.name) {
378-
let entry = txs_map.entry(asset_id).or_default();
380+
tx_asset_ids.insert(asset_id);
381+
}
382+
}
383+
}
384+
}
379385

380-
let should_push = entry.back() != Some(&tx_identifier);
386+
for asset_id in &tx_asset_ids {
387+
let entry = txs_map.entry(*asset_id).or_default();
381388

382-
if should_push {
383-
entry.push_back(tx_identifier);
389+
let last = entry.back().copied();
390+
if last != Some(tx.tx_identifier) {
391+
entry.push_back(tx.tx_identifier);
384392

385-
if let StoreTransactions::Last(max) = store_cfg {
386-
if entry.len() as u64 > max {
387-
entry.pop_front();
388-
}
389-
}
390-
}
393+
if let StoreTransactions::Last(max) = store_cfg {
394+
if entry.len() as u64 > max {
395+
entry.pop_front();
391396
}
392397
}
393398
}

modules/stake_delta_filter/src/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,20 +432,20 @@ pub fn process_message(
432432
mod test {
433433
use crate::*;
434434
use acropolis_common::hash::Hash;
435-
use acropolis_common::Value;
436435
use acropolis_common::{
437436
messages::AddressDeltasMessage, Address, AddressDelta, BlockHash, BlockInfo, BlockStatus,
438437
ByronAddress, Era, ShelleyAddress, ShelleyAddressDelegationPart, ShelleyAddressPaymentPart,
439438
ShelleyAddressPointer, StakeAddress, StakeCredential, ValueDelta,
440439
};
440+
use acropolis_common::{TxIdentifier, Value};
441441
use bech32::{Bech32, Hrp};
442442
use pallas::ledger::addresses::{PaymentKeyHash, ScriptHash, StakeKeyHash};
443443

444444
fn parse_addr(s: &str) -> Result<AddressDelta> {
445445
let a = pallas::ledger::addresses::Address::from_bech32(s)?;
446446
Ok(AddressDelta {
447447
address: map_address(&a)?,
448-
tx_identifier: Default::default(),
448+
tx_identifier: TxIdentifier::default(),
449449
spent_utxos: Vec::new(),
450450
created_utxos: Vec::new(),
451451
sent: ValueDelta::default(),

modules/tx_unpacker/src/tx_unpacker.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ impl TxUnpacker {
172172
Ok(tx) => {
173173
let tx_hash: TxHash = tx.hash().to_vec().try_into().expect("invalid tx hash length");
174174
let tx_identifier = TxIdentifier::new(block_number, tx_index);
175-
let mut tx_utxo_deltas = TxUTxODeltas::new(tx_identifier);
176175

177176
let inputs = tx.consumes();
178177
let outputs = tx.produces();
@@ -187,15 +186,16 @@ impl TxUnpacker {
187186
}
188187

189188
if publish_utxo_deltas_topic.is_some() {
190-
// Add all the inputs
191-
for input in inputs { // MultiEraInput
192-
// Lookup and remove UTxOIdentifier from registry
189+
// Group deltas by tx
190+
let mut tx_utxo_deltas = TxUTxODeltas {tx_identifier, inputs: Vec::new(), outputs: Vec::new()};
191+
192+
// Remove inputs from UTxORegistry and push to UTxOIdentifiers to delta
193+
for input in inputs {
193194
let oref = input.output_ref();
194195
let tx_ref = TxOutRef::new(TxHash::from(**oref.hash()), oref.index() as u16);
195196

196197
match utxo_registry.consume(&tx_ref) {
197198
Ok(tx_identifier) => {
198-
// Add TxInput to utxo_deltas
199199
tx_utxo_deltas.inputs.push(
200200
UTxOIdentifier::new(
201201
tx_identifier.block_number(),
@@ -210,9 +210,8 @@ impl TxUnpacker {
210210
}
211211
}
212212

213-
// Add all the outputs
213+
// Add outputs to UTxORegistry and push TxOutputs to delta
214214
for (index, output) in outputs {
215-
// Add TxOutRef to registry
216215
match utxo_registry.add(
217216
block_number,
218217
tx_index,
@@ -225,7 +224,6 @@ impl TxUnpacker {
225224
match output.address() {
226225
Ok(pallas_address) => match map_parameters::map_address(&pallas_address) {
227226
Ok(address) => {
228-
// Add TxOutput to utxo_deltas
229227
tx_utxo_deltas.outputs.push(TxOutput {
230228
utxo_identifier: utxo_id,
231229
address,

modules/utxo_state/src/state.rs

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,7 @@ impl State {
146146
);
147147

148148
// Delete all UTXOs created in or after this block
149-
let created_after = self.volatile_created.prune_on_or_after(block.number);
150-
for key in created_after {
149+
for key in self.volatile_created.prune_on_or_after(block.number) {
151150
self.volatile_utxos.remove(&key);
152151
}
153152

@@ -334,43 +333,40 @@ impl State {
334333

335334
// Process the deltas
336335
for tx in &deltas.deltas {
337-
// Temporary map to sum UTxO deltas efficently
338-
let mut address_map: HashMap<
339-
Address,
340-
(ValueMap, ValueMap, Vec<UTxOIdentifier>, Vec<UTxOIdentifier>),
341-
> = HashMap::new();
336+
// Temporary map to sum UTxO deltas efficiently
337+
let mut address_map: HashMap<Address, AddressTxMap> = HashMap::new();
342338

343339
for input in &tx.inputs {
344340
if let Some(utxo) = self.lookup_utxo(input).await? {
345341
// Remove or mark spent
346342
self.observe_input(input, block).await?;
347343

348344
let addr = utxo.address.clone();
349-
let (sent, _, spent_utxos, _) = address_map.entry(addr.clone()).or_default();
345+
let entry = address_map.entry(addr.clone()).or_default();
350346

351-
spent_utxos.push(*input);
352-
sent.add_value(&utxo.value);
347+
entry.spent_utxos.push(*input);
348+
entry.sent.add_value(&utxo.value);
353349
}
354350
}
355351

356352
for output in &tx.outputs {
357353
self.observe_output(output, block).await?;
358354

359355
let addr = output.address.clone();
360-
let (_, received, _, created_utxos) = address_map.entry(addr.clone()).or_default();
356+
let entry = address_map.entry(addr.clone()).or_default();
361357

362-
created_utxos.push(output.utxo_identifier);
363-
received.add_value(&output.value);
358+
entry.created_utxos.push(output.utxo_identifier);
359+
entry.received.add_value(&output.value);
364360
}
365361

366-
for (addr, (sent, received, spent_utxos, created_utxos)) in address_map {
362+
for (addr, entry) in address_map {
367363
let delta = AddressDelta {
368364
address: addr,
369365
tx_identifier: tx.tx_identifier,
370-
spent_utxos,
371-
created_utxos,
372-
sent: ValueDelta::from(sent),
373-
received: Value::from(received),
366+
spent_utxos: entry.spent_utxos,
367+
created_utxos: entry.created_utxos,
368+
sent: ValueDelta::from(entry.sent),
369+
received: Value::from(entry.received),
374370
};
375371
if let Some(observer) = self.address_delta_observer.as_ref() {
376372
observer.observe_delta(&delta).await;
@@ -387,6 +383,15 @@ impl State {
387383
}
388384
}
389385

386+
/// Internal helper used during `handle` aggregation for summing UTxO deltas.
387+
#[derive(Default)]
388+
struct AddressTxMap {
389+
sent: ValueMap,
390+
received: ValueMap,
391+
spent_utxos: Vec<UTxOIdentifier>,
392+
created_utxos: Vec<UTxOIdentifier>,
393+
}
394+
390395
// -- Tests --
391396
#[cfg(test)]
392397
mod tests {

0 commit comments

Comments
 (0)