@@ -14,7 +14,7 @@ use meilisearch_sdk::indexes::Index;
1414use meilisearch_sdk:: settings:: { PaginationSetting , Settings } ;
1515use sqlx:: postgres:: PgPool ;
1616use thiserror:: Error ;
17- use tracing:: { info, trace} ;
17+ use tracing:: { Instrument , error , info, info_span , instrument , trace} ;
1818
1919#[ derive( Error , Debug ) ]
2020pub enum IndexingError {
@@ -36,7 +36,7 @@ pub enum IndexingError {
3636// is too large (>10MiB) then the request fails with an error. This chunk size
3737// assumes a max average size of 4KiB per project to avoid this cap.
3838const MEILISEARCH_CHUNK_SIZE : usize = 10000000 ;
39- const TIMEOUT : std:: time:: Duration = std:: time:: Duration :: from_secs ( 60 ) ;
39+ const TIMEOUT : std:: time:: Duration = std:: time:: Duration :: from_secs ( 120 ) ;
4040
4141pub async fn remove_documents (
4242 ids : & [ crate :: models:: ids:: VersionId ] ,
@@ -167,6 +167,7 @@ pub async fn swap_index(
167167 Ok ( ( ) )
168168}
169169
170+ #[ instrument( skip( config) ) ]
170171pub async fn get_indexes_for_indexing (
171172 config : & SearchConfig ,
172173 next : bool , // Get the 'next' one
@@ -215,13 +216,13 @@ pub async fn get_indexes_for_indexing(
215216 Ok ( results)
216217}
217218
218- #[ tracing :: instrument( skip_all, fields( % name) ) ]
219+ #[ instrument( skip_all, fields( name) ) ]
219220async fn create_or_update_index (
220221 client : & Client ,
221222 name : & str ,
222223 custom_rules : Option < & ' static [ & ' static str ] > ,
223224) -> Result < Index , meilisearch_sdk:: errors:: Error > {
224- info ! ( "Updating/creating index {}" , name ) ;
225+ info ! ( "Updating/creating index" ) ;
225226
226227 match client. get_index ( name) . await {
227228 Ok ( index) => {
@@ -236,9 +237,13 @@ async fn create_or_update_index(
236237 info ! ( "Performing index settings set." ) ;
237238 index
238239 . set_settings ( & settings)
239- . await ?
240+ . await
241+ . inspect_err ( |e| error ! ( "Error setting index settings: {e:?}" ) ) ?
240242 . wait_for_completion ( client, None , Some ( TIMEOUT ) )
241- . await ?;
243+ . await
244+ . inspect_err ( |e| {
245+ error ! ( "Error setting index settings while waiting: {e:?}" )
246+ } ) ?;
242247 info ! ( "Done performing index settings set." ) ;
243248
244249 Ok ( index)
@@ -250,7 +255,10 @@ async fn create_or_update_index(
250255 let task = client. create_index ( name, Some ( "version_id" ) ) . await ?;
251256 let task = task
252257 . wait_for_completion ( client, None , Some ( TIMEOUT ) )
253- . await ?;
258+ . await
259+ . inspect_err ( |e| {
260+ error ! ( "Error creating index while waiting: {e:?}" )
261+ } ) ?;
254262 let index = task
255263 . try_make_index ( client)
256264 . map_err ( |x| x. unwrap_failure ( ) ) ?;
@@ -263,15 +271,20 @@ async fn create_or_update_index(
263271
264272 index
265273 . set_settings ( & settings)
266- . await ?
274+ . await
275+ . inspect_err ( |e| error ! ( "Error setting index settings: {e:?}" ) ) ?
267276 . wait_for_completion ( client, None , Some ( TIMEOUT ) )
268- . await ?;
277+ . await
278+ . inspect_err ( |e| {
279+ error ! ( "Error setting index settings while waiting: {e:?}" )
280+ } ) ?;
269281
270282 Ok ( index)
271283 }
272284 }
273285}
274286
287+ #[ instrument( skip_all, fields( index. name, mods. len = mods. len( ) ) ) ]
275288async fn add_to_index (
276289 client : & Client ,
277290 index : & Index ,
@@ -282,21 +295,31 @@ async fn add_to_index(
282295 "Adding chunk starting with version id {}" ,
283296 chunk[ 0 ] . version_id
284297 ) ;
298+
299+ let now = std:: time:: Instant :: now ( ) ;
300+
285301 index
286302 . add_or_replace ( chunk, Some ( "version_id" ) )
287- . await ?
303+ . await
304+ . inspect_err ( |e| error ! ( "Error adding chunk to index: {e:?}" ) ) ?
288305 . wait_for_completion (
289306 client,
290307 None ,
291- Some ( std:: time:: Duration :: from_secs ( 3600 ) ) ,
308+ Some ( std:: time:: Duration :: from_secs ( 7200 ) ) , // 2 hours
292309 )
293- . await ?;
294- info ! ( "Added chunk of {} projects to index" , chunk. len( ) ) ;
310+ . await
311+ . inspect_err ( |e| error ! ( "Error adding chunk to index: {e:?}" ) ) ?;
312+ info ! (
313+ "Added chunk of {} projects to index in {:.2} seconds" ,
314+ chunk. len( ) ,
315+ now. elapsed( ) . as_secs_f64( )
316+ ) ;
295317 }
296318
297319 Ok ( ( ) )
298320}
299321
322+ #[ instrument( skip_all, fields( index. name) ) ]
300323async fn update_and_add_to_index (
301324 client : & Client ,
302325 index : & Index ,
@@ -357,20 +380,28 @@ pub async fn add_projects_batch_client(
357380
358381 let mut tasks = FuturesOrdered :: new ( ) ;
359382
383+ let mut id = 0 ;
384+
360385 client. across_all ( index_references, |index_list, client| {
386+ let span = info_span ! ( "add_projects_batch" , client. idx = id) ;
387+ id += 1 ;
388+
361389 for index in index_list {
362390 let owned_client = client. clone ( ) ;
363391 let projects_ref = & projects;
364392 let additional_fields_ref = & additional_fields;
365- tasks. push_back ( async move {
366- update_and_add_to_index (
367- & owned_client,
368- index,
369- projects_ref,
370- additional_fields_ref,
371- )
372- . await
373- } ) ;
393+ tasks. push_back (
394+ async move {
395+ update_and_add_to_index (
396+ & owned_client,
397+ index,
398+ projects_ref,
399+ additional_fields_ref,
400+ )
401+ . await
402+ }
403+ . instrument ( span. clone ( ) ) ,
404+ ) ;
374405 }
375406 } ) ;
376407
0 commit comments