diff --git a/Cargo.lock b/Cargo.lock index 6e4ab415f..5415149e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -676,6 +676,7 @@ dependencies = [ "torrust-rest-tracker-api-client", "torrust-tracker-clock", "torrust-tracker-configuration", + "torrust-tracker-events", "torrust-tracker-located-error", "torrust-tracker-primitives", "torrust-tracker-test-helpers", diff --git a/packages/axum-http-tracker-server/src/environment.rs b/packages/axum-http-tracker-server/src/environment.rs index 10dada2db..0c1431db5 100644 --- a/packages/axum-http-tracker-server/src/environment.rs +++ b/packages/axum-http-tracker-server/src/environment.rs @@ -25,12 +25,12 @@ pub struct Environment { impl Environment { /// Add a torrent to the tracker - pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> bool { + pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { self.container .tracker_core_container .in_memory_torrent_repository - .upsert_peer(info_hash, peer, None) - .await + .handle_announcement(info_hash, peer, None) + .await; } } diff --git a/packages/axum-rest-tracker-api-server/src/environment.rs b/packages/axum-rest-tracker-api-server/src/environment.rs index 92ca5a2d1..be93a8723 100644 --- a/packages/axum-rest-tracker-api-server/src/environment.rs +++ b/packages/axum-rest-tracker-api-server/src/environment.rs @@ -33,12 +33,12 @@ where S: std::fmt::Debug + std::fmt::Display, { /// Add a torrent to the tracker - pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> bool { + pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { self.container .tracker_core_container .in_memory_torrent_repository - .upsert_peer(info_hash, peer, None) - .await + .handle_announcement(info_hash, peer, None) + .await; } } diff --git a/packages/http-tracker-core/src/services/announce.rs b/packages/http-tracker-core/src/services/announce.rs index 9f39a04e4..0ad5ed143 100644 --- a/packages/http-tracker-core/src/services/announce.rs +++ b/packages/http-tracker-core/src/services/announce.rs @@ -87,7 +87,7 @@ impl AnnounceService { let announce_data = self .announce_handler - .announce( + .handle_announcement( &announce_request.info_hash, &mut peer, &remote_client_addr.ip(), diff --git a/packages/http-tracker-core/src/services/scrape.rs b/packages/http-tracker-core/src/services/scrape.rs index 3da1aa88f..f22f2f632 100644 --- a/packages/http-tracker-core/src/services/scrape.rs +++ b/packages/http-tracker-core/src/services/scrape.rs @@ -78,7 +78,7 @@ impl ScrapeService { let scrape_data = if self.authentication_is_required() && !self.is_authenticated(maybe_key).await { ScrapeData::zeroed(&scrape_request.info_hashes) } else { - self.scrape_handler.scrape(&scrape_request.info_hashes).await? + self.scrape_handler.handle_scrape(&scrape_request.info_hashes).await? }; let remote_client_addr = resolve_remote_client_addr(&self.core_config.net.on_reverse_proxy.into(), client_ip_sources)?; @@ -291,7 +291,7 @@ mod tests { let original_peer_ip = peer.ip(); container .announce_handler - .announce(&info_hash, &mut peer, &original_peer_ip, &PeersWanted::AsManyAsPossible) + .handle_announcement(&info_hash, &mut peer, &original_peer_ip, &PeersWanted::AsManyAsPossible) .await .unwrap(); @@ -482,7 +482,7 @@ mod tests { let original_peer_ip = peer.ip(); container .announce_handler - .announce(&info_hash, &mut peer, &original_peer_ip, &PeersWanted::AsManyAsPossible) + .handle_announcement(&info_hash, &mut peer, &original_peer_ip, &PeersWanted::AsManyAsPossible) .await .unwrap(); diff --git a/packages/torrent-repository/src/swarm.rs b/packages/torrent-repository/src/swarm.rs index b9076289b..84e1f2da4 100644 --- a/packages/torrent-repository/src/swarm.rs +++ b/packages/torrent-repository/src/swarm.rs @@ -33,17 +33,13 @@ impl Swarm { } } - pub async fn handle_announcement(&mut self, incoming_announce: &PeerAnnouncement) -> bool { - let mut downloads_increased: bool = false; - + pub async fn handle_announcement(&mut self, incoming_announce: &PeerAnnouncement) { let _previous_peer = match peer::ReadInfo::get_event(incoming_announce) { AnnounceEvent::Started | AnnounceEvent::None | AnnounceEvent::Completed => { - self.upsert_peer(Arc::new(*incoming_announce), &mut downloads_increased).await + self.upsert_peer(Arc::new(*incoming_announce)).await } AnnounceEvent::Stopped => self.remove_peer(&incoming_announce.peer_addr).await, }; - - downloads_increased } pub async fn remove_inactive(&mut self, current_cutoff: DurationSinceUnixEpoch) -> usize { @@ -159,26 +155,20 @@ impl Swarm { !self.should_be_removed(policy) } - async fn upsert_peer( - &mut self, - incoming_announce: Arc, - downloads_increased: &mut bool, - ) -> Option> { + async fn upsert_peer(&mut self, incoming_announce: Arc) -> Option> { let announcement = incoming_announce.clone(); if let Some(previous_announce) = self.peers.insert(incoming_announce.peer_addr, incoming_announce) { - *downloads_increased = self.update_metadata_on_update(&previous_announce, &announcement); + let downloads_increased = self.update_metadata_on_update(&previous_announce, &announcement); self.trigger_peer_updated_event(&previous_announce, &announcement).await; - if *downloads_increased { + if downloads_increased { self.trigger_peer_download_completed_event(&announcement).await; } Some(previous_announce) } else { - *downloads_increased = false; - self.update_metadata_on_insert(&announcement); self.trigger_peer_added_event(&announcement).await; @@ -362,36 +352,30 @@ mod tests { #[tokio::test] async fn it_should_allow_inserting_a_new_peer() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer = PeerBuilder::default().build(); - assert_eq!(swarm.upsert_peer(peer.into(), &mut downloads_increased).await, None); + assert_eq!(swarm.upsert_peer(peer.into()).await, None); } #[tokio::test] async fn it_should_allow_updating_a_preexisting_peer() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer = PeerBuilder::default().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; - assert_eq!( - swarm.upsert_peer(peer.into(), &mut downloads_increased).await, - Some(Arc::new(peer)) - ); + assert_eq!(swarm.upsert_peer(peer.into()).await, Some(Arc::new(peer))); } #[tokio::test] async fn it_should_allow_getting_all_peers() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer = PeerBuilder::default().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; assert_eq!(swarm.peers(None), [Arc::new(peer)]); } @@ -399,11 +383,10 @@ mod tests { #[tokio::test] async fn it_should_allow_getting_one_peer_by_id() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer = PeerBuilder::default().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; assert_eq!(swarm.get(&peer.peer_addr), Some(Arc::new(peer)).as_ref()); } @@ -411,11 +394,10 @@ mod tests { #[tokio::test] async fn it_should_increase_the_number_of_peers_after_inserting_a_new_one() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer = PeerBuilder::default().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; assert_eq!(swarm.len(), 1); } @@ -423,11 +405,10 @@ mod tests { #[tokio::test] async fn it_should_decrease_the_number_of_peers_after_removing_one() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer = PeerBuilder::default().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; swarm.remove_peer(&peer.peer_addr).await; @@ -437,11 +418,10 @@ mod tests { #[tokio::test] async fn it_should_allow_removing_an_existing_peer() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer = PeerBuilder::default().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; let old = swarm.remove_peer(&peer.peer_addr).await; @@ -461,19 +441,18 @@ mod tests { #[tokio::test] async fn it_should_allow_getting_all_peers_excluding_peers_with_a_given_address() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer1 = PeerBuilder::default() .with_peer_id(&PeerId(*b"-qB00000000000000001")) .with_peer_addr(&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 6969)) .build(); - swarm.upsert_peer(peer1.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer1.into()).await; let peer2 = PeerBuilder::default() .with_peer_id(&PeerId(*b"-qB00000000000000002")) .with_peer_addr(&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 6969)) .build(); - swarm.upsert_peer(peer2.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer2.into()).await; assert_eq!(swarm.peers_excluding(&peer2.peer_addr, None), [Arc::new(peer1)]); } @@ -481,13 +460,13 @@ mod tests { #[tokio::test] async fn it_should_count_inactive_peers() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; + let one_second = DurationSinceUnixEpoch::new(1, 0); // Insert the peer let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0); let peer = PeerBuilder::default().last_updated_on(last_update_time).build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; let inactive_peers_total = swarm.count_inactive_peers(last_update_time + one_second); @@ -497,13 +476,13 @@ mod tests { #[tokio::test] async fn it_should_remove_inactive_peers() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; + let one_second = DurationSinceUnixEpoch::new(1, 0); // Insert the peer let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0); let peer = PeerBuilder::default().last_updated_on(last_update_time).build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; // Remove peers not updated since one second after inserting the peer swarm.remove_inactive(last_update_time + one_second).await; @@ -514,13 +493,13 @@ mod tests { #[tokio::test] async fn it_should_not_remove_active_peers() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; + let one_second = DurationSinceUnixEpoch::new(1, 0); // Insert the peer let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0); let peer = PeerBuilder::default().last_updated_on(last_update_time).build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; // Remove peers not updated since one second before inserting the peer. swarm.remove_inactive(last_update_time - one_second).await; @@ -542,7 +521,7 @@ mod tests { async fn not_empty_swarm() -> Swarm { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - swarm.upsert_peer(PeerBuilder::default().build().into(), &mut false).await; + swarm.upsert_peer(PeerBuilder::default().build().into()).await; swarm } @@ -550,13 +529,12 @@ mod tests { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); let mut peer = PeerBuilder::leecher().build(); - let mut downloads_increased = false; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; peer.event = aquatic_udp_protocol::AnnounceEvent::Completed; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; assert!(swarm.metadata().downloads() > 0); @@ -631,17 +609,16 @@ mod tests { #[tokio::test] async fn it_should_allow_inserting_two_identical_peers_except_for_the_socket_address() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let peer1 = PeerBuilder::default() .with_peer_addr(&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 6969)) .build(); - swarm.upsert_peer(peer1.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer1.into()).await; let peer2 = PeerBuilder::default() .with_peer_addr(&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 6969)) .build(); - swarm.upsert_peer(peer2.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer2.into()).await; assert_eq!(swarm.len(), 2); } @@ -649,7 +626,6 @@ mod tests { #[tokio::test] async fn it_should_not_allow_inserting_two_peers_with_different_peer_id_but_the_same_socket_address() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; // When that happens the peer ID will be changed in the swarm. // In practice, it's like if the peer had changed its ID. @@ -658,13 +634,13 @@ mod tests { .with_peer_id(&PeerId(*b"-qB00000000000000001")) .with_peer_addr(&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 6969)) .build(); - swarm.upsert_peer(peer1.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer1.into()).await; let peer2 = PeerBuilder::default() .with_peer_id(&PeerId(*b"-qB00000000000000002")) .with_peer_addr(&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 6969)) .build(); - swarm.upsert_peer(peer2.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer2.into()).await; assert_eq!(swarm.len(), 1); } @@ -672,13 +648,12 @@ mod tests { #[tokio::test] async fn it_should_return_the_swarm_metadata() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let seeder = PeerBuilder::seeder().build(); let leecher = PeerBuilder::leecher().build(); - swarm.upsert_peer(seeder.into(), &mut downloads_increased).await; - swarm.upsert_peer(leecher.into(), &mut downloads_increased).await; + swarm.upsert_peer(seeder.into()).await; + swarm.upsert_peer(leecher.into()).await; assert_eq!( swarm.metadata(), @@ -693,13 +668,12 @@ mod tests { #[tokio::test] async fn it_should_return_the_number_of_seeders_in_the_list() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let seeder = PeerBuilder::seeder().build(); let leecher = PeerBuilder::leecher().build(); - swarm.upsert_peer(seeder.into(), &mut downloads_increased).await; - swarm.upsert_peer(leecher.into(), &mut downloads_increased).await; + swarm.upsert_peer(seeder.into()).await; + swarm.upsert_peer(leecher.into()).await; let (seeders, _leechers) = swarm.seeders_and_leechers(); @@ -709,13 +683,12 @@ mod tests { #[tokio::test] async fn it_should_return_the_number_of_leechers_in_the_list() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let seeder = PeerBuilder::seeder().build(); let leecher = PeerBuilder::leecher().build(); - swarm.upsert_peer(seeder.into(), &mut downloads_increased).await; - swarm.upsert_peer(leecher.into(), &mut downloads_increased).await; + swarm.upsert_peer(seeder.into()).await; + swarm.upsert_peer(leecher.into()).await; let (_seeders, leechers) = swarm.seeders_and_leechers(); @@ -739,13 +712,12 @@ mod tests { #[tokio::test] async fn it_should_increase_the_number_of_leechers_if_the_new_peer_is_a_leecher_() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let leechers = swarm.metadata().leechers(); let leecher = PeerBuilder::leecher().build(); - swarm.upsert_peer(leecher.into(), &mut downloads_increased).await; + swarm.upsert_peer(leecher.into()).await; assert_eq!(swarm.metadata().leechers(), leechers + 1); } @@ -753,13 +725,12 @@ mod tests { #[tokio::test] async fn it_should_increase_the_number_of_seeders_if_the_new_peer_is_a_seeder() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let seeders = swarm.metadata().seeders(); let seeder = PeerBuilder::seeder().build(); - swarm.upsert_peer(seeder.into(), &mut downloads_increased).await; + swarm.upsert_peer(seeder.into()).await; assert_eq!(swarm.metadata().seeders(), seeders + 1); } @@ -768,13 +739,12 @@ mod tests { async fn it_should_not_increasing_the_number_of_downloads_if_the_new_peer_has_completed_downloading_as_it_was_not_previously_known( ) { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let downloads = swarm.metadata().downloads(); let seeder = PeerBuilder::seeder().build(); - swarm.upsert_peer(seeder.into(), &mut downloads_increased).await; + swarm.upsert_peer(seeder.into()).await; assert_eq!(swarm.metadata().downloads(), downloads); } @@ -789,11 +759,10 @@ mod tests { #[tokio::test] async fn it_should_decrease_the_number_of_leechers_if_the_removed_peer_was_a_leecher() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let leecher = PeerBuilder::leecher().build(); - swarm.upsert_peer(leecher.into(), &mut downloads_increased).await; + swarm.upsert_peer(leecher.into()).await; let leechers = swarm.metadata().leechers(); @@ -805,11 +774,10 @@ mod tests { #[tokio::test] async fn it_should_decrease_the_number_of_seeders_if_the_removed_peer_was_a_seeder() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let seeder = PeerBuilder::seeder().build(); - swarm.upsert_peer(seeder.into(), &mut downloads_increased).await; + swarm.upsert_peer(seeder.into()).await; let seeders = swarm.metadata().seeders(); @@ -830,11 +798,10 @@ mod tests { #[tokio::test] async fn it_should_decrease_the_number_of_leechers_when_a_removed_peer_is_a_leecher() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let leecher = PeerBuilder::leecher().build(); - swarm.upsert_peer(leecher.into(), &mut downloads_increased).await; + swarm.upsert_peer(leecher.into()).await; let leechers = swarm.metadata().leechers(); @@ -846,11 +813,10 @@ mod tests { #[tokio::test] async fn it_should_decrease_the_number_of_seeders_when_the_removed_peer_is_a_seeder() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let seeder = PeerBuilder::seeder().build(); - swarm.upsert_peer(seeder.into(), &mut downloads_increased).await; + swarm.upsert_peer(seeder.into()).await; let seeders = swarm.metadata().seeders(); @@ -870,18 +836,17 @@ mod tests { #[tokio::test] async fn it_should_increase_seeders_and_decreasing_leechers_when_the_peer_changes_from_leecher_to_seeder_() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let mut peer = PeerBuilder::leecher().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; let leechers = swarm.metadata().leechers(); let seeders = swarm.metadata().seeders(); peer.left = NumberOfBytes::new(0); // Convert to seeder - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; assert_eq!(swarm.metadata().seeders(), seeders + 1); assert_eq!(swarm.metadata().leechers(), leechers - 1); @@ -890,18 +855,17 @@ mod tests { #[tokio::test] async fn it_should_increase_leechers_and_decreasing_seeders_when_the_peer_changes_from_seeder_to_leecher() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let mut peer = PeerBuilder::seeder().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; let leechers = swarm.metadata().leechers(); let seeders = swarm.metadata().seeders(); peer.left = NumberOfBytes::new(10); // Convert to leecher - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; assert_eq!(swarm.metadata().leechers(), leechers + 1); assert_eq!(swarm.metadata().seeders(), seeders - 1); @@ -910,17 +874,16 @@ mod tests { #[tokio::test] async fn it_should_increase_the_number_of_downloads_when_the_peer_announces_completed_downloading() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let mut peer = PeerBuilder::leecher().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; let downloads = swarm.metadata().downloads(); peer.event = aquatic_udp_protocol::AnnounceEvent::Completed; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; assert_eq!(swarm.metadata().downloads(), downloads + 1); } @@ -928,19 +891,18 @@ mod tests { #[tokio::test] async fn it_should_not_increasing_the_number_of_downloads_when_the_peer_announces_completed_downloading_twice_() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); - let mut downloads_increased = false; let mut peer = PeerBuilder::leecher().build(); - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; let downloads = swarm.metadata().downloads(); peer.event = aquatic_udp_protocol::AnnounceEvent::Completed; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; assert_eq!(swarm.metadata().downloads(), downloads + 1); } @@ -971,8 +933,7 @@ mod tests { let mut swarm = Swarm::new(&sample_info_hash(), 0, Some(Arc::new(event_sender_mock))); - let mut downloads_increased = false; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; } #[tokio::test] @@ -990,8 +951,7 @@ mod tests { let mut swarm = Swarm::new(&info_hash, 0, Some(Arc::new(event_sender_mock))); // Insert the peer - let mut downloads_increased = false; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; swarm.remove_peer(&peer.peer_addr).await; } @@ -1011,8 +971,7 @@ mod tests { let mut swarm = Swarm::new(&info_hash, 0, Some(Arc::new(event_sender_mock))); // Insert the peer - let mut downloads_increased = false; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; // Peers not updated after this time will be removed let current_cutoff = peer.updated + DurationSinceUnixEpoch::from_secs(1); @@ -1042,11 +1001,10 @@ mod tests { let mut swarm = Swarm::new(&info_hash, 0, Some(Arc::new(event_sender_mock))); // Insert the peer - let mut downloads_increased = false; - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; // Update the peer - swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(peer.into()).await; } #[tokio::test] @@ -1079,11 +1037,10 @@ mod tests { let mut swarm = Swarm::new(&info_hash, 0, Some(Arc::new(event_sender_mock))); // Insert the peer - let mut downloads_increased = false; - swarm.upsert_peer(started_peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(started_peer.into()).await; // Announce as completed - swarm.upsert_peer(completed_peer.into(), &mut downloads_increased).await; + swarm.upsert_peer(completed_peer.into()).await; } } } diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index 36f83070d..1504ac1f4 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -54,7 +54,7 @@ impl Swarms { info_hash: &InfoHash, peer: &peer::Peer, opt_persistent_torrent: Option, - ) -> Result { + ) -> Result<(), Error> { let swarm_handle = match self.swarms.get(info_hash) { None => { let number_of_downloads = opt_persistent_torrent.unwrap_or_default(); @@ -80,9 +80,9 @@ impl Swarms { let mut swarm = swarm_handle.value().lock().await; - let downloads_increased = swarm.handle_announcement(peer).await; + swarm.handle_announcement(peer).await; - Ok(downloads_increased) + Ok(()) } /// Inserts a new swarm. Only used for testing purposes. diff --git a/packages/tracker-core/Cargo.toml b/packages/tracker-core/Cargo.toml index ac1cee88d..3c89505b2 100644 --- a/packages/tracker-core/Cargo.toml +++ b/packages/tracker-core/Cargo.toml @@ -29,6 +29,7 @@ thiserror = "2" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" } torrust-tracker-configuration = { version = "3.0.0-develop", path = "../configuration" } +torrust-tracker-events = { version = "3.0.0-develop", path = "../events" } torrust-tracker-located-error = { version = "3.0.0-develop", path = "../located-error" } torrust-tracker-primitives = { version = "3.0.0-develop", path = "../primitives" } torrust-tracker-torrent-repository = { version = "3.0.0-develop", path = "../torrent-repository" } diff --git a/packages/tracker-core/src/announce_handler.rs b/packages/tracker-core/src/announce_handler.rs index a2e8db743..ffd244f2a 100644 --- a/packages/tracker-core/src/announce_handler.rs +++ b/packages/tracker-core/src/announce_handler.rs @@ -154,7 +154,7 @@ impl AnnounceHandler { /// /// Returns an error if the tracker is running in `listed` mode and the /// torrent is not whitelisted. - pub async fn announce( + pub async fn handle_announcement( &self, info_hash: &InfoHash, peer: &mut peer::Peer, @@ -163,6 +163,11 @@ impl AnnounceHandler { ) -> Result { self.whitelist_authorization.authorize(info_hash).await?; + // This will be removed in the future. + // See https://github.com/torrust/torrust-tracker/issues/1502 + // There will be a persisted metric for counting the total number of + // downloads across all torrents. The in-memory metric will count only + // the number of downloads during the current tracker uptime. let opt_persistent_torrent = if self.config.tracker_policy.persistent_torrent_completed_stat { self.db_torrent_repository.load(info_hash)? } else { @@ -171,15 +176,10 @@ impl AnnounceHandler { peer.change_ip(&assign_ip_address_to_peer(remote_client_ip, self.config.net.external_ip)); - let number_of_downloads_increased = self - .in_memory_torrent_repository - .upsert_peer(info_hash, peer, opt_persistent_torrent) + self.in_memory_torrent_repository + .handle_announcement(info_hash, peer, opt_persistent_torrent) .await; - if self.config.tracker_policy.persistent_torrent_completed_stat && number_of_downloads_increased { - self.db_torrent_repository.increase_number_of_downloads(info_hash)?; - } - Ok(self.build_announce_data(info_hash, peer, peers_wanted).await) } @@ -455,7 +455,7 @@ mod tests { let mut peer = sample_peer(); let announce_data = announce_handler - .announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) + .handle_announcement(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) .await .unwrap(); @@ -468,7 +468,7 @@ mod tests { let mut previously_announced_peer = sample_peer_1(); announce_handler - .announce( + .handle_announcement( &sample_info_hash(), &mut previously_announced_peer, &peer_ip(), @@ -479,7 +479,7 @@ mod tests { let mut peer = sample_peer_2(); let announce_data = announce_handler - .announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) + .handle_announcement(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) .await .unwrap(); @@ -492,7 +492,7 @@ mod tests { let mut previously_announced_peer_1 = sample_peer_1(); announce_handler - .announce( + .handle_announcement( &sample_info_hash(), &mut previously_announced_peer_1, &peer_ip(), @@ -503,7 +503,7 @@ mod tests { let mut previously_announced_peer_2 = sample_peer_2(); announce_handler - .announce( + .handle_announcement( &sample_info_hash(), &mut previously_announced_peer_2, &peer_ip(), @@ -514,7 +514,7 @@ mod tests { let mut peer = sample_peer_3(); let announce_data = announce_handler - .announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::only(1)) + .handle_announcement(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::only(1)) .await .unwrap(); @@ -539,7 +539,7 @@ mod tests { let mut peer = seeder(); let announce_data = announce_handler - .announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) + .handle_announcement(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) .await .unwrap(); @@ -553,7 +553,7 @@ mod tests { let mut peer = leecher(); let announce_data = announce_handler - .announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) + .handle_announcement(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) .await .unwrap(); @@ -567,7 +567,7 @@ mod tests { // We have to announce with "started" event because peer does not count if peer was not previously known let mut started_peer = started_peer(); announce_handler - .announce( + .handle_announcement( &sample_info_hash(), &mut started_peer, &peer_ip(), @@ -578,7 +578,7 @@ mod tests { let mut completed_peer = completed_peer(); let announce_data = announce_handler - .announce( + .handle_announcement( &sample_info_hash(), &mut completed_peer, &peer_ip(), @@ -593,83 +593,6 @@ mod tests { } } - mod handling_torrent_persistence { - - use std::sync::Arc; - - use aquatic_udp_protocol::AnnounceEvent; - use torrust_tracker_test_helpers::configuration; - use torrust_tracker_torrent_repository::Swarms; - - use crate::announce_handler::tests::the_announce_handler::peer_ip; - use crate::announce_handler::{AnnounceHandler, PeersWanted}; - use crate::databases::setup::initialize_database; - use crate::test_helpers::tests::{sample_info_hash, sample_peer}; - use crate::torrent::manager::TorrentsManager; - use crate::torrent::repository::in_memory::InMemoryTorrentRepository; - use crate::torrent::repository::persisted::DatabasePersistentTorrentRepository; - use crate::whitelist::authorization::WhitelistAuthorization; - use crate::whitelist::repository::in_memory::InMemoryWhitelist; - - #[tokio::test] - async fn it_should_persist_the_number_of_completed_peers_for_all_torrents_into_the_database() { - let mut config = configuration::ephemeral_public(); - - config.core.tracker_policy.persistent_torrent_completed_stat = true; - - let database = initialize_database(&config.core); - let swarms = Arc::new(Swarms::default()); - let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::new(swarms)); - let db_torrent_repository = Arc::new(DatabasePersistentTorrentRepository::new(&database)); - let torrents_manager = Arc::new(TorrentsManager::new( - &config.core, - &in_memory_torrent_repository, - &db_torrent_repository, - )); - let in_memory_whitelist = Arc::new(InMemoryWhitelist::default()); - let whitelist_authorization = Arc::new(WhitelistAuthorization::new(&config.core, &in_memory_whitelist.clone())); - let announce_handler = Arc::new(AnnounceHandler::new( - &config.core, - &whitelist_authorization, - &in_memory_torrent_repository, - &db_torrent_repository, - )); - - let info_hash = sample_info_hash(); - - let mut peer = sample_peer(); - - peer.event = AnnounceEvent::Started; - let announce_data = announce_handler - .announce(&info_hash, &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) - .await - .unwrap(); - assert_eq!(announce_data.stats.downloaded, 0); - - peer.event = AnnounceEvent::Completed; - let announce_data = announce_handler - .announce(&info_hash, &mut peer, &peer_ip(), &PeersWanted::AsManyAsPossible) - .await - .unwrap(); - assert_eq!(announce_data.stats.downloaded, 1); - - // Remove the newly updated torrent from memory - let _unused = in_memory_torrent_repository.remove(&info_hash).await; - - torrents_manager.load_torrents_from_database().unwrap(); - - let torrent_entry = in_memory_torrent_repository - .get(&info_hash) - .expect("it should be able to get entry"); - - // It persists the number of completed peers. - assert_eq!(torrent_entry.lock().await.metadata().downloaded, 1); - - // It does not persist the peers - assert!(torrent_entry.lock().await.is_empty()); - } - } - mod should_allow_the_client_peers_to_specified_the_number_of_peers_wanted { use torrust_tracker_configuration::TORRENT_PEERS_LIMIT; diff --git a/packages/tracker-core/src/lib.rs b/packages/tracker-core/src/lib.rs index 82ebac3c6..5167abf51 100644 --- a/packages/tracker-core/src/lib.rs +++ b/packages/tracker-core/src/lib.rs @@ -124,6 +124,7 @@ pub mod container; pub mod databases; pub mod error; pub mod scrape_handler; +pub mod statistics; pub mod torrent; pub mod whitelist; @@ -156,6 +157,8 @@ pub(crate) type CurrentClock = clock::Working; #[allow(dead_code)] pub(crate) type CurrentClock = clock::Stopped; +pub const TRACKER_CORE_LOG_TARGET: &str = "TRACKER_CORE"; + #[cfg(test)] mod tests { mod the_tracker { @@ -200,7 +203,7 @@ mod tests { // Announce a "complete" peer for the torrent let mut complete_peer = complete_peer(); announce_handler - .announce( + .handle_announcement( &info_hash, &mut complete_peer, &IpAddr::V4(Ipv4Addr::new(126, 0, 0, 10)), @@ -212,7 +215,7 @@ mod tests { // Announce an "incomplete" peer for the torrent let mut incomplete_peer = incomplete_peer(); announce_handler - .announce( + .handle_announcement( &info_hash, &mut incomplete_peer, &IpAddr::V4(Ipv4Addr::new(126, 0, 0, 11)), @@ -222,7 +225,7 @@ mod tests { .unwrap(); // Scrape - let scrape_data = scrape_handler.scrape(&vec![info_hash]).await.unwrap(); + let scrape_data = scrape_handler.handle_scrape(&vec![info_hash]).await.unwrap(); // The expected swarm metadata for the torrent let mut expected_scrape_data = ScrapeData::empty(); @@ -256,7 +259,7 @@ mod tests { let non_whitelisted_info_hash = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0".parse::().unwrap(); // DevSkim: ignore DS173237 - let scrape_data = scrape_handler.scrape(&vec![non_whitelisted_info_hash]).await.unwrap(); + let scrape_data = scrape_handler.handle_scrape(&vec![non_whitelisted_info_hash]).await.unwrap(); // The expected zeroed swarm metadata for the file let mut expected_scrape_data = ScrapeData::empty(); diff --git a/packages/tracker-core/src/scrape_handler.rs b/packages/tracker-core/src/scrape_handler.rs index 443d989a6..9c94a4e50 100644 --- a/packages/tracker-core/src/scrape_handler.rs +++ b/packages/tracker-core/src/scrape_handler.rs @@ -107,7 +107,7 @@ impl ScrapeHandler { /// # BEP Reference: /// /// [BEP 48: Scrape Protocol](https://www.bittorrent.org/beps/bep_0048.html) - pub async fn scrape(&self, info_hashes: &Vec) -> Result { + pub async fn handle_scrape(&self, info_hashes: &Vec) -> Result { let mut scrape_data = ScrapeData::empty(); for info_hash in info_hashes { @@ -158,7 +158,7 @@ mod tests { let info_hashes = vec!["3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0".parse::().unwrap()]; // DevSkim: ignore DS173237 - let scrape_data = scrape_handler.scrape(&info_hashes).await.unwrap(); + let scrape_data = scrape_handler.handle_scrape(&info_hashes).await.unwrap(); let mut expected_scrape_data = ScrapeData::empty(); @@ -176,7 +176,7 @@ mod tests { "99c82bb73505a3c0b453f9fa0e881d6e5a32a0c1".parse::().unwrap(), // DevSkim: ignore DS173237 ]; - let scrape_data = scrape_handler.scrape(&info_hashes).await.unwrap(); + let scrape_data = scrape_handler.handle_scrape(&info_hashes).await.unwrap(); let mut expected_scrape_data = ScrapeData::empty(); expected_scrape_data.add_file_with_zeroed_metadata(&info_hashes[0]); diff --git a/packages/tracker-core/src/statistics/event/handler.rs b/packages/tracker-core/src/statistics/event/handler.rs new file mode 100644 index 000000000..7b6ce83b7 --- /dev/null +++ b/packages/tracker-core/src/statistics/event/handler.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use torrust_tracker_primitives::DurationSinceUnixEpoch; +use torrust_tracker_torrent_repository::event::Event; + +use crate::torrent::repository::persisted::DatabasePersistentTorrentRepository; + +pub async fn handle_event( + event: Event, + db_torrent_repository: &Arc, + _now: DurationSinceUnixEpoch, +) { + match event { + // Torrent events + Event::TorrentAdded { info_hash, .. } => { + tracing::debug!(info_hash = ?info_hash, "Torrent added",); + } + Event::TorrentRemoved { info_hash } => { + tracing::debug!(info_hash = ?info_hash, "Torrent removed",); + } + + // Peer events + Event::PeerAdded { info_hash, peer } => { + tracing::debug!(info_hash = ?info_hash, peer = ?peer, "Peer added", ); + } + Event::PeerRemoved { info_hash, peer } => { + tracing::debug!(info_hash = ?info_hash, peer = ?peer, "Peer removed", ); + } + Event::PeerUpdated { + info_hash, + old_peer, + new_peer, + } => { + tracing::debug!(info_hash = ?info_hash, old_peer = ?old_peer, new_peer = ?new_peer, "Peer updated"); + } + Event::PeerDownloadCompleted { info_hash, peer } => { + tracing::debug!(info_hash = ?info_hash, peer = ?peer, "Peer download completed", ); + + match db_torrent_repository.increase_number_of_downloads(&info_hash) { + Ok(()) => { + tracing::debug!(info_hash = ?info_hash, "Number of downloads increased"); + } + Err(err) => { + tracing::error!(info_hash = ?info_hash, error = ?err, "Failed to increase number of downloads"); + } + } + } + } +} diff --git a/packages/tracker-core/src/statistics/event/listener.rs b/packages/tracker-core/src/statistics/event/listener.rs new file mode 100644 index 000000000..e04675092 --- /dev/null +++ b/packages/tracker-core/src/statistics/event/listener.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use tokio::task::JoinHandle; +use torrust_tracker_clock::clock::Time; +use torrust_tracker_events::receiver::RecvError; +use torrust_tracker_torrent_repository::event::receiver::Receiver; + +use super::handler::handle_event; +use crate::torrent::repository::persisted::DatabasePersistentTorrentRepository; +use crate::{CurrentClock, TRACKER_CORE_LOG_TARGET}; + +#[must_use] +pub fn run_event_listener( + receiver: Receiver, + db_torrent_repository: &Arc, +) -> JoinHandle<()> { + let db_torrent_repository: Arc = db_torrent_repository.clone(); + + tracing::info!(target: TRACKER_CORE_LOG_TARGET, "Starting torrent repository event listener"); + + tokio::spawn(async move { + dispatch_events(receiver, db_torrent_repository).await; + + tracing::info!(target: TRACKER_CORE_LOG_TARGET, "Torrent repository listener finished"); + }) +} + +async fn dispatch_events(mut receiver: Receiver, db_torrent_repository: Arc) { + let shutdown_signal = tokio::signal::ctrl_c(); + + tokio::pin!(shutdown_signal); + + loop { + tokio::select! { + biased; + + _ = &mut shutdown_signal => { + tracing::info!(target: TRACKER_CORE_LOG_TARGET, "Received Ctrl+C, shutting down torrent repository event listener"); + break; + } + + result = receiver.recv() => { + match result { + Ok(event) => handle_event(event, &db_torrent_repository, CurrentClock::now()).await, + Err(e) => { + match e { + RecvError::Closed => { + tracing::info!(target: TRACKER_CORE_LOG_TARGET, "Torrent repository event receiver closed"); + break; + } + RecvError::Lagged(n) => { + tracing::warn!(target: TRACKER_CORE_LOG_TARGET, "Torrent repository event receiver lagged by {} events", n); + } + } + } + } + } + } + } +} diff --git a/packages/tracker-core/src/statistics/event/mod.rs b/packages/tracker-core/src/statistics/event/mod.rs new file mode 100644 index 000000000..dae683398 --- /dev/null +++ b/packages/tracker-core/src/statistics/event/mod.rs @@ -0,0 +1,2 @@ +pub mod handler; +pub mod listener; diff --git a/packages/tracker-core/src/statistics/mod.rs b/packages/tracker-core/src/statistics/mod.rs new file mode 100644 index 000000000..53f112654 --- /dev/null +++ b/packages/tracker-core/src/statistics/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/packages/tracker-core/src/torrent/manager.rs b/packages/tracker-core/src/torrent/manager.rs index bf73f7e8b..d9997c4ad 100644 --- a/packages/tracker-core/src/torrent/manager.rs +++ b/packages/tracker-core/src/torrent/manager.rs @@ -74,6 +74,8 @@ impl TorrentsManager { pub fn load_torrents_from_database(&self) -> Result<(), databases::error::Error> { let persistent_torrents = self.db_torrent_repository.load_all()?; + println!("Loaded {} persistent torrents from the database", persistent_torrents.len()); + self.in_memory_torrent_repository.import_persistent(&persistent_torrents); Ok(()) @@ -237,9 +239,9 @@ mod tests { // Add a peer to the torrent let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let _number_of_downloads_increased = services + services .in_memory_torrent_repository - .upsert_peer(&infohash, &peer, None) + .handle_announcement(&infohash, &peer, None) .await; // Simulate the time has passed 1 second more than the max peer timeout. @@ -257,7 +259,7 @@ mod tests { // Add a peer to the torrent let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(infohash, &peer, None).await; + in_memory_torrent_repository.handle_announcement(infohash, &peer, None).await; // Remove the peer. The torrent is now peerless. in_memory_torrent_repository diff --git a/packages/tracker-core/src/torrent/repository/in_memory.rs b/packages/tracker-core/src/torrent/repository/in_memory.rs index 311480306..5c8a335b6 100644 --- a/packages/tracker-core/src/torrent/repository/in_memory.rs +++ b/packages/tracker-core/src/torrent/repository/in_memory.rs @@ -48,36 +48,16 @@ impl InMemoryTorrentRepository { /// # Panics /// /// This function panics if the underling swarms return an error. - #[must_use] - pub async fn upsert_peer( + pub async fn handle_announcement( &self, info_hash: &InfoHash, peer: &peer::Peer, opt_persistent_torrent: Option, - ) -> bool { + ) { self.swarms .handle_announcement(info_hash, peer, opt_persistent_torrent) .await - .expect("Failed to upsert the peer in swarms") - } - - /// Removes a torrent entry from the repository. - /// - /// This method is only available in tests. It removes the torrent entry - /// associated with the given info hash and returns the removed entry if it - /// existed. - /// - /// # Arguments - /// - /// * `key` - The info hash of the torrent to remove. - /// - /// # Returns - /// - /// An `Option` containing the removed torrent entry if it existed. - #[cfg(test)] - #[must_use] - pub(crate) async fn remove(&self, key: &InfoHash) -> Option { - self.swarms.remove(key).await + .expect("Failed to upsert the peer in swarms"); } /// Removes inactive peers from all torrent entries. diff --git a/packages/tracker-core/src/torrent/services.rs b/packages/tracker-core/src/torrent/services.rs index 97694a80f..2ae51fc78 100644 --- a/packages/tracker-core/src/torrent/services.rs +++ b/packages/tracker-core/src/torrent/services.rs @@ -251,8 +251,8 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); // DevSkim: ignore DS173237 let info_hash = InfoHash::from_str(&hash).unwrap(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash, &sample_peer(), None) .await; let torrent_info = get_torrent_info(&in_memory_torrent_repository, &info_hash).await.unwrap(); @@ -297,8 +297,8 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); // DevSkim: ignore DS173237 let info_hash = InfoHash::from_str(&hash).unwrap(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash, &sample_peer(), None) .await; let torrents = get_torrents_page(&in_memory_torrent_repository, Some(&Pagination::default())).await; @@ -324,11 +324,11 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); // DevSkim: ignore DS173237 let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash1, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash1, &sample_peer(), None) .await; - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash2, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash2, &sample_peer(), None) .await; let offset = 0; @@ -349,11 +349,11 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); // DevSkim: ignore DS173237 let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash1, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash1, &sample_peer(), None) .await; - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash2, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash2, &sample_peer(), None) .await; let offset = 1; @@ -379,14 +379,14 @@ mod tests { let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); // DevSkim: ignore DS173237 let info_hash1 = InfoHash::from_str(&hash1).unwrap(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash1, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash1, &sample_peer(), None) .await; let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); // DevSkim: ignore DS173237 let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash2, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash2, &sample_peer(), None) .await; let torrents = get_torrents_page(&in_memory_torrent_repository, Some(&Pagination::default())).await; @@ -435,8 +435,8 @@ mod tests { let info_hash = sample_info_hash(); - let _ = in_memory_torrent_repository - .upsert_peer(&info_hash, &sample_peer(), None) + in_memory_torrent_repository + .handle_announcement(&info_hash, &sample_peer(), None) .await; let torrent_info = get_torrents(&in_memory_torrent_repository, &[info_hash]).await; diff --git a/packages/tracker-core/tests/common/fixtures.rs b/packages/tracker-core/tests/common/fixtures.rs new file mode 100644 index 000000000..ea9c93a65 --- /dev/null +++ b/packages/tracker-core/tests/common/fixtures.rs @@ -0,0 +1,52 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::str::FromStr; + +use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId}; +use bittorrent_primitives::info_hash::InfoHash; +use torrust_tracker_configuration::Core; +use torrust_tracker_primitives::peer::Peer; +use torrust_tracker_primitives::DurationSinceUnixEpoch; +use torrust_tracker_test_helpers::configuration::ephemeral_sqlite_database; + +/// # Panics +/// +/// Will panic if the temporary file path is not a valid UTF-8 string. +#[must_use] +pub fn ephemeral_configuration() -> Core { + let mut config = Core::default(); + + let temp_file = ephemeral_sqlite_database(); + temp_file.to_str().unwrap().clone_into(&mut config.database.path); + + config +} + +/// # Panics +/// +/// Will panic if the string representation of the info hash is not a valid infohash. +#[must_use] +pub fn sample_info_hash() -> InfoHash { + "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0" // DevSkim: ignore DS173237 + .parse::() + .expect("String should be a valid info hash") +} + +/// Sample peer whose state is not relevant for the tests. +#[must_use] +pub fn sample_peer() -> Peer { + Peer { + peer_id: PeerId(*b"-qB00000000000000000"), + peer_addr: SocketAddr::new(remote_client_ip(), 8080), + updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0), + uploaded: NumberOfBytes::new(0), + downloaded: NumberOfBytes::new(0), + left: NumberOfBytes::new(0), // No bytes left to download + event: AnnounceEvent::Completed, + } +} + +// The client peer IP. +#[must_use] +pub fn remote_client_ip() -> IpAddr { + IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap()) +} diff --git a/packages/tracker-core/tests/common/mod.rs b/packages/tracker-core/tests/common/mod.rs new file mode 100644 index 000000000..414e9d7b5 --- /dev/null +++ b/packages/tracker-core/tests/common/mod.rs @@ -0,0 +1,2 @@ +pub mod fixtures; +pub mod test_env; diff --git a/packages/tracker-core/tests/common/test_env.rs b/packages/tracker-core/tests/common/test_env.rs new file mode 100644 index 000000000..d4462e3f6 --- /dev/null +++ b/packages/tracker-core/tests/common/test_env.rs @@ -0,0 +1,137 @@ +use std::net::IpAddr; +use std::sync::Arc; + +use aquatic_udp_protocol::AnnounceEvent; +use bittorrent_primitives::info_hash::InfoHash; +use bittorrent_tracker_core::announce_handler::PeersWanted; +use bittorrent_tracker_core::container::TrackerCoreContainer; +use tokio::task::yield_now; +use torrust_tracker_configuration::Core; +use torrust_tracker_primitives::core::{AnnounceData, ScrapeData}; +use torrust_tracker_primitives::peer::Peer; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_torrent_repository::container::TorrentRepositoryContainer; + +pub struct TestEnv { + pub torrent_repository_container: Arc, + pub tracker_core_container: Arc, +} + +impl TestEnv { + #[must_use] + pub async fn started(core_config: Core) -> Self { + let test_env = TestEnv::new(core_config); + test_env.start().await; + test_env + } + + #[must_use] + pub fn new(core_config: Core) -> Self { + let core_config = Arc::new(core_config); + + let torrent_repository_container = Arc::new(TorrentRepositoryContainer::initialize( + core_config.tracker_usage_statistics.into(), + )); + + let tracker_core_container = Arc::new(TrackerCoreContainer::initialize_from( + &core_config, + &torrent_repository_container, + )); + + Self { + torrent_repository_container, + tracker_core_container, + } + } + + pub async fn start(&self) { + let mut jobs = vec![]; + + let job = torrust_tracker_torrent_repository::statistics::event::listener::run_event_listener( + self.torrent_repository_container.event_bus.receiver(), + &self.torrent_repository_container.stats_repository, + ); + + jobs.push(job); + + let job = bittorrent_tracker_core::statistics::event::listener::run_event_listener( + self.torrent_repository_container.event_bus.receiver(), + &self.tracker_core_container.db_torrent_repository, + ); + + jobs.push(job); + + // Give the event listeners some time to start + // todo: they should notify when they are ready + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + pub async fn announce_peer_started( + &mut self, + mut peer: Peer, + remote_client_ip: &IpAddr, + info_hash: &InfoHash, + ) -> AnnounceData { + peer.event = AnnounceEvent::Started; + + let announce_data = self + .tracker_core_container + .announce_handler + .handle_announcement(info_hash, &mut peer, remote_client_ip, &PeersWanted::AsManyAsPossible) + .await + .unwrap(); + + // Give time to the event listeners to process the event + yield_now().await; + + announce_data + } + + pub async fn announce_peer_completed( + &mut self, + mut peer: Peer, + remote_client_ip: &IpAddr, + info_hash: &InfoHash, + ) -> AnnounceData { + peer.event = AnnounceEvent::Completed; + + let announce_data = self + .tracker_core_container + .announce_handler + .handle_announcement(info_hash, &mut peer, remote_client_ip, &PeersWanted::AsManyAsPossible) + .await + .unwrap(); + + // Give time to the event listeners to process the event + yield_now().await; + + announce_data + } + + pub async fn scrape(&self, info_hash: &InfoHash) -> ScrapeData { + self.tracker_core_container + .scrape_handler + .handle_scrape(&vec![*info_hash]) + .await + .unwrap() + } + + pub async fn increase_number_of_downloads(&mut self, peer: Peer, remote_client_ip: &IpAddr, info_hash: &InfoHash) { + let _announce_data = self.announce_peer_started(peer, remote_client_ip, info_hash).await; + let announce_data = self.announce_peer_completed(peer, remote_client_ip, info_hash).await; + + assert_eq!(announce_data.stats.downloads(), 1); + } + + pub async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrent_repository_container + .swarms + .get_swarm_metadata(info_hash) + .await + .unwrap() + } + + pub async fn remove_swarm(&self, info_hash: &InfoHash) { + self.torrent_repository_container.swarms.remove(info_hash).await.unwrap(); + } +} diff --git a/packages/tracker-core/tests/integration.rs b/packages/tracker-core/tests/integration.rs index 5aaded10a..d24acf67b 100644 --- a/packages/tracker-core/tests/integration.rs +++ b/packages/tracker-core/tests/integration.rs @@ -1,135 +1,88 @@ -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::str::FromStr; -use std::sync::Arc; - -use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId}; -use bittorrent_primitives::info_hash::InfoHash; -use bittorrent_tracker_core::announce_handler::{AnnounceHandler, PeersWanted}; -use bittorrent_tracker_core::databases::setup::initialize_database; -use bittorrent_tracker_core::scrape_handler::ScrapeHandler; -use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository; -use bittorrent_tracker_core::torrent::repository::persisted::DatabasePersistentTorrentRepository; -use bittorrent_tracker_core::whitelist; -use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist; -use torrust_tracker_configuration::Core; -use torrust_tracker_primitives::peer::Peer; -use torrust_tracker_primitives::DurationSinceUnixEpoch; -use torrust_tracker_test_helpers::configuration::ephemeral_sqlite_database; - -/// # Panics -/// -/// Will panic if the temporary file path is not a valid UTF-8 string. -#[must_use] -pub fn ephemeral_configuration() -> Core { - let mut config = Core::default(); - - let temp_file = ephemeral_sqlite_database(); - temp_file.to_str().unwrap().clone_into(&mut config.database.path); - - config -} +mod common; -/// # Panics -/// -/// Will panic if the string representation of the info hash is not a valid infohash. -#[must_use] -pub fn sample_info_hash() -> InfoHash { - "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0" // DevSkim: ignore DS173237 - .parse::() - .expect("String should be a valid info hash") -} +use common::fixtures::{ephemeral_configuration, remote_client_ip, sample_info_hash, sample_peer}; +use common::test_env::TestEnv; +use torrust_tracker_configuration::AnnouncePolicy; +use torrust_tracker_primitives::core::AnnounceData; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; -/// Sample peer whose state is not relevant for the tests. -#[must_use] -pub fn sample_peer() -> Peer { - Peer { - peer_id: PeerId(*b"-qB00000000000000000"), - peer_addr: SocketAddr::new(remote_client_ip(), 8080), - updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0), - uploaded: NumberOfBytes::new(0), - downloaded: NumberOfBytes::new(0), - left: NumberOfBytes::new(0), // No bytes left to download - event: AnnounceEvent::Completed, - } +#[tokio::test] +async fn it_should_handle_the_announce_request() { + let mut test_env = TestEnv::started(ephemeral_configuration()).await; + + let announce_data = test_env + .announce_peer_started(sample_peer(), &remote_client_ip(), &sample_info_hash()) + .await; + + assert_eq!( + announce_data, + AnnounceData { + peers: vec![], + stats: SwarmMetadata { + downloaded: 0, + complete: 1, + incomplete: 0 + }, + policy: AnnouncePolicy { + interval: 120, + interval_min: 120 + } + } + ); } -// The client peer IP. -#[must_use] -fn remote_client_ip() -> IpAddr { - IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap()) -} +#[tokio::test] +async fn it_should_not_return_the_peer_making_the_announce_request() { + let mut test_env = TestEnv::started(ephemeral_configuration()).await; -struct Container { - pub announce_handler: Arc, - pub scrape_handler: Arc, -} + let announce_data = test_env + .announce_peer_started(sample_peer(), &remote_client_ip(), &sample_info_hash()) + .await; -impl Container { - pub fn initialize(config: &Core) -> Self { - let database = initialize_database(config); - let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); - let db_torrent_repository = Arc::new(DatabasePersistentTorrentRepository::new(&database)); - let in_memory_whitelist = Arc::new(InMemoryWhitelist::default()); - let whitelist_authorization = Arc::new(whitelist::authorization::WhitelistAuthorization::new( - config, - &in_memory_whitelist.clone(), - )); - let announce_handler = Arc::new(AnnounceHandler::new( - config, - &whitelist_authorization, - &in_memory_torrent_repository, - &db_torrent_repository, - )); - let scrape_handler = Arc::new(ScrapeHandler::new(&whitelist_authorization, &in_memory_torrent_repository)); - - Self { - announce_handler, - scrape_handler, - } - } + assert_eq!(announce_data.peers.len(), 0); } #[tokio::test] -async fn test_announce_and_scrape_requests() { - let config = ephemeral_configuration(); - - let container = Container::initialize(&config); +async fn it_should_handle_the_scrape_request() { + let mut test_env = TestEnv::started(ephemeral_configuration()).await; let info_hash = sample_info_hash(); - let mut peer = sample_peer(); + let _announce_data = test_env + .announce_peer_started(sample_peer(), &remote_client_ip(), &info_hash) + .await; - // Announce + let scrape_data = test_env.scrape(&info_hash).await; - // First announce: download started - peer.event = AnnounceEvent::Started; - let announce_data = container - .announce_handler - .announce(&info_hash, &mut peer, &remote_client_ip(), &PeersWanted::AsManyAsPossible) - .await - .unwrap(); + assert!(scrape_data.files.contains_key(&info_hash)); +} - // NOTICE: you don't get back the peer making the request. - assert_eq!(announce_data.peers.len(), 0); - assert_eq!(announce_data.stats.downloaded, 0); - - // Second announce: download completed - peer.event = AnnounceEvent::Completed; - let announce_data = container - .announce_handler - .announce(&info_hash, &mut peer, &remote_client_ip(), &PeersWanted::AsManyAsPossible) - .await - .unwrap(); +#[tokio::test] +async fn it_should_persist_the_number_of_completed_peers_for_all_torrents_into_the_database() { + let mut core_config = ephemeral_configuration(); + core_config.tracker_policy.persistent_torrent_completed_stat = true; - assert_eq!(announce_data.peers.len(), 0); - assert_eq!(announce_data.stats.downloaded, 1); + let mut test_env = TestEnv::started(core_config).await; - // Scrape + let info_hash = sample_info_hash(); - let scrape_data = container.scrape_handler.scrape(&vec![info_hash]).await.unwrap(); + test_env + .increase_number_of_downloads(sample_peer(), &remote_client_ip(), &info_hash) + .await; - assert!(scrape_data.files.contains_key(&info_hash)); -} + assert!(test_env.get_swarm_metadata(&info_hash).await.unwrap().downloads() == 1); + + test_env.remove_swarm(&info_hash).await; + + // Ensure the swarm metadata is removed + assert!(test_env.get_swarm_metadata(&info_hash).await.is_none()); -#[test] -fn test_scrape_request() {} + // Load torrents from the database to ensure the completed stats are persisted + test_env + .tracker_core_container + .torrents_manager + .load_torrents_from_database() + .unwrap(); + + assert!(test_env.get_swarm_metadata(&info_hash).await.unwrap().downloads() == 1); +} diff --git a/packages/udp-tracker-core/src/services/announce.rs b/packages/udp-tracker-core/src/services/announce.rs index 6ea237d84..a69e91d8a 100644 --- a/packages/udp-tracker-core/src/services/announce.rs +++ b/packages/udp-tracker-core/src/services/announce.rs @@ -78,7 +78,7 @@ impl AnnounceService { let announce_data = self .announce_handler - .announce(&info_hash, &mut peer, &remote_client_ip, &peers_wanted) + .handle_announcement(&info_hash, &mut peer, &remote_client_ip, &peers_wanted) .await?; self.send_event(info_hash, peer, client_socket_addr, server_service_binding) diff --git a/packages/udp-tracker-core/src/services/scrape.rs b/packages/udp-tracker-core/src/services/scrape.rs index b42004f63..8551351fb 100644 --- a/packages/udp-tracker-core/src/services/scrape.rs +++ b/packages/udp-tracker-core/src/services/scrape.rs @@ -56,7 +56,7 @@ impl ScrapeService { let scrape_data = self .scrape_handler - .scrape(&Self::convert_from_aquatic(&request.info_hashes)) + .handle_scrape(&Self::convert_from_aquatic(&request.info_hashes)) .await?; self.send_event(client_socket_addr, server_service_binding).await; diff --git a/packages/udp-tracker-server/src/environment.rs b/packages/udp-tracker-server/src/environment.rs index f92d5dd29..94a166e4e 100644 --- a/packages/udp-tracker-server/src/environment.rs +++ b/packages/udp-tracker-server/src/environment.rs @@ -35,11 +35,10 @@ where /// Add a torrent to the tracker #[allow(dead_code)] pub async fn add_torrent(&self, info_hash: &InfoHash, peer: &peer::Peer) { - let _number_of_downloads_increased = self - .container + self.container .tracker_core_container .in_memory_torrent_repository - .upsert_peer(info_hash, peer, None) + .handle_announcement(info_hash, peer, None) .await; } } diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index 567f43740..e2ca6821e 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -369,8 +369,8 @@ mod tests { .with_peer_address(SocketAddr::new(IpAddr::V6(client_ip_v6), client_port)) .into(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash.0.into(), &peer_using_ipv6, None) + in_memory_torrent_repository + .handle_announcement(&info_hash.0.into(), &peer_using_ipv6, None) .await; } @@ -713,8 +713,8 @@ mod tests { .with_peer_address(SocketAddr::new(IpAddr::V4(client_ip_v4), client_port)) .into(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash.0.into(), &peer_using_ipv4, None) + in_memory_torrent_repository + .handle_announcement(&info_hash.0.into(), &peer_using_ipv4, None) .await; } diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index a9462e0f9..8bac05c1e 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -165,8 +165,8 @@ mod tests { .with_bytes_left_to_download(0) .into(); - let _number_of_downloads_increased = in_memory_torrent_repository - .upsert_peer(&info_hash.0.into(), &peer, None) + in_memory_torrent_repository + .handle_announcement(&info_hash.0.into(), &peer, None) .await; } diff --git a/src/app.rs b/src/app.rs index 5180e4583..5037ad761 100644 --- a/src/app.rs +++ b/src/app.rs @@ -75,6 +75,7 @@ async fn start_jobs(config: &Configuration, app_container: &Arc) - let mut job_manager = JobManager::new(); start_torrent_repository_event_listener(config, app_container, &mut job_manager); + start_tracker_core_event_listener(config, app_container, &mut job_manager); start_http_core_event_listener(config, app_container, &mut job_manager); start_udp_core_event_listener(config, app_container, &mut job_manager); start_udp_server_event_listener(config, app_container, &mut job_manager); @@ -138,35 +139,38 @@ fn start_torrent_repository_event_listener( app_container: &Arc, job_manager: &mut JobManager, ) { - let opt_handle = jobs::torrent_repository::start_event_listener(config, app_container); + job_manager.push_opt( + "torrent_repository_event_listener", + jobs::torrent_repository::start_event_listener(config, app_container), + ); +} - if let Some(handle) = opt_handle { - job_manager.push("torrent_repository_event_listener", handle); - } +fn start_tracker_core_event_listener(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { + job_manager.push_opt( + "tracker_core_event_listener", + jobs::tracker_core::start_event_listener(config, app_container), + ); } fn start_http_core_event_listener(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { - let opt_handle = jobs::http_tracker_core::start_event_listener(config, app_container); - - if let Some(handle) = opt_handle { - job_manager.push("http_core_event_listener", handle); - } + job_manager.push_opt( + "http_core_event_listener", + jobs::http_tracker_core::start_event_listener(config, app_container), + ); } fn start_udp_core_event_listener(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { - let opt_handle = jobs::udp_tracker_core::start_event_listener(config, app_container); - - if let Some(handle) = opt_handle { - job_manager.push("udp_core_event_listener", handle); - } + job_manager.push_opt( + "udp_core_event_listener", + jobs::udp_tracker_core::start_event_listener(config, app_container), + ); } fn start_udp_server_event_listener(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { - let opt_handle = jobs::udp_tracker_server::start_event_listener(config, app_container); - - if let Some(handle) = opt_handle { - job_manager.push("udp_server_event_listener", handle); - } + job_manager.push_opt( + "udp_server_event_listener", + jobs::udp_tracker_server::start_event_listener(config, app_container), + ); } async fn start_the_udp_instances(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { diff --git a/src/bootstrap/jobs/manager.rs b/src/bootstrap/jobs/manager.rs index 5beab3224..53733844b 100644 --- a/src/bootstrap/jobs/manager.rs +++ b/src/bootstrap/jobs/manager.rs @@ -36,6 +36,12 @@ impl JobManager { self.jobs.push(Job::new(name, handle)); } + pub fn push_opt>(&mut self, name: N, handle: Option>) { + if let Some(handle) = handle { + self.push(name, handle); + } + } + /// Waits sequentially for all jobs to complete, with a graceful timeout per /// job. pub async fn wait_for_all(mut self, grace_period: Duration) { diff --git a/src/bootstrap/jobs/mod.rs b/src/bootstrap/jobs/mod.rs index c8d7a8598..0e9c912af 100644 --- a/src/bootstrap/jobs/mod.rs +++ b/src/bootstrap/jobs/mod.rs @@ -14,6 +14,7 @@ pub mod manager; pub mod torrent_cleanup; pub mod torrent_repository; pub mod tracker_apis; +pub mod tracker_core; pub mod udp_tracker; pub mod udp_tracker_core; pub mod udp_tracker_server; diff --git a/src/bootstrap/jobs/torrent_repository.rs b/src/bootstrap/jobs/torrent_repository.rs index 2125de554..ea0d215ee 100644 --- a/src/bootstrap/jobs/torrent_repository.rs +++ b/src/bootstrap/jobs/torrent_repository.rs @@ -14,7 +14,7 @@ pub fn start_event_listener(config: &Configuration, app_container: &Arc) -> Option> { + // todo: enable this when labeled metrics are implemented. + //if config.core.tracker_usage_statistics || config.core.tracker_policy.persistent_torrent_completed_stat { + if config.core.tracker_policy.persistent_torrent_completed_stat { + let job = bittorrent_tracker_core::statistics::event::listener::run_event_listener( + app_container.torrent_repository_container.event_bus.receiver(), + &app_container.tracker_core_container.db_torrent_repository, + ); + + Some(job) + } else { + tracing::info!("Tracker core event listener job is disabled."); + None + } +}