Skip to content

Commit adb328f

Browse files
committed
controllers/krate/publish: Enqueue background jobs in parallel
1 parent 6f66ec2 commit adb328f

File tree

1 file changed

+34
-21
lines changed

1 file changed

+34
-21
lines changed

src/controllers/krate/publish.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ use axum::Json;
1010
use cargo_manifest::{Dependency, DepsSet, TargetDepsSet};
1111
use chrono::{DateTime, SecondsFormat, Utc};
1212
use crates_io_tarball::{process_tarball, TarballError};
13-
use crates_io_worker::BackgroundJob;
13+
use crates_io_worker::{BackgroundJob, EnqueueError};
1414
use diesel::dsl::{exists, select};
1515
use diesel::prelude::*;
1616
use diesel_async::scoped_futures::ScopedFutureExt;
1717
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
18+
use futures_util::TryFutureExt;
1819
use futures_util::TryStreamExt;
1920
use hex::ToHex;
2021
use http::StatusCode;
@@ -503,29 +504,41 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCra
503504
.await
504505
.map_err(|e| internal(format!("failed to upload crate: {e}")))?;
505506

506-
jobs::SyncToGitIndex::new(&krate.name).enqueue(conn).await?;
507-
jobs::SyncToSparseIndex::new(&krate.name).enqueue(conn).await?;
508-
509-
SendPublishNotificationsJob::new(version.id).enqueue(conn).await?;
507+
let git_index_job = jobs::SyncToGitIndex::new(&krate.name);
508+
let sparse_index_job = jobs::SyncToSparseIndex::new(&krate.name);
509+
let publish_notifications_job = SendPublishNotificationsJob::new(version.id);
510+
let crate_feed_job = jobs::rss::SyncCrateFeed::new(krate.name.clone());
511+
let updates_feed_job = jobs::rss::SyncUpdatesFeed;
512+
513+
tokio::try_join!(
514+
git_index_job.enqueue(conn),
515+
sparse_index_job.enqueue(conn),
516+
publish_notifications_job.enqueue(conn),
517+
crate_feed_job.enqueue(conn).or_else(|error| async move {
518+
error!("Failed to enqueue `rss::SyncCrateFeed` job: {error}");
519+
Ok::<_, EnqueueError>(None)
520+
}),
521+
updates_feed_job.enqueue(conn).or_else(|error| async move {
522+
error!("Failed to enqueue `rss::SyncUpdatesFeed` job: {error}");
523+
Ok::<_, EnqueueError>(None)
524+
}),
525+
)?;
510526

511527
// Experiment: check new crates for potential typosquatting.
512528
if existing_crate.is_none() {
513-
CheckTyposquat::new(&krate.name).enqueue(conn).await?;
514-
}
515-
516-
let job = jobs::rss::SyncCrateFeed::new(krate.name.clone());
517-
if let Err(error) = job.enqueue(conn).await {
518-
error!("Failed to enqueue `rss::SyncCrateFeed` job: {error}");
519-
}
520-
521-
if let Err(error) = jobs::rss::SyncUpdatesFeed.enqueue(conn).await {
522-
error!("Failed to enqueue `rss::SyncUpdatesFeed` job: {error}");
523-
}
524-
525-
if existing_crate.is_none() {
526-
if let Err(error) = jobs::rss::SyncCratesFeed.enqueue(conn).await {
527-
error!("Failed to enqueue `rss::SyncCratesFeed` job: {error}");
528-
}
529+
let crates_feed_job = jobs::rss::SyncCratesFeed;
530+
let typosquat_job = CheckTyposquat::new(&krate.name);
531+
532+
tokio::try_join!(
533+
crates_feed_job.enqueue(conn).or_else(|error| async move {
534+
error!("Failed to enqueue `rss::SyncCratesFeed` job: {error}");
535+
Ok::<_, EnqueueError>(None)
536+
}),
537+
typosquat_job.enqueue(conn).or_else(|error| async move {
538+
error!("Failed to enqueue `CheckTyposquat` job: {error}");
539+
Ok::<_, EnqueueError>(None)
540+
}),
541+
)?;
529542
}
530543

531544
// The `other` field on `PublishWarnings` was introduced to handle a temporary warning

0 commit comments

Comments
 (0)