Skip to content

Commit ac21dff

Browse files
committed
admin: Add analyze-crates command
1 parent a3fbfd8 commit ac21dff

File tree

2 files changed

+155
-0
lines changed

2 files changed

+155
-0
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
use anyhow::Context;
2+
use crates_io::db;
3+
use crates_io::schema::background_jobs;
4+
use crates_io::schema::{crates, default_versions, versions};
5+
use crates_io::worker::jobs::AnalyzeCrateFile;
6+
use crates_io_worker::BackgroundJob;
7+
use diesel::prelude::*;
8+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
9+
use indicatif::{ProgressBar, ProgressStyle};
10+
11+
const CHUNK_SIZE: usize = 100;
12+
13+
#[derive(clap::Parser, Debug, Eq, PartialEq)]
14+
#[clap(
15+
name = "analyze-crates",
16+
about = "Queue background jobs to analyze uploaded crate file.",
17+
group(clap::ArgGroup::new("mode").required(true))
18+
)]
19+
pub struct Options {
20+
/// Backfill *all* versions that are missing line count statistics.
21+
#[clap(long, group = "mode")]
22+
backfill: bool,
23+
24+
/// Crate specifications to analyze (format: `crate@version` or just `crate`)
25+
#[clap(group = "mode")]
26+
crates: Vec<String>,
27+
}
28+
29+
pub async fn run(opts: Options) -> anyhow::Result<()> {
30+
let conn = db::oneoff_connection().await;
31+
let mut conn = conn.context("Failed to connect to the database")?;
32+
33+
let results = load_versions(&opts, &mut conn).await;
34+
let results = results.context("Failed to load versions")?;
35+
if results.is_empty() {
36+
println!("No matching versions found.");
37+
return Ok(());
38+
}
39+
40+
println!("Found {} matching versions", results.len());
41+
if opts.backfill {
42+
let default_count = results.iter().filter(|(_, c)| *c).count();
43+
println!(" {default_count} default versions (priority -20)");
44+
45+
let regular_count = results.len() - default_count;
46+
println!(" {regular_count} regular versions (priority -50)");
47+
}
48+
49+
let pb_style = ProgressStyle::with_template("{bar:60} ({pos}/{len}, ETA {eta})")?;
50+
let pb = ProgressBar::new(results.len() as u64).with_style(pb_style);
51+
52+
let mut queued_count = 0;
53+
let mut error_count = 0;
54+
55+
for batch in results.chunks(CHUNK_SIZE) {
56+
let jobs = batch
57+
.iter()
58+
.map(|(version_id, is_default_version)| {
59+
let priority = if *is_default_version { -20 } else { -50 };
60+
NewBackgroundJob::new(*version_id, priority)
61+
})
62+
.collect::<anyhow::Result<Vec<_>>>()?;
63+
64+
let num_jobs = jobs.len();
65+
66+
let result = diesel::insert_into(background_jobs::table)
67+
.values(&jobs)
68+
.execute(&mut conn)
69+
.await;
70+
71+
pb.inc(num_jobs as u64);
72+
73+
if let Err(err) = result {
74+
error_count += num_jobs;
75+
pb.suspend(|| eprintln!("Failed to queue background jobs: {err}"));
76+
} else {
77+
queued_count += num_jobs;
78+
}
79+
}
80+
81+
pb.finish_with_message("Done");
82+
83+
println!("Successfully queued {queued_count} jobs");
84+
if error_count > 0 {
85+
println!("Failed to queue {error_count} jobs");
86+
}
87+
88+
Ok(())
89+
}
90+
91+
async fn load_versions(
92+
opts: &Options,
93+
conn: &mut AsyncPgConnection,
94+
) -> QueryResult<Vec<(i32, bool)>> {
95+
let mut query = versions::table
96+
.inner_join(crates::table)
97+
.left_join(default_versions::table.on(default_versions::version_id.eq(versions::id)))
98+
.select((
99+
versions::id,
100+
default_versions::crate_id.nullable().is_not_null(),
101+
))
102+
.into_boxed();
103+
104+
if opts.backfill {
105+
// Backfill mode: get all versions missing linecount data
106+
query = query.filter(versions::linecounts.is_null())
107+
} else {
108+
// Crate-specific mode: build a dynamic query with `or_filter`
109+
for crate_spec in &opts.crates {
110+
let (krate, version) = parse_crate_spec(crate_spec);
111+
112+
query = match version {
113+
Some(ver) => query.or_filter(crates::name.eq(krate).and(versions::num.eq(ver))),
114+
None => query.or_filter(crates::name.eq(krate)),
115+
};
116+
}
117+
}
118+
119+
query.load(conn).await
120+
}
121+
122+
/// Parse crate specification in the format "crate@version" or just "crate"
123+
fn parse_crate_spec(spec: &str) -> (&str, Option<&str>) {
124+
if let Some((name, ver)) = spec.split_once('@') {
125+
(name, Some(ver))
126+
} else {
127+
(spec, None)
128+
}
129+
}
130+
131+
/// Represents a new background job to be inserted into the database
132+
#[derive(Debug, diesel::Insertable)]
133+
#[diesel(table_name = background_jobs)]
134+
struct NewBackgroundJob {
135+
job_type: &'static str,
136+
data: serde_json::Value,
137+
priority: i16,
138+
}
139+
140+
impl NewBackgroundJob {
141+
/// Create a new [AnalyzeCrateFile] background job with the specified priority
142+
fn new(version_id: i32, priority: i16) -> anyhow::Result<Self> {
143+
let job = AnalyzeCrateFile::new(version_id);
144+
let data = serde_json::to_value(&job).context("Failed to serialize job data")?;
145+
146+
Ok(Self {
147+
job_type: AnalyzeCrateFile::JOB_NAME,
148+
data,
149+
priority,
150+
})
151+
}
152+
}

src/bin/crates-admin/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#[macro_use]
22
extern crate tracing;
33

4+
mod analyze_crates;
45
mod default_versions;
56
mod delete_crate;
67
mod delete_version;
@@ -18,6 +19,7 @@ mod yank_version;
1819
#[derive(clap::Parser, Debug)]
1920
#[command(name = "crates-admin")]
2021
enum Command {
22+
AnalyzeCrates(analyze_crates::Options),
2123
RenderOgImages(render_og_images::Opts),
2224
DeleteCrate(delete_crate::Opts),
2325
DeleteVersion(delete_version::Opts),
@@ -48,6 +50,7 @@ async fn main() -> anyhow::Result<()> {
4850
span.record("command", tracing::field::debug(&command));
4951

5052
match command {
53+
Command::AnalyzeCrates(opts) => analyze_crates::run(opts).await,
5154
Command::RenderOgImages(opts) => render_og_images::run(opts).await,
5255
Command::DeleteCrate(opts) => delete_crate::run(opts).await,
5356
Command::DeleteVersion(opts) => delete_version::run(opts).await,

0 commit comments

Comments
 (0)