
Commit 2d9c4d7

Merge pull request #9731 from Turbo87/less-pools
worker/jobs/archive_version_downloads: Simplify function arguments
2 parents d1d13e6 + 7e899a9 commit 2d9c4d7

File tree

1 file changed: +10 -24 lines changed

src/worker/jobs/archive_version_downloads.rs

Lines changed: 10 additions & 24 deletions
@@ -1,11 +1,11 @@
+use super::IndexVersionDownloadsArchive;
 use crate::schema::version_downloads;
 use crate::tasks::spawn_blocking;
 use crate::worker::Environment;
 use anyhow::{anyhow, Context};
 use chrono::{NaiveDate, Utc};
 use crates_io_worker::BackgroundJob;
 use diesel::prelude::*;
-use diesel_async::pooled_connection::deadpool::Pool;
 use diesel_async::{AsyncPgConnection, RunQueryDsl};
 use futures_util::StreamExt;
 use object_store::ObjectStore;
@@ -63,10 +63,15 @@ impl BackgroundJob for ArchiveVersionDownloads {
         export(&env.config.db.primary.url, &csv_path, &self.before).await?;
         let dates = spawn_blocking(move || split(csv_path)).await?;
         let uploaded_dates = upload(downloads_archive_store, tempdir.path(), dates).await?;
-        delete(&env.deadpool, uploaded_dates).await?;
+
+        let mut conn = env.deadpool.get().await?;
+        delete(&mut conn, uploaded_dates).await?;
 
         // Queue up the job to regenerate the archive index.
-        enqueue_index_job(&env.deadpool).await?;
+        IndexVersionDownloadsArchive
+            .async_enqueue(&mut conn)
+            .await
+            .context("Failed to enqueue IndexVersionDownloadsArchive job")?;
 
         info!("Finished archiving old version downloads");
         Ok(())
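
Aside (not part of the diff): a minimal sketch of the calling convention this hunk moves to, assuming a deadpool-backed Pool<AsyncPgConnection> like env.deadpool in the real job. The helper names (delete_rows, enqueue_followup) and the run wrapper are hypothetical stand-ins, not the crates.io functions; only the pool and connection types match the real code.

use anyhow::Result;
use chrono::NaiveDate;
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::AsyncPgConnection;

// Helpers borrow a single connection instead of each receiving the pool
// (hypothetical stand-ins for illustration only).
async fn delete_rows(_conn: &mut AsyncPgConnection, _dates: Vec<NaiveDate>) -> Result<()> {
    // ... issue the DELETE statements here ...
    Ok(())
}

async fn enqueue_followup(_conn: &mut AsyncPgConnection) -> Result<()> {
    // ... enqueue the follow-up background job here ...
    Ok(())
}

async fn run(pool: &Pool<AsyncPgConnection>, dates: Vec<NaiveDate>) -> Result<()> {
    // Check a connection out of the pool once at the call site...
    let mut conn = pool.get().await?;

    // ...and lend it to each step; the deadpool object derefs to
    // AsyncPgConnection, so `&mut conn` coerces to `&mut AsyncPgConnection`.
    delete_rows(&mut conn, dates).await?;
    enqueue_followup(&mut conn).await?;
    Ok(())
}

Keeping the pool out of the helper signatures means each helper no longer performs its own checkout, and tests can call them with a plain AsyncPgConnection established directly against the test database, as the test hunk at the bottom of this diff does.
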
@@ -218,12 +223,7 @@ async fn upload_file(store: &impl ObjectStore, path: impl AsRef<Path>) -> anyhow
 }
 
 /// Delete version downloads for the given dates from the database.
-async fn delete(db_pool: &Pool<AsyncPgConnection>, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
-    let mut conn = db_pool.get().await?;
-    delete_inner(&mut conn, dates).await
-}
-
-async fn delete_inner(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
+async fn delete(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> anyhow::Result<()> {
     // Delete version downloads for the given dates in chunks to avoid running
     // into the maximum query parameter limit.
     const CHUNK_SIZE: usize = 5000;
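
Aside (not part of the diff): the loop body behind that comment is outside this hunk. The sketch below is only a guess at the general shape of a chunked delete with diesel_async, assuming the crates.io version_downloads table has a date column; it is not the actual crates.io implementation.

use chrono::NaiveDate;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

use crate::schema::version_downloads;

// Hypothetical illustration of deleting rows in bounded chunks.
async fn delete_in_chunks(
    conn: &mut AsyncPgConnection,
    dates: Vec<NaiveDate>,
) -> anyhow::Result<()> {
    // PostgreSQL caps the number of bind parameters per statement, so the
    // dates are deleted in fixed-size chunks rather than one giant IN (...).
    const CHUNK_SIZE: usize = 5000;
    for chunk in dates.chunks(CHUNK_SIZE) {
        diesel::delete(version_downloads::table)
            .filter(version_downloads::date.eq_any(chunk))
            .execute(conn)
            .await?;
    }
    Ok(())
}
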
@@ -244,23 +244,11 @@ async fn delete_inner(conn: &mut AsyncPgConnection, dates: Vec<NaiveDate>) -> an
     Ok(())
 }
 
-async fn enqueue_index_job(db_pool: &Pool<AsyncPgConnection>) -> anyhow::Result<()> {
-    let mut conn = db_pool.get().await?;
-
-    super::IndexVersionDownloadsArchive
-        .async_enqueue(&mut conn)
-        .await
-        .context("Failed to enqueue IndexVersionDownloadsArchive job")?;
-
-    Ok(())
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::schema::{crates, version_downloads, versions};
     use crates_io_test_db::TestDatabase;
-    use diesel_async::pooled_connection::AsyncDieselConnectionManager;
     use diesel_async::AsyncConnection;
     use insta::assert_snapshot;
 
@@ -372,10 +360,8 @@ mod tests {
         let mut conn = AsyncPgConnection::establish(test_db.url()).await.unwrap();
         prepare_database(&mut conn).await;
 
-        let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(test_db.url());
-        let db_pool = Pool::builder(manager).build().unwrap();
         let dates = vec![NaiveDate::from_ymd_opt(2021, 1, 1).unwrap()];
-        delete(&db_pool, dates).await.unwrap();
+        delete(&mut conn, dates).await.unwrap();
 
         let row_count: i64 = version_downloads::table
             .count()
