Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::schema::cloudfront_invalidation_queue;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

/// A row of the `cloudfront_invalidation_queue` table: one CloudFront path
/// that has been queued for invalidation.
#[derive(Debug, Identifiable, Queryable, QueryableByName, Selectable)]
#[diesel(table_name = cloudfront_invalidation_queue, check_for_backend(diesel::pg::Pg))]
pub struct CloudFrontInvalidationQueueItem {
/// Unique identifier for each queued invalidation path.
pub id: i64,
/// CloudFront path to invalidate (e.g. `/crates/serde/serde-1.0.0.crate`).
pub path: String,
/// Timestamp when the path was queued for invalidation.
pub created_at: chrono::DateTime<chrono::Utc>,
}

/// Insertable form of a queue entry for the `cloudfront_invalidation_queue`
/// table.
///
/// Only `path` is supplied by the caller; `id` and `created_at` are not part
/// of this struct and are therefore left to the column defaults declared in
/// the table's migration.
#[derive(Debug, Insertable)]
#[diesel(table_name = cloudfront_invalidation_queue, check_for_backend(diesel::pg::Pg))]
pub struct NewCloudFrontInvalidationQueueItem<'a> {
/// CloudFront path to invalidate, borrowed from the caller.
pub path: &'a str,
}

impl CloudFrontInvalidationQueueItem {
    /// Queue multiple invalidation paths for later processing.
    ///
    /// Inserts one row per entry of `paths` as a single batch insert and
    /// returns the number of rows that were inserted.
    ///
    /// # Errors
    ///
    /// Returns the underlying diesel error if the insert fails.
    pub async fn queue_paths(conn: &mut AsyncPgConnection, paths: &[String]) -> QueryResult<usize> {
        let new_items: Vec<_> = paths
            .iter()
            .map(|path| NewCloudFrontInvalidationQueueItem {
                path: path.as_str(),
            })
            .collect();

        diesel::insert_into(cloudfront_invalidation_queue::table)
            .values(&new_items)
            .execute(conn)
            .await
    }

    /// Fetch the oldest paths from the queue.
    ///
    /// Returns at most `limit` items, oldest first.
    ///
    /// # Errors
    ///
    /// Returns the underlying diesel error if the query fails.
    pub async fn fetch_batch(
        conn: &mut AsyncPgConnection,
        limit: i64,
    ) -> QueryResult<Vec<CloudFrontInvalidationQueueItem>> {
        cloudfront_invalidation_queue::table
            // Select exactly the columns this struct declares so that the
            // mapping is checked at compile time instead of relying on the
            // column order of the table.
            .select(Self::as_select())
            // Rows queued by a single `queue_paths` call can share the same
            // `created_at` value, so `id` is used as a tie-breaker to keep
            // the batch boundary deterministic.
            .order((
                cloudfront_invalidation_queue::created_at.asc(),
                cloudfront_invalidation_queue::id.asc(),
            ))
            .limit(limit)
            .load(conn)
            .await
    }

    /// Remove queue items by their IDs.
    ///
    /// Returns the number of rows that were deleted.
    ///
    /// # Errors
    ///
    /// Returns the underlying diesel error if the delete fails.
    pub async fn remove_items(
        conn: &mut AsyncPgConnection,
        item_ids: &[i64],
    ) -> QueryResult<usize> {
        diesel::delete(cloudfront_invalidation_queue::table)
            .filter(cloudfront_invalidation_queue::id.eq_any(item_ids))
            .execute(conn)
            .await
    }
}
2 changes: 2 additions & 0 deletions crates/crates_io_database/src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub use self::action::{NewVersionOwnerAction, VersionAction, VersionOwnerAction};
pub use self::category::{Category, CrateCategory, NewCategory};
pub use self::cloudfront_invalidation_queue::CloudFrontInvalidationQueueItem;
pub use self::crate_owner_invitation::{
CrateOwnerInvitation, NewCrateOwnerInvitation, NewCrateOwnerInvitationOutcome,
};
Expand All @@ -22,6 +23,7 @@ pub mod helpers;

mod action;
pub mod category;
mod cloudfront_invalidation_queue;
pub mod crate_owner_invitation;
pub mod default_versions;
mod deleted_crate;
Expand Down
13 changes: 13 additions & 0 deletions crates/crates_io_database/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ diesel::table! {
}
}

diesel::table! {
/// Queue for batching CloudFront CDN invalidation requests
cloudfront_invalidation_queue (id) {
/// Unique identifier for each queued invalidation path
id -> Int8,
/// CloudFront path to invalidate (e.g. /crates/serde/serde-1.0.0.crate)
path -> Text,
/// Timestamp when the path was queued for invalidation
created_at -> Timestamptz,
}
}

diesel::table! {
/// Number of downloads per crate. This was extracted from the `crates` table for performance reasons.
crate_downloads (crate_id) {
Expand Down Expand Up @@ -1141,6 +1153,7 @@ diesel::allow_tables_to_appear_in_same_query!(
api_tokens,
background_jobs,
categories,
cloudfront_invalidation_queue,
crate_downloads,
crate_owner_invitations,
crate_owners,
Expand Down
5 changes: 5 additions & 0 deletions crates/crates_io_database_dump/src/dump-db.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ crates_cnt = "public"
created_at = "public"
path = "public"

[cloudfront_invalidation_queue.columns]
id = "private"
path = "private"
created_at = "private"

[crate_downloads.columns]
crate_id = "public"
downloads = "public"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Drop the CloudFront invalidation queue table
DROP TABLE IF EXISTS cloudfront_invalidation_queue;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Create table for queuing CloudFront invalidation paths
CREATE TABLE cloudfront_invalidation_queue (
id BIGSERIAL PRIMARY KEY,
path TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE cloudfront_invalidation_queue IS 'Queue for batching CloudFront CDN invalidation requests';
COMMENT ON COLUMN cloudfront_invalidation_queue.id IS 'Unique identifier for each queued invalidation path';
COMMENT ON COLUMN cloudfront_invalidation_queue.path IS 'CloudFront path to invalidate (e.g. /crates/serde/serde-1.0.0.crate)';
COMMENT ON COLUMN cloudfront_invalidation_queue.created_at IS 'Timestamp when the path was queued for invalidation';

-- Index for efficient batch processing (oldest first)
CREATE INDEX idx_cloudfront_invalidation_queue_created_at
ON cloudfront_invalidation_queue (created_at);
50 changes: 38 additions & 12 deletions src/cloudfront.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
use aws_credential_types::Credentials;
use aws_sdk_cloudfront::config::retry::RetryConfig;
use aws_sdk_cloudfront::config::{BehaviorVersion, Region};
use aws_sdk_cloudfront::error::{BuildError, SdkError};
use aws_sdk_cloudfront::operation::create_invalidation::CreateInvalidationError;
use aws_sdk_cloudfront::types::{InvalidationBatch, Paths};
use aws_sdk_cloudfront::{Client, Config};
use tracing::{debug, instrument, warn};

/// Errors returned by the CloudFront invalidation helpers in this module.
///
/// Both variants are `#[error(transparent)]`, so `Display` and `source()` are
/// forwarded to the wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum CloudFrontError {
/// An AWS SDK `BuildError`; converted automatically through `#[from]`.
#[error(transparent)]
BuildError(#[from] BuildError),
/// An AWS SDK error from the `CreateInvalidation` operation.
// NOTE(review): the payload is boxed, presumably to keep this enum (and
// every `Result` carrying it) small — confirm.
#[error(transparent)]
SdkError(Box<SdkError<CreateInvalidationError>>),
}

impl From<SdkError<CreateInvalidationError>> for CloudFrontError {
fn from(err: SdkError<CreateInvalidationError>) -> Self {
CloudFrontError::SdkError(Box::new(err))
}
}

impl CloudFrontError {
pub fn is_too_many_invalidations_error(&self) -> bool {
let CloudFrontError::SdkError(sdk_error) = self else {
return false;
};

let Some(service_error) = sdk_error.as_service_error() else {
return false;
};

service_error.is_too_many_invalidations_in_progress()
}
}

pub struct CloudFront {
client: Client,
distribution_id: String,
Expand Down Expand Up @@ -36,13 +66,13 @@ impl CloudFront {
/// Invalidate a file on CloudFront
///
/// `path` is the path to the file to invalidate, such as `config.json`, or `re/ge/regex`
pub async fn invalidate(&self, path: &str) -> anyhow::Result<()> {
pub async fn invalidate(&self, path: &str) -> Result<(), CloudFrontError> {
self.invalidate_many(vec![path.to_string()]).await
}

/// Invalidate multiple paths on Cloudfront.
#[instrument(skip(self))]
pub async fn invalidate_many(&self, mut paths: Vec<String>) -> anyhow::Result<()> {
pub async fn invalidate_many(&self, mut paths: Vec<String>) -> Result<(), CloudFrontError> {
let now = chrono::offset::Utc::now().timestamp_micros();

// We need to ensure that paths have a starting slash.
Expand Down Expand Up @@ -72,15 +102,11 @@ impl CloudFront {

debug!("Sending invalidation request");

match invalidation_request.send().await {
Ok(_) => {
debug!("Invalidation request successful");
Ok(())
}
Err(error) => {
warn!(?error, "Invalidation request failed");
Err(error.into())
}
}
Ok(invalidation_request
.send()
.await
.map(|_| ()) // We don't care about the result, just that it worked
.inspect(|_| debug!("Invalidation request successful"))
.inspect_err(|error| warn!(?error, "Invalidation request failed"))?)
}
}
20 changes: 17 additions & 3 deletions src/worker/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ use crate::cloudfront::CloudFront;
use crate::fastly::Fastly;
use crate::storage::Storage;
use crate::typosquat;
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
use anyhow::Context;
use bon::Builder;
use crates_io_database::models::CloudFrontInvalidationQueueItem;
use crates_io_docs_rs::DocsRsClient;
use crates_io_index::{Repository, RepositoryConfig};
use crates_io_og_image::OgImageGenerator;
use crates_io_team_repo::TeamRepo;
use crates_io_worker::BackgroundJob;
use diesel_async::AsyncPgConnection;
use diesel_async::pooled_connection::deadpool::Pool;
use object_store::ObjectStore;
Expand Down Expand Up @@ -70,9 +73,20 @@ impl Environment {
}

/// Invalidate a file in all registered CDNs.
pub(crate) async fn invalidate_cdns(&self, path: &str) -> anyhow::Result<()> {
if let Some(cloudfront) = self.cloudfront() {
cloudfront.invalidate(path).await.context("CloudFront")?;
pub(crate) async fn invalidate_cdns(
&self,
conn: &mut AsyncPgConnection,
path: &str,
) -> anyhow::Result<()> {
// Queue CloudFront invalidations for batch processing instead of calling directly
if self.cloudfront().is_some() {
let paths = &[path.to_string()];
let result = CloudFrontInvalidationQueueItem::queue_paths(conn, paths).await;
result.context("Failed to queue CloudFront invalidation path")?;

// Schedule the processing job to handle the queued paths
let result = ProcessCloudfrontInvalidationQueue.enqueue(conn).await;
result.context("Failed to enqueue CloudFront invalidation processing job")?;
}

if let Some(fastly) = self.fastly() {
Expand Down
5 changes: 3 additions & 2 deletions src/worker/jobs/dump_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ impl BackgroundJob for DumpDb {
info!("Database dump tarball uploaded");

info!("Invalidating CDN caches…");
if let Err(error) = env.invalidate_cdns(TAR_PATH).await {
let mut conn = env.deadpool.get().await?;
if let Err(error) = env.invalidate_cdns(&mut conn, TAR_PATH).await {
warn!("Failed to invalidate CDN caches: {error}");
}

Expand All @@ -58,7 +59,7 @@ impl BackgroundJob for DumpDb {
info!("Database dump zip file uploaded");

info!("Invalidating CDN caches…");
if let Err(error) = env.invalidate_cdns(ZIP_PATH).await {
if let Err(error) = env.invalidate_cdns(&mut conn, ZIP_PATH).await {
warn!("Failed to invalidate CDN caches: {error}");
}

Expand Down
18 changes: 13 additions & 5 deletions src/worker/jobs/generate_og_image.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::models::OwnerKind;
use crate::schema::*;
use crate::worker::Environment;
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
use anyhow::Context;
use crates_io_database::models::CloudFrontInvalidationQueueItem;
use crates_io_og_image::{OgImageAuthorData, OgImageData};
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
Expand Down Expand Up @@ -104,11 +106,17 @@ impl BackgroundJob for GenerateOgImage {
// Invalidate CDN cache for the OG image
let og_image_path = format!("og-images/{crate_name}.png");

// Invalidate CloudFront CDN
if let Some(cloudfront) = ctx.cloudfront()
&& let Err(error) = cloudfront.invalidate(&og_image_path).await
{
warn!("Failed to invalidate CloudFront CDN for {crate_name}: {error}");
// Queue CloudFront invalidation for batch processing
if ctx.cloudfront().is_some() {
let paths = std::slice::from_ref(&og_image_path);
let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, paths).await;
if let Err(error) = result {
warn!("Failed to queue CloudFront invalidation for {crate_name}: {error}");
} else if let Err(error) = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await {
warn!(
"Failed to enqueue CloudFront invalidation processing job for {crate_name}: {error}"
);
}
}

// Invalidate Fastly CDN
Expand Down
15 changes: 11 additions & 4 deletions src/worker/jobs/index/sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::index::get_index_data;
use crate::tasks::spawn_blocking;
use crate::worker::Environment;
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
use anyhow::Context;
use crates_io_database::models::CloudFrontInvalidationQueueItem;
use crates_io_index::Repository;
use crates_io_worker::BackgroundJob;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -113,12 +115,17 @@ impl BackgroundJob for SyncToSparseIndex {
let future = env.storage.sync_index(&self.krate, content);
future.await.context("Failed to sync index data")?;

if let Some(cloudfront) = env.cloudfront() {
if env.cloudfront().is_some() {
let path = Repository::relative_index_file_for_url(&self.krate);

info!(%path, "Invalidating index file on CloudFront");
let future = cloudfront.invalidate(&path);
future.await.context("Failed to invalidate CloudFront")?;
info!(%path, "Queuing index file invalidation on CloudFront");

let paths = &[path];
let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, paths).await;
result.context("Failed to queue CloudFront invalidation path")?;

let result = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await;
result.context("Failed to enqueue CloudFront invalidation processing job")?;
}
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions src/worker/jobs/index_version_downloads_archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ impl BackgroundJob for IndexVersionDownloadsArchive {
info!("index.json generated and uploaded");

info!("Invalidating CDN caches…");
if let Err(error) = env.invalidate_cdns(INDEX_PATH).await {
let mut conn = env.deadpool.get().await?;
if let Err(error) = env.invalidate_cdns(&mut conn, INDEX_PATH).await {
warn!("Failed to invalidate CDN caches: {error}");
}
if let Err(error) = env.invalidate_cdns(INDEX_JSON_PATH).await {
if let Err(error) = env.invalidate_cdns(&mut conn, INDEX_JSON_PATH).await {
warn!("Failed to invalidate CDN caches: {error}");
}
info!("CDN caches invalidated");
Expand Down
17 changes: 12 additions & 5 deletions src/worker/jobs/invalidate_cdns.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::sync::Arc;

use anyhow::Context;
use crates_io_database::models::CloudFrontInvalidationQueueItem;
use crates_io_worker::BackgroundJob;
use serde::{Deserialize, Serialize};

use crate::worker::Environment;
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;

/// A background job that invalidates the given paths on all CDNs in use on crates.io.
#[derive(Deserialize, Serialize)]
Expand Down Expand Up @@ -46,11 +48,16 @@ impl BackgroundJob for InvalidateCdns {
}
}

if let Some(cloudfront) = ctx.cloudfront() {
cloudfront
.invalidate_many(self.paths.clone())
.await
.context("Failed to invalidate paths on CloudFront CDN")?;
// Queue CloudFront invalidations for batch processing instead of calling directly
if ctx.cloudfront().is_some() {
let mut conn = ctx.deadpool.get().await?;

let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, &self.paths).await;
result.context("Failed to queue CloudFront invalidation paths")?;

// Schedule the processing job to handle the queued paths
let result = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await;
result.context("Failed to enqueue CloudFront invalidation processing job")?;
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions src/worker/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod generate_og_image;
mod index;
mod index_version_downloads_archive;
mod invalidate_cdns;
mod process_cloudfront_invalidation_queue;
mod readmes;
pub mod rss;
mod send_publish_notifications;
Expand All @@ -30,6 +31,7 @@ pub use self::generate_og_image::GenerateOgImage;
pub use self::index::{NormalizeIndex, SquashIndex, SyncToGitIndex, SyncToSparseIndex};
pub use self::index_version_downloads_archive::IndexVersionDownloadsArchive;
pub use self::invalidate_cdns::InvalidateCdns;
pub use self::process_cloudfront_invalidation_queue::ProcessCloudfrontInvalidationQueue;
pub use self::readmes::RenderAndUploadReadme;
pub use self::send_publish_notifications::SendPublishNotificationsJob;
pub use self::sync_admins::SyncAdmins;
Expand Down
Loading