Skip to content

Commit 4b46a71

Browse files
committed
worker/jobs: Simplify enqueue_sync_to_index() fn
This requires two queries instead of one, but it significantly simplifies the code, now that our worker system supports automatic deduplication. Once the enqueuing code uses `diesel-async`, the performance hit can be mitigated by using pipelining via `try_join!()`.
1 parent 868a502 commit 4b46a71

File tree

1 file changed

+3
-58
lines changed

1 file changed

+3
-58
lines changed

src/worker/jobs/mod.rs

Lines changed: 3 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
use crate::util::diesel::Conn;
2-
use crates_io_worker::schema::background_jobs;
32
use crates_io_worker::{BackgroundJob, EnqueueError};
4-
use diesel::dsl::{exists, not};
5-
use diesel::prelude::*;
6-
use diesel::sql_types::{Int2, Jsonb, Text};
73
use std::fmt::Display;
84

95
mod archive_version_downloads;
@@ -49,61 +45,10 @@ pub fn enqueue_sync_to_index<T: Display>(
4945
krate: T,
5046
conn: &mut impl Conn,
5147
) -> Result<(), EnqueueError> {
52-
// Returns jobs with matching `job_type`, `data` and `priority`,
53-
// skipping ones that are already locked by the background worker.
54-
let find_similar_jobs_query =
55-
|job_type: &'static str, data: serde_json::Value, priority: i16| {
56-
background_jobs::table
57-
.select(background_jobs::id)
58-
.filter(background_jobs::job_type.eq(job_type))
59-
.filter(background_jobs::data.eq(data))
60-
.filter(background_jobs::priority.eq(priority))
61-
.for_update()
62-
.skip_locked()
63-
};
48+
let krate = krate.to_string();
6449

65-
// Returns one `job_type, data, priority` row with values from the
66-
// passed-in `job`, unless a similar row already exists.
67-
let deduplicated_select_query =
68-
|job_type: &'static str, data: serde_json::Value, priority: i16| {
69-
diesel::select((
70-
job_type.into_sql::<Text>(),
71-
data.clone().into_sql::<Jsonb>(),
72-
priority.into_sql::<Int2>(),
73-
))
74-
.filter(not(exists(find_similar_jobs_query(
75-
job_type, data, priority,
76-
))))
77-
};
78-
79-
let to_git = deduplicated_select_query(
80-
SyncToGitIndex::JOB_NAME,
81-
serde_json::to_value(SyncToGitIndex::new(krate.to_string()))?,
82-
SyncToGitIndex::PRIORITY,
83-
);
84-
85-
let to_sparse = deduplicated_select_query(
86-
SyncToSparseIndex::JOB_NAME,
87-
serde_json::to_value(SyncToSparseIndex::new(krate.to_string()))?,
88-
SyncToSparseIndex::PRIORITY,
89-
);
90-
91-
// Insert index update background jobs, but only if they do not
92-
// already exist.
93-
let added_jobs_count = diesel::insert_into(background_jobs::table)
94-
.values(to_git.union_all(to_sparse))
95-
.into_columns((
96-
background_jobs::job_type,
97-
background_jobs::data,
98-
background_jobs::priority,
99-
))
100-
.execute(conn)?;
101-
102-
// Print a log event if we skipped inserting a job due to deduplication.
103-
if added_jobs_count != 2 {
104-
let skipped_jobs_count = 2 - added_jobs_count;
105-
info!(%skipped_jobs_count, "Skipped adding duplicate jobs to the background worker queue");
106-
}
50+
SyncToGitIndex::new(krate.clone()).enqueue(conn)?;
51+
SyncToSparseIndex::new(krate).enqueue(conn)?;
10752

10853
Ok(())
10954
}

0 commit comments

Comments (0)