Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 53 additions & 22 deletions apps/labrinth/src/search/indexing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use meilisearch_sdk::indexes::Index;
use meilisearch_sdk::settings::{PaginationSetting, Settings};
use sqlx::postgres::PgPool;
use thiserror::Error;
use tracing::{info, trace};
use tracing::{Instrument, error, info, info_span, instrument, trace};

#[derive(Error, Debug)]
pub enum IndexingError {
Expand All @@ -36,7 +36,7 @@ pub enum IndexingError {
// is too large (>10MiB) then the request fails with an error. This chunk size
// assumes a max average size of 4KiB per project to avoid this cap.
const MEILISEARCH_CHUNK_SIZE: usize = 10000000;
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);

pub async fn remove_documents(
ids: &[crate::models::ids::VersionId],
Expand Down Expand Up @@ -167,6 +167,7 @@ pub async fn swap_index(
Ok(())
}

#[instrument(skip(config))]
pub async fn get_indexes_for_indexing(
config: &SearchConfig,
next: bool, // Get the 'next' one
Expand Down Expand Up @@ -215,13 +216,13 @@ pub async fn get_indexes_for_indexing(
Ok(results)
}

#[tracing::instrument(skip_all, fields(%name))]
#[instrument(skip_all, fields(name))]
async fn create_or_update_index(
client: &Client,
name: &str,
custom_rules: Option<&'static [&'static str]>,
) -> Result<Index, meilisearch_sdk::errors::Error> {
info!("Updating/creating index {}", name);
info!("Updating/creating index");

match client.get_index(name).await {
Ok(index) => {
Expand All @@ -236,9 +237,13 @@ async fn create_or_update_index(
info!("Performing index settings set.");
index
.set_settings(&settings)
.await?
.await
.inspect_err(|e| error!("Error setting index settings: {e:?}"))?
.wait_for_completion(client, None, Some(TIMEOUT))
.await?;
.await
.inspect_err(|e| {
error!("Error setting index settings while waiting: {e:?}")
})?;
info!("Done performing index settings set.");

Ok(index)
Expand All @@ -250,7 +255,10 @@ async fn create_or_update_index(
let task = client.create_index(name, Some("version_id")).await?;
let task = task
.wait_for_completion(client, None, Some(TIMEOUT))
.await?;
.await
.inspect_err(|e| {
error!("Error creating index while waiting: {e:?}")
})?;
let index = task
.try_make_index(client)
.map_err(|x| x.unwrap_failure())?;
Expand All @@ -263,15 +271,20 @@ async fn create_or_update_index(

index
.set_settings(&settings)
.await?
.await
.inspect_err(|e| error!("Error setting index settings: {e:?}"))?
.wait_for_completion(client, None, Some(TIMEOUT))
.await?;
.await
.inspect_err(|e| {
error!("Error setting index settings while waiting: {e:?}")
})?;

Ok(index)
}
}
}

#[instrument(skip_all, fields(index.name, mods.len = mods.len()))]
async fn add_to_index(
client: &Client,
index: &Index,
Expand All @@ -282,21 +295,31 @@ async fn add_to_index(
"Adding chunk starting with version id {}",
chunk[0].version_id
);

let now = std::time::Instant::now();

index
.add_or_replace(chunk, Some("version_id"))
.await?
.await
.inspect_err(|e| error!("Error adding chunk to index: {e:?}"))?
.wait_for_completion(
client,
None,
Some(std::time::Duration::from_secs(3600)),
Some(std::time::Duration::from_secs(7200)), // 2 hours
)
.await?;
info!("Added chunk of {} projects to index", chunk.len());
.await
.inspect_err(|e| error!("Error adding chunk to index: {e:?}"))?;
info!(
"Added chunk of {} projects to index in {:.2} seconds",
chunk.len(),
now.elapsed().as_secs_f64()
);
}

Ok(())
}

#[instrument(skip_all, fields(index.name))]
async fn update_and_add_to_index(
client: &Client,
index: &Index,
Expand Down Expand Up @@ -357,20 +380,28 @@ pub async fn add_projects_batch_client(

let mut tasks = FuturesOrdered::new();

let mut id = 0;

client.across_all(index_references, |index_list, client| {
let span = info_span!("add_projects_batch", client.idx = id);
id += 1;

for index in index_list {
let owned_client = client.clone();
let projects_ref = &projects;
let additional_fields_ref = &additional_fields;
tasks.push_back(async move {
update_and_add_to_index(
&owned_client,
index,
projects_ref,
additional_fields_ref,
)
.await
});
tasks.push_back(
async move {
update_and_add_to_index(
&owned_client,
index,
projects_ref,
additional_fields_ref,
)
.await
}
.instrument(span.clone()),
);
}
});

Expand Down