Skip to content

Commit 453b283

Browse files
authored
Merge pull request #964 from openmina/fix/dpool/cleanup_when_peer_sends_empty_response
Fix Pools: clean up candidates if we get an empty response for that data
2 parents adce26d + f4cc19f commit 453b283

9 files changed

+113
-28
lines changed

node/src/action_kind.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -567,9 +567,10 @@ pub enum ActionKind {
567567
SnarkPoolCandidateInfoReceived,
568568
SnarkPoolCandidatePeerPrune,
569569
SnarkPoolCandidateWorkFetchAll,
570+
SnarkPoolCandidateWorkFetchError,
570571
SnarkPoolCandidateWorkFetchInit,
571572
SnarkPoolCandidateWorkFetchPending,
572-
SnarkPoolCandidateWorkReceived,
573+
SnarkPoolCandidateWorkFetchSuccess,
573574
SnarkPoolCandidateWorkVerifyError,
574575
SnarkPoolCandidateWorkVerifyNext,
575576
SnarkPoolCandidateWorkVerifyPending,
@@ -601,11 +602,12 @@ pub enum ActionKind {
601602
TransactionPoolStartVerifyWithAccounts,
602603
TransactionPoolVerifyError,
603604
TransactionPoolCandidateFetchAll,
605+
TransactionPoolCandidateFetchError,
604606
TransactionPoolCandidateFetchInit,
605607
TransactionPoolCandidateFetchPending,
608+
TransactionPoolCandidateFetchSuccess,
606609
TransactionPoolCandidateInfoReceived,
607610
TransactionPoolCandidatePeerPrune,
608-
TransactionPoolCandidateReceived,
609611
TransactionPoolCandidateVerifyError,
610612
TransactionPoolCandidateVerifyNext,
611613
TransactionPoolCandidateVerifyPending,
@@ -702,7 +704,7 @@ pub enum ActionKind {
702704
}
703705

704706
impl ActionKind {
705-
pub const COUNT: u16 = 592;
707+
pub const COUNT: u16 = 594;
706708
}
707709

708710
impl std::fmt::Display for ActionKind {
@@ -1486,7 +1488,8 @@ impl ActionKindGet for SnarkPoolCandidateAction {
14861488
Self::WorkFetchAll => ActionKind::SnarkPoolCandidateWorkFetchAll,
14871489
Self::WorkFetchInit { .. } => ActionKind::SnarkPoolCandidateWorkFetchInit,
14881490
Self::WorkFetchPending { .. } => ActionKind::SnarkPoolCandidateWorkFetchPending,
1489-
Self::WorkReceived { .. } => ActionKind::SnarkPoolCandidateWorkReceived,
1491+
Self::WorkFetchError { .. } => ActionKind::SnarkPoolCandidateWorkFetchError,
1492+
Self::WorkFetchSuccess { .. } => ActionKind::SnarkPoolCandidateWorkFetchSuccess,
14901493
Self::WorkVerifyNext => ActionKind::SnarkPoolCandidateWorkVerifyNext,
14911494
Self::WorkVerifyPending { .. } => ActionKind::SnarkPoolCandidateWorkVerifyPending,
14921495
Self::WorkVerifyError { .. } => ActionKind::SnarkPoolCandidateWorkVerifyError,
@@ -1503,7 +1506,8 @@ impl ActionKindGet for TransactionPoolCandidateAction {
15031506
Self::FetchAll => ActionKind::TransactionPoolCandidateFetchAll,
15041507
Self::FetchInit { .. } => ActionKind::TransactionPoolCandidateFetchInit,
15051508
Self::FetchPending { .. } => ActionKind::TransactionPoolCandidateFetchPending,
1506-
Self::Received { .. } => ActionKind::TransactionPoolCandidateReceived,
1509+
Self::FetchError { .. } => ActionKind::TransactionPoolCandidateFetchError,
1510+
Self::FetchSuccess { .. } => ActionKind::TransactionPoolCandidateFetchSuccess,
15071511
Self::VerifyNext => ActionKind::TransactionPoolCandidateVerifyNext,
15081512
Self::VerifyPending { .. } => ActionKind::TransactionPoolCandidateVerifyPending,
15091513
Self::VerifyError { .. } => ActionKind::TransactionPoolCandidateVerifyError,

node/src/p2p/callbacks/p2p_callbacks_reducer.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ use crate::{
3434

3535
use super::P2pCallbacksAction;
3636

37+
fn get_rpc_request<'a>(state: &'a State, peer_id: &PeerId) -> Option<&'a P2pRpcRequest> {
38+
state
39+
.p2p
40+
.get_ready_peer(peer_id)
41+
.and_then(|s| s.channels.rpc.local_responded_request())
42+
.map(|(_, req)| req)
43+
}
44+
3745
impl crate::State {
3846
pub fn p2p_callback_reducer(
3947
state_context: crate::Substate<Self>,
@@ -110,7 +118,10 @@ impl crate::State {
110118
id,
111119
response,
112120
} => {
113-
State::handle_rpc_channels_response(dispatcher, meta, *id, *peer_id, response);
121+
let request = || get_rpc_request(state, peer_id);
122+
State::handle_rpc_channels_response(
123+
dispatcher, meta, *id, *peer_id, request, response,
124+
);
114125
dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
115126
dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
116127
dispatcher.push(TransitionFrontierSyncAction::BlocksPeersQuery);
@@ -265,6 +276,7 @@ impl crate::State {
265276
})
266277
.for_each(|action| dispatcher.push(action));
267278

279+
dispatcher.push(TransactionPoolCandidateAction::PeerPrune { peer_id });
268280
dispatcher.push(SnarkPoolCandidateAction::PeerPrune { peer_id });
269281
}
270282
P2pCallbacksAction::RpcRespondBestTip { peer_id } => {
@@ -386,15 +398,32 @@ impl crate::State {
386398
}
387399
}
388400

389-
fn handle_rpc_channels_response(
401+
fn handle_rpc_channels_response<'a>(
390402
dispatcher: &mut Dispatcher<Action, State>,
391403
meta: ActionMeta,
392404
id: u64,
393405
peer_id: PeerId,
406+
request: impl FnOnce() -> Option<&'a P2pRpcRequest>,
394407
response: &Option<Box<P2pRpcResponse>>,
395408
) {
396409
match response.as_deref() {
397410
None => {
411+
match request() {
412+
Some(P2pRpcRequest::Transaction(hash)) => {
413+
let hash = hash.clone();
414+
dispatcher
415+
.push(TransactionPoolCandidateAction::FetchError { peer_id, hash });
416+
return;
417+
}
418+
Some(P2pRpcRequest::Snark(job_id)) => {
419+
let job_id = job_id.clone();
420+
dispatcher
421+
.push(SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id });
422+
return;
423+
}
424+
_ => {}
425+
}
426+
398427
dispatcher.push(
399428
TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError {
400429
peer_id,
@@ -527,14 +556,16 @@ impl crate::State {
527556
Some(P2pRpcResponse::Transaction(transaction)) => {
528557
match TransactionWithHash::try_new(transaction.clone()) {
529558
Err(err) => bug_condition!("tx hashing failed: {err}"),
530-
Ok(transaction) => dispatcher.push(TransactionPoolCandidateAction::Received {
531-
peer_id,
532-
transaction,
533-
}),
559+
Ok(transaction) => {
560+
dispatcher.push(TransactionPoolCandidateAction::FetchSuccess {
561+
peer_id,
562+
transaction,
563+
})
564+
}
534565
}
535566
}
536567
Some(P2pRpcResponse::Snark(snark)) => {
537-
dispatcher.push(SnarkPoolCandidateAction::WorkReceived {
568+
dispatcher.push(SnarkPoolCandidateAction::WorkFetchSuccess {
538569
peer_id,
539570
work: snark.clone(),
540571
});

node/src/snark_pool/candidate/snark_pool_candidate_actions.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ pub enum SnarkPoolCandidateAction {
3131
job_id: SnarkJobId,
3232
rpc_id: P2pRpcId,
3333
},
34-
WorkReceived {
34+
WorkFetchError {
35+
peer_id: PeerId,
36+
job_id: SnarkJobId,
37+
},
38+
WorkFetchSuccess {
3539
peer_id: PeerId,
3640
work: Snark,
3741
},
@@ -91,7 +95,14 @@ impl redux::EnablingCondition<crate::State> for SnarkPoolCandidateAction {
9195
.map_or(false, |s| {
9296
matches!(s, SnarkPoolCandidateState::InfoReceived { .. })
9397
}),
94-
SnarkPoolCandidateAction::WorkReceived { peer_id, work } => {
98+
SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id } => state
99+
.snark_pool
100+
.candidates
101+
.get(*peer_id, job_id)
102+
.map_or(false, |s| {
103+
matches!(s, SnarkPoolCandidateState::WorkFetchPending { .. })
104+
}),
105+
SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work } => {
95106
let job_id = work.job_id();
96107
state.snark_pool.contains(&job_id)
97108
&& state

node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ impl SnarkPoolCandidatesState {
8585
} => {
8686
state.work_fetch_pending(meta.time(), peer_id, job_id, *rpc_id);
8787
}
88-
SnarkPoolCandidateAction::WorkReceived { peer_id, work } => {
88+
SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id } => {
89+
state.peer_work_remove(*peer_id, job_id);
90+
}
91+
SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work } => {
8992
state.work_received(meta.time(), *peer_id, work.clone());
9093
}
9194
SnarkPoolCandidateAction::WorkVerifyNext => {

node/src/snark_pool/candidate/snark_pool_candidate_state.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,18 @@ impl SnarkPoolCandidatesState {
271271
}
272272
}
273273

274+
pub fn peer_work_remove(&mut self, peer_id: PeerId, job_id: &SnarkJobId) {
275+
if let Some(works) = self.by_peer.get_mut(&peer_id) {
276+
works.remove(job_id);
277+
if let Some(peers) = self.by_job_id.get_mut(job_id) {
278+
peers.remove(&peer_id);
279+
if peers.is_empty() {
280+
self.by_job_id.remove(job_id);
281+
}
282+
}
283+
}
284+
}
285+
274286
pub fn remove_inferior_snarks(&mut self, snark: &Snark) {
275287
let job_id = snark.job_id();
276288
let by_peer = &mut self.by_peer;

node/src/state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ impl P2p {
551551
)),
552552
on_p2p_channels_snark_libp2p_received: Some(redux::callback!(
553553
on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action{
554-
SnarkPoolCandidateAction::WorkReceived { peer_id, work: *snark }
554+
SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work: *snark }
555555
}
556556
)),
557557
on_p2p_channels_streaming_rpc_ready: Some(redux::callback!(

node/src/transaction_pool/candidate/transaction_pool_candidate_actions.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ pub enum TransactionPoolCandidateAction {
2929
hash: TransactionHash,
3030
rpc_id: P2pRpcId,
3131
},
32-
Received {
32+
FetchError {
33+
peer_id: PeerId,
34+
hash: TransactionHash,
35+
},
36+
FetchSuccess {
3337
peer_id: PeerId,
3438
transaction: TransactionWithHash,
3539
},
@@ -82,7 +86,12 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolCandidateAction {
8286
.map_or(false, |s| {
8387
matches!(s, TransactionPoolCandidateState::InfoReceived { .. })
8488
}),
85-
TransactionPoolCandidateAction::Received {
89+
TransactionPoolCandidateAction::FetchError { peer_id, hash } => state
90+
.transaction_pool
91+
.candidates
92+
.get(*peer_id, hash)
93+
.is_some(),
94+
TransactionPoolCandidateAction::FetchSuccess {
8695
peer_id,
8796
transaction,
8897
} => state

node/src/transaction_pool/candidate/transaction_pool_candidate_reducer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,16 @@ impl TransactionPoolCandidatesState {
7878
hash,
7979
rpc_id,
8080
} => {
81-
state.work_fetch_pending(meta.time(), peer_id, hash, *rpc_id);
81+
state.fetch_pending(meta.time(), peer_id, hash, *rpc_id);
8282
}
83-
TransactionPoolCandidateAction::Received {
83+
TransactionPoolCandidateAction::FetchError { peer_id, hash } => {
84+
state.peer_transaction_remove(*peer_id, hash);
85+
}
86+
TransactionPoolCandidateAction::FetchSuccess {
8487
peer_id,
8588
transaction,
8689
} => {
87-
state.work_received(meta.time(), *peer_id, transaction.clone());
90+
state.transaction_received(meta.time(), *peer_id, transaction.clone());
8891
}
8992
TransactionPoolCandidateAction::VerifyNext => {
9093
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();

node/src/transaction_pool/candidate/transaction_pool_candidate_state.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl TransactionPoolCandidatesState {
159159
.collect()
160160
}
161161

162-
pub fn work_fetch_pending(
162+
pub fn fetch_pending(
163163
&mut self,
164164
time: Timestamp,
165165
peer_id: &PeerId,
@@ -181,7 +181,7 @@ impl TransactionPoolCandidatesState {
181181
}
182182
}
183183

184-
pub fn work_received(
184+
pub fn transaction_received(
185185
&mut self,
186186
time: Timestamp,
187187
peer_id: PeerId,
@@ -269,8 +269,8 @@ impl TransactionPoolCandidatesState {
269269
}
270270

271271
pub fn peer_remove(&mut self, peer_id: PeerId) {
272-
if let Some(works) = self.by_peer.remove(&peer_id) {
273-
for hash in works.into_keys() {
272+
if let Some(txs) = self.by_peer.remove(&peer_id) {
273+
for hash in txs.into_keys() {
274274
if let Some(peers) = self.by_hash.get_mut(&hash) {
275275
peers.remove(&peer_id);
276276
if peers.is_empty() {
@@ -281,6 +281,18 @@ impl TransactionPoolCandidatesState {
281281
}
282282
}
283283

284+
pub fn peer_transaction_remove(&mut self, peer_id: PeerId, hash: &TransactionHash) {
285+
if let Some(txs) = self.by_peer.get_mut(&peer_id) {
286+
txs.remove(hash);
287+
if let Some(peers) = self.by_hash.get_mut(hash) {
288+
peers.remove(&peer_id);
289+
if peers.is_empty() {
290+
self.by_hash.remove(hash);
291+
}
292+
}
293+
}
294+
}
295+
284296
fn transaction_remove(&mut self, hash: &TransactionHash) {
285297
if let Some(peers) = self.by_hash.remove(hash) {
286298
for peer_id in peers {
@@ -310,11 +322,11 @@ impl TransactionPoolCandidatesState {
310322
self.by_hash.retain(|hash, peers| {
311323
let mut predicate = predicate(hash);
312324
peers.retain(|peer_id| {
313-
if let Some(peer_works) = by_peer.get_mut(peer_id) {
314-
match peer_works.get(hash) {
325+
if let Some(peer_txs) = by_peer.get_mut(peer_id) {
326+
match peer_txs.get(hash) {
315327
Some(s) if predicate(s) => true,
316328
Some(_) => {
317-
peer_works.remove(hash);
329+
peer_txs.remove(hash);
318330
false
319331
}
320332
None => false,

0 commit comments

Comments
 (0)