diff --git a/Cargo.toml b/Cargo.toml index 7622ee086e2..82ab780acaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,7 @@ postgres-native-tls = "=0.5.0" prometheus = { version = "=0.13.4", default-features = false } quick-xml = "=0.37.0" rand = "=0.8.5" -reqwest = { version = "=0.12.9", features = ["blocking", "gzip", "json"] } +reqwest = { version = "=0.12.9", features = ["gzip", "json"] } rss = { version = "=2.0.9", default-features = false, features = ["atom"] } secrecy = "=0.10.3" semver = { version = "=1.0.23", features = ["serde"] } diff --git a/src/bin/crates-admin/render_readmes.rs b/src/bin/crates-admin/render_readmes.rs index 96233cf2eb9..d0d85da290a 100644 --- a/src/bin/crates-admin/render_readmes.rs +++ b/src/bin/crates-admin/render_readmes.rs @@ -5,7 +5,7 @@ use crates_io::{ schema::{crates, readme_renderings, versions}, }; use std::path::PathBuf; -use std::{io::Read, path::Path, sync::Arc, thread}; +use std::{io::Read, path::Path, sync::Arc}; use chrono::{NaiveDateTime, Utc}; use crates_io::storage::Storage; @@ -14,12 +14,11 @@ use crates_io::util::diesel::prelude::*; use crates_io_markdown::text_to_html; use crates_io_tarball::{Manifest, StringOrBool}; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; -use diesel_async::AsyncPgConnection; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; use flate2::read::GzDecoder; -use reqwest::{blocking::Client, header}; +use reqwest::{header, Client}; use std::str::FromStr; use tar::{self, Archive}; -use tokio::runtime::Handle; const USER_AGENT: &str = "crates-admin"; @@ -50,114 +49,107 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { .context("Failed to connect to the database")?; let mut conn = AsyncConnectionWrapper::::from(conn); - spawn_blocking(move || { - use diesel::RunQueryDsl; - let storage = Arc::new(Storage::from_environment()); + let storage = Arc::new(Storage::from_environment()); - let start_time = Utc::now(); + let start_time = Utc::now(); - let older_than = if let Some(ref time) = opts.older_than { - NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S") - .context("Could not parse --older-than argument as a time")? - } else { - start_time.naive_utc() - }; + let older_than = if let Some(ref time) = opts.older_than { + NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S") + .context("Could not parse --older-than argument as a time")? + } else { + start_time.naive_utc() + }; - println!("Start time: {start_time}"); - println!("Rendering readmes older than: {older_than}"); + println!("Start time: {start_time}"); + println!("Rendering readmes older than: {older_than}"); + + let mut query = versions::table + .inner_join(crates::table) + .left_outer_join(readme_renderings::table) + .filter( + readme_renderings::rendered_at + .lt(older_than) + .or(readme_renderings::version_id.is_null()), + ) + .select(versions::id) + .into_boxed(); + + if let Some(crate_name) = opts.crate_name { + println!("Rendering readmes for {crate_name}"); + query = query.filter(crates::name.eq(crate_name)); + } - let mut query = versions::table - .inner_join(crates::table) - .left_outer_join(readme_renderings::table) - .filter( - readme_renderings::rendered_at - .lt(older_than) - .or(readme_renderings::version_id.is_null()), - ) - .select(versions::id) - .into_boxed(); + let version_ids: Vec = query + .load(&mut conn) + .await + .context("error loading version ids")?; - if let Some(crate_name) = opts.crate_name { - println!("Rendering readmes for {crate_name}"); - query = query.filter(crates::name.eq(crate_name)); - } + let total_versions = version_ids.len(); + println!("Rendering {total_versions} versions"); - let version_ids: Vec = query.load(&mut conn).context("error loading version ids")?; + let page_size = opts.page_size; - let total_versions = version_ids.len(); - println!("Rendering {total_versions} versions"); + let total_pages = total_versions / page_size; + let total_pages = if total_versions % page_size == 0 { + total_pages + } else { + total_pages + 1 + }; - let page_size = opts.page_size; + let client = Client::new(); - let total_pages = total_versions / page_size; - let total_pages = if total_versions % page_size == 0 { + for (page_num, version_ids_chunk) in version_ids.chunks(page_size).enumerate() { + println!( + "= Page {} of {} ==================================", + page_num + 1, total_pages - } else { - total_pages + 1 - }; + ); - let client = Client::new(); - - for (page_num, version_ids_chunk) in version_ids.chunks(page_size).enumerate() { - println!( - "= Page {} of {} ==================================", - page_num + 1, - total_pages - ); - - let versions: Vec<(Version, String)> = versions::table - .inner_join(crates::table) - .filter(versions::id.eq_any(version_ids_chunk)) - .select((Version::as_select(), crates::name)) - .load(&mut conn) - .context("error loading versions")?; - - let mut tasks = Vec::with_capacity(page_size); - for (version, krate_name) in versions { - Handle::current() - .block_on(Version::record_readme_rendering(version.id, &mut conn)) - .context("Couldn't record rendering time")?; - - let client = client.clone(); - let storage = storage.clone(); - let handle = thread::spawn::<_, anyhow::Result<()>>(move || { - println!("[{}-{}] Rendering README...", krate_name, version.num); - let readme = get_readme(&storage, &client, &version, &krate_name)?; - if !readme.is_empty() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .context("Failed to initialize tokio runtime")?; - - rt.block_on(storage.upload_readme( - &krate_name, - &version.num, - readme.into(), - )) + let versions: Vec<(Version, String)> = versions::table + .inner_join(crates::table) + .filter(versions::id.eq_any(version_ids_chunk)) + .select((Version::as_select(), crates::name)) + .load(&mut conn) + .await + .context("error loading versions")?; + + let mut tasks = Vec::with_capacity(page_size); + for (version, krate_name) in versions { + Version::record_readme_rendering(version.id, &mut conn) + .await + .context("Couldn't record rendering time")?; + + let client = client.clone(); + let storage = storage.clone(); + let handle = tokio::spawn(async move { + println!("[{}-{}] Rendering README...", krate_name, version.num); + let readme = get_readme(&storage, &client, &version, &krate_name).await?; + if !readme.is_empty() { + storage + .upload_readme(&krate_name, &version.num, readme.into()) + .await .context("Failed to upload rendered README file to S3")?; - } - - Ok(()) - }); - tasks.push(handle); - } - for handle in tasks { - match handle.join() { - Err(err) => println!("Thread panicked: {err:?}"), - Ok(Err(err)) => println!("Thread failed: {err:?}"), - _ => {} } + + Ok::<_, anyhow::Error>(()) + }); + tasks.push(handle); + } + for handle in tasks { + match handle.await { + Err(err) => println!("Task panicked: {err:?}"), + Ok(Err(err)) => println!("Task failed: {err:?}"), + _ => {} } } + } - Ok(()) - }) - .await + Ok(()) } /// Renders the readme of an uploaded crate version. -fn get_readme( +async fn get_readme( storage: &Storage, client: &Client, version: &Version, @@ -173,18 +165,23 @@ fn get_readme( header::HeaderValue::from_static(USER_AGENT), ); let request = client.get(location).headers(extra_headers); - let response = request.send().context("Failed to fetch crate")?; + let response = request.send().await.context("Failed to fetch crate")?; if !response.status().is_success() { return Err(anyhow!( "Failed to get a 200 response: {}", - response.text()? + response.text().await? )); } - let reader = GzDecoder::new(response); - let archive = Archive::new(reader); - render_pkg_readme(archive, &pkg_name) + let body = response.bytes().await?; + + spawn_blocking(move || { + let reader = GzDecoder::new(&*body); + let archive = Archive::new(reader); + render_pkg_readme(archive, &pkg_name) + }) + .await } fn render_pkg_readme(mut archive: Archive, pkg_name: &str) -> anyhow::Result {