Skip to content

Commit 61a5ea8

Browse files
committed
worker/background_job: Adjust enqueue() fns to allow pipelining
1 parent f88a3eb commit 61a5ea8

File tree

1 file changed

+29
-25
lines changed

1 file changed

+29
-25
lines changed

crates/crates_io_worker/src/background_job.rs

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use diesel::dsl::{exists, not};
44
use diesel::sql_types::{Int2, Jsonb, Text};
55
use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl};
66
use diesel_async::{AsyncPgConnection, RunQueryDsl};
7+
use futures_util::future::BoxFuture;
8+
use futures_util::FutureExt;
79
use serde::de::DeserializeOwned;
810
use serde::Serialize;
911
use serde_json::Value;
@@ -38,32 +40,37 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
3840
/// Execute the task. This method should define its logic.
3941
fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send;
4042

41-
#[allow(async_fn_in_trait)]
4243
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]
43-
async fn enqueue(&self, conn: &mut AsyncPgConnection) -> Result<Option<i64>, EnqueueError> {
44-
let data = serde_json::to_value(self)?;
44+
fn enqueue(
45+
&self,
46+
conn: &mut AsyncPgConnection,
47+
) -> BoxFuture<'_, Result<Option<i64>, EnqueueError>> {
48+
let data = match serde_json::to_value(self) {
49+
Ok(data) => data,
50+
Err(err) => return async move { Err(EnqueueError::SerializationError(err)) }.boxed(),
51+
};
4552
let priority = Self::PRIORITY;
4653

4754
if Self::DEDUPLICATED {
48-
Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?)
55+
let future = enqueue_deduplicated(conn, Self::JOB_NAME, data, priority);
56+
future.boxed()
4957
} else {
50-
Ok(Some(
51-
enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?,
52-
))
58+
let future = enqueue_simple(conn, Self::JOB_NAME, data, priority);
59+
async move { Ok(Some(future.await?)) }.boxed()
5360
}
5461
}
5562
}
5663

57-
async fn enqueue_deduplicated(
64+
fn enqueue_deduplicated(
5865
conn: &mut AsyncPgConnection,
59-
job_type: &str,
60-
data: &Value,
66+
job_type: &'static str,
67+
data: Value,
6168
priority: i16,
62-
) -> Result<Option<i64>, EnqueueError> {
69+
) -> impl Future<Output = Result<Option<i64>, EnqueueError>> {
6370
let similar_jobs = background_jobs::table
6471
.select(background_jobs::id)
6572
.filter(background_jobs::job_type.eq(job_type))
66-
.filter(background_jobs::data.eq(data))
73+
.filter(background_jobs::data.eq(data.clone()))
6774
.filter(background_jobs::priority.eq(priority))
6875
.for_update()
6976
.skip_locked();
@@ -75,36 +82,33 @@ async fn enqueue_deduplicated(
7582
))
7683
.filter(not(exists(similar_jobs)));
7784

78-
let id = diesel::insert_into(background_jobs::table)
85+
let future = diesel::insert_into(background_jobs::table)
7986
.values(deduplicated_select)
8087
.into_columns((
8188
background_jobs::job_type,
8289
background_jobs::data,
8390
background_jobs::priority,
8491
))
8592
.returning(background_jobs::id)
86-
.get_result::<i64>(conn)
87-
.await
88-
.optional()?;
93+
.get_result::<i64>(conn);
8994

90-
Ok(id)
95+
async move { Ok(future.await.optional()?) }
9196
}
9297

93-
async fn enqueue_simple(
98+
fn enqueue_simple(
9499
conn: &mut AsyncPgConnection,
95-
job_type: &str,
96-
data: &Value,
100+
job_type: &'static str,
101+
data: Value,
97102
priority: i16,
98-
) -> Result<i64, EnqueueError> {
99-
let id = diesel::insert_into(background_jobs::table)
103+
) -> impl Future<Output = Result<i64, EnqueueError>> {
104+
let future = diesel::insert_into(background_jobs::table)
100105
.values((
101106
background_jobs::job_type.eq(job_type),
102107
background_jobs::data.eq(data),
103108
background_jobs::priority.eq(priority),
104109
))
105110
.returning(background_jobs::id)
106-
.get_result(conn)
107-
.await?;
111+
.get_result(conn);
108112

109-
Ok(id)
113+
async move { Ok(future.await?) }
110114
}

0 commit comments

Comments
 (0)