Skip to content

Commit 743ad44

Browse files
authored
worker/jobs/downloads: Remove spawn_blocking() usage (#9784)
1 parent fba2234 commit 743ad44

File tree

2 files changed

+89
-85
lines changed

2 files changed

+89
-85
lines changed
Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use crate::schema::processed_log_files;
2-
use crate::tasks::spawn_blocking;
3-
use crate::util::diesel::Conn;
42
use crate::worker::Environment;
53
use crates_io_worker::BackgroundJob;
64
use diesel::prelude::*;
7-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
5+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
86
use std::sync::Arc;
97

108
/// This job is responsible for cleaning up old entries in the
@@ -22,18 +20,17 @@ impl BackgroundJob for CleanProcessedLogFiles {
2220
type Context = Arc<Environment>;
2321

2422
async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
25-
let conn = env.deadpool.get().await?;
26-
spawn_blocking(move || {
27-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
28-
Ok(run(conn)?)
29-
})
30-
.await
23+
let mut conn = env.deadpool.get().await?;
24+
Ok(run(&mut conn).await?)
3125
}
3226
}
3327

34-
fn run(conn: &mut impl Conn) -> QueryResult<()> {
28+
async fn run(conn: &mut AsyncPgConnection) -> QueryResult<()> {
3529
let filter = processed_log_files::time.lt(cut_off_date());
36-
diesel::delete(processed_log_files::table.filter(filter)).execute(conn)?;
30+
31+
diesel::delete(processed_log_files::table.filter(filter))
32+
.execute(conn)
33+
.await?;
3734

3835
Ok(())
3936
}
@@ -45,29 +42,29 @@ fn cut_off_date() -> chrono::DateTime<chrono::Utc> {
4542
#[cfg(test)]
4643
mod tests {
4744
use super::*;
48-
use crate::test_util::test_db_connection;
4945
use chrono::{DateTime, Utc};
46+
use crates_io_test_db::TestDatabase;
47+
use diesel_async::{AsyncConnection, AsyncPgConnection};
5048
use insta::assert_debug_snapshot;
5149

52-
#[test]
53-
fn test_cleanup() {
54-
let (_test_db, conn) = &mut test_db_connection();
50+
#[tokio::test]
51+
async fn test_cleanup() {
52+
let test_db = TestDatabase::new();
53+
let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();
5554

5655
let now = chrono::Utc::now();
5756
let cut_off_date = cut_off_date();
5857
let one_hour = chrono::Duration::try_hours(1).unwrap();
5958

60-
insert(
61-
conn,
62-
vec![
63-
("very-old-file", cut_off_date - one_hour * 30 * 24),
64-
("old-file", cut_off_date - one_hour),
65-
("newish-file", cut_off_date + one_hour),
66-
("brand-new-file", now),
67-
("future-file", now + one_hour * 7 * 24),
68-
],
69-
);
70-
assert_debug_snapshot!(paths_in_table(conn), @r###"
59+
let inserts = vec![
60+
("very-old-file", cut_off_date - one_hour * 30 * 24),
61+
("old-file", cut_off_date - one_hour),
62+
("newish-file", cut_off_date + one_hour),
63+
("brand-new-file", now),
64+
("future-file", now + one_hour * 7 * 24),
65+
];
66+
insert(&mut conn, inserts).await;
67+
assert_debug_snapshot!(paths_in_table(&mut conn).await, @r###"
7168
[
7269
"very-old-file",
7370
"old-file",
@@ -77,8 +74,8 @@ mod tests {
7774
]
7875
"###);
7976

80-
run(conn).unwrap();
81-
assert_debug_snapshot!(paths_in_table(conn), @r###"
77+
run(&mut conn).await.unwrap();
78+
assert_debug_snapshot!(paths_in_table(&mut conn).await, @r###"
8279
[
8380
"newish-file",
8481
"brand-new-file",
@@ -88,7 +85,7 @@ mod tests {
8885
}
8986

9087
/// Insert a list of paths and times into the `processed_log_files` table.
91-
fn insert(conn: &mut PgConnection, inserts: Vec<(&str, DateTime<Utc>)>) {
88+
async fn insert(conn: &mut AsyncPgConnection, inserts: Vec<(&str, DateTime<Utc>)>) {
9289
let inserts = inserts
9390
.into_iter()
9491
.map(|(path, time)| {
@@ -102,14 +99,16 @@ mod tests {
10299
diesel::insert_into(processed_log_files::table)
103100
.values(&inserts)
104101
.execute(conn)
102+
.await
105103
.unwrap();
106104
}
107105

108106
/// Read all paths from the `processed_log_files` table.
109-
fn paths_in_table(conn: &mut PgConnection) -> Vec<String> {
107+
async fn paths_in_table(conn: &mut AsyncPgConnection) -> Vec<String> {
110108
processed_log_files::table
111109
.select(processed_log_files::path)
112110
.load::<String>(conn)
111+
.await
113112
.unwrap()
114113
}
115114
}

src/worker/jobs/downloads/process_log.rs

Lines changed: 60 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
use crate::config::CdnLogStorageConfig;
2-
use crate::tasks::spawn_blocking;
3-
use crate::util::diesel::Conn;
42
use crate::worker::Environment;
53
use anyhow::Context;
64
use chrono::NaiveDate;
@@ -9,9 +7,9 @@ use crates_io_worker::BackgroundJob;
97
use diesel::dsl::exists;
108
use diesel::prelude::*;
119
use diesel::{select, QueryResult};
12-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
1310
use diesel_async::pooled_connection::deadpool::Pool;
14-
use diesel_async::AsyncPgConnection;
11+
use diesel_async::scoped_futures::ScopedFutureExt;
12+
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
1513
use object_store::aws::AmazonS3Builder;
1614
use object_store::local::LocalFileSystem;
1715
use object_store::memory::InMemory;
@@ -123,11 +121,9 @@ async fn run(
123121
log_stats(&downloads);
124122

125123
let path = path.to_string();
126-
let conn = db_pool.get().await?;
127-
spawn_blocking(move || {
128-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
129-
130-
conn.transaction(|conn| {
124+
let mut conn = db_pool.get().await?;
125+
conn.transaction(|conn| {
126+
async move {
131127
// Mark the log file as processed before saving the downloads to
132128
// the database.
133129
//
@@ -138,14 +134,15 @@ async fn run(
138134
// When the job is retried the `already_processed()` call above
139135
// will return `true` and the job will skip processing the log
140136
// file again.
141-
save_as_processed(path, conn)?;
142-
143-
save_downloads(downloads, conn)
144-
})?;
137+
save_as_processed(path, conn).await?;
145138

146-
Ok::<_, anyhow::Error>(())
139+
save_downloads(downloads, conn).await
140+
}
141+
.scope_boxed()
147142
})
148-
.await
143+
.await?;
144+
145+
Ok(())
149146
}
150147

151148
/// Loads the given log file from the object store and counts the number of
@@ -219,15 +216,23 @@ impl From<(String, Version, NaiveDate, u64)> for NewDownload {
219216
/// The temporary table only exists on the current connection, but if a
220217
/// connection pool is used, the temporary table will not be dropped when
221218
/// the connection is returned to the pool.
222-
pub fn save_downloads(downloads: DownloadsMap, conn: &mut impl Conn) -> anyhow::Result<()> {
219+
pub async fn save_downloads(
220+
downloads: DownloadsMap,
221+
conn: &mut AsyncPgConnection,
222+
) -> anyhow::Result<()> {
223223
debug!("Creating temp_downloads table");
224-
create_temp_downloads_table(conn).context("Failed to create temp_downloads table")?;
224+
create_temp_downloads_table(conn)
225+
.await
226+
.context("Failed to create temp_downloads table")?;
225227

226228
debug!("Saving counted downloads to temp_downloads table");
227-
fill_temp_downloads_table(downloads, conn).context("Failed to fill temp_downloads table")?;
229+
fill_temp_downloads_table(downloads, conn)
230+
.await
231+
.context("Failed to fill temp_downloads table")?;
228232

229233
debug!("Saving temp_downloads to version_downloads table");
230234
let failed_inserts = save_to_version_downloads(conn)
235+
.await
231236
.context("Failed to save temp_downloads to version_downloads table")?;
232237

233238
if !failed_inserts.is_empty() {
@@ -247,7 +252,7 @@ pub fn save_downloads(downloads: DownloadsMap, conn: &mut impl Conn) -> anyhow::
247252
/// look up the `version_id` for each crate and version combination, and that
248253
/// requires a join with the `crates` and `versions` tables.
249254
#[instrument("db.query", skip_all, fields(message = "CREATE TEMPORARY TABLE ..."))]
250-
fn create_temp_downloads_table(conn: &mut impl Conn) -> QueryResult<usize> {
255+
async fn create_temp_downloads_table(conn: &mut AsyncPgConnection) -> QueryResult<usize> {
251256
diesel::sql_query(
252257
r#"
253258
CREATE TEMPORARY TABLE temp_downloads (
@@ -259,6 +264,7 @@ fn create_temp_downloads_table(conn: &mut impl Conn) -> QueryResult<usize> {
259264
"#,
260265
)
261266
.execute(conn)
267+
.await
262268
}
263269

264270
/// Fills the temporary `temp_downloads` table with the downloads from the
@@ -268,7 +274,10 @@ fn create_temp_downloads_table(conn: &mut impl Conn) -> QueryResult<usize> {
268274
skip_all,
269275
fields(message = "INSERT INTO temp_downloads ...")
270276
)]
271-
fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> QueryResult<()> {
277+
async fn fill_temp_downloads_table(
278+
downloads: DownloadsMap,
279+
conn: &mut AsyncPgConnection,
280+
) -> QueryResult<()> {
272281
// `tokio-postgres` has a limit on the size of values it can send to the
273282
// database. To avoid hitting this limit, we insert the downloads in
274283
// batches.
@@ -283,7 +292,8 @@ fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> Q
283292
for chunk in map.chunks(MAX_BATCH_SIZE) {
284293
diesel::insert_into(temp_downloads::table)
285294
.values(chunk)
286-
.execute(conn)?;
295+
.execute(conn)
296+
.await?;
287297
}
288298

289299
Ok(())
@@ -297,7 +307,9 @@ fn fill_temp_downloads_table(downloads: DownloadsMap, conn: &mut impl Conn) -> Q
297307
skip_all,
298308
fields(message = "INSERT INTO version_downloads ...")
299309
)]
300-
fn save_to_version_downloads(conn: &mut impl Conn) -> QueryResult<Vec<NameAndVersion>> {
310+
async fn save_to_version_downloads(
311+
conn: &mut AsyncPgConnection,
312+
) -> QueryResult<Vec<NameAndVersion>> {
301313
diesel::sql_query(
302314
r#"
303315
WITH joined_data AS (
@@ -319,7 +331,7 @@ fn save_to_version_downloads(conn: &mut impl Conn) -> QueryResult<Vec<NameAndVer
319331
WHERE joined_data.id IS NULL;
320332
"#,
321333
)
322-
.load(conn)
334+
.load(conn).await
323335
}
324336

325337
table! {
@@ -358,12 +370,8 @@ async fn already_processed(
358370
) -> anyhow::Result<bool> {
359371
let path = path.into();
360372

361-
let conn = db_pool.get().await?;
362-
let already_processed = spawn_blocking(move || {
363-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
364-
Ok::<_, anyhow::Error>(already_processed_inner(path, conn)?)
365-
})
366-
.await?;
373+
let mut conn = db_pool.get().await?;
374+
let already_processed = already_processed_inner(path, &mut conn).await?;
367375

368376
Ok(already_processed)
369377
}
@@ -374,21 +382,28 @@ async fn already_processed(
374382
/// Note that if a second job is already processing the same log file, this
375383
/// function will return `false` because the second job will not have inserted
376384
/// the path into the `processed_log_files` table yet.
377-
fn already_processed_inner(path: impl Into<String>, conn: &mut impl Conn) -> QueryResult<bool> {
385+
async fn already_processed_inner(
386+
path: impl Into<String>,
387+
conn: &mut AsyncPgConnection,
388+
) -> QueryResult<bool> {
378389
use crate::schema::processed_log_files;
379390

380391
let query = processed_log_files::table.filter(processed_log_files::path.eq(path.into()));
381-
select(exists(query)).get_result(conn)
392+
select(exists(query)).get_result(conn).await
382393
}
383394

384395
/// Inserts the given path into the `processed_log_files` table to mark it as
385396
/// processed.
386-
fn save_as_processed(path: impl Into<String>, conn: &mut impl Conn) -> QueryResult<()> {
397+
async fn save_as_processed(
398+
path: impl Into<String>,
399+
conn: &mut AsyncPgConnection,
400+
) -> QueryResult<()> {
387401
use crate::schema::processed_log_files;
388402

389403
diesel::insert_into(processed_log_files::table)
390404
.values(processed_log_files::path.eq(path.into()))
391-
.execute(conn)?;
405+
.execute(conn)
406+
.await?;
392407

393408
Ok(())
394409
}
@@ -397,7 +412,6 @@ fn save_as_processed(path: impl Into<String>, conn: &mut impl Conn) -> QueryResu
397412
mod tests {
398413
use super::*;
399414
use crate::schema::{crates, version_downloads, versions};
400-
use crate::util::diesel::Conn;
401415
use crates_io_test_db::TestDatabase;
402416
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
403417
use insta::assert_debug_snapshot;
@@ -485,26 +499,20 @@ mod tests {
485499

486500
/// Inserts some dummy crates and versions into the database.
487501
async fn create_dummy_crates_and_versions(db_pool: Pool<AsyncPgConnection>) {
488-
let conn = db_pool.get().await.unwrap();
489-
spawn_blocking(move || {
490-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
491-
492-
create_crate_and_version("bindgen", "0.65.1", conn);
493-
create_crate_and_version("tracing-core", "0.1.32", conn);
494-
create_crate_and_version("quick-error", "1.2.3", conn);
502+
let mut conn = db_pool.get().await.unwrap();
495503

496-
Ok::<_, anyhow::Error>(())
497-
})
498-
.await
499-
.unwrap();
504+
create_crate_and_version("bindgen", "0.65.1", &mut conn).await;
505+
create_crate_and_version("tracing-core", "0.1.32", &mut conn).await;
506+
create_crate_and_version("quick-error", "1.2.3", &mut conn).await;
500507
}
501508

502509
/// Inserts a dummy crate and version into the database.
503-
fn create_crate_and_version(name: &str, version: &str, conn: &mut impl Conn) {
510+
async fn create_crate_and_version(name: &str, version: &str, conn: &mut AsyncPgConnection) {
504511
let crate_id: i32 = diesel::insert_into(crates::table)
505512
.values(crates::name.eq(name))
506513
.returning(crates::id)
507514
.get_result(conn)
515+
.await
508516
.unwrap();
509517

510518
diesel::insert_into(versions::table)
@@ -515,19 +523,15 @@ mod tests {
515523
versions::checksum.eq("checksum"),
516524
))
517525
.execute(conn)
526+
.await
518527
.unwrap();
519528
}
520529

521530
/// Queries all version downloads from the database and returns them as a
522531
/// [`Vec`] of strings for use with [`assert_debug_snapshot!()`].
523532
async fn all_version_downloads(db_pool: Pool<AsyncPgConnection>) -> Vec<String> {
524-
let conn = db_pool.get().await.unwrap();
525-
let downloads = spawn_blocking(move || {
526-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
527-
Ok::<_, anyhow::Error>(query_all_version_downloads(conn))
528-
})
529-
.await
530-
.unwrap();
533+
let mut conn = db_pool.get().await.unwrap();
534+
let downloads = query_all_version_downloads(&mut conn).await;
531535

532536
downloads
533537
.into_iter()
@@ -539,8 +543,8 @@ mod tests {
539543

540544
/// Queries all version downloads from the database and returns them as a
541545
/// [`Vec`] of tuples.
542-
fn query_all_version_downloads(
543-
conn: &mut impl Conn,
546+
async fn query_all_version_downloads(
547+
conn: &mut AsyncPgConnection,
544548
) -> Vec<(String, String, i32, i32, NaiveDate, bool)> {
545549
version_downloads::table
546550
.inner_join(versions::table)
@@ -555,6 +559,7 @@ mod tests {
555559
))
556560
.order((crates::name, versions::num, version_downloads::date))
557561
.load(conn)
562+
.await
558563
.unwrap()
559564
}
560565
}

0 commit comments

Comments
 (0)