Skip to content

Commit 13dd11a

Browse files
committed
bin/crates-admin: Move spawn_blocking() calls into subcommands
1 parent b534f4e commit 13dd11a

13 files changed

+473
-422
lines changed

src/admin/default_versions.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::models::{update_default_version, verify_default_version};
2+
use crate::tasks::spawn_blocking;
23
use crate::{db, schema::crates};
34
use anyhow::Context;
45
use diesel::prelude::*;
@@ -14,29 +15,32 @@ pub enum Command {
1415
Verify,
1516
}
1617

17-
pub fn run(command: Command) -> anyhow::Result<()> {
18-
let mut conn = db::oneoff_connection().context("Failed to connect to the database")?;
18+
pub async fn run(command: Command) -> anyhow::Result<()> {
19+
spawn_blocking(move || {
20+
let mut conn = db::oneoff_connection().context("Failed to connect to the database")?;
1921

20-
let crate_ids: Vec<i32> = crates::table
21-
.select(crates::id)
22-
.load(&mut conn)
23-
.context("Failed to load crates")?;
22+
let crate_ids: Vec<i32> = crates::table
23+
.select(crates::id)
24+
.load(&mut conn)
25+
.context("Failed to load crates")?;
2426

25-
let pb = ProgressBar::new(crate_ids.len() as u64);
26-
pb.set_style(ProgressStyle::with_template(
27-
"{bar:60} ({pos}/{len}, ETA {eta})",
28-
)?);
27+
let pb = ProgressBar::new(crate_ids.len() as u64);
28+
pb.set_style(ProgressStyle::with_template(
29+
"{bar:60} ({pos}/{len}, ETA {eta})",
30+
)?);
2931

30-
for crate_id in crate_ids.into_iter().progress_with(pb.clone()) {
31-
let func = match command {
32-
Command::Update => update_default_version,
33-
Command::Verify => verify_default_version,
34-
};
32+
for crate_id in crate_ids.into_iter().progress_with(pb.clone()) {
33+
let func = match command {
34+
Command::Update => update_default_version,
35+
Command::Verify => verify_default_version,
36+
};
3537

36-
if let Err(error) = func(crate_id, &mut conn) {
37-
pb.suspend(|| warn!(%crate_id, %error, "Failed to update the default version"));
38+
if let Err(error) = func(crate_id, &mut conn) {
39+
pb.suspend(|| warn!(%crate_id, %error, "Failed to update the default version"));
40+
}
3841
}
39-
}
4042

41-
Ok(())
43+
Ok(())
44+
})
45+
.await
4246
}

src/admin/delete_crate.rs

Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::schema::{crate_owners, teams, users};
2+
use crate::tasks::spawn_blocking;
23
use crate::worker::jobs;
34
use crate::{admin::dialoguer, db, schema::crates};
45
use anyhow::Context;
@@ -24,13 +25,15 @@ pub struct Opts {
2425
yes: bool,
2526
}
2627

27-
pub fn run(opts: Opts) -> anyhow::Result<()> {
28-
let conn = &mut db::oneoff_connection().context("Failed to establish database connection")?;
28+
pub async fn run(opts: Opts) -> anyhow::Result<()> {
29+
spawn_blocking(move || {
30+
let conn =
31+
&mut db::oneoff_connection().context("Failed to establish database connection")?;
2932

30-
let mut crate_names = opts.crate_names;
31-
crate_names.sort();
33+
let mut crate_names = opts.crate_names;
34+
crate_names.sort();
3235

33-
let query_result = crates::table
36+
let query_result = crates::table
3437
.select((
3538
crates::name,
3639
crates::id,
@@ -45,53 +48,55 @@ pub fn run(opts: Opts) -> anyhow::Result<()> {
4548
.load::<(String, i32, String)>(conn)
4649
.context("Failed to look up crate name from the database")?;
4750

48-
let mut existing_crates: HashMap<String, (i32, Vec<String>)> = HashMap::new();
49-
for (name, id, login) in query_result {
50-
let entry = existing_crates
51-
.entry(name)
52-
.or_insert_with(|| (id, Vec::new()));
51+
let mut existing_crates: HashMap<String, (i32, Vec<String>)> = HashMap::new();
52+
for (name, id, login) in query_result {
53+
let entry = existing_crates
54+
.entry(name)
55+
.or_insert_with(|| (id, Vec::new()));
5356

54-
entry.1.push(login);
55-
}
57+
entry.1.push(login);
58+
}
5659

57-
println!("Deleting the following crates:");
58-
println!();
59-
for name in &crate_names {
60-
match existing_crates.get(name) {
61-
Some((id, owners)) => {
62-
let owners = owners.join(", ");
63-
println!(" - {name} (id={id}, owners={owners})");
60+
println!("Deleting the following crates:");
61+
println!();
62+
for name in &crate_names {
63+
match existing_crates.get(name) {
64+
Some((id, owners)) => {
65+
let owners = owners.join(", ");
66+
println!(" - {name} (id={id}, owners={owners})");
67+
}
68+
None => println!(" - {name} (⚠️ crate not found)"),
6469
}
65-
None => println!(" - {name} (⚠️ crate not found)"),
6670
}
67-
}
68-
println!();
71+
println!();
6972

70-
if !opts.yes && !dialoguer::confirm("Do you want to permanently delete these crates?") {
71-
return Ok(());
72-
}
73+
if !opts.yes && !dialoguer::confirm("Do you want to permanently delete these crates?") {
74+
return Ok(());
75+
}
7376

74-
for name in &crate_names {
75-
if let Some((id, _)) = existing_crates.get(name) {
76-
info!("{name}: Deleting crate from the database…");
77-
if let Err(error) = diesel::delete(crates::table.find(id)).execute(conn) {
78-
warn!(%id, "{name}: Failed to delete crate from the database: {error}");
79-
}
80-
} else {
81-
info!("{name}: Skipped missing crate");
82-
};
77+
for name in &crate_names {
78+
if let Some((id, _)) = existing_crates.get(name) {
79+
info!("{name}: Deleting crate from the database…");
80+
if let Err(error) = diesel::delete(crates::table.find(id)).execute(conn) {
81+
warn!(%id, "{name}: Failed to delete crate from the database: {error}");
82+
}
83+
} else {
84+
info!("{name}: Skipped missing crate");
85+
};
8386

84-
info!("{name}: Enqueuing index sync jobs…");
85-
if let Err(error) = jobs::enqueue_sync_to_index(name, conn) {
86-
warn!("{name}: Failed to enqueue index sync jobs: {error}");
87-
}
87+
info!("{name}: Enqueuing index sync jobs…");
88+
if let Err(error) = jobs::enqueue_sync_to_index(name, conn) {
89+
warn!("{name}: Failed to enqueue index sync jobs: {error}");
90+
}
8891

89-
info!("{name}: Enqueuing DeleteCrateFromStorage job…");
90-
let job = jobs::DeleteCrateFromStorage::new(name.into());
91-
if let Err(error) = job.enqueue(conn) {
92-
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
92+
info!("{name}: Enqueuing DeleteCrateFromStorage job…");
93+
let job = jobs::DeleteCrateFromStorage::new(name.into());
94+
if let Err(error) = job.enqueue(conn) {
95+
warn!("{name}: Failed to enqueue DeleteCrateFromStorage job: {error}");
96+
}
9397
}
94-
}
9598

96-
Ok(())
99+
Ok(())
100+
})
101+
.await
97102
}

src/admin/delete_version.rs

Lines changed: 73 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::models::update_default_version;
22
use crate::schema::crates;
33
use crate::storage::Storage;
4+
use crate::tasks::spawn_blocking;
45
use crate::worker::jobs;
56
use crate::{admin::dialoguer, db, schema::versions};
67
use anyhow::Context;
@@ -25,86 +26,88 @@ pub struct Opts {
2526
yes: bool,
2627
}
2728

28-
pub fn run(opts: Opts) -> anyhow::Result<()> {
29-
let crate_name = &opts.crate_name;
30-
31-
let conn = &mut db::oneoff_connection().context("Failed to establish database connection")?;
32-
33-
let store = Storage::from_environment();
34-
35-
let crate_id: i32 = crates::table
36-
.select(crates::id)
37-
.filter(crates::name.eq(crate_name))
38-
.first(conn)
39-
.context("Failed to look up crate id from the database")?;
40-
41-
println!("Deleting the following versions of the `{crate_name}` crate:");
42-
println!();
43-
for version in &opts.versions {
44-
println!(" - {version}");
45-
}
46-
println!();
47-
48-
if !opts.yes && !dialoguer::confirm("Do you want to permanently delete these versions?") {
49-
return Ok(());
50-
}
51-
52-
conn.transaction(|conn| {
53-
info!(%crate_name, %crate_id, versions = ?opts.versions, "Deleting versions from the database");
54-
let result = diesel::delete(
55-
versions::table
56-
.filter(versions::crate_id.eq(crate_id))
57-
.filter(versions::num.eq_any(&opts.versions)),
58-
)
59-
.execute(conn);
60-
61-
match result {
62-
Ok(num_deleted) if num_deleted == opts.versions.len() => {}
63-
Ok(num_deleted) => {
64-
warn!(
65-
%crate_name,
66-
"Deleted only {num_deleted} of {num_expected} versions from the database",
67-
num_expected = opts.versions.len()
68-
);
69-
}
70-
Err(error) => {
71-
warn!(%crate_name, ?error, "Failed to delete versions from the database")
72-
}
29+
pub async fn run(opts: Opts) -> anyhow::Result<()> {
30+
spawn_blocking(move || {
31+
let crate_name = &opts.crate_name;
32+
33+
let conn = &mut db::oneoff_connection().context("Failed to establish database connection")?;
34+
35+
let store = Storage::from_environment();
36+
37+
let crate_id: i32 = crates::table
38+
.select(crates::id)
39+
.filter(crates::name.eq(crate_name))
40+
.first(conn)
41+
.context("Failed to look up crate id from the database")?;
42+
43+
println!("Deleting the following versions of the `{crate_name}` crate:");
44+
println!();
45+
for version in &opts.versions {
46+
println!(" - {version}");
7347
}
48+
println!();
7449

75-
info!(%crate_name, %crate_id, "Updating default version in the database");
76-
if let Err(error) = update_default_version(crate_id, conn) {
77-
warn!(%crate_name, %crate_id, ?error, "Failed to update default version");
50+
if !opts.yes && !dialoguer::confirm("Do you want to permanently delete these versions?") {
51+
return Ok(());
7852
}
7953

80-
Ok::<_, anyhow::Error>(())
81-
})?;
54+
conn.transaction(|conn| {
55+
info!(%crate_name, %crate_id, versions = ?opts.versions, "Deleting versions from the database");
56+
let result = diesel::delete(
57+
versions::table
58+
.filter(versions::crate_id.eq(crate_id))
59+
.filter(versions::num.eq_any(&opts.versions)),
60+
)
61+
.execute(conn);
62+
63+
match result {
64+
Ok(num_deleted) if num_deleted == opts.versions.len() => {}
65+
Ok(num_deleted) => {
66+
warn!(
67+
%crate_name,
68+
"Deleted only {num_deleted} of {num_expected} versions from the database",
69+
num_expected = opts.versions.len()
70+
);
71+
}
72+
Err(error) => {
73+
warn!(%crate_name, ?error, "Failed to delete versions from the database")
74+
}
75+
}
8276

83-
info!(%crate_name, "Enqueuing index sync jobs");
84-
if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) {
85-
warn!(%crate_name, ?error, "Failed to enqueue index sync jobs");
86-
}
77+
info!(%crate_name, %crate_id, "Updating default version in the database");
78+
if let Err(error) = update_default_version(crate_id, conn) {
79+
warn!(%crate_name, %crate_id, ?error, "Failed to update default version");
80+
}
8781

88-
let rt = tokio::runtime::Builder::new_current_thread()
89-
.enable_all()
90-
.build()
91-
.context("Failed to initialize tokio runtime")?;
82+
Ok::<_, anyhow::Error>(())
83+
})?;
9284

93-
for version in &opts.versions {
94-
debug!(%crate_name, %version, "Deleting crate file from S3");
95-
if let Err(error) = rt.block_on(store.delete_crate_file(crate_name, version)) {
96-
warn!(%crate_name, %version, ?error, "Failed to delete crate file from S3");
85+
info!(%crate_name, "Enqueuing index sync jobs");
86+
if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) {
87+
warn!(%crate_name, ?error, "Failed to enqueue index sync jobs");
9788
}
9889

99-
debug!(%crate_name, %version, "Deleting readme file from S3");
100-
match rt.block_on(store.delete_readme(crate_name, version)) {
101-
Err(object_store::Error::NotFound { .. }) => {}
102-
Err(error) => {
103-
warn!(%crate_name, %version, ?error, "Failed to delete readme file from S3")
90+
let rt = tokio::runtime::Builder::new_current_thread()
91+
.enable_all()
92+
.build()
93+
.context("Failed to initialize tokio runtime")?;
94+
95+
for version in &opts.versions {
96+
debug!(%crate_name, %version, "Deleting crate file from S3");
97+
if let Err(error) = rt.block_on(store.delete_crate_file(crate_name, version)) {
98+
warn!(%crate_name, %version, ?error, "Failed to delete crate file from S3");
99+
}
100+
101+
debug!(%crate_name, %version, "Deleting readme file from S3");
102+
match rt.block_on(store.delete_readme(crate_name, version)) {
103+
Err(object_store::Error::NotFound { .. }) => {}
104+
Err(error) => {
105+
warn!(%crate_name, %version, ?error, "Failed to delete readme file from S3")
106+
}
107+
Ok(_) => {}
104108
}
105-
Ok(_) => {}
106109
}
107-
}
108110

109-
Ok(())
111+
Ok(())
112+
}).await
110113
}

0 commit comments

Comments
 (0)