Skip to content

Commit 67354ac

Browse files
authored
Merge pull request #10087 from Turbo87/pipelined-enqueue
Enqueue background jobs in parallel
2 parents f88a3eb + 289be81 commit 67354ac

File tree

6 files changed

+99
-75
lines changed

6 files changed

+99
-75
lines changed

crates/crates_io_worker/src/background_job.rs

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use diesel::dsl::{exists, not};
44
use diesel::sql_types::{Int2, Jsonb, Text};
55
use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl};
66
use diesel_async::{AsyncPgConnection, RunQueryDsl};
7+
use futures_util::future::BoxFuture;
8+
use futures_util::FutureExt;
79
use serde::de::DeserializeOwned;
810
use serde::Serialize;
911
use serde_json::Value;
@@ -38,32 +40,37 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
3840
/// Execute the task. This method should define its logic.
3941
fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send;
4042

41-
#[allow(async_fn_in_trait)]
4243
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]
43-
async fn enqueue(&self, conn: &mut AsyncPgConnection) -> Result<Option<i64>, EnqueueError> {
44-
let data = serde_json::to_value(self)?;
44+
fn enqueue(
45+
&self,
46+
conn: &mut AsyncPgConnection,
47+
) -> BoxFuture<'_, Result<Option<i64>, EnqueueError>> {
48+
let data = match serde_json::to_value(self) {
49+
Ok(data) => data,
50+
Err(err) => return async move { Err(EnqueueError::SerializationError(err)) }.boxed(),
51+
};
4552
let priority = Self::PRIORITY;
4653

4754
if Self::DEDUPLICATED {
48-
Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?)
55+
let future = enqueue_deduplicated(conn, Self::JOB_NAME, data, priority);
56+
future.boxed()
4957
} else {
50-
Ok(Some(
51-
enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?,
52-
))
58+
let future = enqueue_simple(conn, Self::JOB_NAME, data, priority);
59+
async move { Ok(Some(future.await?)) }.boxed()
5360
}
5461
}
5562
}
5663

57-
async fn enqueue_deduplicated(
64+
fn enqueue_deduplicated(
5865
conn: &mut AsyncPgConnection,
59-
job_type: &str,
60-
data: &Value,
66+
job_type: &'static str,
67+
data: Value,
6168
priority: i16,
62-
) -> Result<Option<i64>, EnqueueError> {
69+
) -> impl Future<Output = Result<Option<i64>, EnqueueError>> {
6370
let similar_jobs = background_jobs::table
6471
.select(background_jobs::id)
6572
.filter(background_jobs::job_type.eq(job_type))
66-
.filter(background_jobs::data.eq(data))
73+
.filter(background_jobs::data.eq(data.clone()))
6774
.filter(background_jobs::priority.eq(priority))
6875
.for_update()
6976
.skip_locked();
@@ -75,36 +82,33 @@ async fn enqueue_deduplicated(
7582
))
7683
.filter(not(exists(similar_jobs)));
7784

78-
let id = diesel::insert_into(background_jobs::table)
85+
let future = diesel::insert_into(background_jobs::table)
7986
.values(deduplicated_select)
8087
.into_columns((
8188
background_jobs::job_type,
8289
background_jobs::data,
8390
background_jobs::priority,
8491
))
8592
.returning(background_jobs::id)
86-
.get_result::<i64>(conn)
87-
.await
88-
.optional()?;
93+
.get_result::<i64>(conn);
8994

90-
Ok(id)
95+
async move { Ok(future.await.optional()?) }
9196
}
9297

93-
async fn enqueue_simple(
98+
fn enqueue_simple(
9499
conn: &mut AsyncPgConnection,
95-
job_type: &str,
96-
data: &Value,
100+
job_type: &'static str,
101+
data: Value,
97102
priority: i16,
98-
) -> Result<i64, EnqueueError> {
99-
let id = diesel::insert_into(background_jobs::table)
103+
) -> impl Future<Output = Result<i64, EnqueueError>> {
104+
let future = diesel::insert_into(background_jobs::table)
100105
.values((
101106
background_jobs::job_type.eq(job_type),
102107
background_jobs::data.eq(data),
103108
background_jobs::priority.eq(priority),
104109
))
105110
.returning(background_jobs::id)
106-
.get_result(conn)
107-
.await?;
111+
.get_result(conn);
108112

109-
Ok(id)
113+
async move { Ok(future.await?) }
110114
}

src/bin/crates-admin/delete_crate.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,17 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
107107
info!("{name}: Skipped missing crate");
108108
};
109109

110-
info!("{name}: Enqueuing index sync jobs…");
111-
let job = jobs::SyncToGitIndex::new(name);
112-
if let Err(error) = job.enqueue(&mut conn).await {
113-
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
114-
}
115-
116-
let job = jobs::SyncToSparseIndex::new(name);
117-
if let Err(error) = job.enqueue(&mut conn).await {
118-
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");
119-
}
120-
121-
info!("{name}: Enqueuing DeleteCrateFromStorage job…");
122-
let job = jobs::DeleteCrateFromStorage::new(name.into());
123-
if let Err(error) = job.enqueue(&mut conn).await {
124-
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
110+
info!("{name}: Enqueuing background jobs…");
111+
let git_index_job = jobs::SyncToGitIndex::new(name);
112+
let sparse_index_job = jobs::SyncToSparseIndex::new(name);
113+
let delete_from_storage_job = jobs::DeleteCrateFromStorage::new(name.into());
114+
115+
if let Err(error) = tokio::try_join!(
116+
git_index_job.enqueue(&mut conn),
117+
sparse_index_job.enqueue(&mut conn),
118+
delete_from_storage_job.enqueue(&mut conn),
119+
) {
120+
warn!("{name}: Failed to enqueue background job: {error}");
125121
}
126122
}
127123

src/bin/crates-admin/delete_version.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,14 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
9595
let crate_name = &opts.crate_name;
9696

9797
info!(%crate_name, "Enqueuing index sync jobs");
98-
let job = jobs::SyncToGitIndex::new(crate_name);
99-
if let Err(error) = job.enqueue(&mut conn).await {
100-
warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job");
101-
}
102-
let job = jobs::SyncToSparseIndex::new(crate_name);
103-
if let Err(error) = job.enqueue(&mut conn).await {
104-
warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job");
98+
let git_index_job = jobs::SyncToGitIndex::new(crate_name);
99+
let sparse_index_job = jobs::SyncToSparseIndex::new(crate_name);
100+
101+
if let Err(error) = tokio::try_join!(
102+
git_index_job.enqueue(&mut conn),
103+
sparse_index_job.enqueue(&mut conn),
104+
) {
105+
warn!(%crate_name, "Failed to enqueue background job: {error}");
105106
}
106107

107108
for version in &opts.versions {

src/bin/crates-admin/yank_version.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,15 @@ async fn yank(opts: Opts, conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
6767
.execute(conn)
6868
.await?;
6969

70-
SyncToGitIndex::new(&krate.name).enqueue(conn).await?;
70+
let git_index_job = SyncToGitIndex::new(&krate.name);
71+
let sparse_index_job = SyncToSparseIndex::new(&krate.name);
72+
let update_default_version_job = UpdateDefaultVersion::new(krate.id);
7173

72-
SyncToSparseIndex::new(&krate.name).enqueue(conn).await?;
73-
74-
UpdateDefaultVersion::new(krate.id).enqueue(conn).await?;
74+
tokio::try_join!(
75+
git_index_job.enqueue(conn),
76+
sparse_index_job.enqueue(conn),
77+
update_default_version_job.enqueue(conn),
78+
)?;
7579

7680
Ok(())
7781
}

src/controllers/krate/publish.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ use axum::Json;
1010
use cargo_manifest::{Dependency, DepsSet, TargetDepsSet};
1111
use chrono::{DateTime, SecondsFormat, Utc};
1212
use crates_io_tarball::{process_tarball, TarballError};
13-
use crates_io_worker::BackgroundJob;
13+
use crates_io_worker::{BackgroundJob, EnqueueError};
1414
use diesel::dsl::{exists, select};
1515
use diesel::prelude::*;
1616
use diesel_async::scoped_futures::ScopedFutureExt;
1717
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
18+
use futures_util::TryFutureExt;
1819
use futures_util::TryStreamExt;
1920
use hex::ToHex;
2021
use http::StatusCode;
@@ -503,29 +504,41 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCra
503504
.await
504505
.map_err(|e| internal(format!("failed to upload crate: {e}")))?;
505506

506-
jobs::SyncToGitIndex::new(&krate.name).enqueue(conn).await?;
507-
jobs::SyncToSparseIndex::new(&krate.name).enqueue(conn).await?;
508-
509-
SendPublishNotificationsJob::new(version.id).enqueue(conn).await?;
507+
let git_index_job = jobs::SyncToGitIndex::new(&krate.name);
508+
let sparse_index_job = jobs::SyncToSparseIndex::new(&krate.name);
509+
let publish_notifications_job = SendPublishNotificationsJob::new(version.id);
510+
let crate_feed_job = jobs::rss::SyncCrateFeed::new(krate.name.clone());
511+
let updates_feed_job = jobs::rss::SyncUpdatesFeed;
512+
513+
tokio::try_join!(
514+
git_index_job.enqueue(conn),
515+
sparse_index_job.enqueue(conn),
516+
publish_notifications_job.enqueue(conn),
517+
crate_feed_job.enqueue(conn).or_else(|error| async move {
518+
error!("Failed to enqueue `rss::SyncCrateFeed` job: {error}");
519+
Ok::<_, EnqueueError>(None)
520+
}),
521+
updates_feed_job.enqueue(conn).or_else(|error| async move {
522+
error!("Failed to enqueue `rss::SyncUpdatesFeed` job: {error}");
523+
Ok::<_, EnqueueError>(None)
524+
}),
525+
)?;
510526

511527
// Experiment: check new crates for potential typosquatting.
512528
if existing_crate.is_none() {
513-
CheckTyposquat::new(&krate.name).enqueue(conn).await?;
514-
}
515-
516-
let job = jobs::rss::SyncCrateFeed::new(krate.name.clone());
517-
if let Err(error) = job.enqueue(conn).await {
518-
error!("Failed to enqueue `rss::SyncCrateFeed` job: {error}");
519-
}
520-
521-
if let Err(error) = jobs::rss::SyncUpdatesFeed.enqueue(conn).await {
522-
error!("Failed to enqueue `rss::SyncUpdatesFeed` job: {error}");
523-
}
524-
525-
if existing_crate.is_none() {
526-
if let Err(error) = jobs::rss::SyncCratesFeed.enqueue(conn).await {
527-
error!("Failed to enqueue `rss::SyncCratesFeed` job: {error}");
528-
}
529+
let crates_feed_job = jobs::rss::SyncCratesFeed;
530+
let typosquat_job = CheckTyposquat::new(&krate.name);
531+
532+
tokio::try_join!(
533+
crates_feed_job.enqueue(conn).or_else(|error| async move {
534+
error!("Failed to enqueue `rss::SyncCratesFeed` job: {error}");
535+
Ok::<_, EnqueueError>(None)
536+
}),
537+
typosquat_job.enqueue(conn).or_else(|error| async move {
538+
error!("Failed to enqueue `CheckTyposquat` job: {error}");
539+
Ok::<_, EnqueueError>(None)
540+
}),
541+
)?;
529542
}
530543

531544
// The `other` field on `PublishWarnings` was introduced to handle a temporary warning

src/controllers/version/metadata.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,15 @@ pub async fn perform_version_yank_update(
239239
.insert(conn)
240240
.await?;
241241

242-
SyncToGitIndex::new(&krate.name).enqueue(conn).await?;
243-
SyncToSparseIndex::new(&krate.name).enqueue(conn).await?;
244-
UpdateDefaultVersion::new(krate.id).enqueue(conn).await?;
242+
let git_index_job = SyncToGitIndex::new(&krate.name);
243+
let sparse_index_job = SyncToSparseIndex::new(&krate.name);
244+
let update_default_version_job = UpdateDefaultVersion::new(krate.id);
245+
246+
tokio::try_join!(
247+
git_index_job.enqueue(conn),
248+
sparse_index_job.enqueue(conn),
249+
update_default_version_job.enqueue(conn),
250+
)?;
245251

246252
Ok(())
247253
}

0 commit comments

Comments
 (0)