Commit a13e915

worker: Add DEDUPLICATED flag to the BackgroundJob trait
This flag can be set when implementing background jobs to automatically deduplicate jobs when they are enqueued. If an identical unstarted job already exists in the queue, the `enqueue()` fn will return `Ok(None)` instead of the job ID.
Parent: f7d0fd8
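
For illustration, a minimal sketch of a job opting in to deduplication after this change; `SendEmailJob`, its field, and the `conn` handle are hypothetical stand-ins, not part of this commit:

    use crates_io_worker::BackgroundJob;
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct SendEmailJob {
        address: String,
    }

    impl BackgroundJob for SendEmailJob {
        const JOB_NAME: &'static str = "send_email";
        const DEDUPLICATED: bool = true; // opt in to enqueue-time deduplication
        type Context = (); // unit context for brevity

        async fn run(&self, _ctx: Self::Context) -> anyhow::Result<()> {
            // ... send the email ...
            Ok(())
        }
    }

    // `enqueue()` now returns Result<Option<i64>, EnqueueError>:
    let job = SendEmailJob { address: "user@example.com".into() };
    match job.enqueue(&mut conn)? {
        Some(id) => println!("enqueued background job {id}"),
        None => println!("identical unstarted job already queued; skipped"),
    }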

4 files changed: +149 −12 lines

Cargo.lock

Lines changed: 2 additions & 0 deletions
(Generated file; diff not rendered.)

crates/crates_io_worker/Cargo.toml

Lines changed: 2 additions & 0 deletions

@@ -21,5 +21,7 @@ tokio = { version = "=1.40.0", features = ["rt", "time"]}
 tracing = "=0.1.40"
 
 [dev-dependencies]
+claims = "=0.7.1"
 crates_io_test_db = { path = "../crates_io_test_db" }
+insta = { version = "=1.40.0", features = ["json"] }
 tokio = { version = "=1.40.0", features = ["macros", "rt", "rt-multi-thread", "sync"]}
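
Both new dev-dependencies serve the test added in `runner.rs` below: `claims` supplies the `assert_some!`/`assert_none!` macros, and `insta` supplies the inline `assert_compact_json_snapshot!` assertions.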

crates/crates_io_worker/src/background_job.rs

Lines changed: 53 additions & 10 deletions

@@ -1,8 +1,10 @@
 use crate::errors::EnqueueError;
 use crate::schema::background_jobs;
 use diesel::connection::LoadConnection;
+use diesel::dsl::{exists, not};
 use diesel::pg::Pg;
 use diesel::prelude::*;
+use diesel::sql_types::{Int2, Jsonb, Text};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 use std::future::Future;
@@ -21,6 +23,12 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
     /// [Self::enqueue_with_priority] can be used to override the priority value.
     const PRIORITY: i16 = 0;
 
+    /// Whether the job should be deduplicated.
+    ///
+    /// If true, the job will not be enqueued if there is already an unstarted
+    /// job with the same data.
+    const DEDUPLICATED: bool = false;
+
     /// Job queue where this job will be executed.
     const QUEUE: &'static str = DEFAULT_QUEUE;
 
@@ -30,7 +38,10 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
     /// Execute the task. This method should define its logic.
     fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send;
 
-    fn enqueue(&self, conn: &mut impl LoadConnection<Backend = Pg>) -> Result<i64, EnqueueError> {
+    fn enqueue(
+        &self,
+        conn: &mut impl LoadConnection<Backend = Pg>,
+    ) -> Result<Option<i64>, EnqueueError> {
         self.enqueue_with_priority(conn, Self::PRIORITY)
     }
 
@@ -39,16 +50,48 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
         &self,
         conn: &mut impl LoadConnection<Backend = Pg>,
         job_priority: i16,
-    ) -> Result<i64, EnqueueError> {
+    ) -> Result<Option<i64>, EnqueueError> {
         let job_data = serde_json::to_value(self)?;
-        let id = diesel::insert_into(background_jobs::table)
-            .values((
-                background_jobs::job_type.eq(Self::JOB_NAME),
-                background_jobs::data.eq(job_data),
-                background_jobs::priority.eq(job_priority),
+
+        if Self::DEDUPLICATED {
+            let similar_jobs = background_jobs::table
+                .select(background_jobs::id)
+                .filter(background_jobs::job_type.eq(Self::JOB_NAME))
+                .filter(background_jobs::data.eq(&job_data))
+                .filter(background_jobs::priority.eq(job_priority))
+                .for_update()
+                .skip_locked();
+
+            let deduplicated_select = diesel::select((
+                Self::JOB_NAME.into_sql::<Text>(),
+                (&job_data).into_sql::<Jsonb>(),
+                job_priority.into_sql::<Int2>(),
             ))
-            .returning(background_jobs::id)
-            .get_result(conn)?;
-        Ok(id)
+            .filter(not(exists(similar_jobs)));
+
+            let id = diesel::insert_into(background_jobs::table)
+                .values(deduplicated_select)
+                .into_columns((
+                    background_jobs::job_type,
+                    background_jobs::data,
+                    background_jobs::priority,
+                ))
+                .returning(background_jobs::id)
+                .get_result::<i64>(conn)
+                .optional()?;
+
+            Ok(id)
+        } else {
+            let id = diesel::insert_into(background_jobs::table)
+                .values((
+                    background_jobs::job_type.eq(Self::JOB_NAME),
+                    background_jobs::data.eq(job_data),
+                    background_jobs::priority.eq(job_priority),
+                ))
+                .returning(background_jobs::id)
+                .get_result(conn)?;
+
+            Ok(Some(id))
+        }
    }
 }
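
Two details of the deduplicated path are worth spelling out. First, the existence check and the insert run as a single `INSERT ... SELECT ... WHERE NOT EXISTS (...)` statement rather than a separate query followed by an insert. Second, `FOR UPDATE SKIP LOCKED` on the subquery makes the duplicate check ignore rows currently row-locked by a worker, i.e. jobs that are already running, so only genuinely unstarted jobs suppress a new enqueue (the behavior the new test below exercises). A standalone sketch, not part of the commit, that prints the SQL Diesel generates for this construction; the `table!` definition is a local stand-in for the crate's real schema, and the snippet assumes Diesel's `serde_json` feature for the `Jsonb` mapping:

    use diesel::dsl::{exists, not};
    use diesel::pg::Pg;
    use diesel::prelude::*;
    use diesel::sql_types::{Int2, Jsonb, Text};

    diesel::table! {
        background_jobs (id) {
            id -> Int8,
            job_type -> Text,
            data -> Jsonb,
            priority -> Int2,
        }
    }

    fn main() {
        let job_data = serde_json::json!({ "value": "foo" });

        // Unstarted duplicates: identical type, data, and priority. Rows locked
        // by a running worker are skipped, so they don't count as duplicates.
        let similar_jobs = background_jobs::table
            .select(background_jobs::id)
            .filter(background_jobs::job_type.eq("test"))
            .filter(background_jobs::data.eq(&job_data))
            .filter(background_jobs::priority.eq(0i16))
            .for_update()
            .skip_locked();

        // INSERT ... SELECT that only produces a row when no duplicate exists.
        let insert = diesel::insert_into(background_jobs::table)
            .values(
                diesel::select((
                    "test".into_sql::<Text>(),
                    (&job_data).into_sql::<Jsonb>(),
                    0i16.into_sql::<Int2>(),
                ))
                .filter(not(exists(similar_jobs))),
            )
            .into_columns((
                background_jobs::job_type,
                background_jobs::data,
                background_jobs::priority,
            ));

        // Prints roughly:
        //   INSERT INTO "background_jobs" ("job_type", "data", "priority")
        //   SELECT $1, $2, $3 WHERE NOT EXISTS (
        //     SELECT "background_jobs"."id" FROM "background_jobs"
        //     WHERE ... FOR UPDATE SKIP LOCKED
        //   )
        println!("{}", diesel::debug_query::<Pg, _>(&insert));
    }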

crates/crates_io_worker/tests/runner.rs

Lines changed: 92 additions & 2 deletions

@@ -1,14 +1,25 @@
+use claims::{assert_none, assert_some};
 use crates_io_test_db::TestDatabase;
 use crates_io_worker::schema::background_jobs;
 use crates_io_worker::{BackgroundJob, Runner};
 use diesel::prelude::*;
 use diesel_async::pooled_connection::deadpool::Pool;
 use diesel_async::pooled_connection::AsyncDieselConnectionManager;
 use diesel_async::AsyncPgConnection;
+use insta::assert_compact_json_snapshot;
 use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
 use tokio::sync::Barrier;
 
+fn all_jobs(conn: &mut PgConnection) -> Vec<(String, Value)> {
+    background_jobs::table
+        .select((background_jobs::job_type, background_jobs::data))
+        .get_results(conn)
+        .unwrap()
+}
+
 fn job_exists(id: i64, conn: &mut PgConnection) -> bool {
     background_jobs::table
         .find(id)
@@ -63,7 +74,7 @@ async fn jobs_are_locked_when_fetched() {
     let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
 
     let mut conn = test_database.connect();
-    let job_id = TestJob.enqueue(&mut conn).unwrap();
+    let job_id = TestJob.enqueue(&mut conn).unwrap().unwrap();
 
     assert!(job_exists(job_id, &mut conn));
     assert!(!job_is_locked(job_id, &mut conn));
@@ -193,7 +204,7 @@ async fn panicking_in_jobs_updates_retry_counter() {
 
     let mut conn = test_database.connect();
 
-    let job_id = TestJob.enqueue(&mut conn).unwrap();
+    let job_id = TestJob.enqueue(&mut conn).unwrap().unwrap();
 
     let runner = runner.start();
     runner.wait_for_shutdown().await;
@@ -207,6 +218,85 @@
     assert_eq!(tries, 1);
 }
 
+#[tokio::test(flavor = "multi_thread")]
+async fn jobs_can_be_deduplicated() {
+    #[derive(Clone)]
+    struct TestContext {
+        runs: Arc<AtomicU8>,
+        job_started_barrier: Arc<Barrier>,
+        assertions_finished_barrier: Arc<Barrier>,
+    }
+
+    #[derive(Serialize, Deserialize)]
+    struct TestJob {
+        value: String,
+    }
+
+    impl TestJob {
+        fn new(value: impl Into<String>) -> Self {
+            let value = value.into();
+            Self { value }
+        }
+    }
+
+    impl BackgroundJob for TestJob {
+        const JOB_NAME: &'static str = "test";
+        const DEDUPLICATED: bool = true;
+        type Context = TestContext;
+
+        async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
+            let runs = ctx.runs.fetch_add(1, Ordering::SeqCst);
+            if runs == 0 {
+                ctx.job_started_barrier.wait().await;
+                ctx.assertions_finished_barrier.wait().await;
+            }
+            Ok(())
+        }
+    }
+
+    let test_database = TestDatabase::new();
+
+    let test_context = TestContext {
+        runs: Arc::new(AtomicU8::new(0)),
+        job_started_barrier: Arc::new(Barrier::new(2)),
+        assertions_finished_barrier: Arc::new(Barrier::new(2)),
+    };
+
+    let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
+
+    let mut conn = test_database.connect();
+
+    // Enqueue first job
+    assert_some!(TestJob::new("foo").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}]]"#);
+
+    // Try to enqueue the same job again, which should be deduplicated
+    assert_none!(TestJob::new("foo").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}]]"#);
+
+    // Start processing the first job
+    let runner = runner.start();
+    test_context.job_started_barrier.wait().await;
+
+    // Enqueue the same job again, which should NOT be deduplicated,
+    // since the first job is still running
+    assert_some!(TestJob::new("foo").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+
+    // Try to enqueue the same job again, which should be deduplicated again
+    assert_none!(TestJob::new("foo").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+
+    // Enqueue the same job but with different data, which should
+    // NOT be deduplicated
+    assert_some!(TestJob::new("bar").enqueue(&mut conn).unwrap());
+    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
+
+    // Resolve the final barrier to finish the test
+    test_context.assertions_finished_barrier.wait().await;
+    runner.wait_for_shutdown().await;
+}
+
 fn runner<Context: Clone + Send + Sync + 'static>(
     database_url: &str,
     context: Context,
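
A note on the test choreography: the two barriers hold the first `foo` job inside `run()` so the middle assertions execute while that job is verifiably running (and its row locked), and the `runs` counter ensures only the first execution blocks, letting the remaining jobs drain before `wait_for_shutdown()` returns.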
