Skip to content

Commit 1e420c5

Browse files
committed
Ported timeouts
1 parent 4710464 commit 1e420c5

File tree

7 files changed

+313
-295
lines changed

7 files changed

+313
-295
lines changed

node/src/effects.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use openmina_core::log::system_time;
2-
use p2p::p2p_timeout_effects;
32

43
use crate::block_producer::{block_producer_effects, BlockProducerAction};
54
use crate::event_source::event_source_effects;
@@ -37,8 +36,6 @@ pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
3736
store.dispatch(ExternalSnarkWorkerAction::Start);
3837

3938
if store.state().p2p.ready().is_some() {
40-
p2p_timeout_effects(store, &meta);
41-
4239
p2p_request_best_tip_if_needed(store);
4340
p2p_request_transactions_if_needed(store);
4441
p2p_request_snarks_if_needed(store);

node/src/reducer.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use openmina_core::{error, Substate};
2-
use p2p::{P2pAction, P2pInitializeAction};
1+
use openmina_core::{bug_condition, error, Substate};
2+
use p2p::{P2pAction, P2pInitializeAction, P2pState};
33

44
use crate::{Action, ActionWithMeta, EventSourceAction, P2p, State};
55

@@ -10,7 +10,15 @@ pub fn reducer(
1010
) {
1111
let meta = action.meta().clone();
1212
match action.action() {
13-
Action::CheckTimeouts(_) => {}
13+
Action::CheckTimeouts(_) => {
14+
if state.p2p.ready().is_some() {
15+
if let Err(error) =
16+
P2pState::p2p_timeout_dispatch(Substate::new(state, dispatcher), &meta)
17+
{
18+
bug_condition!("{}", error);
19+
};
20+
}
21+
}
1422
Action::EventSource(EventSourceAction::NewEvent { .. }) => {}
1523
Action::EventSource(_) => {}
1624

p2p/src/p2p_effects.rs

Lines changed: 3 additions & 269 deletions
Original file line numberDiff line numberDiff line change
@@ -1,274 +1,8 @@
1-
use openmina_core::bug_condition;
2-
use redux::{ActionMeta, ActionWithMeta};
3-
4-
use crate::{
5-
channels::P2pChannelsEffectfulAction,
6-
connection::{outgoing::P2pConnectionOutgoingAction, P2pConnectionEffectfulAction},
7-
P2pAction, P2pStore,
8-
};
9-
#[cfg(feature = "p2p-libp2p")]
101
use crate::{
11-
P2pNetworkKadKey, P2pNetworkKademliaAction, P2pNetworkPnetAction, P2pNetworkSelectAction,
12-
PeerId,
2+
channels::P2pChannelsEffectfulAction, connection::P2pConnectionEffectfulAction, P2pAction,
3+
P2pStore,
134
};
14-
15-
pub fn p2p_timeout_effects<Store, S>(store: &mut Store, meta: &ActionMeta)
16-
where
17-
Store: P2pStore<S>,
18-
{
19-
p2p_connection_timeouts(store, meta);
20-
store.dispatch(P2pConnectionOutgoingAction::RandomInit);
21-
22-
p2p_try_reconnect_disconnected_peers(store, meta.time());
23-
24-
#[cfg(feature = "p2p-libp2p")]
25-
p2p_pnet_timeouts(store, meta);
26-
27-
p2p_discovery(store, meta);
28-
29-
#[cfg(feature = "p2p-libp2p")]
30-
p2p_select_timeouts(store, meta);
31-
#[cfg(feature = "p2p-libp2p")]
32-
p2p_rpc_heartbeats(store, meta);
33-
34-
let state = store.state();
35-
for (peer_id, id, is_streaming) in state.peer_rpc_timeouts(meta.time()) {
36-
if !is_streaming {
37-
store.dispatch(crate::channels::rpc::P2pChannelsRpcAction::Timeout { peer_id, id });
38-
} else {
39-
store.dispatch(
40-
crate::channels::streaming_rpc::P2pChannelsStreamingRpcAction::Timeout {
41-
peer_id,
42-
id,
43-
},
44-
);
45-
}
46-
}
47-
}
48-
49-
#[cfg(feature = "p2p-libp2p")]
50-
fn p2p_pnet_timeouts<Store, S>(store: &mut Store, meta: &ActionMeta)
51-
where
52-
Store: P2pStore<S>,
53-
{
54-
let now = meta.time();
55-
let timeouts = &store.state().config.timeouts;
56-
let pnet_timeouts: Vec<_> = store
57-
.state()
58-
.network
59-
.scheduler
60-
.connections
61-
.iter()
62-
.filter_map(|(sock_addr, state)| {
63-
if state.pnet.is_timed_out(now, timeouts) {
64-
Some(*sock_addr)
65-
} else {
66-
None
67-
}
68-
})
69-
.collect();
70-
71-
for addr in pnet_timeouts {
72-
store.dispatch(P2pNetworkPnetAction::Timeout { addr });
73-
}
74-
}
75-
76-
#[cfg(feature = "p2p-libp2p")]
77-
fn p2p_select_timeouts<Store, S>(store: &mut Store, meta: &ActionMeta)
78-
where
79-
Store: P2pStore<S>,
80-
{
81-
let now = meta.time();
82-
let timeouts = &store.state().config.timeouts;
83-
let select_auth_timeouts: Vec<_> = store
84-
.state()
85-
.network
86-
.scheduler
87-
.connections
88-
.iter()
89-
.filter_map(|(sock_addr, state)| {
90-
if state.select_auth.is_timed_out(now, timeouts) {
91-
Some(*sock_addr)
92-
} else {
93-
None
94-
}
95-
})
96-
.collect();
97-
98-
let select_mux_timeouts: Vec<_> = store
99-
.state()
100-
.network
101-
.scheduler
102-
.connections
103-
.iter()
104-
.filter_map(|(sock_addr, state)| {
105-
if state.select_mux.is_timed_out(now, timeouts) {
106-
Some(*sock_addr)
107-
} else {
108-
None
109-
}
110-
})
111-
.collect();
112-
113-
let select_stream_timeouts: Vec<_> = store
114-
.state()
115-
.network
116-
.scheduler
117-
.connections
118-
.iter()
119-
.flat_map(|(sock_addr, state)| {
120-
state.streams.iter().filter_map(|(stream_id, stream)| {
121-
if stream.select.is_timed_out(now, timeouts) {
122-
Some((*sock_addr, *stream_id))
123-
} else {
124-
None
125-
}
126-
})
127-
})
128-
.collect();
129-
130-
for addr in select_auth_timeouts {
131-
store.dispatch(P2pNetworkSelectAction::Timeout {
132-
addr,
133-
kind: crate::SelectKind::Authentication,
134-
});
135-
}
136-
137-
for addr in select_mux_timeouts {
138-
store.dispatch(P2pNetworkSelectAction::Timeout {
139-
addr,
140-
kind: crate::SelectKind::MultiplexingNoPeerId,
141-
});
142-
}
143-
144-
for (addr, stream_id) in select_stream_timeouts {
145-
// TODO: better solution for PeerId
146-
let dummy = PeerId::from_bytes([0u8; 32]);
147-
148-
store.dispatch(P2pNetworkSelectAction::Timeout {
149-
addr,
150-
kind: crate::SelectKind::Stream(dummy, stream_id),
151-
});
152-
}
153-
}
154-
155-
#[cfg(feature = "p2p-libp2p")]
156-
fn p2p_rpc_heartbeats<Store, S>(store: &mut Store, meta: &ActionMeta)
157-
where
158-
Store: P2pStore<S>,
159-
{
160-
use crate::network::rpc::P2pNetworkRpcAction;
161-
let scheduler = &store.state().network.scheduler;
162-
163-
let send_heartbeat_actions: Vec<_> = scheduler
164-
.rpc_incoming_streams
165-
.iter()
166-
.chain(&scheduler.rpc_outgoing_streams)
167-
.flat_map(|(peer_id, state)| {
168-
state
169-
.iter()
170-
.filter(|(_, s)| s.should_send_heartbeat(meta.time()))
171-
.map(|(stream_id, state)| P2pNetworkRpcAction::HeartbeatSend {
172-
addr: state.addr,
173-
peer_id: *peer_id,
174-
stream_id: *stream_id,
175-
})
176-
})
177-
.collect();
178-
for action in send_heartbeat_actions {
179-
store.dispatch(action);
180-
}
181-
}
182-
183-
fn p2p_connection_timeouts<Store, S>(store: &mut Store, meta: &ActionMeta)
184-
where
185-
Store: P2pStore<S>,
186-
{
187-
use crate::connection::incoming::P2pConnectionIncomingAction;
188-
189-
let now = meta.time();
190-
let timeouts = &store.state().config.timeouts;
191-
let p2p_connection_timeouts: Vec<_> = store
192-
.state()
193-
.peers
194-
.iter()
195-
.filter_map(|(peer_id, peer)| {
196-
let s = peer.status.as_connecting()?;
197-
match s.is_timed_out(now, timeouts) {
198-
true => Some((*peer_id, s.as_outgoing().is_some())),
199-
false => None,
200-
}
201-
})
202-
.collect();
203-
204-
for (peer_id, is_outgoing) in p2p_connection_timeouts {
205-
match is_outgoing {
206-
true => store.dispatch(P2pConnectionOutgoingAction::Timeout { peer_id }),
207-
false => store.dispatch(P2pConnectionIncomingAction::Timeout { peer_id }),
208-
};
209-
}
210-
}
211-
212-
fn p2p_try_reconnect_disconnected_peers<Store, S>(store: &mut Store, now: redux::Timestamp)
213-
where
214-
Store: P2pStore<S>,
215-
{
216-
if store.state().already_has_min_peers() {
217-
return;
218-
}
219-
let timeouts = &store.state().config.timeouts;
220-
let reconnect_actions: Vec<_> = store
221-
.state()
222-
.peers
223-
.iter()
224-
.filter_map(|(_, p)| {
225-
if p.can_reconnect(now, timeouts) {
226-
p.dial_opts.clone()
227-
} else {
228-
None
229-
}
230-
})
231-
.map(|opts| P2pConnectionOutgoingAction::Reconnect { opts, rpc_id: None })
232-
.collect();
233-
for action in reconnect_actions {
234-
store.dispatch(action);
235-
}
236-
}
237-
238-
fn p2p_discovery<Store, S>(store: &mut Store, meta: &redux::ActionMeta)
239-
where
240-
Store: P2pStore<S>,
241-
{
242-
let now = meta.time();
243-
let state = store.state();
244-
let config = &state.config;
245-
if !config.peer_discovery {
246-
return;
247-
}
248-
// ask initial peers
249-
if let Some(_d) = config.timeouts.initial_peers {
250-
// TODO: use RPC to ask initial peers
251-
let _ = now;
252-
}
253-
254-
#[cfg(feature = "p2p-libp2p")]
255-
if let Some(discovery_state) = state.network.scheduler.discovery_state() {
256-
let my_id = state.my_id();
257-
match P2pNetworkKadKey::try_from(&my_id) {
258-
Ok(key) => {
259-
if discovery_state
260-
.routing_table
261-
.closest_peers(&key)
262-
.any(|_| true)
263-
&& discovery_state.status.can_bootstrap(now, &config.timeouts)
264-
{
265-
store.dispatch(P2pNetworkKademliaAction::StartBootstrap { key: my_id });
266-
}
267-
}
268-
Err(e) => bug_condition!("p2p_discovery error {:?}", e),
269-
}
270-
}
271-
}
5+
use redux::ActionWithMeta;
2726

2737
pub fn p2p_effects<Store, S>(store: &mut Store, action: ActionWithMeta<P2pAction>)
2748
where

0 commit comments

Comments
 (0)