Skip to content

Commit ebf05a1

Browse files
committed
feat: add forest sync status
1 parent 60a8e5f commit ebf05a1

File tree

10 files changed

+372
-107
lines changed

10 files changed

+372
-107
lines changed

src/chain_sync/chain_follower.rs

Lines changed: 48 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
//!
1616
//! The state machine does not do any network requests or validation. Those are
1717
//! handled by an external actor.
18-
use std::time::SystemTime;
19-
use std::{ops::Deref as _, sync::Arc};
20-
2118
use crate::libp2p::hello::HelloRequest;
2219
use crate::message_pool::MessagePool;
2320
use crate::message_pool::MpoolRpcProvider;
@@ -31,11 +28,16 @@ use fvm_ipld_blockstore::Blockstore;
3128
use itertools::Itertools;
3229
use libp2p::PeerId;
3330
use parking_lot::Mutex;
31+
use std::time::SystemTime;
32+
use std::{ops::Deref as _, sync::Arc};
3433
use tokio::{sync::Notify, task::JoinSet};
3534
use tracing::{debug, error, info, trace, warn};
3635

37-
use crate::chain_sync::SyncState;
36+
use super::SyncStage;
37+
use super::network_context::SyncNetworkContext;
38+
use crate::chain_sync::sync_status::ForestSyncStatusReport;
3839
use crate::chain_sync::tipset_syncer::validate_tipset;
40+
use crate::chain_sync::{ForkSyncInfo, ForkSyncStage, SyncState};
3941
use crate::{
4042
blocks::{Block, FullTipset, Tipset, TipsetKey},
4143
chain::ChainStore,
@@ -44,13 +46,13 @@ use crate::{
4446
};
4547
use parking_lot::RwLock;
4648

47-
use super::SyncStage;
48-
use super::network_context::SyncNetworkContext;
49-
5049
pub struct ChainFollower<DB> {
5150
/// Syncing state of chain sync workers.
5251
pub sync_states: Arc<RwLock<nunny::Vec<SyncState>>>,
5352

53+
/// Syncing status of the chain
54+
pub sync_status: Arc<RwLock<ForestSyncStatusReport>>,
55+
5456
/// manages retrieving and updates state objects
5557
state_manager: Arc<StateManager<DB>>,
5658

@@ -101,6 +103,7 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
101103
let (tipset_sender, tipset_receiver) = flume::bounded(20);
102104
Self {
103105
sync_states: Arc::new(RwLock::new(nunny::vec![main_sync_state])),
106+
sync_status: Arc::new(RwLock::new(ForestSyncStatusReport::new())),
104107
state_manager,
105108
network,
106109
genesis,
@@ -122,6 +125,7 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
122125
self.network,
123126
self.mem_pool,
124127
self.sync_states,
128+
self.sync_status,
125129
self.genesis,
126130
self.stateless_mode,
127131
)
@@ -138,7 +142,8 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
138142
tipset_receiver: flume::Receiver<Arc<FullTipset>>,
139143
network: SyncNetworkContext<DB>,
140144
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
141-
sync_states: Arc<RwLock<nunny::Vec<SyncState>>>,
145+
_sync_states: Arc<RwLock<nunny::Vec<SyncState>>>,
146+
sync_status: Arc<RwLock<ForestSyncStatusReport>>,
142147
genesis: Arc<Tipset>,
143148
stateless_mode: bool,
144149
) -> anyhow::Result<()> {
@@ -228,7 +233,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
228233
}
229234
});
230235

231-
// When the state machine is updated, we need to update the sync states and spawn tasks
236+
// When the state machine is updated, we need to update the sync status and spawn tasks
232237
set.spawn({
233238
let state_manager = state_manager.clone();
234239
let state_machine = state_machine.clone();
@@ -239,31 +244,16 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
239244
state_changed.notified().await;
240245

241246
let mut tasks_set = tasks.lock();
242-
let (task_vec, states) = state_machine.lock().tasks();
247+
let (task_vec, current_active_forks) = state_machine.lock().tasks();
243248

244249
// Update the sync states
245250
{
246-
let heaviest = state_manager.chain_store().heaviest_tipset();
247-
let mut sync_states_guard = sync_states.write();
248-
249-
sync_states_guard.truncate(std::num::NonZeroUsize::new(1).unwrap());
250-
let first = sync_states_guard.first_mut();
251-
first.set_epoch(heaviest.epoch());
252-
first.set_target(Some(
253-
state_machine
254-
.lock()
255-
.heaviest_tipset()
256-
.unwrap_or(heaviest.clone()),
257-
));
258-
let seconds_per_epoch = state_manager.chain_config().block_delay_secs;
259-
let time_diff =
260-
(Utc::now().timestamp() as u64).saturating_sub(heaviest.min_timestamp());
261-
if time_diff < seconds_per_epoch as u64 * 2 {
262-
first.set_stage(SyncStage::Complete);
263-
} else {
264-
first.set_stage(SyncStage::Messages);
265-
}
266-
sync_states_guard.extend(states);
251+
let mut status_report_guard = sync_status.write();
252+
status_report_guard.update(
253+
&state_manager,
254+
current_active_forks,
255+
stateless_mode,
256+
);
267257
}
268258

269259
for task in task_vec {
@@ -732,36 +722,45 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
732722
}
733723
}
734724

735-
pub fn tasks(&self) -> (Vec<SyncTask>, Vec<SyncState>) {
736-
let mut states = Vec::new();
725+
pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
726+
// Get the node's current validated head epoch once, as it's the same for all forks.
727+
let current_validated_epoch = self.cs.heaviest_tipset().epoch();
728+
let now = Utc::now();
729+
730+
let mut active_sync_info = Vec::new();
737731
let mut tasks = Vec::new();
738732
for chain in self.chains() {
739733
if let Some(first_ts) = chain.first() {
740-
let last = chain.last().expect("Infallible");
741-
let mut state = SyncState::default();
742-
state.init(
743-
Arc::new(first_ts.deref().clone().into_tipset()),
744-
Arc::new(last.deref().clone().into_tipset()),
745-
);
746-
state.set_epoch(first_ts.epoch());
734+
let last_ts = chain.last().expect("Infallible");
735+
let stage: ForkSyncStage;
736+
let start_time = Some(now);
737+
747738
if !self.is_ready_for_validation(first_ts) {
748-
state.set_stage(SyncStage::Headers);
739+
stage = ForkSyncStage::FetchingHeaders;
749740
tasks.push(SyncTask::FetchTipset(
750741
first_ts.parents().clone(),
751742
first_ts.epoch(),
752743
));
753744
} else {
754-
if last.epoch() - first_ts.epoch() > 5 {
755-
state.set_stage(SyncStage::Messages);
756-
} else {
757-
state.set_stage(SyncStage::Complete);
758-
}
745+
stage = ForkSyncStage::ValidatingTipsets;
759746
tasks.push(SyncTask::ValidateTipset(first_ts.clone()));
760747
}
761-
states.push(state);
748+
749+
let fork_info = ForkSyncInfo {
750+
target_tipset_key: last_ts.key().clone(),
751+
target_epoch: last_ts.epoch(),
752+
// The epoch from which sync activities (fetch/validate) need to start for this fork.
753+
target_sync_epoch_start: first_ts.epoch(),
754+
stage,
755+
validated_chain_head_epoch: current_validated_epoch,
756+
start_time, // Track when this fork's sync task was initiated
757+
last_updated: Some(now), // Mark the last update time
758+
};
759+
760+
active_sync_info.push(fork_info);
762761
}
763762
}
764-
(tasks, states)
763+
(tasks, active_sync_info)
765764
}
766765
}
767766

@@ -927,7 +926,7 @@ mod tests {
927926
// Record validation order by processing all validation tasks in each iteration
928927
let mut validation_tasks = Vec::new();
929928
loop {
930-
let (tasks, _states) = state_machine.tasks();
929+
let (tasks, _) = state_machine.tasks();
931930

932931
// Find all validation tasks
933932
let validation_tipsets: Vec<_> = tasks

src/chain_sync/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub mod consensus;
88
pub mod metrics;
99
pub mod network_context;
1010
mod sync_state;
11+
mod sync_status;
1112
mod tipset_syncer;
1213
mod validation;
1314

@@ -17,5 +18,12 @@ pub use self::{
1718
chain_muxer::SyncConfig,
1819
consensus::collect_errs,
1920
sync_state::{SyncStage, SyncState},
21+
sync_status::{
22+
// Export new types
23+
ForestSyncStatusReport,
24+
ForkSyncInfo,
25+
ForkSyncStage,
26+
NodeSyncStatus,
27+
},
2028
validation::{TipsetValidationError, TipsetValidator},
2129
};

src/chain_sync/sync_status.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright 2019-2025 ChainSafe Systems
2+
// SPDX-License-Identifier: Apache-2.0, MIT
3+
use crate::blocks::TipsetKey;
4+
use crate::lotus_json::lotus_json_with_self;
5+
use crate::networks::calculate_expected_epoch;
6+
use crate::shim::clock::ChainEpoch;
7+
use crate::state_manager::StateManager;
8+
use chrono::{DateTime, Utc};
9+
use fvm_ipld_blockstore::Blockstore;
10+
use schemars::JsonSchema;
11+
use serde::{Deserialize, Serialize};
12+
use std::fmt::Formatter;
13+
use std::sync::Arc;
14+
15+
/// Represents the overall synchronization status of the Forest node.
16+
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, JsonSchema)]
17+
pub enum NodeSyncStatus {
18+
/// Node is initializing, status not yet determined.
19+
#[default]
20+
Initializing,
21+
/// Node is significantly behind the network head and actively downloading/validating.
22+
Syncing,
23+
/// Node is close to the network head (e.g., within a configurable threshold like ~5 epochs).
24+
Synced,
25+
/// An error occurred during the sync process.
26+
Error(String),
27+
/// Node is configured to not sync (offline mode).
28+
Offline,
29+
}
30+
31+
/// Represents the stage of processing for a specific chain fork being tracked.
32+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)]
33+
pub enum ForkSyncStage {
34+
/// Fetching necessary block headers for this fork.
35+
FetchingHeaders,
36+
/// Validating tipsets and messages for this fork.
37+
ValidatingTipsets,
38+
/// This fork sync process is complete (e.g., reached target, merged, or deemed invalid).
39+
Complete,
40+
/// Progress is stalled, potentially waiting for dependencies.
41+
Stalled,
42+
/// An error occurred processing this specific fork.
43+
Error(String),
44+
}
45+
46+
impl std::fmt::Display for ForkSyncStage {
47+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48+
match self {
49+
ForkSyncStage::FetchingHeaders => write!(f, "Fetching Headers"),
50+
ForkSyncStage::ValidatingTipsets => write!(f, "Validating Tipsets"),
51+
ForkSyncStage::Complete => write!(f, "Complete"),
52+
ForkSyncStage::Stalled => write!(f, "Stalled"),
53+
ForkSyncStage::Error(e) => write!(f, "{}", format!("Error: {}", e)),
54+
}
55+
}
56+
}
57+
58+
/// Contains information about a specific chain/fork the node is actively tracking or syncing.
59+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)]
60+
pub struct ForkSyncInfo {
61+
/// The target tipset key for this synchronization task.
62+
#[schemars(with = "crate::lotus_json::LotusJson<TipsetKey>")] // Keep LotusJson for TipsetKey if needed
63+
#[serde(with = "crate::lotus_json")]
64+
pub(crate) target_tipset_key: TipsetKey,
65+
/// The target epoch for this synchronization task.
66+
pub(crate) target_epoch: ChainEpoch,
67+
/// The lowest epoch that still needs processing (fetching or validating) for this target.
68+
/// This helps indicate the start of the current sync range.
69+
pub(crate) target_sync_epoch_start: ChainEpoch,
70+
/// The current stage of processing for this fork.
71+
pub(crate) stage: ForkSyncStage,
72+
/// The epoch of the heaviest fully validated tipset on the node's main chain.
73+
/// This shows overall node progress, distinct from fork-specific progress.
74+
pub(crate) validated_chain_head_epoch: ChainEpoch,
75+
/// When processing for this fork started.
76+
pub(crate) start_time: Option<DateTime<Utc>>,
77+
/// Last time status for this fork was updated.
78+
pub(crate) last_updated: Option<DateTime<Utc>>,
79+
}
80+
81+
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, JsonSchema)]
82+
pub struct ForestSyncStatusReport {
83+
/// Overall status of the node's synchronization.
84+
pub status: NodeSyncStatus,
85+
/// The epoch of the heaviest validated tipset on the node's main chain.
86+
pub current_head_epoch: ChainEpoch,
87+
/// The tipset key of the current heaviest validated tipset.
88+
#[schemars(with = "crate::lotus_json::LotusJson<TipsetKey>")]
89+
#[serde(with = "crate::lotus_json")]
90+
pub current_head_key: Option<TipsetKey>,
91+
/// An estimation of the current highest epoch on the network.
92+
pub network_head_epoch: ChainEpoch,
93+
/// Estimated number of epochs the node is behind the network head.
94+
/// Can be negative if the node is slightly ahead, due to estimation variance.
95+
pub epochs_behind: i64,
96+
/// List of active fork synchronization tasks the node is currently handling.
97+
pub active_forks: Vec<ForkSyncInfo>,
98+
/// When the node process started.
99+
pub node_start_time: DateTime<Utc>,
100+
/// Last time this status report was generated.
101+
pub last_updated: DateTime<Utc>,
102+
}
103+
104+
lotus_json_with_self!(ForestSyncStatusReport);
105+
106+
impl ForestSyncStatusReport {
107+
pub(crate) fn new() -> Self {
108+
Self {
109+
node_start_time: Utc::now(),
110+
..Default::default()
111+
}
112+
}
113+
114+
pub(crate) fn set_current_chain_head(&mut self, tipset_key: TipsetKey, epoch: ChainEpoch) {
115+
self.current_head_key = Some(tipset_key);
116+
self.current_head_epoch = epoch;
117+
}
118+
119+
pub(crate) fn set_network_head(&mut self, epoch: ChainEpoch) {
120+
self.network_head_epoch = epoch;
121+
}
122+
123+
pub(crate) fn set_epochs_behind(&mut self, epochs_behind: i64) {
124+
self.epochs_behind = epochs_behind;
125+
}
126+
127+
pub(crate) fn set_status(&mut self, status: NodeSyncStatus) {
128+
self.status = status;
129+
}
130+
131+
pub(crate) fn set_active_forks(&mut self, active_forks: Vec<ForkSyncInfo>) {
132+
self.active_forks = active_forks;
133+
}
134+
135+
pub(crate) fn update<DB: Blockstore + Sync + Send + 'static>(
136+
&mut self,
137+
state_manager: &Arc<StateManager<DB>>,
138+
current_active_forks: Vec<ForkSyncInfo>,
139+
stateless_mode: bool,
140+
) {
141+
let heaviest = state_manager.chain_store().heaviest_tipset();
142+
let current_chain_head_epoch = heaviest.epoch();
143+
self.set_current_chain_head(heaviest.key().clone(), current_chain_head_epoch);
144+
let network_head_epoch = calculate_expected_epoch(
145+
Utc::now().timestamp() as u64,
146+
state_manager.chain_store().genesis_block_header().timestamp,
147+
state_manager.chain_config().block_delay_secs,
148+
);
149+
150+
self.set_network_head(network_head_epoch.clone() as ChainEpoch);
151+
self.set_epochs_behind(network_head_epoch as i64 - current_chain_head_epoch as i64);
152+
let seconds_per_epoch = state_manager.chain_config().block_delay_secs;
153+
let time_diff = (Utc::now().timestamp() as u64).saturating_sub(heaviest.min_timestamp());
154+
155+
match stateless_mode {
156+
true => self.set_status(NodeSyncStatus::Offline),
157+
false => {
158+
if time_diff < seconds_per_epoch as u64 * 5 {
159+
self.set_status(NodeSyncStatus::Synced)
160+
} else {
161+
self.set_status(NodeSyncStatus::Syncing)
162+
}
163+
}
164+
}
165+
self.set_active_forks(current_active_forks);
166+
self.last_updated = Utc::now();
167+
}
168+
}

0 commit comments

Comments
 (0)