@@ -21,6 +21,7 @@ use crate::io::Pluralize;
2121use crate :: prelude:: * ;
2222use crate :: runtime:: makedirs_with_perms;
2323use crate :: storage:: fs:: OpenFsRepository ;
24+ use crate :: storage:: { TagNamespace , TagNamespaceBuf } ;
2425use crate :: { Digest , Error , Result , encoding, graph, storage, tracking} ;
2526
2627#[ cfg( test) ]
4546 attached : DashSet < encoding:: Digest > ,
4647 dry_run : bool ,
4748 must_be_older_than : DateTime < Utc > ,
49+ prune_all_tag_namespaces : bool ,
4850 prune_repeated_tags : Option < NonZero < u64 > > ,
4951 prune_params : PruneParameters ,
5052 remove_proxies_with_no_links : bool ,
@@ -68,6 +70,7 @@ impl<'repo> Cleaner<'repo, SilentCleanReporter> {
6870 attached : Default :: default ( ) ,
6971 dry_run : false ,
7072 must_be_older_than : Utc :: now ( ) ,
73+ prune_all_tag_namespaces : false ,
7174 prune_repeated_tags : None ,
7275 prune_params : Default :: default ( ) ,
7376 remove_proxies_with_no_links : true ,
8891 attached : self . attached ,
8992 dry_run : self . dry_run ,
9093 must_be_older_than : self . must_be_older_than ,
94+ prune_all_tag_namespaces : self . prune_all_tag_namespaces ,
9195 prune_repeated_tags : self . prune_repeated_tags ,
9296 prune_params : self . prune_params ,
9397 removal_concurrency : self . removal_concurrency ,
@@ -154,6 +158,13 @@ where
154158 self
155159 }
156160
161+ /// When walking tags, whether to prune tags in all tag namespaces or only
162+ /// the tag namespace configured on the repository.
163+ pub fn with_prune_all_tag_namespaces ( mut self , prune_all_tag_namespaces : bool ) -> Self {
164+ self . prune_all_tag_namespaces = prune_all_tag_namespaces;
165+ self
166+ }
167+
157168 /// When walking the history of a tag, delete older entries
158169 /// that have the same target as a more recent one.
159170 pub fn with_prune_repeated_tags ( mut self , prune_repeated_tags : Option < NonZero < u64 > > ) -> Self {
@@ -318,9 +329,24 @@ where
318329 /// partially complete depending on the nature of the errors.
319330 pub async fn prune_all_tags_and_clean ( & self ) -> Result < CleanResult > {
320331 let mut result = CleanResult :: default ( ) ;
321- let mut stream = self . repo . iter_tag_streams ( ) . boxed ( ) ;
332+ let tag_namespace_to_prune = self . repo . get_tag_namespace ( ) ;
333+ let namespaces = self . repo . ls_tag_namespaces ( ) ;
334+ let mut stream = namespaces
335+ . map ( |ns| {
336+ ns. map ( |ns| {
337+ self . repo
338+ . iter_tag_streams_in_namespace ( Some ( & ns) )
339+ . map ( move |r| r. map ( |( spec, stream) | ( Some ( ns. clone ( ) ) , spec, stream) ) )
340+ } )
341+ } )
342+ . try_flatten ( )
343+ . chain (
344+ self . repo
345+ . iter_tag_streams_in_namespace ( None )
346+ . map ( |r| r. map ( |( spec, stream) | ( None , spec, stream) ) ) ,
347+ ) ;
322348 let mut futures = futures:: stream:: FuturesUnordered :: new ( ) ;
323- while let Some ( ( tag_spec, _stream) ) = stream. try_next ( ) . await ? {
349+ while let Some ( ( tag_namespace , tag_spec, _stream) ) = stream. try_next ( ) . await ? {
324350 if futures. len ( ) > self . tag_stream_concurrency {
325351 // if we've reached the limit, let the fastest half finish
326352 // before adding additional futures. This is a crude way to
@@ -331,7 +357,11 @@ where
331357 futures. try_next ( ) . await ?;
332358 }
333359 }
334- futures. push ( self . prune_tag_stream_and_walk ( tag_spec) ) ;
360+ futures. push ( self . prune_tag_stream_and_walk (
361+ tag_namespace_to_prune. as_ref ( ) ,
362+ tag_namespace,
363+ tag_spec,
364+ ) ) ;
335365 }
336366 drop ( stream) ;
337367 while let Some ( r) = futures. try_next ( ) . await ? {
@@ -360,8 +390,18 @@ where
360390 Ok ( result)
361391 }
362392
363- async fn prune_tag_stream_and_walk ( & self , tag_spec : tracking:: TagSpec ) -> Result < CleanResult > {
364- let ( mut result, to_keep) = self . prune_tag_stream ( tag_spec) . await ?;
393+ async fn prune_tag_stream_and_walk < T > (
394+ & self ,
395+ tag_namespace_to_prune : Option < T > ,
396+ tag_namespace_to_visit : Option < TagNamespaceBuf > ,
397+ tag_spec : tracking:: TagSpec ,
398+ ) -> Result < CleanResult >
399+ where
400+ T : AsRef < TagNamespace > ,
401+ {
402+ let ( mut result, to_keep) = self
403+ . prune_tag_stream ( tag_namespace_to_prune, tag_namespace_to_visit, tag_spec)
404+ . await ?;
365405 result += self . walk_attached_objects ( & to_keep) . await ?;
366406
367407 Ok ( result)
@@ -385,13 +425,32 @@ where
385425 /// Visit the tag and its history, pruning as configured and
386426 /// returning a cleaning (pruning) result and list of tags that
387427 /// were kept.
388- pub async fn prune_tag_stream (
428+ pub async fn prune_tag_stream < T > (
389429 & self ,
430+ tag_namespace_to_prune : Option < T > ,
431+ tag_namespace_to_visit : Option < TagNamespaceBuf > ,
390432 tag_spec : tracking:: TagSpec ,
391- ) -> Result < ( CleanResult , Vec < encoding:: Digest > ) > {
433+ ) -> Result < ( CleanResult , Vec < encoding:: Digest > ) >
434+ where
435+ T : AsRef < TagNamespace > ,
436+ {
437+ // The Cleaner needs to visit all namespaces to learn what objects are
438+ // still alive, but if the repo passed to the Cleaner has a namespace
439+ // set on it then one would expect that only tags in that namespace are
440+ // pruned.
441+ let should_prune_this_namespace = self . prune_all_tag_namespaces
442+ || match (
443+ tag_namespace_to_prune. as_ref ( ) ,
444+ tag_namespace_to_visit. as_deref ( ) ,
445+ ) {
446+ ( Some ( prune) , Some ( visit) ) if prune. as_ref ( ) == visit => true ,
447+ ( None , None ) => true ,
448+ _ => false ,
449+ } ;
450+
392451 let history = self
393452 . repo
394- . read_tag ( & tag_spec)
453+ . read_tag_in_namespace ( tag_namespace_to_visit . as_deref ( ) , & tag_spec)
395454 . await ?
396455 . try_collect :: < Vec < _ > > ( )
397456 . await ?;
@@ -403,7 +462,7 @@ where
403462 self . reporter . visit_tag ( & tag) ;
404463 let count = if let Some ( seen_count) = seen_targets. get ( & tag. target ) {
405464 if let Some ( keep_number) = self . prune_repeated_tags {
406- if * seen_count >= keep_number. get ( ) {
465+ if should_prune_this_namespace && * seen_count >= keep_number. get ( ) {
407466 to_prune. push ( tag) ;
408467 continue ;
409468 }
@@ -415,7 +474,7 @@ where
415474
416475 seen_targets. insert ( tag. target , count) ;
417476
418- if self . prune_params . should_prune ( & spec, & tag) {
477+ if should_prune_this_namespace && self . prune_params . should_prune ( & spec, & tag) {
419478 to_prune. push ( tag) ;
420479 } else {
421480 to_keep. push ( tag. target ) ;
@@ -429,12 +488,18 @@ where
429488
430489 for tag in to_prune. iter ( ) {
431490 if !self . dry_run {
432- self . repo . remove_tag ( tag) . await ?;
491+ self . repo
492+ . remove_tag_in_namespace ( tag_namespace_to_visit. as_deref ( ) , tag)
493+ . await ?;
433494 }
434495 self . reporter . tag_removed ( tag) ;
435496 }
436497
437- result. pruned_tags . insert ( tag_spec, to_prune) ;
498+ result
499+ . pruned_tags
500+ . entry ( tag_namespace_to_visit)
501+ . or_default ( )
502+ . insert ( tag_spec, to_prune) ;
438503
439504 Ok ( ( result, to_keep) )
440505 }
@@ -779,7 +844,8 @@ pub struct CleanResult {
779844 /// The number of tags visited when walking the database
780845 pub visited_tags : u64 ,
781846 /// The tags pruned from the database
782- pub pruned_tags : HashMap < tracking:: TagSpec , Vec < tracking:: Tag > > ,
847+ pub pruned_tags :
848+ HashMap < Option < TagNamespaceBuf > , HashMap < tracking:: TagSpec , Vec < tracking:: Tag > > > ,
783849
784850 /// The number of objects visited when walking the database
785851 pub visited_objects : u64 ,
@@ -815,7 +881,11 @@ impl CleanResult {
815881 }
816882
817883 pub fn into_all_tags ( self ) -> Vec < tracking:: Tag > {
818- self . pruned_tags . into_values ( ) . flatten ( ) . collect ( )
884+ self . pruned_tags
885+ . into_values ( )
886+ . flat_map ( |tags| tags. into_values ( ) )
887+ . flatten ( )
888+ . collect ( )
819889 }
820890}
821891
0 commit comments