diff --git a/apps/labrinth/src/search/indexing/mod.rs b/apps/labrinth/src/search/indexing/mod.rs index 52c8c5d909..2c149198a7 100644 --- a/apps/labrinth/src/search/indexing/mod.rs +++ b/apps/labrinth/src/search/indexing/mod.rs @@ -14,7 +14,7 @@ use meilisearch_sdk::indexes::Index; use meilisearch_sdk::settings::{PaginationSetting, Settings}; use sqlx::postgres::PgPool; use thiserror::Error; -use tracing::{info, trace}; +use tracing::{Instrument, error, info, info_span, instrument, trace}; #[derive(Error, Debug)] pub enum IndexingError { @@ -36,7 +36,7 @@ pub enum IndexingError { // is too large (>10MiB) then the request fails with an error. This chunk size // assumes a max average size of 4KiB per project to avoid this cap. const MEILISEARCH_CHUNK_SIZE: usize = 10000000; -const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); +const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120); pub async fn remove_documents( ids: &[crate::models::ids::VersionId], @@ -167,6 +167,7 @@ pub async fn swap_index( Ok(()) } +#[instrument(skip(config))] pub async fn get_indexes_for_indexing( config: &SearchConfig, next: bool, // Get the 'next' one @@ -215,13 +216,13 @@ pub async fn get_indexes_for_indexing( Ok(results) } -#[tracing::instrument(skip_all, fields(%name))] +#[instrument(skip_all, fields(name))] async fn create_or_update_index( client: &Client, name: &str, custom_rules: Option<&'static [&'static str]>, ) -> Result { - info!("Updating/creating index {}", name); + info!("Updating/creating index"); match client.get_index(name).await { Ok(index) => { @@ -236,9 +237,13 @@ async fn create_or_update_index( info!("Performing index settings set."); index .set_settings(&settings) - .await? + .await + .inspect_err(|e| error!("Error setting index settings: {e:?}"))? .wait_for_completion(client, None, Some(TIMEOUT)) - .await?; + .await + .inspect_err(|e| { + error!("Error setting index settings while waiting: {e:?}") + })?; info!("Done performing index settings set."); Ok(index) @@ -250,7 +255,10 @@ async fn create_or_update_index( let task = client.create_index(name, Some("version_id")).await?; let task = task .wait_for_completion(client, None, Some(TIMEOUT)) - .await?; + .await + .inspect_err(|e| { + error!("Error creating index while waiting: {e:?}") + })?; let index = task .try_make_index(client) .map_err(|x| x.unwrap_failure())?; @@ -263,15 +271,20 @@ async fn create_or_update_index( index .set_settings(&settings) - .await? + .await + .inspect_err(|e| error!("Error setting index settings: {e:?}"))? .wait_for_completion(client, None, Some(TIMEOUT)) - .await?; + .await + .inspect_err(|e| { + error!("Error setting index settings while waiting: {e:?}") + })?; Ok(index) } } } +#[instrument(skip_all, fields(index.name, mods.len = mods.len()))] async fn add_to_index( client: &Client, index: &Index, @@ -282,21 +295,31 @@ async fn add_to_index( "Adding chunk starting with version id {}", chunk[0].version_id ); + + let now = std::time::Instant::now(); + index .add_or_replace(chunk, Some("version_id")) - .await? + .await + .inspect_err(|e| error!("Error adding chunk to index: {e:?}"))? .wait_for_completion( client, None, - Some(std::time::Duration::from_secs(3600)), + Some(std::time::Duration::from_secs(7200)), // 2 hours ) - .await?; - info!("Added chunk of {} projects to index", chunk.len()); + .await + .inspect_err(|e| error!("Error adding chunk to index: {e:?}"))?; + info!( + "Added chunk of {} projects to index in {:.2} seconds", + chunk.len(), + now.elapsed().as_secs_f64() + ); } Ok(()) } +#[instrument(skip_all, fields(index.name))] async fn update_and_add_to_index( client: &Client, index: &Index, @@ -357,20 +380,28 @@ pub async fn add_projects_batch_client( let mut tasks = FuturesOrdered::new(); + let mut id = 0; + client.across_all(index_references, |index_list, client| { + let span = info_span!("add_projects_batch", client.idx = id); + id += 1; + for index in index_list { let owned_client = client.clone(); let projects_ref = &projects; let additional_fields_ref = &additional_fields; - tasks.push_back(async move { - update_and_add_to_index( - &owned_client, - index, - projects_ref, - additional_fields_ref, - ) - .await - }); + tasks.push_back( + async move { + update_and_add_to_index( + &owned_client, + index, + projects_ref, + additional_fields_ref, + ) + .await + } + .instrument(span.clone()), + ); } });