Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 97 additions & 92 deletions src/controllers/krate/versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::util::diesel::prelude::*;
use axum::extract::Path;
use axum_extra::json;
use axum_extra::response::ErasedJson;
use diesel::connection::DefaultLoadingMode;
use diesel::dsl::not;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use futures_util::{future, TryStreamExt};
use http::request::Parts;
use indexmap::{IndexMap, IndexSet};
use std::cmp::Reverse;
Expand All @@ -16,8 +16,6 @@ use crate::app::AppState;
use crate::controllers::helpers::pagination::{encode_seek, Page, PaginationOptions};
use crate::models::{Crate, User, Version, VersionOwnerAction};
use crate::schema::{crates, users, versions};
use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::util::errors::{bad_request, crate_not_found, AppResult, BoxedAppError};
use crate::util::RequestUtils;
use crate::views::EncodableVersion;
Expand All @@ -28,78 +26,74 @@ pub async fn versions(
Path(crate_name): Path<String>,
req: Parts,
) -> AppResult<ErasedJson> {
let conn = state.db_read().await?;
spawn_blocking(move || {
use diesel::RunQueryDsl;

let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();

let crate_id: i32 = Crate::by_name(&crate_name)
.select(crates::id)
.first(conn)
.optional()?
.ok_or_else(|| crate_not_found(&crate_name))?;

let mut pagination = None;
let params = req.query();
// To keep backward compatibility, we paginate only if per_page is provided
if params.get("per_page").is_some() {
pagination = Some(
PaginationOptions::builder()
.enable_seek(true)
.enable_pages(false)
.gather(&req)?,
);
}
let mut conn = state.db_read().await?;

let crate_id: i32 = Crate::by_name(&crate_name)
.select(crates::id)
.first(&mut conn)
.await
.optional()?
.ok_or_else(|| crate_not_found(&crate_name))?;

let mut pagination = None;
let params = req.query();
// To keep backward compatibility, we paginate only if per_page is provided
if params.get("per_page").is_some() {
pagination = Some(
PaginationOptions::builder()
.enable_seek(true)
.enable_pages(false)
.gather(&req)?,
);
}

let include = req
.query()
.get("include")
.map(|mode| ShowIncludeMode::from_str(mode))
.transpose()?
.unwrap_or_default();

// Sort by semver by default
let versions_and_publishers = match params.get("sort").map(|s| s.to_lowercase()).as_deref()
{
Some("date") => list_by_date(crate_id, pagination.as_ref(), include, &req, conn)?,
_ => list_by_semver(crate_id, pagination.as_ref(), include, &req, conn)?,
};
let include = req
.query()
.get("include")
.map(|mode| ShowIncludeMode::from_str(mode))
.transpose()?
.unwrap_or_default();

// Sort by semver by default
let versions_and_publishers = match params.get("sort").map(|s| s.to_lowercase()).as_deref() {
Some("date") => {
list_by_date(crate_id, pagination.as_ref(), include, &req, &mut conn).await?
}
_ => list_by_semver(crate_id, pagination.as_ref(), include, &req, &mut conn).await?,
};

let versions = versions_and_publishers
.data
.iter()
.map(|(v, _)| v)
.collect::<Vec<_>>();
let actions = VersionOwnerAction::for_versions(conn, &versions)?;
let versions = versions_and_publishers
.data
.into_iter()
.zip(actions)
.map(|((v, pb), aas)| EncodableVersion::from(v, &crate_name, pb, aas))
.collect::<Vec<_>>();

Ok(match pagination {
Some(_) => json!({ "versions": versions, "meta": versions_and_publishers.meta }),
None => json!({ "versions": versions }),
})
let versions = versions_and_publishers
.data
.iter()
.map(|(v, _)| v)
.collect::<Vec<_>>();
let actions = VersionOwnerAction::async_for_versions(&mut conn, &versions).await?;
let versions = versions_and_publishers
.data
.into_iter()
.zip(actions)
.map(|((v, pb), aas)| EncodableVersion::from(v, &crate_name, pb, aas))
.collect::<Vec<_>>();

Ok(match pagination {
Some(_) => json!({ "versions": versions, "meta": versions_and_publishers.meta }),
None => json!({ "versions": versions }),
})
.await?
}

/// Seek-based pagination of versions by date
///
/// # Panics
///
/// This function will panic if `option` is built with `enable_pages` set to true.
fn list_by_date(
async fn list_by_date(
crate_id: i32,
options: Option<&PaginationOptions>,
include: ShowIncludeMode,
req: &Parts,
conn: &mut impl Conn,
conn: &mut AsyncPgConnection,
) -> AppResult<PaginatedVersionsAndPublishers> {
use diesel::RunQueryDsl;
use diesel_async::RunQueryDsl;
use seek::*;

let mut query = versions::table
Expand All @@ -126,17 +120,20 @@ fn list_by_date(

if include.release_tracks {
let mut sorted_versions = IndexSet::new();
for result in versions::table
versions::table
.filter(versions::crate_id.eq(crate_id))
.filter(not(versions::yanked))
.select(versions::num)
.load_iter::<String, DefaultLoadingMode>(conn)?
{
let Ok(semver) = semver::Version::parse(&result?) else {
continue;
};
sorted_versions.insert(semver);
}
.load_stream::<String>(conn)
.await?
.try_for_each(|num| {
if let Ok(semver) = semver::Version::parse(&num) {
sorted_versions.insert(semver);
};
future::ready(Ok(()))
})
.await?;

sorted_versions.sort_unstable_by(|a, b| b.cmp(a));
release_tracks = Some(ReleaseTracks::from_sorted_semver_iter(
sorted_versions.iter(),
Expand All @@ -146,7 +143,7 @@ fn list_by_date(

query = query.order((versions::created_at.desc(), versions::id.desc()));

let data: Vec<(Version, Option<User>)> = query.load(conn)?;
let data: Vec<(Version, Option<User>)> = query.load(conn).await?;
let mut next_page = None;
if let Some(options) = options {
next_page = next_seek_params(&data, options, |last| Seek::Date.to_payload(last))?
Expand All @@ -159,7 +156,8 @@ fn list_by_date(
versions::table
.filter(versions::crate_id.eq(crate_id))
.count()
.get_result(conn)?
.get_result(conn)
.await?
} else {
0
};
Expand All @@ -182,14 +180,13 @@ fn list_by_date(

// Unfortunately, Heroku Postgres has no support for the semver PG extension.
// Therefore, we need to perform both sorting and pagination manually on the server.
fn list_by_semver(
async fn list_by_semver(
crate_id: i32,
options: Option<&PaginationOptions>,
include: ShowIncludeMode,
req: &Parts,
conn: &mut impl Conn,
conn: &mut AsyncPgConnection,
) -> AppResult<PaginatedVersionsAndPublishers> {
use diesel::RunQueryDsl;
use seek::*;

let (data, total, release_tracks) = if let Some(options) = options {
Expand All @@ -200,16 +197,20 @@ fn list_by_semver(
// without sorting twice.
// Sorting by semver but opted for id as the seek key because num can be quite lengthy,
// while id values are significantly smaller.

let mut sorted_versions = IndexMap::new();
for result in versions::table
versions::table
.filter(versions::crate_id.eq(crate_id))
.select((versions::id, versions::num, versions::yanked))
.load_iter::<(i32, String, bool), DefaultLoadingMode>(conn)?
{
let (id, num, yanked) = result?;
let semver = semver::Version::parse(&num).ok();
sorted_versions.insert(id, (semver, yanked, None));
}
.load_stream::<(i32, String, bool)>(conn)
.await?
.try_for_each(|(id, num, yanked)| {
let semver = semver::Version::parse(&num).ok();
sorted_versions.insert(id, (semver, yanked, None));
future::ready(Ok(()))
})
.await?;

sorted_versions
.sort_unstable_by(|_, (semver_a, _, _), _, (semver_b, _, _)| semver_b.cmp(semver_a));

Expand Down Expand Up @@ -240,20 +241,23 @@ fn list_by_semver(
.keys()
.cloned()
.collect::<Vec<_>>();
for result in versions::table
versions::table
.filter(versions::crate_id.eq(crate_id))
.left_outer_join(users::table)
.select(<(Version, Option<User>)>::as_select())
.filter(versions::id.eq_any(ids))
.load_iter::<(Version, Option<User>), DefaultLoadingMode>(conn)?
{
let row = result?;
// The versions are already sorted, and we only need to enrich the fetched rows into them.
// Therefore, other values can now be safely ignored.
sorted_versions
.entry(row.0.id)
.and_modify(|entry| *entry = (None, false, Some(row)));
}
.load_stream::<(Version, Option<User>)>(conn)
.await?
.try_for_each(|row| {
// The versions are already sorted, and we only need to enrich the fetched rows into them.
// Therefore, other values can now be safely ignored.
sorted_versions
.entry(row.0.id)
.and_modify(|entry| *entry = (None, false, Some(row)));

future::ready(Ok(()))
})
.await?;

let len = sorted_versions.len();
(
Expand All @@ -272,7 +276,8 @@ fn list_by_semver(
.filter(versions::crate_id.eq(crate_id))
.left_outer_join(users::table)
.select(<(Version, Option<User>)>::as_select())
.load(conn)?;
.load(conn)
.await?;
data.sort_by_cached_key(|(version, _)| Reverse(semver::Version::parse(&version.num).ok()));
let total = data.len();
(data, total, None)
Expand Down