Skip to content

Commit 96e8058

Browse files
committed
Ported effects
1 parent 1e420c5 commit 96e8058

34 files changed

+1571
-550
lines changed

node/src/action.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub use crate::consensus::ConsensusAction;
88
pub use crate::event_source::EventSourceAction;
99
pub use crate::external_snark_worker::ExternalSnarkWorkerAction;
1010
pub use crate::ledger::LedgerAction;
11+
use crate::p2p::callbacks::P2pCallbacksAction;
1112
pub use crate::p2p::P2pAction;
1213
pub use crate::rpc::RpcAction;
1314
pub use crate::snark::SnarkAction;
@@ -32,6 +33,8 @@ pub enum Action {
3233
EventSource(EventSourceAction),
3334

3435
P2p(P2pAction),
36+
P2pCallbacks(P2pCallbacksAction),
37+
3538
Ledger(LedgerAction),
3639
Snark(SnarkAction),
3740
Consensus(ConsensusAction),
@@ -85,6 +88,7 @@ impl redux::EnablingCondition<crate::State> for Action {
8588
Action::WatchedAccounts(a) => a.is_enabled(state, time),
8689
Action::TransactionPool(a) => a.is_enabled(state, time),
8790
Action::TransactionPoolEffect(a) => a.is_enabled(state, time),
91+
Action::P2pCallbacks(a) => a.is_enabled(state, time),
8892
}
8993
}
9094
}

node/src/action_kind.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::external_snark_worker::ExternalSnarkWorkerAction;
2323
use crate::ledger::read::LedgerReadAction;
2424
use crate::ledger::write::LedgerWriteAction;
2525
use crate::ledger::LedgerAction;
26+
use crate::p2p::callbacks::P2pCallbacksAction;
2627
use crate::p2p::channels::best_tip::P2pChannelsBestTipAction;
2728
use crate::p2p::channels::best_tip_effectful::P2pChannelsBestTipEffectfulAction;
2829
use crate::p2p::channels::rpc::P2pChannelsRpcAction;
@@ -147,6 +148,7 @@ pub enum ActionKind {
147148
ConsensusBlockSnarkVerifySuccess,
148149
ConsensusDetectForkRange,
149150
ConsensusLongRangeForkResolve,
151+
ConsensusP2pBestTipUpdate,
150152
ConsensusPrune,
151153
ConsensusShortRangeForkResolve,
152154
EventSourceNewEvent,
@@ -174,6 +176,13 @@ pub enum ActionKind {
174176
LedgerWriteInit,
175177
LedgerWritePending,
176178
LedgerWriteSuccess,
179+
P2pCallbacksP2pChannelsRpcReady,
180+
P2pCallbacksP2pChannelsRpcRequestReceived,
181+
P2pCallbacksP2pChannelsRpcResponseReceived,
182+
P2pCallbacksP2pChannelsRpcTimeout,
183+
P2pCallbacksP2pChannelsStreamingRpcReady,
184+
P2pCallbacksP2pChannelsStreamingRpcResponseReceived,
185+
P2pCallbacksP2pChannelsStreamingRpcTimeout,
177186
P2pChannelsBestTipInit,
178187
P2pChannelsBestTipPending,
179188
P2pChannelsBestTipReady,
@@ -427,6 +436,7 @@ pub enum ActionKind {
427436
RpcLedgerAccountsGetPending,
428437
RpcLedgerAccountsGetSuccess,
429438
RpcMessageProgressGet,
439+
RpcP2pConnectionIncomingAnswerReady,
430440
RpcP2pConnectionIncomingError,
431441
RpcP2pConnectionIncomingInit,
432442
RpcP2pConnectionIncomingPending,
@@ -510,6 +520,7 @@ pub enum ActionKind {
510520
TransactionPoolVerifyError,
511521
TransactionPoolEffectfulFetchAccounts,
512522
TransitionFrontierGenesisInject,
523+
TransitionFrontierRpcRespondBestTip,
513524
TransitionFrontierSyncFailed,
514525
TransitionFrontierSynced,
515526
TransitionFrontierGenesisLedgerLoadInit,
@@ -559,6 +570,7 @@ pub enum ActionKind {
559570
TransitionFrontierSyncLedgerSnarkedNumAccountsReceived,
560571
TransitionFrontierSyncLedgerSnarkedNumAccountsRejected,
561572
TransitionFrontierSyncLedgerSnarkedNumAccountsSuccess,
573+
TransitionFrontierSyncLedgerSnarkedP2pDisconnection,
562574
TransitionFrontierSyncLedgerSnarkedPeerQueryAddressError,
563575
TransitionFrontierSyncLedgerSnarkedPeerQueryAddressInit,
564576
TransitionFrontierSyncLedgerSnarkedPeerQueryAddressPending,
@@ -599,7 +611,7 @@ pub enum ActionKind {
599611
}
600612

601613
impl ActionKind {
602-
pub const COUNT: u16 = 493;
614+
pub const COUNT: u16 = 504;
603615
}
604616

605617
impl std::fmt::Display for ActionKind {
@@ -614,6 +626,7 @@ impl ActionKindGet for Action {
614626
Self::CheckTimeouts(a) => a.kind(),
615627
Self::EventSource(a) => a.kind(),
616628
Self::P2p(a) => a.kind(),
629+
Self::P2pCallbacks(a) => a.kind(),
617630
Self::Ledger(a) => a.kind(),
618631
Self::Snark(a) => a.kind(),
619632
Self::Consensus(a) => a.kind(),
@@ -664,6 +677,30 @@ impl ActionKindGet for P2pAction {
664677
}
665678
}
666679

680+
impl ActionKindGet for P2pCallbacksAction {
681+
fn kind(&self) -> ActionKind {
682+
match self {
683+
Self::P2pChannelsRpcReady { .. } => ActionKind::P2pCallbacksP2pChannelsRpcReady,
684+
Self::P2pChannelsRpcTimeout { .. } => ActionKind::P2pCallbacksP2pChannelsRpcTimeout,
685+
Self::P2pChannelsRpcResponseReceived { .. } => {
686+
ActionKind::P2pCallbacksP2pChannelsRpcResponseReceived
687+
}
688+
Self::P2pChannelsRpcRequestReceived { .. } => {
689+
ActionKind::P2pCallbacksP2pChannelsRpcRequestReceived
690+
}
691+
Self::P2pChannelsStreamingRpcReady => {
692+
ActionKind::P2pCallbacksP2pChannelsStreamingRpcReady
693+
}
694+
Self::P2pChannelsStreamingRpcTimeout { .. } => {
695+
ActionKind::P2pCallbacksP2pChannelsStreamingRpcTimeout
696+
}
697+
Self::P2pChannelsStreamingRpcResponseReceived { .. } => {
698+
ActionKind::P2pCallbacksP2pChannelsStreamingRpcResponseReceived
699+
}
700+
}
701+
}
702+
}
703+
667704
impl ActionKindGet for LedgerAction {
668705
fn kind(&self) -> ActionKind {
669706
match self {
@@ -698,6 +735,7 @@ impl ActionKindGet for ConsensusAction {
698735
Self::ShortRangeForkResolve { .. } => ActionKind::ConsensusShortRangeForkResolve,
699736
Self::LongRangeForkResolve { .. } => ActionKind::ConsensusLongRangeForkResolve,
700737
Self::BestTipUpdate { .. } => ActionKind::ConsensusBestTipUpdate,
738+
Self::P2pBestTipUpdate { .. } => ActionKind::ConsensusP2pBestTipUpdate,
701739
Self::Prune => ActionKind::ConsensusPrune,
702740
}
703741
}
@@ -712,6 +750,7 @@ impl ActionKindGet for TransitionFrontierAction {
712750
Self::GenesisInject => ActionKind::TransitionFrontierGenesisInject,
713751
Self::Synced { .. } => ActionKind::TransitionFrontierSynced,
714752
Self::SyncFailed { .. } => ActionKind::TransitionFrontierSyncFailed,
753+
Self::RpcRespondBestTip { .. } => ActionKind::TransitionFrontierRpcRespondBestTip,
715754
}
716755
}
717756
}
@@ -857,6 +896,9 @@ impl ActionKindGet for RpcAction {
857896
Self::P2pConnectionIncomingRespond { .. } => {
858897
ActionKind::RpcP2pConnectionIncomingRespond
859898
}
899+
Self::P2pConnectionIncomingAnswerReady { .. } => {
900+
ActionKind::RpcP2pConnectionIncomingAnswerReady
901+
}
860902
Self::P2pConnectionIncomingError { .. } => ActionKind::RpcP2pConnectionIncomingError,
861903
Self::P2pConnectionIncomingSuccess { .. } => {
862904
ActionKind::RpcP2pConnectionIncomingSuccess
@@ -1880,6 +1922,9 @@ impl ActionKindGet for TransitionFrontierSyncLedgerSnarkedAction {
18801922
ActionKind::TransitionFrontierSyncLedgerSnarkedMerkleTreeSyncSuccess
18811923
}
18821924
Self::Success => ActionKind::TransitionFrontierSyncLedgerSnarkedSuccess,
1925+
Self::P2pDisconnection { .. } => {
1926+
ActionKind::TransitionFrontierSyncLedgerSnarkedP2pDisconnection
1927+
}
18831928
}
18841929
}
18851930
}

node/src/consensus/consensus_actions.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
use mina_p2p_messages::v2::{MinaBlockBlockStableV2, StateHash};
4-
use openmina_core::block::ArcBlockWithHash;
4+
use openmina_core::block::{ArcBlockWithHash, BlockWithHash};
55
use openmina_core::{action_event, ActionEvent};
66
use serde::{Deserialize, Serialize};
77
use snark::block_verify::SnarkBlockVerifyError;
@@ -53,6 +53,9 @@ pub enum ConsensusAction {
5353
BestTipUpdate {
5454
hash: StateHash,
5555
},
56+
P2pBestTipUpdate {
57+
best_tip: BlockWithHash<Arc<MinaBlockBlockStableV2>>,
58+
},
5659
Prune,
5760
}
5861

@@ -146,6 +149,7 @@ impl redux::EnablingCondition<crate::State> for ConsensusAction {
146149
.consensus
147150
.is_candidate_decided_to_use_as_tip(hash)
148151
},
152+
ConsensusAction::P2pBestTipUpdate { .. } => true,
149153
ConsensusAction::Prune => {
150154
state.consensus.best_tip().is_some()
151155
},

node/src/consensus/consensus_reducer.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use openmina_core::{
55
use snark::block_verify::{SnarkBlockVerifyAction, SnarkBlockVerifyError};
66

77
use crate::{
8-
transition_frontier::sync::TransitionFrontierSyncAction, Action, State, WatchedAccountsAction,
8+
transition_frontier::sync::{ledger::{snarked::TransitionFrontierSyncLedgerSnarkedAction, staged::TransitionFrontierSyncLedgerStagedAction}, TransitionFrontierSyncAction}, Action, State, WatchedAccountsAction,
99
};
1010

1111
use super::{
@@ -235,6 +235,18 @@ impl ConsensusState {
235235

236236
transition_frontier_new_best_tip_handler(global_state, dispatcher);
237237
}
238+
ConsensusAction::P2pBestTipUpdate { best_tip } => {
239+
let dispatcher = state_context.into_dispatcher();
240+
dispatcher.push(ConsensusAction::BlockReceived {
241+
hash: best_tip.hash.clone(),
242+
block: best_tip.block.clone(),
243+
chain_proof: None,
244+
});
245+
246+
dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
247+
dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
248+
dispatcher.push(TransitionFrontierSyncAction::BlocksPeersQuery);
249+
}
238250
ConsensusAction::Prune => {
239251
let Some(best_tip_hash) = state.best_tip.clone() else {
240252
return;

node/src/effects.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
9292
Action::WatchedAccounts(_) => {
9393
// Handled by reducer
9494
}
95+
Action::P2pCallbacks(_) => {
96+
// Handled by reducer
97+
}
9598
}
9699
}
97100

node/src/p2p/callbacks/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
mod p2p_callbacks_actions;
2+
pub use p2p_callbacks_actions::P2pCallbacksAction;
3+
mod p2p_callbacks_reducer;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use openmina_core::ActionEvent;
2+
use p2p::{
3+
channels::{
4+
rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
5+
streaming_rpc::P2pStreamingRpcResponseFull,
6+
},
7+
PeerId,
8+
};
9+
use serde::{Deserialize, Serialize};
10+
11+
#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
12+
#[action_event(level = debug)]
13+
pub enum P2pCallbacksAction {
14+
P2pChannelsRpcReady {
15+
peer_id: PeerId,
16+
},
17+
P2pChannelsRpcTimeout {
18+
peer_id: PeerId,
19+
id: P2pRpcId,
20+
},
21+
P2pChannelsRpcResponseReceived {
22+
peer_id: PeerId,
23+
id: P2pRpcId,
24+
response: Option<Box<P2pRpcResponse>>,
25+
},
26+
P2pChannelsRpcRequestReceived {
27+
peer_id: PeerId,
28+
id: P2pRpcId,
29+
request: Box<P2pRpcRequest>,
30+
},
31+
32+
P2pChannelsStreamingRpcReady,
33+
P2pChannelsStreamingRpcTimeout {
34+
peer_id: PeerId,
35+
id: P2pRpcId,
36+
},
37+
P2pChannelsStreamingRpcResponseReceived {
38+
peer_id: PeerId,
39+
id: P2pRpcId,
40+
response: Option<P2pStreamingRpcResponseFull>,
41+
},
42+
}
43+
44+
impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
45+
fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool {
46+
true
47+
}
48+
}

0 commit comments

Comments
 (0)