Skip to content

Commit 14cbb3f

Browse files
committed
worker: Remove spawn_blocking() usage
1 parent 743ad44 commit 14cbb3f

File tree

3 files changed

+36
-38
lines changed

3 files changed

+36
-38
lines changed

crates/crates_io_worker/src/runner.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,13 @@ use crate::job_registry::JobRegistry;
33
use crate::worker::Worker;
44
use crate::{storage, BackgroundJob};
55
use anyhow::anyhow;
6-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
76
use diesel_async::pooled_connection::deadpool::Pool;
87
use diesel_async::AsyncPgConnection;
98
use futures_util::future::join_all;
109
use std::collections::HashMap;
1110
use std::sync::Arc;
1211
use std::time::Duration;
13-
use tokio::task::{spawn_blocking, JoinHandle};
12+
use tokio::task::JoinHandle;
1413
use tracing::{info, info_span, warn, Instrument};
1514

1615
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(1);
@@ -97,19 +96,14 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
9796
/// This function is intended for use in tests and will return an error if
9897
/// any jobs have failed.
9998
pub async fn check_for_failed_jobs(&self) -> anyhow::Result<()> {
100-
let conn = self.connection_pool.get().await?;
101-
spawn_blocking(move || {
102-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
103-
104-
let failed_jobs = storage::failed_job_count(conn)?;
105-
if failed_jobs == 0 {
106-
Ok(())
107-
} else {
108-
Err(anyhow!("{failed_jobs} jobs failed"))
109-
}
110-
})
111-
.await
112-
.map_err(|err| anyhow!(err.to_string()))?
99+
let mut conn = self.connection_pool.get().await?;
100+
101+
let failed_jobs = storage::failed_job_count(&mut conn).await?;
102+
if failed_jobs == 0 {
103+
Ok(())
104+
} else {
105+
Err(anyhow!("{failed_jobs} jobs failed"))
106+
}
113107
}
114108
}
115109

crates/crates_io_worker/src/storage.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::schema::background_jobs;
2-
use diesel::connection::LoadConnection;
32
use diesel::dsl::now;
43
use diesel::pg::Pg;
54
use diesel::prelude::*;
65
use diesel::sql_types::{Bool, Integer, Interval};
76
use diesel::{delete, update};
7+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
88

99
#[derive(Queryable, Selectable, Identifiable, Debug, Clone)]
1010
pub(super) struct BackgroundJob {
@@ -26,8 +26,8 @@ fn retriable() -> Box<dyn BoxableExpression<background_jobs::table, Pg, SqlType
2626

2727
/// Finds the next job that is unlocked, and ready to be retried. If a row is
2828
/// found, it will be locked.
29-
pub(super) fn find_next_unlocked_job(
30-
conn: &mut impl LoadConnection<Backend = Pg>,
29+
pub(super) async fn find_next_unlocked_job(
30+
conn: &mut AsyncPgConnection,
3131
job_types: &[String],
3232
) -> QueryResult<BackgroundJob> {
3333
background_jobs::table
@@ -38,34 +38,39 @@ pub(super) fn find_next_unlocked_job(
3838
.for_update()
3939
.skip_locked()
4040
.first::<BackgroundJob>(conn)
41+
.await
4142
}
4243

4344
/// The number of jobs that have failed at least once
44-
pub(super) fn failed_job_count(conn: &mut impl LoadConnection<Backend = Pg>) -> QueryResult<i64> {
45+
pub(super) async fn failed_job_count(conn: &mut AsyncPgConnection) -> QueryResult<i64> {
4546
background_jobs::table
4647
.count()
4748
.filter(background_jobs::retries.gt(0))
4849
.get_result(conn)
50+
.await
4951
}
5052

5153
/// Deletes a job that has successfully completed running
52-
pub(super) fn delete_successful_job(
53-
conn: &mut impl LoadConnection<Backend = Pg>,
54+
pub(super) async fn delete_successful_job(
55+
conn: &mut AsyncPgConnection,
5456
job_id: i64,
5557
) -> QueryResult<()> {
56-
delete(background_jobs::table.find(job_id)).execute(conn)?;
58+
delete(background_jobs::table.find(job_id))
59+
.execute(conn)
60+
.await?;
5761
Ok(())
5862
}
5963

6064
/// Marks that we just tried and failed to run a job.
6165
///
6266
/// Ignores any database errors that may have occurred. If the DB has gone away,
6367
/// we assume that just trying again with a new connection will succeed.
64-
pub(super) fn update_failed_job(conn: &mut impl LoadConnection<Backend = Pg>, job_id: i64) {
68+
pub(super) async fn update_failed_job(conn: &mut AsyncPgConnection, job_id: i64) {
6569
let _ = update(background_jobs::table.find(job_id))
6670
.set((
6771
background_jobs::retries.eq(background_jobs::retries + 1),
6872
background_jobs::last_retry.eq(now),
6973
))
70-
.execute(conn);
74+
.execute(conn)
75+
.await;
7176
}

crates/crates_io_worker/src/worker.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,15 @@ use crate::storage;
33
use crate::util::{try_to_extract_panic_info, with_sentry_transaction};
44
use anyhow::anyhow;
55
use diesel::prelude::*;
6-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
76
use diesel_async::pooled_connection::deadpool::Pool;
8-
use diesel_async::AsyncPgConnection;
7+
use diesel_async::scoped_futures::ScopedFutureExt;
8+
use diesel_async::{AsyncConnection, AsyncPgConnection};
99
use futures_util::FutureExt;
1010
use sentry_core::{Hub, SentryFutureExt};
1111
use std::panic::AssertUnwindSafe;
1212
use std::sync::Arc;
1313
use std::time::Duration;
1414
use tokio::runtime::Handle;
15-
use tokio::task::spawn_blocking;
1615
use tokio::time::sleep;
1716
use tracing::{debug, error, info_span, warn};
1817

@@ -58,15 +57,15 @@ impl<Context: Clone + Send + Sync + 'static> Worker<Context> {
5857
async fn run_next_job(&self) -> anyhow::Result<Option<i64>> {
5958
let context = self.context.clone();
6059
let job_registry = self.job_registry.clone();
61-
let conn = self.connection_pool.get().await?;
60+
let mut conn = self.connection_pool.get().await?;
6261

63-
spawn_blocking(move || {
64-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
65-
66-
let job_types = job_registry.job_types();
67-
conn.transaction(|conn| {
62+
let job_types = job_registry.job_types();
63+
conn.transaction(|conn| {
64+
async move {
6865
debug!("Looking for next background worker job…");
69-
let Some(job) = storage::find_next_unlocked_job(conn, &job_types).optional()?
66+
let Some(job) = storage::find_next_unlocked_job(conn, &job_types)
67+
.await
68+
.optional()?
7069
else {
7170
return Ok(None);
7271
};
@@ -95,18 +94,18 @@ impl<Context: Clone + Send + Sync + 'static> Worker<Context> {
9594
match result {
9695
Ok(_) => {
9796
debug!("Deleting successful job…");
98-
storage::delete_successful_job(conn, job_id)?
97+
storage::delete_successful_job(conn, job_id).await?
9998
}
10099
Err(error) => {
101100
warn!("Failed to run job: {error}");
102-
storage::update_failed_job(conn, job_id);
101+
storage::update_failed_job(conn, job_id).await;
103102
}
104103
}
105104

106105
Ok(Some(job_id))
107-
})
106+
}
107+
.scope_boxed()
108108
})
109109
.await
110-
.map_err(|err| anyhow!(err.to_string()))?
111110
}
112111
}

0 commit comments

Comments
 (0)