Skip to content

Commit 94f1f0b

Browse files
authored
Merge pull request #9645 from Turbo87/async-rss
worker/jobs/rss: Remove `spawn_blocking()` usage
2 parents 51c3f84 + 6aaaa2a commit 94f1f0b

File tree

3 files changed

+85
-85
lines changed

3 files changed

+85
-85
lines changed

src/worker/jobs/rss/sync_crate_feed.rs

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use crate::schema::{crates, versions};
22
use crate::storage::FeedId;
3-
use crate::tasks::spawn_blocking;
4-
use crate::util::diesel::Conn;
53
use crate::worker::Environment;
64
use chrono::Duration;
75
use crates_io_worker::BackgroundJob;
86
use diesel::prelude::*;
9-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
7+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
108
use std::sync::Arc;
119

1210
/// Items younger than this will always be included in the feed.
@@ -42,16 +40,9 @@ impl BackgroundJob for SyncCrateFeed {
4240
let domain = &ctx.config.domain_name;
4341

4442
info!("Loading latest {NUM_ITEMS} version updates for `{name}` from the database…");
45-
let conn = ctx.deadpool.get().await?;
46-
47-
let version_updates = spawn_blocking({
48-
let name = name.clone();
49-
move || {
50-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
51-
Ok::<_, anyhow::Error>(load_version_updates(&name, conn)?)
52-
}
53-
})
54-
.await?;
43+
let mut conn = ctx.deadpool.get().await?;
44+
45+
let version_updates = load_version_updates(name, &mut conn).await?;
5546

5647
let feed_id = FeedId::Crate { name };
5748

@@ -102,7 +93,10 @@ impl BackgroundJob for SyncCrateFeed {
10293
/// than [`ALWAYS_INCLUDE_AGE`]. If there are less than [`NUM_ITEMS`] versions
10394
/// then the list will be padded with older versions until [`NUM_ITEMS`] are
10495
/// returned.
105-
fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult<Vec<VersionUpdate>> {
96+
async fn load_version_updates(
97+
name: &str,
98+
conn: &mut AsyncPgConnection,
99+
) -> QueryResult<Vec<VersionUpdate>> {
106100
let threshold_dt = chrono::Utc::now().naive_utc() - ALWAYS_INCLUDE_AGE;
107101

108102
let updates = versions::table
@@ -111,7 +105,8 @@ fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult<Vec<Ver
111105
.filter(versions::created_at.gt(threshold_dt))
112106
.order(versions::created_at.desc())
113107
.select(VersionUpdate::as_select())
114-
.load(conn)?;
108+
.load(conn)
109+
.await?;
115110

116111
let num_updates = updates.len();
117112
if num_updates as i64 >= NUM_ITEMS {
@@ -125,6 +120,7 @@ fn load_version_updates(name: &str, conn: &mut impl Conn) -> QueryResult<Vec<Ver
125120
.select(VersionUpdate::as_select())
126121
.limit(NUM_ITEMS)
127122
.load(conn)
123+
.await
128124
}
129125

130126
#[derive(Debug, Queryable, Selectable)]
@@ -183,65 +179,67 @@ mod tests {
183179
use super::*;
184180
use chrono::NaiveDateTime;
185181
use crates_io_test_db::TestDatabase;
182+
use diesel_async::AsyncConnection;
186183
use insta::assert_debug_snapshot;
187184

188-
#[test]
189-
fn test_load_version_updates() {
185+
#[tokio::test]
186+
async fn test_load_version_updates() {
190187
crate::util::tracing::init_for_test();
191188

192189
let db = TestDatabase::new();
193-
let mut conn = db.connect();
190+
let mut conn = AsyncPgConnection::establish(db.url()).await.unwrap();
194191

195192
let now = chrono::Utc::now().naive_utc();
196193

197-
let updates = assert_ok!(load_version_updates("foo", &mut conn));
194+
let updates = assert_ok!(load_version_updates("foo", &mut conn).await);
198195
assert_eq!(updates.len(), 0);
199196

200-
let foo = create_crate(&mut conn, "foo");
197+
let foo = create_crate(&mut conn, "foo").await;
201198

202199
// If there are less than NUM_ITEMS versions, they should all be returned
203-
create_version(&mut conn, foo, "1.0.0", now - Duration::days(123));
204-
create_version(&mut conn, foo, "1.0.1", now - Duration::days(110));
205-
create_version(&mut conn, foo, "1.1.0", now - Duration::days(100));
206-
create_version(&mut conn, foo, "1.2.0", now - Duration::days(90));
200+
create_version(&mut conn, foo, "1.0.0", now - Duration::days(123)).await;
201+
create_version(&mut conn, foo, "1.0.1", now - Duration::days(110)).await;
202+
create_version(&mut conn, foo, "1.1.0", now - Duration::days(100)).await;
203+
create_version(&mut conn, foo, "1.2.0", now - Duration::days(90)).await;
207204

208-
let updates = assert_ok!(load_version_updates("foo", &mut conn));
205+
let updates = assert_ok!(load_version_updates("foo", &mut conn).await);
209206
assert_eq!(updates.len(), 4);
210207
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());
211208

212209
// If there are more than NUM_ITEMS versions, only the most recent NUM_ITEMS should be returned
213210
for i in 1..=NUM_ITEMS {
214211
let version = format!("1.2.{i}");
215212
let publish_time = now - Duration::days(90) + Duration::hours(i);
216-
create_version(&mut conn, foo, &version, publish_time);
213+
create_version(&mut conn, foo, &version, publish_time).await;
217214
}
218215

219-
let updates = assert_ok!(load_version_updates("foo", &mut conn));
216+
let updates = assert_ok!(load_version_updates("foo", &mut conn).await);
220217
assert_eq!(updates.len() as i64, NUM_ITEMS);
221218
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());
222219

223220
// But if there are more than NUM_ITEMS versions that are younger than ALWAYS_INCLUDE_AGE, all of them should be returned
224221
for i in 1..=(NUM_ITEMS + 10) {
225222
let version = format!("1.3.{i}");
226223
let publish_time = now - Duration::minutes(30) + Duration::seconds(i);
227-
create_version(&mut conn, foo, &version, publish_time);
224+
create_version(&mut conn, foo, &version, publish_time).await;
228225
}
229226

230-
let updates = assert_ok!(load_version_updates("foo", &mut conn));
227+
let updates = assert_ok!(load_version_updates("foo", &mut conn).await);
231228
assert_eq!(updates.len() as i64, NUM_ITEMS + 10);
232229
assert_debug_snapshot!(updates.iter().map(|u| &u.version).collect::<Vec<_>>());
233230
}
234231

235-
fn create_crate(conn: &mut impl Conn, name: &str) -> i32 {
232+
async fn create_crate(conn: &mut AsyncPgConnection, name: &str) -> i32 {
236233
diesel::insert_into(crates::table)
237234
.values((crates::name.eq(name),))
238235
.returning(crates::id)
239236
.get_result(conn)
237+
.await
240238
.unwrap()
241239
}
242240

243-
fn create_version(
244-
conn: &mut impl Conn,
241+
async fn create_version(
242+
conn: &mut AsyncPgConnection,
245243
crate_id: i32,
246244
version: &str,
247245
publish_time: NaiveDateTime,
@@ -256,6 +254,7 @@ mod tests {
256254
))
257255
.returning(versions::id)
258256
.get_result(conn)
257+
.await
259258
.unwrap()
260259
}
261260
}

src/worker/jobs/rss/sync_crates_feed.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use crate::schema::crates;
22
use crate::storage::FeedId;
3-
use crate::tasks::spawn_blocking;
4-
use crate::util::diesel::Conn;
53
use crate::worker::Environment;
64
use chrono::Duration;
75
use crates_io_worker::BackgroundJob;
86
use diesel::prelude::*;
9-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
7+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
108
use std::sync::Arc;
119

1210
#[derive(Serialize, Deserialize)]
@@ -34,12 +32,8 @@ impl BackgroundJob for SyncCratesFeed {
3432
let domain = &ctx.config.domain_name;
3533

3634
info!("Loading latest {NUM_ITEMS} crates from the database…");
37-
let conn = ctx.deadpool.get().await?;
38-
let new_crates = spawn_blocking(move || {
39-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
40-
Ok::<_, anyhow::Error>(load_new_crates(conn)?)
41-
})
42-
.await?;
35+
let mut conn = ctx.deadpool.get().await?;
36+
let new_crates = load_new_crates(&mut conn).await?;
4337

4438
let link = rss::extension::atom::Link {
4539
href: ctx.storage.feed_url(&feed_id),
@@ -86,14 +80,15 @@ impl BackgroundJob for SyncCratesFeed {
8680
/// than [`ALWAYS_INCLUDE_AGE`]. If there are less than [`NUM_ITEMS`] crates
8781
/// then the list will be padded with older crates until [`NUM_ITEMS`] are
8882
/// returned.
89-
fn load_new_crates(conn: &mut impl Conn) -> QueryResult<Vec<NewCrate>> {
83+
async fn load_new_crates(conn: &mut AsyncPgConnection) -> QueryResult<Vec<NewCrate>> {
9084
let threshold_dt = chrono::Utc::now().naive_utc() - ALWAYS_INCLUDE_AGE;
9185

9286
let new_crates = crates::table
9387
.filter(crates::created_at.gt(threshold_dt))
9488
.order(crates::created_at.desc())
9589
.select(NewCrate::as_select())
96-
.load(conn)?;
90+
.load(conn)
91+
.await?;
9792

9893
let num_new_crates = new_crates.len();
9994
if num_new_crates as i64 >= NUM_ITEMS {
@@ -105,6 +100,7 @@ fn load_new_crates(conn: &mut impl Conn) -> QueryResult<Vec<NewCrate>> {
105100
.select(NewCrate::as_select())
106101
.limit(NUM_ITEMS)
107102
.load(conn)
103+
.await
108104
}
109105

110106
#[derive(Debug, Queryable, Selectable)]
@@ -161,54 +157,59 @@ mod tests {
161157
use super::*;
162158
use chrono::NaiveDateTime;
163159
use crates_io_test_db::TestDatabase;
160+
use diesel_async::{AsyncConnection, AsyncPgConnection};
164161
use insta::assert_debug_snapshot;
165162

166-
#[test]
167-
fn test_load_version_updates() {
163+
#[tokio::test]
164+
async fn test_load_version_updates() {
168165
crate::util::tracing::init_for_test();
169166

170167
let db = TestDatabase::new();
171-
let mut conn = db.connect();
168+
let mut conn = AsyncPgConnection::establish(db.url()).await.unwrap();
172169

173170
let now = chrono::Utc::now().naive_utc();
174171

175-
let new_crates = assert_ok!(load_new_crates(&mut conn));
172+
let new_crates = assert_ok!(load_new_crates(&mut conn).await);
176173
assert_eq!(new_crates.len(), 0);
177174

178175
// If there are less than NUM_ITEMS crates, they should all be returned
179-
create_crate(&mut conn, "foo", now - Duration::days(123));
180-
create_crate(&mut conn, "bar", now - Duration::days(110));
181-
create_crate(&mut conn, "baz", now - Duration::days(100));
182-
create_crate(&mut conn, "qux", now - Duration::days(90));
176+
create_crate(&mut conn, "foo", now - Duration::days(123)).await;
177+
create_crate(&mut conn, "bar", now - Duration::days(110)).await;
178+
create_crate(&mut conn, "baz", now - Duration::days(100)).await;
179+
create_crate(&mut conn, "qux", now - Duration::days(90)).await;
183180

184-
let new_crates = assert_ok!(load_new_crates(&mut conn));
181+
let new_crates = assert_ok!(load_new_crates(&mut conn).await);
185182
assert_eq!(new_crates.len(), 4);
186183
assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::<Vec<_>>());
187184

188185
// If there are more than NUM_ITEMS crates, only the most recent NUM_ITEMS should be returned
189186
for i in 1..=NUM_ITEMS {
190187
let name = format!("crate-{i}");
191188
let publish_time = now - Duration::days(90) + Duration::hours(i);
192-
create_crate(&mut conn, &name, publish_time);
189+
create_crate(&mut conn, &name, publish_time).await;
193190
}
194191

195-
let new_crates = assert_ok!(load_new_crates(&mut conn));
192+
let new_crates = assert_ok!(load_new_crates(&mut conn).await);
196193
assert_eq!(new_crates.len() as i64, NUM_ITEMS);
197194
assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::<Vec<_>>());
198195

199196
// But if there are more than NUM_ITEMS crates that are younger than ALWAYS_INCLUDE_AGE, all of them should be returned
200197
for i in 1..=(NUM_ITEMS + 10) {
201198
let name = format!("other-crate-{i}");
202199
let publish_time = now - Duration::minutes(30) + Duration::seconds(i);
203-
create_crate(&mut conn, &name, publish_time);
200+
create_crate(&mut conn, &name, publish_time).await;
204201
}
205202

206-
let new_crates = assert_ok!(load_new_crates(&mut conn));
203+
let new_crates = assert_ok!(load_new_crates(&mut conn).await);
207204
assert_eq!(new_crates.len() as i64, NUM_ITEMS + 10);
208205
assert_debug_snapshot!(new_crates.iter().map(|u| &u.name).collect::<Vec<_>>());
209206
}
210207

211-
fn create_crate(conn: &mut impl Conn, name: &str, publish_time: NaiveDateTime) -> i32 {
208+
async fn create_crate(
209+
conn: &mut AsyncPgConnection,
210+
name: &str,
211+
publish_time: NaiveDateTime,
212+
) -> i32 {
212213
diesel::insert_into(crates::table)
213214
.values((
214215
crates::name.eq(name),
@@ -217,6 +218,7 @@ mod tests {
217218
))
218219
.returning(crates::id)
219220
.get_result(conn)
221+
.await
220222
.unwrap()
221223
}
222224
}

0 commit comments

Comments
 (0)