Skip to content

Commit 05867af

Browse files
authored
Merge pull request #784 from 0xMimir/feat/reducer-port
Ported reducer and p2p effects in node
2 parents 334d00d + f43b87d commit 05867af

33 files changed

+1445
-1001
lines changed

node/src/action.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub use crate::consensus::ConsensusAction;
88
pub use crate::event_source::EventSourceAction;
99
pub use crate::external_snark_worker::ExternalSnarkWorkerAction;
1010
pub use crate::ledger::LedgerAction;
11+
use crate::p2p::callbacks::P2pCallbacksAction;
1112
pub use crate::p2p::P2pAction;
1213
pub use crate::rpc::RpcAction;
1314
pub use crate::snark::SnarkAction;
@@ -32,6 +33,8 @@ pub enum Action {
3233
EventSource(EventSourceAction),
3334

3435
P2p(P2pAction),
36+
P2pCallbacks(P2pCallbacksAction),
37+
3538
Ledger(LedgerAction),
3639
Snark(SnarkAction),
3740
Consensus(ConsensusAction),
@@ -85,6 +88,7 @@ impl redux::EnablingCondition<crate::State> for Action {
8588
Action::WatchedAccounts(a) => a.is_enabled(state, time),
8689
Action::TransactionPool(a) => a.is_enabled(state, time),
8790
Action::TransactionPoolEffect(a) => a.is_enabled(state, time),
91+
Action::P2pCallbacks(a) => a.is_enabled(state, time),
8892
}
8993
}
9094
}

node/src/action_kind.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::external_snark_worker::ExternalSnarkWorkerAction;
2323
use crate::ledger::read::LedgerReadAction;
2424
use crate::ledger::write::LedgerWriteAction;
2525
use crate::ledger::LedgerAction;
26+
use crate::p2p::callbacks::P2pCallbacksAction;
2627
use crate::p2p::channels::best_tip::P2pChannelsBestTipAction;
2728
use crate::p2p::channels::best_tip_effectful::P2pChannelsBestTipEffectfulAction;
2829
use crate::p2p::channels::rpc::P2pChannelsRpcAction;
@@ -147,6 +148,7 @@ pub enum ActionKind {
147148
ConsensusBlockSnarkVerifySuccess,
148149
ConsensusDetectForkRange,
149150
ConsensusLongRangeForkResolve,
151+
ConsensusP2pBestTipUpdate,
150152
ConsensusPrune,
151153
ConsensusShortRangeForkResolve,
152154
EventSourceNewEvent,
@@ -174,6 +176,15 @@ pub enum ActionKind {
174176
LedgerWriteInit,
175177
LedgerWritePending,
176178
LedgerWriteSuccess,
179+
P2pCallbacksP2pChannelsRpcReady,
180+
P2pCallbacksP2pChannelsRpcRequestReceived,
181+
P2pCallbacksP2pChannelsRpcResponseReceived,
182+
P2pCallbacksP2pChannelsRpcTimeout,
183+
P2pCallbacksP2pChannelsStreamingRpcReady,
184+
P2pCallbacksP2pChannelsStreamingRpcResponseReceived,
185+
P2pCallbacksP2pChannelsStreamingRpcTimeout,
186+
P2pCallbacksP2pDisconnection,
187+
P2pCallbacksRpcRespondBestTip,
177188
P2pChannelsBestTipInit,
178189
P2pChannelsBestTipPending,
179190
P2pChannelsBestTipReady,
@@ -427,6 +438,7 @@ pub enum ActionKind {
427438
RpcLedgerAccountsGetPending,
428439
RpcLedgerAccountsGetSuccess,
429440
RpcMessageProgressGet,
441+
RpcP2pConnectionIncomingAnswerReady,
430442
RpcP2pConnectionIncomingError,
431443
RpcP2pConnectionIncomingInit,
432444
RpcP2pConnectionIncomingPending,
@@ -599,7 +611,7 @@ pub enum ActionKind {
599611
}
600612

601613
impl ActionKind {
602-
pub const COUNT: u16 = 493;
614+
pub const COUNT: u16 = 504;
603615
}
604616

605617
impl std::fmt::Display for ActionKind {
@@ -614,6 +626,7 @@ impl ActionKindGet for Action {
614626
Self::CheckTimeouts(a) => a.kind(),
615627
Self::EventSource(a) => a.kind(),
616628
Self::P2p(a) => a.kind(),
629+
Self::P2pCallbacks(a) => a.kind(),
617630
Self::Ledger(a) => a.kind(),
618631
Self::Snark(a) => a.kind(),
619632
Self::Consensus(a) => a.kind(),
@@ -664,6 +677,32 @@ impl ActionKindGet for P2pAction {
664677
}
665678
}
666679

680+
impl ActionKindGet for P2pCallbacksAction {
681+
fn kind(&self) -> ActionKind {
682+
match self {
683+
Self::P2pChannelsRpcReady { .. } => ActionKind::P2pCallbacksP2pChannelsRpcReady,
684+
Self::P2pChannelsRpcTimeout { .. } => ActionKind::P2pCallbacksP2pChannelsRpcTimeout,
685+
Self::P2pChannelsRpcResponseReceived { .. } => {
686+
ActionKind::P2pCallbacksP2pChannelsRpcResponseReceived
687+
}
688+
Self::P2pChannelsRpcRequestReceived { .. } => {
689+
ActionKind::P2pCallbacksP2pChannelsRpcRequestReceived
690+
}
691+
Self::P2pChannelsStreamingRpcReady => {
692+
ActionKind::P2pCallbacksP2pChannelsStreamingRpcReady
693+
}
694+
Self::P2pChannelsStreamingRpcTimeout { .. } => {
695+
ActionKind::P2pCallbacksP2pChannelsStreamingRpcTimeout
696+
}
697+
Self::P2pChannelsStreamingRpcResponseReceived { .. } => {
698+
ActionKind::P2pCallbacksP2pChannelsStreamingRpcResponseReceived
699+
}
700+
Self::P2pDisconnection { .. } => ActionKind::P2pCallbacksP2pDisconnection,
701+
Self::RpcRespondBestTip { .. } => ActionKind::P2pCallbacksRpcRespondBestTip,
702+
}
703+
}
704+
}
705+
667706
impl ActionKindGet for LedgerAction {
668707
fn kind(&self) -> ActionKind {
669708
match self {
@@ -698,6 +737,7 @@ impl ActionKindGet for ConsensusAction {
698737
Self::ShortRangeForkResolve { .. } => ActionKind::ConsensusShortRangeForkResolve,
699738
Self::LongRangeForkResolve { .. } => ActionKind::ConsensusLongRangeForkResolve,
700739
Self::BestTipUpdate { .. } => ActionKind::ConsensusBestTipUpdate,
740+
Self::P2pBestTipUpdate { .. } => ActionKind::ConsensusP2pBestTipUpdate,
701741
Self::Prune => ActionKind::ConsensusPrune,
702742
}
703743
}
@@ -857,6 +897,9 @@ impl ActionKindGet for RpcAction {
857897
Self::P2pConnectionIncomingRespond { .. } => {
858898
ActionKind::RpcP2pConnectionIncomingRespond
859899
}
900+
Self::P2pConnectionIncomingAnswerReady { .. } => {
901+
ActionKind::RpcP2pConnectionIncomingAnswerReady
902+
}
860903
Self::P2pConnectionIncomingError { .. } => ActionKind::RpcP2pConnectionIncomingError,
861904
Self::P2pConnectionIncomingSuccess { .. } => {
862905
ActionKind::RpcP2pConnectionIncomingSuccess

node/src/consensus/consensus_actions.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
use mina_p2p_messages::v2::{MinaBlockBlockStableV2, StateHash};
4-
use openmina_core::block::ArcBlockWithHash;
4+
use openmina_core::block::{ArcBlockWithHash, BlockWithHash};
55
use openmina_core::{action_event, ActionEvent};
66
use serde::{Deserialize, Serialize};
77
use snark::block_verify::SnarkBlockVerifyError;
@@ -53,6 +53,9 @@ pub enum ConsensusAction {
5353
BestTipUpdate {
5454
hash: StateHash,
5555
},
56+
P2pBestTipUpdate {
57+
best_tip: BlockWithHash<Arc<MinaBlockBlockStableV2>>,
58+
},
5659
Prune,
5760
}
5861

@@ -146,6 +149,7 @@ impl redux::EnablingCondition<crate::State> for ConsensusAction {
146149
.consensus
147150
.is_candidate_decided_to_use_as_tip(hash)
148151
},
152+
ConsensusAction::P2pBestTipUpdate { .. } => true,
149153
ConsensusAction::Prune => {
150154
state.consensus.best_tip().is_some()
151155
},

node/src/consensus/consensus_reducer.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,14 @@ use openmina_core::{
55
use snark::block_verify::{SnarkBlockVerifyAction, SnarkBlockVerifyError};
66

77
use crate::{
8-
transition_frontier::sync::TransitionFrontierSyncAction, Action, State, WatchedAccountsAction,
8+
transition_frontier::sync::{
9+
ledger::{
10+
snarked::TransitionFrontierSyncLedgerSnarkedAction,
11+
staged::TransitionFrontierSyncLedgerStagedAction,
12+
},
13+
TransitionFrontierSyncAction,
14+
},
15+
Action, State, WatchedAccountsAction,
916
};
1017

1118
use super::{
@@ -235,6 +242,18 @@ impl ConsensusState {
235242

236243
transition_frontier_new_best_tip_handler(global_state, dispatcher);
237244
}
245+
ConsensusAction::P2pBestTipUpdate { best_tip } => {
246+
let dispatcher = state_context.into_dispatcher();
247+
dispatcher.push(ConsensusAction::BlockReceived {
248+
hash: best_tip.hash.clone(),
249+
block: best_tip.block.clone(),
250+
chain_proof: None,
251+
});
252+
253+
dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
254+
dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
255+
dispatcher.push(TransitionFrontierSyncAction::BlocksPeersQuery);
256+
}
238257
ConsensusAction::Prune => {
239258
let Some(best_tip_hash) = state.best_tip.clone() else {
240259
return;

node/src/effects.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use openmina_core::log::system_time;
2-
use p2p::p2p_timeout_effects;
32

43
use crate::block_producer::{block_producer_effects, BlockProducerAction};
54
use crate::event_source::event_source_effects;
@@ -37,8 +36,6 @@ pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
3736
store.dispatch(ExternalSnarkWorkerAction::Start);
3837

3938
if store.state().p2p.ready().is_some() {
40-
p2p_timeout_effects(store, &meta);
41-
4239
p2p_request_best_tip_if_needed(store);
4340
p2p_request_transactions_if_needed(store);
4441
p2p_request_snarks_if_needed(store);
@@ -95,6 +92,9 @@ pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
9592
Action::WatchedAccounts(_) => {
9693
// Handled by reducer
9794
}
95+
Action::P2pCallbacks(_) => {
96+
// Handled by reducer
97+
}
9898
}
9999
}
100100

node/src/p2p/callbacks/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
mod p2p_callbacks_actions;
2+
pub use p2p_callbacks_actions::P2pCallbacksAction;
3+
mod p2p_callbacks_reducer;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use openmina_core::ActionEvent;
2+
use p2p::{
3+
channels::{
4+
rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
5+
streaming_rpc::P2pStreamingRpcResponseFull,
6+
},
7+
PeerId,
8+
};
9+
use serde::{Deserialize, Serialize};
10+
11+
#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
12+
#[action_event(level = debug)]
13+
pub enum P2pCallbacksAction {
14+
P2pChannelsRpcReady {
15+
peer_id: PeerId,
16+
},
17+
P2pChannelsRpcTimeout {
18+
peer_id: PeerId,
19+
id: P2pRpcId,
20+
},
21+
P2pChannelsRpcResponseReceived {
22+
peer_id: PeerId,
23+
id: P2pRpcId,
24+
response: Option<Box<P2pRpcResponse>>,
25+
},
26+
P2pChannelsRpcRequestReceived {
27+
peer_id: PeerId,
28+
id: P2pRpcId,
29+
request: Box<P2pRpcRequest>,
30+
},
31+
32+
P2pChannelsStreamingRpcReady,
33+
P2pChannelsStreamingRpcTimeout {
34+
peer_id: PeerId,
35+
id: P2pRpcId,
36+
},
37+
P2pChannelsStreamingRpcResponseReceived {
38+
peer_id: PeerId,
39+
id: P2pRpcId,
40+
response: Option<P2pStreamingRpcResponseFull>,
41+
},
42+
43+
P2pDisconnection {
44+
peer_id: PeerId,
45+
},
46+
RpcRespondBestTip {
47+
peer_id: PeerId,
48+
},
49+
}
50+
51+
impl redux::EnablingCondition<crate::State> for P2pCallbacksAction {
52+
fn is_enabled(&self, state: &crate::State, _time: redux::Timestamp) -> bool {
53+
match self {
54+
P2pCallbacksAction::P2pChannelsRpcReady { .. } => true,
55+
P2pCallbacksAction::P2pChannelsRpcTimeout { .. } => true,
56+
P2pCallbacksAction::P2pChannelsRpcResponseReceived { .. } => true,
57+
P2pCallbacksAction::P2pChannelsRpcRequestReceived { .. } => true,
58+
P2pCallbacksAction::P2pChannelsStreamingRpcReady => true,
59+
P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { .. } => true,
60+
P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { .. } => true,
61+
P2pCallbacksAction::P2pDisconnection { .. } => true,
62+
// TODO: what if we don't have best tip?
63+
P2pCallbacksAction::RpcRespondBestTip { .. } => {
64+
state.transition_frontier.best_tip().is_some()
65+
}
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)