From effdf3b3f56024b846df5b9debf20bfe1b049760 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 19 Nov 2024 12:36:28 +0100 Subject: [PATCH] Simplify `spawn_blocking()` wrapper fn The idea for our wrapper to require a `Result` return type for the callback function might have been good in theory, but leads to too many small annoyances in practice. Let's get rid of it and move back closer to the original `spawn_blocking()` implementation from `tokio`. --- src/bin/crates-admin/default_versions.rs | 2 +- src/bin/crates-admin/delete_version.rs | 6 +++--- src/bin/crates-admin/dialoguer.rs | 2 +- src/bin/crates-admin/migrate.rs | 2 +- src/bin/crates-admin/render_readmes.rs | 2 +- src/bin/crates-admin/upload_index.rs | 2 +- src/controllers/crate_owner_invitation.rs | 4 ++-- src/controllers/keyword.rs | 2 +- src/controllers/krate/metadata.rs | 2 +- src/controllers/krate/owners.rs | 4 ++-- src/controllers/krate/publish.rs | 2 +- src/controllers/krate/search.rs | 2 +- src/controllers/krate/versions.rs | 2 +- src/controllers/metrics.rs | 7 ++----- src/controllers/user/me.rs | 2 +- src/controllers/version/downloads.rs | 2 +- src/tasks.rs | 22 +++++++------------- src/worker/jobs/archive_version_downloads.rs | 2 +- src/worker/jobs/downloads/update_metadata.rs | 2 +- src/worker/jobs/dump_db.rs | 2 +- src/worker/jobs/index/normalize.rs | 2 +- src/worker/jobs/index/squash.rs | 2 +- src/worker/jobs/index/sync.rs | 2 +- src/worker/jobs/readmes.rs | 4 ++-- src/worker/jobs/update_default_version.rs | 2 +- 25 files changed, 37 insertions(+), 48 deletions(-) diff --git a/src/bin/crates-admin/default_versions.rs b/src/bin/crates-admin/default_versions.rs index 4dc5e082d04..3d6a59a0079 100644 --- a/src/bin/crates-admin/default_versions.rs +++ b/src/bin/crates-admin/default_versions.rs @@ -48,5 +48,5 @@ pub async fn run(command: Command) -> anyhow::Result<()> { Ok(()) }) - .await + .await? } diff --git a/src/bin/crates-admin/delete_version.rs b/src/bin/crates-admin/delete_version.rs index 0ef989dc934..6619ca4484c 100644 --- a/src/bin/crates-admin/delete_version.rs +++ b/src/bin/crates-admin/delete_version.rs @@ -64,7 +64,7 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { } } - let opts = spawn_blocking::<_, _, anyhow::Error>(move || { + let opts = spawn_blocking(move || { use diesel::RunQueryDsl; let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); @@ -110,8 +110,8 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job"); } - Ok(opts) - }).await?; + Ok::<_, anyhow::Error>(opts) + }).await??; let crate_name = &opts.crate_name; diff --git a/src/bin/crates-admin/dialoguer.rs b/src/bin/crates-admin/dialoguer.rs index 6bf570db3a4..d4fc9a03d38 100644 --- a/src/bin/crates-admin/dialoguer.rs +++ b/src/bin/crates-admin/dialoguer.rs @@ -3,7 +3,7 @@ use crates_io::tasks::spawn_blocking; pub async fn confirm(msg: impl Into) -> anyhow::Result { let msg = msg.into(); - spawn_blocking(move || sync_confirm(msg).map_err(anyhow::Error::from)).await + spawn_blocking(move || sync_confirm(msg).map_err(anyhow::Error::from)).await? } fn sync_confirm(msg: impl Into) -> dialoguer::Result { diff --git a/src/bin/crates-admin/migrate.rs b/src/bin/crates-admin/migrate.rs index 79950c1ae0b..606d9bae937 100644 --- a/src/bin/crates-admin/migrate.rs +++ b/src/bin/crates-admin/migrate.rs @@ -54,7 +54,7 @@ pub async fn run(_opts: Opts) -> Result<(), Error> { Ok::<_, Error>(conn) }) - .await?; + .await??; info!("Synchronizing crate categories"); crates_io::boot::categories::sync_with_connection(CATEGORIES_TOML, &mut conn).await?; diff --git a/src/bin/crates-admin/render_readmes.rs b/src/bin/crates-admin/render_readmes.rs index b8331c18724..72643de80f4 100644 --- a/src/bin/crates-admin/render_readmes.rs +++ b/src/bin/crates-admin/render_readmes.rs @@ -175,7 +175,7 @@ async fn get_readme( let archive = Archive::new(reader); render_pkg_readme(archive, &pkg_name) }) - .await + .await? } fn render_pkg_readme(mut archive: Archive, pkg_name: &str) -> anyhow::Result { diff --git a/src/bin/crates-admin/upload_index.rs b/src/bin/crates-admin/upload_index.rs index 8c9fb6bba66..20ed7502b31 100644 --- a/src/bin/crates-admin/upload_index.rs +++ b/src/bin/crates-admin/upload_index.rs @@ -30,7 +30,7 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { Ok::<_, anyhow::Error>((repo, files)) }) - .await?; + .await??; if !dialoguer::confirm("continue with upload?").await? { return Ok(()); diff --git a/src/controllers/crate_owner_invitation.rs b/src/controllers/crate_owner_invitation.rs index b36a6c8017c..675e5e85df7 100644 --- a/src/controllers/crate_owner_invitation.rs +++ b/src/controllers/crate_owner_invitation.rs @@ -65,7 +65,7 @@ pub async fn list(app: AppState, req: Parts) -> AppResult { "users": users, })) }) - .await + .await? } /// Handles the `GET /api/private/crate_owner_invitations` route. @@ -86,7 +86,7 @@ pub async fn private_list(app: AppState, req: Parts) -> AppResult, req: Parts) -> AppRes "meta": { "total": total }, })) }) - .await + .await? } /// Handles the `GET /keywords/:keyword_id` route. diff --git a/src/controllers/krate/metadata.rs b/src/controllers/krate/metadata.rs index 87156783b75..ae0a652c77e 100644 --- a/src/controllers/krate/metadata.rs +++ b/src/controllers/krate/metadata.rs @@ -164,7 +164,7 @@ pub async fn show(app: AppState, Path(name): Path, req: Parts) -> AppRes "categories": encodable_cats, })) }) - .await + .await? } #[derive(Debug)] diff --git a/src/controllers/krate/owners.rs b/src/controllers/krate/owners.rs index 22201a4c86b..dcada53a248 100644 --- a/src/controllers/krate/owners.rs +++ b/src/controllers/krate/owners.rs @@ -80,7 +80,7 @@ pub async fn owner_user(state: AppState, Path(crate_name): Path) -> AppR Ok(json!({ "users": owners })) }) - .await + .await? } /// Handles the `PUT /crates/:crate_id/owners` route. @@ -246,7 +246,7 @@ async fn modify_owners( Ok(json!({ "msg": comma_sep_msg, "ok": true })) }) - .await + .await? } pub struct OwnerInviteEmail { diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 406cd5d7e49..2b551525a6b 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -562,7 +562,7 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult AppResult { }, })) }) - .await + .await? } #[derive(Default)] diff --git a/src/controllers/krate/versions.rs b/src/controllers/krate/versions.rs index 62cad9f008b..ed48f00e530 100644 --- a/src/controllers/krate/versions.rs +++ b/src/controllers/krate/versions.rs @@ -84,7 +84,7 @@ pub async fn versions( None => json!({ "versions": versions }), }) }) - .await + .await? } /// Seek-based pagination of versions by date diff --git a/src/controllers/metrics.rs b/src/controllers/metrics.rs index fc6e35c59f1..06b27080bed 100644 --- a/src/controllers/metrics.rs +++ b/src/controllers/metrics.rs @@ -1,6 +1,6 @@ use crate::app::AppState; use crate::tasks::spawn_blocking; -use crate::util::errors::{custom, forbidden, not_found, AppResult, BoxedAppError}; +use crate::util::errors::{custom, forbidden, not_found, AppResult}; use axum::extract::Path; use http::request::Parts; use http::{header, StatusCode}; @@ -30,10 +30,7 @@ pub async fn prometheus(app: AppState, Path(kind): Path, req: Parts) -> let mut conn = app.db_read().await?; app.service_metrics.gather(&mut conn).await? } - "instance" => { - spawn_blocking(move || Ok::<_, BoxedAppError>(app.instance_metrics.gather(&app)?)) - .await? - } + "instance" => spawn_blocking(move || app.instance_metrics.gather(&app)).await??, _ => return Err(not_found()), }; diff --git a/src/controllers/user/me.rs b/src/controllers/user/me.rs index 68d8192f011..be4cb7fcd67 100644 --- a/src/controllers/user/me.rs +++ b/src/controllers/user/me.rs @@ -103,7 +103,7 @@ pub async fn updates(app: AppState, req: Parts) -> AppResult { "meta": { "more": more }, })) }) - .await + .await? } /// Handles the `PUT /confirm/:email_token` route diff --git a/src/controllers/version/downloads.rs b/src/controllers/version/downloads.rs index bd4f2791e16..e7ef5436f62 100644 --- a/src/controllers/version/downloads.rs +++ b/src/controllers/version/downloads.rs @@ -69,5 +69,5 @@ pub async fn downloads( Ok(json!({ "version_downloads": downloads })) }) - .await + .await? } diff --git a/src/tasks.rs b/src/tasks.rs index 8598e002e03..d3d04f53c24 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -1,28 +1,18 @@ use sentry::Hub; -use std::convert::identity; -use tokio::task::JoinError; +use tokio::task::JoinHandle; /// Runs the provided closure on a thread where blocking is acceptable. /// /// This is using [tokio::task::spawn_blocking] internally, but automatically /// runs the callback function in the context of the current Sentry [Hub]. -/// -/// The function also returns a flattened [Result], which requires the error -/// variant of the [Result] to implement [From\]. -pub async fn spawn_blocking(f: F) -> Result +pub fn spawn_blocking(f: F) -> JoinHandle where - F: FnOnce() -> Result + Send + 'static, + F: FnOnce() -> R + Send + 'static, R: Send + 'static, - E: Send + From + 'static, { let current_span = tracing::Span::current(); let hub = Hub::current(); tokio::task::spawn_blocking(move || current_span.in_scope(|| Hub::run(hub, f))) - .await - // Convert `JoinError` to `E` - .map_err(Into::into) - // Flatten `Result, E>` to `Result<_, E>` - .and_then(identity) } #[cfg(test)] @@ -33,16 +23,18 @@ mod tests { /// Test that [spawn_blocking] works with [anyhow]. #[tokio::test] async fn test_spawn_blocking_anyhow() { - spawn_blocking::<_, _, anyhow::Error>(|| Ok(())) + spawn_blocking(|| Ok::<_, anyhow::Error>(())) .await .unwrap() + .unwrap() } /// Test that [spawn_blocking] works with [BoxedAppError]. #[tokio::test] async fn test_spawn_blocking_apperror() { - spawn_blocking::<_, _, BoxedAppError>(|| Ok(())) + spawn_blocking(|| Ok::<_, BoxedAppError>(())) .await .unwrap() + .unwrap() } } diff --git a/src/worker/jobs/archive_version_downloads.rs b/src/worker/jobs/archive_version_downloads.rs index 81ec2b6dc62..e44ef601bc8 100644 --- a/src/worker/jobs/archive_version_downloads.rs +++ b/src/worker/jobs/archive_version_downloads.rs @@ -61,7 +61,7 @@ impl BackgroundJob for ArchiveVersionDownloads { let csv_path = tempdir.path().join(FILE_NAME); export(&env.config.db.primary.url, &csv_path, &self.before).await?; - let dates = spawn_blocking(move || split(csv_path)).await?; + let dates = spawn_blocking(move || split(csv_path)).await??; let uploaded_dates = upload(downloads_archive_store, tempdir.path(), dates).await?; let mut conn = env.deadpool.get().await?; diff --git a/src/worker/jobs/downloads/update_metadata.rs b/src/worker/jobs/downloads/update_metadata.rs index d2b24359645..e2cba9e4045 100644 --- a/src/worker/jobs/downloads/update_metadata.rs +++ b/src/worker/jobs/downloads/update_metadata.rs @@ -24,7 +24,7 @@ impl BackgroundJob for UpdateDownloads { let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); Ok(update(conn)?) }) - .await + .await? } } diff --git a/src/worker/jobs/dump_db.rs b/src/worker/jobs/dump_db.rs index 2b02815ad15..c08965ab4ef 100644 --- a/src/worker/jobs/dump_db.rs +++ b/src/worker/jobs/dump_db.rs @@ -37,7 +37,7 @@ impl BackgroundJob for DumpDb { PathBuf::from(directory.timestamp.format("%Y-%m-%d-%H%M%S").to_string()); create_archives(export_dir, &tarball_prefix) }) - .await?; + .await??; info!("Uploading tarball…"); env.storage diff --git a/src/worker/jobs/index/normalize.rs b/src/worker/jobs/index/normalize.rs index 7f9cba8c9a5..2f2d23a4b44 100644 --- a/src/worker/jobs/index/normalize.rs +++ b/src/worker/jobs/index/normalize.rs @@ -94,6 +94,6 @@ impl BackgroundJob for NormalizeIndex { Ok(()) }) - .await + .await? } } diff --git a/src/worker/jobs/index/squash.rs b/src/worker/jobs/index/squash.rs index 85ac0af8aa1..18cb2a87910 100644 --- a/src/worker/jobs/index/squash.rs +++ b/src/worker/jobs/index/squash.rs @@ -62,6 +62,6 @@ impl BackgroundJob for SquashIndex { Ok(()) }) - .await + .await? } } diff --git a/src/worker/jobs/index/sync.rs b/src/worker/jobs/index/sync.rs index 81de55bc568..da1e92a8806 100644 --- a/src/worker/jobs/index/sync.rs +++ b/src/worker/jobs/index/sync.rs @@ -73,7 +73,7 @@ impl BackgroundJob for SyncToGitIndex { Ok(()) }) - .await + .await? } } diff --git a/src/worker/jobs/readmes.rs b/src/worker/jobs/readmes.rs index 8650b8d9cf0..cab0530156b 100644 --- a/src/worker/jobs/readmes.rs +++ b/src/worker/jobs/readmes.rs @@ -52,12 +52,12 @@ impl BackgroundJob for RenderAndUploadReadme { let job = self.clone(); let rendered = spawn_blocking(move || { - Ok::<_, anyhow::Error>(text_to_html( + text_to_html( &job.text, &job.readme_path, job.base_url.as_deref(), job.pkg_path_in_vcs.as_ref(), - )) + ) }) .await?; diff --git a/src/worker/jobs/update_default_version.rs b/src/worker/jobs/update_default_version.rs index 5dbb5c32f9b..89e4e9f4949 100644 --- a/src/worker/jobs/update_default_version.rs +++ b/src/worker/jobs/update_default_version.rs @@ -33,6 +33,6 @@ impl BackgroundJob for UpdateDefaultVersion { update_default_version(crate_id, conn)?; Ok(()) }) - .await + .await? } }