Skip to content

Commit 6ad0e54

Browse files
authored
first seq should be none pruned (#403)
* first seq should be none pruned * remove pruned seq from db directly
1 parent bd6a559 commit 6ad0e54

File tree

3 files changed

+83
-60
lines changed

3 files changed

+83
-60
lines changed

node/storage/src/log_store/log_manager.rs

Lines changed: 56 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -312,34 +312,13 @@ impl LogStoreWrite for LogManager {
312312
.get_tx_by_seq_number(tx_seq)?
313313
.ok_or_else(|| anyhow!("finalize_tx with tx missing: tx_seq={}", tx_seq))?;
314314

315-
self.padding_rear_data(&tx)?;
316-
317-
let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
318-
// TODO: Check completeness without loading all data in memory.
319-
// TODO: Should we double check the tx merkle root?
320-
if self.check_data_completed(tx.start_entry_index, tx_end_index)? {
321-
let same_root_seq_list = self
322-
.tx_store
323-
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
324-
// Check if there are other same-root transaction not finalized.
325-
if same_root_seq_list.first() == Some(&tx_seq) {
326-
// If this is the first tx with this data root, copy and finalize all same-root txs.
327-
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
328-
} else {
329-
// If this is not the first tx with this data root, and the first one is not finalized.
330-
let maybe_first_seq = same_root_seq_list.first().cloned();
331-
if let Some(first_seq) = maybe_first_seq {
332-
if !self.check_tx_completed(first_seq)? {
333-
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
334-
}
335-
}
336-
}
337-
338-
self.tx_store.finalize_tx(tx_seq)?;
339-
Ok(())
340-
} else {
341-
bail!("finalize tx with data missing: tx_seq={}", tx_seq)
342-
}
315+
self.finalize_tx_common(&tx).map_err(|err| {
316+
anyhow!(
317+
"finalize tx with data missing: tx_seq={} err={}",
318+
tx.seq,
319+
err
320+
)
321+
})
343322
}
344323

345324
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
@@ -358,36 +337,15 @@ impl LogStoreWrite for LogManager {
358337
return Ok(false);
359338
}
360339

361-
self.padding_rear_data(&tx)?;
362-
363-
// TODO: Check completeness without loading all data in memory.
364-
// TODO: Should we double check the tx merkle root?
365-
let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
366-
if self.check_data_completed(tx.start_entry_index, tx_end_index)? {
367-
let same_root_seq_list = self
368-
.tx_store
369-
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
370-
// Check if there are other same-root transaction not finalized.
371-
372-
if same_root_seq_list.first() == Some(&tx_seq) {
373-
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
374-
} else {
375-
// If this is not the first tx with this data root, copy and finalize the first one.
376-
let maybe_first_seq = same_root_seq_list.first().cloned();
377-
if let Some(first_seq) = maybe_first_seq {
378-
if !self.check_tx_completed(first_seq)? {
379-
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
380-
}
381-
}
382-
}
383-
384-
self.tx_store.finalize_tx(tx_seq)?;
385-
386-
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
387-
Ok(true)
388-
} else {
389-
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
390-
}
340+
self.finalize_tx_common(&tx).map_err(|err| {
341+
anyhow!(
342+
"finalize tx hash with data missing: tx_seq={} err={}",
343+
tx.seq,
344+
err
345+
)
346+
})?;
347+
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
348+
Ok(true)
391349
}
392350

393351
fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> {
@@ -597,6 +555,16 @@ impl LogStoreRead for LogManager {
597555
Ok(seq_list.first().cloned())
598556
}
599557

558+
fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Vec<u64>> {
559+
let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?;
560+
for (index, tx_seq) in seq_list.iter().enumerate() {
561+
if !self.tx_store.check_tx_pruned(*tx_seq)? {
562+
return Ok(seq_list[index..].to_vec());
563+
}
564+
}
565+
Ok(Vec::new())
566+
}
567+
600568
fn get_chunk_with_proof_by_tx_and_index(
601569
&self,
602570
tx_seq: u64,
@@ -1234,6 +1202,36 @@ impl LogManager {
12341202
Ok(())
12351203
}
12361204

1205+
fn finalize_tx_common(&self, tx: &Transaction) -> Result<()> {
1206+
let tx_seq = tx.seq;
1207+
self.padding_rear_data(tx)?;
1208+
1209+
let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
1210+
// TODO: Check completeness without loading all data in memory.
1211+
// TODO: Should we double check the tx merkle root?
1212+
if self.check_data_completed(tx.start_entry_index, tx_end_index)? {
1213+
let same_root_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
1214+
// Check if there are other same-root transaction not finalized.
1215+
if same_root_seq_list.first() == Some(&tx_seq) {
1216+
// If this is the first tx with this data root, copy and finalize all same-root txs.
1217+
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
1218+
} else {
1219+
// If this is not the first tx with this data root, and the first one is not finalized.
1220+
let maybe_first_seq = same_root_seq_list.first().cloned();
1221+
if let Some(first_seq) = maybe_first_seq {
1222+
if !self.check_tx_completed(first_seq)? {
1223+
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
1224+
}
1225+
}
1226+
}
1227+
1228+
self.tx_store.finalize_tx(tx_seq)?;
1229+
Ok(())
1230+
} else {
1231+
bail!("data missing")
1232+
}
1233+
}
1234+
12371235
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
12381236
let start_time = Instant::now();
12391237

node/storage/src/log_store/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ pub trait LogStoreRead: LogStoreChunkRead {
4242
need_available: bool,
4343
) -> Result<Option<u64>>;
4444

45+
/// Return seq list filtered to unpruned entries.
46+
fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result<Vec<u64>>;
47+
4548
/// If all txs are not finalized, return the first one if need available is false.
4649
/// Otherwise, return the first finalized tx.
4750
fn get_tx_by_data_root(

node/storage/src/log_store/tx_store.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,33 @@ impl TransactionStore {
199199

200200
#[instrument(skip(self))]
201201
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
202-
Ok(self.data_kvdb.put(
202+
let tx = self
203+
.get_tx_by_seq_number(tx_seq)?
204+
.ok_or_else(|| anyhow!("prune_tx with tx missing: tx_seq={}", tx_seq))?;
205+
206+
let mut flow_db_tx = self.flow_kvdb.transaction();
207+
let mut data_db_tx = self.data_kvdb.transaction();
208+
data_db_tx.put(
203209
COL_TX_COMPLETED,
204210
&tx_seq.to_be_bytes(),
205211
&[TxStatus::Pruned.into()],
206-
)?)
212+
);
213+
214+
let mut tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
215+
tx_seq_list.retain(|seq| *seq != tx_seq);
216+
if tx_seq_list.is_empty() {
217+
flow_db_tx.delete(COL_TX_DATA_ROOT_INDEX, tx.data_merkle_root.as_bytes());
218+
} else {
219+
flow_db_tx.put(
220+
COL_TX_DATA_ROOT_INDEX,
221+
tx.data_merkle_root.as_bytes(),
222+
&tx_seq_list.as_ssz_bytes(),
223+
);
224+
}
225+
226+
self.data_kvdb.write(data_db_tx)?;
227+
self.flow_kvdb.write(flow_db_tx)?;
228+
Ok(())
207229
}
208230

209231
pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {

0 commit comments

Comments
 (0)