Skip to content

Commit a909a46

Browse files
committed
admin/enqueue_jobs: Remove spawn_blocking() usage
1 parent 1aaa5a6 commit a909a46

File tree

1 file changed

+109
-95
lines changed

1 file changed

+109
-95
lines changed

src/admin/enqueue_job.rs

Lines changed: 109 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use crate::db;
22
use crate::schema::{background_jobs, crates};
3-
use crate::tasks::spawn_blocking;
43
use crate::worker::jobs;
54
use anyhow::Result;
65
use chrono::NaiveDate;
76
use crates_io_worker::BackgroundJob;
87
use diesel::dsl::exists;
98
use diesel::prelude::*;
9+
use diesel_async::RunQueryDsl;
1010

1111
#[derive(clap::Parser, Debug)]
1212
#[command(
@@ -52,108 +52,122 @@ pub enum Command {
5252
}
5353

5454
pub async fn run(command: Command) -> Result<()> {
55-
spawn_blocking(move || {
56-
let conn = &mut db::oneoff_connection()?;
57-
println!("Enqueueing background job: {command:?}");
55+
let mut conn = db::oneoff_async_connection().await?;
56+
println!("Enqueueing background job: {command:?}");
5857

59-
match command {
60-
Command::ArchiveVersionDownloads { before } => {
61-
before
62-
.map(jobs::ArchiveVersionDownloads::before)
63-
.unwrap_or_default()
64-
.enqueue(conn)?;
65-
}
66-
Command::IndexVersionDownloadsArchive => {
67-
jobs::IndexVersionDownloadsArchive.enqueue(conn)?;
68-
}
69-
Command::UpdateDownloads => {
70-
let count: i64 = background_jobs::table
71-
.filter(background_jobs::job_type.eq(jobs::UpdateDownloads::JOB_NAME))
72-
.count()
73-
.get_result(conn)?;
58+
match command {
59+
Command::ArchiveVersionDownloads { before } => {
60+
before
61+
.map(jobs::ArchiveVersionDownloads::before)
62+
.unwrap_or_default()
63+
.async_enqueue(&mut conn)
64+
.await?;
65+
}
66+
Command::IndexVersionDownloadsArchive => {
67+
jobs::IndexVersionDownloadsArchive
68+
.async_enqueue(&mut conn)
69+
.await?;
70+
}
71+
Command::UpdateDownloads => {
72+
let count: i64 = background_jobs::table
73+
.filter(background_jobs::job_type.eq(jobs::UpdateDownloads::JOB_NAME))
74+
.count()
75+
.get_result(&mut conn)
76+
.await?;
77+
78+
if count > 0 {
79+
println!(
80+
"Did not enqueue {}, existing job already in progress",
81+
jobs::UpdateDownloads::JOB_NAME
82+
);
83+
} else {
84+
jobs::UpdateDownloads.async_enqueue(&mut conn).await?;
85+
}
86+
}
87+
Command::CleanProcessedLogFiles => {
88+
jobs::CleanProcessedLogFiles
89+
.async_enqueue(&mut conn)
90+
.await?;
91+
}
92+
Command::DumpDb => {
93+
jobs::DumpDb.async_enqueue(&mut conn).await?;
94+
}
95+
Command::SyncAdmins { force } => {
96+
if !force {
97+
// By default, we don't want to enqueue a sync if one is already
98+
// in progress. If a sync fails due to e.g. an expired pinned
99+
// certificate we don't want to keep adding new jobs to the
100+
// queue, since the existing job will be retried until it
101+
// succeeds.
74102

75-
if count > 0 {
76-
println!(
103+
let query = background_jobs::table
104+
.filter(background_jobs::job_type.eq(jobs::SyncAdmins::JOB_NAME));
105+
106+
if diesel::select(exists(query)).get_result(&mut conn).await? {
107+
info!(
77108
"Did not enqueue {}, existing job already in progress",
78-
jobs::UpdateDownloads::JOB_NAME
109+
jobs::SyncAdmins::JOB_NAME
79110
);
80-
} else {
81-
jobs::UpdateDownloads.enqueue(conn)?;
111+
return Ok(());
82112
}
83113
}
84-
Command::CleanProcessedLogFiles => {
85-
jobs::CleanProcessedLogFiles.enqueue(conn)?;
86-
}
87-
Command::DumpDb => {
88-
jobs::DumpDb.enqueue(conn)?;
89-
}
90-
Command::SyncAdmins { force } => {
91-
if !force {
92-
// By default, we don't want to enqueue a sync if one is already
93-
// in progress. If a sync fails due to e.g. an expired pinned
94-
// certificate we don't want to keep adding new jobs to the
95-
// queue, since the existing job will be retried until it
96-
// succeeds.
97114

98-
let query = background_jobs::table
99-
.filter(background_jobs::job_type.eq(jobs::SyncAdmins::JOB_NAME));
100-
101-
if diesel::select(exists(query)).get_result(conn)? {
102-
info!(
103-
"Did not enqueue {}, existing job already in progress",
104-
jobs::SyncAdmins::JOB_NAME
105-
);
106-
return Ok(());
107-
}
108-
}
109-
110-
jobs::SyncAdmins.enqueue(conn)?;
111-
}
112-
Command::DailyDbMaintenance => {
113-
jobs::DailyDbMaintenance.enqueue(conn)?;
114-
}
115-
Command::ProcessCdnLogQueue(job) => {
116-
job.enqueue(conn)?;
115+
jobs::SyncAdmins.async_enqueue(&mut conn).await?;
116+
}
117+
Command::DailyDbMaintenance => {
118+
jobs::DailyDbMaintenance.async_enqueue(&mut conn).await?;
119+
}
120+
Command::ProcessCdnLogQueue(job) => {
121+
job.async_enqueue(&mut conn).await?;
122+
}
123+
Command::SquashIndex => {
124+
jobs::SquashIndex.async_enqueue(&mut conn).await?;
125+
}
126+
Command::NormalizeIndex { dry_run } => {
127+
jobs::NormalizeIndex::new(dry_run)
128+
.async_enqueue(&mut conn)
129+
.await?;
130+
}
131+
Command::CheckTyposquat { name } => {
132+
// The job will fail if the crate doesn't actually exist, so let's check that up front.
133+
if crates::table
134+
.filter(crates::name.eq(&name))
135+
.count()
136+
.get_result::<i64>(&mut conn)
137+
.await?
138+
== 0
139+
{
140+
anyhow::bail!(
141+
"cannot enqueue a typosquat check for a crate that doesn't exist: {name}"
142+
);
117143
}
118-
Command::SquashIndex => {
119-
jobs::SquashIndex.enqueue(conn)?;
120-
}
121-
Command::NormalizeIndex { dry_run } => {
122-
jobs::NormalizeIndex::new(dry_run).enqueue(conn)?;
123-
}
124-
Command::CheckTyposquat { name } => {
125-
// The job will fail if the crate doesn't actually exist, so let's check that up front.
126-
if crates::table
127-
.filter(crates::name.eq(&name))
128-
.count()
129-
.get_result::<i64>(conn)?
130-
== 0
131-
{
132-
anyhow::bail!(
133-
"cannot enqueue a typosquat check for a crate that doesn't exist: {name}"
134-
);
135-
}
136144

137-
jobs::CheckTyposquat::new(&name).enqueue(conn)?;
138-
}
139-
Command::SendTokenExpiryNotifications => {
140-
jobs::SendTokenExpiryNotifications.enqueue(conn)?;
141-
}
142-
Command::SyncCratesFeed => {
143-
jobs::rss::SyncCratesFeed.enqueue(conn)?;
144-
}
145-
Command::SyncToGitIndex { name } => {
146-
jobs::SyncToGitIndex::new(name).enqueue(conn)?;
147-
}
148-
Command::SyncToSparseIndex { name } => {
149-
jobs::SyncToSparseIndex::new(name).enqueue(conn)?;
150-
}
151-
Command::SyncUpdatesFeed => {
152-
jobs::rss::SyncUpdatesFeed.enqueue(conn)?;
153-
}
154-
};
145+
jobs::CheckTyposquat::new(&name)
146+
.async_enqueue(&mut conn)
147+
.await?;
148+
}
149+
Command::SendTokenExpiryNotifications => {
150+
jobs::SendTokenExpiryNotifications
151+
.async_enqueue(&mut conn)
152+
.await?;
153+
}
154+
Command::SyncCratesFeed => {
155+
jobs::rss::SyncCratesFeed.async_enqueue(&mut conn).await?;
156+
}
157+
Command::SyncToGitIndex { name } => {
158+
jobs::SyncToGitIndex::new(name)
159+
.async_enqueue(&mut conn)
160+
.await?;
161+
}
162+
Command::SyncToSparseIndex { name } => {
163+
jobs::SyncToSparseIndex::new(name)
164+
.async_enqueue(&mut conn)
165+
.await?;
166+
}
167+
Command::SyncUpdatesFeed => {
168+
jobs::rss::SyncUpdatesFeed.async_enqueue(&mut conn).await?;
169+
}
170+
};
155171

156-
Ok(())
157-
})
158-
.await
172+
Ok(())
159173
}

0 commit comments

Comments
 (0)