@@ -298,8 +298,67 @@ impl CdnBackend {
298298    } 
299299} 
300300
301+ /// fully invalidate the CDN distribution, also emptying the queue. 
302+ #[ instrument( skip( conn) ) ]  
303+ pub ( crate )  async  fn  full_invalidation ( 
304+     config :  & Config , 
305+     cdn :  & CdnBackend , 
306+     metrics :  & InstanceMetrics , 
307+     conn :  & mut  sqlx:: PgConnection , 
308+     distribution_id :  & str , 
309+ )  -> Result < ( ) >  { 
310+     let  mut  transaction = conn. begin ( ) . await ?; 
311+ 
312+     let  now = Utc :: now ( ) ; 
313+     for  row in  sqlx:: query!( 
314+         "SELECT queued 
315+          FROM cdn_invalidation_queue 
316+          WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL 
317+          FOR UPDATE" , 
318+         distribution_id, 
319+     ) 
320+     . fetch_all ( & mut  * transaction) 
321+     . await ?
322+     { 
323+         if  let  Ok ( duration)  = ( now - row. queued ) . to_std ( )  { 
324+             // This can only fail when the duration is negative, which can't happen anyways 
325+             metrics
326+                 . cdn_queue_time 
327+                 . with_label_values ( & [ distribution_id] ) 
328+                 . observe ( duration_to_seconds ( duration) ) ; 
329+         } 
330+     } 
331+ 
332+     match  cdn
333+         . create_invalidation ( distribution_id,  & [ "/*" ] ) 
334+         . await 
335+         . context ( "error creating new invalidation" ) 
336+     { 
337+         Ok ( invalidation)  => { 
338+             sqlx:: query!( 
339+                 "UPDATE cdn_invalidation_queue 
340+                  SET 
341+                      created_in_cdn = CURRENT_TIMESTAMP, 
342+                      cdn_reference = $1 
343+                  WHERE 
344+                     cdn_distribution_id = $2 AND created_in_cdn IS NULL" , 
345+                 invalidation. invalidation_id, 
346+                 distribution_id, 
347+             ) 
348+             . execute ( & mut  * transaction) 
349+             . await ?; 
350+ 
351+             transaction. commit ( ) . await ?; 
352+         } 
353+         Err ( err)  => return  Err ( err) , 
354+     } ; 
355+ 
356+     Ok ( ( ) ) 
357+ } 
358+ 
301359#[ instrument( skip( conn) ) ]  
302360pub ( crate )  async  fn  handle_queued_invalidation_requests ( 
361+     config :  & Config , 
303362    cdn :  & CdnBackend , 
304363    metrics :  & InstanceMetrics , 
305364    conn :  & mut  sqlx:: PgConnection , 
@@ -385,6 +444,24 @@ pub(crate) async fn handle_queued_invalidation_requests(
385444        return  Ok ( ( ) ) ; 
386445    } 
387446
447+     if  let  Some ( min_queued)  = sqlx:: query_scalar!( 
448+         "SELECT 
449+              min(queued) 
450+          FROM cdn_invalidation_queue 
451+          WHERE 
452+              cdn_distribution_id = $1 AND 
453+              created_in_cdn IS NULL" , 
454+         distribution_id
455+     ) 
456+     . fetch_one ( & mut  * conn) 
457+     . await ?
458+     { 
459+         if  ( now - min_queued) . to_std ( ) . unwrap_or_default ( )  >= config. cdn_max_queued_age  { 
460+             full_invalidation ( config,  cdn,  metrics,  conn,  distribution_id) . await ?; 
461+             return  Ok ( ( ) ) ; 
462+         } 
463+     } 
464+ 
388465    // create new an invalidation for the queued path patterns 
389466    let  mut  transaction = conn. begin ( ) . await ?; 
390467    let  mut  path_patterns:  Vec < String >  = Vec :: new ( ) ; 
@@ -566,6 +643,8 @@ pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution(
566643
567644#[ cfg( test) ]  
568645mod  tests { 
646+     use  std:: time:: Duration ; 
647+ 
569648    use  super :: * ; 
570649    use  crate :: test:: async_wrapper; 
571650
@@ -671,6 +750,111 @@ mod tests {
671750        } ) 
672751    } 
673752
753+     #[ test]  
754+     fn  escalate_to_full_invalidation ( )  { 
755+         crate :: test:: async_wrapper ( |env| async  move  { 
756+             env. override_config ( |config| { 
757+                 config. cloudfront_distribution_id_web  = Some ( "distribution_id_web" . into ( ) ) ; 
758+                 config. cloudfront_distribution_id_static  = Some ( "distribution_id_static" . into ( ) ) ; 
759+                 config. cdn_max_queued_age  = Duration :: from_secs ( 0 ) ; 
760+             } ) ; 
761+ 
762+             let  cdn = env. cdn ( ) . await ; 
763+             let  config = env. config ( ) ; 
764+             let  mut  conn = env. async_db ( ) . await . async_conn ( ) . await ; 
765+             assert ! ( queued_or_active_crate_invalidations( & mut  conn) 
766+                 . await ?
767+                 . is_empty( ) ) ; 
768+ 
769+             queue_crate_invalidation ( & mut  conn,  & env. config ( ) ,  "krate" ) . await ?; 
770+ 
771+             // invalidation paths are queued. 
772+             assert_eq ! ( 
773+                 queued_or_active_crate_invalidations( & mut  conn) 
774+                     . await ?
775+                     . into_iter( ) 
776+                     . map( |i| ( 
777+                         i. cdn_distribution_id, 
778+                         i. krate, 
779+                         i. path_pattern, 
780+                         i. cdn_reference
781+                     ) ) 
782+                     . collect:: <Vec <_>>( ) , 
783+                 vec![ 
784+                     ( 
785+                         "distribution_id_web" . into( ) , 
786+                         "krate" . into( ) , 
787+                         "/krate*" . into( ) , 
788+                         None 
789+                     ) , 
790+                     ( 
791+                         "distribution_id_web" . into( ) , 
792+                         "krate" . into( ) , 
793+                         "/crate/krate*" . into( ) , 
794+                         None 
795+                     ) , 
796+                     ( 
797+                         "distribution_id_static" . into( ) , 
798+                         "krate" . into( ) , 
799+                         "/rustdoc/krate*" . into( ) , 
800+                         None 
801+                     ) , 
802+                 ] 
803+             ) ; 
804+ 
805+             let  counts =
806+                 queued_or_active_crate_invalidation_count_by_distribution ( & mut  conn,  & config) 
807+                     . await ?; 
808+             assert_eq ! ( counts. len( ) ,  2 ) ; 
809+             assert_eq ! ( * counts. get( "distribution_id_web" ) . unwrap( ) ,  2 ) ; 
810+             assert_eq ! ( * counts. get( "distribution_id_static" ) . unwrap( ) ,  1 ) ; 
811+ 
812+             // queueing the invalidation doesn't create it in the CDN 
813+             assert ! ( active_invalidations( & cdn,  "distribution_id_web" ) . is_empty( ) ) ; 
814+             assert ! ( active_invalidations( & cdn,  "distribution_id_static" ) . is_empty( ) ) ; 
815+ 
816+             let  cdn = env. cdn ( ) . await ; 
817+             let  config = env. config ( ) ; 
818+ 
819+             // now handle the queued invalidations 
820+             handle_queued_invalidation_requests ( 
821+                 & config, 
822+                 & cdn, 
823+                 & env. instance_metrics ( ) , 
824+                 & mut  conn, 
825+                 "distribution_id_web" , 
826+             ) 
827+             . await ?; 
828+             handle_queued_invalidation_requests ( 
829+                 & config, 
830+                 & cdn, 
831+                 & env. instance_metrics ( ) , 
832+                 & mut  conn, 
833+                 "distribution_id_static" , 
834+             ) 
835+             . await ?; 
836+ 
837+             // which creates them in the CDN 
838+             { 
839+                 let  ir_web = active_invalidations ( & cdn,  "distribution_id_web" ) ; 
840+                 assert_eq ! ( ir_web. len( ) ,  1 ) ; 
841+                 assert_eq ! ( ir_web[ 0 ] . path_patterns,  vec![ "/*" ] ) ; 
842+ 
843+                 let  ir_static = active_invalidations ( & cdn,  "distribution_id_static" ) ; 
844+                 assert_eq ! ( ir_web. len( ) ,  1 ) ; 
845+                 assert_eq ! ( ir_static[ 0 ] . path_patterns,  vec![ "/*" ] ) ; 
846+             } 
847+ 
848+             // the queued entries got a CDN reference attached 
849+             assert ! ( queued_or_active_crate_invalidations( & mut  conn) 
850+                 . await ?
851+                 . iter( ) 
852+                 . all( |i| i. cdn_reference. is_some( )  && i. created_in_cdn. is_some( ) ) ) ; 
853+ 
854+             Ok ( ( ) ) 
855+         } ) ; 
856+     } 
857+ 
674858    #[ test]  
675859    fn  invalidate_a_crate ( )  { 
676860        crate :: test:: async_wrapper ( |env| async  move  { 
@@ -734,16 +918,19 @@ mod tests {
734918            assert ! ( active_invalidations( & cdn,  "distribution_id_static" ) . is_empty( ) ) ; 
735919
736920            let  cdn = env. cdn ( ) . await ; 
921+             let  config = env. config ( ) ; 
737922
738923            // now handle the queued invalidations 
739924            handle_queued_invalidation_requests ( 
925+                 & config, 
740926                & cdn, 
741927                & env. instance_metrics ( ) , 
742928                & mut  conn, 
743929                "distribution_id_web" , 
744930            ) 
745931            . await ?; 
746932            handle_queued_invalidation_requests ( 
933+                 & config, 
747934                & cdn, 
748935                & env. instance_metrics ( ) , 
749936                & mut  conn, 
@@ -774,13 +961,15 @@ mod tests {
774961
775962            // now handle again 
776963            handle_queued_invalidation_requests ( 
964+                 & config, 
777965                & cdn, 
778966                & env. instance_metrics ( ) , 
779967                & mut  conn, 
780968                "distribution_id_web" , 
781969            ) 
782970            . await ?; 
783971            handle_queued_invalidation_requests ( 
972+                 & config, 
784973                & cdn, 
785974                & env. instance_metrics ( ) , 
786975                & mut  conn, 
@@ -849,6 +1038,7 @@ mod tests {
8491038
8501039            // handle the queued invalidations 
8511040            handle_queued_invalidation_requests ( 
1041+                 & env. config ( ) , 
8521042                & * env. cdn ( ) . await , 
8531043                & env. instance_metrics ( ) , 
8541044                & mut  conn, 
@@ -909,6 +1099,7 @@ mod tests {
9091099
9101100            // handle the queued invalidations 
9111101            handle_queued_invalidation_requests ( 
1102+                 & env. config ( ) , 
9121103                & * env. cdn ( ) . await , 
9131104                & env. instance_metrics ( ) , 
9141105                & mut  conn, 
@@ -937,6 +1128,7 @@ mod tests {
9371128
9381129            // now handle again 
9391130            handle_queued_invalidation_requests ( 
1131+                 & env. config ( ) , 
9401132                & * env. cdn ( ) . await , 
9411133                & env. instance_metrics ( ) , 
9421134                & mut  conn, 
@@ -976,6 +1168,7 @@ mod tests {
9761168
9771169            // run the handler 
9781170            handle_queued_invalidation_requests ( 
1171+                 & env. config ( ) , 
9791172                & * env. cdn ( ) . await , 
9801173                & env. instance_metrics ( ) , 
9811174                & mut  conn, 
0 commit comments