Skip to content

Commit e0110a0

Browse files
committed
worker/jobs/downloads/queue: Remove spawn_blocking() usage
1 parent 841872d commit e0110a0

File tree

1 file changed

+15
-16
lines changed
  • src/worker/jobs/downloads/queue

1 file changed

+15
-16
lines changed

src/worker/jobs/downloads/queue/job.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
use crate::config::CdnLogQueueConfig;
22
use crate::sqs::{MockSqsQueue, SqsQueue, SqsQueueImpl};
3-
use crate::tasks::spawn_blocking;
4-
use crate::util::diesel::Conn;
53
use crate::worker::jobs::ProcessCdnLog;
64
use crate::worker::Environment;
75
use anyhow::Context;
86
use aws_credential_types::Credentials;
97
use aws_sdk_sqs::config::Region;
108
use aws_sdk_sqs::types::Message;
119
use crates_io_worker::BackgroundJob;
12-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
1310
use diesel_async::pooled_connection::deadpool::Pool;
1411
use diesel_async::AsyncPgConnection;
1512
use std::sync::Arc;
@@ -156,12 +153,9 @@ async fn process_body(body: &str, connection_pool: &Pool<AsyncPgConnection>) ->
156153
}
157154

158155
let conn = connection_pool.get().await;
159-
let conn = conn.context("Failed to acquire database connection")?;
160-
spawn_blocking(move || {
161-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
162-
enqueue_jobs(jobs, conn)
163-
})
164-
.await
156+
let mut conn = conn.context("Failed to acquire database connection")?;
157+
158+
enqueue_jobs(jobs, &mut conn).await
165159
}
166160

167161
/// Extracts a list of [`ProcessCdnLog`] jobs from a message.
@@ -207,12 +201,16 @@ fn is_ignored_path(path: &str) -> bool {
207201
path.contains("/index.staging.crates.io/") || path.contains("/index.crates.io/")
208202
}
209203

210-
fn enqueue_jobs(jobs: Vec<ProcessCdnLog>, conn: &mut impl Conn) -> anyhow::Result<()> {
204+
async fn enqueue_jobs(
205+
jobs: Vec<ProcessCdnLog>,
206+
conn: &mut AsyncPgConnection,
207+
) -> anyhow::Result<()> {
211208
for job in jobs {
212209
let path = &job.path;
213210

214211
info!("Enqueuing processing job… ({path})");
215-
job.enqueue(conn)
212+
job.async_enqueue(conn)
213+
.await
216214
.context("Failed to enqueue processing job")?;
217215

218216
debug!("Enqueued processing job");
@@ -230,8 +228,8 @@ mod tests {
230228
use crates_io_test_db::TestDatabase;
231229
use crates_io_worker::schema::background_jobs;
232230
use diesel::prelude::*;
233-
use diesel::QueryDsl;
234231
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
232+
use diesel_async::RunQueryDsl;
235233
use insta::assert_snapshot;
236234
use parking_lot::Mutex;
237235

@@ -262,7 +260,7 @@ mod tests {
262260
assert_ok!(run(&queue, 100, &connection_pool).await);
263261

264262
assert_snapshot!(deleted_handles.lock().join(","), @"123");
265-
assert_snapshot!(open_jobs(&mut test_database.connect()), @"us-west-1 | bucket | path");
263+
assert_snapshot!(open_jobs(&mut connection_pool.get().await.unwrap()).await, @"us-west-1 | bucket | path");
266264
}
267265

268266
#[tokio::test]
@@ -310,7 +308,7 @@ mod tests {
310308
assert_ok!(run(&queue, 100, &connection_pool).await);
311309

312310
assert_snapshot!(deleted_handles.lock().join(","), @"1,2,3,4,5,6,7,8,9,10,11");
313-
assert_snapshot!(open_jobs(&mut test_database.connect()), @r###"
311+
assert_snapshot!(open_jobs(&mut connection_pool.get().await.unwrap()).await, @r###"
314312
us-west-1 | bucket | path1
315313
us-west-1 | bucket | path2
316314
us-west-1 | bucket | path3
@@ -358,7 +356,7 @@ mod tests {
358356
assert_ok!(run(&queue, 100, &connection_pool).await);
359357

360358
assert_snapshot!(deleted_handles.lock().join(","), @"1");
361-
assert_snapshot!(open_jobs(&mut test_database.connect()), @"");
359+
assert_snapshot!(open_jobs(&mut connection_pool.get().await.unwrap()).await, @"");
362360
}
363361

364362
#[test]
@@ -419,10 +417,11 @@ mod tests {
419417
.build()
420418
}
421419

422-
fn open_jobs(conn: &mut impl Conn) -> String {
420+
async fn open_jobs(conn: &mut AsyncPgConnection) -> String {
423421
let jobs = background_jobs::table
424422
.select((background_jobs::job_type, background_jobs::data))
425423
.load::<(String, serde_json::Value)>(conn)
424+
.await
426425
.unwrap();
427426

428427
jobs.into_iter()

0 commit comments

Comments
 (0)