Skip to content

Commit 60c5074

Browse files
committed
refactor: merge pending address state block deltas before persisting
Signed-off-by: William Hankins <[email protected]>
1 parent af546cd commit 60c5074

File tree

1 file changed

+95
-73
lines changed

1 file changed

+95
-73
lines changed

modules/address_state/src/immutable_address_store.rs

Lines changed: 95 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use std::{
2-
collections::{HashMap, HashSet},
3-
path::Path,
4-
};
1+
use std::{collections::HashMap, path::Path};
52

63
use crate::state::{AddressEntry, AddressStorageConfig, UtxoDelta};
74
use acropolis_common::{Address, AddressTotals, TxIdentifier, UTxOIdentifier};
@@ -16,6 +13,14 @@ const ADDRESS_UTXOS_EPOCH_COUNTER: &[u8] = b"utxos_epoch_last";
1613
const ADDRESS_TXS_EPOCH_COUNTER: &[u8] = b"txs_epoch_last";
1714
const ADDRESS_TOTALS_EPOCH_COUNTER: &[u8] = b"totals_epoch_last";
1815

16+
#[derive(Default)]
17+
struct MergedDeltas {
18+
created_utxos: Vec<UTxOIdentifier>,
19+
spent_utxos: Vec<UTxOIdentifier>,
20+
txs: Vec<TxIdentifier>,
21+
totals: AddressTotals,
22+
}
23+
1924
pub struct ImmutableAddressStore {
2025
utxos: Partition,
2126
txs: Partition,
@@ -26,7 +31,7 @@ pub struct ImmutableAddressStore {
2631

2732
impl ImmutableAddressStore {
2833
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
29-
let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024);
34+
let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024).temporary(true);
3035
let keyspace = Keyspace::open(cfg)?;
3136

3237
let utxos = keyspace.open_partition("address_utxos", PartitionCreateOptions::default())?;
@@ -43,8 +48,8 @@ impl ImmutableAddressStore {
4348
})
4449
}
4550

46-
/// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions for an entire epoch.
47-
/// Skips any partitions that have already stored the given epoch.
51+
/// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions
52+
/// for an entire epoch. Skips any partitions that have already stored the given epoch.
4853
/// All writes are batched and committed atomically, preventing on-disk corruption in case of failure.
4954
pub async fn persist_epoch(&self, epoch: u64, config: &AddressStorageConfig) -> Result<()> {
5055
let persist_utxos = config.store_info
@@ -55,7 +60,7 @@ impl ImmutableAddressStore {
5560
&& !self.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch).await?;
5661

5762
if !(persist_utxos || persist_txs || persist_totals) {
58-
debug!("no persistence needed for epoch {epoch} (already persisted or disabled)",);
63+
debug!("no persistence needed for epoch {epoch} (already persisted or disabled)");
5964
return Ok(());
6065
}
6166

@@ -67,70 +72,50 @@ impl ImmutableAddressStore {
6772
let mut batch = self.keyspace.batch();
6873
let mut change_count = 0;
6974

70-
for block_map in drained_blocks.into_iter() {
71-
if block_map.is_empty() {
72-
continue;
73-
}
75+
for (address, deltas) in Self::merge_block_deltas(drained_blocks) {
76+
change_count += 1;
77+
let addr_key = address.to_bytes_key()?;
7478

75-
for (addr, entry) in block_map {
76-
change_count += 1;
77-
let addr_key = addr.to_bytes_key()?;
78-
79-
if persist_utxos {
80-
let mut live: HashSet<UTxOIdentifier> = self
81-
.utxos
82-
.get(&addr_key)?
83-
.map(|bytes| decode(&bytes))
84-
.transpose()?
85-
.unwrap_or_default();
86-
87-
if let Some(deltas) = &entry.utxos {
88-
for delta in deltas {
89-
match delta {
90-
UtxoDelta::Created(u) => {
91-
live.insert(*u);
92-
}
93-
UtxoDelta::Spent(u) => {
94-
live.remove(u);
95-
}
96-
}
97-
}
98-
}
79+
if persist_utxos && (!deltas.created_utxos.is_empty() || !deltas.spent_utxos.is_empty())
80+
{
81+
let mut live: Vec<UTxOIdentifier> = self
82+
.utxos
83+
.get(&addr_key)?
84+
.map(|bytes| decode(&bytes))
85+
.transpose()?
86+
.unwrap_or_default();
9987

100-
batch.insert(&self.utxos, &addr_key, to_vec(&live)?);
88+
live.extend(&deltas.created_utxos);
89+
90+
for u in &deltas.spent_utxos {
91+
live.retain(|x| x != u);
10192
}
10293

103-
if persist_txs {
104-
let mut live: Vec<TxIdentifier> = self
105-
.txs
106-
.get(&addr_key)?
107-
.map(|bytes| decode(&bytes))
108-
.transpose()?
109-
.unwrap_or_default();
94+
batch.insert(&self.utxos, &addr_key, to_vec(&live)?);
95+
}
11096

111-
if let Some(txs_deltas) = &entry.transactions {
112-
live.extend(txs_deltas.iter().cloned());
113-
}
97+
if persist_txs && !deltas.txs.is_empty() {
98+
let mut live: Vec<TxIdentifier> = self
99+
.txs
100+
.get(&addr_key)?
101+
.map(|bytes| decode(&bytes))
102+
.transpose()?
103+
.unwrap_or_default();
114104

115-
batch.insert(&self.txs, &addr_key, to_vec(&live)?);
116-
}
105+
live.extend(deltas.txs.iter().cloned());
106+
batch.insert(&self.txs, &addr_key, to_vec(&live)?);
107+
}
117108

118-
if persist_totals {
119-
let mut live: AddressTotals = self
120-
.totals
121-
.get(&addr_key)?
122-
.map(|bytes| decode(&bytes))
123-
.transpose()?
124-
.unwrap_or_default();
125-
126-
if let Some(deltas) = &entry.totals {
127-
for delta in deltas {
128-
live.apply_delta(delta);
129-
}
130-
}
109+
if persist_totals && deltas.totals.tx_count != 0 {
110+
let mut live: AddressTotals = self
111+
.totals
112+
.get(&addr_key)?
113+
.map(|bytes| decode(&bytes))
114+
.transpose()?
115+
.unwrap_or_default();
131116

132-
batch.insert(&self.totals, &addr_key, to_vec(&live)?);
133-
}
117+
live += deltas.totals;
118+
batch.insert(&self.totals, &addr_key, to_vec(&live)?);
134119
}
135120
}
136121

@@ -173,7 +158,7 @@ impl ImmutableAddressStore {
173158
pub async fn get_utxos(&self, address: &Address) -> Result<Option<Vec<UTxOIdentifier>>> {
174159
let key = address.to_bytes_key()?;
175160

176-
let mut live: HashSet<UTxOIdentifier> =
161+
let mut live: Vec<UTxOIdentifier> =
177162
self.utxos.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default();
178163

179164
let pending = self.pending.lock().await;
@@ -182,12 +167,8 @@ impl ImmutableAddressStore {
182167
if let Some(deltas) = &entry.utxos {
183168
for delta in deltas {
184169
match delta {
185-
UtxoDelta::Created(u) => {
186-
live.insert(*u);
187-
}
188-
UtxoDelta::Spent(u) => {
189-
live.remove(u);
190-
}
170+
UtxoDelta::Created(u) => live.push(*u),
171+
UtxoDelta::Spent(u) => live.retain(|x| x != u),
191172
}
192173
}
193174
}
@@ -197,8 +178,7 @@ impl ImmutableAddressStore {
197178
if live.is_empty() {
198179
Ok(None)
199180
} else {
200-
let vec: Vec<_> = live.into_iter().collect();
201-
Ok(Some(vec))
181+
Ok(Some(live))
202182
}
203183
}
204184

@@ -311,4 +291,46 @@ impl ImmutableAddressStore {
311291

312292
Ok(exists)
313293
}
294+
295+
fn merge_block_deltas(
296+
drained_blocks: Vec<HashMap<Address, AddressEntry>>,
297+
) -> HashMap<Address, MergedDeltas> {
298+
let mut merged = HashMap::new();
299+
300+
for block_map in drained_blocks {
301+
for (addr, entry) in block_map {
302+
let target = merged.entry(addr.clone()).or_insert_with(MergedDeltas::default);
303+
304+
// Remove UTxOs that are spent in the same epoch
305+
if let Some(deltas) = &entry.utxos {
306+
for delta in deltas {
307+
match delta {
308+
UtxoDelta::Created(u) => target.created_utxos.push(*u),
309+
UtxoDelta::Spent(u) => {
310+
if target.created_utxos.contains(u) {
311+
target.created_utxos.retain(|x| x != u);
312+
} else {
313+
target.spent_utxos.push(*u);
314+
}
315+
}
316+
}
317+
}
318+
}
319+
320+
// Merge Tx vectors
321+
if let Some(txs) = &entry.transactions {
322+
target.txs.extend(txs.iter().cloned());
323+
}
324+
325+
// Sum totals
326+
if let Some(totals) = &entry.totals {
327+
for delta in totals {
328+
target.totals.apply_delta(delta);
329+
}
330+
}
331+
}
332+
}
333+
334+
merged
335+
}
314336
}

0 commit comments

Comments
 (0)