Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
### Breaking

- [#5559](https://github.com/ChainSafe/forest/pull/5559) Change `Filecoin.ChainGetMinBaseFee` to `Forest.ChainGetMinBaseFee` with read access.
- [#5589](https://github.com/ChainSafe/forest/pull/5589) Replace existing `Filecoin.SyncState` API with new `Forest.SyncStatus` to track node syncing progress specific to Forest.

### Added

Expand Down
113 changes: 46 additions & 67 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
//!
//! The state machine does not do any network requests or validation. Those are
//! handled by an external actor.
use std::time::SystemTime;
use std::{ops::Deref as _, sync::Arc};

use crate::libp2p::hello::HelloRequest;
use crate::message_pool::MessagePool;
use crate::message_pool::MpoolRpcProvider;
Expand All @@ -31,11 +28,15 @@ use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use libp2p::PeerId;
use parking_lot::Mutex;
use std::time::SystemTime;
use std::{ops::Deref as _, sync::Arc};
use tokio::{sync::Notify, task::JoinSet};
use tracing::{debug, error, info, trace, warn};

use crate::chain_sync::SyncState;
use super::network_context::SyncNetworkContext;
use crate::chain_sync::sync_status::SyncStatusReport;
use crate::chain_sync::tipset_syncer::validate_tipset;
use crate::chain_sync::{ForkSyncInfo, ForkSyncStage};
use crate::{
blocks::{Block, FullTipset, Tipset, TipsetKey},
chain::ChainStore,
Expand All @@ -44,12 +45,9 @@ use crate::{
};
use parking_lot::RwLock;

use super::SyncStage;
use super::network_context::SyncNetworkContext;

pub struct ChainFollower<DB> {
/// Syncing state of chain sync workers.
pub sync_states: Arc<RwLock<nunny::Vec<SyncState>>>,
/// Syncing status of the chain
pub sync_status: Arc<RwLock<SyncStatusReport>>,

/// manages retrieving and updates state objects
state_manager: Arc<StateManager<DB>>,
Expand Down Expand Up @@ -93,14 +91,9 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
stateless_mode: bool,
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
) -> Self {
let heaviest = state_manager.chain_store().heaviest_tipset();
let mut main_sync_state = SyncState::default();
main_sync_state.init(heaviest.clone(), heaviest.clone());
main_sync_state.set_epoch(heaviest.epoch());
main_sync_state.set_stage(SyncStage::Idle);
let (tipset_sender, tipset_receiver) = flume::bounded(20);
Self {
sync_states: Arc::new(RwLock::new(nunny::vec![main_sync_state])),
sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
state_manager,
network,
genesis,
Expand All @@ -121,7 +114,7 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
self.tipset_receiver,
self.network,
self.mem_pool,
self.sync_states,
self.sync_status,
self.genesis,
self.stateless_mode,
)
Expand All @@ -138,7 +131,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
tipset_receiver: flume::Receiver<Arc<FullTipset>>,
network: SyncNetworkContext<DB>,
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
sync_states: Arc<RwLock<nunny::Vec<SyncState>>>,
sync_status: Arc<RwLock<SyncStatusReport>>,
genesis: Arc<Tipset>,
stateless_mode: bool,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -228,7 +221,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
}
});

// When the state machine is updated, we need to update the sync states and spawn tasks
// When the state machine is updated, we need to update the sync status and spawn tasks
set.spawn({
let state_manager = state_manager.clone();
let state_machine = state_machine.clone();
Expand All @@ -239,31 +232,16 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
state_changed.notified().await;

let mut tasks_set = tasks.lock();
let (task_vec, states) = state_machine.lock().tasks();
let (task_vec, current_active_forks) = state_machine.lock().tasks();

// Update the sync states
{
let heaviest = state_manager.chain_store().heaviest_tipset();
let mut sync_states_guard = sync_states.write();

sync_states_guard.truncate(std::num::NonZeroUsize::new(1).unwrap());
let first = sync_states_guard.first_mut();
first.set_epoch(heaviest.epoch());
first.set_target(Some(
state_machine
.lock()
.heaviest_tipset()
.unwrap_or(heaviest.clone()),
));
let seconds_per_epoch = state_manager.chain_config().block_delay_secs;
let time_diff =
(Utc::now().timestamp() as u64).saturating_sub(heaviest.min_timestamp());
if time_diff < seconds_per_epoch as u64 * 2 {
first.set_stage(SyncStage::Complete);
} else {
first.set_stage(SyncStage::Messages);
}
sync_states_guard.extend(states);
let mut status_report_guard = sync_status.write();
status_report_guard.update(
&state_manager,
current_active_forks,
stateless_mode,
);
}

for task in task_vec {
Expand Down Expand Up @@ -322,7 +300,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(

// Only print 'Catching up to HEAD' if we're more than 10 epochs
// behind. Otherwise it can be too spammy.
match (expected_head as i64 - heaviest_epoch > 10, to_download > 0) {
match (expected_head - heaviest_epoch > 10, to_download > 0) {
(true, true) => info!(
"Catching up to HEAD: {} -> {}, downloading {} tipsets",
heaviest_epoch, expected_head, to_download
Expand Down Expand Up @@ -582,13 +560,6 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
chains
}

fn heaviest_tipset(&self) -> Option<Arc<Tipset>> {
self.tipsets
.values()
.max_by_key(|ts| ts.weight())
.map(|ts| Arc::new(ts.deref().clone().into_tipset()))
}

fn is_validated(&self, tipset: &FullTipset) -> bool {
let db = self.cs.blockstore();
self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
Expand Down Expand Up @@ -732,36 +703,44 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
}
}

pub fn tasks(&self) -> (Vec<SyncTask>, Vec<SyncState>) {
let mut states = Vec::new();
pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
// Get the node's current validated head epoch once, as it's the same for all forks.
let current_validated_epoch = self.cs.heaviest_tipset().epoch();
let now = Utc::now();

let mut active_sync_info = Vec::new();
let mut tasks = Vec::new();
for chain in self.chains() {
if let Some(first_ts) = chain.first() {
let last = chain.last().expect("Infallible");
let mut state = SyncState::default();
state.init(
Arc::new(first_ts.deref().clone().into_tipset()),
Arc::new(last.deref().clone().into_tipset()),
);
state.set_epoch(first_ts.epoch());
let last_ts = chain.last().expect("Infallible");
let stage: ForkSyncStage;
let start_time = Some(now);

if !self.is_ready_for_validation(first_ts) {
state.set_stage(SyncStage::Headers);
stage = ForkSyncStage::FetchingHeaders;
tasks.push(SyncTask::FetchTipset(
first_ts.parents().clone(),
first_ts.epoch(),
));
} else {
if last.epoch() - first_ts.epoch() > 5 {
state.set_stage(SyncStage::Messages);
} else {
state.set_stage(SyncStage::Complete);
}
stage = ForkSyncStage::ValidatingTipsets;
tasks.push(SyncTask::ValidateTipset(first_ts.clone()));
}
states.push(state);

let fork_info = ForkSyncInfo {
target_tipset_key: last_ts.key().clone(),
target_epoch: last_ts.epoch(),
target_sync_epoch_start: first_ts.epoch(),
stage,
validated_chain_head_epoch: current_validated_epoch,
start_time,
last_updated: Some(now),
};

active_sync_info.push(fork_info);
}
}
(tasks, states)
(tasks, active_sync_info)
}
}

Expand Down Expand Up @@ -898,7 +877,7 @@ mod tests {
}

#[test]
fn test_sync_state_machine_validation_order() {
fn test_state_machine_validation_order() {
let (cs, c4u) = setup();
let db = cs.db.clone();

Expand Down Expand Up @@ -927,7 +906,7 @@ mod tests {
// Record validation order by processing all validation tasks in each iteration
let mut validation_tasks = Vec::new();
loop {
let (tasks, _states) = state_machine.tasks();
let (tasks, _) = state_machine.tasks();

// Find all validation tasks
let validation_tipsets: Vec<_> = tasks
Expand Down
4 changes: 2 additions & 2 deletions src/chain_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod chain_muxer;
pub mod consensus;
pub mod metrics;
pub mod network_context;
mod sync_state;
mod sync_status;
mod tipset_syncer;
mod validation;

Expand All @@ -16,6 +16,6 @@ pub use self::{
chain_follower::ChainFollower,
chain_muxer::SyncConfig,
consensus::collect_errs,
sync_state::{SyncStage, SyncState},
sync_status::{ForkSyncInfo, ForkSyncStage, NodeSyncStatus, SyncStatusReport},
validation::{TipsetValidationError, TipsetValidator},
};
Loading
Loading