Skip to content

Commit 1e5ca6e

Browse files
committed
Review fixes
1 parent 03d1796 commit 1e5ca6e

File tree

5 files changed

+123
-40
lines changed

5 files changed

+123
-40
lines changed

node/src/block_producer/vrf_evaluator/block_producer_vrf_evaluator_effects.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ impl BlockProducerVrfEvaluatorAction {
149149
};
150150
if store.dispatch(LedgerReadAction::Init {
151151
request: LedgerReadRequest::DelegatorTable(staking_ledger_hash, producer),
152+
propagate: None,
152153
}) {
153154
// TODO(binier): have pending action.
154155
} else {

node/src/ledger/ledger_effects.rs

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{BlockProducerAction, RpcAction, Store};
1111

1212
use super::read::{
1313
LedgerReadAction, LedgerReadId, LedgerReadRequest, LedgerReadResponse,
14-
LedgerReadStagedLedgerAuxAndPendingCoinbases,
14+
LedgerReadStagedLedgerAuxAndPendingCoinbases, PropagateLedgerReadInit,
1515
};
1616
use super::write::{LedgerWriteAction, LedgerWriteResponse};
1717
use super::{LedgerAction, LedgerActionWithMeta, LedgerAddress, LedgerService};
@@ -36,13 +36,47 @@ pub fn ledger_effects<S: LedgerService>(store: &mut Store<S>, action: LedgerActi
3636
LedgerReadAction::FindTodos => {
3737
next_read_requests_init(store);
3838
}
39-
LedgerReadAction::Init { request } => {
39+
LedgerReadAction::Init { request, propagate } => {
4040
if store.state().ledger.read.has_same_request(&request) {
4141
return;
4242
}
4343
let id = store.state().ledger.read.next_req_id();
4444
store.service.read_init(id, request.clone());
4545
store.dispatch(LedgerReadAction::Pending { id, request });
46+
47+
if let Some(propagate) = propagate {
48+
match propagate {
49+
PropagateLedgerReadInit::RpcLedgerAccountsGetPending { rpc_id } => {
50+
store.dispatch(RpcAction::LedgerAccountsGetPending { rpc_id });
51+
}
52+
PropagateLedgerReadInit::RpcScanStateSummaryGetPending {
53+
rpc_id,
54+
block,
55+
} => {
56+
store.dispatch(RpcAction::ScanStateSummaryGetPending {
57+
rpc_id,
58+
block: Some(block),
59+
});
60+
}
61+
PropagateLedgerReadInit::P2pChannelsResponsePending {
62+
is_streaming,
63+
id,
64+
peer_id,
65+
} => {
66+
if !is_streaming {
67+
store.dispatch(P2pChannelsRpcAction::ResponsePending {
68+
peer_id,
69+
id,
70+
});
71+
} else {
72+
store.dispatch(P2pChannelsStreamingRpcAction::ResponsePending {
73+
peer_id,
74+
id,
75+
});
76+
}
77+
}
78+
}
79+
}
4680
}
4781
LedgerReadAction::Pending { .. } => {}
4882
LedgerReadAction::Success { id, response } => {
@@ -219,13 +253,16 @@ fn next_read_requests_init<S: redux::Service>(store: &mut Store<S>) {
219253
}) else {
220254
continue;
221255
};
222-
if store.dispatch(LedgerReadAction::Init { request }) {
223-
if !is_streaming {
224-
store.dispatch(P2pChannelsRpcAction::ResponsePending { peer_id, id });
225-
} else {
226-
store.dispatch(P2pChannelsStreamingRpcAction::ResponsePending { peer_id, id });
227-
}
228-
}
256+
257+
store.dispatch(LedgerReadAction::Init {
258+
request,
259+
propagate: Some(PropagateLedgerReadInit::P2pChannelsResponsePending {
260+
is_streaming,
261+
id,
262+
peer_id,
263+
}),
264+
});
265+
229266
if !store.state().ledger.read.is_total_cost_under_limit() {
230267
return;
231268
}

node/src/ledger/read/ledger_read_actions.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use serde::{Deserialize, Serialize};
22

3-
use super::{LedgerReadId, LedgerReadRequest, LedgerReadRequestState, LedgerReadResponse};
3+
use super::{
4+
LedgerReadId, LedgerReadRequest, LedgerReadRequestState, LedgerReadResponse,
5+
PropagateLedgerReadInit,
6+
};
47

58
pub type LedgerReadActionWithMeta = redux::ActionWithMeta<LedgerReadAction>;
69
pub type LedgerReadActionWithMetaRef<'a> = redux::ActionWithMeta<&'a LedgerReadAction>;
@@ -10,6 +13,7 @@ pub enum LedgerReadAction {
1013
FindTodos,
1114
Init {
1215
request: LedgerReadRequest,
16+
propagate: Option<PropagateLedgerReadInit>,
1317
},
1418
Pending {
1519
id: LedgerReadId,

node/src/ledger/read/mod.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ pub use ledger_read_actions::*;
44

55
mod ledger_read_state;
66
pub use ledger_read_state::*;
7-
use openmina_core::requests::RpcId;
7+
use openmina_core::block::AppliedBlock;
8+
use openmina_core::requests::{RequestId, RpcId, RpcIdType};
9+
use p2p::channels::rpc::P2pRpcId;
10+
use p2p::PeerId;
811

912
mod ledger_read_reducer;
1013

@@ -126,3 +129,19 @@ impl PartialEq for LedgerReadStagedLedgerAuxAndPendingCoinbases {
126129
self.ledger_hash == other.ledger_hash
127130
}
128131
}
132+
133+
#[derive(Serialize, Deserialize, Debug, Clone)]
134+
pub enum PropagateLedgerReadInit {
135+
RpcLedgerAccountsGetPending {
136+
rpc_id: RequestId<RpcIdType>,
137+
},
138+
RpcScanStateSummaryGetPending {
139+
rpc_id: RequestId<RpcIdType>,
140+
block: AppliedBlock,
141+
},
142+
P2pChannelsResponsePending {
143+
is_streaming: bool,
144+
id: P2pRpcId,
145+
peer_id: PeerId,
146+
},
147+
}

node/src/rpc/rpc_reducer.rs

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use p2p::{
66
use redux::ActionWithMeta;
77

88
use crate::{
9-
ledger::read::{LedgerReadAction, LedgerReadRequest},
9+
ledger::read::{LedgerReadAction, LedgerReadRequest, PropagateLedgerReadInit},
1010
p2p_ready,
1111
rpc_effectful::RpcEffectfulAction,
1212
TransactionPoolAction,
@@ -83,12 +83,20 @@ impl RpcState {
8383
}
8484
RpcAction::P2pConnectionOutgoingPending { rpc_id } => {
8585
let Some(rpc) = state.requests.get_mut(rpc_id) else {
86+
bug_condition!(
87+
"Rpc state not found for RpcAction::P2pConnectionOutgoingPending({})",
88+
rpc_id
89+
);
8690
return;
8791
};
8892
rpc.status = RpcRequestStatus::Pending { time: meta.time() };
8993
}
9094
RpcAction::P2pConnectionOutgoingError { rpc_id, error } => {
9195
let Some(rpc) = state.requests.get_mut(rpc_id) else {
96+
bug_condition!(
97+
"Rpc state not found for RpcAction::P2pConnectionOutgoingError({})",
98+
rpc_id
99+
);
92100
return;
93101
};
94102
rpc.status = RpcRequestStatus::Error {
@@ -104,6 +112,10 @@ impl RpcState {
104112
}
105113
RpcAction::P2pConnectionOutgoingSuccess { rpc_id } => {
106114
let Some(rpc) = state.requests.get_mut(rpc_id) else {
115+
bug_condition!(
116+
"Rpc state not found for RpcAction::P2pConnectionOutgoingSuccess({})",
117+
rpc_id
118+
);
107119
return;
108120
};
109121
rpc.status = RpcRequestStatus::Success { time: meta.time() };
@@ -142,6 +154,10 @@ impl RpcState {
142154
}
143155
RpcAction::P2pConnectionIncomingPending { rpc_id } => {
144156
let Some(rpc) = state.requests.get_mut(rpc_id) else {
157+
bug_condition!(
158+
"Rpc state not found for RpcAction::P2pConnectionIncomingPending({})",
159+
rpc_id
160+
);
145161
return;
146162
};
147163
rpc.status = RpcRequestStatus::Pending { time: meta.time() };
@@ -155,6 +171,10 @@ impl RpcState {
155171
}
156172
RpcAction::P2pConnectionIncomingError { rpc_id, error } => {
157173
let Some(rpc) = state.requests.get_mut(rpc_id) else {
174+
bug_condition!(
175+
"Rpc state not found for RpcAction::P2pConnectionIncomingError({})",
176+
rpc_id
177+
);
158178
return;
159179
};
160180
rpc.status = RpcRequestStatus::Error {
@@ -170,6 +190,10 @@ impl RpcState {
170190
}
171191
RpcAction::P2pConnectionIncomingSuccess { rpc_id } => {
172192
let Some(rpc) = state.requests.get_mut(rpc_id) else {
193+
bug_condition!(
194+
"Rpc state not found for RpcAction::P2pConnectionIncomingSuccess({})",
195+
rpc_id
196+
);
173197
return;
174198
};
175199
rpc.status = RpcRequestStatus::Success { time: meta.time() };
@@ -232,31 +256,33 @@ impl RpcState {
232256
}
233257
};
234258

235-
// TODO: add callbacks here
236-
if dispatcher.push_if_enabled(
237-
LedgerReadAction::Init {
238-
request: LedgerReadRequest::ScanStateSummary(
239-
block.staged_ledger_hashes().clone(),
240-
),
241-
},
242-
state,
243-
meta.time(),
244-
) {
245-
dispatcher.push(RpcAction::ScanStateSummaryGetPending {
259+
dispatcher.push(LedgerReadAction::Init {
260+
request: LedgerReadRequest::ScanStateSummary(
261+
block.staged_ledger_hashes().clone(),
262+
),
263+
propagate: Some(PropagateLedgerReadInit::RpcScanStateSummaryGetPending {
246264
rpc_id: *rpc_id,
247-
block: Some(block),
248-
});
249-
}
265+
block,
266+
}),
267+
});
250268
}
251269
RpcAction::ScanStateSummaryGetPending { rpc_id, block } => {
252270
let Some(rpc) = state.requests.get_mut(rpc_id) else {
271+
bug_condition!(
272+
"Rpc state not found for RpcAction::ScanStateSummaryGetPending({})",
273+
rpc_id
274+
);
253275
return;
254276
};
255277
rpc.status = RpcRequestStatus::Pending { time: meta.time() };
256278
rpc.data = RpcRequestExtraData::FullBlockOpt(block.clone());
257279
}
258280
RpcAction::ScanStateSummaryGetSuccess { rpc_id, scan_state } => {
259281
let Some(rpc) = state.requests.get_mut(rpc_id) else {
282+
bug_condition!(
283+
"Rpc state not found for RpcAction::ScanStateSummaryGetSuccess({})",
284+
rpc_id
285+
);
260286
return;
261287
};
262288
rpc.status = RpcRequestStatus::Success { time: meta.time() };
@@ -408,20 +434,16 @@ impl RpcState {
408434
return;
409435
};
410436

411-
// TODO: add callback here
412-
if dispatcher.push_if_enabled(
413-
LedgerReadAction::Init {
414-
request: LedgerReadRequest::AccountsForRpc(
415-
*rpc_id,
416-
ledger_hash.clone(),
417-
account_query.clone(),
418-
),
419-
},
420-
state,
421-
meta.time(),
422-
) {
423-
dispatcher.push(RpcAction::LedgerAccountsGetPending { rpc_id: *rpc_id });
424-
}
437+
dispatcher.push(LedgerReadAction::Init {
438+
request: LedgerReadRequest::AccountsForRpc(
439+
*rpc_id,
440+
ledger_hash.clone(),
441+
account_query.clone(),
442+
),
443+
propagate: Some(PropagateLedgerReadInit::RpcLedgerAccountsGetPending {
444+
rpc_id: *rpc_id,
445+
}),
446+
})
425447
}
426448
RpcAction::LedgerAccountsGetPending { rpc_id } => {
427449
let Some(rpc) = state.requests.get_mut(rpc_id) else {

0 commit comments

Comments
 (0)