Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 35 additions & 42 deletions src/controllers/version/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ use axum_extra::json;
use axum_extra::response::ErasedJson;
use crates_io_database::schema::{crates, dependencies};
use crates_io_worker::BackgroundJob;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::AsyncPgConnection;
use http::request::Parts;
use http::StatusCode;
use serde::Deserialize;
use tokio::runtime::Handle;

use crate::app::AppState;
use crate::auth::{AuthCheck, Authentication};
Expand All @@ -25,9 +23,7 @@ use crate::models::{
};
use crate::rate_limiter::LimitedAction;
use crate::schema::versions;
use crate::tasks::spawn_blocking;
use crate::util::diesel::prelude::*;
use crate::util::diesel::Conn;
use crate::util::errors::{bad_request, custom, version_not_found, AppResult};
use crate::views::{EncodableDependency, EncodableVersion};
use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion};
Expand Down Expand Up @@ -102,16 +98,11 @@ pub async fn show(

let mut conn = state.db_read().await?;
let (version, krate) = version_and_crate(&mut conn, &crate_name, &version).await?;
spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
let published_by = version.published_by(&mut conn).await?;
let actions = VersionOwnerAction::by_version(&mut conn, &version).await?;

let published_by = version.published_by(conn)?;
let actions = VersionOwnerAction::by_version(conn, &version)?;

let version = EncodableVersion::from(version, &krate.name, published_by, actions);
Ok(json!({ "version": version }))
})
.await
let version = EncodableVersion::from(version, &krate.name, published_by, actions);
Ok(json!({ "version": version }))
}

/// Handles the `PATCH /crates/:crate/:version` route.
Expand All @@ -137,25 +128,21 @@ pub async fn update(
.check_rate_limit(auth.user_id(), LimitedAction::YankUnyank, &mut conn)
.await?;

spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();

perform_version_yank_update(
&state,
conn,
&mut version,
&krate,
&auth,
update_request.version.yanked,
update_request.version.yank_message,
)?;

let published_by = version.published_by(conn)?;
let actions = VersionOwnerAction::by_version(conn, &version)?;
let updated_version = EncodableVersion::from(version, &krate.name, published_by, actions);
Ok(json!({ "version": updated_version }))
})
.await
perform_version_yank_update(
&state,
&mut conn,
&mut version,
&krate,
&auth,
update_request.version.yanked,
update_request.version.yank_message,
)
.await?;

let published_by = version.published_by(&mut conn).await?;
let actions = VersionOwnerAction::by_version(&mut conn, &version).await?;
let updated_version = EncodableVersion::from(version, &krate.name, published_by, actions);
Ok(json!({ "version": updated_version }))
}

fn validate_yank_update(update_data: &VersionUpdate, version: &Version) -> AppResult<()> {
Expand Down Expand Up @@ -186,24 +173,24 @@ pub async fn authenticate(
.await
}

pub fn perform_version_yank_update(
pub async fn perform_version_yank_update(
state: &AppState,
conn: &mut impl Conn,
conn: &mut AsyncPgConnection,
version: &mut Version,
krate: &Crate,
auth: &Authentication,
yanked: Option<bool>,
yank_message: Option<String>,
) -> AppResult<()> {
use diesel::RunQueryDsl;
use diesel_async::RunQueryDsl;

let api_token_id = auth.api_token_id();
let user = auth.user();
let owners = krate.owners(conn)?;
let owners = krate.async_owners(conn).await?;

let yanked = yanked.unwrap_or(version.yanked);

if Handle::current().block_on(user.rights(state, &owners))? < Rights::Publish {
if user.rights(state, &owners).await? < Rights::Publish {
if user.is_admin {
let action = if yanked { "yanking" } else { "unyanking" };
warn!(
Expand All @@ -230,7 +217,8 @@ pub fn perform_version_yank_update(
versions::yanked.eq(yanked),
versions::yank_message.eq(&yank_message),
))
.execute(conn)?;
.execute(conn)
.await?;

// If no rows were updated, return early
if updated_cnt == 0 {
Expand All @@ -252,11 +240,16 @@ pub fn perform_version_yank_update(
.maybe_api_token_id(api_token_id)
.action(action)
.build()
.insert(conn)?;
.async_insert(conn)
.await?;

SyncToGitIndex::new(&krate.name).enqueue(conn)?;
SyncToSparseIndex::new(&krate.name).enqueue(conn)?;
UpdateDefaultVersion::new(krate.id).enqueue(conn)?;
SyncToGitIndex::new(&krate.name).async_enqueue(conn).await?;
SyncToSparseIndex::new(&krate.name)
.async_enqueue(conn)
.await?;
UpdateDefaultVersion::new(krate.id)
.async_enqueue(conn)
.await?;

Ok(())
}
28 changes: 12 additions & 16 deletions src/controllers/version/yank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ use super::version_and_crate;
use crate::app::AppState;
use crate::controllers::helpers::ok_true;
use crate::rate_limiter::LimitedAction;
use crate::tasks::spawn_blocking;
use crate::util::errors::{version_not_found, AppResult};
use axum::extract::Path;
use axum::response::Response;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use http::request::Parts;

/// Handles the `DELETE /crates/:crate_id/:version/yank` route.
Expand Down Expand Up @@ -62,18 +60,16 @@ async fn modify_yank(
.check_rate_limit(auth.user_id(), LimitedAction::YankUnyank, &mut conn)
.await?;

spawn_blocking(move || {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
perform_version_yank_update(
&state,
conn,
&mut version,
&krate,
&auth,
Some(yanked),
None,
)?;
ok_true()
})
.await
perform_version_yank_update(
&state,
&mut conn,
&mut version,
&krate,
&auth,
Some(yanked),
None,
)
.await?;

ok_true()
}
20 changes: 18 additions & 2 deletions src/models/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ impl VersionOwnerAction {
version_owner_actions::table.load(conn)
}

pub fn by_version(conn: &mut impl Conn, version: &Version) -> QueryResult<Vec<(Self, User)>> {
use diesel::RunQueryDsl;
pub async fn by_version(
conn: &mut AsyncPgConnection,
version: &Version,
) -> QueryResult<Vec<(Self, User)>> {
use diesel_async::RunQueryDsl;
use version_owner_actions::dsl::version_id;

version_owner_actions::table
.filter(version_id.eq(version.id))
.inner_join(users::table)
.order(version_owner_actions::dsl::id)
.load(conn)
.await
}

pub fn for_versions(
Expand Down Expand Up @@ -114,4 +118,16 @@ impl NewVersionOwnerAction {
.values(self)
.get_result(conn)
}

pub async fn async_insert(
&self,
conn: &mut AsyncPgConnection,
) -> QueryResult<VersionOwnerAction> {
use diesel_async::RunQueryDsl;

diesel::insert_into(version_owner_actions::table)
.values(self)
.get_result(conn)
.await
}
}
6 changes: 3 additions & 3 deletions src/models/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ impl Version {

/// Gets the User who ran `cargo publish` for this version, if recorded.
/// Not for use when you have a group of versions you need the publishers for.
pub fn published_by(&self, conn: &mut impl Conn) -> QueryResult<Option<User>> {
use diesel::RunQueryDsl;
pub async fn published_by(&self, conn: &mut AsyncPgConnection) -> QueryResult<Option<User>> {
use diesel_async::RunQueryDsl;

match self.published_by {
Some(pb) => users::table.find(pb).first(conn).optional(),
Some(pb) => users::table.find(pb).first(conn).await.optional(),
None => Ok(None),
}
}
Expand Down