Skip to content

Commit ea289e5

Browse files
committed
Allow deletion of multiple peers simultaniously
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
1 parent 8576328 commit ea289e5

File tree

2 files changed

+55
-32
lines changed

2 files changed

+55
-32
lines changed

mycelium/src/peer_manager.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -960,7 +960,9 @@ where
960960
.pr
961961
.upgrade()
962962
{
963-
router.handle_dead_peer(old_peer);
963+
router.handle_dead_peer(&[old_peer]);
964+
} else {
965+
warn!("Added duplicate inbound entry but did not kill old peer");
964966
}
965967
}
966968
info!("Replaced existing inbound peer");

mycelium/src/router.rs

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ where
114114
let (router_data_tx, router_data_rx) = mpsc::channel::<DataPacket>(1000);
115115
let (expired_source_key_sink, expired_source_key_stream) = mpsc::channel(1);
116116
let (expired_route_entry_sink, expired_route_entry_stream) = mpsc::channel(1);
117-
let (dead_peer_sink, dead_peer_stream) = mpsc::channel(1);
117+
let (dead_peer_sink, dead_peer_stream) = mpsc::channel(100);
118118

119119
let routing_table = RoutingTable::new(expired_route_entry_sink);
120120

@@ -259,8 +259,23 @@ where
259259
"Removing peer {} from the router",
260260
peer.connection_identifier()
261261
);
262-
self.peer_interfaces.write().unwrap().retain(|p| p != peer);
263-
self.metrics.router_peer_removed();
262+
peer.died();
263+
264+
let mut peers = self.peer_interfaces.write().unwrap();
265+
let old_peers = peers.len();
266+
peers.retain(|p| p != peer);
267+
268+
let removed = old_peers - peers.len();
269+
for _ in 0..removed {
270+
self.metrics.router_peer_removed();
271+
}
272+
273+
if removed > 1 {
274+
warn!(
275+
"REMOVED {removed} peers from peer list while called with {}",
276+
peer.connection_identifier()
277+
);
278+
}
264279
}
265280

266281
/// Get a list of all selected route entries.
@@ -306,19 +321,20 @@ where
306321
dead_peers
307322
};
308323

309-
for dead_peer in dead_peers {
310-
self.handle_dead_peer(dead_peer);
311-
}
324+
self.handle_dead_peer(&dead_peers);
312325
}
313326
}
314327

315328
/// Remove a dead peer from the router.
316-
pub fn handle_dead_peer(&self, dead_peer: Peer) {
317-
self.metrics.router_peer_died();
318-
debug!(
319-
"Cleaning up peer {} which is reportedly dead",
320-
dead_peer.connection_identifier()
321-
);
329+
pub fn handle_dead_peer(&self, dead_peers: &[Peer]) {
330+
for dead_peer in dead_peers {
331+
self.metrics.router_peer_died();
332+
debug!(
333+
"Cleaning up peer {} which is reportedly dead",
334+
dead_peer.connection_identifier()
335+
);
336+
self.remove_peer_interface(dead_peer);
337+
}
322338

323339
// Scope for routing table write access.
324340
let subnets_to_select = {
@@ -329,29 +345,29 @@ where
329345

330346
while let Some((subnet, mut rl)) = rt_write.next() {
331347
rl.update_routes(|routes, eres, ct| {
332-
let Some(mut re) = routes.iter_mut().find(|re| re.neighbour() == &dead_peer)
333-
else {
334-
return;
335-
};
348+
for dead_peer in dead_peers {
349+
let Some(mut re) = routes.iter_mut().find(|re| re.neighbour() == dead_peer)
350+
else {
351+
continue;
352+
};
336353

337-
if re.selected() {
338-
subnets_to_select.push(subnet);
354+
if re.selected() {
355+
subnets_to_select.push(subnet);
339356

340-
// Don't clear selected flag yet, running route selection does that for us.
341-
re.set_metric(Metric::infinite());
342-
re.set_expires(
343-
tokio::time::Instant::now() + RETRACTED_ROUTE_HOLD_TIME,
344-
eres.clone(),
345-
ct.clone(),
346-
);
347-
} else {
348-
routes.remove(&dead_peer);
357+
// Don't clear selected flag yet, running route selection does that for us.
358+
re.set_metric(Metric::infinite());
359+
re.set_expires(
360+
tokio::time::Instant::now() + RETRACTED_ROUTE_HOLD_TIME,
361+
eres.clone(),
362+
ct.clone(),
363+
);
364+
} else {
365+
routes.remove(dead_peer);
366+
}
349367
}
350368
});
351369
}
352370

353-
self.remove_peer_interface(&dead_peer);
354-
355371
subnets_to_select
356372
};
357373

@@ -495,8 +511,13 @@ where
495511
/// Process notifications about peers who are dead. This allows peers who can self-diagnose
496512
/// connection states to notify us, and allow for more efficient cleanup.
497513
async fn process_dead_peers(self, mut dead_peer_stream: mpsc::Receiver<Peer>) {
498-
while let Some(dead_peer) = dead_peer_stream.recv().await {
499-
self.handle_dead_peer(dead_peer);
514+
let mut tx_buf = Vec::with_capacity(100);
515+
loop {
516+
let received = dead_peer_stream.recv_many(&mut tx_buf, 100).await;
517+
if received == 0 {
518+
break;
519+
}
520+
self.handle_dead_peer(&tx_buf[..received]);
500521
}
501522
warn!("Processing of dead peers halted");
502523
}

0 commit comments

Comments
 (0)