Commit 6066d18

worker: Add DEDUPLICATED flag to the BackgroundJob trait (#9627)
This flag can be set when implementing background jobs to automatically deduplicate them when they are enqueued. If an unstarted job with the same data already exists in the queue, the `enqueue()` fn will return `Ok(None)` instead of the job ID.
1 parent f7d0fd8 · commit 6066d18
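
To illustrate the new flag, here is a minimal sketch of a job type opting into deduplication. The `SendWelcomeEmail` name, its field, and the `()` context are hypothetical; only the `DEDUPLICATED` constant is from this commit:

```rust
use crates_io_worker::BackgroundJob;
use serde::{Deserialize, Serialize};

// Hypothetical job type for illustration (not part of this commit).
#[derive(Serialize, Deserialize)]
struct SendWelcomeEmail {
    user_id: i64,
}

impl BackgroundJob for SendWelcomeEmail {
    const JOB_NAME: &'static str = "send_welcome_email";
    // The new flag: skip enqueueing when an identical unstarted job exists.
    const DEDUPLICATED: bool = true;
    type Context = ();

    async fn run(&self, _ctx: Self::Context) -> anyhow::Result<()> {
        // ... job logic would go here ...
        Ok(())
    }
}
```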

File tree

4 files changed · +149 -12 lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

crates/crates_io_worker/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -21,5 +21,7 @@ tokio = { version = "=1.40.0", features = ["rt", "time"]}
 tracing = "=0.1.40"
 
 [dev-dependencies]
+claims = "=0.7.1"
 crates_io_test_db = { path = "../crates_io_test_db" }
+insta = { version = "=1.40.0", features = ["json"] }
 tokio = { version = "=1.40.0", features = ["macros", "rt", "rt-multi-thread", "sync"]}

crates/crates_io_worker/src/background_job.rs

Lines changed: 53 additions & 10 deletions
@@ -1,8 +1,10 @@
 use crate::errors::EnqueueError;
 use crate::schema::background_jobs;
 use diesel::connection::LoadConnection;
+use diesel::dsl::{exists, not};
 use diesel::pg::Pg;
 use diesel::prelude::*;
+use diesel::sql_types::{Int2, Jsonb, Text};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 use std::future::Future;
@@ -21,6 +23,12 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
     /// [Self::enqueue_with_priority] can be used to override the priority value.
     const PRIORITY: i16 = 0;
 
+    /// Whether the job should be deduplicated.
+    ///
+    /// If true, the job will not be enqueued if there is already an unstarted
+    /// job with the same data.
+    const DEDUPLICATED: bool = false;
+
     /// Job queue where this job will be executed.
     const QUEUE: &'static str = DEFAULT_QUEUE;
 
@@ -30,7 +38,10 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
     /// Execute the task. This method should define its logic.
     fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send;
 
-    fn enqueue(&self, conn: &mut impl LoadConnection<Backend = Pg>) -> Result<i64, EnqueueError> {
+    fn enqueue(
+        &self,
+        conn: &mut impl LoadConnection<Backend = Pg>,
+    ) -> Result<Option<i64>, EnqueueError> {
         self.enqueue_with_priority(conn, Self::PRIORITY)
     }
 
@@ -39,16 +50,48 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
         &self,
         conn: &mut impl LoadConnection<Backend = Pg>,
         job_priority: i16,
-    ) -> Result<i64, EnqueueError> {
+    ) -> Result<Option<i64>, EnqueueError> {
         let job_data = serde_json::to_value(self)?;
-        let id = diesel::insert_into(background_jobs::table)
-            .values((
-                background_jobs::job_type.eq(Self::JOB_NAME),
-                background_jobs::data.eq(job_data),
-                background_jobs::priority.eq(job_priority),
+
+        if Self::DEDUPLICATED {
+            let similar_jobs = background_jobs::table
+                .select(background_jobs::id)
+                .filter(background_jobs::job_type.eq(Self::JOB_NAME))
+                .filter(background_jobs::data.eq(&job_data))
+                .filter(background_jobs::priority.eq(job_priority))
+                .for_update()
+                .skip_locked();
+
+            let deduplicated_select = diesel::select((
+                Self::JOB_NAME.into_sql::<Text>(),
+                (&job_data).into_sql::<Jsonb>(),
+                job_priority.into_sql::<Int2>(),
             ))
-            .returning(background_jobs::id)
-            .get_result(conn)?;
-        Ok(id)
+            .filter(not(exists(similar_jobs)));
+
+            let id = diesel::insert_into(background_jobs::table)
+                .values(deduplicated_select)
+                .into_columns((
+                    background_jobs::job_type,
+                    background_jobs::data,
+                    background_jobs::priority,
+                ))
+                .returning(background_jobs::id)
+                .get_result::<i64>(conn)
+                .optional()?;
+
+            Ok(id)
+        } else {
+            let id = diesel::insert_into(background_jobs::table)
+                .values((
+                    background_jobs::job_type.eq(Self::JOB_NAME),
+                    background_jobs::data.eq(job_data),
+                    background_jobs::priority.eq(job_priority),
+                ))
+                .returning(background_jobs::id)
+                .get_result(conn)?;
+
            Ok(Some(id))
+        }
     }
 }
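
The deduplicated path is an `INSERT … SELECT` guarded by `NOT EXISTS`: the subquery searches for a queued job with identical type, data, and priority, and its `FOR UPDATE SKIP LOCKED` clause skips rows currently locked by a runner, so an already running job does not suppress a new enqueue. When a duplicate is found, the insert affects zero rows and `.optional()` maps the absent `RETURNING` value to `None`. A caller-side sketch of the new contract (the `enqueue_logged` helper is hypothetical, not part of this commit):

```rust
use crates_io_worker::BackgroundJob;
use diesel::connection::LoadConnection;
use diesel::pg::Pg;

// Sketch: consuming the Option<i64> returned by the new enqueue() signature.
// Deduplication only kicks in when J::DEDUPLICATED is true.
fn enqueue_logged<J: BackgroundJob>(
    job: &J,
    conn: &mut impl LoadConnection<Backend = Pg>,
) -> anyhow::Result<()> {
    match job.enqueue(conn)? {
        Some(id) => println!("enqueued as background job {id}"),
        None => println!("identical unstarted job already queued; skipped"),
    }
    Ok(())
}
```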

crates/crates_io_worker/tests/runner.rs

Lines changed: 92 additions & 2 deletions
@@ -1,14 +1,25 @@
+use claims::{assert_none, assert_some};
 use crates_io_test_db::TestDatabase;
 use crates_io_worker::schema::background_jobs;
 use crates_io_worker::{BackgroundJob, Runner};
 use diesel::prelude::*;
 use diesel_async::pooled_connection::deadpool::Pool;
 use diesel_async::pooled_connection::AsyncDieselConnectionManager;
 use diesel_async::AsyncPgConnection;
+use insta::assert_compact_json_snapshot;
 use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
 use tokio::sync::Barrier;
 
+fn all_jobs(conn: &mut PgConnection) -> Vec<(String, Value)> {
+    background_jobs::table
+        .select((background_jobs::job_type, background_jobs::data))
+        .get_results(conn)
+        .unwrap()
+}
+
 fn job_exists(id: i64, conn: &mut PgConnection) -> bool {
     background_jobs::table
         .find(id)
@@ -63,7 +74,7 @@ async fn jobs_are_locked_when_fetched() {
     let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
 
     let mut conn = test_database.connect();
-    let job_id = TestJob.enqueue(&mut conn).unwrap();
+    let job_id = TestJob.enqueue(&mut conn).unwrap().unwrap();
 
     assert!(job_exists(job_id, &mut conn));
     assert!(!job_is_locked(job_id, &mut conn));
@@ -193,7 +204,7 @@ async fn panicking_in_jobs_updates_retry_counter() {
 
     let mut conn = test_database.connect();
 
-    let job_id = TestJob.enqueue(&mut conn).unwrap();
+    let job_id = TestJob.enqueue(&mut conn).unwrap().unwrap();
 
     let runner = runner.start();
     runner.wait_for_shutdown().await;
@@ -207,6 +218,85 @@ async fn panicking_in_jobs_updates_retry_counter() {
     assert_eq!(tries, 1);
 }
 
+#[tokio::test(flavor = "multi_thread")]
+async fn jobs_can_be_deduplicated() {
+    #[derive(Clone)]
+    struct TestContext {
+        runs: Arc<AtomicU8>,
+        job_started_barrier: Arc<Barrier>,
+        assertions_finished_barrier: Arc<Barrier>,
+    }
+
+    #[derive(Serialize, Deserialize)]
+    struct TestJob {
+        value: String,
+    }
+
+    impl TestJob {
+        fn new(value: impl Into<String>) -> Self {
+            let value = value.into();
+            Self { value }
+        }
+    }
+
+    impl BackgroundJob for TestJob {
+        const JOB_NAME: &'static str = "test";
+        const DEDUPLICATED: bool = true;
+        type Context = TestContext;
+
+        async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
+            let runs = ctx.runs.fetch_add(1, Ordering::SeqCst);
+            if runs == 0 {
+                ctx.job_started_barrier.wait().await;
+                ctx.assertions_finished_barrier.wait().await;
+            }
+            Ok(())
+        }
+    }
+
+    let test_database = TestDatabase::new();
+
+    let test_context = TestContext {
+        runs: Arc::new(AtomicU8::new(0)),
+        job_started_barrier: Arc::new(Barrier::new(2)),
+        assertions_finished_barrier: Arc::new(Barrier::new(2)),
+    };
+
+    let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
+
+    let mut conn = test_database.connect();
+
+    // Enqueue first job
+    assert_some!(TestJob::new("foo").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}]]"#);
+
+    // Try to enqueue the same job again, which should be deduplicated
+    assert_none!(TestJob::new("foo").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}]]"#);
+
+    // Start processing the first job
+    let runner = runner.start();
+    test_context.job_started_barrier.wait().await;
+
+    // Enqueue the same job again, which should NOT be deduplicated,
+    // since the first job is still running
+    assert_some!(TestJob::new("foo").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+
+    // Try to enqueue the same job again, which should be deduplicated again
+    assert_none!(TestJob::new("foo").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+
+    // Enqueue the same job but with different data, which should
+    // NOT be deduplicated
+    assert_some!(TestJob::new("bar").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
+
+    // Resolve the final barrier to finish the test
+    test_context.assertions_finished_barrier.wait().await;
+    runner.wait_for_shutdown().await;
+}
+
 fn runner<Context: Clone + Send + Sync + 'static>(
     database_url: &str,
     context: Context,
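
The deduplication test above relies on a two-barrier handshake: the first barrier parks the test until the job is running (and its row is locked by the runner, so `SKIP LOCKED` ignores it), and the second parks the job until the assertions have finished. A stripped-down sketch of that pattern as a standalone tokio program (none of this is crates_io_worker code):

```rust
use std::sync::Arc;
use tokio::sync::Barrier;

#[tokio::main]
async fn main() {
    // One party is the "job", the other is the "test" driving assertions.
    let job_started = Arc::new(Barrier::new(2));
    let assertions_done = Arc::new(Barrier::new(2));

    let (started, done) = (job_started.clone(), assertions_done.clone());
    let job = tokio::spawn(async move {
        // The job signals that it is running (its DB row would now be locked)...
        started.wait().await;
        // ...and stays parked until the test finishes asserting.
        done.wait().await;
    });

    job_started.wait().await;
    // Assertions against the queue would run here, while the job is held.
    assertions_done.wait().await;
    job.await.unwrap();
}
```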
