Skip to content

Commit 8808af0

Browse files
committed
worker/jobs/archive_version_downloads: Reduce spawn_blocking() usage
1 parent 841872d commit 8808af0

File tree

1 file changed

+38
-37
lines changed

1 file changed

+38
-37
lines changed

src/worker/jobs/archive_version_downloads.rs

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
use crate::schema::version_downloads;
22
use crate::tasks::spawn_blocking;
3-
use crate::util::diesel::Conn;
43
use crate::worker::Environment;
54
use anyhow::{anyhow, Context};
65
use chrono::{NaiveDate, Utc};
76
use crates_io_worker::BackgroundJob;
87
use diesel::prelude::*;
9-
use diesel::{ExpressionMethods, RunQueryDsl};
10-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
118
use diesel_async::pooled_connection::deadpool::Pool;
12-
use diesel_async::AsyncPgConnection;
9+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
1310
use futures_util::StreamExt;
1411
use object_store::ObjectStore;
1512
use secrecy::{ExposeSecret, SecretString};
@@ -222,23 +219,19 @@ async fn upload_file(store: &impl ObjectStore, path: impl AsRef<Path>) -> anyhow
222219

223220
/// Delete version downloads for the given dates from the database.
224221
async fn delete(db_pool: &Pool<AsyncPgConnection>, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
225-
let conn = db_pool.get().await?;
226-
spawn_blocking(move || {
227-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
228-
delete_inner(conn, dates)
229-
})
230-
.await
222+
let mut conn = db_pool.get().await?;
223+
delete_inner(&mut conn, dates).await
231224
}
232225

233-
fn delete_inner(conn: &mut impl Conn, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
226+
async fn delete_inner(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
234227
// Delete version downloads for the given dates in chunks to avoid running
235228
// into the maximum query parameter limit.
236229
const CHUNK_SIZE: usize = 5000;
237230

238231
info!("Deleting old version downloads for {} dates…", dates.len());
239232
for chunk in dates.chunks(CHUNK_SIZE) {
240233
let subset = version_downloads::table.filter(version_downloads::date.eq_any(chunk));
241-
match diesel::delete(subset).execute(conn) {
234+
match diesel::delete(subset).execute(conn).await {
242235
Ok(num_deleted_rows) => {
243236
info!("Deleted {num_deleted_rows} rows from `version_downloads`");
244237
}
@@ -252,14 +245,12 @@ fn delete_inner(conn: &mut impl Conn, dates: Vec<NaiveDate>) -> anyhow::Result<(
252245
}
253246

254247
async fn enqueue_index_job(db_pool: &Pool<AsyncPgConnection>) -> anyhow::Result<()> {
255-
let conn = db_pool.get().await?;
256-
spawn_blocking(move || {
257-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
258-
super::IndexVersionDownloadsArchive
259-
.enqueue(conn)
260-
.context("Failed to enqueue IndexVersionDownloadsArchive job")
261-
})
262-
.await?;
248+
let mut conn = db_pool.get().await?;
249+
250+
super::IndexVersionDownloadsArchive
251+
.async_enqueue(&mut conn)
252+
.await
253+
.context("Failed to enqueue IndexVersionDownloadsArchive job")?;
263254

264255
Ok(())
265256
}
@@ -270,13 +261,14 @@ mod tests {
270261
use crate::schema::{crates, version_downloads, versions};
271262
use crates_io_test_db::TestDatabase;
272263
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
264+
use diesel_async::AsyncConnection;
273265
use insta::assert_snapshot;
274266

275267
#[tokio::test]
276268
async fn test_export() {
277269
let test_db = TestDatabase::new();
278-
let mut conn = test_db.connect();
279-
prepare_database(&mut conn);
270+
let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();
271+
prepare_database(&mut conn).await;
280272

281273
let tempdir = tempdir().unwrap();
282274
let csv_path = tempdir.path().join(FILE_NAME);
@@ -377,8 +369,8 @@ mod tests {
377369
#[tokio::test]
378370
async fn test_delete() {
379371
let test_db = TestDatabase::new();
380-
let mut conn = test_db.connect();
381-
prepare_database(&mut conn);
372+
let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();
373+
prepare_database(&mut conn).await;
382374

383375
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(test_db.url());
384376
let db_pool = Pool::builder(manager).build().unwrap();
@@ -388,31 +380,33 @@ mod tests {
388380
let row_count: i64 = version_downloads::table
389381
.count()
390382
.get_result(&mut conn)
383+
.await
391384
.unwrap();
392385
assert_eq!(row_count, 4);
393386
}
394387

395-
fn prepare_database(conn: &mut impl Conn) {
396-
let c1 = create_crate(conn, "foo");
397-
let v1 = create_version(conn, c1, "1.0.0");
398-
let v2 = create_version(conn, c1, "2.0.0");
399-
insert_downloads(conn, v1, "2021-01-01", 100);
400-
insert_downloads(conn, v1, "2021-01-02", 200);
401-
insert_downloads(conn, v1, "2021-01-03", 300);
402-
insert_downloads(conn, v2, "2021-01-01", 400);
403-
insert_downloads(conn, v2, "2021-01-02", 500);
404-
insert_downloads(conn, v2, "2021-01-03", 600);
388+
async fn prepare_database(conn: &mut AsyncPgConnection) {
389+
let c1 = create_crate(conn, "foo").await;
390+
let v1 = create_version(conn, c1, "1.0.0").await;
391+
let v2 = create_version(conn, c1, "2.0.0").await;
392+
insert_downloads(conn, v1, "2021-01-01", 100).await;
393+
insert_downloads(conn, v1, "2021-01-02", 200).await;
394+
insert_downloads(conn, v1, "2021-01-03", 300).await;
395+
insert_downloads(conn, v2, "2021-01-01", 400).await;
396+
insert_downloads(conn, v2, "2021-01-02", 500).await;
397+
insert_downloads(conn, v2, "2021-01-03", 600).await;
405398
}
406399

407-
fn create_crate(conn: &mut impl Conn, name: &str) -> i32 {
400+
async fn create_crate(conn: &mut AsyncPgConnection, name: &str) -> i32 {
408401
diesel::insert_into(crates::table)
409402
.values(crates::name.eq(name))
410403
.returning(crates::id)
411404
.get_result(conn)
405+
.await
412406
.unwrap()
413407
}
414408

415-
fn create_version(conn: &mut impl Conn, crate_id: i32, num: &str) -> i32 {
409+
async fn create_version(conn: &mut AsyncPgConnection, crate_id: i32, num: &str) -> i32 {
416410
diesel::insert_into(versions::table)
417411
.values((
418412
versions::crate_id.eq(crate_id),
@@ -421,10 +415,16 @@ mod tests {
421415
))
422416
.returning(versions::id)
423417
.get_result(conn)
418+
.await
424419
.unwrap()
425420
}
426421

427-
fn insert_downloads(conn: &mut impl Conn, version_id: i32, date: &str, downloads: i32) {
422+
async fn insert_downloads(
423+
conn: &mut AsyncPgConnection,
424+
version_id: i32,
425+
date: &str,
426+
downloads: i32,
427+
) {
428428
let date = NaiveDate::parse_from_str(date, "%Y-%m-%d").unwrap();
429429

430430
diesel::insert_into(version_downloads::table)
@@ -434,6 +434,7 @@ mod tests {
434434
version_downloads::downloads.eq(downloads),
435435
))
436436
.execute(conn)
437+
.await
437438
.unwrap();
438439
}
439440
}

0 commit comments

Comments
 (0)