Skip to content

Commit 29558b8

Browse files
committed
refactor(snark_pool): Combine all action types of the snark_pool module into the parent action type
1 parent 16e461e commit 29558b8

File tree

9 files changed

+159
-237
lines changed

9 files changed

+159
-237
lines changed

node/src/effects.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ use crate::p2p::p2p_effects;
1717
use crate::rpc::rpc_effects;
1818
use crate::snark::snark_effects;
1919
use crate::snark_pool::candidate::SnarkPoolCandidateAction;
20-
use crate::snark_pool::{
21-
snark_pool_effects, SnarkPoolCheckTimeoutsAction, SnarkPoolP2pSendAllAction,
22-
};
20+
use crate::snark_pool::{snark_pool_effects, SnarkPoolAction};
2321
use crate::transition_frontier::sync::TransitionFrontierSyncBlocksNextApplyInitAction;
2422
use crate::transition_frontier::transition_frontier_effects;
2523
use crate::watched_accounts::watched_accounts_effects;
@@ -50,8 +48,8 @@ pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
5048

5149
p2p_try_reconnect_disconnected_peers(store);
5250

53-
store.dispatch(SnarkPoolCheckTimeoutsAction {});
54-
store.dispatch(SnarkPoolP2pSendAllAction {});
51+
store.dispatch(SnarkPoolAction::CheckTimeouts {});
52+
store.dispatch(SnarkPoolAction::P2pSendAll {});
5553

5654
p2p_request_best_tip_if_needed(store);
5755

node/src/external_snark_worker/external_snark_worker_effects.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use openmina_core::snark::Snark;
22

3-
use crate::snark_pool::{SnarkPoolAutoCreateCommitmentAction, SnarkPoolWorkAddAction};
3+
use crate::snark_pool::SnarkPoolAction;
44

55
use super::{
66
available_job_to_snark_worker_spec, ExternalSnarkWorkerAction,
@@ -27,7 +27,7 @@ pub fn external_snark_worker_effects<S: crate::Service>(
2727
}
2828
}
2929
ExternalSnarkWorkerAction::Started => {
30-
store.dispatch(SnarkPoolAutoCreateCommitmentAction {});
30+
store.dispatch(SnarkPoolAction::AutoCreateCommitment {});
3131
}
3232
ExternalSnarkWorkerAction::StartTimeout { .. } => {
3333
store.dispatch(ExternalSnarkWorkerAction::Error {
@@ -78,7 +78,7 @@ pub fn external_snark_worker_effects<S: crate::Service>(
7878
proofs: result.clone(),
7979
};
8080
let sender = store.state().p2p.my_id();
81-
store.dispatch(SnarkPoolWorkAddAction { snark, sender });
81+
store.dispatch(SnarkPoolAction::WorkAdd { snark, sender });
8282
store.dispatch(ExternalSnarkWorkerAction::PruneWork);
8383
}
8484
ExternalSnarkWorkerAction::WorkError { .. } => {
@@ -100,7 +100,7 @@ pub fn external_snark_worker_effects<S: crate::Service>(
100100
store.dispatch(ExternalSnarkWorkerAction::PruneWork);
101101
}
102102
ExternalSnarkWorkerAction::PruneWork => {
103-
store.dispatch(SnarkPoolAutoCreateCommitmentAction {});
103+
store.dispatch(SnarkPoolAction::AutoCreateCommitment {});
104104
}
105105
}
106106
}

node/src/p2p/p2p_effects.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::rpc::{
88
RpcP2pConnectionOutgoingSuccessAction,
99
};
1010
use crate::snark_pool::candidate::SnarkPoolCandidateAction;
11-
use crate::snark_pool::SnarkPoolJobCommitmentAddAction;
11+
use crate::snark_pool::SnarkPoolAction;
1212
use crate::transition_frontier::sync::ledger::snarked::{
1313
PeerLedgerQueryError, PeerLedgerQueryResponse,
1414
TransitionFrontierSyncLedgerSnarkedPeerQueryErrorAction,
@@ -348,7 +348,7 @@ pub fn p2p_effects<S: Service>(store: &mut Store<S>, action: P2pActionWithMeta)
348348
commitment,
349349
} = action
350350
{
351-
store.dispatch(SnarkPoolJobCommitmentAddAction {
351+
store.dispatch(SnarkPoolAction::CommitmentAdd {
352352
commitment,
353353
sender: peer_id,
354354
});

node/src/rpc/rpc_effects.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::p2p::connection::incoming::P2pConnectionIncomingInitAction;
77
use crate::p2p::connection::outgoing::P2pConnectionOutgoingInitAction;
88
use crate::p2p::connection::P2pConnectionResponse;
99
use crate::rpc::{PeerConnectionStatus, RpcPeerInfo};
10-
use crate::snark_pool::SnarkPoolCommitmentCreateAction;
10+
use crate::snark_pool::SnarkPoolAction;
1111
use crate::{Service, Store};
1212

1313
use super::{
@@ -369,7 +369,7 @@ pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: RpcActionWithMeta)
369369
{
370370
return;
371371
}
372-
store.dispatch(SnarkPoolCommitmentCreateAction { job_id });
372+
store.dispatch(SnarkPoolAction::CommitmentCreate { job_id });
373373
}
374374
RpcAction::SnarkerJobSpec(action) => {
375375
let job_id = action.job_id;

node/src/snark/snark_effects.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::consensus::ConsensusAction;
22
use crate::snark_pool::candidate::SnarkPoolCandidateAction;
3-
use crate::snark_pool::SnarkPoolWorkAddAction;
3+
use crate::snark_pool::SnarkPoolAction;
44
use crate::{Service, Store};
55

66
use super::block_verify::SnarkBlockVerifyAction;
@@ -50,7 +50,7 @@ pub fn snark_effects<S: Service>(store: &mut Store<S>, action: SnarkActionWithMe
5050
verify_id: req_id,
5151
});
5252
for snark in batch {
53-
store.dispatch(SnarkPoolWorkAddAction { snark, sender });
53+
store.dispatch(SnarkPoolAction::WorkAdd { snark, sender });
5454
}
5555
}
5656
SnarkWorkVerifyAction::Init { .. } => {}

node/src/snark_pool/snark_pool_actions.rs

Lines changed: 90 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -11,173 +11,102 @@ use super::SnarkWork;
1111
pub type SnarkPoolActionWithMeta = redux::ActionWithMeta<SnarkPoolAction>;
1212
pub type SnarkPoolActionWithMetaRef<'a> = redux::ActionWithMeta<&'a SnarkPoolAction>;
1313

14-
#[derive(derive_more::From, Serialize, Deserialize, Debug, Clone)]
14+
#[derive(Serialize, Deserialize, Debug, Clone)]
1515
pub enum SnarkPoolAction {
1616
Candidate(SnarkPoolCandidateAction),
1717

18-
JobsUpdate(SnarkPoolJobsUpdateAction),
19-
AutoCreateCommitment(SnarkPoolAutoCreateCommitmentAction),
20-
CommitmentCreate(SnarkPoolCommitmentCreateAction),
21-
CommitmentAdd(SnarkPoolJobCommitmentAddAction),
22-
WorkAdd(SnarkPoolWorkAddAction),
23-
P2pSendAll(SnarkPoolP2pSendAllAction),
24-
P2pSend(SnarkPoolP2pSendAction),
25-
CheckTimeouts(SnarkPoolCheckTimeoutsAction),
26-
JobCommitmentTimeout(SnarkPoolJobCommitmentTimeoutAction),
27-
}
28-
29-
#[derive(Serialize, Deserialize, Debug, Clone)]
30-
pub struct SnarkPoolJobsUpdateAction {
31-
pub jobs: Vec<OneOrTwo<AvailableJobMessage>>,
32-
pub orphaned_snarks: Vec<SnarkWork>,
33-
}
34-
35-
impl redux::EnablingCondition<crate::State> for SnarkPoolJobsUpdateAction {}
36-
37-
#[derive(Serialize, Deserialize, Debug, Clone)]
38-
pub struct SnarkPoolAutoCreateCommitmentAction {}
39-
40-
impl redux::EnablingCondition<crate::State> for SnarkPoolAutoCreateCommitmentAction {
41-
fn is_enabled(&self, #[allow(unused_variables)] state: &crate::State) -> bool {
42-
state
43-
.config
44-
.snarker
45-
.as_ref()
46-
.map_or(false, |v| v.auto_commit)
47-
}
48-
}
49-
50-
#[derive(Serialize, Deserialize, Debug, Clone)]
51-
pub struct SnarkPoolCommitmentCreateAction {
52-
pub job_id: SnarkJobId,
53-
}
54-
55-
impl redux::EnablingCondition<crate::State> for SnarkPoolCommitmentCreateAction {
56-
fn is_enabled(&self, state: &crate::State) -> bool {
57-
state.config.snarker.is_some() && state.snark_pool.should_create_commitment(&self.job_id)
58-
}
59-
}
60-
61-
#[derive(Serialize, Deserialize, Debug, Clone)]
62-
pub struct SnarkPoolJobCommitmentAddAction {
63-
pub commitment: SnarkJobCommitment,
64-
pub sender: PeerId,
18+
JobsUpdate {
19+
jobs: Vec<OneOrTwo<AvailableJobMessage>>,
20+
orphaned_snarks: Vec<SnarkWork>,
21+
},
22+
AutoCreateCommitment,
23+
CommitmentCreate {
24+
job_id: SnarkJobId,
25+
},
26+
CommitmentAdd {
27+
commitment: SnarkJobCommitment,
28+
sender: PeerId,
29+
},
30+
WorkAdd {
31+
snark: Snark,
32+
sender: PeerId,
33+
},
34+
P2pSendAll,
35+
P2pSend {
36+
peer_id: PeerId,
37+
},
38+
CheckTimeouts,
39+
JobCommitmentTimeout {
40+
job_id: SnarkJobId,
41+
},
6542
}
6643

67-
impl redux::EnablingCondition<crate::State> for SnarkPoolJobCommitmentAddAction {
44+
impl redux::EnablingCondition<crate::State> for SnarkPoolAction {
6845
fn is_enabled(&self, state: &crate::State) -> bool {
69-
state
70-
.snark_pool
71-
.get(&self.commitment.job_id)
72-
.map_or(false, |s| match s.commitment.as_ref() {
73-
Some(cur) => self.commitment > cur.commitment,
74-
None => true,
75-
})
76-
}
77-
}
78-
79-
#[derive(Serialize, Deserialize, Debug, Clone)]
80-
pub struct SnarkPoolWorkAddAction {
81-
pub snark: Snark,
82-
pub sender: PeerId,
83-
}
84-
85-
impl redux::EnablingCondition<crate::State> for SnarkPoolWorkAddAction {
86-
fn is_enabled(&self, state: &crate::State) -> bool {
87-
state
88-
.snark_pool
89-
.get(&self.snark.job_id())
90-
.map_or(false, |s| match s.snark.as_ref() {
91-
Some(cur) => self.snark > cur.work,
92-
None => true,
93-
})
94-
}
95-
}
96-
97-
#[derive(Serialize, Deserialize, Debug, Clone)]
98-
pub struct SnarkPoolP2pSendAllAction {}
99-
100-
impl redux::EnablingCondition<crate::State> for SnarkPoolP2pSendAllAction {}
101-
102-
#[derive(Serialize, Deserialize, Debug, Clone)]
103-
pub struct SnarkPoolP2pSendAction {
104-
pub peer_id: PeerId,
105-
}
106-
107-
impl redux::EnablingCondition<crate::State> for SnarkPoolP2pSendAction {
108-
fn is_enabled(&self, state: &crate::State) -> bool {
109-
state
110-
.p2p
111-
.get_ready_peer(&self.peer_id)
112-
// Only send commitments/snarks if peer has the same best tip,
113-
// or its best tip is extension of our best tip. In such case
114-
// no commitment/snark will be dropped by peer, because it
115-
// doesn't yet have those jobs.
116-
//
117-
// By sending commitments/snarks to the peer, which has next
118-
// best tip, we might send outdated commitments/snarks, but
119-
// we might send useful ones as well.
120-
.and_then(|p| {
121-
let peer_best_tip = p.best_tip.as_ref()?;
122-
let our_best_tip = state.transition_frontier.best_tip()?.hash();
123-
Some(p).filter(|_| {
124-
peer_best_tip.hash() == our_best_tip
125-
|| peer_best_tip.pred_hash() == our_best_tip
126-
})
127-
})
128-
.map_or(false, |p| {
129-
let check = |(next_index, limit), last_index| limit > 0 && next_index <= last_index;
130-
let last_index = state.snark_pool.last_index();
131-
132-
check(
133-
p.channels.snark_job_commitment.next_send_index_and_limit(),
134-
last_index,
135-
) || check(p.channels.snark.next_send_index_and_limit(), last_index)
136-
})
137-
}
138-
}
139-
140-
#[derive(Serialize, Deserialize, Debug, Clone)]
141-
pub struct SnarkPoolCheckTimeoutsAction {}
142-
143-
impl redux::EnablingCondition<crate::State> for SnarkPoolCheckTimeoutsAction {
144-
fn is_enabled(&self, state: &crate::State) -> bool {
145-
state
146-
.time()
147-
.checked_sub(state.snark_pool.last_check_timeouts)
148-
.map_or(false, |dur| dur.as_secs() >= 5)
149-
}
150-
}
151-
152-
#[derive(Serialize, Deserialize, Debug, Clone)]
153-
pub struct SnarkPoolJobCommitmentTimeoutAction {
154-
pub job_id: SnarkJobId,
155-
}
156-
157-
impl redux::EnablingCondition<crate::State> for SnarkPoolJobCommitmentTimeoutAction {
158-
fn is_enabled(&self, state: &crate::State) -> bool {
159-
state
160-
.snark_pool
161-
.is_commitment_timed_out(&self.job_id, state.time())
162-
}
163-
}
164-
165-
macro_rules! impl_into_global_action {
166-
($a:ty) => {
167-
impl From<$a> for crate::Action {
168-
fn from(value: $a) -> Self {
169-
Self::SnarkPool(value.into())
46+
match self {
47+
SnarkPoolAction::Candidate(action) => action.is_enabled(state),
48+
SnarkPoolAction::AutoCreateCommitment => state
49+
.config
50+
.snarker
51+
.as_ref()
52+
.map_or(false, |v| v.auto_commit),
53+
SnarkPoolAction::CommitmentCreate { job_id } => {
54+
state.config.snarker.is_some() && state.snark_pool.should_create_commitment(job_id)
55+
}
56+
SnarkPoolAction::CommitmentAdd { commitment, .. } => state
57+
.snark_pool
58+
.get(&commitment.job_id)
59+
.map_or(false, |s| match s.commitment.as_ref() {
60+
Some(cur) => commitment > &cur.commitment,
61+
None => true,
62+
}),
63+
SnarkPoolAction::WorkAdd { snark, .. } => {
64+
state
65+
.snark_pool
66+
.get(&snark.job_id())
67+
.map_or(false, |s| match s.snark.as_ref() {
68+
Some(cur) => snark > &cur.work,
69+
None => true,
70+
})
17071
}
72+
SnarkPoolAction::P2pSend { peer_id } => state
73+
.p2p
74+
.get_ready_peer(peer_id)
75+
// Only send commitments/snarks if peer has the same best tip,
76+
// or its best tip is extension of our best tip. In such case
77+
// no commitment/snark will be dropped by peer, because it
78+
// doesn't yet have those jobs.
79+
//
80+
// By sending commitments/snarks to the peer, which has next
81+
// best tip, we might send outdated commitments/snarks, but
82+
// we might send useful ones as well.
83+
.and_then(|p| {
84+
let peer_best_tip = p.best_tip.as_ref()?;
85+
let our_best_tip = state.transition_frontier.best_tip()?.hash();
86+
Some(p).filter(|_| {
87+
peer_best_tip.hash() == our_best_tip
88+
|| peer_best_tip.pred_hash() == our_best_tip
89+
})
90+
})
91+
.map_or(false, |p| {
92+
let check =
93+
|(next_index, limit), last_index| limit > 0 && next_index <= last_index;
94+
let last_index = state.snark_pool.last_index();
95+
96+
check(
97+
p.channels.snark_job_commitment.next_send_index_and_limit(),
98+
last_index,
99+
) || check(p.channels.snark.next_send_index_and_limit(), last_index)
100+
}),
101+
SnarkPoolAction::CheckTimeouts => state
102+
.time()
103+
.checked_sub(state.snark_pool.last_check_timeouts)
104+
.map_or(false, |dur| dur.as_secs() >= 5),
105+
SnarkPoolAction::JobCommitmentTimeout { job_id } => state
106+
.snark_pool
107+
.is_commitment_timed_out(job_id, state.time()),
108+
SnarkPoolAction::JobsUpdate { .. } => true,
109+
SnarkPoolAction::P2pSendAll => true,
171110
}
172-
};
111+
}
173112
}
174-
175-
impl_into_global_action!(SnarkPoolJobsUpdateAction);
176-
impl_into_global_action!(SnarkPoolAutoCreateCommitmentAction);
177-
impl_into_global_action!(SnarkPoolCommitmentCreateAction);
178-
impl_into_global_action!(SnarkPoolJobCommitmentAddAction);
179-
impl_into_global_action!(SnarkPoolWorkAddAction);
180-
impl_into_global_action!(SnarkPoolP2pSendAllAction);
181-
impl_into_global_action!(SnarkPoolP2pSendAction);
182-
impl_into_global_action!(SnarkPoolCheckTimeoutsAction);
183-
impl_into_global_action!(SnarkPoolJobCommitmentTimeoutAction);

0 commit comments

Comments
 (0)