Skip to content

Commit b54d0eb

Browse files
committed
worker: Implement async enqueue() fn
1 parent bbe1922 commit b54d0eb

File tree

1 file changed

+83
-1
lines changed

1 file changed

+83
-1
lines changed

crates/crates_io_worker/src/background_job.rs

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use crate::schema::background_jobs;
33
use diesel::connection::LoadConnection;
44
use diesel::dsl::{exists, not};
55
use diesel::pg::Pg;
6-
use diesel::prelude::*;
76
use diesel::sql_types::{Int2, Jsonb, Text};
7+
use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl};
8+
use diesel_async::AsyncPgConnection;
89
use serde::de::DeserializeOwned;
910
use serde::Serialize;
1011
use serde_json::Value;
@@ -53,6 +54,24 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
5354
Ok(Some(enqueue_simple(conn, Self::JOB_NAME, &data, priority)?))
5455
}
5556
}
57+
58+
#[allow(async_fn_in_trait)]
59+
#[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))]
60+
async fn async_enqueue(
61+
&self,
62+
conn: &mut AsyncPgConnection,
63+
) -> Result<Option<i64>, EnqueueError> {
64+
let data = serde_json::to_value(self)?;
65+
let priority = Self::PRIORITY;
66+
67+
if Self::DEDUPLICATED {
68+
Ok(async_enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?)
69+
} else {
70+
Ok(Some(
71+
async_enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?,
72+
))
73+
}
74+
}
5675
}
5776

5877
fn enqueue_deduplicated(
@@ -61,6 +80,8 @@ fn enqueue_deduplicated(
6180
data: &Value,
6281
priority: i16,
6382
) -> Result<Option<i64>, EnqueueError> {
83+
use diesel::RunQueryDsl;
84+
6485
let similar_jobs = background_jobs::table
6586
.select(background_jobs::id)
6687
.filter(background_jobs::job_type.eq(job_type))
@@ -90,12 +111,52 @@ fn enqueue_deduplicated(
90111
Ok(id)
91112
}
92113

114+
async fn async_enqueue_deduplicated(
115+
conn: &mut AsyncPgConnection,
116+
job_type: &str,
117+
data: &Value,
118+
priority: i16,
119+
) -> Result<Option<i64>, EnqueueError> {
120+
use diesel_async::RunQueryDsl;
121+
122+
let similar_jobs = background_jobs::table
123+
.select(background_jobs::id)
124+
.filter(background_jobs::job_type.eq(job_type))
125+
.filter(background_jobs::data.eq(data))
126+
.filter(background_jobs::priority.eq(priority))
127+
.for_update()
128+
.skip_locked();
129+
130+
let deduplicated_select = diesel::select((
131+
job_type.into_sql::<Text>(),
132+
data.into_sql::<Jsonb>(),
133+
priority.into_sql::<Int2>(),
134+
))
135+
.filter(not(exists(similar_jobs)));
136+
137+
let id = diesel::insert_into(background_jobs::table)
138+
.values(deduplicated_select)
139+
.into_columns((
140+
background_jobs::job_type,
141+
background_jobs::data,
142+
background_jobs::priority,
143+
))
144+
.returning(background_jobs::id)
145+
.get_result::<i64>(conn)
146+
.await
147+
.optional()?;
148+
149+
Ok(id)
150+
}
151+
93152
fn enqueue_simple(
94153
conn: &mut impl LoadConnection<Backend = Pg>,
95154
job_type: &str,
96155
data: &Value,
97156
priority: i16,
98157
) -> Result<i64, EnqueueError> {
158+
use diesel::RunQueryDsl;
159+
99160
let id = diesel::insert_into(background_jobs::table)
100161
.values((
101162
background_jobs::job_type.eq(job_type),
@@ -107,3 +168,24 @@ fn enqueue_simple(
107168

108169
Ok(id)
109170
}
171+
172+
async fn async_enqueue_simple(
173+
conn: &mut AsyncPgConnection,
174+
job_type: &str,
175+
data: &Value,
176+
priority: i16,
177+
) -> Result<i64, EnqueueError> {
178+
use diesel_async::RunQueryDsl;
179+
180+
let id = diesel::insert_into(background_jobs::table)
181+
.values((
182+
background_jobs::job_type.eq(job_type),
183+
background_jobs::data.eq(data),
184+
background_jobs::priority.eq(priority),
185+
))
186+
.returning(background_jobs::id)
187+
.get_result(conn)
188+
.await?;
189+
190+
Ok(id)
191+
}

0 commit comments

Comments
 (0)