Skip to content

Commit 38f2a25

Browse files
authored
controllers/krate/versions: Remove spawn_blocking() call (#10020)
1 parent b9e7596 commit 38f2a25

File tree

1 file changed

+97
-92
lines changed

1 file changed

+97
-92
lines changed

src/controllers/krate/versions.rs

Lines changed: 97 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use crate::util::diesel::prelude::*;
44
use axum::extract::Path;
55
use axum_extra::json;
66
use axum_extra::response::ErasedJson;
7-
use diesel::connection::DefaultLoadingMode;
87
use diesel::dsl::not;
9-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
8+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
9+
use futures_util::{future, TryStreamExt};
1010
use http::request::Parts;
1111
use indexmap::{IndexMap, IndexSet};
1212
use std::cmp::Reverse;
@@ -16,8 +16,6 @@ use crate::app::AppState;
1616
use crate::controllers::helpers::pagination::{encode_seek, Page, PaginationOptions};
1717
use crate::models::{Crate, User, Version, VersionOwnerAction};
1818
use crate::schema::{crates, users, versions};
19-
use crate::tasks::spawn_blocking;
20-
use crate::util::diesel::Conn;
2119
use crate::util::errors::{bad_request, crate_not_found, AppResult, BoxedAppError};
2220
use crate::util::RequestUtils;
2321
use crate::views::EncodableVersion;
@@ -28,78 +26,74 @@ pub async fn versions(
2826
Path(crate_name): Path<String>,
2927
req: Parts,
3028
) -> AppResult<ErasedJson> {
31-
let conn = state.db_read().await?;
32-
spawn_blocking(move || {
33-
use diesel::RunQueryDsl;
34-
35-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
36-
37-
let crate_id: i32 = Crate::by_name(&crate_name)
38-
.select(crates::id)
39-
.first(conn)
40-
.optional()?
41-
.ok_or_else(|| crate_not_found(&crate_name))?;
42-
43-
let mut pagination = None;
44-
let params = req.query();
45-
// To keep backward compatibility, we paginate only if per_page is provided
46-
if params.get("per_page").is_some() {
47-
pagination = Some(
48-
PaginationOptions::builder()
49-
.enable_seek(true)
50-
.enable_pages(false)
51-
.gather(&req)?,
52-
);
53-
}
29+
let mut conn = state.db_read().await?;
30+
31+
let crate_id: i32 = Crate::by_name(&crate_name)
32+
.select(crates::id)
33+
.first(&mut conn)
34+
.await
35+
.optional()?
36+
.ok_or_else(|| crate_not_found(&crate_name))?;
37+
38+
let mut pagination = None;
39+
let params = req.query();
40+
// To keep backward compatibility, we paginate only if per_page is provided
41+
if params.get("per_page").is_some() {
42+
pagination = Some(
43+
PaginationOptions::builder()
44+
.enable_seek(true)
45+
.enable_pages(false)
46+
.gather(&req)?,
47+
);
48+
}
5449

55-
let include = req
56-
.query()
57-
.get("include")
58-
.map(|mode| ShowIncludeMode::from_str(mode))
59-
.transpose()?
60-
.unwrap_or_default();
61-
62-
// Sort by semver by default
63-
let versions_and_publishers = match params.get("sort").map(|s| s.to_lowercase()).as_deref()
64-
{
65-
Some("date") => list_by_date(crate_id, pagination.as_ref(), include, &req, conn)?,
66-
_ => list_by_semver(crate_id, pagination.as_ref(), include, &req, conn)?,
67-
};
50+
let include = req
51+
.query()
52+
.get("include")
53+
.map(|mode| ShowIncludeMode::from_str(mode))
54+
.transpose()?
55+
.unwrap_or_default();
56+
57+
// Sort by semver by default
58+
let versions_and_publishers = match params.get("sort").map(|s| s.to_lowercase()).as_deref() {
59+
Some("date") => {
60+
list_by_date(crate_id, pagination.as_ref(), include, &req, &mut conn).await?
61+
}
62+
_ => list_by_semver(crate_id, pagination.as_ref(), include, &req, &mut conn).await?,
63+
};
6864

69-
let versions = versions_and_publishers
70-
.data
71-
.iter()
72-
.map(|(v, _)| v)
73-
.collect::<Vec<_>>();
74-
let actions = VersionOwnerAction::for_versions(conn, &versions)?;
75-
let versions = versions_and_publishers
76-
.data
77-
.into_iter()
78-
.zip(actions)
79-
.map(|((v, pb), aas)| EncodableVersion::from(v, &crate_name, pb, aas))
80-
.collect::<Vec<_>>();
81-
82-
Ok(match pagination {
83-
Some(_) => json!({ "versions": versions, "meta": versions_and_publishers.meta }),
84-
None => json!({ "versions": versions }),
85-
})
65+
let versions = versions_and_publishers
66+
.data
67+
.iter()
68+
.map(|(v, _)| v)
69+
.collect::<Vec<_>>();
70+
let actions = VersionOwnerAction::async_for_versions(&mut conn, &versions).await?;
71+
let versions = versions_and_publishers
72+
.data
73+
.into_iter()
74+
.zip(actions)
75+
.map(|((v, pb), aas)| EncodableVersion::from(v, &crate_name, pb, aas))
76+
.collect::<Vec<_>>();
77+
78+
Ok(match pagination {
79+
Some(_) => json!({ "versions": versions, "meta": versions_and_publishers.meta }),
80+
None => json!({ "versions": versions }),
8681
})
87-
.await?
8882
}
8983

9084
/// Seek-based pagination of versions by date
9185
///
9286
/// # Panics
9387
///
9488
/// This function will panic if `option` is built with `enable_pages` set to true.
95-
fn list_by_date(
89+
async fn list_by_date(
9690
crate_id: i32,
9791
options: Option<&PaginationOptions>,
9892
include: ShowIncludeMode,
9993
req: &Parts,
100-
conn: &mut impl Conn,
94+
conn: &mut AsyncPgConnection,
10195
) -> AppResult<PaginatedVersionsAndPublishers> {
102-
use diesel::RunQueryDsl;
96+
use diesel_async::RunQueryDsl;
10397
use seek::*;
10498

10599
let mut query = versions::table
@@ -126,17 +120,20 @@ fn list_by_date(
126120

127121
if include.release_tracks {
128122
let mut sorted_versions = IndexSet::new();
129-
for result in versions::table
123+
versions::table
130124
.filter(versions::crate_id.eq(crate_id))
131125
.filter(not(versions::yanked))
132126
.select(versions::num)
133-
.load_iter::<String, DefaultLoadingMode>(conn)?
134-
{
135-
let Ok(semver) = semver::Version::parse(&result?) else {
136-
continue;
137-
};
138-
sorted_versions.insert(semver);
139-
}
127+
.load_stream::<String>(conn)
128+
.await?
129+
.try_for_each(|num| {
130+
if let Ok(semver) = semver::Version::parse(&num) {
131+
sorted_versions.insert(semver);
132+
};
133+
future::ready(Ok(()))
134+
})
135+
.await?;
136+
140137
sorted_versions.sort_unstable_by(|a, b| b.cmp(a));
141138
release_tracks = Some(ReleaseTracks::from_sorted_semver_iter(
142139
sorted_versions.iter(),
@@ -146,7 +143,7 @@ fn list_by_date(
146143

147144
query = query.order((versions::created_at.desc(), versions::id.desc()));
148145

149-
let data: Vec<(Version, Option<User>)> = query.load(conn)?;
146+
let data: Vec<(Version, Option<User>)> = query.load(conn).await?;
150147
let mut next_page = None;
151148
if let Some(options) = options {
152149
next_page = next_seek_params(&data, options, |last| Seek::Date.to_payload(last))?
@@ -159,7 +156,8 @@ fn list_by_date(
159156
versions::table
160157
.filter(versions::crate_id.eq(crate_id))
161158
.count()
162-
.get_result(conn)?
159+
.get_result(conn)
160+
.await?
163161
} else {
164162
0
165163
};
@@ -182,14 +180,13 @@ fn list_by_date(
182180
183181
// Unfortunately, Heroku Postgres has no support for the semver PG extension.
184182
// Therefore, we need to perform both sorting and pagination manually on the server.
185-
fn list_by_semver(
183+
async fn list_by_semver(
186184
crate_id: i32,
187185
options: Option<&PaginationOptions>,
188186
include: ShowIncludeMode,
189187
req: &Parts,
190-
conn: &mut impl Conn,
188+
conn: &mut AsyncPgConnection,
191189
) -> AppResult<PaginatedVersionsAndPublishers> {
192-
use diesel::RunQueryDsl;
193190
use seek::*;
194191

195192
let (data, total, release_tracks) = if let Some(options) = options {
@@ -200,16 +197,20 @@ fn list_by_semver(
200197
// without sorting twice.
201198
// Sorting by semver but opted for id as the seek key because num can be quite lengthy,
202199
// while id values are significantly smaller.
200+
203201
let mut sorted_versions = IndexMap::new();
204-
for result in versions::table
202+
versions::table
205203
.filter(versions::crate_id.eq(crate_id))
206204
.select((versions::id, versions::num, versions::yanked))
207-
.load_iter::<(i32, String, bool), DefaultLoadingMode>(conn)?
208-
{
209-
let (id, num, yanked) = result?;
210-
let semver = semver::Version::parse(&num).ok();
211-
sorted_versions.insert(id, (semver, yanked, None));
212-
}
205+
.load_stream::<(i32, String, bool)>(conn)
206+
.await?
207+
.try_for_each(|(id, num, yanked)| {
208+
let semver = semver::Version::parse(&num).ok();
209+
sorted_versions.insert(id, (semver, yanked, None));
210+
future::ready(Ok(()))
211+
})
212+
.await?;
213+
213214
sorted_versions
214215
.sort_unstable_by(|_, (semver_a, _, _), _, (semver_b, _, _)| semver_b.cmp(semver_a));
215216

@@ -240,20 +241,23 @@ fn list_by_semver(
240241
.keys()
241242
.cloned()
242243
.collect::<Vec<_>>();
243-
for result in versions::table
244+
versions::table
244245
.filter(versions::crate_id.eq(crate_id))
245246
.left_outer_join(users::table)
246247
.select(<(Version, Option<User>)>::as_select())
247248
.filter(versions::id.eq_any(ids))
248-
.load_iter::<(Version, Option<User>), DefaultLoadingMode>(conn)?
249-
{
250-
let row = result?;
251-
// The versions are already sorted, and we only need to enrich the fetched rows into them.
252-
// Therefore, other values can now be safely ignored.
253-
sorted_versions
254-
.entry(row.0.id)
255-
.and_modify(|entry| *entry = (None, false, Some(row)));
256-
}
249+
.load_stream::<(Version, Option<User>)>(conn)
250+
.await?
251+
.try_for_each(|row| {
252+
// The versions are already sorted, and we only need to enrich the fetched rows into them.
253+
// Therefore, other values can now be safely ignored.
254+
sorted_versions
255+
.entry(row.0.id)
256+
.and_modify(|entry| *entry = (None, false, Some(row)));
257+
258+
future::ready(Ok(()))
259+
})
260+
.await?;
257261

258262
let len = sorted_versions.len();
259263
(
@@ -272,7 +276,8 @@ fn list_by_semver(
272276
.filter(versions::crate_id.eq(crate_id))
273277
.left_outer_join(users::table)
274278
.select(<(Version, Option<User>)>::as_select())
275-
.load(conn)?;
279+
.load(conn)
280+
.await?;
276281
data.sort_by_cached_key(|(version, _)| Reverse(semver::Version::parse(&version.num).ok()));
277282
let total = data.len();
278283
(data, total, None)

0 commit comments

Comments
 (0)