Skip to content

Commit ba2033b

Browse files
committed
fix: [#1358] trigger PeerRemoved event when peer is removed due to inactivity
1 parent d47483f commit ba2033b

File tree

5 files changed

+36
-15
lines changed

5 files changed

+36
-15
lines changed

packages/torrent-repository/src/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub enum Event {
1717
announcement: PeerAnnouncement,
1818
},
1919
PeerRemoved {
20-
socket_addr: SocketAddr,
20+
peer_addr: SocketAddr,
2121
peer_id: PeerId,
2222
},
2323
}

packages/torrent-repository/src/statistics/event/handler.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ pub async fn handle_event(event: Event, stats_repository: &Arc<Repository>, now:
3636
// todo: update metrics
3737
tracing::debug!("Peer added {announcement:?}");
3838
}
39-
Event::PeerRemoved { socket_addr, peer_id } => {
39+
Event::PeerRemoved {
40+
peer_addr: socket_addr,
41+
peer_id,
42+
} => {
4043
// todo: update metrics
4144
tracing::debug!("Peer removed: socket address {socket_addr:?}, peer ID: {peer_id:?}");
4245
}

packages/torrent-repository/src/swarm.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl Swarm {
143143
if let Some(event_sender) = self.event_sender.as_deref() {
144144
event_sender
145145
.send(Event::PeerRemoved {
146-
socket_addr: old_peer.peer_addr,
146+
peer_addr: old_peer.peer_addr,
147147
peer_id: old_peer.peer_id,
148148
})
149149
.await;
@@ -155,10 +155,11 @@ impl Swarm {
155155
}
156156
}
157157

158-
pub fn remove_inactive(&mut self, current_cutoff: DurationSinceUnixEpoch) -> u64 {
159-
let mut inactive_peers_removed = 0;
158+
pub async fn remove_inactive(&mut self, current_cutoff: DurationSinceUnixEpoch) -> usize {
159+
let mut number_of_peers_removed = 0;
160+
let mut removed_peers = Vec::new();
160161

161-
self.peers.retain(|_, peer| {
162+
self.peers.retain(|_key, peer| {
162163
let is_active = peer::ReadInfo::get_updated(peer) > current_cutoff;
163164

164165
if !is_active {
@@ -169,13 +170,30 @@ impl Swarm {
169170
self.metadata.incomplete -= 1;
170171
}
171172

172-
inactive_peers_removed += 1;
173+
number_of_peers_removed += 1;
174+
175+
if let Some(_event_sender) = self.event_sender.as_deref() {
176+
// Events can not be trigger here because retain does not allow
177+
// async closures.
178+
removed_peers.push((peer.peer_addr, peer.peer_id));
179+
}
173180
}
174181

175182
is_active
176183
});
177184

178-
inactive_peers_removed
185+
if let Some(event_sender) = self.event_sender.as_deref() {
186+
for (peer_addr, peer_id) in &removed_peers {
187+
event_sender
188+
.send(Event::PeerRemoved {
189+
peer_addr: *peer_addr,
190+
peer_id: *peer_id,
191+
})
192+
.await;
193+
}
194+
}
195+
196+
number_of_peers_removed
179197
}
180198

181199
#[must_use]
@@ -431,7 +449,7 @@ mod tests {
431449
swarm.upsert_peer(peer.into(), &mut downloads_increased).await;
432450

433451
// Remove peers not updated since one second after inserting the peer
434-
swarm.remove_inactive(last_update_time + one_second);
452+
swarm.remove_inactive(last_update_time + one_second).await;
435453

436454
assert_eq!(swarm.len(), 0);
437455
}
@@ -448,7 +466,7 @@ mod tests {
448466
swarm.upsert_peer(peer.into(), &mut downloads_increased).await;
449467

450468
// Remove peers not updated since one second before inserting the peer.
451-
swarm.remove_inactive(last_update_time - one_second);
469+
swarm.remove_inactive(last_update_time - one_second).await;
452470

453471
assert_eq!(swarm.len(), 1);
454472
}
@@ -753,7 +771,7 @@ mod tests {
753771

754772
let leechers = swarm.metadata().leechers();
755773

756-
swarm.remove_inactive(leecher.updated + Duration::from_secs(1));
774+
swarm.remove_inactive(leecher.updated + Duration::from_secs(1)).await;
757775

758776
assert_eq!(swarm.metadata().leechers(), leechers - 1);
759777
}
@@ -769,7 +787,7 @@ mod tests {
769787

770788
let seeders = swarm.metadata().seeders();
771789

772-
swarm.remove_inactive(seeder.updated + Duration::from_secs(1));
790+
swarm.remove_inactive(seeder.updated + Duration::from_secs(1)).await;
773791

774792
assert_eq!(swarm.metadata().seeders(), seeders - 1);
775793
}

packages/torrent-repository/src/swarms.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ impl Swarms {
259259
///
260260
/// This function returns an error if it fails to acquire the lock for any
261261
/// swarm handle.
262-
pub async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> Result<u64, Error> {
262+
pub async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> Result<usize, Error> {
263263
tracing::info!(
264264
"Removing inactive peers since: {:?} ...",
265265
convert_from_timestamp_to_datetime_utc(current_cutoff)
@@ -269,7 +269,7 @@ impl Swarms {
269269

270270
for swarm_handle in &self.swarms {
271271
let mut swarm = swarm_handle.value().lock().await;
272-
let removed = swarm.remove_inactive(current_cutoff);
272+
let removed = swarm.remove_inactive(current_cutoff).await;
273273
inactive_peers_removed += removed;
274274
}
275275

packages/torrent-repository/tests/swarm/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ async fn it_should_remove_inactive_peers_beyond_cutoff(#[values(swarm())] mut sw
390390
assert_eq!(swarm.len(), peers.len() + 1);
391391

392392
let current_cutoff = CurrentClock::now_sub(&TIMEOUT).unwrap_or_default();
393-
swarm.remove_inactive(current_cutoff);
393+
swarm.remove_inactive(current_cutoff).await;
394394

395395
assert_eq!(swarm.len(), peers.len());
396396
}

0 commit comments

Comments
 (0)