Skip to content

Commit 6873a2d

Browse files
authored
Merge pull request #833 from 0xMimir/feat/port-rpc
Ported RPC layer in node to new style reducers
2 parents ab0c027 + 1743bf6 commit 6873a2d

File tree

19 files changed

+993
-400
lines changed

19 files changed

+993
-400
lines changed

node/common/src/service/rpc/mod.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@ use serde::{Deserialize, Serialize};
2020
use node::core::channels::{mpsc, oneshot};
2121
use node::core::requests::PendingRequests;
2222
use node::p2p::connection::P2pConnectionResponse;
23-
pub use node::rpc::{
24-
ActionStatsResponse, RespondError, RpcActionStatsGetResponse, RpcId, RpcIdType,
25-
RpcP2pConnectionOutgoingResponse, RpcScanStateSummaryGetResponse, RpcSnarkPoolGetResponse,
26-
RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse, RpcStateGetResponse,
27-
RpcSyncStatsGetResponse, RpcTransactionInjectSuccess,
28-
};
2923
use node::State;
3024
use node::{event_source::Event, rpc::RpcSnarkPoolJobGetResponse};
25+
pub use node::{
26+
rpc::{
27+
ActionStatsResponse, RpcActionStatsGetResponse, RpcId, RpcIdType,
28+
RpcP2pConnectionOutgoingResponse, RpcScanStateSummaryGetResponse, RpcSnarkPoolGetResponse,
29+
RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse, RpcStateGetResponse,
30+
RpcSyncStatsGetResponse, RpcTransactionInjectSuccess,
31+
},
32+
rpc_effectful::RespondError,
33+
};
3134

3235
use crate::NodeService;
3336

@@ -176,7 +179,7 @@ fn optimize_filtered_state(
176179
Ok((value, filter))
177180
}
178181

179-
impl node::rpc::RpcService for NodeService {
182+
impl node::rpc_effectful::RpcService for NodeService {
180183
fn respond_state_get(
181184
&mut self,
182185
rpc_id: RpcId,

node/src/action.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub use crate::ledger::LedgerAction;
1212
use crate::p2p::callbacks::P2pCallbacksAction;
1313
pub use crate::p2p::P2pAction;
1414
pub use crate::rpc::RpcAction;
15+
use crate::rpc_effectful::RpcEffectfulAction;
1516
pub use crate::snark::SnarkAction;
1617
pub use crate::snark_pool::SnarkPoolAction;
1718
pub use crate::snark_pool::SnarkPoolEffectfulAction;
@@ -48,6 +49,7 @@ pub enum Action {
4849
ExternalSnarkWorker(ExternalSnarkWorkerAction),
4950
BlockProducer(BlockProducerAction),
5051
Rpc(RpcAction),
52+
RpcEffectful(RpcEffectfulAction),
5153

5254
WatchedAccounts(WatchedAccountsAction),
5355
}
@@ -95,6 +97,7 @@ impl redux::EnablingCondition<crate::State> for Action {
9597
Action::TransactionPool(a) => a.is_enabled(state, time),
9698
Action::TransactionPoolEffect(a) => a.is_enabled(state, time),
9799
Action::P2pCallbacks(a) => a.is_enabled(state, time),
100+
Action::RpcEffectful(a) => a.is_enabled(state, time),
98101
}
99102
}
100103
}

node/src/action_kind.rs

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ use crate::p2p::network::{P2pNetworkAction, P2pNetworkEffectfulAction};
7373
use crate::p2p::peer::P2pPeerAction;
7474
use crate::p2p::{P2pAction, P2pEffectfulAction, P2pInitializeAction};
7575
use crate::rpc::RpcAction;
76+
use crate::rpc_effectful::RpcEffectfulAction;
7677
use crate::snark::block_verify::SnarkBlockVerifyAction;
7778
use crate::snark::block_verify_effectful::SnarkBlockVerifyEffectfulAction;
7879
use crate::snark::user_command_verify::SnarkUserCommandVerifyAction;
@@ -520,6 +521,38 @@ pub enum ActionKind {
520521
RpcTransactionPool,
521522
RpcTransactionStatusGet,
522523
RpcTransitionFrontierUserCommandsGet,
524+
RpcEffectfulActionStatsGet,
525+
RpcEffectfulBestChain,
526+
RpcEffectfulBlockProducerStatsGet,
527+
RpcEffectfulConsensusConstantsGet,
528+
RpcEffectfulDiscoveryBoostrapStats,
529+
RpcEffectfulDiscoveryRoutingTable,
530+
RpcEffectfulGlobalStateGet,
531+
RpcEffectfulHealthCheck,
532+
RpcEffectfulLedgerAccountsGetSuccess,
533+
RpcEffectfulMessageProgressGet,
534+
RpcEffectfulP2pConnectionIncomingError,
535+
RpcEffectfulP2pConnectionIncomingRespond,
536+
RpcEffectfulP2pConnectionIncomingSuccess,
537+
RpcEffectfulP2pConnectionOutgoingError,
538+
RpcEffectfulP2pConnectionOutgoingSuccess,
539+
RpcEffectfulPeersGet,
540+
RpcEffectfulReadinessCheck,
541+
RpcEffectfulScanStateSummaryGetSuccess,
542+
RpcEffectfulSnarkPoolAvailableJobsGet,
543+
RpcEffectfulSnarkPoolJobGet,
544+
RpcEffectfulSnarkerConfigGet,
545+
RpcEffectfulSnarkerJobCommit,
546+
RpcEffectfulSnarkerJobSpec,
547+
RpcEffectfulSnarkerWorkersGet,
548+
RpcEffectfulStatusGet,
549+
RpcEffectfulSyncStatsGet,
550+
RpcEffectfulTransactionInjectFailure,
551+
RpcEffectfulTransactionInjectRejected,
552+
RpcEffectfulTransactionInjectSuccess,
553+
RpcEffectfulTransactionPool,
554+
RpcEffectfulTransactionStatusGet,
555+
RpcEffectfulTransitionFrontierUserCommandsGet,
523556
SnarkBlockVerifyError,
524557
SnarkBlockVerifyFinish,
525558
SnarkBlockVerifyInit,
@@ -661,7 +694,7 @@ pub enum ActionKind {
661694
}
662695

663696
impl ActionKind {
664-
pub const COUNT: u16 = 549;
697+
pub const COUNT: u16 = 581;
665698
}
666699

667700
impl std::fmt::Display for ActionKind {
@@ -689,6 +722,7 @@ impl ActionKindGet for Action {
689722
Self::ExternalSnarkWorker(a) => a.kind(),
690723
Self::BlockProducer(a) => a.kind(),
691724
Self::Rpc(a) => a.kind(),
725+
Self::RpcEffectful(a) => a.kind(),
692726
Self::WatchedAccounts(a) => a.kind(),
693727
}
694728
}
@@ -1003,6 +1037,69 @@ impl ActionKindGet for RpcAction {
10031037
}
10041038
}
10051039

1040+
impl ActionKindGet for RpcEffectfulAction {
1041+
fn kind(&self) -> ActionKind {
1042+
match self {
1043+
Self::GlobalStateGet { .. } => ActionKind::RpcEffectfulGlobalStateGet,
1044+
Self::StatusGet { .. } => ActionKind::RpcEffectfulStatusGet,
1045+
Self::ActionStatsGet { .. } => ActionKind::RpcEffectfulActionStatsGet,
1046+
Self::SyncStatsGet { .. } => ActionKind::RpcEffectfulSyncStatsGet,
1047+
Self::BlockProducerStatsGet { .. } => ActionKind::RpcEffectfulBlockProducerStatsGet,
1048+
Self::MessageProgressGet { .. } => ActionKind::RpcEffectfulMessageProgressGet,
1049+
Self::PeersGet { .. } => ActionKind::RpcEffectfulPeersGet,
1050+
Self::P2pConnectionOutgoingError { .. } => {
1051+
ActionKind::RpcEffectfulP2pConnectionOutgoingError
1052+
}
1053+
Self::P2pConnectionOutgoingSuccess { .. } => {
1054+
ActionKind::RpcEffectfulP2pConnectionOutgoingSuccess
1055+
}
1056+
Self::P2pConnectionIncomingRespond { .. } => {
1057+
ActionKind::RpcEffectfulP2pConnectionIncomingRespond
1058+
}
1059+
Self::P2pConnectionIncomingError { .. } => {
1060+
ActionKind::RpcEffectfulP2pConnectionIncomingError
1061+
}
1062+
Self::P2pConnectionIncomingSuccess { .. } => {
1063+
ActionKind::RpcEffectfulP2pConnectionIncomingSuccess
1064+
}
1065+
Self::ScanStateSummaryGetSuccess { .. } => {
1066+
ActionKind::RpcEffectfulScanStateSummaryGetSuccess
1067+
}
1068+
Self::SnarkPoolAvailableJobsGet { .. } => {
1069+
ActionKind::RpcEffectfulSnarkPoolAvailableJobsGet
1070+
}
1071+
Self::SnarkPoolJobGet { .. } => ActionKind::RpcEffectfulSnarkPoolJobGet,
1072+
Self::SnarkerConfigGet { .. } => ActionKind::RpcEffectfulSnarkerConfigGet,
1073+
Self::SnarkerJobCommit { .. } => ActionKind::RpcEffectfulSnarkerJobCommit,
1074+
Self::SnarkerJobSpec { .. } => ActionKind::RpcEffectfulSnarkerJobSpec,
1075+
Self::SnarkerWorkersGet { .. } => ActionKind::RpcEffectfulSnarkerWorkersGet,
1076+
Self::HealthCheck { .. } => ActionKind::RpcEffectfulHealthCheck,
1077+
Self::ReadinessCheck { .. } => ActionKind::RpcEffectfulReadinessCheck,
1078+
Self::DiscoveryRoutingTable { .. } => ActionKind::RpcEffectfulDiscoveryRoutingTable,
1079+
Self::DiscoveryBoostrapStats { .. } => ActionKind::RpcEffectfulDiscoveryBoostrapStats,
1080+
Self::TransactionPool { .. } => ActionKind::RpcEffectfulTransactionPool,
1081+
Self::LedgerAccountsGetSuccess { .. } => {
1082+
ActionKind::RpcEffectfulLedgerAccountsGetSuccess
1083+
}
1084+
Self::TransactionInjectSuccess { .. } => {
1085+
ActionKind::RpcEffectfulTransactionInjectSuccess
1086+
}
1087+
Self::TransactionInjectRejected { .. } => {
1088+
ActionKind::RpcEffectfulTransactionInjectRejected
1089+
}
1090+
Self::TransactionInjectFailure { .. } => {
1091+
ActionKind::RpcEffectfulTransactionInjectFailure
1092+
}
1093+
Self::TransitionFrontierUserCommandsGet { .. } => {
1094+
ActionKind::RpcEffectfulTransitionFrontierUserCommandsGet
1095+
}
1096+
Self::BestChain { .. } => ActionKind::RpcEffectfulBestChain,
1097+
Self::ConsensusConstantsGet { .. } => ActionKind::RpcEffectfulConsensusConstantsGet,
1098+
Self::TransactionStatusGet { .. } => ActionKind::RpcEffectfulTransactionStatusGet,
1099+
}
1100+
}
1101+
}
1102+
10061103
impl ActionKindGet for WatchedAccountsAction {
10071104
fn kind(&self) -> ActionKind {
10081105
match self {

node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_effects.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use redux::ActionMeta;
22

33
use crate::block_producer::to_epoch_and_slot;
44
use crate::ledger::read::LedgerReadAction;
5+
use crate::ledger::read::LedgerReadInitCallback;
56
use crate::ledger::read::LedgerReadRequest;
67
use crate::Service;
78
use crate::Store;
@@ -149,6 +150,7 @@ impl BlockProducerVrfEvaluatorAction {
149150
};
150151
if store.dispatch(LedgerReadAction::Init {
151152
request: LedgerReadRequest::DelegatorTable(staking_ledger_hash, producer),
153+
callback: LedgerReadInitCallback::None,
152154
}) {
153155
// TODO(binier): have pending action.
154156
} else {

node/src/effects.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::ledger::ledger_effects;
88
use crate::ledger::read::LedgerReadAction;
99
use crate::logger::logger_effects;
1010
use crate::p2p::node_p2p_effects;
11-
use crate::rpc::rpc_effects;
11+
use crate::rpc_effectful::rpc_effects;
1212
use crate::snark::snark_effects;
1313
use crate::snark_pool::candidate::SnarkPoolCandidateAction;
1414
use crate::snark_pool::{snark_pool_effects, SnarkPoolAction};
@@ -87,7 +87,10 @@ pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
8787
Action::ExternalSnarkWorker(action) => {
8888
external_snark_worker_effects(store, meta.with_action(action));
8989
}
90-
Action::Rpc(action) => {
90+
Action::Rpc(_) => {
91+
// Handled by reducer
92+
}
93+
Action::RpcEffectful(action) => {
9194
rpc_effects(store, meta.with_action(action));
9295
}
9396
Action::WatchedAccounts(_) => {

node/src/ledger/ledger_effects.rs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use mina_p2p_messages::v2;
22
use p2p::channels::rpc::P2pRpcRequest;
33
use p2p::channels::streaming_rpc::{P2pChannelsStreamingRpcAction, P2pStreamingRpcRequest};
4+
use p2p::P2pAction;
45

56
use crate::block_producer::vrf_evaluator::BlockProducerVrfEvaluatorAction;
67
use crate::p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcResponse};
@@ -10,7 +11,7 @@ use crate::transition_frontier::sync::TransitionFrontierSyncAction;
1011
use crate::{BlockProducerAction, RpcAction, Store};
1112

1213
use super::read::{
13-
LedgerReadAction, LedgerReadId, LedgerReadRequest, LedgerReadResponse,
14+
LedgerReadAction, LedgerReadId, LedgerReadInitCallback, LedgerReadRequest, LedgerReadResponse,
1415
LedgerReadStagedLedgerAuxAndPendingCoinbases,
1516
};
1617
use super::write::{LedgerWriteAction, LedgerWriteResponse};
@@ -36,13 +37,26 @@ pub fn ledger_effects<S: LedgerService>(store: &mut Store<S>, action: LedgerActi
3637
LedgerReadAction::FindTodos => {
3738
next_read_requests_init(store);
3839
}
39-
LedgerReadAction::Init { request } => {
40+
LedgerReadAction::Init { request, callback } => {
4041
if store.state().ledger.read.has_same_request(&request) {
4142
return;
4243
}
4344
let id = store.state().ledger.read.next_req_id();
4445
store.service.read_init(id, request.clone());
4546
store.dispatch(LedgerReadAction::Pending { id, request });
47+
48+
match callback {
49+
LedgerReadInitCallback::RpcLedgerAccountsGetPending { callback, args } => {
50+
store.dispatch_callback(callback, args);
51+
}
52+
LedgerReadInitCallback::RpcScanStateSummaryGetPending { callback, args } => {
53+
store.dispatch_callback(callback, args);
54+
}
55+
LedgerReadInitCallback::P2pChannelsResponsePending { callback, args } => {
56+
store.dispatch_callback(callback, args);
57+
}
58+
LedgerReadInitCallback::None => {}
59+
};
4660
}
4761
LedgerReadAction::Pending { .. } => {}
4862
LedgerReadAction::Success { id, response } => {
@@ -219,13 +233,27 @@ fn next_read_requests_init<S: redux::Service>(store: &mut Store<S>) {
219233
}) else {
220234
continue;
221235
};
222-
if store.dispatch(LedgerReadAction::Init { request }) {
223-
if !is_streaming {
224-
store.dispatch(P2pChannelsRpcAction::ResponsePending { peer_id, id });
225-
} else {
226-
store.dispatch(P2pChannelsStreamingRpcAction::ResponsePending { peer_id, id });
227-
}
236+
237+
store.dispatch(LedgerReadAction::Init {
238+
request,
239+
callback: LedgerReadInitCallback::P2pChannelsResponsePending
240+
{ callback: redux::callback!(on_ledger_read_init_p2p_channels_response_pending((is_streaming: bool, id: P2pRpcId, peer_id: PeerId)) -> crate::Action{
241+
if is_streaming {
242+
P2pAction::from(P2pChannelsStreamingRpcAction::ResponsePending {
243+
peer_id,
244+
id,
245+
})
246+
} else {
247+
P2pAction::from(P2pChannelsRpcAction::ResponsePending {
248+
peer_id,
249+
id,
250+
})
251+
}
252+
}),
253+
args:(is_streaming, id, peer_id)
228254
}
255+
});
256+
229257
if !store.state().ledger.read.is_total_cost_under_limit() {
230258
return;
231259
}

node/src/ledger/read/ledger_read_actions.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use serde::{Deserialize, Serialize};
22

3-
use super::{LedgerReadId, LedgerReadRequest, LedgerReadRequestState, LedgerReadResponse};
3+
use super::{
4+
LedgerReadId, LedgerReadInitCallback, LedgerReadRequest, LedgerReadRequestState,
5+
LedgerReadResponse,
6+
};
47

58
pub type LedgerReadActionWithMeta = redux::ActionWithMeta<LedgerReadAction>;
69
pub type LedgerReadActionWithMetaRef<'a> = redux::ActionWithMeta<&'a LedgerReadAction>;
@@ -10,6 +13,7 @@ pub enum LedgerReadAction {
1013
FindTodos,
1114
Init {
1215
request: LedgerReadRequest,
16+
callback: LedgerReadInitCallback,
1317
},
1418
Pending {
1519
id: LedgerReadId,

node/src/ledger/read/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ pub use ledger_read_actions::*;
44

55
mod ledger_read_state;
66
pub use ledger_read_state::*;
7-
use openmina_core::requests::RpcId;
7+
use openmina_core::block::AppliedBlock;
8+
use openmina_core::requests::{RequestId, RpcId, RpcIdType};
9+
use p2p::channels::rpc::P2pRpcId;
10+
use p2p::PeerId;
11+
use redux::Callback;
812

913
mod ledger_read_reducer;
1014

@@ -126,3 +130,20 @@ impl PartialEq for LedgerReadStagedLedgerAuxAndPendingCoinbases {
126130
self.ledger_hash == other.ledger_hash
127131
}
128132
}
133+
134+
#[derive(Serialize, Deserialize, Debug, Clone)]
135+
pub enum LedgerReadInitCallback {
136+
RpcLedgerAccountsGetPending {
137+
callback: Callback<RequestId<RpcIdType>>,
138+
args: RequestId<RpcIdType>,
139+
},
140+
RpcScanStateSummaryGetPending {
141+
callback: Callback<(RequestId<RpcIdType>, AppliedBlock)>,
142+
args: (RequestId<RpcIdType>, AppliedBlock),
143+
},
144+
P2pChannelsResponsePending {
145+
callback: Callback<(bool, P2pRpcId, PeerId)>,
146+
args: (bool, P2pRpcId, PeerId),
147+
},
148+
None,
149+
}

node/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub mod ledger;
3838
pub mod logger;
3939
pub mod p2p;
4040
pub mod rpc;
41+
pub mod rpc_effectful;
4142
pub mod snark;
4243
pub mod snark_pool;
4344
pub mod transaction_pool;

node/src/reducer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use openmina_core::{bug_condition, error, Substate};
22
use p2p::{P2pAction, P2pEffectfulAction, P2pInitializeAction, P2pState};
33

4-
use crate::{Action, ActionWithMeta, ConsensusAction, EventSourceAction, P2p, State};
4+
use crate::{
5+
rpc::RpcState, Action, ActionWithMeta, ConsensusAction, EventSourceAction, P2p, State,
6+
};
57

68
pub fn reducer(
79
state: &mut State,
@@ -87,9 +89,10 @@ pub fn reducer(
8789
Action::ExternalSnarkWorker(a) => {
8890
state.external_snark_worker.reducer(meta.with_action(a));
8991
}
90-
Action::Rpc(a) => {
91-
state.rpc.reducer(meta.with_action(a));
92+
Action::Rpc(action) => {
93+
RpcState::reducer(Substate::new(state, dispatcher), meta.with_action(action));
9294
}
95+
Action::RpcEffectful(_) => {}
9396
Action::WatchedAccounts(a) => {
9497
crate::watched_accounts::WatchedAccountsState::reducer(
9598
Substate::new(state, dispatcher),

0 commit comments

Comments
 (0)