Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bin/crates-admin/default_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ pub async fn run(command: Command) -> anyhow::Result<()> {

Ok(())
})
.await
.await?
}
6 changes: 3 additions & 3 deletions src/bin/crates-admin/delete_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
}
}

let opts = spawn_blocking::<_, _, anyhow::Error>(move || {
let opts = spawn_blocking(move || {
use diesel::RunQueryDsl;

let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
Expand Down Expand Up @@ -110,8 +110,8 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job");
}

Ok(opts)
}).await?;
Ok::<_, anyhow::Error>(opts)
}).await??;

let crate_name = &opts.crate_name;

Expand Down
2 changes: 1 addition & 1 deletion src/bin/crates-admin/dialoguer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crates_io::tasks::spawn_blocking;

pub async fn confirm(msg: impl Into<String>) -> anyhow::Result<bool> {
let msg = msg.into();
spawn_blocking(move || sync_confirm(msg).map_err(anyhow::Error::from)).await
spawn_blocking(move || sync_confirm(msg).map_err(anyhow::Error::from)).await?
}

fn sync_confirm(msg: impl Into<String>) -> dialoguer::Result<bool> {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/crates-admin/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn run(_opts: Opts) -> Result<(), Error> {

Ok::<_, Error>(conn)
})
.await?;
.await??;

info!("Synchronizing crate categories");
crates_io::boot::categories::sync_with_connection(CATEGORIES_TOML, &mut conn).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/bin/crates-admin/render_readmes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async fn get_readme(
let archive = Archive::new(reader);
render_pkg_readme(archive, &pkg_name)
})
.await
.await?
}

fn render_pkg_readme<R: Read>(mut archive: Archive<R>, pkg_name: &str) -> anyhow::Result<String> {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/crates-admin/upload_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {

Ok::<_, anyhow::Error>((repo, files))
})
.await?;
.await??;

if !dialoguer::confirm("continue with upload?").await? {
return Ok(());
Expand Down
4 changes: 2 additions & 2 deletions src/controllers/crate_owner_invitation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn list(app: AppState, req: Parts) -> AppResult<ErasedJson> {
"users": users,
}))
})
.await
.await?
}

/// Handles the `GET /api/private/crate_owner_invitations` route.
Expand All @@ -86,7 +86,7 @@ pub async fn private_list(app: AppState, req: Parts) -> AppResult<Json<PrivateLi
let list = prepare_list(&app, &req, auth, filter, conn)?;
Ok(Json(list))
})
.await
.await?
}

enum ListFilter {
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/keyword.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn index(state: AppState, qp: Query<IndexQuery>, req: Parts) -> AppRes
"meta": { "total": total },
}))
})
.await
.await?
}

/// Handles the `GET /keywords/:keyword_id` route.
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/krate/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub async fn show(app: AppState, Path(name): Path<String>, req: Parts) -> AppRes
"categories": encodable_cats,
}))
})
.await
.await?
}

#[derive(Debug)]
Expand Down
4 changes: 2 additions & 2 deletions src/controllers/krate/owners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub async fn owner_user(state: AppState, Path(crate_name): Path<String>) -> AppR

Ok(json!({ "users": owners }))
})
.await
.await?
}

/// Handles the `PUT /crates/:crate_id/owners` route.
Expand Down Expand Up @@ -246,7 +246,7 @@ async fn modify_owners(

Ok(json!({ "msg": comma_sep_msg, "ok": true }))
})
.await
.await?
}

pub struct OwnerInviteEmail {
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/krate/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCra
}))
})
})
.await
.await?
}

/// Counts the number of versions for `crate_id` that were published within
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/krate/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub async fn search(app: AppState, req: Parts) -> AppResult<ErasedJson> {
},
}))
})
.await
.await?
}

#[derive(Default)]
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/krate/versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub async fn versions(
None => json!({ "versions": versions }),
})
})
.await
.await?
}

/// Seek-based pagination of versions by date
Expand Down
7 changes: 2 additions & 5 deletions src/controllers/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::app::AppState;
use crate::tasks::spawn_blocking;
use crate::util::errors::{custom, forbidden, not_found, AppResult, BoxedAppError};
use crate::util::errors::{custom, forbidden, not_found, AppResult};
use axum::extract::Path;
use http::request::Parts;
use http::{header, StatusCode};
Expand Down Expand Up @@ -30,10 +30,7 @@ pub async fn prometheus(app: AppState, Path(kind): Path<String>, req: Parts) ->
let mut conn = app.db_read().await?;
app.service_metrics.gather(&mut conn).await?
}
"instance" => {
spawn_blocking(move || Ok::<_, BoxedAppError>(app.instance_metrics.gather(&app)?))
.await?
}
"instance" => spawn_blocking(move || app.instance_metrics.gather(&app)).await??,
_ => return Err(not_found()),
};

Expand Down
2 changes: 1 addition & 1 deletion src/controllers/user/me.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub async fn updates(app: AppState, req: Parts) -> AppResult<ErasedJson> {
"meta": { "more": more },
}))
})
.await
.await?
}

/// Handles the `PUT /confirm/:email_token` route
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/version/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ pub async fn downloads(

Ok(json!({ "version_downloads": downloads }))
})
.await
.await?
}
22 changes: 7 additions & 15 deletions src/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
use sentry::Hub;
use std::convert::identity;
use tokio::task::JoinError;
use tokio::task::JoinHandle;

/// Runs the provided closure on a thread where blocking is acceptable.
///
/// This is using [tokio::task::spawn_blocking] internally, but automatically
/// runs the callback function in the context of the current Sentry [Hub].
///
/// The function also returns a flattened [Result], which requires the error
/// variant of the [Result] to implement [From\<JoinError>].
pub async fn spawn_blocking<F, R, E>(f: F) -> Result<R, E>
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> Result<R, E> + Send + 'static,
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
E: Send + From<JoinError> + 'static,
{
let current_span = tracing::Span::current();
let hub = Hub::current();
tokio::task::spawn_blocking(move || current_span.in_scope(|| Hub::run(hub, f)))
.await
// Convert `JoinError` to `E`
.map_err(Into::into)
// Flatten `Result<Result<_, E>, E>` to `Result<_, E>`
.and_then(identity)
}

#[cfg(test)]
Expand All @@ -33,16 +23,18 @@ mod tests {
/// Test that [spawn_blocking] works with [anyhow].
#[tokio::test]
async fn test_spawn_blocking_anyhow() {
spawn_blocking::<_, _, anyhow::Error>(|| Ok(()))
spawn_blocking(|| Ok::<_, anyhow::Error>(()))
.await
.unwrap()
.unwrap()
}

/// Test that [spawn_blocking] works with [BoxedAppError].
#[tokio::test]
async fn test_spawn_blocking_apperror() {
spawn_blocking::<_, _, BoxedAppError>(|| Ok(()))
spawn_blocking(|| Ok::<_, BoxedAppError>(()))
.await
.unwrap()
.unwrap()
}
}
2 changes: 1 addition & 1 deletion src/worker/jobs/archive_version_downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl BackgroundJob for ArchiveVersionDownloads {
let csv_path = tempdir.path().join(FILE_NAME);

export(&env.config.db.primary.url, &csv_path, &self.before).await?;
let dates = spawn_blocking(move || split(csv_path)).await?;
let dates = spawn_blocking(move || split(csv_path)).await??;
let uploaded_dates = upload(downloads_archive_store, tempdir.path(), dates).await?;

let mut conn = env.deadpool.get().await?;
Expand Down
2 changes: 1 addition & 1 deletion src/worker/jobs/downloads/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl BackgroundJob for UpdateDownloads {
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
Ok(update(conn)?)
})
.await
.await?
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/worker/jobs/dump_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl BackgroundJob for DumpDb {
PathBuf::from(directory.timestamp.format("%Y-%m-%d-%H%M%S").to_string());
create_archives(export_dir, &tarball_prefix)
})
.await?;
.await??;

info!("Uploading tarball…");
env.storage
Expand Down
2 changes: 1 addition & 1 deletion src/worker/jobs/index/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ impl BackgroundJob for NormalizeIndex {

Ok(())
})
.await
.await?
}
}
2 changes: 1 addition & 1 deletion src/worker/jobs/index/squash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ impl BackgroundJob for SquashIndex {

Ok(())
})
.await
.await?
}
}
2 changes: 1 addition & 1 deletion src/worker/jobs/index/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl BackgroundJob for SyncToGitIndex {

Ok(())
})
.await
.await?
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/worker/jobs/readmes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ impl BackgroundJob for RenderAndUploadReadme {

let job = self.clone();
let rendered = spawn_blocking(move || {
Ok::<_, anyhow::Error>(text_to_html(
text_to_html(
&job.text,
&job.readme_path,
job.base_url.as_deref(),
job.pkg_path_in_vcs.as_ref(),
))
)
})
.await?;

Expand Down
2 changes: 1 addition & 1 deletion src/worker/jobs/update_default_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ impl BackgroundJob for UpdateDefaultVersion {
update_default_version(crate_id, conn)?;
Ok(())
})
.await
.await?
}
}