Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub use crate::consensus::ConsensusAction;
pub use crate::event_source::EventSourceAction;
pub use crate::external_snark_worker::ExternalSnarkWorkerAction;
pub use crate::ledger::LedgerAction;
use crate::p2p::callbacks::P2pCallbacksAction;
pub use crate::p2p::P2pAction;
pub use crate::rpc::RpcAction;
pub use crate::snark::SnarkAction;
Expand All @@ -32,6 +33,8 @@ pub enum Action {
EventSource(EventSourceAction),

P2p(P2pAction),
P2pCallbacks(P2pCallbacksAction),

Ledger(LedgerAction),
Snark(SnarkAction),
Consensus(ConsensusAction),
Expand Down Expand Up @@ -85,6 +88,7 @@ impl redux::EnablingCondition<crate::State> for Action {
Action::WatchedAccounts(a) => a.is_enabled(state, time),
Action::TransactionPool(a) => a.is_enabled(state, time),
Action::TransactionPoolEffect(a) => a.is_enabled(state, time),
Action::P2pCallbacks(a) => a.is_enabled(state, time),
}
}
}
Expand Down
45 changes: 44 additions & 1 deletion node/src/action_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::external_snark_worker::ExternalSnarkWorkerAction;
use crate::ledger::read::LedgerReadAction;
use crate::ledger::write::LedgerWriteAction;
use crate::ledger::LedgerAction;
use crate::p2p::callbacks::P2pCallbacksAction;
use crate::p2p::channels::best_tip::P2pChannelsBestTipAction;
use crate::p2p::channels::best_tip_effectful::P2pChannelsBestTipEffectfulAction;
use crate::p2p::channels::rpc::P2pChannelsRpcAction;
Expand Down Expand Up @@ -147,6 +148,7 @@ pub enum ActionKind {
ConsensusBlockSnarkVerifySuccess,
ConsensusDetectForkRange,
ConsensusLongRangeForkResolve,
ConsensusP2pBestTipUpdate,
ConsensusPrune,
ConsensusShortRangeForkResolve,
EventSourceNewEvent,
Expand Down Expand Up @@ -174,6 +176,15 @@ pub enum ActionKind {
LedgerWriteInit,
LedgerWritePending,
LedgerWriteSuccess,
P2pCallbacksP2pChannelsRpcReady,
P2pCallbacksP2pChannelsRpcRequestReceived,
P2pCallbacksP2pChannelsRpcResponseReceived,
P2pCallbacksP2pChannelsRpcTimeout,
P2pCallbacksP2pChannelsStreamingRpcReady,
P2pCallbacksP2pChannelsStreamingRpcResponseReceived,
P2pCallbacksP2pChannelsStreamingRpcTimeout,
P2pCallbacksP2pDisconnection,
P2pCallbacksRpcRespondBestTip,
P2pChannelsBestTipInit,
P2pChannelsBestTipPending,
P2pChannelsBestTipReady,
Expand Down Expand Up @@ -427,6 +438,7 @@ pub enum ActionKind {
RpcLedgerAccountsGetPending,
RpcLedgerAccountsGetSuccess,
RpcMessageProgressGet,
RpcP2pConnectionIncomingAnswerReady,
RpcP2pConnectionIncomingError,
RpcP2pConnectionIncomingInit,
RpcP2pConnectionIncomingPending,
Expand Down Expand Up @@ -599,7 +611,7 @@ pub enum ActionKind {
}

impl ActionKind {
pub const COUNT: u16 = 493;
pub const COUNT: u16 = 504;
}

impl std::fmt::Display for ActionKind {
Expand All @@ -614,6 +626,7 @@ impl ActionKindGet for Action {
Self::CheckTimeouts(a) => a.kind(),
Self::EventSource(a) => a.kind(),
Self::P2p(a) => a.kind(),
Self::P2pCallbacks(a) => a.kind(),
Self::Ledger(a) => a.kind(),
Self::Snark(a) => a.kind(),
Self::Consensus(a) => a.kind(),
Expand Down Expand Up @@ -664,6 +677,32 @@ impl ActionKindGet for P2pAction {
}
}

impl ActionKindGet for P2pCallbacksAction {
fn kind(&self) -> ActionKind {
match self {
Self::P2pChannelsRpcReady { .. } => ActionKind::P2pCallbacksP2pChannelsRpcReady,
Self::P2pChannelsRpcTimeout { .. } => ActionKind::P2pCallbacksP2pChannelsRpcTimeout,
Self::P2pChannelsRpcResponseReceived { .. } => {
ActionKind::P2pCallbacksP2pChannelsRpcResponseReceived
}
Self::P2pChannelsRpcRequestReceived { .. } => {
ActionKind::P2pCallbacksP2pChannelsRpcRequestReceived
}
Self::P2pChannelsStreamingRpcReady => {
ActionKind::P2pCallbacksP2pChannelsStreamingRpcReady
}
Self::P2pChannelsStreamingRpcTimeout { .. } => {
ActionKind::P2pCallbacksP2pChannelsStreamingRpcTimeout
}
Self::P2pChannelsStreamingRpcResponseReceived { .. } => {
ActionKind::P2pCallbacksP2pChannelsStreamingRpcResponseReceived
}
Self::P2pDisconnection { .. } => ActionKind::P2pCallbacksP2pDisconnection,
Self::RpcRespondBestTip { .. } => ActionKind::P2pCallbacksRpcRespondBestTip,
}
}
}

impl ActionKindGet for LedgerAction {
fn kind(&self) -> ActionKind {
match self {
Expand Down Expand Up @@ -698,6 +737,7 @@ impl ActionKindGet for ConsensusAction {
Self::ShortRangeForkResolve { .. } => ActionKind::ConsensusShortRangeForkResolve,
Self::LongRangeForkResolve { .. } => ActionKind::ConsensusLongRangeForkResolve,
Self::BestTipUpdate { .. } => ActionKind::ConsensusBestTipUpdate,
Self::P2pBestTipUpdate { .. } => ActionKind::ConsensusP2pBestTipUpdate,
Self::Prune => ActionKind::ConsensusPrune,
}
}
Expand Down Expand Up @@ -857,6 +897,9 @@ impl ActionKindGet for RpcAction {
Self::P2pConnectionIncomingRespond { .. } => {
ActionKind::RpcP2pConnectionIncomingRespond
}
Self::P2pConnectionIncomingAnswerReady { .. } => {
ActionKind::RpcP2pConnectionIncomingAnswerReady
}
Self::P2pConnectionIncomingError { .. } => ActionKind::RpcP2pConnectionIncomingError,
Self::P2pConnectionIncomingSuccess { .. } => {
ActionKind::RpcP2pConnectionIncomingSuccess
Expand Down
6 changes: 5 additions & 1 deletion node/src/consensus/consensus_actions.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use mina_p2p_messages::v2::{MinaBlockBlockStableV2, StateHash};
use openmina_core::block::ArcBlockWithHash;
use openmina_core::block::{ArcBlockWithHash, BlockWithHash};
use openmina_core::{action_event, ActionEvent};
use serde::{Deserialize, Serialize};
use snark::block_verify::SnarkBlockVerifyError;
Expand Down Expand Up @@ -53,6 +53,9 @@ pub enum ConsensusAction {
BestTipUpdate {
hash: StateHash,
},
P2pBestTipUpdate {
best_tip: BlockWithHash<Arc<MinaBlockBlockStableV2>>,
},
Prune,
}

Expand Down Expand Up @@ -146,6 +149,7 @@ impl redux::EnablingCondition<crate::State> for ConsensusAction {
.consensus
.is_candidate_decided_to_use_as_tip(hash)
},
ConsensusAction::P2pBestTipUpdate { .. } => true,
ConsensusAction::Prune => {
state.consensus.best_tip().is_some()
},
Expand Down
21 changes: 20 additions & 1 deletion node/src/consensus/consensus_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ use openmina_core::{
use snark::block_verify::{SnarkBlockVerifyAction, SnarkBlockVerifyError};

use crate::{
transition_frontier::sync::TransitionFrontierSyncAction, Action, State, WatchedAccountsAction,
transition_frontier::sync::{
ledger::{
snarked::TransitionFrontierSyncLedgerSnarkedAction,
staged::TransitionFrontierSyncLedgerStagedAction,
},
TransitionFrontierSyncAction,
},
Action, State, WatchedAccountsAction,
};

use super::{
Expand Down Expand Up @@ -235,6 +242,18 @@ impl ConsensusState {

transition_frontier_new_best_tip_handler(global_state, dispatcher);
}
ConsensusAction::P2pBestTipUpdate { best_tip } => {
let dispatcher = state_context.into_dispatcher();
dispatcher.push(ConsensusAction::BlockReceived {
hash: best_tip.hash.clone(),
block: best_tip.block.clone(),
chain_proof: None,
});

dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
dispatcher.push(TransitionFrontierSyncAction::BlocksPeersQuery);
}
ConsensusAction::Prune => {
let Some(best_tip_hash) = state.best_tip.clone() else {
return;
Expand Down
6 changes: 3 additions & 3 deletions node/src/effects.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use openmina_core::log::system_time;
use p2p::p2p_timeout_effects;

use crate::block_producer::{block_producer_effects, BlockProducerAction};
use crate::event_source::event_source_effects;
Expand Down Expand Up @@ -37,8 +36,6 @@ pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
store.dispatch(ExternalSnarkWorkerAction::Start);

if store.state().p2p.ready().is_some() {
p2p_timeout_effects(store, &meta);

p2p_request_best_tip_if_needed(store);
p2p_request_transactions_if_needed(store);
p2p_request_snarks_if_needed(store);
Expand Down Expand Up @@ -95,6 +92,9 @@ pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
Action::WatchedAccounts(_) => {
// Handled by reducer
}
Action::P2pCallbacks(_) => {
// Handled by reducer
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions node/src/p2p/callbacks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod p2p_callbacks_actions;
pub use p2p_callbacks_actions::P2pCallbacksAction;
mod p2p_callbacks_reducer;
68 changes: 68 additions & 0 deletions node/src/p2p/callbacks/p2p_callbacks_actions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use openmina_core::ActionEvent;
use p2p::{
channels::{
rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
streaming_rpc::P2pStreamingRpcResponseFull,
},
PeerId,
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
#[action_event(level = debug)]
pub enum P2pCallbacksAction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these callbacks RPC related only?
In this case we could change the name to something like P2pRpcCallbackAction

Copy link
Contributor Author

@0xMimir 0xMimir Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callbacks are optional because of p2p tests, P2pCallbacksAction is currently only RPC related, but it doesn't have to be.

P2pChannelsRpcReady {
peer_id: PeerId,
},
P2pChannelsRpcTimeout {
peer_id: PeerId,
id: P2pRpcId,
},
P2pChannelsRpcResponseReceived {
peer_id: PeerId,
id: P2pRpcId,
response: Option<Box<P2pRpcResponse>>,
},
P2pChannelsRpcRequestReceived {
peer_id: PeerId,
id: P2pRpcId,
request: Box<P2pRpcRequest>,
},

P2pChannelsStreamingRpcReady,
P2pChannelsStreamingRpcTimeout {
peer_id: PeerId,
id: P2pRpcId,
},
P2pChannelsStreamingRpcResponseReceived {
peer_id: PeerId,
id: P2pRpcId,
response: Option<P2pStreamingRpcResponseFull>,
},

P2pDisconnection {
peer_id: PeerId,
},
RpcRespondBestTip {
peer_id: PeerId,
},
}

impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
fn is_enabled(&self, state: &crate::State, _time: redux::Timestamp) -> bool {
match self {
P2pCallbacksAction::P2pChannelsRpcReady { .. } => true,
P2pCallbacksAction::P2pChannelsRpcTimeout { .. } => true,
P2pCallbacksAction::P2pChannelsRpcResponseReceived { .. } => true,
P2pCallbacksAction::P2pChannelsRpcRequestReceived { .. } => true,
P2pCallbacksAction::P2pChannelsStreamingRpcReady => true,
P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { .. } => true,
P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { .. } => true,
P2pCallbacksAction::P2pDisconnection { .. } => true,
// TODO: what if we don't have best tip?
P2pCallbacksAction::RpcRespondBestTip { .. } => {
state.transition_frontier.best_tip().is_some()
}
}
}
}
Loading
Loading