Skip to content

Commit 85bd8ac

Browse files
committed
Ported remaining callbacks in p2p layer and snark layer
1 parent f04d63e commit 85bd8ac

18 files changed

+270
-123
lines changed

node/src/rpc/rpc_reducer.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use openmina_core::{
22
block::AppliedBlock,
33
bug_condition,
4-
requests::{RequestId, RpcIdType},
4+
requests::{RequestId, RpcId, RpcIdType},
55
};
66
use p2p::{
77
connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
88
webrtc::P2pConnectionResponse,
9+
PeerId,
910
};
1011
use redux::ActionWithMeta;
1112

@@ -79,11 +80,21 @@ impl RpcState {
7980
state.requests.insert(*rpc_id, rpc_state);
8081

8182
let dispatcher = state_context.into_dispatcher();
83+
84+
let callback = redux::callback!(
85+
on_p2p_connection_outgoing_rpc_connection_success((peer_id: PeerId, rpc_id: Option<RpcId>)) -> crate::Action {
86+
let Some(rpc_id) = rpc_id else {
87+
panic!("RPC ID not provided");
88+
};
89+
90+
RpcAction::P2pConnectionOutgoingPending{ rpc_id }
91+
}
92+
);
8293
dispatcher.push(P2pConnectionOutgoingAction::Init {
8394
opts: opts.clone(),
8495
rpc_id: Some(*rpc_id),
96+
on_success: Some(callback),
8597
});
86-
dispatcher.push(RpcAction::P2pConnectionOutgoingPending { rpc_id: *rpc_id });
8798
}
8899
RpcAction::P2pConnectionOutgoingPending { rpc_id } => {
89100
let Some(rpc) = state.requests.get_mut(rpc_id) else {

node/src/snark/snark_effects.rs

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use snark::work_verify::SnarkWorkVerifyAction;
2-
3-
use crate::{snark_pool::candidate::SnarkPoolCandidateAction, Service, Store};
1+
use crate::{Service, Store};
42

53
use super::{SnarkAction, SnarkActionWithMeta};
64

@@ -12,35 +10,7 @@ pub fn snark_effects<S: Service>(store: &mut Store<S>, action: SnarkActionWithMe
1210
SnarkAction::BlockVerifyEffect(a) => {
1311
a.effects(&meta, store);
1412
}
15-
SnarkAction::WorkVerify(a) => match a {
16-
// TODO(tizoc): handle this logic with the on_error callback passed on the Init action
17-
SnarkWorkVerifyAction::Error { req_id, .. } => {
18-
let req = store.state().snark.work_verify.jobs.get(req_id);
19-
let Some(req) = req else { return };
20-
let sender = req.sender().parse().unwrap();
21-
22-
store.dispatch(SnarkPoolCandidateAction::WorkVerifyError {
23-
peer_id: sender,
24-
verify_id: req_id,
25-
});
26-
}
27-
// TODO(tizoc): handle this logic with the on_success callback passed on the Init action
28-
SnarkWorkVerifyAction::Success { req_id } => {
29-
let req = store.state().snark.work_verify.jobs.get(req_id);
30-
let Some(req) = req else { return };
31-
let sender = req.sender().parse().unwrap();
32-
let batch = req.batch().to_vec();
33-
34-
store.dispatch(SnarkPoolCandidateAction::WorkVerifySuccess {
35-
peer_id: sender,
36-
verify_id: req_id,
37-
batch,
38-
});
39-
}
40-
SnarkWorkVerifyAction::Init { .. } => {}
41-
SnarkWorkVerifyAction::Pending { .. } => {}
42-
SnarkWorkVerifyAction::Finish { .. } => {}
43-
},
13+
SnarkAction::WorkVerify(_) => {}
4414
SnarkAction::WorkVerifyEffect(a) => {
4515
a.effects(&meta, store);
4616
}

p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ impl P2pChannelsSignalingDiscoveryState {
269269
},
270270
},
271271
rpc_id: None,
272+
on_success: None,
272273
};
273274
let accepted = redux::EnablingCondition::is_enabled(&action, state, meta.time());
274275
if accepted {

p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ impl P2pConnectionIncomingState {
6060
}),
6161
status: P2pPeerStatus::Connecting(P2pConnectionState::incoming_init(&opts)),
6262
identify: None,
63+
on_connect_success: None,
6364
});
6465

6566
state.status =
@@ -427,6 +428,7 @@ impl P2pConnectionIncomingState {
427428
)),
428429
status: P2pPeerStatus::Disconnected { time: meta.time() },
429430
identify: None,
431+
on_connect_success: None,
430432
});
431433

432434
Self::reduce_finalize_libp2p_pending(state, addr, time, my_id, peer_id);

p2p/src/connection/outgoing/p2p_connection_outgoing_actions.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use openmina_core::ActionEvent;
2+
use redux::Callback;
23
use serde::{Deserialize, Serialize};
34

45
use openmina_core::requests::RpcId;
@@ -18,6 +19,7 @@ pub enum P2pConnectionOutgoingAction {
1819
Init {
1920
opts: P2pConnectionOutgoingInitOpts,
2021
rpc_id: Option<RpcId>,
22+
on_success: Option<Callback<(PeerId, Option<RpcId>)>>,
2123
},
2224
/// Reconnect to an existing peer.
2325
// TODO: rename `Init` and `Reconnect` to `New` and `Connect` or something

p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ impl P2pConnectionOutgoingState {
4040
dispatcher.push(P2pConnectionOutgoingEffectfulAction::RandomInit);
4141
Ok(())
4242
}
43-
P2pConnectionOutgoingAction::Init { opts, rpc_id } => {
43+
P2pConnectionOutgoingAction::Init {
44+
opts,
45+
rpc_id,
46+
on_success,
47+
} => {
4448
let peer_state =
4549
p2p_state
4650
.peers
@@ -52,8 +56,13 @@ impl P2pConnectionOutgoingState {
5256
&opts,
5357
)),
5458
identify: None,
59+
on_connect_success: None,
5560
});
5661

62+
if let Some(on_connect_success) = on_success {
63+
peer_state.on_connect_success = Some(on_connect_success);
64+
}
65+
5766
peer_state.status =
5867
P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(Self::Init {
5968
time,
@@ -509,19 +518,30 @@ impl P2pConnectionOutgoingState {
509518
return Ok(());
510519
}
511520

521+
let Some(peer_state) = p2p_state.peers.get_mut(&peer_id) else {
522+
bug_condition!("Outgoing peer state not found for: {}", peer_id);
523+
return Ok(());
524+
};
525+
let callback = peer_state.on_connect_success.take();
512526
let (dispatcher, state) = state_context.into_dispatcher_and_state();
513527
let p2p_state: &P2pState = state.substate()?;
528+
514529
dispatcher.push(P2pPeerAction::Ready {
515530
peer_id,
516531
incoming: false,
517532
});
518533

519-
if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
534+
let rpc_id = p2p_state.peer_connection_rpc_id(&peer_id);
535+
if let Some(rpc_id) = rpc_id {
520536
if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_outgoing_success
521537
{
522538
dispatcher.push_callback(callback.clone(), rpc_id);
523539
}
524540
}
541+
542+
if let Some(callback) = callback {
543+
dispatcher.push_callback(callback, (peer_id, rpc_id));
544+
}
525545
Ok(())
526546
}
527547
}

p2p/src/network/kad/request/p2p_network_kad_request_actions.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
use std::net::SocketAddr;
22

33
use openmina_core::ActionEvent;
4-
use redux::EnablingCondition;
4+
use redux::{Callback, EnablingCondition};
55
use serde::{Deserialize, Serialize};
66

7-
use crate::{ConnectionAddr, P2pAction, P2pNetworkKadEntry, P2pState, PeerId, StreamId};
7+
use crate::{
8+
ConnectionAddr, P2pAction, P2pNetworkKadEntry, P2pNetworkKademliaRpcRequest, P2pState, PeerId,
9+
StreamId,
10+
};
11+
12+
type StreamReadyCallback = Callback<(
13+
ConnectionAddr,
14+
PeerId,
15+
StreamId,
16+
P2pNetworkKademliaRpcRequest,
17+
)>;
818

919
#[derive(Clone, Debug, Serialize, Deserialize, ActionEvent)]
1020
#[action_event(fields(display(peer_id), display(addr), display(key), stream_id, error))]
@@ -29,6 +39,7 @@ pub enum P2pNetworkKadRequestAction {
2939
peer_id: PeerId,
3040
stream_id: StreamId,
3141
addr: ConnectionAddr,
42+
callback: StreamReadyCallback,
3243
},
3344
RequestSent {
3445
peer_id: PeerId,

p2p/src/network/kad/request/p2p_network_kad_request_reducer.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
use openmina_core::{bug_condition, Substate, SubstateAccess};
1+
use openmina_core::{bug_condition, requests::RpcId, Substate, SubstateAccess};
22
use redux::{ActionWithMeta, Dispatcher};
33

44
use crate::{
55
connection::outgoing::P2pConnectionOutgoingAction, ConnectionAddr,
66
P2pNetworkConnectionMuxState, P2pNetworkKadBootstrapAction, P2pNetworkKadEffectfulAction,
77
P2pNetworkKadState, P2pNetworkKademliaRpcRequest, P2pNetworkKademliaStreamAction,
8-
P2pNetworkYamuxAction, P2pPeerState, P2pState,
8+
P2pNetworkYamuxAction, P2pPeerState, P2pState, PeerId,
99
};
1010

1111
use super::{P2pNetworkKadRequestAction, P2pNetworkKadRequestState, P2pNetworkKadRequestStatus};
@@ -47,14 +47,19 @@ impl P2pNetworkKadRequestState {
4747
let peer_state = p2p_state.peers.get(&peer_id);
4848

4949
let on_initialize_connection = |dispatcher: &mut Dispatcher<Action, State>| {
50-
// initialize connection to the peer.
51-
// when connection is establised and yamux layer is ready, we will continue with TODO
52-
// TODO: add callbacks
5350
let opts = crate::connection::outgoing::P2pConnectionOutgoingInitOpts::LibP2P(
5451
(peer_id, addr).into(),
5552
);
56-
dispatcher.push(P2pConnectionOutgoingAction::Init { opts, rpc_id: None });
57-
dispatcher.push(P2pNetworkKadRequestAction::PeerIsConnecting { peer_id });
53+
let callback = redux::callback!(
54+
on_p2p_connection_outgoing_kad_connection_success((peer_id: PeerId, _rpc_id: Option<RpcId>)) -> crate::P2pAction {
55+
P2pNetworkKadRequestAction::PeerIsConnecting { peer_id }
56+
}
57+
);
58+
dispatcher.push(P2pConnectionOutgoingAction::Init {
59+
opts,
60+
rpc_id: None,
61+
on_success: Some(callback),
62+
});
5863
Ok(())
5964
};
6065

@@ -172,6 +177,7 @@ impl P2pNetworkKadRequestState {
172177
peer_id,
173178
stream_id,
174179
addr,
180+
callback,
175181
} => {
176182
let find_node = match P2pNetworkKademliaRpcRequest::find_node(request_state.key) {
177183
Ok(find_node) => find_node,
@@ -193,20 +199,8 @@ impl P2pNetworkKadRequestState {
193199
super::P2pNetworkKadRequestStatus::Request,
194200
);
195201

196-
let key = request_state.key;
197-
198202
let dispatcher = state_context.into_dispatcher();
199-
let data =
200-
P2pNetworkKademliaRpcRequest::find_node(key).map_err(|e| e.to_string())?;
201-
202-
// TODO: move action bellow to callback
203-
dispatcher.push(P2pNetworkKademliaStreamAction::SendRequest {
204-
addr,
205-
peer_id,
206-
stream_id,
207-
data,
208-
});
209-
dispatcher.push(P2pNetworkKadRequestAction::RequestSent { peer_id });
203+
dispatcher.push_callback(callback, (addr, peer_id, stream_id, find_node));
210204
Ok(())
211205
}
212206
P2pNetworkKadRequestAction::RequestSent { .. } => {

p2p/src/network/kad/stream/mod.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,64 @@
11
mod p2p_network_kad_stream_state;
2+
use redux::Callback;
3+
use serde::{Deserialize, Serialize};
4+
5+
use crate::ConnectionAddr;
6+
use crate::P2pNetworkKademliaAction;
7+
use crate::PeerId;
8+
use crate::StreamId;
9+
210
pub use self::p2p_network_kad_stream_state::*;
311

412
mod p2p_network_kad_stream_actions;
513
pub use self::p2p_network_kad_stream_actions::*;
614

15+
use super::P2pNetworkKadEntry;
16+
use super::CID;
17+
718
#[cfg(feature = "p2p-libp2p")]
819
mod p2p_network_kad_stream_reducer;
20+
21+
#[derive(Deserialize, Serialize, Debug, Clone)]
22+
pub enum P2pNetworkKademliaStreamWaitOutgoingCallback {
23+
AnswerFindNodeRequest {
24+
callback: Callback<(ConnectionAddr, PeerId, StreamId, CID)>,
25+
args: CID,
26+
},
27+
UpdateFindNodeRequest {
28+
callback: Callback<(ConnectionAddr, PeerId, StreamId, Vec<P2pNetworkKadEntry>)>,
29+
args: Vec<P2pNetworkKadEntry>,
30+
},
31+
}
32+
33+
impl P2pNetworkKademliaStreamWaitOutgoingCallback {
34+
pub fn answer_find_node_request(cid: CID) -> Self {
35+
Self::AnswerFindNodeRequest {
36+
callback: redux::callback!(
37+
on_p2p_network_stream_wait_outgoing_answer_find_node_request((
38+
addr: ConnectionAddr,
39+
peer_id: PeerId,
40+
stream_id: StreamId,
41+
cid: CID
42+
)) -> crate::P2pAction{
43+
P2pNetworkKademliaAction::AnswerFindNodeRequest { addr, peer_id, stream_id, key: cid }
44+
}
45+
),
46+
args: cid,
47+
}
48+
}
49+
pub fn update_find_node_request(peers: Vec<P2pNetworkKadEntry>) -> Self {
50+
Self::UpdateFindNodeRequest {
51+
callback: redux::callback!(
52+
on_p2p_network_stream_wait_outgoing_answer_find_node_request((
53+
addr: ConnectionAddr,
54+
peer_id: PeerId,
55+
stream_id: StreamId,
56+
closest_peers: Vec<P2pNetworkKadEntry>
57+
)) -> crate::P2pAction{
58+
P2pNetworkKademliaAction::UpdateFindNodeRequest { addr, peer_id, stream_id, closest_peers }
59+
}
60+
),
61+
args: peers,
62+
}
63+
}
64+
}

p2p/src/network/kad/stream/p2p_network_kad_stream_actions.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use crate::{
77
P2pState, PeerId, StreamId,
88
};
99

10+
use super::P2pNetworkKademliaStreamWaitOutgoingCallback;
11+
1012
/// Kademlia stream related actions.
1113
#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)]
1214
#[action_event(fields(display(addr), display(peer_id), stream_id, incoming, debug(data)))]
@@ -44,6 +46,7 @@ pub enum P2pNetworkKademliaStreamAction {
4446
addr: ConnectionAddr,
4547
peer_id: PeerId,
4648
stream_id: StreamId,
49+
callback: P2pNetworkKademliaStreamWaitOutgoingCallback,
4750
},
4851

4952
/// Sends request to the stream.

0 commit comments

Comments
 (0)