Skip to content

Commit a21977b

Browse files
committed
feat(p2p/webrtc): improve disconnection logic and remove mutex for active connection limiting
1 parent 4ff8099 commit a21977b

File tree

7 files changed

+95
-48
lines changed

7 files changed

+95
-48
lines changed

node/src/event_source/event_source_effects.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourc
198198
},
199199
P2pConnectionEvent::Closed(peer_id) => {
200200
store.dispatch(P2pDisconnectionAction::PeerClosed { peer_id });
201+
store.dispatch(P2pDisconnectionAction::Finish { peer_id });
201202
}
202203
},
203204
P2pEvent::Channel(e) => match e {

p2p/src/disconnection/p2p_disconnection_actions.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,22 @@ impl redux::EnablingCondition<P2pState> for P2pDisconnectionAction {
4242
P2pDisconnectionAction::RandomTry => time
4343
.checked_sub(state.last_random_disconnection_try)
4444
.map_or(false, |dur| dur >= RANDOM_DISCONNECTION_TRY_FREQUENCY),
45-
P2pDisconnectionAction::Init { peer_id, .. }
46-
| P2pDisconnectionAction::PeerClosed { peer_id, .. }
47-
| P2pDisconnectionAction::Finish { peer_id } => {
45+
P2pDisconnectionAction::Init { peer_id, .. } => {
46+
state.peers.get(peer_id).map_or(false, |peer| {
47+
!peer.status.is_disconnected_or_disconnecting() && !peer.status.is_error()
48+
})
49+
}
50+
P2pDisconnectionAction::Finish { peer_id } => {
4851
state.peers.get(peer_id).map_or(false, |peer| {
4952
!matches!(peer.status, P2pPeerStatus::Disconnected { .. })
5053
&& !peer.status.is_error()
5154
})
5255
}
56+
P2pDisconnectionAction::PeerClosed { peer_id, .. } => {
57+
state.peers.get(peer_id).map_or(false, |peer| {
58+
!peer.status.is_disconnected_or_disconnecting() && !peer.status.is_error()
59+
})
60+
}
5361
P2pDisconnectionAction::FailedCleanup { peer_id } => state
5462
.peers
5563
.get(peer_id)

p2p/src/disconnection/p2p_disconnection_reducer.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,14 @@ impl P2pDisconnectedState {
5252
Ok(())
5353
}
5454
P2pDisconnectionAction::Init { peer_id, reason } => {
55+
let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
56+
bug_condition!("Invalid state for: `P2pDisconnectionAction::Init`");
57+
return Ok(());
58+
};
59+
peer.status = P2pPeerStatus::Disconnecting { time: meta.time() };
60+
5561
#[cfg(feature = "p2p-libp2p")]
56-
if p2p_state.is_libp2p_peer(&peer_id) {
62+
if peer.is_libp2p() {
5763
let connections = p2p_state
5864
.network
5965
.scheduler
@@ -63,12 +69,6 @@ impl P2pDisconnectedState {
6369
.map(|(addr, _)| *addr)
6470
.collect::<Vec<_>>();
6571

66-
let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
67-
bug_condition!("Invalid state for: `P2pDisconnectionAction::Finish`");
68-
return Ok(());
69-
};
70-
peer.status = P2pPeerStatus::Disconnecting { time: meta.time() };
71-
7272
let dispatcher = state_context.into_dispatcher();
7373
for addr in connections {
7474
dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
@@ -91,6 +91,12 @@ impl P2pDisconnectedState {
9191
Ok(())
9292
}
9393
P2pDisconnectionAction::FailedCleanup { peer_id } => {
94+
let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
95+
bug_condition!("Invalid state for: `P2pDisconnectionAction::FailedCleanup`");
96+
return Ok(());
97+
};
98+
peer.status = P2pPeerStatus::Disconnecting { time: meta.time() };
99+
94100
let dispatcher = state_context.into_dispatcher();
95101
dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id });
96102
Ok(())

p2p/src/disconnection_effectful/p2p_disconnection_effectful_effects.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use redux::ActionMeta;
22

3-
use crate::disconnection::P2pDisconnectionAction;
4-
53
use super::{P2pDisconnectionEffectfulAction, P2pDisconnectionService};
64

75
impl P2pDisconnectionEffectfulAction {
@@ -13,7 +11,6 @@ impl P2pDisconnectionEffectfulAction {
1311
match self {
1412
P2pDisconnectionEffectfulAction::Init { peer_id } => {
1513
store.service().disconnect(peer_id);
16-
store.dispatch(P2pDisconnectionAction::Finish { peer_id });
1714
}
1815
}
1916
}

p2p/src/p2p_state.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use openmina_core::{
22
block::{ArcBlockWithHash, BlockWithHash},
3-
impl_substate_access,
3+
bug_condition, impl_substate_access,
44
requests::RpcId,
55
snark::{Snark, SnarkInfo, SnarkJobCommitment},
66
transaction::{TransactionInfo, TransactionWithHash},
@@ -395,10 +395,18 @@ impl P2pPeerState {
395395
P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(
396396
P2pConnectionOutgoingState::Error { time, .. },
397397
)) => is_time_passed(now, *time, timeouts.outgoing_error_reconnect_timeout),
398-
P2pPeerStatus::Disconnected { time } | P2pPeerStatus::Disconnecting { time } => {
398+
P2pPeerStatus::Disconnected { time } => {
399399
*time == Timestamp::ZERO
400400
|| is_time_passed(now, *time, timeouts.reconnect_timeout)
401401
}
402+
P2pPeerStatus::Disconnecting { time } => {
403+
if !is_time_passed(now, *time, timeouts.reconnect_timeout) {
404+
false
405+
} else {
406+
bug_condition!("peer stuck in `P2pPeerStatus::Disconnecting` state?");
407+
true
408+
}
409+
}
402410
_ => false,
403411
}
404412
}

p2p/src/service_impl/webrtc/api.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use std::{collections::BTreeMap, sync::Arc};
2+
3+
use openmina_core::channels::broadcast;
4+
use tokio::sync::Mutex;
5+
6+
use crate::PeerId;
7+
8+
use super::{ApiInner, build_api};
9+
10+
#[derive(Clone)]
11+
pub struct Api {
12+
inner: ApiInner,
13+
active_peers: Arc<Mutex<BTreeMap<PeerId, broadcast::Receiver<()>>>>
14+
}
15+
16+
impl Api {
17+
pub fn new() -> Self {
18+
Self {
19+
inner: build_api(),
20+
active_peers: Mutex::new(BTreeMap::new()).into()
21+
}
22+
}
23+
24+
pub(super) fn inner(&self) -> &ApiInner {
25+
&self.inner
26+
}
27+
}

p2p/src/service_impl/webrtc/mod.rs

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::{collections::BTreeMap, time::Duration};
1212

1313
use openmina_core::bug_condition;
1414
use serde::Serialize;
15-
use tokio::sync::Mutex;
15+
use tokio::sync::Semaphore;
1616

1717
#[cfg(not(target_arch = "wasm32"))]
1818
use tokio::task::spawn_local;
@@ -226,7 +226,12 @@ async fn wait_for_ice_gathering_complete(pc: &mut RTCConnection) {
226226
}
227227
}
228228

229-
async fn peer_start(api: Api, args: PeerAddArgs, abort: broadcast::Receiver<()>) {
229+
async fn peer_start(
230+
api: Api,
231+
args: PeerAddArgs,
232+
abort: broadcast::Receiver<()>,
233+
closed: broadcast::Sender<()>,
234+
) {
230235
let PeerAddArgs {
231236
peer_id,
232237
kind,
@@ -337,7 +342,6 @@ async fn peer_start(api: Api, args: PeerAddArgs, abort: broadcast::Receiver<()>)
337342
connected_tx.send(Ok(())).unwrap();
338343
} else {
339344
let mut connected_tx = Some(connected_tx);
340-
let event_sender = event_sender.clone();
341345
pc.on_connection_state_change(Box::new(move |state| {
342346
match state {
343347
RTCConnectionState::Connected => {
@@ -349,7 +353,7 @@ async fn peer_start(api: Api, args: PeerAddArgs, abort: broadcast::Receiver<()>)
349353
if let Some(connected_tx) = connected_tx.take() {
350354
let _ = connected_tx.send(Err("disconnected"));
351355
} else {
352-
let _ = event_sender(P2pConnectionEvent::Closed(peer_id).into());
356+
let _ = closed.send(());
353357
}
354358
}
355359
_ => {}
@@ -365,6 +369,7 @@ async fn peer_start(api: Api, args: PeerAddArgs, abort: broadcast::Receiver<()>)
365369
Ok(_) => {}
366370
Err(err) => {
367371
let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
372+
return;
368373
}
369374
}
370375

@@ -727,50 +732,45 @@ pub trait P2pServiceWebrtc: redux::Service {
727732
fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState>;
728733

729734
fn init<S: TaskSpawner>(secret_key: SecretKey, spawner: S) -> P2pServiceCtx {
735+
const MAX_PEERS: usize = 500;
730736
let (cmd_sender, mut cmd_receiver) = mpsc::unbounded_channel();
731737

732738
let _ = secret_key;
733739

734740
spawner.spawn_main("webrtc", async move {
735741
#[allow(clippy::all)]
736742
let api = build_api();
737-
let active_peers = Arc::new(Mutex::new(BTreeMap::new()));
743+
let conn_permits = Arc::new(Semaphore::const_new(MAX_PEERS));
738744
while let Some(cmd) = cmd_receiver.recv().await {
739745
match cmd {
740-
Cmd::PeerAdd { args, abort } => {
746+
Cmd::PeerAdd { args, mut abort } => {
741747
#[allow(clippy::all)]
742748
let api = api.clone();
743-
let active_peers = active_peers.clone();
749+
let conn_permits = conn_permits.clone();
750+
let peer_id = args.peer_id;
751+
let event_sender = args.event_sender.clone();
744752
spawn_local(async move {
745-
let peer_id = args.peer_id;
746-
let (abort_finished_tx, abort_finished) = broadcast::channel::<()>(1);
747-
let mut aborted = abort.resubscribe();
748-
749-
if let Some(mut abort_finished) =
750-
active_peers.lock().await.insert(peer_id, abort_finished)
751-
{
752-
// wait for peer conn with same id to finish.
753-
let _ = abort_finished.recv().await;
754-
}
755-
if active_peers.lock().await.len() > 100 {
756-
sleep(Duration::from_millis(500)).await;
757-
}
758-
753+
let Ok(_permit) = conn_permits.try_acquire() else {
754+
// state machine shouldn't allow this to happen.
755+
bug_condition!("P2P WebRTC Semaphore acquisition failed!");
756+
return;
757+
};
758+
let (closed_tx, mut closed) = broadcast::channel(1);
759+
let event_sender_clone = event_sender.clone();
760+
spawn_local(async move {
761+
// to avoid sending closed multiple times
762+
let _ = closed.recv().await;
763+
event_sender_clone(P2pConnectionEvent::Closed(peer_id).into());
764+
});
759765
tokio::select! {
760-
_ = aborted.recv() => {}
761-
_ = peer_start(api, args, abort) => {}
766+
_ = peer_start(api, args, abort.resubscribe(), closed_tx.clone()) => {}
767+
_ = abort.recv() => {
768+
}
762769
}
763770

764-
// delay removing active_peer to give some time for cleanup.
765-
sleep(Duration::from_millis(500)).await;
766-
let rx1 = abort_finished_tx.subscribe();
767-
let mut active_peers = active_peers.lock().await;
768-
if let Some(rx) = active_peers
769-
.remove(&peer_id)
770-
.filter(|rx2| !rx2.same_channel(&rx1))
771-
{
772-
active_peers.insert(peer_id, rx);
773-
}
771+
// delay dropping permit to give some time for cleanup.
772+
sleep(Duration::from_millis(100)).await;
773+
let _ = closed_tx.send(());
774774
});
775775
}
776776
}

0 commit comments

Comments
 (0)