 use std::collections::{BTreeMap, HashSet};
 use std::hash::{DefaultHasher, Hash, Hasher};
+use std::sync::{Arc, Mutex};
 
 use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes};
 use bittorrent_primitives::info_hash::InfoHash;
@@ -9,14 +10,13 @@ use torrust_tracker_primitives::pagination::Pagination;
 use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
 use torrust_tracker_primitives::PersistentTorrents;
 use torrust_tracker_torrent_repository::entry::torrent::TrackedTorrent;
-use torrust_tracker_torrent_repository::TorrentRepository;
+use torrust_tracker_torrent_repository::{LockTrackedTorrent, TorrentRepository};
 
-use crate::common::repo::Repo;
 use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer};
 
 #[fixture]
-fn skip_list_mutex_std() -> Repo {
-    Repo::SkipMapMutexStd(TorrentRepository::default())
+fn skip_list_mutex_std() -> TorrentRepository {
+    TorrentRepository::default()
 }
 
 type Entries = Vec<(InfoHash, TrackedTorrent)>;
@@ -148,9 +148,10 @@ fn persistent_three() -> PersistentTorrents {
     t.iter().copied().collect()
 }
 
-fn make(repo: &Repo, entries: &Entries) {
+fn make(repo: &TorrentRepository, entries: &Entries) {
     for (info_hash, entry) in entries {
-        repo.insert(info_hash, entry.clone());
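+        // Wrap each fixture entry in Arc<Mutex<_>> and insert it directly into the repository's torrents map.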
+        let new = Arc::new(Mutex::new(entry.clone()));
+        repo.torrents.insert(*info_hash, new);
     }
 }
 
@@ -199,13 +200,16 @@ fn policy_remove_persist() -> TrackerPolicy {
 #[case::out_of_order(many_out_of_order())]
 #[case::in_order(many_hashed_in_order())]
 #[tokio::test]
-async fn it_should_get_a_torrent_entry(#[values(skip_list_mutex_std())] repo: Repo, #[case] entries: Entries) {
+async fn it_should_get_a_torrent_entry(#[values(skip_list_mutex_std())] repo: TorrentRepository, #[case] entries: Entries) {
     make(&repo, &entries);
 
     if let Some((info_hash, torrent)) = entries.first() {
-        assert_eq!(repo.get(info_hash), Some(torrent.clone()));
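+        // Entries are stored behind a mutex, so lock the tracked torrent before comparing it.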
+        assert_eq!(
+            Some(repo.get(info_hash).unwrap().lock_or_panic().clone()),
+            Some(torrent.clone())
+        );
     } else {
-        assert_eq!(repo.get(&InfoHash::default()), None);
+        assert!(repo.get(&InfoHash::default()).is_none());
     }
 }
 
@@ -220,7 +224,7 @@ async fn it_should_get_a_torrent_entry(#[values(skip_list_mutex_std())] repo: Re
 #[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order(
-    #[values(skip_list_mutex_std())] repo: Repo,
+    #[values(skip_list_mutex_std())] repo: TorrentRepository,
     #[case] entries: Entries,
     many_out_of_order: Entries,
 ) {
@@ -253,7 +257,7 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order(
 #[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_get_paginated(
-    #[values(skip_list_mutex_std())] repo: Repo,
+    #[values(skip_list_mutex_std())] repo: TorrentRepository,
     #[case] entries: Entries,
     #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination,
 ) {
@@ -264,7 +268,15 @@ async fn it_should_get_paginated(
 
     match paginated {
         // it should return empty if limit is zero.
-        Pagination { limit: 0, .. } => assert_eq!(repo.get_paginated(Some(&paginated)), vec![]),
+        Pagination { limit: 0, .. } => {
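+            // get_paginated yields locked entries; lock and clone each one to compare plain TrackedTorrent values.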
+            let torrents: Vec<(InfoHash, TrackedTorrent)> = repo
+                .get_paginated(Some(&paginated))
+                .iter()
+                .map(|(i, lock_tracked_torrent)| (*i, lock_tracked_torrent.lock_or_panic().clone()))
+                .collect();
+
+            assert_eq!(torrents, vec![]);
+        }
 
         // it should return a single entry if the limit is one.
         Pagination { limit: 1, offset: 0 } => {
@@ -300,7 +312,7 @@ async fn it_should_get_paginated(
 #[case::out_of_order(many_out_of_order())]
 #[case::in_order(many_hashed_in_order())]
 #[tokio::test]
-async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Repo, #[case] entries: Entries) {
+async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: TorrentRepository, #[case] entries: Entries) {
     use torrust_tracker_primitives::swarm_metadata::AggregateSwarmMetadata;
 
     make(&repo, &entries);
@@ -316,7 +328,7 @@ async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Repo, #[ca
         metrics.total_downloaded += u64::from(stats.downloaded);
     }
 
-    assert_eq!(repo.get_metrics(), metrics);
+    assert_eq!(repo.get_aggregate_swarm_metadata(), metrics);
 }
 
 #[rstest]
@@ -330,18 +342,18 @@ async fn it_should_get_metrics(#[values(skip_list_mutex_std())] repo: Repo, #[ca
 #[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_import_persistent_torrents(
-    #[values(skip_list_mutex_std())] repo: Repo,
+    #[values(skip_list_mutex_std())] repo: TorrentRepository,
     #[case] entries: Entries,
     #[values(persistent_empty(), persistent_single(), persistent_three())] persistent_torrents: PersistentTorrents,
 ) {
     make(&repo, &entries);
 
-    let mut downloaded = repo.get_metrics().total_downloaded;
+    let mut downloaded = repo.get_aggregate_swarm_metadata().total_downloaded;
     persistent_torrents.iter().for_each(|(_, d)| downloaded += u64::from(*d));
 
     repo.import_persistent(&persistent_torrents);
 
-    assert_eq!(repo.get_metrics().total_downloaded, downloaded);
+    assert_eq!(repo.get_aggregate_swarm_metadata().total_downloaded, downloaded);
 
     for (entry, _) in persistent_torrents {
         assert!(repo.get(&entry).is_some());
@@ -358,18 +370,21 @@ async fn it_should_import_persistent_torrents(
 #[case::out_of_order(many_out_of_order())]
 #[case::in_order(many_hashed_in_order())]
 #[tokio::test]
-async fn it_should_remove_an_entry(#[values(skip_list_mutex_std())] repo: Repo, #[case] entries: Entries) {
+async fn it_should_remove_an_entry(#[values(skip_list_mutex_std())] repo: TorrentRepository, #[case] entries: Entries) {
     make(&repo, &entries);
 
     for (info_hash, torrent) in entries {
-        assert_eq!(repo.get(&info_hash), Some(torrent.clone()));
-        assert_eq!(repo.remove(&info_hash), Some(torrent));
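+        // Both the stored entry and the removed value are behind locks, so lock them before comparing.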
+        assert_eq!(
+            Some(repo.get(&info_hash).unwrap().lock_or_panic().clone()),
+            Some(torrent.clone())
+        );
+        assert_eq!(Some(repo.remove(&info_hash).unwrap().lock_or_panic().clone()), Some(torrent));
 
-        assert_eq!(repo.get(&info_hash), None);
-        assert_eq!(repo.remove(&info_hash), None);
+        assert!(repo.get(&info_hash).is_none());
+        assert!(repo.remove(&info_hash).is_none());
     }
 
-    assert_eq!(repo.get_metrics().total_torrents, 0);
+    assert_eq!(repo.get_aggregate_swarm_metadata().total_torrents, 0);
 }
 
 #[rstest]
@@ -382,7 +397,7 @@ async fn it_should_remove_an_entry(#[values(skip_list_mutex_std())] repo: Repo,
 #[case::out_of_order(many_out_of_order())]
 #[case::in_order(many_hashed_in_order())]
 #[tokio::test]
-async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: Repo, #[case] entries: Entries) {
+async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo: TorrentRepository, #[case] entries: Entries) {
     use std::ops::Sub as _;
     use std::time::Duration;
 
@@ -420,7 +435,7 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo:
     // and verify there is an extra torrent entry.
     {
         repo.upsert_peer(&info_hash, &peer, None);
-        assert_eq!(repo.get_metrics().total_torrents, entries.len() as u64 + 1);
+        assert_eq!(repo.get_aggregate_swarm_metadata().total_torrents, entries.len() as u64 + 1);
     }
 
     // Insert the infohash and peer into the repository
@@ -440,7 +455,8 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo:
 
     // Verify that this new peer was inserted into the repository.
     {
-        let entry = repo.get(&info_hash).expect("it_should_get_some");
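+        // Lock the tracked torrent to inspect its peer list.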
+        let lock_tracked_torrent = repo.get(&info_hash).expect("it_should_get_some");
+        let entry = lock_tracked_torrent.lock_or_panic();
         assert!(entry.get_peers(None).contains(&peer.into()));
     }
 
@@ -451,7 +467,8 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo:
 
     // Verify that the this peer was removed from the repository.
     {
-        let entry = repo.get(&info_hash).expect("it_should_get_some");
+        let lock_tracked_torrent = repo.get(&info_hash).expect("it_should_get_some");
+        let entry = lock_tracked_torrent.lock_or_panic();
         assert!(!entry.get_peers(None).contains(&peer.into()));
     }
 }
@@ -467,15 +484,19 @@ async fn it_should_remove_inactive_peers(#[values(skip_list_mutex_std())] repo:
 #[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_remove_peerless_torrents(
-    #[values(skip_list_mutex_std())] repo: Repo,
+    #[values(skip_list_mutex_std())] repo: TorrentRepository,
     #[case] entries: Entries,
     #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy,
 ) {
     make(&repo, &entries);
 
     repo.remove_peerless_torrents(&policy);
 
-    let torrents = repo.get_paginated(None);
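+    // Lock each remaining entry and clone the inner TrackedTorrent so the retaining policy can be checked.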
+    let torrents: Vec<(InfoHash, TrackedTorrent)> = repo
+        .get_paginated(None)
+        .iter()
+        .map(|(i, lock_tracked_torrent)| (*i, lock_tracked_torrent.lock_or_panic().clone()))
+        .collect();
 
     for (_, entry) in torrents {
         assert!(entry.meets_retaining_policy(&policy));