Skip to content

Commit e35b912

Browse files
committed
Add standard storage mode and improve back-sync handling (30):
Store modes and back-sync:
- Back-sync blocks to `Config::min_epochs_for_block_requests` in standard storage mode
- Back-sync blob sidecars to `Config::min_epochs_for_blob_sidecars_requests`
- Track & filter peers that don't serve blocks prior to `Config::min_epochs_for_block_requests` when performing full back-sync
- Remove `Feature::TrustBackSyncBlocks`
- Verify signatures of back-synced blocks
- Move back-sync status to `Store`
- Relocate `received_blob_sidecars` and `received_block_roots` caches from `p2p::Network` to `p2p::BlockSyncService`
- Extend `SyncBatch` with `retry_count` and `responses_received` fields
- Use smaller back-sync batches when syncing with blobs
- Don't validate signature of genesis block
- Track state archival progress in database to be able to resume it after restart
- Don't request data from busy peers

DB:
- Add db-info command to inspect the Sync database
- Replace read-only boolean flag with more descriptive `DatabaseMode` enum

Other:
- Panic to trigger app-restart if network thread is down
- Handle exit signal in an archiver thread & batch archiver updates to db
- Rename `RequestType` to `RPCRequestType` as it conflicts with updated `eth2_libp2p`
- Log peer reporting in debug log
- Log minimal back-sync info when starting back-sync
- Don't log RPC received blocks to info logs (too much output during syncing)
1 parent c0678b0 commit e35b912

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1657
-663
lines changed

Cargo.lock

Lines changed: 4 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benches/benches/fork_choice_store.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ impl Criterion {
6969
anchor_block,
7070
anchor_state,
7171
false,
72+
false,
7273
);
7374

7475
for slot in (anchor_slot + 1)..=last_attestation_slot {

database/src/lib.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,56 @@ use unwrap_none::UnwrapNone as _;
2020
const GROWTH_STEP: ByteSize = ByteSize::mib(256);
2121
const MAX_NAMED_DATABASES: usize = 10;
2222

23+
/// Access mode used when opening a persistent database.
///
/// Replaces a bare read-only boolean so that call sites state their intent explicitly.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum DatabaseMode {
    /// Open an existing database; nothing is created on disk.
    ReadOnly,
    /// Open the database for reading and writing, creating it when it does not exist.
    ReadWrite,
}

impl DatabaseMode {
    /// Returns `true` only for [`DatabaseMode::ReadOnly`].
    #[must_use]
    pub const fn is_read_only(self) -> bool {
        matches!(self, Self::ReadOnly)
    }

    /// Returns the UNIX permission bits to pass when opening the environment.
    ///
    /// `ReadOnly` yields `0`, `ReadWrite` yields `0o600`.
    #[must_use]
    pub const fn mode_permissions(self) -> u16 {
        match self {
            // <https://erthink.github.io/libmdbx/group__c__opening.html#gabb7dd3b10dd31639ba252df545e11768>
            // The UNIX permissions to set on created files. Zero value means to open existing, but do not create.
            Self::ReadOnly => 0,
            Self::ReadWrite => 0o600,
        }
    }

    /// Returns [`Self::mode_permissions`] widened to `u32` (Linux builds).
    ///
    /// NOTE(review): presumably `u32` is the width `open_with_permissions` expects on Linux;
    /// confirm against the libmdbx bindings.
    #[must_use]
    #[cfg(target_os = "linux")]
    pub fn permissions(self) -> u32 {
        self.mode_permissions().into()
    }

    /// Returns [`Self::mode_permissions`] unchanged as `u16` (non-Linux builds).
    ///
    /// NOTE(review): presumably `u16` is the width `open_with_permissions` expects on these
    /// targets; confirm against the libmdbx bindings.
    #[must_use]
    #[cfg(not(target_os = "linux"))]
    pub const fn permissions(self) -> u16 {
        self.mode_permissions()
    }
}
57+
2358
pub struct Database(DatabaseKind);
2459

2560
impl Database {
2661
pub fn persistent(
2762
name: &str,
2863
directory: impl AsRef<Path>,
2964
max_size: ByteSize,
30-
read_only: bool,
65+
mode: DatabaseMode,
3166
) -> Result<Self> {
3267
// If a database with the legacy name exists, keep using it.
3368
// Otherwise, create a new database with the specified name.
3469
// This check will not force existing users to resync.
3570
let legacy_name = directory.as_ref().to_str().ok_or(Error)?;
3671

37-
if !read_only {
72+
if !mode.is_read_only() {
3873
fs_err::create_dir_all(&directory)?;
3974
}
4075

@@ -48,14 +83,14 @@ impl Database {
4883
shrink_threshold: None,
4984
page_size: None,
5085
})
51-
.open_with_permissions(directory.as_ref(), 0o600)?;
86+
.open_with_permissions(directory.as_ref(), mode.permissions())?;
5287

5388
let transaction = environment.begin_rw_txn()?;
5489
let existing_db = transaction.open_db(Some(legacy_name));
5590

5691
let database_name = if existing_db.is_err() {
5792
info!("database: {legacy_name} with name {name}");
58-
if !read_only {
93+
if !mode.is_read_only() {
5994
transaction.create_db(Some(name), DatabaseFlags::default())?;
6095
}
6196

@@ -753,7 +788,13 @@ mod tests {
753788
}
754789

755790
fn build_persistent_database() -> Result<Database> {
756-
let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(1), false)?;
791+
let database = Database::persistent(
792+
"test_db",
793+
TempDir::new()?,
794+
ByteSize::mib(1),
795+
DatabaseMode::ReadWrite,
796+
)?;
797+
757798
populate_database(&database)?;
758799
Ok(database)
759800
}

features/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ pub enum Feature {
4141
SubscribeToAllAttestationSubnets,
4242
SubscribeToAllSyncCommitteeSubnets,
4343
TrackMetrics,
44-
TrustBackSyncBlocks,
4544
// By default we fully validate objects produced by the current instance of the application.
4645
// This costs some resources but may help in case of bugs.
4746
TrustOwnAttestationSignatures,

fork_choice_control/src/controller.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// The downside is that submitting the same object multiple times in quick succession will result in
99
// it being processed multiple times in parallel redundantly.
1010

11-
use core::panic::AssertUnwindSafe;
11+
use core::{panic::AssertUnwindSafe, sync::atomic::AtomicBool};
1212
use std::{
1313
sync::{mpsc::Sender, Arc},
1414
thread::{Builder, JoinHandle},
@@ -106,6 +106,7 @@ where
106106
validator_tx: impl UnboundedSink<ValidatorMessage<P, W>>,
107107
storage: Arc<Storage<P>>,
108108
unfinalized_blocks: impl DoubleEndedIterator<Item = Result<Arc<SignedBeaconBlock<P>>>>,
109+
finished_back_sync: bool,
109110
) -> Result<(Arc<Self>, MutatorHandle<P, W>)> {
110111
let finished_initial_forward_sync = anchor_block.message().slot() >= tick.slot;
111112

@@ -115,6 +116,7 @@ where
115116
anchor_block,
116117
anchor_state,
117118
finished_initial_forward_sync,
119+
finished_back_sync,
118120
);
119121

120122
store.apply_tick(tick)?;
@@ -198,6 +200,14 @@ where
198200
.send(&self.mutator_tx)
199201
}
200202

203+
/// Reports the current back-sync status to the mutator.
///
/// Builds a `MutatorMessage::BackSyncStatus` carrying a freshly obtained wait group and the
/// `is_back_synced` flag, then sends it over `self.mutator_tx`.
pub fn on_back_sync_status(&self, is_back_synced: bool) {
204+
MutatorMessage::BackSyncStatus {
205+
wait_group: self.owned_wait_group(),
206+
is_back_synced,
207+
}
208+
.send(&self.mutator_tx)
209+
}
210+
201211
pub fn on_gossip_block(&self, block: Arc<SignedBeaconBlock<P>>, gossip_id: GossipId) {
202212
self.spawn_block_task(block, BlockOrigin::Gossip(gossip_id))
203213
}
@@ -435,6 +445,13 @@ where
435445
})
436446
}
437447

448+
/// Stores blob sidecars obtained during back-sync.
///
/// Delegates directly to `Storage::store_back_sync_blob_sidecars` and returns its result
/// unchanged.
pub fn store_back_sync_blob_sidecars(
449+
&self,
450+
blob_sidecars: impl IntoIterator<Item = Arc<BlobSidecar<P>>>,
451+
) -> Result<()> {
452+
self.storage.store_back_sync_blob_sidecars(blob_sidecars)
453+
}
454+
438455
pub fn store_back_sync_blocks(
439456
&self,
440457
blocks: impl IntoIterator<Item = Arc<SignedBeaconBlock<P>>>,
@@ -447,9 +464,14 @@ where
447464
start_slot: Slot,
448465
end_slot: Slot,
449466
anchor_checkpoint_provider: &AnchorCheckpointProvider<P>,
467+
is_exiting: &Arc<AtomicBool>,
450468
) -> Result<()> {
451-
self.storage
452-
.archive_back_sync_states(start_slot, end_slot, anchor_checkpoint_provider)
469+
self.storage.archive_back_sync_states(
470+
start_slot,
471+
end_slot,
472+
anchor_checkpoint_provider,
473+
is_exiting,
474+
)
453475
}
454476

455477
fn spawn_blob_sidecar_task(

fork_choice_control/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ pub use crate::{
2121
AttestationVerifierMessage, P2pMessage, PoolMessage, SubnetMessage, SyncMessage,
2222
ValidatorMessage,
2323
},
24-
misc::{MutatorRejectionReason, VerifyAggregateAndProofResult, VerifyAttestationResult},
24+
misc::{
25+
MutatorRejectionReason, StorageMode, VerifyAggregateAndProofResult, VerifyAttestationResult,
26+
},
2527
queries::{BlockWithRoot, ForkChoiceContext, ForkTip, Snapshot},
2628
specialized::{AdHocBenchController, BenchController},
2729
storage::{
28-
BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot, PrefixableKey,
29-
SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint, UnfinalizedBlockByRoot,
30+
get, save, BlobSidecarByBlobId, BlockCheckpoint, BlockRootBySlot, FinalizedBlockByRoot,
31+
PrefixableKey, SlotBlobId, SlotByStateRoot, StateByBlockRoot, StateCheckpoint,
32+
StateLoadStrategy, Storage, UnfinalizedBlockByRoot, DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
3033
},
31-
storage::{StateLoadStrategy, Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL},
3234
storage_tool::{export_state_and_blocks, replay_blocks},
3335
wait::Wait,
3436
};

fork_choice_control/src/messages.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ pub enum MutatorMessage<P: Preset, W> {
6767
wait_group: W,
6868
tick: Tick,
6969
},
70+
BackSyncStatus {
71+
wait_group: W,
72+
is_back_synced: bool,
73+
},
7074
Block {
7175
wait_group: W,
7276
result: Result<BlockAction<P>>,

fork_choice_control/src/misc.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,22 @@ pub enum MutatorRejectionReason {
120120
InvalidBlock,
121121
InvalidBlobSidecar,
122122
}
123+
124+
/// Storage mode selected for the node.
///
/// NOTE(review): the variant descriptions below are inferred from the variant names and the
/// accompanying commit message; confirm against `Storage`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum StorageMode {
    /// Presumably keeps the least data on disk.
    Prune,
    /// Presumably the default mode, retaining data for the network-required request windows.
    Standard,
    /// Presumably keeps all historical data.
    Archive,
}

impl StorageMode {
    /// Returns `true` only for [`StorageMode::Prune`].
    #[must_use]
    pub const fn is_prune(self) -> bool {
        matches!(self, Self::Prune)
    }

    /// Returns `true` only for [`StorageMode::Standard`].
    #[must_use]
    pub const fn is_standard(self) -> bool {
        matches!(self, Self::Standard)
    }

    /// Returns `true` only for [`StorageMode::Archive`].
    #[must_use]
    pub const fn is_archive(self) -> bool {
        matches!(self, Self::Archive)
    }
}

fork_choice_control/src/mutator.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ where
190190
.expect("sender in Controller is not dropped until mutator thread exits")
191191
{
192192
MutatorMessage::Tick { wait_group, tick } => self.handle_tick(&wait_group, tick)?,
193+
MutatorMessage::BackSyncStatus {
194+
wait_group,
195+
is_back_synced,
196+
} => self.handle_back_sync_status(wait_group, is_back_synced),
193197
MutatorMessage::Block {
194198
wait_group,
195199
result,
@@ -440,7 +444,7 @@ where
440444
}
441445

442446
if self.store.is_forward_synced() && misc::slots_since_epoch_start::<P>(tick.slot) == 0 {
443-
if tick.kind == TickKind::AttestFourth {
447+
if tick.kind == TickKind::AttestFourth && self.store.is_back_synced() {
444448
self.prune_old_records()?;
445449
}
446450

@@ -467,6 +471,15 @@ where
467471
Ok(())
468472
}
469473

474+
/// Applies a back-sync status update to the store.
///
/// When the reported status differs from the one held by the store, the store is updated
/// and the store snapshot is refreshed; otherwise the store is left untouched.
/// `wait_group` is dropped at the end in either case.
fn handle_back_sync_status(&mut self, wait_group: W, is_back_synced: bool) {
475+
// Skip the store mutation and the snapshot refresh when nothing changed.
if self.store.is_back_synced() != is_back_synced {
476+
self.store_mut().set_back_synced(is_back_synced);
477+
self.update_store_snapshot();
478+
}
479+
480+
// Dropped explicitly, after any store update above has been applied.
drop(wait_group);
481+
}
482+
470483
#[expect(clippy::too_many_lines)]
471484
fn handle_block(
472485
&mut self,
@@ -1636,13 +1649,15 @@ where
16361649
self.event_channels
16371650
.send_blob_sidecar_event(block_root, blob_sidecar);
16381651

1639-
self.spawn(PersistBlobSidecarsTask {
1640-
store_snapshot: self.owned_store(),
1641-
storage: self.storage.clone_arc(),
1642-
mutator_tx: self.owned_mutator_tx(),
1643-
wait_group: wait_group.clone(),
1644-
metrics: self.metrics.clone(),
1645-
});
1652+
if !self.storage.prune_storage_enabled() {
1653+
self.spawn(PersistBlobSidecarsTask {
1654+
store_snapshot: self.owned_store(),
1655+
storage: self.storage.clone_arc(),
1656+
mutator_tx: self.owned_mutator_tx(),
1657+
wait_group: wait_group.clone(),
1658+
metrics: self.metrics.clone(),
1659+
});
1660+
}
16461661

16471662
self.handle_potential_head_change(wait_group, &old_head, head_was_optimistic);
16481663
}
@@ -2337,6 +2352,10 @@ where
23372352
}
23382353

23392354
fn prune_old_records(&self) -> Result<()> {
2355+
if self.storage.archive_storage_enabled() {
2356+
return Ok(());
2357+
}
2358+
23402359
let storage = self.storage.clone_arc();
23412360
let blobs_up_to_epoch = self.store.min_checked_data_availability_epoch();
23422361
let blobs_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blobs_up_to_epoch);
@@ -2376,7 +2395,9 @@ where
23762395

23772396
match storage.prune_old_state_roots(blocks_up_to_slot) {
23782397
Ok(()) => {
2379-
debug!("pruned old state roots from storage up to slot {blocks_up_to_slot}");
2398+
debug!(
2399+
"pruned old state roots from storage up to slot {blocks_up_to_slot}"
2400+
);
23802401
}
23812402
Err(error) => {
23822403
error!("pruning old state roots from storage failed: {error:?}")

0 commit comments

Comments
 (0)