Commit 1bb5bd4

Merge pull request #870 from openmina/feat/stats/sync/staged_ledger_parts_fetch_progress
Rpc Root Staged Ledger Sync: expose parts fetch progress
2 parents 0dac848 + 5877165

6 files changed: +121 -17 lines

core/src/consensus.rs

Lines changed: 4 additions & 4 deletions
```diff
@@ -39,21 +39,21 @@ pub fn is_short_range_fork(a: &MinaConsensusState, b: &MinaConsensusState) -> bool
         if s1.epoch_count.as_u32() == s2.epoch_count.as_u32() + 1
             && s2_epoch_slot >= slots_per_epoch * 2 / 3
         {
-            crate::log::debug!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("s2 is 1 epoch behind and not in seed update range: {} vs {}", s1.staking_epoch_data.lock_checkpoint, s2.next_epoch_data.lock_checkpoint));
+            crate::log::trace!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("s2 is 1 epoch behind and not in seed update range: {} vs {}", s1.staking_epoch_data.lock_checkpoint, s2.next_epoch_data.lock_checkpoint));
             // S1 is one epoch ahead of S2 and S2 is not in the seed update range
             s1.staking_epoch_data.lock_checkpoint == s2.next_epoch_data.lock_checkpoint
         } else {
-            crate::log::debug!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("chains are from different epochs"));
+            crate::log::trace!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("chains are from different epochs"));
             false
         }
     };
 
-    crate::log::debug!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("epoch count: {} vs {}", a.epoch_count.as_u32(), b.epoch_count.as_u32()));
+    crate::log::trace!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("epoch count: {} vs {}", a.epoch_count.as_u32(), b.epoch_count.as_u32()));
     if a.epoch_count == b.epoch_count {
         let a_prev_lock_checkpoint = &a.staking_epoch_data.lock_checkpoint;
         let b_prev_lock_checkpoint = &b.staking_epoch_data.lock_checkpoint;
         // Simple case: blocks have same previous epoch, so compare previous epochs' lock_checkpoints
-        crate::log::debug!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("checkpoints: {} vs {}", a_prev_lock_checkpoint, b_prev_lock_checkpoint));
+        crate::log::trace!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("checkpoints: {} vs {}", a_prev_lock_checkpoint, b_prev_lock_checkpoint));
         a_prev_lock_checkpoint == b_prev_lock_checkpoint
     } else {
         // Check for previous epoch case using both orientations
```
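The demoted calls only annotate the decision path; the condition they trace is the epoch-boundary test in the context lines above. As a standalone sketch of that test, where the slots-per-epoch value is an assumption for illustration (7140 is Mina mainnet's figure), not something this diff pins down:

```rust
// Sketch of the boundary check from the hunk above: s1 is exactly one epoch
// ahead of s2, and s2 is past the seed-update range (the final third of its
// epoch). The constants below are illustrative.
fn s2_one_epoch_behind_past_seed_update(
    s1_epoch: u32,
    s2_epoch: u32,
    s2_epoch_slot: u32,
    slots_per_epoch: u32,
) -> bool {
    s1_epoch == s2_epoch + 1 && s2_epoch_slot >= slots_per_epoch * 2 / 3
}

fn main() {
    let slots_per_epoch = 7140; // assumed value; the 2/3 cutoff lands at slot 4760
    assert!(s2_one_epoch_behind_past_seed_update(11, 10, 4760, slots_per_epoch));
    assert!(!s2_one_epoch_behind_past_seed_update(11, 10, 4759, slots_per_epoch));
}
```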

node/src/rpc/mod.rs

Lines changed: 14 additions & 1 deletion
```diff
@@ -294,7 +294,7 @@ pub struct RpcMessageProgressResponse {
     pub messages_stats: BTreeMap<PeerId, MessagesStats>,
     pub staking_ledger_sync: Option<LedgerSyncProgress>,
     pub next_epoch_ledger_sync: Option<LedgerSyncProgress>,
-    pub root_ledger_sync: Option<LedgerSyncProgress>,
+    pub root_ledger_sync: Option<RootLedgerSyncProgress>,
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
@@ -309,6 +309,19 @@ pub struct LedgerSyncProgress {
     pub estimation: u64,
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct RootLedgerSyncProgress {
+    pub fetched: u64,
+    pub estimation: u64,
+    pub staged: Option<RootStagedLedgerSyncProgress>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct RootStagedLedgerSyncProgress {
+    pub fetched: u64,
+    pub total: u64,
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct CurrentMessageProgress {
     pub name: String,
```
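Both new structs derive Serialize with no rename attributes, so the new `root_ledger_sync` field should serialize with the field names as written. A minimal sketch of the expected JSON shape, assuming the serde_json crate and with made-up values:

```rust
// Hypothetical shape of `root_ledger_sync` in the progress response; the
// numbers are illustrative, not captured from a running node.
fn main() {
    let root_ledger_sync = serde_json::json!({
        "fetched": 1,     // reported as complete while staged parts stream in
        "estimation": 1,
        "staged": { "fetched": 5, "total": 27 }
    });
    println!("{root_ledger_sync}");
}
```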

node/src/rpc_effectful/rpc_effectful_effects.rs

Lines changed: 55 additions & 12 deletions
```diff
@@ -6,15 +6,15 @@ use crate::{
     p2p_ready,
     rpc::{
         AccountQuery, AccountSlim, ActionStatsQuery, ActionStatsResponse, CurrentMessageProgress,
-        LedgerSyncProgress, MessagesStats, RpcAction, RpcBlockProducerStats,
-        RpcMessageProgressResponse, RpcNodeStatus, RpcNodeStatusTransactionPool,
-        RpcNodeStatusTransitionFrontier, RpcNodeStatusTransitionFrontierBlockSummary,
-        RpcNodeStatusTransitionFrontierSync, RpcRequestExtraData, RpcScanStateSummary,
-        RpcScanStateSummaryBlock, RpcScanStateSummaryBlockTransaction,
-        RpcScanStateSummaryBlockTransactionKind, RpcScanStateSummaryScanStateJob,
-        RpcSnarkPoolJobFull, RpcSnarkPoolJobSnarkWork, RpcSnarkPoolJobSummary,
-        RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse, RpcTransactionInjectResponse,
-        TransactionStatus,
+        MessagesStats, RootLedgerSyncProgress, RootStagedLedgerSyncProgress, RpcAction,
+        RpcBlockProducerStats, RpcMessageProgressResponse, RpcNodeStatus,
+        RpcNodeStatusTransactionPool, RpcNodeStatusTransitionFrontier,
+        RpcNodeStatusTransitionFrontierBlockSummary, RpcNodeStatusTransitionFrontierSync,
+        RpcRequestExtraData, RpcScanStateSummary, RpcScanStateSummaryBlock,
+        RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryBlockTransactionKind,
+        RpcScanStateSummaryScanStateJob, RpcSnarkPoolJobFull, RpcSnarkPoolJobSnarkWork,
+        RpcSnarkPoolJobSummary, RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse,
+        RpcTransactionInjectResponse, TransactionStatus,
     },
     snark_pool::SnarkPoolAction,
     transition_frontier::sync::{
@@ -32,6 +32,9 @@ use mina_p2p_messages::{
 };
 use mina_signer::CompressedPubKey;
 use openmina_core::block::ArcBlockWithHash;
+use p2p::channels::streaming_rpc::{
+    staged_ledger_parts::calc_total_pieces_to_transfer, P2pStreamingRpcReceiveProgress,
+};
 use redux::ActionWithMeta;
 use std::{collections::BTreeMap, time::Duration};
 
@@ -219,14 +222,54 @@ pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta<RpcE
             }
             TransitionFrontierSyncState::RootLedgerPending(state) => match &state.ledger {
                 TransitionFrontierSyncLedgerState::Snarked(state) => {
-                    response.root_ledger_sync = state.estimation()
+                    response.root_ledger_sync =
+                        state.estimation().map(|data| RootLedgerSyncProgress {
+                            fetched: data.fetched,
+                            estimation: data.estimation,
+                            staged: None,
+                        });
                 }
-                TransitionFrontierSyncLedgerState::Staged(_) => {
+                TransitionFrontierSyncLedgerState::Staged(state) => {
+                    let unknown_staged_progress = || RootStagedLedgerSyncProgress {
+                        fetched: 0,
+                        total: 1,
+                    };
+                    let staged = match state.fetch_attempts() {
+                        None => state.target_with_parts().map(|(_, parts)| {
+                            let v = parts
+                                .map(|parts| calc_total_pieces_to_transfer(parts))
+                                .unwrap_or(0);
+                            RootStagedLedgerSyncProgress {
+                                fetched: v,
+                                total: v,
+                            }
+                        }),
+                        Some(attempts) => attempts
+                            .iter()
+                            .find(|(_, s)| s.fetch_pending_rpc_id().is_some())
+                            .map(|(id, _)| id)
+                            .and_then(|peer_id| store.state().p2p.get_ready_peer(peer_id))
+                            .map(|peer| {
+                                match peer.channels.streaming_rpc.pending_local_rpc_progress() {
+                                    None => unknown_staged_progress(),
+                                    Some(
+                                        P2pStreamingRpcReceiveProgress::StagedLedgerParts(
+                                            progress,
+                                        ),
+                                    ) => {
+                                        let (fetched, total) = progress.progress();
+                                        RootStagedLedgerSyncProgress { fetched, total }
+                                    }
+                                }
+                            }),
+                    };
+
                     // We want to answer with a result that will serve as a 100% complete process for the
                     // frontend while it is still waiting for the staged ledger to complete. Could be cleaner.
-                    response.root_ledger_sync = Some(LedgerSyncProgress {
+                    response.root_ledger_sync = Some(RootLedgerSyncProgress {
                         fetched: 1,
                         estimation: 1,
+                        staged,
                     });
                 }
                 _ => {}
```
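The `staged` value above is resolved through a three-way fallback: parts already held locally count as fully transferred, an in-flight fetch reports streaming progress, and a pending fetch with no progress information yet falls back to the 0-of-1 placeholder. A simplified standalone sketch of that ladder, with the node's state machine replaced by plain Options (all names here are stand-ins, not the codebase's types):

```rust
// Stand-in for RootStagedLedgerSyncProgress.
#[derive(Debug)]
struct StagedProgress {
    fetched: u64,
    total: u64,
}

// parts_total mirrors the target_with_parts()/calc_total_pieces_to_transfer
// branch (no fetch attempts, parts already available); streaming mirrors
// pending_local_rpc_progress() on the fetching peer's channel.
fn staged_progress(parts_total: Option<u64>, streaming: Option<(u64, u64)>) -> StagedProgress {
    match (parts_total, streaming) {
        // Parts already held locally: report the transfer as complete.
        (Some(n), _) => StagedProgress { fetched: n, total: n },
        // A peer fetch is in flight and reports streaming progress.
        (None, Some((fetched, total))) => StagedProgress { fetched, total },
        // Fetch pending, no progress info yet: the unknown_staged_progress()
        // placeholder from the hunk above.
        (None, None) => StagedProgress { fetched: 0, total: 1 },
    }
}

fn main() {
    println!("{:?}", staged_progress(None, Some((5, 27))));
}
```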

node/src/transition_frontier/sync/ledger/staged/transition_frontier_sync_ledger_staged_state.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -116,6 +116,7 @@ impl TransitionFrontierSyncLedgerStagedState {
         Some(match self {
             Self::PartsFetchSuccess { target, parts, .. } => (target, Some(parts)),
             Self::ReconstructEmpty { target, .. } => (target, None),
+            Self::ReconstructPending { target, parts, .. } => (target, parts.as_ref()),
             _ => return None,
         })
     }
```
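With the new arm, target_with_parts() keeps returning the already-fetched parts while reconstruction runs; judging by the two hunks, this is what lets the fetch_attempts() == None branch in rpc_effectful_effects.rs still count the parts and report the staged transfer as complete during the ReconstructPending phase.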

p2p/src/channels/streaming_rpc/p2p_channels_streaming_rpc_state.rs

Lines changed: 10 additions & 0 deletions
```diff
@@ -135,6 +135,16 @@ impl P2pChannelsStreamingRpcState {
         self.pending_local_rpc().map(|req| req.kind())
     }
 
+    pub fn pending_local_rpc_progress(&self) -> Option<&P2pStreamingRpcReceiveProgress> {
+        match self {
+            Self::Ready {
+                local: P2pStreamingRpcLocalState::Requested { progress, .. },
+                ..
+            } => Some(progress),
+            _ => None,
+        }
+    }
+
     pub(super) fn local_done_response(&self) -> Option<P2pStreamingRpcResponseFull> {
         match self {
             Self::Ready {
```
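The accessor exposes progress only while a local request is outstanding on a ready channel. A compilable mini-model of that contract, with simplified stand-in types in place of the real channel state (names here are illustrative):

```rust
// Illustrative stand-ins for the streaming RPC channel state.
enum Progress { StagedLedgerParts { fetched: u64, total: u64 } }
enum Local { Requested { progress: Progress }, Idle }
enum Channel { Ready { local: Local }, Closed }

impl Channel {
    // Same shape as pending_local_rpc_progress: Some only on a ready
    // channel with a requested local RPC.
    fn pending_local_rpc_progress(&self) -> Option<&Progress> {
        match self {
            Self::Ready { local: Local::Requested { progress } } => Some(progress),
            _ => None,
        }
    }
}

fn main() {
    let ch = Channel::Ready {
        local: Local::Requested {
            progress: Progress::StagedLedgerParts { fetched: 5, total: 27 },
        },
    };
    assert!(ch.pending_local_rpc_progress().is_some());
    let ch = Channel::Closed;
    assert!(ch.pending_local_rpc_progress().is_none());
}
```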

p2p/src/channels/streaming_rpc/rpcs/staged_ledger_parts.rs

Lines changed: 37 additions & 0 deletions
```diff
@@ -366,6 +366,43 @@ impl StagedLedgerPartsReceiveProgress {
             }
         }
     }
+
+    pub fn progress(&self) -> (u64, u64) {
+        const EXPECTED_FETCH_TOTAL: u64 = 27;
+        let total = |trees: u32| (trees as u64) + 3;
+        match self {
+            Self::BasePending { .. } => (0, EXPECTED_FETCH_TOTAL),
+            Self::BaseSuccess { .. } | Self::ScanStateBasePending { .. } => {
+                (1, EXPECTED_FETCH_TOTAL)
+            }
+            Self::ScanStateBaseSuccess {
+                scan_state_base, ..
+            }
+            | Self::PreviousIncompleteZkappUpdatesPending {
+                scan_state_base, ..
+            } => (2, total(scan_state_base.trees.as_u32())),
+            Self::PreviousIncompleteZkappUpdatesSuccess {
+                scan_state_base, ..
+            } => (3, total(scan_state_base.trees.as_u32())),
+            Self::ScanStateTreesPending {
+                scan_state_base,
+                trees,
+                ..
+            } => (
+                3 + trees.len() as u64,
+                total(scan_state_base.trees.as_u32()),
+            ),
+            Self::Success { data, .. } => {
+                let total = calc_total_pieces_to_transfer(data);
+                (total, total)
+            }
+        }
+    }
+}
+
+pub fn calc_total_pieces_to_transfer(parts: &StagedLedgerAuxAndPendingCoinbases) -> u64 {
+    let total_trees = parts.scan_state.scan_state.trees.1.len() + 1;
+    3 + total_trees as u64
 }
 
 impl Default for StagedLedgerPartsReceiveProgress {
```
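calc_total_pieces_to_transfer counts three fixed pieces (matching the 0..3 steps in progress(): base, scan state base, previous incomplete zkapp updates) plus one piece per scan state tree, where the tree count is trees.1.len() plus the first tree stored separately. A quick standalone check of that arithmetic; the extra-tree count of 23 is hypothetical, chosen so the result lands on the EXPECTED_FETCH_TOTAL placeholder of 27 used before the metadata arrives:

```rust
// Sketch of the piece-count arithmetic from calc_total_pieces_to_transfer.
fn total_pieces(extra_trees: usize) -> u64 {
    let total_trees = extra_trees + 1; // trees.1 plus the first tree
    // base + scan state base + previous incomplete zkapp updates + trees
    3 + total_trees as u64
}

fn main() {
    assert_eq!(total_pieces(23), 27); // matches EXPECTED_FETCH_TOTAL
    println!("pieces: {}", total_pieces(23));
}
```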
