Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion crates/crates_io_worker/src/background_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
use diesel::connection::LoadConnection;
use diesel::dsl::{exists, not};
use diesel::pg::Pg;
use diesel::prelude::*;
use diesel::sql_types::{Int2, Jsonb, Text};
use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl};
use diesel_async::AsyncPgConnection;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
Expand Down Expand Up @@ -53,6 +54,24 @@
Ok(Some(enqueue_simple(conn, Self::JOB_NAME, &data, priority)?))
}
}

#[allow(async_fn_in_trait)]
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]

Check warning on line 59 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L59

Added line #L59 was not covered by tests
async fn async_enqueue(
&self,
conn: &mut AsyncPgConnection,
) -> Result<Option<i64>, EnqueueError> {
let data = serde_json::to_value(self)?;
let priority = Self::PRIORITY;

if Self::DEDUPLICATED {
Ok(async_enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?)
} else {
Ok(Some(
async_enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?,
))
}
}
}

fn enqueue_deduplicated(
Expand All @@ -61,6 +80,8 @@
data: &Value,
priority: i16,
) -> Result<Option<i64>, EnqueueError> {
use diesel::RunQueryDsl;

let similar_jobs = background_jobs::table
.select(background_jobs::id)
.filter(background_jobs::job_type.eq(job_type))
Expand Down Expand Up @@ -90,12 +111,52 @@
Ok(id)
}

async fn async_enqueue_deduplicated(
conn: &mut AsyncPgConnection,
job_type: &str,
data: &Value,
priority: i16,
) -> Result<Option<i64>, EnqueueError> {

Check warning on line 119 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L114-L119

Added lines #L114 - L119 were not covered by tests
use diesel_async::RunQueryDsl;

let similar_jobs = background_jobs::table
.select(background_jobs::id)
.filter(background_jobs::job_type.eq(job_type))
.filter(background_jobs::data.eq(data))
.filter(background_jobs::priority.eq(priority))
.for_update()
.skip_locked();

let deduplicated_select = diesel::select((
job_type.into_sql::<Text>(),
data.into_sql::<Jsonb>(),
priority.into_sql::<Int2>(),
))
.filter(not(exists(similar_jobs)));

Check warning on line 135 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L122-L135

Added lines #L122 - L135 were not covered by tests

let id = diesel::insert_into(background_jobs::table)
.values(deduplicated_select)
.into_columns((
background_jobs::job_type,
background_jobs::data,
background_jobs::priority,
))
.returning(background_jobs::id)
.get_result::<i64>(conn)
.await
.optional()?;

Check warning on line 147 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L137-L147

Added lines #L137 - L147 were not covered by tests

Ok(id)
}

Check warning on line 150 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L149-L150

Added lines #L149 - L150 were not covered by tests

fn enqueue_simple(
conn: &mut impl LoadConnection<Backend = Pg>,
job_type: &str,
data: &Value,
priority: i16,
) -> Result<i64, EnqueueError> {
use diesel::RunQueryDsl;

let id = diesel::insert_into(background_jobs::table)
.values((
background_jobs::job_type.eq(job_type),
Expand All @@ -107,3 +168,24 @@

Ok(id)
}

async fn async_enqueue_simple(
conn: &mut AsyncPgConnection,
job_type: &str,
data: &Value,
priority: i16,
) -> Result<i64, EnqueueError> {

Check warning on line 177 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L172-L177

Added lines #L172 - L177 were not covered by tests
use diesel_async::RunQueryDsl;

let id = diesel::insert_into(background_jobs::table)
.values((
background_jobs::job_type.eq(job_type),
background_jobs::data.eq(data),
background_jobs::priority.eq(priority),
))
.returning(background_jobs::id)
.get_result(conn)
.await?;

Check warning on line 188 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L180-L188

Added lines #L180 - L188 were not covered by tests

Ok(id)
}

Check warning on line 191 in crates/crates_io_worker/src/background_job.rs

View check run for this annotation

Codecov / codecov/patch

crates/crates_io_worker/src/background_job.rs#L190-L191

Added lines #L190 - L191 were not covered by tests
93 changes: 44 additions & 49 deletions src/admin/delete_crate.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::schema::{crate_owners, teams, users};
use crate::tasks::spawn_blocking;
use crate::worker::jobs;
use crate::{admin::dialoguer, db, schema::crates};
use anyhow::Context;
use crates_io_worker::BackgroundJob;
use diesel::dsl::sql;
use diesel::sql_types::Text;
use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl};
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::RunQueryDsl;
use std::collections::HashMap;

#[derive(clap::Parser, Debug)]
Expand All @@ -34,24 +33,21 @@
let mut crate_names = opts.crate_names;
crate_names.sort();

let query_result = {
use diesel_async::RunQueryDsl;

crates::table
.select((
crates::name,
crates::id,
sql::<Text>(
"CASE WHEN crate_owners.owner_kind = 1 THEN teams.login ELSE users.gh_login END",
),
))
.left_join(crate_owners::table.on(crate_owners::crate_id.eq(crates::id)))
.left_join(teams::table.on(teams::id.eq(crate_owners::owner_id)))
.left_join(users::table.on(users::id.eq(crate_owners::owner_id)))
.filter(crates::name.eq_any(&crate_names))
.load::<(String, i32, String)>(&mut conn).await
.context("Failed to look up crate name from the database")
}?;
let query_result = crates::table
.select((
crates::name,
crates::id,
sql::<Text>(
"CASE WHEN crate_owners.owner_kind = 1 THEN teams.login ELSE users.gh_login END",
),
))
.left_join(crate_owners::table.on(crate_owners::crate_id.eq(crates::id)))
.left_join(teams::table.on(teams::id.eq(crate_owners::owner_id)))
.left_join(users::table.on(users::id.eq(crate_owners::owner_id)))
.filter(crates::name.eq_any(&crate_names))
.load::<(String, i32, String)>(&mut conn)
.await
.context("Failed to look up crate name from the database")?;

Check warning on line 50 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L36-L50

Added lines #L36 - L50 were not covered by tests

let mut existing_crates: HashMap<String, (i32, Vec<String>)> = HashMap::new();
for (name, id, login) in query_result {
Expand Down Expand Up @@ -81,37 +77,36 @@
return Ok(());
}

spawn_blocking(move || {
use diesel::RunQueryDsl;

let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();

for name in &crate_names {
if let Some((id, _)) = existing_crates.get(name) {
info!("{name}: Deleting crate from the database…");
if let Err(error) = diesel::delete(crates::table.find(id)).execute(conn) {
warn!(%id, "{name}: Failed to delete crate from the database: {error}");
}
} else {
info!("{name}: Skipped missing crate");
};

info!("{name}: Enqueuing index sync jobs…");
if let Err(error) = jobs::SyncToGitIndex::new(name).enqueue(conn) {
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
}
if let Err(error) = jobs::SyncToSparseIndex::new(name).enqueue(conn) {
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");
for name in &crate_names {
if let Some((id, _)) = existing_crates.get(name) {
info!("{name}: Deleting crate from the database…");
if let Err(error) = diesel::delete(crates::table.find(id))
.execute(&mut conn)
.await

Check warning on line 85 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L80-L85

Added lines #L80 - L85 were not covered by tests
{
warn!(%id, "{name}: Failed to delete crate from the database: {error}");

Check warning on line 87 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L87

Added line #L87 was not covered by tests
}
} else {
info!("{name}: Skipped missing crate");

Check warning on line 90 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L90

Added line #L90 was not covered by tests
};

info!("{name}: Enqueuing index sync jobs…");
let job = jobs::SyncToGitIndex::new(name);
if let Err(error) = job.async_enqueue(&mut conn).await {
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
}

Check warning on line 97 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L93-L97

Added lines #L93 - L97 were not covered by tests

info!("{name}: Enqueuing DeleteCrateFromStorage job…");
let job = jobs::DeleteCrateFromStorage::new(name.into());
if let Err(error) = job.enqueue(conn) {
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
}
let job = jobs::SyncToSparseIndex::new(name);
if let Err(error) = job.async_enqueue(&mut conn).await {
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");

Check warning on line 101 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L99-L101

Added lines #L99 - L101 were not covered by tests
}

Ok(())
})
.await
info!("{name}: Enqueuing DeleteCrateFromStorage job…");
let job = jobs::DeleteCrateFromStorage::new(name.into());
if let Err(error) = job.async_enqueue(&mut conn).await {
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
}

Check warning on line 108 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L104-L108

Added lines #L104 - L108 were not covered by tests
}

Ok(())

Check warning on line 111 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L111

Added line #L111 was not covered by tests
}
Loading