Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/admin/delete_crate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@
};

info!("{name}: Enqueuing index sync jobs…");
if let Err(error) = jobs::enqueue_sync_to_index(name, conn) {
warn!("{name}: Failed to enqueue index sync jobs: {error}");
if let Err(error) = jobs::SyncToGitIndex::new(name).enqueue(conn) {
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
}
if let Err(error) = jobs::SyncToSparseIndex::new(name).enqueue(conn) {
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");

Check warning on line 104 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L100-L104

Added lines #L100 - L104 were not covered by tests
}

info!("{name}: Enqueuing DeleteCrateFromStorage job…");
Expand Down
8 changes: 6 additions & 2 deletions src/admin/delete_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::worker::jobs;
use crate::{admin::dialoguer, db, schema::versions};
use anyhow::Context;
use crates_io_worker::BackgroundJob;
use diesel::{Connection, ExpressionMethods, QueryDsl};
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;

Expand Down Expand Up @@ -102,8 +103,11 @@
})?;

info!(%crate_name, "Enqueuing index sync jobs");
if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) {
warn!(%crate_name, ?error, "Failed to enqueue index sync jobs");
if let Err(error) = jobs::SyncToGitIndex::new(crate_name).enqueue(conn) {
warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job");
}
if let Err(error) = jobs::SyncToSparseIndex::new(crate_name).enqueue(conn) {
warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job");

Check warning on line 110 in src/admin/delete_version.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_version.rs#L106-L110

Added lines #L106 - L110 were not covered by tests
}

Ok(opts)
Expand Down
7 changes: 3 additions & 4 deletions src/admin/yank_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
use crate::models::{Crate, Version};
use crate::schema::versions;
use crate::tasks::spawn_blocking;
use crate::worker::jobs;
use crate::worker::jobs::UpdateDefaultVersion;
use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion};
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;

Expand Down Expand Up @@ -63,8 +62,8 @@
.set(versions::yanked.eq(true))
.execute(conn)?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;

SyncToGitIndex::new(&krate.name).enqueue(conn)?;
SyncToSparseIndex::new(&krate.name).enqueue(conn)?;

Check warning on line 66 in src/admin/yank_version.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/yank_version.rs#L65-L66

Added lines #L65 - L66 were not covered by tests
UpdateDefaultVersion::new(krate.id).enqueue(conn)?;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion src/controllers/krate/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCra
))
.map_err(|e| internal(format!("failed to upload crate: {e}")))?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;
jobs::SyncToGitIndex::new(&krate.name).enqueue(conn)?;
jobs::SyncToSparseIndex::new(&krate.name).enqueue(conn)?;

SendPublishNotificationsJob::new(version.id).enqueue(conn)?;

Expand Down
5 changes: 3 additions & 2 deletions src/controllers/version/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::util::errors::{bad_request, custom, version_not_found, AppResult};
use crate::views::{EncodableDependency, EncodableVersion};
use crate::worker::jobs::{self, UpdateDefaultVersion};
use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion};

use super::version_and_crate;

Expand Down Expand Up @@ -233,7 +233,8 @@ pub fn perform_version_yank_update(
};
insert_version_owner_action(conn, version.id, user.id, api_token_id, action)?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;
SyncToGitIndex::new(&krate.name).enqueue(conn)?;
SyncToSparseIndex::new(&krate.name).enqueue(conn)?;
UpdateDefaultVersion::new(krate.id).enqueue(conn)?;

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion src/tests/worker/git.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::models::Crate;
use crate::tests::builders::PublishBuilder;
use crate::tests::util::{RequestHelper, TestApp};
use crate::worker::jobs;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use http::StatusCode;

Expand Down Expand Up @@ -51,7 +52,8 @@ async fn index_smoke_test() {
let krate: Crate = assert_ok!(Crate::by_name("serde").first(conn));
assert_ok!(diesel::delete(crates::table.find(krate.id)).execute(conn));

assert_ok!(jobs::enqueue_sync_to_index("serde", conn));
assert_ok!(jobs::SyncToGitIndex::new("serde").enqueue(conn));
assert_ok!(jobs::SyncToSparseIndex::new("serde").enqueue(conn));
});

app.run_pending_background_jobs().await;
Expand Down
79 changes: 0 additions & 79 deletions src/worker/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
use crate::util::diesel::Conn;
use crates_io_worker::schema::background_jobs;
use crates_io_worker::{BackgroundJob, EnqueueError};
use diesel::dsl::{exists, not};
use diesel::prelude::*;
use diesel::sql_types::{Int2, Jsonb, Text};
use std::fmt::Display;

mod archive_version_downloads;
mod daily_db_maintenance;
mod delete_crate;
Expand Down Expand Up @@ -36,74 +28,3 @@ pub use self::send_publish_notifications::SendPublishNotificationsJob;
pub use self::sync_admins::SyncAdmins;
pub use self::typosquat::CheckTyposquat;
pub use self::update_default_version::UpdateDefaultVersion;

/// Enqueue both index sync jobs (git and sparse) for a crate, unless they
/// already exist in the background job queue.
///
/// Note that there are currently no explicit tests for this functionality,
/// since our test suite only allows us to use a single database connection
/// and the background worker queue locking only work when using multiple
/// connections.
#[instrument(name = "swirl.enqueue", skip_all, fields(message = "sync_to_index", krate = %krate))]
pub fn enqueue_sync_to_index<T: Display>(
krate: T,
conn: &mut impl Conn,
) -> Result<(), EnqueueError> {
// Returns jobs with matching `job_type`, `data` and `priority`,
// skipping ones that are already locked by the background worker.
let find_similar_jobs_query =
|job_type: &'static str, data: serde_json::Value, priority: i16| {
background_jobs::table
.select(background_jobs::id)
.filter(background_jobs::job_type.eq(job_type))
.filter(background_jobs::data.eq(data))
.filter(background_jobs::priority.eq(priority))
.for_update()
.skip_locked()
};

// Returns one `job_type, data, priority` row with values from the
// passed-in `job`, unless a similar row already exists.
let deduplicated_select_query =
|job_type: &'static str, data: serde_json::Value, priority: i16| {
diesel::select((
job_type.into_sql::<Text>(),
data.clone().into_sql::<Jsonb>(),
priority.into_sql::<Int2>(),
))
.filter(not(exists(find_similar_jobs_query(
job_type, data, priority,
))))
};

let to_git = deduplicated_select_query(
SyncToGitIndex::JOB_NAME,
serde_json::to_value(SyncToGitIndex::new(krate.to_string()))?,
SyncToGitIndex::PRIORITY,
);

let to_sparse = deduplicated_select_query(
SyncToSparseIndex::JOB_NAME,
serde_json::to_value(SyncToSparseIndex::new(krate.to_string()))?,
SyncToSparseIndex::PRIORITY,
);

// Insert index update background jobs, but only if they do not
// already exist.
let added_jobs_count = diesel::insert_into(background_jobs::table)
.values(to_git.union_all(to_sparse))
.into_columns((
background_jobs::job_type,
background_jobs::data,
background_jobs::priority,
))
.execute(conn)?;

// Print a log event if we skipped inserting a job due to deduplication.
if added_jobs_count != 2 {
let skipped_jobs_count = 2 - added_jobs_count;
info!(%skipped_jobs_count, "Skipped adding duplicate jobs to the background worker queue");
}

Ok(())
}