Skip to content

Commit 6aaaa2a

Browse files
committed
worker/jobs/rss/sync_updates_feed: Remove spawn_blocking() usage
1 parent 82ac1a5 commit 6aaaa2a

File tree

1 file changed

+27
-28
lines changed

1 file changed

+27
-28
lines changed

src/worker/jobs/rss/sync_updates_feed.rs

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use crate::schema::{crates, versions};
22
use crate::storage::FeedId;
3-
use crate::tasks::spawn_blocking;
4-
use crate::util::diesel::Conn;
53
use crate::worker::Environment;
64
use chrono::Duration;
75
use crates_io_worker::BackgroundJob;
86
use diesel::prelude::*;
9-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
7+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
108
use std::sync::Arc;
119

1210
#[derive(Serialize, Deserialize)]
@@ -34,12 +32,8 @@ impl BackgroundJob for SyncUpdatesFeed {
3432
let domain = &ctx.config.domain_name;
3533

3634
info!("Loading latest {NUM_ITEMS} version updates from the database…");
37-
let conn = ctx.deadpool.get().await?;
38-
let version_updates = spawn_blocking(move || {
39-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
40-
Ok::<_, anyhow::Error>(load_version_updates(conn)?)
41-
})
42-
.await?;
35+
let mut conn = ctx.deadpool.get().await?;
36+
let version_updates = load_version_updates(&mut conn).await?;
4337

4438
let link = rss::extension::atom::Link {
4539
href: ctx.storage.feed_url(&feed_id),
@@ -86,15 +80,16 @@ impl BackgroundJob for SyncUpdatesFeed {
8680
/// than [`ALWAYS_INCLUDE_AGE`]. If there are less than [`NUM_ITEMS`] versions
8781
/// then the list will be padded with older versions until [`NUM_ITEMS`] are
8882
/// returned.
89-
fn load_version_updates(conn: &mut impl Conn) -> QueryResult<Vec<VersionUpdate>> {
83+
async fn load_version_updates(conn: &mut AsyncPgConnection) -> QueryResult<Vec<VersionUpdate>> {
9084
let threshold_dt = chrono::Utc::now().naive_utc() - ALWAYS_INCLUDE_AGE;
9185

9286
let updates = versions::table
9387
.inner_join(crates::table)
9488
.filter(versions::created_at.gt(threshold_dt))
9589
.order(versions::created_at.desc())
9690
.select(VersionUpdate::as_select())
97-
.load(conn)?;
91+
.load(conn)
92+
.await?;
9893

9994
let num_updates = updates.len();
10095
if num_updates as i64 >= NUM_ITEMS {
@@ -107,6 +102,7 @@ fn load_version_updates(conn: &mut impl Conn) -> QueryResult<Vec<VersionUpdate>>
107102
.select(VersionUpdate::as_select())
108103
.limit(NUM_ITEMS)
109104
.load(conn)
105+
.await
110106
}
111107

112108
#[derive(Debug, Queryable, Selectable)]
@@ -177,65 +173,67 @@ mod tests {
177173
use super::*;
178174
use chrono::NaiveDateTime;
179175
use crates_io_test_db::TestDatabase;
176+
use diesel_async::AsyncConnection;
180177
use insta::assert_debug_snapshot;
181178

182-
#[test]
183-
fn test_load_version_updates() {
179+
#[tokio::test]
180+
async fn test_load_version_updates() {
184181
crate::util::tracing::init_for_test();
185182

186183
let db = TestDatabase::new();
187-
let mut conn = db.connect();
184+
let mut conn = AsyncPgConnection::establish(db.url()).await.unwrap();
188185

189186
let now = chrono::Utc::now().naive_utc();
190187

191-
let updates = assert_ok!(load_version_updates(&mut conn));
188+
let updates = assert_ok!(load_version_updates(&mut conn).await);
192189
assert_eq!(updates.len(), 0);
193190

194-
let foo = create_crate(&mut conn, "foo");
191+
let foo = create_crate(&mut conn, "foo").await;
195192

196193
// If there are less than NUM_ITEMS versions, they should all be returned
197-
create_version(&mut conn, foo, "1.0.0", now - Duration::days(123));
198-
create_version(&mut conn, foo, "1.0.1", now - Duration::days(110));
199-
create_version(&mut conn, foo, "1.1.0", now - Duration::days(100));
200-
create_version(&mut conn, foo, "1.2.0", now - Duration::days(90));
194+
create_version(&mut conn, foo, "1.0.0", now - Duration::days(123)).await;
195+
create_version(&mut conn, foo, "1.0.1", now - Duration::days(110)).await;
196+
create_version(&mut conn, foo, "1.1.0", now - Duration::days(100)).await;
197+
create_version(&mut conn, foo, "1.2.0", now - Duration::days(90)).await;
201198

202-
let updates = assert_ok!(load_version_updates(&mut conn));
199+
let updates = assert_ok!(load_version_updates(&mut conn).await);
203200
assert_eq!(updates.len(), 4);
204201
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());
205202

206203
// If there are more than NUM_ITEMS versions, only the most recent NUM_ITEMS should be returned
207204
for i in 1..=NUM_ITEMS {
208205
let version = format!("1.2.{i}");
209206
let publish_time = now - Duration::days(90) + Duration::hours(i);
210-
create_version(&mut conn, foo, &version, publish_time);
207+
create_version(&mut conn, foo, &version, publish_time).await;
211208
}
212209

213-
let updates = assert_ok!(load_version_updates(&mut conn));
210+
let updates = assert_ok!(load_version_updates(&mut conn).await);
214211
assert_eq!(updates.len() as i64, NUM_ITEMS);
215212
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());
216213

217214
// But if there are more than NUM_ITEMS versions that are younger than ALWAYS_INCLUDE_AGE, all of them should be returned
218215
for i in 1..=(NUM_ITEMS + 10) {
219216
let version = format!("1.3.{i}");
220217
let publish_time = now - Duration::minutes(30) + Duration::seconds(i);
221-
create_version(&mut conn, foo, &version, publish_time);
218+
create_version(&mut conn, foo, &version, publish_time).await;
222219
}
223220

224-
let updates = assert_ok!(load_version_updates(&mut conn));
221+
let updates = assert_ok!(load_version_updates(&mut conn).await);
225222
assert_eq!(updates.len() as i64, NUM_ITEMS + 10);
226223
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());
227224
}
228225

229-
fn create_crate(conn: &mut impl Conn, name: &str) -> i32 {
226+
async fn create_crate(conn: &mut AsyncPgConnection, name: &str) -> i32 {
230227
diesel::insert_into(crates::table)
231228
.values((crates::name.eq(name),))
232229
.returning(crates::id)
233230
.get_result(conn)
231+
.await
234232
.unwrap()
235233
}
236234

237-
fn create_version(
238-
conn: &mut impl Conn,
235+
async fn create_version(
236+
conn: &mut AsyncPgConnection,
239237
crate_id: i32,
240238
version: &str,
241239
publish_time: NaiveDateTime,
@@ -250,6 +248,7 @@ mod tests {
250248
))
251249
.returning(versions::id)
252250
.get_result(conn)
251+
.await
253252
.unwrap()
254253
}
255254
}

0 commit comments

Comments
 (0)