Skip to content

Commit d556ebf

Browse files
committed
Prune additional data from storage, add min_epochs_for_block_requests to Config
1 parent eef7085 commit d556ebf

File tree

6 files changed

+242
-23
lines changed

6 files changed

+242
-23
lines changed

fork_choice_control/src/mutator.rs

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ where
441441

442442
if self.store.is_forward_synced() && misc::slots_since_epoch_start::<P>(tick.slot) == 0 {
443443
if tick.kind == TickKind::AttestFourth {
444-
self.prune_old_blob_sidecars()?;
444+
self.prune_old_records()?;
445445
}
446446

447447
if let Some(metrics) = self.metrics.as_ref() {
@@ -2336,27 +2336,50 @@ where
23362336
Ok(())
23372337
}
23382338

2339-
fn prune_old_blob_sidecars(&self) -> Result<()> {
2339+
fn prune_old_records(&self) -> Result<()> {
23402340
let storage = self.storage.clone_arc();
2341-
let current_epoch = misc::compute_epoch_at_slot::<P>(self.store.slot());
2342-
let up_to_epoch = current_epoch.saturating_sub(
2343-
self.store
2344-
.chain_config()
2345-
.min_epochs_for_blob_sidecars_requests,
2346-
);
2347-
let up_to_slot = misc::compute_start_slot_at_epoch::<P>(up_to_epoch);
2341+
let blobs_up_to_epoch = self.store.min_checked_data_availability_epoch();
2342+
let blobs_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blobs_up_to_epoch);
2343+
let blocks_up_to_epoch = self.store.min_checked_block_availability_epoch();
2344+
let blocks_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blocks_up_to_epoch);
23482345

23492346
Builder::new()
2350-
.name("old-blob-pruner".to_owned())
2347+
.name("old-data-pruner".to_owned())
23512348
.spawn(move || {
2352-
debug!("pruning old blob sidecards from storage up to slot {up_to_slot}…");
2349+
debug!("pruning old blob sidecars from storage up to slot {blobs_up_to_slot}…");
2350+
2351+
match storage.prune_old_blob_sidecars(blobs_up_to_slot) {
2352+
Ok(()) => {
2353+
debug!(
2354+
"pruned old blob sidecars from storage up to slot {blobs_up_to_slot}"
2355+
);
2356+
}
2357+
Err(error) => {
2358+
error!("pruning old blob sidecars from storage failed: {error:?}")
2359+
}
2360+
}
2361+
2362+
debug!("pruning old blocks and states from storage up to slot {blocks_up_to_slot}…");
2363+
2364+
match storage.prune_old_blocks_and_states(blocks_up_to_slot) {
2365+
Ok(()) => {
2366+
debug!(
2367+
"pruned old blocks and states from storage up to slot {blocks_up_to_slot}"
2368+
);
2369+
}
2370+
Err(error) => {
2371+
error!("pruning old blocks and states from storage failed: {error:?}")
2372+
}
2373+
}
2374+
2375+
debug!("pruning old state roots from storage up to slot {blocks_up_to_slot}…");
23532376

2354-
match storage.prune_old_blob_sidecars(up_to_slot) {
2377+
match storage.prune_old_state_roots(blocks_up_to_slot) {
23552378
Ok(()) => {
2356-
debug!("pruned old blob sidecards from storage up to slot {up_to_slot}");
2379+
debug!("pruned old state roots from storage up to slot {blocks_up_to_slot}");
23572380
}
23582381
Err(error) => {
2359-
error!("pruning old blob sidecards from storage failed: {error:?}")
2382+
error!("pruning old state roots from storage failed: {error:?}")
23602383
}
23612384
}
23622385
})?;

fork_choice_control/src/queries.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,14 @@ where
554554
storage: self.storage(),
555555
}
556556
}
557+
558+
pub fn min_checked_block_availability_epoch(&self) -> Epoch {
559+
self.store_snapshot().min_checked_block_availability_epoch()
560+
}
561+
562+
pub fn min_checked_data_availability_epoch(&self) -> Epoch {
563+
self.store_snapshot().min_checked_data_availability_epoch()
564+
}
557565
}
558566

559567
#[cfg(test)]

fork_choice_control/src/storage.rs

Lines changed: 177 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,71 @@ impl<P: Preset> Storage<P> {
397397
Ok(())
398398
}
399399

400+
pub(crate) fn prune_old_blocks_and_states(&self, up_to_slot: Slot) -> Result<()> {
401+
let mut block_roots_to_remove = vec![];
402+
let mut keys_to_remove = vec![];
403+
404+
let results = self
405+
.database
406+
.iterator_descending(..=BlockRootBySlot(up_to_slot.saturating_sub(1)).to_string())?;
407+
408+
for result in results {
409+
let (key_bytes, value_bytes) = result?;
410+
411+
if !BlockRootBySlot::has_prefix(&key_bytes) {
412+
break;
413+
}
414+
415+
block_roots_to_remove.push(H256::from_ssz_default(value_bytes)?);
416+
keys_to_remove.push(key_bytes.into_owned());
417+
}
418+
419+
for block_root in block_roots_to_remove {
420+
let key = FinalizedBlockByRoot(block_root).to_string();
421+
self.database.delete(key)?;
422+
423+
let key = UnfinalizedBlockByRoot(block_root).to_string();
424+
self.database.delete(key)?;
425+
426+
let key = StateByBlockRoot(block_root).to_string();
427+
self.database.delete(key)?;
428+
}
429+
430+
for key in keys_to_remove {
431+
self.database.delete(key)?;
432+
}
433+
434+
Ok(())
435+
}
436+
437+
pub(crate) fn prune_old_state_roots(&self, up_to_slot: Slot) -> Result<()> {
438+
let mut keys_to_remove = vec![];
439+
440+
let results = self
441+
.database
442+
.iterator_ascending(SlotByStateRoot(H256::zero()).to_string()..)?;
443+
444+
for result in results {
445+
let (key_bytes, value_bytes) = result?;
446+
447+
if !SlotByStateRoot::has_prefix(&key_bytes) {
448+
break;
449+
}
450+
451+
let slot = Slot::from_ssz_default(value_bytes)?;
452+
453+
if slot < up_to_slot {
454+
keys_to_remove.push(key_bytes.into_owned());
455+
}
456+
}
457+
458+
for key in keys_to_remove {
459+
self.database.delete(key)?;
460+
}
461+
462+
Ok(())
463+
}
464+
400465
pub(crate) fn checkpoint_state_slot(&self) -> Result<Option<Slot>> {
401466
if let Some(StateCheckpoint { head_slot, .. }) = self.load_state_checkpoint()? {
402467
return Ok(Some(head_slot));
@@ -704,6 +769,18 @@ impl<P: Preset> Storage<P> {
704769

705770
#[cfg(test)]
706771
impl<P: Preset> Storage<P> {
772+
pub fn block_root_by_slot_count(&self) -> Result<usize> {
773+
let results = self
774+
.database
775+
.iterator_ascending(BlockRootBySlot(0).to_string()..)?;
776+
777+
itertools::process_results(results, |pairs| {
778+
pairs
779+
.take_while(|(key_bytes, _)| BlockRootBySlot::has_prefix(key_bytes))
780+
.count()
781+
})
782+
}
783+
707784
pub fn finalized_block_count(&self) -> Result<usize> {
708785
let results = self
709786
.database
@@ -719,10 +796,34 @@ impl<P: Preset> Storage<P> {
719796
})
720797
}
721798

799+
pub fn unfinalized_block_count(&self) -> Result<usize> {
800+
let results = self
801+
.database
802+
.iterator_ascending(UnfinalizedBlockByRoot(H256::zero()).to_string()..)?;
803+
804+
itertools::process_results(results, |pairs| {
805+
pairs
806+
.take_while(|(key_bytes, _)| UnfinalizedBlockByRoot::has_prefix(key_bytes))
807+
.count()
808+
})
809+
}
810+
811+
pub fn slot_by_state_root_count(&self) -> Result<usize> {
812+
let results = self
813+
.database
814+
.iterator_ascending(SlotByStateRoot(H256::zero()).to_string()..)?;
815+
816+
itertools::process_results(results, |pairs| {
817+
pairs
818+
.take_while(|(key_bytes, _)| SlotByStateRoot::has_prefix(key_bytes))
819+
.count()
820+
})
821+
}
822+
722823
pub fn slot_by_blob_id_count(&self) -> Result<usize> {
723824
let results = self
724825
.database
725-
.iterator_ascending((H256::zero()).to_string()..)?;
826+
.iterator_ascending(SlotBlobId(0, H256::zero(), 0).to_string()..)?;
726827

727828
itertools::process_results(results, |pairs| {
728829
pairs
@@ -731,14 +832,26 @@ impl<P: Preset> Storage<P> {
731832
})
732833
}
733834

835+
pub fn state_count(&self) -> Result<usize> {
836+
let results = self
837+
.database
838+
.iterator_ascending(StateByBlockRoot(H256::zero()).to_string()..)?;
839+
840+
itertools::process_results(results, |pairs| {
841+
pairs
842+
.take_while(|(key_bytes, _)| StateByBlockRoot::has_prefix(key_bytes))
843+
.count()
844+
})
845+
}
846+
734847
pub fn blob_sidecar_by_blob_id_count(&self) -> Result<usize> {
735848
let results = self
736849
.database
737-
.iterator_ascending((H256::zero()).to_string()..)?;
850+
.iterator_ascending(BlobSidecarByBlobId(H256::zero(), 0).to_string()..)?;
738851

739852
itertools::process_results(results, |pairs| {
740853
pairs
741-
.filter(|(key_bytes, _)| BlobSidecarByBlobId::has_prefix(key_bytes))
854+
.take_while(|(key_bytes, _)| BlobSidecarByBlobId::has_prefix(key_bytes))
742855
.count()
743856
})
744857
}
@@ -931,10 +1044,70 @@ pub fn serialize(key: impl Display, value: impl SszWrite) -> Result<(String, Vec
9311044
mod tests {
9321045
use bytesize::ByteSize;
9331046
use tempfile::TempDir;
934-
use types::preset::Mainnet;
1047+
use types::{
1048+
phase0::containers::SignedBeaconBlock as Phase0SignedBeaconBlock, preset::Mainnet,
1049+
};
9351050

9361051
use super::*;
9371052

1053+
#[test]
1054+
fn test_prune_old_blocks_and_states() -> Result<()> {
1055+
let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(10), false)?;
1056+
let block = SignedBeaconBlock::<Mainnet>::Phase0(Phase0SignedBeaconBlock::default());
1057+
1058+
database.put_batch(vec![
1059+
// Slot 1
1060+
serialize(BlockRootBySlot(1), H256::repeat_byte(1))?,
1061+
serialize(FinalizedBlockByRoot(H256::repeat_byte(1)), &block)?,
1062+
serialize(UnfinalizedBlockByRoot(H256::repeat_byte(1)), &block)?,
1063+
serialize(SlotByStateRoot(H256::repeat_byte(1)), 1_u64)?,
1064+
serialize(StateByBlockRoot(H256::repeat_byte(1)), 1_u64)?,
1065+
// Slot 3
1066+
serialize(BlockRootBySlot(3), H256::repeat_byte(3))?,
1067+
serialize(FinalizedBlockByRoot(H256::repeat_byte(3)), &block)?,
1068+
// Slot 5
1069+
serialize(BlockRootBySlot(5), H256::repeat_byte(5))?,
1070+
serialize(UnfinalizedBlockByRoot(H256::repeat_byte(5)), &block)?,
1071+
//Slot 6
1072+
serialize(BlockRootBySlot(6), H256::repeat_byte(6))?,
1073+
serialize(UnfinalizedBlockByRoot(H256::repeat_byte(6)), &block)?,
1074+
serialize(SlotByStateRoot(H256::repeat_byte(6)), 6_u64)?,
1075+
serialize(StateByBlockRoot(H256::repeat_byte(6)), 6_u64)?,
1076+
// Slot 10, test case that "10" < "3" is not true
1077+
serialize(BlockRootBySlot(10), H256::repeat_byte(10))?,
1078+
serialize(UnfinalizedBlockByRoot(H256::repeat_byte(10)), &block)?,
1079+
serialize(SlotByStateRoot(H256::repeat_byte(10)), 10_u64)?,
1080+
serialize(StateByBlockRoot(H256::repeat_byte(10)), 10_u64)?,
1081+
])?;
1082+
1083+
let storage = Storage::<Mainnet>::new(
1084+
Arc::new(Config::mainnet()),
1085+
database,
1086+
nonzero!(64_u64),
1087+
true,
1088+
);
1089+
1090+
assert_eq!(storage.finalized_block_count()?, 2);
1091+
assert_eq!(storage.unfinalized_block_count()?, 4);
1092+
assert_eq!(storage.block_root_by_slot_count()?, 5);
1093+
assert_eq!(storage.slot_by_state_root_count()?, 3);
1094+
assert_eq!(storage.state_count()?, 3);
1095+
1096+
storage.prune_old_blocks_and_states(5)?;
1097+
1098+
assert_eq!(storage.finalized_block_count()?, 0);
1099+
assert_eq!(storage.unfinalized_block_count()?, 3);
1100+
assert_eq!(storage.block_root_by_slot_count()?, 3);
1101+
assert_eq!(storage.slot_by_state_root_count()?, 3);
1102+
assert_eq!(storage.state_count()?, 2);
1103+
1104+
storage.prune_old_state_roots(5)?;
1105+
1106+
assert_eq!(storage.slot_by_state_root_count()?, 2);
1107+
1108+
Ok(())
1109+
}
1110+
9381111
#[test]
9391112
#[expect(clippy::similar_names)]
9401113
fn test_prune_old_blob_sidecars() -> Result<()> {

fork_choice_store/src/store.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3070,15 +3070,24 @@ impl<P: Preset> Store<P> {
30703070
self.blob_cache.unpersisted_blob_sidecars()
30713071
}
30723072

3073-
pub fn should_check_data_availability_at_slot(&self, slot: Slot) -> bool {
3074-
let min_checked_epoch = self.chain_config.deneb_fork_epoch.max(
3073+
pub fn min_checked_block_availability_epoch(&self) -> Epoch {
3074+
self.tick
3075+
.epoch::<P>()
3076+
.checked_sub(self.chain_config.min_epochs_for_block_requests)
3077+
.unwrap_or(GENESIS_EPOCH)
3078+
}
3079+
3080+
pub fn min_checked_data_availability_epoch(&self) -> Epoch {
3081+
self.chain_config.deneb_fork_epoch.max(
30753082
self.tick
30763083
.epoch::<P>()
30773084
.checked_sub(self.chain_config.min_epochs_for_blob_sidecars_requests)
30783085
.unwrap_or(GENESIS_EPOCH),
3079-
);
3086+
)
3087+
}
30803088

3081-
misc::compute_epoch_at_slot::<P>(slot) >= min_checked_epoch
3089+
pub fn should_check_data_availability_at_slot(&self, slot: Slot) -> bool {
3090+
misc::compute_epoch_at_slot::<P>(slot) >= self.min_checked_data_availability_epoch()
30823091
}
30833092

30843093
pub fn state_cache(&self) -> Arc<StateCacheProcessor<P>> {

types/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ pub struct Config {
143143
#[serde(with = "serde_utils::string_or_native")]
144144
pub min_epochs_for_blob_sidecars_requests: u64,
145145
#[serde(with = "serde_utils::string_or_native")]
146+
pub min_epochs_for_block_requests: u64,
147+
#[serde(with = "serde_utils::string_or_native")]
146148
pub blob_sidecar_subnet_count: NonZeroU64,
147149
#[serde(with = "serde_utils::string_or_native")]
148150
pub data_column_sidecar_subnet_count: u64,
@@ -240,6 +242,7 @@ impl Default for Config {
240242
max_request_blob_sidecars: 768,
241243
max_request_data_column_sidecars: 0x4000,
242244
min_epochs_for_blob_sidecars_requests: 4096,
245+
min_epochs_for_block_requests: 33024,
243246
blob_sidecar_subnet_count: nonzero!(6_u64),
244247
data_column_sidecar_subnet_count: 64,
245248

@@ -332,6 +335,9 @@ impl Config {
332335
deposit_contract_address: H160(hex!("1234567890123456789012345678901234567890")),
333336
deposit_network_id: 5,
334337

338+
// Networking
339+
min_epochs_for_block_requests: 272,
340+
335341
..Self::default()
336342
}
337343
}

0 commit comments

Comments
 (0)