Skip to content

Commit f141bc5

Browse files
committed
database: Implement CloudFrontInvalidationQueueItem models
1 parent 5303414 commit f141bc5

File tree

2 files changed

+76
-0
lines changed

2 files changed

+76
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use crate::schema::cloudfront_invalidation_queue;
2+
use diesel::dsl::{count, exists};
3+
use diesel::prelude::*;
4+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
5+
6+
#[derive(Debug, Identifiable, Queryable, QueryableByName, Selectable)]
7+
#[diesel(table_name = cloudfront_invalidation_queue, check_for_backend(diesel::pg::Pg))]
8+
pub struct CloudFrontInvalidationQueueItem {
9+
pub id: i64,
10+
pub path: String,
11+
pub created_at: chrono::DateTime<chrono::Utc>,
12+
}
13+
14+
#[derive(Debug, Insertable)]
15+
#[diesel(table_name = cloudfront_invalidation_queue, check_for_backend(diesel::pg::Pg))]
16+
pub struct NewCloudFrontInvalidationQueueItem<'a> {
17+
pub path: &'a str,
18+
}
19+
20+
impl CloudFrontInvalidationQueueItem {
21+
/// Queue multiple invalidation paths for later processing
22+
pub async fn queue_paths(conn: &mut AsyncPgConnection, paths: &[String]) -> QueryResult<usize> {
23+
let new_items: Vec<_> = paths
24+
.iter()
25+
.map(|path| NewCloudFrontInvalidationQueueItem { path })
26+
.collect();
27+
28+
diesel::insert_into(cloudfront_invalidation_queue::table)
29+
.values(&new_items)
30+
.execute(conn)
31+
.await
32+
}
33+
34+
/// Fetch the oldest paths from the queue
35+
pub async fn fetch_batch(
36+
conn: &mut AsyncPgConnection,
37+
limit: i64,
38+
) -> QueryResult<Vec<CloudFrontInvalidationQueueItem>> {
39+
// Fetch the oldest entries up to the limit
40+
cloudfront_invalidation_queue::table
41+
.order(cloudfront_invalidation_queue::created_at.asc())
42+
.limit(limit)
43+
.load(conn)
44+
.await
45+
}
46+
47+
/// Remove queue items by their IDs
48+
pub async fn remove_items(
49+
conn: &mut AsyncPgConnection,
50+
item_ids: &[i64],
51+
) -> QueryResult<usize> {
52+
diesel::delete(cloudfront_invalidation_queue::table)
53+
.filter(cloudfront_invalidation_queue::id.eq_any(item_ids))
54+
.execute(conn)
55+
.await
56+
}
57+
58+
/// Check if there are more items in the queue
59+
pub async fn has_pending_items(conn: &mut AsyncPgConnection) -> QueryResult<bool> {
60+
diesel::select(exists(
61+
cloudfront_invalidation_queue::table.select(cloudfront_invalidation_queue::id),
62+
))
63+
.get_result(conn)
64+
.await
65+
}
66+
67+
/// Get current queue depth for monitoring
68+
pub async fn get_queue_depth(conn: &mut AsyncPgConnection) -> QueryResult<i64> {
69+
cloudfront_invalidation_queue::table
70+
.select(count(cloudfront_invalidation_queue::id))
71+
.first(conn)
72+
.await
73+
}
74+
}

crates/crates_io_database/src/models/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub use self::action::{NewVersionOwnerAction, VersionAction, VersionOwnerAction};
22
pub use self::category::{Category, CrateCategory, NewCategory};
3+
pub use self::cloudfront_invalidation_queue::CloudFrontInvalidationQueueItem;
34
pub use self::crate_owner_invitation::{
45
CrateOwnerInvitation, NewCrateOwnerInvitation, NewCrateOwnerInvitationOutcome,
56
};
@@ -22,6 +23,7 @@ pub mod helpers;
2223

2324
mod action;
2425
pub mod category;
26+
mod cloudfront_invalidation_queue;
2527
pub mod crate_owner_invitation;
2628
pub mod default_versions;
2729
mod deleted_crate;

0 commit comments

Comments
 (0)