From 117fe621c830eddba6818f379bbe8b613debf889 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Wed, 3 Sep 2025 13:07:59 -0700 Subject: [PATCH 1/5] cloudfront: Extract `CloudFrontError` enum --- src/cloudfront.rs | 50 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/src/cloudfront.rs b/src/cloudfront.rs index f80227bcb4e..404c00a04f9 100644 --- a/src/cloudfront.rs +++ b/src/cloudfront.rs @@ -1,10 +1,40 @@ use aws_credential_types::Credentials; use aws_sdk_cloudfront::config::retry::RetryConfig; use aws_sdk_cloudfront::config::{BehaviorVersion, Region}; +use aws_sdk_cloudfront::error::{BuildError, SdkError}; +use aws_sdk_cloudfront::operation::create_invalidation::CreateInvalidationError; use aws_sdk_cloudfront::types::{InvalidationBatch, Paths}; use aws_sdk_cloudfront::{Client, Config}; use tracing::{debug, instrument, warn}; +#[derive(Debug, thiserror::Error)] +pub enum CloudFrontError { + #[error(transparent)] + BuildError(#[from] BuildError), + #[error(transparent)] + SdkError(Box>), +} + +impl From> for CloudFrontError { + fn from(err: SdkError) -> Self { + CloudFrontError::SdkError(Box::new(err)) + } +} + +impl CloudFrontError { + pub fn is_too_many_invalidations_error(&self) -> bool { + let CloudFrontError::SdkError(sdk_error) = self else { + return false; + }; + + let Some(service_error) = sdk_error.as_service_error() else { + return false; + }; + + service_error.is_too_many_invalidations_in_progress() + } +} + pub struct CloudFront { client: Client, distribution_id: String, @@ -36,13 +66,13 @@ impl CloudFront { /// Invalidate a file on CloudFront /// /// `path` is the path to the file to invalidate, such as `config.json`, or `re/ge/regex` - pub async fn invalidate(&self, path: &str) -> anyhow::Result<()> { + pub async fn invalidate(&self, path: &str) -> Result<(), CloudFrontError> { self.invalidate_many(vec![path.to_string()]).await } /// Invalidate multiple paths on Cloudfront. #[instrument(skip(self))] - pub async fn invalidate_many(&self, mut paths: Vec) -> anyhow::Result<()> { + pub async fn invalidate_many(&self, mut paths: Vec) -> Result<(), CloudFrontError> { let now = chrono::offset::Utc::now().timestamp_micros(); // We need to ensure that paths have a starting slash. @@ -72,15 +102,11 @@ impl CloudFront { debug!("Sending invalidation request"); - match invalidation_request.send().await { - Ok(_) => { - debug!("Invalidation request successful"); - Ok(()) - } - Err(error) => { - warn!(?error, "Invalidation request failed"); - Err(error.into()) - } - } + Ok(invalidation_request + .send() + .await + .map(|_| ()) // We don't care about the result, just that it worked + .inspect(|_| debug!("Invalidation request successful")) + .inspect_err(|error| warn!(?error, "Invalidation request failed"))?) } } From b86547363b2b384a2fb403545034f24fccdc1e23 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Wed, 3 Sep 2025 13:41:28 -0700 Subject: [PATCH 2/5] database: Add `cloudfront_invalidation_queue` table --- crates/crates_io_database/src/schema.rs | 13 +++++++++++++ crates/crates_io_database_dump/src/dump-db.toml | 5 +++++ .../down.sql | 2 ++ .../up.sql | 15 +++++++++++++++ 4 files changed, 35 insertions(+) create mode 100644 migrations/2025-09-03-201218_create_cloudfront_invalidation_queue/down.sql create mode 100644 migrations/2025-09-03-201218_create_cloudfront_invalidation_queue/up.sql diff --git a/crates/crates_io_database/src/schema.rs b/crates/crates_io_database/src/schema.rs index ce88578ae13..e9784386272 100644 --- a/crates/crates_io_database/src/schema.rs +++ b/crates/crates_io_database/src/schema.rs @@ -176,6 +176,18 @@ diesel::table! { } } +diesel::table! { + /// Queue for batching CloudFront CDN invalidation requests + cloudfront_invalidation_queue (id) { + /// Unique identifier for each queued invalidation path + id -> Int8, + /// CloudFront path to invalidate (e.g. /crates/serde/serde-1.0.0.crate) + path -> Text, + /// Timestamp when the path was queued for invalidation + created_at -> Timestamptz, + } +} + diesel::table! { /// Number of downloads per crate. This was extracted from the `crates` table for performance reasons. crate_downloads (crate_id) { @@ -1141,6 +1153,7 @@ diesel::allow_tables_to_appear_in_same_query!( api_tokens, background_jobs, categories, + cloudfront_invalidation_queue, crate_downloads, crate_owner_invitations, crate_owners, diff --git a/crates/crates_io_database_dump/src/dump-db.toml b/crates/crates_io_database_dump/src/dump-db.toml index e426c6e0a05..94c35c7a005 100644 --- a/crates/crates_io_database_dump/src/dump-db.toml +++ b/crates/crates_io_database_dump/src/dump-db.toml @@ -49,6 +49,11 @@ crates_cnt = "public" created_at = "public" path = "public" +[cloudfront_invalidation_queue.columns] +id = "private" +path = "private" +created_at = "private" + [crate_downloads.columns] crate_id = "public" downloads = "public" diff --git a/migrations/2025-09-03-201218_create_cloudfront_invalidation_queue/down.sql b/migrations/2025-09-03-201218_create_cloudfront_invalidation_queue/down.sql new file mode 100644 index 00000000000..add3b4472b4 --- /dev/null +++ b/migrations/2025-09-03-201218_create_cloudfront_invalidation_queue/down.sql @@ -0,0 +1,2 @@ +-- Drop the CloudFront invalidation queue table +DROP TABLE IF EXISTS cloudfront_invalidation_queue; diff --git a/migrations/2025-09-03-201218_create_cloudfront_invalidation_queue/up.sql b/migrations/2025-09-03-201218_create_cloudfront_invalidation_queue/up.sql new file mode 100644 index 00000000000..f68bb6e206d --- /dev/null +++ b/migrations/2025-09-03-201218_create_cloudfront_invalidation_queue/up.sql @@ -0,0 +1,15 @@ +-- Create table for queuing CloudFront invalidation paths +CREATE TABLE cloudfront_invalidation_queue ( + id BIGSERIAL PRIMARY KEY, + path TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +COMMENT ON TABLE cloudfront_invalidation_queue IS 'Queue for batching CloudFront CDN invalidation requests'; +COMMENT ON COLUMN cloudfront_invalidation_queue.id IS 'Unique identifier for each queued invalidation path'; +COMMENT ON COLUMN cloudfront_invalidation_queue.path IS 'CloudFront path to invalidate (e.g. /crates/serde/serde-1.0.0.crate)'; +COMMENT ON COLUMN cloudfront_invalidation_queue.created_at IS 'Timestamp when the path was queued for invalidation'; + +-- Index for efficient batch processing (oldest first) +CREATE INDEX idx_cloudfront_invalidation_queue_created_at + ON cloudfront_invalidation_queue (created_at); From 06ffc69cecfc080f48869a9ed1ad19e83ff777bb Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Wed, 3 Sep 2025 13:42:04 -0700 Subject: [PATCH 3/5] database: Implement `CloudFrontInvalidationQueueItem` models --- .../models/cloudfront_invalidation_queue.rs | 56 +++++++++++++++++++ crates/crates_io_database/src/models/mod.rs | 2 + 2 files changed, 58 insertions(+) create mode 100644 crates/crates_io_database/src/models/cloudfront_invalidation_queue.rs diff --git a/crates/crates_io_database/src/models/cloudfront_invalidation_queue.rs b/crates/crates_io_database/src/models/cloudfront_invalidation_queue.rs new file mode 100644 index 00000000000..23c7c78a652 --- /dev/null +++ b/crates/crates_io_database/src/models/cloudfront_invalidation_queue.rs @@ -0,0 +1,56 @@ +use crate::schema::cloudfront_invalidation_queue; +use diesel::prelude::*; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; + +#[derive(Debug, Identifiable, Queryable, QueryableByName, Selectable)] +#[diesel(table_name = cloudfront_invalidation_queue, check_for_backend(diesel::pg::Pg))] +pub struct CloudFrontInvalidationQueueItem { + pub id: i64, + pub path: String, + pub created_at: chrono::DateTime, +} + +#[derive(Debug, Insertable)] +#[diesel(table_name = cloudfront_invalidation_queue, check_for_backend(diesel::pg::Pg))] +pub struct NewCloudFrontInvalidationQueueItem<'a> { + pub path: &'a str, +} + +impl CloudFrontInvalidationQueueItem { + /// Queue multiple invalidation paths for later processing + pub async fn queue_paths(conn: &mut AsyncPgConnection, paths: &[String]) -> QueryResult { + let new_items: Vec<_> = paths + .iter() + .map(|path| NewCloudFrontInvalidationQueueItem { path }) + .collect(); + + diesel::insert_into(cloudfront_invalidation_queue::table) + .values(&new_items) + .execute(conn) + .await + } + + /// Fetch the oldest paths from the queue + pub async fn fetch_batch( + conn: &mut AsyncPgConnection, + limit: i64, + ) -> QueryResult> { + // Fetch the oldest entries up to the limit + cloudfront_invalidation_queue::table + .order(cloudfront_invalidation_queue::created_at.asc()) + .limit(limit) + .load(conn) + .await + } + + /// Remove queue items by their IDs + pub async fn remove_items( + conn: &mut AsyncPgConnection, + item_ids: &[i64], + ) -> QueryResult { + diesel::delete(cloudfront_invalidation_queue::table) + .filter(cloudfront_invalidation_queue::id.eq_any(item_ids)) + .execute(conn) + .await + } +} diff --git a/crates/crates_io_database/src/models/mod.rs b/crates/crates_io_database/src/models/mod.rs index d69fbfcbb7f..10a82441f02 100644 --- a/crates/crates_io_database/src/models/mod.rs +++ b/crates/crates_io_database/src/models/mod.rs @@ -1,5 +1,6 @@ pub use self::action::{NewVersionOwnerAction, VersionAction, VersionOwnerAction}; pub use self::category::{Category, CrateCategory, NewCategory}; +pub use self::cloudfront_invalidation_queue::CloudFrontInvalidationQueueItem; pub use self::crate_owner_invitation::{ CrateOwnerInvitation, NewCrateOwnerInvitation, NewCrateOwnerInvitationOutcome, }; @@ -22,6 +23,7 @@ pub mod helpers; mod action; pub mod category; +mod cloudfront_invalidation_queue; pub mod crate_owner_invitation; pub mod default_versions; mod deleted_crate; From 53d587b6fc6704cd0d60eccbf794a159ea9fc209 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Wed, 3 Sep 2025 17:24:27 -0700 Subject: [PATCH 4/5] worker: Implement `ProcessCloudfrontInvalidationQueue` background job --- src/worker/jobs/mod.rs | 2 + .../process_cloudfront_invalidation_queue.rs | 122 ++++++++++++++++++ src/worker/mod.rs | 1 + 3 files changed, 125 insertions(+) create mode 100644 src/worker/jobs/process_cloudfront_invalidation_queue.rs diff --git a/src/worker/jobs/mod.rs b/src/worker/jobs/mod.rs index b38d970365f..22d371a9375 100644 --- a/src/worker/jobs/mod.rs +++ b/src/worker/jobs/mod.rs @@ -9,6 +9,7 @@ mod generate_og_image; mod index; mod index_version_downloads_archive; mod invalidate_cdns; +mod process_cloudfront_invalidation_queue; mod readmes; pub mod rss; mod send_publish_notifications; @@ -30,6 +31,7 @@ pub use self::generate_og_image::GenerateOgImage; pub use self::index::{NormalizeIndex, SquashIndex, SyncToGitIndex, SyncToSparseIndex}; pub use self::index_version_downloads_archive::IndexVersionDownloadsArchive; pub use self::invalidate_cdns::InvalidateCdns; +pub use self::process_cloudfront_invalidation_queue::ProcessCloudfrontInvalidationQueue; pub use self::readmes::RenderAndUploadReadme; pub use self::send_publish_notifications::SendPublishNotificationsJob; pub use self::sync_admins::SyncAdmins; diff --git a/src/worker/jobs/process_cloudfront_invalidation_queue.rs b/src/worker/jobs/process_cloudfront_invalidation_queue.rs new file mode 100644 index 00000000000..1bc96109730 --- /dev/null +++ b/src/worker/jobs/process_cloudfront_invalidation_queue.rs @@ -0,0 +1,122 @@ +use crate::cloudfront::{CloudFront, CloudFrontError}; +use crate::worker::Environment; +use anyhow::Context; +use crates_io_database::models::CloudFrontInvalidationQueueItem; +use crates_io_worker::BackgroundJob; +use diesel_async::AsyncPgConnection; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::time::Duration; +use tracing::{info, instrument, warn}; + +/// Maximum number of paths to process in a single batch. +/// Conservative limit to stay within AWS CloudFront's 3,000 path limit per invalidation. +const BATCH_SIZE: usize = 1000; + +const INITIAL_BACKOFF: Duration = Duration::from_secs(30); +const MAX_BACKOFF: Duration = Duration::from_secs(15 * 60); +const MAX_RETRIES: u32 = 6; // 30s, 1m, 2m, 4m, 8m, 15m + +/// Background job that processes CloudFront invalidation paths from the database queue in batches. +/// +/// This job: +/// - Processes up to 1,000 paths per batch to stay within AWS limits +/// - Deduplicates paths before sending to CloudFront +/// - Implements exponential backoff for `TooManyInvalidationsInProgress` errors +/// - Processes all available batches in a single job run +#[derive(Deserialize, Serialize)] +pub struct ProcessCloudfrontInvalidationQueue; + +impl ProcessCloudfrontInvalidationQueue { + #[instrument(skip_all)] + async fn process_batch( + &self, + conn: &mut AsyncPgConnection, + cloudfront: &CloudFront, + ) -> anyhow::Result { + let items = CloudFrontInvalidationQueueItem::fetch_batch(conn, BATCH_SIZE as i64).await?; + if items.is_empty() { + info!("No more CloudFront invalidations to process"); + return Ok(0); + } + + let item_count = items.len(); + info!("Processing next {item_count} CloudFront invalidations…"); + + let mut unique_paths = HashSet::with_capacity(item_count); + let mut item_ids = Vec::with_capacity(item_count); + for item in items { + unique_paths.insert(item.path); + item_ids.push(item.id); + } + let unique_paths: Vec = unique_paths.into_iter().collect(); + + let result = self.invalidate_with_backoff(cloudfront, unique_paths).await; + result.context("Failed to request CloudFront invalidations")?; + + let result = CloudFrontInvalidationQueueItem::remove_items(conn, &item_ids).await; + result.context("Failed to remove CloudFront invalidations from the queue")?; + + info!("Successfully processed {item_count} CloudFront invalidations"); + + Ok(item_count) + } + + /// Invalidate paths on CloudFront with exponential backoff for `TooManyInvalidationsInProgress` + #[instrument(skip_all)] + async fn invalidate_with_backoff( + &self, + cloudfront: &CloudFront, + paths: Vec, + ) -> Result<(), CloudFrontError> { + let mut attempt = 1; + let mut backoff = INITIAL_BACKOFF; + loop { + let Err(error) = cloudfront.invalidate_many(paths.clone()).await else { + return Ok(()); + }; + + if !error.is_too_many_invalidations_error() || attempt >= MAX_RETRIES { + return Err(error); + } + + warn!( + "Too many CloudFront invalidations in progress, retrying in {backoff:?} seconds…", + ); + + tokio::time::sleep(backoff).await; + + attempt += 1; + backoff = std::cmp::min(backoff * 2, MAX_BACKOFF); + } + } +} + +impl BackgroundJob for ProcessCloudfrontInvalidationQueue { + const JOB_NAME: &'static str = "process_cloudfront_invalidation_queue"; + const DEDUPLICATED: bool = true; + + type Context = Arc; + + #[instrument(skip_all)] + async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> { + let Some(cloudfront) = ctx.cloudfront() else { + warn!("CloudFront not configured, skipping queue processing"); + return Ok(()); + }; + + let mut conn = ctx.deadpool.get().await?; + + // Process batches until the queue is empty, or we hit an error + loop { + let item_count = self.process_batch(&mut conn, cloudfront).await?; + if item_count == 0 { + // Queue is empty, we're done + break; + } + } + + Ok(()) + } +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 8bed028cbc1..e90a203bcdc 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -32,6 +32,7 @@ impl RunnerExt for Runner> { .register_job_type::() .register_job_type::() .register_job_type::() + .register_job_type::() .register_job_type::() .register_job_type::() .register_job_type::() From ace6a306d46165f89541dd4d93c4c77f13c4c7a3 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Wed, 3 Sep 2025 17:35:22 -0700 Subject: [PATCH 5/5] Use `ProcessCloudfrontInvalidationQueue` background job for CloudFront invalidations --- src/worker/environment.rs | 20 ++++++++++++++++--- src/worker/jobs/dump_db.rs | 5 +++-- src/worker/jobs/generate_og_image.rs | 18 ++++++++++++----- src/worker/jobs/index/sync.rs | 15 ++++++++++---- .../index_version_downloads_archive/mod.rs | 5 +++-- src/worker/jobs/invalidate_cdns.rs | 17 +++++++++++----- src/worker/jobs/rss/sync_crate_feed.rs | 2 +- src/worker/jobs/rss/sync_crates_feed.rs | 2 +- src/worker/jobs/rss/sync_updates_feed.rs | 2 +- 9 files changed, 62 insertions(+), 24 deletions(-) diff --git a/src/worker/environment.rs b/src/worker/environment.rs index 9b98b5b63bc..019b6820e2d 100644 --- a/src/worker/environment.rs +++ b/src/worker/environment.rs @@ -3,12 +3,15 @@ use crate::cloudfront::CloudFront; use crate::fastly::Fastly; use crate::storage::Storage; use crate::typosquat; +use crate::worker::jobs::ProcessCloudfrontInvalidationQueue; use anyhow::Context; use bon::Builder; +use crates_io_database::models::CloudFrontInvalidationQueueItem; use crates_io_docs_rs::DocsRsClient; use crates_io_index::{Repository, RepositoryConfig}; use crates_io_og_image::OgImageGenerator; use crates_io_team_repo::TeamRepo; +use crates_io_worker::BackgroundJob; use diesel_async::AsyncPgConnection; use diesel_async::pooled_connection::deadpool::Pool; use object_store::ObjectStore; @@ -70,9 +73,20 @@ impl Environment { } /// Invalidate a file in all registered CDNs. - pub(crate) async fn invalidate_cdns(&self, path: &str) -> anyhow::Result<()> { - if let Some(cloudfront) = self.cloudfront() { - cloudfront.invalidate(path).await.context("CloudFront")?; + pub(crate) async fn invalidate_cdns( + &self, + conn: &mut AsyncPgConnection, + path: &str, + ) -> anyhow::Result<()> { + // Queue CloudFront invalidations for batch processing instead of calling directly + if self.cloudfront().is_some() { + let paths = &[path.to_string()]; + let result = CloudFrontInvalidationQueueItem::queue_paths(conn, paths).await; + result.context("Failed to queue CloudFront invalidation path")?; + + // Schedule the processing job to handle the queued paths + let result = ProcessCloudfrontInvalidationQueue.enqueue(conn).await; + result.context("Failed to enqueue CloudFront invalidation processing job")?; } if let Some(fastly) = self.fastly() { diff --git a/src/worker/jobs/dump_db.rs b/src/worker/jobs/dump_db.rs index a5a77f954fa..e3302ad39b7 100644 --- a/src/worker/jobs/dump_db.rs +++ b/src/worker/jobs/dump_db.rs @@ -47,7 +47,8 @@ impl BackgroundJob for DumpDb { info!("Database dump tarball uploaded"); info!("Invalidating CDN caches…"); - if let Err(error) = env.invalidate_cdns(TAR_PATH).await { + let mut conn = env.deadpool.get().await?; + if let Err(error) = env.invalidate_cdns(&mut conn, TAR_PATH).await { warn!("Failed to invalidate CDN caches: {error}"); } @@ -58,7 +59,7 @@ impl BackgroundJob for DumpDb { info!("Database dump zip file uploaded"); info!("Invalidating CDN caches…"); - if let Err(error) = env.invalidate_cdns(ZIP_PATH).await { + if let Err(error) = env.invalidate_cdns(&mut conn, ZIP_PATH).await { warn!("Failed to invalidate CDN caches: {error}"); } diff --git a/src/worker/jobs/generate_og_image.rs b/src/worker/jobs/generate_og_image.rs index c0deb276578..217a34c6adc 100644 --- a/src/worker/jobs/generate_og_image.rs +++ b/src/worker/jobs/generate_og_image.rs @@ -1,7 +1,9 @@ use crate::models::OwnerKind; use crate::schema::*; use crate::worker::Environment; +use crate::worker::jobs::ProcessCloudfrontInvalidationQueue; use anyhow::Context; +use crates_io_database::models::CloudFrontInvalidationQueueItem; use crates_io_og_image::{OgImageAuthorData, OgImageData}; use crates_io_worker::BackgroundJob; use diesel::prelude::*; @@ -104,11 +106,17 @@ impl BackgroundJob for GenerateOgImage { // Invalidate CDN cache for the OG image let og_image_path = format!("og-images/{crate_name}.png"); - // Invalidate CloudFront CDN - if let Some(cloudfront) = ctx.cloudfront() - && let Err(error) = cloudfront.invalidate(&og_image_path).await - { - warn!("Failed to invalidate CloudFront CDN for {crate_name}: {error}"); + // Queue CloudFront invalidation for batch processing + if ctx.cloudfront().is_some() { + let paths = std::slice::from_ref(&og_image_path); + let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, paths).await; + if let Err(error) = result { + warn!("Failed to queue CloudFront invalidation for {crate_name}: {error}"); + } else if let Err(error) = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await { + warn!( + "Failed to enqueue CloudFront invalidation processing job for {crate_name}: {error}" + ); + } } // Invalidate Fastly CDN diff --git a/src/worker/jobs/index/sync.rs b/src/worker/jobs/index/sync.rs index fd0dd36a273..efdd14507bf 100644 --- a/src/worker/jobs/index/sync.rs +++ b/src/worker/jobs/index/sync.rs @@ -1,7 +1,9 @@ use crate::index::get_index_data; use crate::tasks::spawn_blocking; use crate::worker::Environment; +use crate::worker::jobs::ProcessCloudfrontInvalidationQueue; use anyhow::Context; +use crates_io_database::models::CloudFrontInvalidationQueueItem; use crates_io_index::Repository; use crates_io_worker::BackgroundJob; use serde::{Deserialize, Serialize}; @@ -113,12 +115,17 @@ impl BackgroundJob for SyncToSparseIndex { let future = env.storage.sync_index(&self.krate, content); future.await.context("Failed to sync index data")?; - if let Some(cloudfront) = env.cloudfront() { + if env.cloudfront().is_some() { let path = Repository::relative_index_file_for_url(&self.krate); - info!(%path, "Invalidating index file on CloudFront"); - let future = cloudfront.invalidate(&path); - future.await.context("Failed to invalidate CloudFront")?; + info!(%path, "Queuing index file invalidation on CloudFront"); + + let paths = &[path]; + let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, paths).await; + result.context("Failed to queue CloudFront invalidation path")?; + + let result = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await; + result.context("Failed to enqueue CloudFront invalidation processing job")?; } Ok(()) } diff --git a/src/worker/jobs/index_version_downloads_archive/mod.rs b/src/worker/jobs/index_version_downloads_archive/mod.rs index 0f02520a654..057780c9eea 100644 --- a/src/worker/jobs/index_version_downloads_archive/mod.rs +++ b/src/worker/jobs/index_version_downloads_archive/mod.rs @@ -54,10 +54,11 @@ impl BackgroundJob for IndexVersionDownloadsArchive { info!("index.json generated and uploaded"); info!("Invalidating CDN caches…"); - if let Err(error) = env.invalidate_cdns(INDEX_PATH).await { + let mut conn = env.deadpool.get().await?; + if let Err(error) = env.invalidate_cdns(&mut conn, INDEX_PATH).await { warn!("Failed to invalidate CDN caches: {error}"); } - if let Err(error) = env.invalidate_cdns(INDEX_JSON_PATH).await { + if let Err(error) = env.invalidate_cdns(&mut conn, INDEX_JSON_PATH).await { warn!("Failed to invalidate CDN caches: {error}"); } info!("CDN caches invalidated"); diff --git a/src/worker/jobs/invalidate_cdns.rs b/src/worker/jobs/invalidate_cdns.rs index 104e4b672d6..7b402f4e387 100644 --- a/src/worker/jobs/invalidate_cdns.rs +++ b/src/worker/jobs/invalidate_cdns.rs @@ -1,10 +1,12 @@ use std::sync::Arc; use anyhow::Context; +use crates_io_database::models::CloudFrontInvalidationQueueItem; use crates_io_worker::BackgroundJob; use serde::{Deserialize, Serialize}; use crate::worker::Environment; +use crate::worker::jobs::ProcessCloudfrontInvalidationQueue; /// A background job that invalidates the given paths on all CDNs in use on crates.io. #[derive(Deserialize, Serialize)] @@ -46,11 +48,16 @@ impl BackgroundJob for InvalidateCdns { } } - if let Some(cloudfront) = ctx.cloudfront() { - cloudfront - .invalidate_many(self.paths.clone()) - .await - .context("Failed to invalidate paths on CloudFront CDN")?; + // Queue CloudFront invalidations for batch processing instead of calling directly + if ctx.cloudfront().is_some() { + let mut conn = ctx.deadpool.get().await?; + + let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, &self.paths).await; + result.context("Failed to queue CloudFront invalidation paths")?; + + // Schedule the processing job to handle the queued paths + let result = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await; + result.context("Failed to enqueue CloudFront invalidation processing job")?; } Ok(()) diff --git a/src/worker/jobs/rss/sync_crate_feed.rs b/src/worker/jobs/rss/sync_crate_feed.rs index 9f9a11d31ff..8d06df6a470 100644 --- a/src/worker/jobs/rss/sync_crate_feed.rs +++ b/src/worker/jobs/rss/sync_crate_feed.rs @@ -80,7 +80,7 @@ impl BackgroundJob for SyncCrateFeed { ctx.storage.upload_feed(&feed_id, &channel).await?; let path = object_store::path::Path::from(&feed_id); - if let Err(error) = ctx.invalidate_cdns(path.as_ref()).await { + if let Err(error) = ctx.invalidate_cdns(&mut conn, path.as_ref()).await { warn!("Failed to invalidate CDN caches: {error}"); } diff --git a/src/worker/jobs/rss/sync_crates_feed.rs b/src/worker/jobs/rss/sync_crates_feed.rs index 242e8b0510b..92a377f7147 100644 --- a/src/worker/jobs/rss/sync_crates_feed.rs +++ b/src/worker/jobs/rss/sync_crates_feed.rs @@ -67,7 +67,7 @@ impl BackgroundJob for SyncCratesFeed { ctx.storage.upload_feed(&feed_id, &channel).await?; let path = object_store::path::Path::from(&feed_id); - if let Err(error) = ctx.invalidate_cdns(path.as_ref()).await { + if let Err(error) = ctx.invalidate_cdns(&mut conn, path.as_ref()).await { warn!("Failed to invalidate CDN caches: {error}"); } diff --git a/src/worker/jobs/rss/sync_updates_feed.rs b/src/worker/jobs/rss/sync_updates_feed.rs index 69479e132c6..a63f055209a 100644 --- a/src/worker/jobs/rss/sync_updates_feed.rs +++ b/src/worker/jobs/rss/sync_updates_feed.rs @@ -67,7 +67,7 @@ impl BackgroundJob for SyncUpdatesFeed { ctx.storage.upload_feed(&feed_id, &channel).await?; let path = object_store::path::Path::from(&feed_id); - if let Err(error) = ctx.invalidate_cdns(path.as_ref()).await { + if let Err(error) = ctx.invalidate_cdns(&mut conn, path.as_ref()).await { warn!("Failed to invalidate CDN caches: {error}"); }