Skip to content

Commit c553294

Browse files
authored
Merge pull request #9632 from Turbo87/async-enqueue
worker: Implement async `enqueue()` fn
2 parents 94f1f0b + a909a46 commit c553294

File tree

3 files changed

+236
-145
lines changed

3 files changed

+236
-145
lines changed

crates/crates_io_worker/src/background_job.rs

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use crate::schema::background_jobs;
33
use diesel::connection::LoadConnection;
44
use diesel::dsl::{exists, not};
55
use diesel::pg::Pg;
6-
use diesel::prelude::*;
76
use diesel::sql_types::{Int2, Jsonb, Text};
7+
use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl};
8+
use diesel_async::AsyncPgConnection;
89
use serde::de::DeserializeOwned;
910
use serde::Serialize;
1011
use serde_json::Value;
@@ -53,6 +54,24 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
5354
Ok(Some(enqueue_simple(conn, Self::JOB_NAME, &data, priority)?))
5455
}
5556
}
57+
58+
#[allow(async_fn_in_trait)]
59+
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]
60+
async fn async_enqueue(
61+
&self,
62+
conn: &mut AsyncPgConnection,
63+
) -> Result<Option<i64>, EnqueueError> {
64+
let data = serde_json::to_value(self)?;
65+
let priority = Self::PRIORITY;
66+
67+
if Self::DEDUPLICATED {
68+
Ok(async_enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?)
69+
} else {
70+
Ok(Some(
71+
async_enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?,
72+
))
73+
}
74+
}
5675
}
5776

5877
fn enqueue_deduplicated(
@@ -61,6 +80,8 @@ fn enqueue_deduplicated(
6180
data: &Value,
6281
priority: i16,
6382
) -> Result<Option<i64>, EnqueueError> {
83+
use diesel::RunQueryDsl;
84+
6485
let similar_jobs = background_jobs::table
6586
.select(background_jobs::id)
6687
.filter(background_jobs::job_type.eq(job_type))
@@ -90,12 +111,52 @@ fn enqueue_deduplicated(
90111
Ok(id)
91112
}
92113

114+
async fn async_enqueue_deduplicated(
115+
conn: &mut AsyncPgConnection,
116+
job_type: &str,
117+
data: &Value,
118+
priority: i16,
119+
) -> Result<Option<i64>, EnqueueError> {
120+
use diesel_async::RunQueryDsl;
121+
122+
let similar_jobs = background_jobs::table
123+
.select(background_jobs::id)
124+
.filter(background_jobs::job_type.eq(job_type))
125+
.filter(background_jobs::data.eq(data))
126+
.filter(background_jobs::priority.eq(priority))
127+
.for_update()
128+
.skip_locked();
129+
130+
let deduplicated_select = diesel::select((
131+
job_type.into_sql::<Text>(),
132+
data.into_sql::<Jsonb>(),
133+
priority.into_sql::<Int2>(),
134+
))
135+
.filter(not(exists(similar_jobs)));
136+
137+
let id = diesel::insert_into(background_jobs::table)
138+
.values(deduplicated_select)
139+
.into_columns((
140+
background_jobs::job_type,
141+
background_jobs::data,
142+
background_jobs::priority,
143+
))
144+
.returning(background_jobs::id)
145+
.get_result::<i64>(conn)
146+
.await
147+
.optional()?;
148+
149+
Ok(id)
150+
}
151+
93152
fn enqueue_simple(
94153
conn: &mut impl LoadConnection<Backend = Pg>,
95154
job_type: &str,
96155
data: &Value,
97156
priority: i16,
98157
) -> Result<i64, EnqueueError> {
158+
use diesel::RunQueryDsl;
159+
99160
let id = diesel::insert_into(background_jobs::table)
100161
.values((
101162
background_jobs::job_type.eq(job_type),
@@ -107,3 +168,24 @@ fn enqueue_simple(
107168

108169
Ok(id)
109170
}
171+
172+
async fn async_enqueue_simple(
173+
conn: &mut AsyncPgConnection,
174+
job_type: &str,
175+
data: &Value,
176+
priority: i16,
177+
) -> Result<i64, EnqueueError> {
178+
use diesel_async::RunQueryDsl;
179+
180+
let id = diesel::insert_into(background_jobs::table)
181+
.values((
182+
background_jobs::job_type.eq(job_type),
183+
background_jobs::data.eq(data),
184+
background_jobs::priority.eq(priority),
185+
))
186+
.returning(background_jobs::id)
187+
.get_result(conn)
188+
.await?;
189+
190+
Ok(id)
191+
}

src/admin/delete_crate.rs

Lines changed: 44 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use crate::schema::{crate_owners, teams, users};
2-
use crate::tasks::spawn_blocking;
32
use crate::worker::jobs;
43
use crate::{admin::dialoguer, db, schema::crates};
54
use anyhow::Context;
65
use crates_io_worker::BackgroundJob;
76
use diesel::dsl::sql;
87
use diesel::sql_types::Text;
98
use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl};
10-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
9+
use diesel_async::RunQueryDsl;
1110
use std::collections::HashMap;
1211

1312
#[derive(clap::Parser, Debug)]
@@ -34,24 +33,21 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
3433
let mut crate_names = opts.crate_names;
3534
crate_names.sort();
3635

37-
let query_result = {
38-
use diesel_async::RunQueryDsl;
39-
40-
crates::table
41-
.select((
42-
crates::name,
43-
crates::id,
44-
sql::<Text>(
45-
"CASE WHEN crate_owners.owner_kind = 1 THEN teams.login ELSE users.gh_login END",
46-
),
47-
))
48-
.left_join(crate_owners::table.on(crate_owners::crate_id.eq(crates::id)))
49-
.left_join(teams::table.on(teams::id.eq(crate_owners::owner_id)))
50-
.left_join(users::table.on(users::id.eq(crate_owners::owner_id)))
51-
.filter(crates::name.eq_any(&crate_names))
52-
.load::<(String, i32, String)>(&mut conn).await
53-
.context("Failed to look up crate name from the database")
54-
}?;
36+
let query_result = crates::table
37+
.select((
38+
crates::name,
39+
crates::id,
40+
sql::<Text>(
41+
"CASE WHEN crate_owners.owner_kind = 1 THEN teams.login ELSE users.gh_login END",
42+
),
43+
))
44+
.left_join(crate_owners::table.on(crate_owners::crate_id.eq(crates::id)))
45+
.left_join(teams::table.on(teams::id.eq(crate_owners::owner_id)))
46+
.left_join(users::table.on(users::id.eq(crate_owners::owner_id)))
47+
.filter(crates::name.eq_any(&crate_names))
48+
.load::<(String, i32, String)>(&mut conn)
49+
.await
50+
.context("Failed to look up crate name from the database")?;
5551

5652
let mut existing_crates: HashMap<String, (i32, Vec<String>)> = HashMap::new();
5753
for (name, id, login) in query_result {
@@ -81,37 +77,36 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
8177
return Ok(());
8278
}
8379

84-
spawn_blocking(move || {
85-
use diesel::RunQueryDsl;
86-
87-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
88-
89-
for name in &crate_names {
90-
if let Some((id, _)) = existing_crates.get(name) {
91-
info!("{name}: Deleting crate from the database…");
92-
if let Err(error) = diesel::delete(crates::table.find(id)).execute(conn) {
93-
warn!(%id, "{name}: Failed to delete crate from the database: {error}");
94-
}
95-
} else {
96-
info!("{name}: Skipped missing crate");
97-
};
98-
99-
info!("{name}: Enqueuing index sync jobs…");
100-
if let Err(error) = jobs::SyncToGitIndex::new(name).enqueue(conn) {
101-
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
102-
}
103-
if let Err(error) = jobs::SyncToSparseIndex::new(name).enqueue(conn) {
104-
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");
80+
for name in &crate_names {
81+
if let Some((id, _)) = existing_crates.get(name) {
82+
info!("{name}: Deleting crate from the database…");
83+
if let Err(error) = diesel::delete(crates::table.find(id))
84+
.execute(&mut conn)
85+
.await
86+
{
87+
warn!(%id, "{name}: Failed to delete crate from the database: {error}");
10588
}
89+
} else {
90+
info!("{name}: Skipped missing crate");
91+
};
92+
93+
info!("{name}: Enqueuing index sync jobs…");
94+
let job = jobs::SyncToGitIndex::new(name);
95+
if let Err(error) = job.async_enqueue(&mut conn).await {
96+
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
97+
}
10698

107-
info!("{name}: Enqueuing DeleteCrateFromStorage job…");
108-
let job = jobs::DeleteCrateFromStorage::new(name.into());
109-
if let Err(error) = job.enqueue(conn) {
110-
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
111-
}
99+
let job = jobs::SyncToSparseIndex::new(name);
100+
if let Err(error) = job.async_enqueue(&mut conn).await {
101+
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");
112102
}
113103

114-
Ok(())
115-
})
116-
.await
104+
info!("{name}: Enqueuing DeleteCrateFromStorage job…");
105+
let job = jobs::DeleteCrateFromStorage::new(name.into());
106+
if let Err(error) = job.async_enqueue(&mut conn).await {
107+
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
108+
}
109+
}
110+
111+
Ok(())
117112
}

0 commit comments

Comments
 (0)