Skip to content

Commit a849f1f

Browse files
committed
Merge branch 'main' into gd/epochs-blocks-endpoint
2 parents 913d3ee + 9bed98d commit a849f1f

File tree

10 files changed

+452
-115
lines changed

10 files changed

+452
-115
lines changed

common/src/queries/blocks.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ pub enum BlocksStateQuery {
7676
GetUTxOHashes {
7777
utxo_ids: Vec<UTxOIdentifier>,
7878
},
79+
GetTransactionHashesAndTimestamps {
80+
tx_ids: Vec<TxIdentifier>,
81+
},
7982
}
8083

8184
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -101,6 +104,7 @@ pub enum BlocksStateQueryResponse {
101104
BlockHashesByNumberRange(Vec<BlockHash>),
102105
TransactionHashes(TransactionHashes),
103106
UTxOHashes(UTxOHashes),
107+
TransactionHashesAndTimestamps(TransactionHashesAndTimeStamps),
104108
Error(QueryError),
105109
}
106110

@@ -245,3 +249,9 @@ pub struct UTxOHashes {
245249
pub block_hashes: Vec<BlockHash>,
246250
pub tx_hashes: Vec<TxHash>,
247251
}
252+
253+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
254+
pub struct TransactionHashesAndTimeStamps {
255+
pub tx_hashes: Vec<TxHash>,
256+
pub timestamps: Vec<u64>,
257+
}

modules/address_state/src/address_state.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) =
3333

3434
// Configuration defaults
3535
const DEFAULT_ADDRESS_DB_PATH: (&str, &str) = ("db-path", "./db");
36+
const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true);
3637
const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false);
3738
const DEFAULT_STORE_TOTALS: (&str, bool) = ("store-totals", false);
3839
const DEFAULT_STORE_TRANSACTIONS: (&str, bool) = ("store-transactions", false);
@@ -178,6 +179,7 @@ impl AddressState {
178179
// Get configuration flags and query topic
179180
let storage_config = AddressStorageConfig {
180181
db_path: get_string_flag(&config, DEFAULT_ADDRESS_DB_PATH),
182+
clear_on_start: get_bool_flag(&config, DEFAULT_CLEAR_ON_START),
181183
skip_until: None,
182184
store_info: get_bool_flag(&config, DEFAULT_STORE_INFO),
183185
store_totals: get_bool_flag(&config, DEFAULT_STORE_TOTALS),
@@ -209,9 +211,11 @@ impl AddressState {
209211
match state.get_address_utxos(address).await {
210212
Ok(Some(utxos)) => AddressStateQueryResponse::AddressUTxOs(utxos),
211213
Ok(None) => match address.to_string() {
212-
Ok(addr_str) => AddressStateQueryResponse::Error(
213-
QueryError::not_found(format!("Address {}", addr_str)),
214-
),
214+
Ok(addr_str) => {
215+
AddressStateQueryResponse::Error(QueryError::not_found(
216+
format!("Address {} not found", addr_str),
217+
))
218+
}
215219
Err(e) => {
216220
AddressStateQueryResponse::Error(QueryError::internal_error(
217221
format!("Could not convert address to string: {}", e),
@@ -227,9 +231,11 @@ impl AddressState {
227231
match state.get_address_transactions(address).await {
228232
Ok(Some(txs)) => AddressStateQueryResponse::AddressTransactions(txs),
229233
Ok(None) => match address.to_string() {
230-
Ok(addr_str) => AddressStateQueryResponse::Error(
231-
QueryError::not_found(format!("Address {}", addr_str)),
232-
),
234+
Ok(addr_str) => {
235+
AddressStateQueryResponse::Error(QueryError::not_found(
236+
format!("Address {} not found", addr_str),
237+
))
238+
}
233239
Err(e) => {
234240
AddressStateQueryResponse::Error(QueryError::internal_error(
235241
format!("Could not convert address to string: {}", e),

modules/address_state/src/immutable_address_store.rs

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@ pub struct ImmutableAddressStore {
3030
}
3131

3232
impl ImmutableAddressStore {
33-
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
34-
let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024).temporary(true);
33+
pub fn new(path: impl AsRef<Path>, clear_on_start: bool) -> Result<Self> {
34+
let path = path.as_ref();
35+
if path.exists() && clear_on_start {
36+
std::fs::remove_dir_all(path)?;
37+
}
38+
let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024);
3539
let keyspace = Keyspace::open(cfg)?;
3640

3741
let utxos = keyspace.open_partition("address_utxos", PartitionCreateOptions::default())?;
@@ -52,15 +56,36 @@ impl ImmutableAddressStore {
5256
/// for an entire epoch. Skips any partitions that have already stored the given epoch.
5357
/// All writes are batched and committed atomically, preventing on-disk corruption in case of failure.
5458
pub async fn persist_epoch(&self, epoch: u64, config: &AddressStorageConfig) -> Result<()> {
55-
let persist_utxos = config.store_info
56-
&& !self.epoch_exists(self.utxos.clone(), ADDRESS_UTXOS_EPOCH_COUNTER, epoch).await?;
57-
let persist_txs = config.store_transactions
58-
&& !self.epoch_exists(self.txs.clone(), ADDRESS_TXS_EPOCH_COUNTER, epoch).await?;
59-
let persist_totals = config.store_totals
60-
&& !self.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch).await?;
59+
// Skip if all options disabled
60+
if !(config.store_info || config.store_transactions || config.store_totals) {
61+
debug!("no persistence needed for epoch {epoch} (all stores disabled)");
62+
return Ok(());
63+
}
64+
65+
// Determine which partitions need persistence
66+
let (persist_utxos, persist_txs, persist_totals) = if config.clear_on_start {
67+
(
68+
config.store_info,
69+
config.store_transactions,
70+
config.store_totals,
71+
)
72+
} else {
73+
let utxos = config.store_info
74+
&& !self
75+
.epoch_exists(self.utxos.clone(), ADDRESS_UTXOS_EPOCH_COUNTER, epoch)
76+
.await?;
77+
let txs = config.store_transactions
78+
&& !self.epoch_exists(self.txs.clone(), ADDRESS_TXS_EPOCH_COUNTER, epoch).await?;
79+
let totals = config.store_totals
80+
&& !self
81+
.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch)
82+
.await?;
83+
(utxos, txs, totals)
84+
};
6185

86+
// Skip if all partitions have already been persisted for the epoch
6287
if !(persist_utxos || persist_txs || persist_totals) {
63-
debug!("no persistence needed for epoch {epoch} (already persisted or disabled)");
88+
debug!("no persistence needed for epoch {epoch}");
6489
return Ok(());
6590
}
6691

@@ -120,22 +145,14 @@ impl ImmutableAddressStore {
120145
}
121146

122147
// Metadata markers
123-
if persist_utxos {
124-
batch.insert(
125-
&self.utxos,
126-
ADDRESS_UTXOS_EPOCH_COUNTER,
127-
epoch.to_le_bytes(),
128-
);
129-
}
130-
if persist_txs {
131-
batch.insert(&self.txs, ADDRESS_TXS_EPOCH_COUNTER, epoch.to_le_bytes());
132-
}
133-
if persist_totals {
134-
batch.insert(
135-
&self.totals,
136-
ADDRESS_TOTALS_EPOCH_COUNTER,
137-
epoch.to_le_bytes(),
138-
);
148+
for (enabled, part, key) in [
149+
(persist_utxos, &self.utxos, ADDRESS_UTXOS_EPOCH_COUNTER),
150+
(persist_txs, &self.txs, ADDRESS_TXS_EPOCH_COUNTER),
151+
(persist_totals, &self.totals, ADDRESS_TOTALS_EPOCH_COUNTER),
152+
] {
153+
if enabled {
154+
batch.insert(part, key, epoch.to_le_bytes());
155+
}
139156
}
140157

141158
match batch.commit() {
@@ -158,13 +175,18 @@ impl ImmutableAddressStore {
158175
pub async fn get_utxos(&self, address: &Address) -> Result<Option<Vec<UTxOIdentifier>>> {
159176
let key = address.to_bytes_key()?;
160177

178+
let db_raw = self.utxos.get(&key)?;
179+
let db_had_key = db_raw.is_some();
180+
161181
let mut live: Vec<UTxOIdentifier> =
162-
self.utxos.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default();
182+
db_raw.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default();
163183

164184
let pending = self.pending.lock().await;
185+
let mut pending_touched = false;
165186
for block_map in pending.iter() {
166187
if let Some(entry) = block_map.get(address) {
167188
if let Some(deltas) = &entry.utxos {
189+
pending_touched = true;
168190
for delta in deltas {
169191
match delta {
170192
UtxoDelta::Created(u) => live.push(*u),
@@ -175,8 +197,13 @@ impl ImmutableAddressStore {
175197
}
176198
}
177199

200+
// Only return None if the address never existed
178201
if live.is_empty() {
179-
Ok(None)
202+
if db_had_key || pending_touched {
203+
Ok(Some(vec![]))
204+
} else {
205+
Ok(None)
206+
}
180207
} else {
181208
Ok(Some(live))
182209
}

modules/address_state/src/state.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::{
1717
#[derive(Debug, Default, Clone)]
1818
pub struct AddressStorageConfig {
1919
pub db_path: String,
20+
pub clear_on_start: bool,
2021
pub skip_until: Option<u64>,
2122

2223
pub store_info: bool,
@@ -60,7 +61,7 @@ impl State {
6061
PathBuf::from(&config.db_path)
6162
};
6263

63-
let store = Arc::new(ImmutableAddressStore::new(&db_path)?);
64+
let store = Arc::new(ImmutableAddressStore::new(&db_path, config.clear_on_start)?);
6465

6566
let mut config = config.clone();
6667
config.skip_until = store.get_last_epoch_stored().await?;
@@ -81,14 +82,20 @@ impl State {
8182
}
8283

8384
let store = self.immutable.clone();
85+
let mut db_had_address = false;
8486
let mut combined: HashSet<UTxOIdentifier> = match store.get_utxos(address).await? {
85-
Some(db) => db.into_iter().collect(),
87+
Some(db) => {
88+
db_had_address = true;
89+
db.into_iter().collect()
90+
}
8691
None => HashSet::new(),
8792
};
8893

94+
let mut pending_touched = false;
8995
for map in self.volatile.window.iter() {
9096
if let Some(entry) = map.get(address) {
9197
if let Some(deltas) = &entry.utxos {
98+
pending_touched = true;
9299
for delta in deltas {
93100
match delta {
94101
UtxoDelta::Created(u) => {
@@ -104,7 +111,11 @@ impl State {
104111
}
105112

106113
if combined.is_empty() {
107-
Ok(None)
114+
if db_had_address || pending_touched {
115+
Ok(Some(vec![]))
116+
} else {
117+
Ok(None)
118+
}
108119
} else {
109120
Ok(Some(combined.into_iter().collect()))
110121
}
@@ -242,6 +253,7 @@ mod tests {
242253
let dir = tempdir().unwrap();
243254
AddressStorageConfig {
244255
db_path: dir.path().to_string_lossy().into_owned(),
256+
clear_on_start: true,
245257
skip_until: None,
246258
store_info: true,
247259
store_transactions: true,

0 commit comments

Comments
 (0)