diff --git a/packages/torrent-repository/src/swarm.rs b/packages/torrent-repository/src/swarm.rs
index 8cf2982e6..f25304979 100644
--- a/packages/torrent-repository/src/swarm.rs
+++ b/packages/torrent-repository/src/swarm.rs
@@ -1,8 +1,6 @@
 //! A swarm is a collection of peers that are all trying to download the same
 //! torrent.
 use std::collections::BTreeMap;
-use std::fmt::Debug;
-use std::hash::{Hash, Hasher};
 use std::net::SocketAddr;
 use std::sync::Arc;
 
@@ -24,31 +22,6 @@ pub struct Swarm {
     event_sender: Sender,
 }
 
-#[allow(clippy::missing_fields_in_debug)]
-impl Debug for Swarm {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Swarm")
-            .field("peers", &self.peers)
-            .field("metadata", &self.metadata)
-            .finish()
-    }
-}
-
-impl Hash for Swarm {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.peers.hash(state);
-        self.metadata.hash(state);
-    }
-}
-
-impl PartialEq for Swarm {
-    fn eq(&self, other: &Self) -> bool {
-        self.peers == other.peers && self.metadata == other.metadata
-    }
-}
-
-impl Eq for Swarm {}
-
 impl Swarm {
     #[must_use]
     pub fn new(info_hash: &InfoHash, downloaded: u32, event_sender: Sender) -> Self {
@@ -329,16 +302,6 @@ mod tests {
     use crate::swarm::Swarm;
     use crate::tests::sample_info_hash;
 
-    #[test]
-    fn it_should_allow_debugging() {
-        let swarm = Swarm::new(&sample_info_hash(), 0, None);
-
-        assert_eq!(
-            format!("{swarm:?}"),
-            "Swarm { peers: {}, metadata: SwarmMetadata { downloaded: 0, complete: 0, incomplete: 0 } }"
-        );
-    }
-
     #[test]
     fn it_should_be_empty_when_no_peers_have_been_inserted() {
         let swarm = Swarm::new(&sample_info_hash(), 0, None);
diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs
index 8b8327778..ac2490853 100644
--- a/packages/torrent-repository/src/swarms.rs
+++ b/packages/torrent-repository/src/swarms.rs
@@ -49,7 +49,6 @@ impl Swarms {
     /// # Errors
     ///
     /// This function panics if the lock for the swarm handle cannot be acquired.
-    #[allow(clippy::await_holding_lock)]
     pub async fn handle_announcement(
         &self,
         info_hash: &InfoHash,
diff --git a/packages/torrent-repository/tests/common/mod.rs b/packages/torrent-repository/tests/common/mod.rs
deleted file mode 100644
index c77ca2769..000000000
--- a/packages/torrent-repository/tests/common/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod torrent_peer_builder;
diff --git a/packages/torrent-repository/tests/common/torrent_peer_builder.rs b/packages/torrent-repository/tests/common/torrent_peer_builder.rs
deleted file mode 100644
index 0c065e670..000000000
--- a/packages/torrent-repository/tests/common/torrent_peer_builder.rs
+++ /dev/null
@@ -1,106 +0,0 @@
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-
-use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId};
-use torrust_tracker_clock::clock::Time;
-use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
-
-use crate::CurrentClock;
-
-#[derive(Debug, Default)]
-struct TorrentPeerBuilder {
-    peer: peer::Peer,
-}
-
-#[allow(dead_code)]
-impl TorrentPeerBuilder {
-    #[must_use]
-    fn new() -> Self {
-        Self {
-            peer: peer::Peer {
-                updated: CurrentClock::now(),
-                ..Default::default()
-            },
-        }
-    }
-
-    #[must_use]
-    fn with_event_completed(mut self) -> Self {
-        self.peer.event = AnnounceEvent::Completed;
-        self
-    }
-
-    #[must_use]
-    fn with_event_started(mut self) -> Self {
-        self.peer.event = AnnounceEvent::Started;
-        self
-    }
-
-    #[must_use]
-    fn with_peer_address(mut self, peer_addr: SocketAddr) -> Self {
-        self.peer.peer_addr = peer_addr;
-        self
-    }
-
-    #[must_use]
-    fn with_peer_id(mut self, peer_id: PeerId) -> Self {
-        self.peer.peer_id = peer_id;
-        self
-    }
-
-    #[must_use]
-    fn with_number_of_bytes_left(mut self, left: i64) -> Self {
-        self.peer.left = NumberOfBytes::new(left);
-        self
-    }
-
-    #[must_use]
-    fn updated_at(mut self, updated: DurationSinceUnixEpoch) -> Self {
-        self.peer.updated = updated;
-        self
-    }
-
-    #[must_use]
-    fn into(self) -> peer::Peer {
-        self.peer
-    }
-}
-
-/// A torrent seeder is a peer with 0 bytes left to download which
-/// has not announced it has stopped
-#[allow(clippy::cast_sign_loss)]
-#[allow(clippy::cast_possible_truncation)]
-#[must_use]
-pub fn a_completed_peer(id: i32) -> peer::Peer {
-    let peer_id = peer::Id::new(id);
-    let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id as u16);
-
-    TorrentPeerBuilder::new()
-        .with_number_of_bytes_left(0)
-        .with_event_completed()
-        .with_peer_id(*peer_id)
-        .with_peer_address(peer_addr)
-        .into()
-}
-
-/// A torrent leecher is a peer that is not a seeder.
-/// Leecher: left > 0 OR event = Stopped
-///
-/// # Panics
-///
-/// This function panics if proved id can't be converted into a valid socket address port.
-///
-/// The `id` argument is used to identify the peer in both the `peer_id` and the `peer_addr`.
-#[allow(clippy::cast_sign_loss)]
-#[allow(clippy::cast_possible_truncation)]
-#[must_use]
-pub fn a_started_peer(id: i32) -> peer::Peer {
-    let peer_id = peer::Id::new(id);
-    let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id as u16);
-
-    TorrentPeerBuilder::new()
-        .with_number_of_bytes_left(1)
-        .with_event_started()
-        .with_peer_id(*peer_id)
-        .with_peer_address(peer_addr)
-        .into()
-}
diff --git a/packages/torrent-repository/tests/integration.rs b/packages/torrent-repository/tests/integration.rs
deleted file mode 100644
index b3e057075..000000000
--- a/packages/torrent-repository/tests/integration.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-//! Integration tests.
-//!
-//! ```text
-//! cargo test --test integration
-//! ```
-
-use torrust_tracker_clock::clock;
-
-pub mod common;
-mod swarm;
-mod swarms;
-
-/// This code needs to be copied into each crate.
-/// Working version, for production.
-#[cfg(not(test))]
-#[allow(dead_code)]
-pub(crate) type CurrentClock = clock::Working;
-
-/// Stopped version, for testing.
-#[cfg(test)]
-#[allow(dead_code)]
-pub(crate) type CurrentClock = clock::Stopped;
diff --git a/packages/torrent-repository/tests/swarm/mod.rs b/packages/torrent-repository/tests/swarm/mod.rs
deleted file mode 100644
index cb4009ba9..000000000
--- a/packages/torrent-repository/tests/swarm/mod.rs
+++ /dev/null
@@ -1,397 +0,0 @@
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-use std::ops::Sub;
-use std::time::Duration;
-
-use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes};
-use bittorrent_primitives::info_hash::InfoHash;
-use rstest::{fixture, rstest};
-use torrust_tracker_clock::clock::stopped::Stopped as _;
-use torrust_tracker_clock::clock::{self, Time as _};
-use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT};
-use torrust_tracker_primitives::peer;
-use torrust_tracker_primitives::peer::Peer;
-use torrust_tracker_torrent_repository::Swarm;
-
-use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer};
-use crate::CurrentClock;
-
-#[fixture]
-fn swarm() -> Swarm {
-    Swarm::new(&InfoHash::default(), 0, None)
-}
-
-#[fixture]
-fn policy_none() -> TrackerPolicy {
-    TrackerPolicy::new(0, false, false)
-}
-
-#[fixture]
-fn policy_persist() -> TrackerPolicy {
-    TrackerPolicy::new(0, true, false)
-}
-
-#[fixture]
-fn policy_remove() -> TrackerPolicy {
-    TrackerPolicy::new(0, false, true)
-}
-
-#[fixture]
-fn policy_remove_persist() -> TrackerPolicy {
-    TrackerPolicy::new(0, true, true)
-}
-
-pub enum Makes {
-    Empty,
-    Started,
-    Completed,
-    Downloaded,
-    Three,
-}
-
-async fn make(swarm: &mut Swarm, makes: &Makes) -> Vec<Peer> {
-    match makes {
-        Makes::Empty => vec![],
-        Makes::Started => {
-            let peer = a_started_peer(1);
-            swarm.handle_announcement(&peer).await;
-            vec![peer]
-        }
-        Makes::Completed => {
-            let peer = a_completed_peer(2);
-            swarm.handle_announcement(&peer).await;
-            vec![peer]
-        }
-        Makes::Downloaded => {
-            let mut peer = a_started_peer(3);
-            swarm.handle_announcement(&peer).await;
-            peer.event = AnnounceEvent::Completed;
-            peer.left = NumberOfBytes::new(0);
-            swarm.handle_announcement(&peer).await;
-            vec![peer]
-        }
-        Makes::Three => {
-            let peer_1 = a_started_peer(1);
-            swarm.handle_announcement(&peer_1).await;
-
-            let peer_2 = a_completed_peer(2);
-            swarm.handle_announcement(&peer_2).await;
-
-            let mut peer_3 = a_started_peer(3);
-            swarm.handle_announcement(&peer_3).await;
-            peer_3.event = AnnounceEvent::Completed;
-            peer_3.left = NumberOfBytes::new(0);
-            swarm.handle_announcement(&peer_3).await;
-            vec![peer_1, peer_2, peer_3]
-        }
-    }
-}
-
-#[rstest]
-#[case::empty(&Makes::Empty)]
-#[tokio::test]
-async fn it_should_be_empty_by_default(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    make(&mut swarm, makes).await;
-
-    assert_eq!(swarm.len(), 0);
-}
-
-#[rstest]
-#[case::empty(&Makes::Empty)]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_check_if_entry_should_be_retained_based_on_the_tracker_policy(
-    #[values(swarm())] mut swarm: Swarm,
-    #[case] makes: &Makes,
-    #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy,
-) {
-    make(&mut swarm, makes).await;
-
-    let has_peers = !swarm.is_empty();
-    let has_downloads = swarm.metadata().downloaded != 0;
-
-    match (policy.remove_peerless_torrents, policy.persistent_torrent_completed_stat) {
-        // remove torrents without peers, and keep completed download stats
-        (true, true) => match (has_peers, has_downloads) {
-            // no peers, but has downloads
-            // peers, with or without downloads
-            (false, true) | (true, true | false) => assert!(swarm.meets_retaining_policy(&policy)),
-            // no peers and no downloads
-            (false, false) => assert!(!swarm.meets_retaining_policy(&policy)),
-        },
-        // remove torrents without peers and drop completed download stats
-        (true, false) => match (has_peers, has_downloads) {
-            // peers, with or without downloads
-            (true, true | false) => assert!(swarm.meets_retaining_policy(&policy)),
-            // no peers and with or without downloads
-            (false, true | false) => assert!(!swarm.meets_retaining_policy(&policy)),
-        },
-        // keep torrents without peers, but keep or drop completed download stats
-        (false, true | false) => assert!(swarm.meets_retaining_policy(&policy)),
-    }
-}
-
-#[rstest]
-#[case::empty(&Makes::Empty)]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_get_peers_for_torrent_entry(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    let peers = make(&mut swarm, makes).await;
-
-    let torrent_peers = swarm.peers(None);
-
-    assert_eq!(torrent_peers.len(), peers.len());
-
-    for peer in torrent_peers {
-        assert!(peers.contains(&peer));
-    }
-}
-
-#[rstest]
-#[case::empty(&Makes::Empty)]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_update_a_peer(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    make(&mut swarm, makes).await;
-
-    // Make and insert a new peer.
-    let mut peer = a_started_peer(-1);
-    swarm.handle_announcement(&peer).await;
-
-    // Get the Inserted Peer by Id.
-    let peers = swarm.peers(None);
-    let original = peers
-        .iter()
-        .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer))
-        .expect("it should find peer by id");
-
-    assert_eq!(original.event, AnnounceEvent::Started, "it should be as created");
-
-    // Announce "Completed" torrent download event.
-    peer.event = AnnounceEvent::Completed;
-    swarm.handle_announcement(&peer).await;
-
-    // Get the Updated Peer by Id.
-    let peers = swarm.peers(None);
-    let updated = peers
-        .iter()
-        .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer))
-        .expect("it should find peer by id");
-
-    assert_eq!(updated.event, AnnounceEvent::Completed, "it should be updated");
-}
-
-#[rstest]
-#[case::empty(&Makes::Empty)]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_remove_a_peer_upon_stopped_announcement(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    use torrust_tracker_primitives::peer::ReadInfo as _;
-
-    make(&mut swarm, makes).await;
-
-    let mut peer = a_started_peer(-1);
-
-    swarm.handle_announcement(&peer).await;
-
-    // The started peer should be inserted.
-    let peers = swarm.peers(None);
-    let original = peers
-        .iter()
-        .find(|p| p.get_id() == peer.get_id())
-        .expect("it should find peer by id");
-
-    assert_eq!(original.event, AnnounceEvent::Started);
-
-    // Change peer to "Stopped" and insert.
-    peer.event = AnnounceEvent::Stopped;
-    swarm.handle_announcement(&peer).await;
-
-    // It should be removed now.
-    let peers = swarm.peers(None);
-
-    assert_eq!(
-        peers.iter().find(|p| p.get_id() == peer.get_id()),
-        None,
-        "it should be removed"
-    );
-}
-
-#[rstest]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloaded_statistic(
-    #[values(swarm())] mut torrent: Swarm,
-    #[case] makes: &Makes,
-) {
-    make(&mut torrent, makes).await;
-    let downloaded = torrent.metadata().downloaded;
-
-    let peers = torrent.peers(None);
-    let mut peer = **peers.first().expect("there should be a peer");
-
-    let is_already_completed = peer.event == AnnounceEvent::Completed;
-
-    // Announce "Completed" torrent download event.
-    peer.event = AnnounceEvent::Completed;
-
-    torrent.handle_announcement(&peer).await;
-    let stats = torrent.metadata();
-
-    if is_already_completed {
-        assert_eq!(stats.downloaded, downloaded);
-    } else {
-        assert_eq!(stats.downloaded, downloaded + 1);
-    }
-}
-
-#[rstest]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_update_a_peer_as_a_seeder(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    let peers = make(&mut swarm, makes).await;
-    let completed = u32::try_from(peers.iter().filter(|p| p.is_seeder()).count()).expect("it_should_not_be_so_many");
-
-    let peers = swarm.peers(None);
-    let mut peer = **peers.first().expect("there should be a peer");
-
-    let is_already_non_left = peer.left == NumberOfBytes::new(0);
-
-    // Set Bytes Left to Zero
-    peer.left = NumberOfBytes::new(0);
-    swarm.handle_announcement(&peer).await;
-    let stats = swarm.metadata();
-
-    if is_already_non_left {
-        // it was already complete
-        assert_eq!(stats.complete, completed);
-    } else {
-        // now it is complete
-        assert_eq!(stats.complete, completed + 1);
-    }
-}
-
-#[rstest]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_update_a_peer_as_incomplete(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    let peers = make(&mut swarm, makes).await;
-    let incomplete = u32::try_from(peers.iter().filter(|p| !p.is_seeder()).count()).expect("it should not be so many");
-
-    let peers = swarm.peers(None);
-    let mut peer = **peers.first().expect("there should be a peer");
-
-    let completed_already = peer.left == NumberOfBytes::new(0);
-
-    // Set Bytes Left to no Zero
-    peer.left = NumberOfBytes::new(1);
-    swarm.handle_announcement(&peer).await;
-    let stats = swarm.metadata();
-
-    if completed_already {
-        // now it is incomplete
-        assert_eq!(stats.incomplete, incomplete + 1);
-    } else {
-        // was already incomplete
-        assert_eq!(stats.incomplete, incomplete);
-    }
-}
-
-#[rstest]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_get_peers_excluding_the_client_socket(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    make(&mut swarm, makes).await;
-
-    let peers = swarm.peers(None);
-    let mut peer = **peers.first().expect("there should be a peer");
-
-    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081);
-
-    // for this test, we should not already use this socket.
-    assert_ne!(peer.peer_addr, socket);
-
-    // it should get the peer as it dose not share the socket.
-    assert!(swarm.peers_excluding(&socket, None).contains(&peer.into()));
-
-    // set the address to the socket.
-    peer.peer_addr = socket;
-    swarm.handle_announcement(&peer).await; // Add peer
-
-    // It should not include the peer that has the same socket.
-    assert!(!swarm.peers_excluding(&socket, None).contains(&peer.into()));
-}
-
-#[rstest]
-#[case::empty(&Makes::Empty)]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_limit_the_number_of_peers_returned(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    make(&mut swarm, makes).await;
-
-    // We add one more peer than the scrape limit
-    for peer_number in 1..=74 + 1 {
-        let peer = a_started_peer(peer_number);
-        swarm.handle_announcement(&peer).await;
-    }
-
-    let peers = swarm.peers(Some(TORRENT_PEERS_LIMIT));
-
-    assert_eq!(peers.len(), 74);
-}
-
-#[rstest]
-#[case::empty(&Makes::Empty)]
-#[case::started(&Makes::Started)]
-#[case::completed(&Makes::Completed)]
-#[case::downloaded(&Makes::Downloaded)]
-#[case::three(&Makes::Three)]
-#[tokio::test]
-async fn it_should_remove_inactive_peers_beyond_cutoff(#[values(swarm())] mut swarm: Swarm, #[case] makes: &Makes) {
-    const TIMEOUT: Duration = Duration::from_secs(120);
-    const EXPIRE: Duration = Duration::from_secs(121);
-
-    let peers = make(&mut swarm, makes).await;
-
-    let mut peer = a_completed_peer(-1);
-
-    let now = clock::Working::now();
-    clock::Stopped::local_set(&now);
-
-    peer.updated = now.sub(EXPIRE);
-
-    swarm.handle_announcement(&peer).await;
-
-    assert_eq!(swarm.len(), peers.len() + 1);
-
-    let current_cutoff = CurrentClock::now_sub(&TIMEOUT).unwrap_or_default();
-    swarm.remove_inactive(current_cutoff).await;
-
-    assert_eq!(swarm.len(), peers.len());
-}
diff --git a/packages/torrent-repository/tests/swarms/mod.rs b/packages/torrent-repository/tests/swarms/mod.rs
deleted file mode 100644
index 780d6cd4c..000000000
--- a/packages/torrent-repository/tests/swarms/mod.rs
+++ /dev/null
@@ -1,524 +0,0 @@
-use std::collections::{BTreeMap, HashSet};
-use std::hash::{DefaultHasher, Hash, Hasher};
-
-use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes};
-use bittorrent_primitives::info_hash::InfoHash;
-use futures::future::join_all;
-use rstest::{fixture, rstest};
-use torrust_tracker_configuration::TrackerPolicy;
-use torrust_tracker_primitives::pagination::Pagination;
-use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
-use torrust_tracker_primitives::PersistentTorrents;
-use torrust_tracker_torrent_repository::swarm::Swarm;
-use torrust_tracker_torrent_repository::Swarms;
-
-use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer};
-
-fn swarm() -> Swarm {
-    Swarm::new(&InfoHash::default(), 0, None)
-}
-
-#[fixture]
-fn swarms() -> Swarms {
-    Swarms::default()
-}
-
-type Entries = Vec<(InfoHash, Swarm)>;
-
-#[fixture]
-fn empty() -> Entries {
-    vec![]
-}
-
-#[fixture]
-fn default() -> Entries {
-    vec![(InfoHash::default(), swarm())]
-}
-
-#[fixture]
-async fn started() -> Entries {
-    let mut swarm = swarm();
-    swarm.handle_announcement(&a_started_peer(1)).await;
-    vec![(InfoHash::default(), swarm)]
-}
-
-#[fixture]
-async fn completed() -> Entries {
-    let mut swarm = swarm();
-    swarm.handle_announcement(&a_completed_peer(2)).await;
-    vec![(InfoHash::default(), swarm)]
-}
-
-#[fixture]
-async fn downloaded() -> Entries {
-    let mut swarm = swarm();
-    let mut peer = a_started_peer(3);
-    swarm.handle_announcement(&peer).await;
-    peer.event = AnnounceEvent::Completed;
-    peer.left = NumberOfBytes::new(0);
-    swarm.handle_announcement(&peer).await;
-    vec![(InfoHash::default(), swarm)]
-}
-
-#[fixture]
-async fn three() -> Entries {
-    let mut started = swarm();
-    let started_h = &mut DefaultHasher::default();
-    started.handle_announcement(&a_started_peer(1)).await;
-    started.hash(started_h);
-
-    let mut completed = swarm();
-    let completed_h = &mut DefaultHasher::default();
-    completed.handle_announcement(&a_completed_peer(2)).await;
-    completed.hash(completed_h);
-
-    let mut downloaded = swarm();
-    let downloaded_h = &mut DefaultHasher::default();
-    let mut downloaded_peer = a_started_peer(3);
-    downloaded.handle_announcement(&downloaded_peer).await;
-    downloaded_peer.event = AnnounceEvent::Completed;
-    downloaded_peer.left = NumberOfBytes::new(0);
-    downloaded.handle_announcement(&downloaded_peer).await;
-    downloaded.hash(downloaded_h);
-
-    vec![
-        (InfoHash::from(&started_h.clone()), started),
-        (InfoHash::from(&completed_h.clone()), completed),
-        (InfoHash::from(&downloaded_h.clone()), downloaded),
-    ]
-}
-
-#[fixture]
-async fn many_out_of_order() -> Entries {
-    let mut entries: HashSet<(InfoHash, Swarm)> = HashSet::default();
-
-    for i in 0..408 {
-        let mut entry = swarm();
-        entry.handle_announcement(&a_started_peer(i)).await;
-
-        entries.insert((InfoHash::from(&i), entry));
-    }
-
-    // we keep the random order from the hashed set for the vector.
-    entries.iter().map(|(i, e)| (*i, e.clone())).collect()
-}
-
-#[fixture]
-async fn many_hashed_in_order() -> Entries {
-    let mut entries: BTreeMap<InfoHash, Swarm> = BTreeMap::default();
-
-    for i in 0..408 {
-        let mut entry = swarm();
-        entry.handle_announcement(&a_started_peer(i)).await;
-
-        let hash: &mut DefaultHasher = &mut DefaultHasher::default();
-        hash.write_i32(i);
-
-        entries.insert(InfoHash::from(&hash.clone()), entry);
-    }
-
-    // We return the entries in-order from from the b-tree map.
-    entries.iter().map(|(i, e)| (*i, e.clone())).collect()
-}
-
-#[fixture]
-fn persistent_empty() -> PersistentTorrents {
-    PersistentTorrents::default()
-}
-
-#[fixture]
-fn persistent_single() -> PersistentTorrents {
-    let hash = &mut DefaultHasher::default();
-
-    hash.write_u8(1);
-    let t = [(InfoHash::from(&hash.clone()), 0_u32)];
-
-    t.iter().copied().collect()
-}
-
-#[fixture]
-fn persistent_three() -> PersistentTorrents {
-    let hash = &mut DefaultHasher::default();
-
-    hash.write_u8(1);
-    let info_1 = InfoHash::from(&hash.clone());
-    hash.write_u8(2);
-    let info_2 = InfoHash::from(&hash.clone());
-    hash.write_u8(3);
-    let info_3 = InfoHash::from(&hash.clone());
-
-    let t = [(info_1, 1_u32), (info_2, 2_u32), (info_3, 3_u32)];
-
-    t.iter().copied().collect()
-}
-
-fn make(swarms: &Swarms, entries: &Entries) {
-    for (info_hash, swarm) in entries {
-        swarms.insert(info_hash, swarm.clone());
-    }
-}
-
-#[fixture]
-fn paginated_limit_zero() -> Pagination {
-    Pagination::new(0, 0)
-}
-
-#[fixture]
-fn paginated_limit_one() -> Pagination {
-    Pagination::new(0, 1)
-}
-
-#[fixture]
-fn paginated_limit_one_offset_one() -> Pagination {
-    Pagination::new(1, 1)
-}
-
-#[fixture]
-fn policy_none() -> TrackerPolicy {
-    TrackerPolicy::new(0, false, false)
-}
-
-#[fixture]
-fn policy_persist() -> TrackerPolicy {
-    TrackerPolicy::new(0, true, false)
-}
-
-#[fixture]
-fn policy_remove() -> TrackerPolicy {
-    TrackerPolicy::new(0, false, true)
-}
-
-#[fixture]
-fn policy_remove_persist() -> TrackerPolicy {
-    TrackerPolicy::new(0, true, true)
-}
-
-#[rstest]
-#[case::empty(empty())]
-#[case::default(default())]
-#[case::started(started().await)]
-#[case::completed(completed().await)]
-#[case::downloaded(downloaded().await)]
-#[case::three(three().await)]
-#[case::out_of_order(many_out_of_order().await)]
-#[case::in_order(many_hashed_in_order().await)]
-#[tokio::test]
-async fn it_should_get_a_torrent_entry(#[values(swarms())] repo: Swarms, #[case] entries: Entries) {
-    make(&repo, &entries);
-
-    if let Some((info_hash, swarm)) = entries.first() {
-        assert_eq!(Some(repo.get(info_hash).unwrap().lock().await.clone()), Some(swarm.clone()));
-    } else {
-        assert!(repo.get(&InfoHash::default()).is_none());
-    }
-}
-
-#[rstest]
-#[case::empty(empty())]
-#[case::default(default())]
-#[case::started(started().await)]
-#[case::completed(completed().await)]
-#[case::downloaded(downloaded().await)]
-#[case::three(three().await)]
-#[case::out_of_order(many_out_of_order().await)]
-#[case::in_order(many_hashed_in_order().await)]
-#[tokio::test]
-async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order(
-    #[values(swarms())] repo: Swarms,
-    #[case] entries: Entries,
-    #[future] many_out_of_order: Entries,
-) {
-    make(&repo, &entries);
-
-    let entries_a = repo.get_paginated(None).iter().map(|(i, _)| *i).collect::<Vec<_>>();
-
-    make(&repo, &many_out_of_order.await);
-
-    let entries_b = repo.get_paginated(None).iter().map(|(i, _)| *i).collect::<Vec<_>>();
-
-    let is_equal = entries_b.iter().take(entries_a.len()).copied().collect::<Vec<_>>() == entries_a;
-
-    let is_sorted = entries_b.windows(2).all(|w| w[0] <= w[1]);
-
-    assert!(
-        is_equal || is_sorted,
-        "The order is unstable: {is_equal}, or is sorted {is_sorted}."
-    );
-}
-
-#[rstest]
-#[case::empty(empty())]
-#[case::default(default())]
-#[case::started(started().await)]
-#[case::completed(completed().await)]
-#[case::downloaded(downloaded().await)]
-#[case::three(three().await)]
-#[case::out_of_order(many_out_of_order().await)]
-#[case::in_order(many_hashed_in_order().await)]
-#[tokio::test]
-async fn it_should_get_paginated(
-    #[values(swarms())] repo: Swarms,
-    #[case] entries: Entries,
-    #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination,
-) {
-    make(&repo, &entries);
-
-    let mut info_hashes = repo.get_paginated(None).iter().map(|(i, _)| *i).collect::<Vec<_>>();
-    info_hashes.sort();
-
-    match paginated {
-        // it should return empty if limit is zero.
-        Pagination { limit: 0, .. } => {
-            let page = repo.get_paginated(Some(&paginated));
-
-            let futures = page.iter().map(|(i, swarm_handle)| {
-                let i = *i;
-                let swarm_handle = swarm_handle.clone();
-                async move { (i, swarm_handle.lock().await.clone()) }
-            });
-
-            let swarms: Vec<(InfoHash, Swarm)> = join_all(futures).await;
-
-            assert_eq!(swarms, vec![]);
-        }
-
-        // it should return a single entry if the limit is one.
-        Pagination { limit: 1, offset: 0 } => {
-            if info_hashes.is_empty() {
-                assert_eq!(repo.get_paginated(Some(&paginated)).len(), 0);
-            } else {
-                let page = repo.get_paginated(Some(&paginated));
-                assert_eq!(page.len(), 1);
-                assert_eq!(page.first().map(|(i, _)| i), info_hashes.first());
-            }
-        }
-
-        // it should return only the second entry if both the limit and the offset are one.
-        Pagination { limit: 1, offset: 1 } => {
-            if info_hashes.len() > 1 {
-                let page = repo.get_paginated(Some(&paginated));
-                assert_eq!(page.len(), 1);
-                assert_eq!(page[0].0, info_hashes[1]);
-            }
-        }
-
-        _ => {}
-    }
-}
-
-#[rstest]
-#[case::empty(empty())]
-#[case::default(default())]
-#[case::started(started().await)]
-#[case::completed(completed().await)]
-#[case::downloaded(downloaded().await)]
-#[case::three(three().await)]
-#[case::out_of_order(many_out_of_order().await)]
-#[case::in_order(many_hashed_in_order().await)]
-#[tokio::test]
-async fn it_should_get_metrics(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) {
-    use torrust_tracker_primitives::swarm_metadata::AggregateSwarmMetadata;
-
-    make(&swarms, &entries);
-
-    let mut metrics = AggregateSwarmMetadata::default();
-
-    for (_, torrent) in entries {
-        let stats = torrent.metadata();
-
-        metrics.total_torrents += 1;
-        metrics.total_incomplete += u64::from(stats.incomplete);
-        metrics.total_complete += u64::from(stats.complete);
-        metrics.total_downloaded += u64::from(stats.downloaded);
-    }
-
-    assert_eq!(swarms.get_aggregate_swarm_metadata().await.unwrap(), metrics);
-}
-
-#[rstest]
-#[case::empty(empty())]
-#[case::default(default())]
-#[case::started(started().await)]
-#[case::completed(completed().await)]
-#[case::downloaded(downloaded().await)]
-#[case::three(three().await)]
-#[case::out_of_order(many_out_of_order().await)]
-#[case::in_order(many_hashed_in_order().await)]
-#[tokio::test]
-async fn it_should_import_persistent_torrents(
-    #[values(swarms())] swarms: Swarms,
-    #[case] entries: Entries,
-    #[values(persistent_empty(), persistent_single(), persistent_three())] persistent_torrents: PersistentTorrents,
-) {
-    make(&swarms, &entries);
-
-    let mut downloaded = swarms.get_aggregate_swarm_metadata().await.unwrap().total_downloaded;
-    persistent_torrents.iter().for_each(|(_, d)| downloaded += u64::from(*d));
-
-    swarms.import_persistent(&persistent_torrents);
-
-    assert_eq!(
-        swarms.get_aggregate_swarm_metadata().await.unwrap().total_downloaded,
-        downloaded
-    );
-
-    for (entry, _) in persistent_torrents {
-        assert!(swarms.get(&entry).is_some());
-    }
-}
-
-#[rstest]
-#[case::empty(empty())]
-#[case::default(default())]
-#[case::started(started().await)]
-#[case::completed(completed().await)]
-#[case::downloaded(downloaded().await)]
-#[case::three(three().await)]
-#[case::out_of_order(many_out_of_order().await)]
-#[case::in_order(many_hashed_in_order().await)]
-#[tokio::test]
-async fn it_should_remove_an_entry(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) {
-    make(&swarms, &entries);
-
-    for (info_hash, torrent) in entries {
-        assert_eq!(
-            Some(swarms.get(&info_hash).unwrap().lock().await.clone()),
-            Some(torrent.clone())
-        );
-        assert_eq!(
-            Some(swarms.remove(&info_hash).await.unwrap().lock().await.clone()),
-            Some(torrent)
-        );
-
-        assert!(swarms.get(&info_hash).is_none());
-        assert!(swarms.remove(&info_hash).await.is_none());
-    }
-
-    assert_eq!(swarms.get_aggregate_swarm_metadata().await.unwrap().total_torrents, 0);
-}
-
-#[rstest]
-#[case::empty(empty())]
-#[case::default(default())]
-#[case::started(started().await)]
-#[case::completed(completed().await)]
-#[case::downloaded(downloaded().await)]
-#[case::three(three().await)]
-#[case::out_of_order(many_out_of_order().await)]
-#[case::in_order(many_hashed_in_order().await)]
-#[tokio::test]
-async fn it_should_remove_inactive_peers(#[values(swarms())] swarms: Swarms, #[case] entries: Entries) {
-    use std::ops::Sub as _;
-    use std::time::Duration;
-
-    use torrust_tracker_clock::clock::stopped::Stopped as _;
-    use torrust_tracker_clock::clock::{self, Time as _};
-    use torrust_tracker_primitives::peer;
-
-    use crate::CurrentClock;
-
-    const TIMEOUT: Duration = Duration::from_secs(120);
-    const EXPIRE: Duration = Duration::from_secs(121);
-
-    make(&swarms, &entries);
-
-    let info_hash: InfoHash;
-    let mut peer: peer::Peer;
-
-    // Generate a new infohash and peer.
-    {
-        let hash = &mut DefaultHasher::default();
-        hash.write_u8(255);
-        info_hash = InfoHash::from(&hash.clone());
-        peer = a_completed_peer(-1);
-    }
-
-    // Set the last updated time of the peer to be 121 seconds ago.
-    {
-        let now = clock::Working::now();
-        clock::Stopped::local_set(&now);
-
-        peer.updated = now.sub(EXPIRE);
-    }
-
-    // Insert the infohash and peer into the repository
-    // and verify there is an extra torrent entry.
-    {
-        swarms.handle_announcement(&info_hash, &peer, None).await.unwrap();
-        assert_eq!(
-            swarms.get_aggregate_swarm_metadata().await.unwrap().total_torrents,
-            entries.len() as u64 + 1
-        );
-    }
-
-    // Insert the infohash and peer into the repository
-    // and verify the swarm metadata was updated.
-    {
-        swarms.handle_announcement(&info_hash, &peer, None).await.unwrap();
-        let stats = swarms.get_swarm_metadata(&info_hash).await.unwrap();
-        assert_eq!(
-            stats,
-            Some(SwarmMetadata {
-                downloaded: 0,
-                complete: 1,
-                incomplete: 0
-            })
-        );
-    }
-
-    // Verify that this new peer was inserted into the repository.
-    {
-        let lock_tracked_torrent = swarms.get(&info_hash).expect("it_should_get_some");
-        let entry = lock_tracked_torrent.lock().await;
-        assert!(entry.peers(None).contains(&peer.into()));
-    }
-
-    // Remove peers that have not been updated since the timeout (120 seconds ago).
-    {
-        swarms
-            .remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed"))
-            .await
-            .unwrap();
-    }
-
-    // Verify that the this peer was removed from the repository.
-    {
-        let lock_tracked_torrent = swarms.get(&info_hash).expect("it_should_get_some");
-        let entry = lock_tracked_torrent.lock().await;
-        assert!(!entry.peers(None).contains(&peer.into()));
-    }
-}
-
-#[rstest]
-#[case::empty(empty())]
-#[case::default(default())]
-#[case::started(started().await)]
-#[case::completed(completed().await)]
-#[case::downloaded(downloaded().await)]
-#[case::three(three().await)]
-#[case::out_of_order(many_out_of_order().await)]
-#[case::in_order(many_hashed_in_order().await)]
-#[tokio::test]
-async fn it_should_remove_peerless_torrents(
-    #[values(swarms())] swarms: Swarms,
-    #[case] entries: Entries,
-    #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy,
-) {
-    make(&swarms, &entries);
-
-    swarms.remove_peerless_torrents(&policy).await.unwrap();
-
-    let paginated = swarms.get_paginated(None);
-
-    let futures = paginated.iter().map(|(i, swarm_handle)| {
-        let i = *i;
-        let swarm_handle = swarm_handle.clone();
-        async move { (i, swarm_handle.lock().await.clone()) }
-    });
-
-    let torrents: Vec<(InfoHash, Swarm)> = join_all(futures).await;
-
-    for (_, entry) in torrents {
-        assert!(entry.meets_retaining_policy(&policy));
-    }
-}