Skip to content

Commit 1743bf6

Browse files
committed
Added callbacks
1 parent 1e5ca6e commit 1743bf6

File tree

6 files changed

+70
-71
lines changed

6 files changed

+70
-71
lines changed

node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_effects.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use redux::ActionMeta;
22

33
use crate::block_producer::to_epoch_and_slot;
44
use crate::ledger::read::LedgerReadAction;
5+
use crate::ledger::read::LedgerReadInitCallback;
56
use crate::ledger::read::LedgerReadRequest;
67
use crate::Service;
78
use crate::Store;
@@ -149,7 +150,7 @@ impl BlockProducerVrfEvaluatorAction {
149150
};
150151
if store.dispatch(LedgerReadAction::Init {
151152
request: LedgerReadRequest::DelegatorTable(staking_ledger_hash, producer),
152-
propagate: None,
153+
callback: LedgerReadInitCallback::None,
153154
}) {
154155
// TODO(binier): have pending action.
155156
} else {

node/src/ledger/ledger_effects.rs

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use mina_p2p_messages::v2;
22
use p2p::channels::rpc::P2pRpcRequest;
33
use p2p::channels::streaming_rpc::{P2pChannelsStreamingRpcAction, P2pStreamingRpcRequest};
4+
use p2p::P2pAction;
45

56
use crate::block_producer::vrf_evaluator::BlockProducerVrfEvaluatorAction;
67
use crate::p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcResponse};
@@ -10,8 +11,8 @@ use crate::transition_frontier::sync::TransitionFrontierSyncAction;
1011
use crate::{BlockProducerAction, RpcAction, Store};
1112

1213
use super::read::{
13-
LedgerReadAction, LedgerReadId, LedgerReadRequest, LedgerReadResponse,
14-
LedgerReadStagedLedgerAuxAndPendingCoinbases, PropagateLedgerReadInit,
14+
LedgerReadAction, LedgerReadId, LedgerReadInitCallback, LedgerReadRequest, LedgerReadResponse,
15+
LedgerReadStagedLedgerAuxAndPendingCoinbases,
1516
};
1617
use super::write::{LedgerWriteAction, LedgerWriteResponse};
1718
use super::{LedgerAction, LedgerActionWithMeta, LedgerAddress, LedgerService};
@@ -36,47 +37,26 @@ pub fn ledger_effects<S: LedgerService>(store: &mut Store<S>, action: LedgerActi
3637
LedgerReadAction::FindTodos => {
3738
next_read_requests_init(store);
3839
}
39-
LedgerReadAction::Init { request, propagate } => {
40+
LedgerReadAction::Init { request, callback } => {
4041
if store.state().ledger.read.has_same_request(&request) {
4142
return;
4243
}
4344
let id = store.state().ledger.read.next_req_id();
4445
store.service.read_init(id, request.clone());
4546
store.dispatch(LedgerReadAction::Pending { id, request });
4647

47-
if let Some(propagate) = propagate {
48-
match propagate {
49-
PropagateLedgerReadInit::RpcLedgerAccountsGetPending { rpc_id } => {
50-
store.dispatch(RpcAction::LedgerAccountsGetPending { rpc_id });
51-
}
52-
PropagateLedgerReadInit::RpcScanStateSummaryGetPending {
53-
rpc_id,
54-
block,
55-
} => {
56-
store.dispatch(RpcAction::ScanStateSummaryGetPending {
57-
rpc_id,
58-
block: Some(block),
59-
});
60-
}
61-
PropagateLedgerReadInit::P2pChannelsResponsePending {
62-
is_streaming,
63-
id,
64-
peer_id,
65-
} => {
66-
if !is_streaming {
67-
store.dispatch(P2pChannelsRpcAction::ResponsePending {
68-
peer_id,
69-
id,
70-
});
71-
} else {
72-
store.dispatch(P2pChannelsStreamingRpcAction::ResponsePending {
73-
peer_id,
74-
id,
75-
});
76-
}
77-
}
48+
match callback {
49+
LedgerReadInitCallback::RpcLedgerAccountsGetPending { callback, args } => {
50+
store.dispatch_callback(callback, args);
7851
}
79-
}
52+
LedgerReadInitCallback::RpcScanStateSummaryGetPending { callback, args } => {
53+
store.dispatch_callback(callback, args);
54+
}
55+
LedgerReadInitCallback::P2pChannelsResponsePending { callback, args } => {
56+
store.dispatch_callback(callback, args);
57+
}
58+
LedgerReadInitCallback::None => {}
59+
};
8060
}
8161
LedgerReadAction::Pending { .. } => {}
8262
LedgerReadAction::Success { id, response } => {
@@ -256,11 +236,22 @@ fn next_read_requests_init<S: redux::Service>(store: &mut Store<S>) {
256236

257237
store.dispatch(LedgerReadAction::Init {
258238
request,
259-
propagate: Some(PropagateLedgerReadInit::P2pChannelsResponsePending {
260-
is_streaming,
261-
id,
262-
peer_id,
263-
}),
239+
callback: LedgerReadInitCallback::P2pChannelsResponsePending
240+
{ callback: redux::callback!(on_ledger_read_init_p2p_channels_response_pending((is_streaming: bool, id: P2pRpcId, peer_id: PeerId)) -> crate::Action{
241+
if is_streaming {
242+
P2pAction::from(P2pChannelsStreamingRpcAction::ResponsePending {
243+
peer_id,
244+
id,
245+
})
246+
} else {
247+
P2pAction::from(P2pChannelsRpcAction::ResponsePending {
248+
peer_id,
249+
id,
250+
})
251+
}
252+
}),
253+
args:(is_streaming, id, peer_id)
254+
}
264255
});
265256

266257
if !store.state().ledger.read.is_total_cost_under_limit() {

node/src/ledger/read/ledger_read_actions.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use serde::{Deserialize, Serialize};
22

33
use super::{
4-
LedgerReadId, LedgerReadRequest, LedgerReadRequestState, LedgerReadResponse,
5-
PropagateLedgerReadInit,
4+
LedgerReadId, LedgerReadInitCallback, LedgerReadRequest, LedgerReadRequestState,
5+
LedgerReadResponse,
66
};
77

88
pub type LedgerReadActionWithMeta = redux::ActionWithMeta<LedgerReadAction>;
@@ -13,7 +13,7 @@ pub enum LedgerReadAction {
1313
FindTodos,
1414
Init {
1515
request: LedgerReadRequest,
16-
propagate: Option<PropagateLedgerReadInit>,
16+
callback: LedgerReadInitCallback,
1717
},
1818
Pending {
1919
id: LedgerReadId,

node/src/ledger/read/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use openmina_core::block::AppliedBlock;
88
use openmina_core::requests::{RequestId, RpcId, RpcIdType};
99
use p2p::channels::rpc::P2pRpcId;
1010
use p2p::PeerId;
11+
use redux::Callback;
1112

1213
mod ledger_read_reducer;
1314

@@ -131,17 +132,18 @@ impl PartialEq for LedgerReadStagedLedgerAuxAndPendingCoinbases {
131132
}
132133

133134
#[derive(Serialize, Deserialize, Debug, Clone)]
134-
pub enum PropagateLedgerReadInit {
135+
pub enum LedgerReadInitCallback {
135136
RpcLedgerAccountsGetPending {
136-
rpc_id: RequestId<RpcIdType>,
137+
callback: Callback<RequestId<RpcIdType>>,
138+
args: RequestId<RpcIdType>,
137139
},
138140
RpcScanStateSummaryGetPending {
139-
rpc_id: RequestId<RpcIdType>,
140-
block: AppliedBlock,
141+
callback: Callback<(RequestId<RpcIdType>, AppliedBlock)>,
142+
args: (RequestId<RpcIdType>, AppliedBlock),
141143
},
142144
P2pChannelsResponsePending {
143-
is_streaming: bool,
144-
id: P2pRpcId,
145-
peer_id: PeerId,
145+
callback: Callback<(bool, P2pRpcId, PeerId)>,
146+
args: (bool, P2pRpcId, PeerId),
146147
},
148+
None,
147149
}

node/src/reducer.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use openmina_core::{bug_condition, error, Substate};
22
use p2p::{P2pAction, P2pEffectfulAction, P2pInitializeAction, P2pState};
33

4-
use crate::{rpc::RpcState, Action, ActionWithMeta, ConsensusAction, EventSourceAction, P2p, State};
4+
use crate::{
5+
rpc::RpcState, Action, ActionWithMeta, ConsensusAction, EventSourceAction, P2p, State,
6+
};
57

68
pub fn reducer(
79
state: &mut State,

node/src/rpc/rpc_reducer.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
use openmina_core::bug_condition;
1+
use openmina_core::{
2+
block::AppliedBlock,
3+
bug_condition,
4+
requests::{RequestId, RpcIdType},
5+
};
26
use p2p::{
37
connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
48
webrtc::P2pConnectionResponse,
59
};
610
use redux::ActionWithMeta;
711

812
use crate::{
9-
ledger::read::{LedgerReadAction, LedgerReadRequest, PropagateLedgerReadInit},
13+
ledger::read::{LedgerReadAction, LedgerReadInitCallback, LedgerReadRequest},
1014
p2p_ready,
1115
rpc_effectful::RpcEffectfulAction,
1216
TransactionPoolAction,
@@ -260,10 +264,14 @@ impl RpcState {
260264
request: LedgerReadRequest::ScanStateSummary(
261265
block.staged_ledger_hashes().clone(),
262266
),
263-
propagate: Some(PropagateLedgerReadInit::RpcScanStateSummaryGetPending {
264-
rpc_id: *rpc_id,
265-
block,
266-
}),
267+
callback: LedgerReadInitCallback::RpcScanStateSummaryGetPending {
268+
callback: redux::callback!(
269+
on_ledger_read_init_rpc_scan_state_summary_get_pending((rpc_id: RequestId<RpcIdType>, block: AppliedBlock)) -> crate::Action{
270+
RpcAction::ScanStateSummaryGetPending { rpc_id, block: Some(block) }
271+
}
272+
),
273+
args: (*rpc_id, block),
274+
},
267275
});
268276
}
269277
RpcAction::ScanStateSummaryGetPending { rpc_id, block } => {
@@ -440,9 +448,14 @@ impl RpcState {
440448
ledger_hash.clone(),
441449
account_query.clone(),
442450
),
443-
propagate: Some(PropagateLedgerReadInit::RpcLedgerAccountsGetPending {
444-
rpc_id: *rpc_id,
445-
}),
451+
callback: LedgerReadInitCallback::RpcLedgerAccountsGetPending {
452+
callback: redux::callback!(
453+
on_ledger_read_init_rpc_actions_get_init(rpc_id: RequestId<RpcIdType>) -> crate::Action{
454+
RpcAction::LedgerAccountsGetPending { rpc_id }
455+
}
456+
),
457+
args: *rpc_id,
458+
},
446459
})
447460
}
448461
RpcAction::LedgerAccountsGetPending { rpc_id } => {
@@ -478,16 +491,6 @@ impl RpcState {
478491

479492
let dispatcher = state_context.into_dispatcher();
480493
dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id });
481-
482-
// sort the commands by nonce
483-
484-
// let Ok(commands) = commands
485-
// .into_iter()
486-
// .map(|c| c.try_into())
487-
// .collect::<Result<_, _>>()
488-
// else {
489-
// return;
490-
// };
491494
dispatcher.push(TransactionPoolAction::StartVerify {
492495
commands: commands.clone().into_iter().collect(),
493496
from_rpc: Some(*rpc_id),

0 commit comments

Comments
 (0)