| 
 | 1 | +use crate::cloudfront::{CloudFront, CloudFrontError};  | 
 | 2 | +use crate::worker::Environment;  | 
 | 3 | +use anyhow::Context;  | 
 | 4 | +use crates_io_database::models::CloudFrontInvalidationQueueItem;  | 
 | 5 | +use crates_io_worker::BackgroundJob;  | 
 | 6 | +use diesel_async::AsyncPgConnection;  | 
 | 7 | +use serde::{Deserialize, Serialize};  | 
 | 8 | +use std::collections::HashSet;  | 
 | 9 | +use std::sync::Arc;  | 
 | 10 | +use tokio::time::Duration;  | 
 | 11 | +use tracing::{info, instrument, warn};  | 
 | 12 | + | 
 | 13 | +/// Maximum number of paths to process in a single batch.  | 
 | 14 | +/// Conservative limit to stay within AWS CloudFront's 3,000 path limit per invalidation.  | 
 | 15 | +const BATCH_SIZE: usize = 1000;  | 
 | 16 | + | 
 | 17 | +const INITIAL_BACKOFF: Duration = Duration::from_secs(30);  | 
 | 18 | +const MAX_BACKOFF: Duration = Duration::from_secs(15 * 60);  | 
 | 19 | +const MAX_RETRIES: u32 = 6; // 30s, 1m, 2m, 4m, 8m, 15m  | 
 | 20 | + | 
 | 21 | +/// Background job that processes CloudFront invalidation paths from the database queue in batches.  | 
 | 22 | +///  | 
 | 23 | +/// This job:  | 
 | 24 | +/// - Processes up to 1,000 paths per batch to stay within AWS limits  | 
 | 25 | +/// - Deduplicates paths before sending to CloudFront  | 
 | 26 | +/// - Implements exponential backoff for `TooManyInvalidationsInProgress` errors  | 
 | 27 | +/// - Processes all available batches in a single job run  | 
 | 28 | +#[derive(Deserialize, Serialize)]  | 
 | 29 | +pub struct ProcessCloudfrontInvalidationQueue;  | 
 | 30 | + | 
 | 31 | +impl ProcessCloudfrontInvalidationQueue {  | 
 | 32 | +    #[instrument(skip_all)]  | 
 | 33 | +    async fn process_batch(  | 
 | 34 | +        &self,  | 
 | 35 | +        conn: &mut AsyncPgConnection,  | 
 | 36 | +        cloudfront: &CloudFront,  | 
 | 37 | +    ) -> anyhow::Result<usize> {  | 
 | 38 | +        let items = CloudFrontInvalidationQueueItem::fetch_batch(conn, BATCH_SIZE as i64).await?;  | 
 | 39 | +        if items.is_empty() {  | 
 | 40 | +            info!("No more CloudFront invalidations to process");  | 
 | 41 | +            return Ok(0);  | 
 | 42 | +        }  | 
 | 43 | + | 
 | 44 | +        let item_count = items.len();  | 
 | 45 | +        info!("Processing next {item_count} CloudFront invalidations…");  | 
 | 46 | + | 
 | 47 | +        let mut unique_paths = HashSet::with_capacity(item_count);  | 
 | 48 | +        let mut item_ids = Vec::with_capacity(item_count);  | 
 | 49 | +        for item in items {  | 
 | 50 | +            unique_paths.insert(item.path);  | 
 | 51 | +            item_ids.push(item.id);  | 
 | 52 | +        }  | 
 | 53 | +        let unique_paths: Vec<String> = unique_paths.into_iter().collect();  | 
 | 54 | + | 
 | 55 | +        let result = self.invalidate_with_backoff(cloudfront, unique_paths).await;  | 
 | 56 | +        result.context("Failed to request CloudFront invalidations")?;  | 
 | 57 | + | 
 | 58 | +        let result = CloudFrontInvalidationQueueItem::remove_items(conn, &item_ids).await;  | 
 | 59 | +        result.context("Failed to remove CloudFront invalidations from the queue")?;  | 
 | 60 | + | 
 | 61 | +        info!("Successfully processed {item_count} CloudFront invalidations");  | 
 | 62 | + | 
 | 63 | +        Ok(item_count)  | 
 | 64 | +    }  | 
 | 65 | + | 
 | 66 | +    /// Invalidate paths on CloudFront with exponential backoff for `TooManyInvalidationsInProgress`  | 
 | 67 | +    #[instrument(skip_all)]  | 
 | 68 | +    async fn invalidate_with_backoff(  | 
 | 69 | +        &self,  | 
 | 70 | +        cloudfront: &CloudFront,  | 
 | 71 | +        paths: Vec<String>,  | 
 | 72 | +    ) -> Result<(), CloudFrontError> {  | 
 | 73 | +        let mut attempt = 1;  | 
 | 74 | +        let mut backoff = INITIAL_BACKOFF;  | 
 | 75 | +        loop {  | 
 | 76 | +            let Err(error) = cloudfront.invalidate_many(paths.clone()).await else {  | 
 | 77 | +                return Ok(());  | 
 | 78 | +            };  | 
 | 79 | + | 
 | 80 | +            if !error.is_too_many_invalidations_error() || attempt >= MAX_RETRIES {  | 
 | 81 | +                return Err(error);  | 
 | 82 | +            }  | 
 | 83 | + | 
 | 84 | +            warn!(  | 
 | 85 | +                "Too many CloudFront invalidations in progress, retrying in {backoff:?} seconds…",  | 
 | 86 | +            );  | 
 | 87 | + | 
 | 88 | +            tokio::time::sleep(backoff).await;  | 
 | 89 | + | 
 | 90 | +            attempt += 1;  | 
 | 91 | +            backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);  | 
 | 92 | +        }  | 
 | 93 | +    }  | 
 | 94 | +}  | 
 | 95 | + | 
 | 96 | +impl BackgroundJob for ProcessCloudfrontInvalidationQueue {  | 
 | 97 | +    const JOB_NAME: &'static str = "process_cloudfront_invalidation_queue";  | 
 | 98 | +    const DEDUPLICATED: bool = true;  | 
 | 99 | + | 
 | 100 | +    type Context = Arc<Environment>;  | 
 | 101 | + | 
 | 102 | +    #[instrument(skip_all)]  | 
 | 103 | +    async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {  | 
 | 104 | +        let Some(cloudfront) = ctx.cloudfront() else {  | 
 | 105 | +            warn!("CloudFront not configured, skipping queue processing");  | 
 | 106 | +            return Ok(());  | 
 | 107 | +        };  | 
 | 108 | + | 
 | 109 | +        let mut conn = ctx.deadpool.get().await?;  | 
 | 110 | + | 
 | 111 | +        // Process batches until the queue is empty, or we hit an error  | 
 | 112 | +        loop {  | 
 | 113 | +            let item_count = self.process_batch(&mut conn, cloudfront).await?;  | 
 | 114 | +            if item_count == 0 {  | 
 | 115 | +                // Queue is empty, we're done  | 
 | 116 | +                break;  | 
 | 117 | +            }  | 
 | 118 | +        }  | 
 | 119 | + | 
 | 120 | +        Ok(())  | 
 | 121 | +    }  | 
 | 122 | +}  | 
0 commit comments