8 changes: 4 additions & 4 deletions core/src/consensus.rs
@@ -39,21 +39,21 @@ pub fn is_short_range_fork(a: &MinaConsensusState, b: &MinaConsensusState) -> bo
if s1.epoch_count.as_u32() == s2.epoch_count.as_u32() + 1
&& s2_epoch_slot >= slots_per_epoch * 2 / 3
{
crate::log::debug!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("s2 is 1 epoch behind and not in seed update range: {} vs {}", s1.staking_epoch_data.lock_checkpoint, s2.next_epoch_data.lock_checkpoint));
crate::log::trace!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("s2 is 1 epoch behind and not in seed update range: {} vs {}", s1.staking_epoch_data.lock_checkpoint, s2.next_epoch_data.lock_checkpoint));
// S1 is one epoch ahead of S2 and S2 is not in the seed update range
s1.staking_epoch_data.lock_checkpoint == s2.next_epoch_data.lock_checkpoint
} else {
crate::log::debug!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("chains are from different epochs"));
crate::log::trace!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("chains are from different epochs"));
false
}
};

crate::log::debug!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("epoch count: {} vs {}", a.epoch_count.as_u32(), b.epoch_count.as_u32()));
crate::log::trace!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("epoch count: {} vs {}", a.epoch_count.as_u32(), b.epoch_count.as_u32()));
if a.epoch_count == b.epoch_count {
let a_prev_lock_checkpoint = &a.staking_epoch_data.lock_checkpoint;
let b_prev_lock_checkpoint = &b.staking_epoch_data.lock_checkpoint;
// Simple case: blocks have same previous epoch, so compare previous epochs' lock_checkpoints
crate::log::debug!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("checkpoints: {} vs {}", a_prev_lock_checkpoint, b_prev_lock_checkpoint));
crate::log::trace!(crate::log::system_time(); kind = "is_short_range_fork", msg = format!("checkpoints: {} vs {}", a_prev_lock_checkpoint, b_prev_lock_checkpoint));
a_prev_lock_checkpoint == b_prev_lock_checkpoint
} else {
// Check for previous epoch case using both orientations
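For context on the function these logging changes touch: is_short_range_fork appears to follow Mina's (Ouroboros Samasika) short-range fork rule, comparing lock checkpoints across at most one epoch of distance. A minimal sketch with simplified placeholder types (String checkpoints and plain u32 counters stand in for the real MinaConsensusState fields):

// Sketch only: placeholder types, not the real MinaConsensusState.
struct EpochData {
    lock_checkpoint: String,
}

struct ConsensusState {
    epoch_count: u32,
    epoch_slot: u32,
    staking_epoch_data: EpochData,
    next_epoch_data: EpochData,
}

fn is_short_range_fork(a: &ConsensusState, b: &ConsensusState, slots_per_epoch: u32) -> bool {
    // s1 one epoch ahead of s2, and s2 already past the seed update range
    // (first 2/3 of the epoch): compare s1's staking checkpoint with s2's
    // next-epoch checkpoint.
    let check = |s1: &ConsensusState, s2: &ConsensusState| {
        s1.epoch_count == s2.epoch_count + 1
            && s2.epoch_slot >= slots_per_epoch * 2 / 3
            && s1.staking_epoch_data.lock_checkpoint == s2.next_epoch_data.lock_checkpoint
    };
    if a.epoch_count == b.epoch_count {
        // Same epoch: both states refer to the same previous epoch, so compare
        // its lock checkpoints directly.
        a.staking_epoch_data.lock_checkpoint == b.staking_epoch_data.lock_checkpoint
    } else {
        // Adjacent epochs: the relation is not symmetric, so try both orientations.
        check(a, b) || check(b, a)
    }
}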
15 changes: 14 additions & 1 deletion node/src/rpc/mod.rs
@@ -294,7 +294,7 @@ pub struct RpcMessageProgressResponse {
pub messages_stats: BTreeMap<PeerId, MessagesStats>,
pub staking_ledger_sync: Option<LedgerSyncProgress>,
pub next_epoch_ledger_sync: Option<LedgerSyncProgress>,
pub root_ledger_sync: Option<LedgerSyncProgress>,
pub root_ledger_sync: Option<RootLedgerSyncProgress>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -309,6 +309,19 @@ pub struct LedgerSyncProgress {
pub estimation: u64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RootLedgerSyncProgress {
pub fetched: u64,
pub estimation: u64,
pub staged: Option<RootStagedLedgerSyncProgress>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RootStagedLedgerSyncProgress {
pub fetched: u64,
pub total: u64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CurrentMessageProgress {
pub name: String,
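The two structs added above define the JSON shape the frontend receives for root_ledger_sync. A standalone sketch of that shape (field values are invented for illustration; serialization via serde_json as in the rest of the RPC layer):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RootLedgerSyncProgress {
    pub fetched: u64,
    pub estimation: u64,
    pub staged: Option<RootStagedLedgerSyncProgress>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RootStagedLedgerSyncProgress {
    pub fetched: u64,
    pub total: u64,
}

fn main() {
    // During the snarked phase `staged` is None; once the staged ledger is being
    // fetched, fetched/estimation are pinned to 1/1 and `staged` carries the counts.
    let progress = RootLedgerSyncProgress {
        fetched: 1,
        estimation: 1,
        staged: Some(RootStagedLedgerSyncProgress { fetched: 10, total: 27 }),
    };
    // {"fetched":1,"estimation":1,"staged":{"fetched":10,"total":27}}
    println!("{}", serde_json::to_string(&progress).unwrap());
}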
67 changes: 55 additions & 12 deletions node/src/rpc_effectful/rpc_effectful_effects.rs
@@ -6,15 +6,15 @@ use crate::{
p2p_ready,
rpc::{
AccountQuery, AccountSlim, ActionStatsQuery, ActionStatsResponse, CurrentMessageProgress,
LedgerSyncProgress, MessagesStats, RpcAction, RpcBlockProducerStats,
RpcMessageProgressResponse, RpcNodeStatus, RpcNodeStatusTransactionPool,
RpcNodeStatusTransitionFrontier, RpcNodeStatusTransitionFrontierBlockSummary,
RpcNodeStatusTransitionFrontierSync, RpcRequestExtraData, RpcScanStateSummary,
RpcScanStateSummaryBlock, RpcScanStateSummaryBlockTransaction,
RpcScanStateSummaryBlockTransactionKind, RpcScanStateSummaryScanStateJob,
RpcSnarkPoolJobFull, RpcSnarkPoolJobSnarkWork, RpcSnarkPoolJobSummary,
RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse, RpcTransactionInjectResponse,
TransactionStatus,
MessagesStats, RootLedgerSyncProgress, RootStagedLedgerSyncProgress, RpcAction,
RpcBlockProducerStats, RpcMessageProgressResponse, RpcNodeStatus,
RpcNodeStatusTransactionPool, RpcNodeStatusTransitionFrontier,
RpcNodeStatusTransitionFrontierBlockSummary, RpcNodeStatusTransitionFrontierSync,
RpcRequestExtraData, RpcScanStateSummary, RpcScanStateSummaryBlock,
RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryBlockTransactionKind,
RpcScanStateSummaryScanStateJob, RpcSnarkPoolJobFull, RpcSnarkPoolJobSnarkWork,
RpcSnarkPoolJobSummary, RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse,
RpcTransactionInjectResponse, TransactionStatus,
},
snark_pool::SnarkPoolAction,
transition_frontier::sync::{
@@ -32,6 +32,9 @@ use mina_p2p_messages::{
};
use mina_signer::CompressedPubKey;
use openmina_core::block::ArcBlockWithHash;
use p2p::channels::streaming_rpc::{
staged_ledger_parts::calc_total_pieces_to_transfer, P2pStreamingRpcReceiveProgress,
};
use redux::ActionWithMeta;
use std::{collections::BTreeMap, time::Duration};

@@ -219,14 +222,54 @@ pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta<RpcE
}
TransitionFrontierSyncState::RootLedgerPending(state) => match &state.ledger {
TransitionFrontierSyncLedgerState::Snarked(state) => {
response.root_ledger_sync = state.estimation()
response.root_ledger_sync =
state.estimation().map(|data| RootLedgerSyncProgress {
fetched: data.fetched,
estimation: data.estimation,
staged: None,
});
}
TransitionFrontierSyncLedgerState::Staged(_) => {
TransitionFrontierSyncLedgerState::Staged(state) => {
let unknown_staged_progress = || RootStagedLedgerSyncProgress {
fetched: 0,
total: 1,
};
let staged = match state.fetch_attempts() {
None => state.target_with_parts().map(|(_, parts)| {
let v = parts
.map(|parts| calc_total_pieces_to_transfer(parts))
.unwrap_or(0);
RootStagedLedgerSyncProgress {
fetched: v,
total: v,
}
}),
Some(attempts) => attempts
.iter()
.find(|(_, s)| s.fetch_pending_rpc_id().is_some())
.map(|(id, _)| id)
.and_then(|peer_id| store.state().p2p.get_ready_peer(peer_id))
.map(|peer| {
match peer.channels.streaming_rpc.pending_local_rpc_progress() {
None => unknown_staged_progress(),
Some(
P2pStreamingRpcReceiveProgress::StagedLedgerParts(
progress,
),
) => {
let (fetched, total) = progress.progress();
RootStagedLedgerSyncProgress { fetched, total }
}
}
}),
};

// We want to answer with a result that will serve as a 100% complete process for the
// frontend while it is still waiting for the staged ledger to complete. Could be cleaner.
response.root_ledger_sync = Some(LedgerSyncProgress {
response.root_ledger_sync = Some(RootLedgerSyncProgress {
fetched: 1,
estimation: 1,
staged,
});
}
_ => {}
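As the comment above notes, once the staged-ledger phase starts this branch always reports fetched: 1, estimation: 1, so a consumer has to look at the nested staged counters for real progress. A hypothetical helper (not part of this PR) showing how a client could turn the new response field into a single percentage:

// Hypothetical client-side helper, not part of this PR.
fn root_sync_percent(p: &RootLedgerSyncProgress) -> f64 {
    match &p.staged {
        // Staged phase: the snarked part is already reported as 1/1, so the
        // staged piece counters drive the percentage.
        Some(staged) if staged.total > 0 => 100.0 * staged.fetched as f64 / staged.total as f64,
        // Snarked phase: use fetched against the estimation.
        _ if p.estimation > 0 => 100.0 * p.fetched as f64 / p.estimation as f64,
        _ => 0.0,
    }
}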
@@ -116,6 +116,7 @@ impl TransitionFrontierSyncLedgerStagedState {
Some(match self {
Self::PartsFetchSuccess { target, parts, .. } => (target, Some(parts)),
Self::ReconstructEmpty { target, .. } => (target, None),
Self::ReconstructPending { target, parts, .. } => (target, parts.as_ref()),
_ => return None,
})
}
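The new ReconstructPending arm lets target_with_parts also answer while the staged ledger is being reconstructed; parts.as_ref() converts the stored Option into the Option<&_> the other arms already produce. A simplified sketch of the pattern with hypothetical placeholder types (not the real state enum):

struct Target;
struct Parts;

enum StagedLedgerState {
    PartsFetchSuccess { target: Target, parts: Parts },
    ReconstructEmpty { target: Target },
    // Reconstruction may run with or without fetched parts, hence the Option.
    ReconstructPending { target: Target, parts: Option<Parts> },
    Other,
}

impl StagedLedgerState {
    fn target_with_parts(&self) -> Option<(&Target, Option<&Parts>)> {
        Some(match self {
            Self::PartsFetchSuccess { target, parts } => (target, Some(parts)),
            Self::ReconstructEmpty { target } => (target, None),
            Self::ReconstructPending { target, parts } => (target, parts.as_ref()),
            _ => return None,
        })
    }
}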
10 changes: 10 additions & 0 deletions p2p/src/channels/streaming_rpc/p2p_channels_streaming_rpc_state.rs
@@ -135,6 +135,16 @@ impl P2pChannelsStreamingRpcState {
self.pending_local_rpc().map(|req| req.kind())
}

pub fn pending_local_rpc_progress(&self) -> Option<&P2pStreamingRpcReceiveProgress> {
match self {
Self::Ready {
local: P2pStreamingRpcLocalState::Requested { progress, .. },
..
} => Some(progress),
_ => None,
}
}

pub(super) fn local_done_response(&self) -> Option<P2pStreamingRpcResponseFull> {
match self {
Self::Ready {
37 changes: 37 additions & 0 deletions p2p/src/channels/streaming_rpc/rpcs/staged_ledger_parts.rs
@@ -366,6 +366,43 @@ impl StagedLedgerPartsReceiveProgress {
}
}
}

pub fn progress(&self) -> (u64, u64) {
const EXPECTED_FETCH_TOTAL: u64 = 27;
let total = |trees: u32| (trees as u64) + 3;
match self {
Self::BasePending { .. } => (0, EXPECTED_FETCH_TOTAL),
Self::BaseSuccess { .. } | Self::ScanStateBasePending { .. } => {
(1, EXPECTED_FETCH_TOTAL)
}
Self::ScanStateBaseSuccess {
scan_state_base, ..
}
| Self::PreviousIncompleteZkappUpdatesPending {
scan_state_base, ..
} => (2, total(scan_state_base.trees.as_u32())),
Self::PreviousIncompleteZkappUpdatesSuccess {
scan_state_base, ..
} => (3, total(scan_state_base.trees.as_u32())),
Self::ScanStateTreesPending {
scan_state_base,
trees,
..
} => (
3 + trees.len() as u64,
total(scan_state_base.trees.as_u32()),
),
Self::Success { data, .. } => {
let total = calc_total_pieces_to_transfer(data);
(total, total)
}
}
}
}

pub fn calc_total_pieces_to_transfer(parts: &StagedLedgerAuxAndPendingCoinbases) -> u64 {
let total_trees = parts.scan_state.scan_state.trees.1.len() + 1;
3 + total_trees as u64
}

impl Default for StagedLedgerPartsReceiveProgress {
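The accounting behind progress() and calc_total_pieces_to_transfer: a staged-ledger transfer is counted as three fixed pieces (base, scan-state base, previous incomplete zkapp updates) plus one piece per scan-state tree, so total = trees + 3. Before the scan-state base arrives the tree count is unknown, which is where EXPECTED_FETCH_TOTAL = 27 (equivalent to assuming 24 trees) comes in. A small worked example with illustrative numbers:

// Worked example of the piece accounting (values are illustrative only).
fn total_pieces(trees: u64) -> u64 {
    // base + scan-state base + previous incomplete zkapp updates + one per tree
    3 + trees
}

fn main() {
    // Fallback used before the scan-state base (and thus the tree count) is known:
    assert_eq!(total_pieces(24), 27); // EXPECTED_FETCH_TOTAL
    // Once the scan-state base reports e.g. 24 trees and 7 of them have been
    // received, progress() would be at (3 + 7) of 27 pieces.
    let (fetched, total) = (3 + 7, total_pieces(24));
    println!("{fetched}/{total} pieces transferred");
}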