@@ -39,6 +39,7 @@ use crate::indexer::deserialize_index;
3939use crate :: indexer:: operations_to_point_operations;
4040use crate :: indexer:: search;
4141use crate :: indexer:: serialize_index;
42+ use crate :: indexer:: IndexError ;
4243use crate :: indexer:: Point ;
4344use crate :: indexer:: PointOperation ;
4445use crate :: indexer:: SearchError ;
@@ -244,7 +245,7 @@ fn uri_to_spec(uri: &Uri) -> Result<ResourceSpec, SpecParseError> {
244245#[ derive( Clone , Debug ) ]
245246pub enum TaskStatus {
246247 Pending ( f32 ) ,
247- Error ,
248+ Error ( String ) ,
248249 Completed ,
249250}
250251
@@ -422,6 +423,7 @@ impl Service {
422423 api_key : String ,
423424 ) -> Result < ( ) , StartIndexError > {
424425 let content_endpoint = self . content_endpoint . clone ( ) ;
426+ let internal_task_id = task_id. clone ( ) ;
425427 if let Some ( content_endpoint) = content_endpoint {
426428 tokio:: spawn ( async move {
427429 let index_id = create_index_name ( & domain, & commit) ;
@@ -436,13 +438,24 @@ impl Service {
436438 . await
437439 . unwrap ( )
438440 . chunks ( 100 ) ;
439- let ( id , hnsw ) = self
441+ match self
440442 . process_operation_chunks (
441443 opstream, domain, commit, previous, & index_id, & task_id, & api_key,
442444 )
443- . await ;
444- self . set_index ( id, hnsw. into ( ) ) . await ;
445- self . clear_pending ( & index_id) . await ;
445+ . await
446+ {
447+ Ok ( ( id, hnsw) ) => {
448+ self . set_index ( id, hnsw. into ( ) ) . await ;
449+ self . clear_pending ( & index_id) . await ;
450+ }
451+ Err ( err) => {
452+ self . set_task_status (
453+ internal_task_id,
454+ TaskStatus :: Error ( err. to_string ( ) ) ,
455+ )
456+ . await ;
457+ }
458+ }
446459 }
447460 self . set_task_status ( task_id, TaskStatus :: Completed ) . await ;
448461 } ) ;
@@ -471,6 +484,7 @@ impl Service {
471484 Ok ( ( ) )
472485 }
473486
487+ #[ allow( clippy:: too_many_arguments) ]
474488 async fn process_operation_chunks (
475489 self : & Arc < Self > ,
476490 mut opstream : futures:: stream:: Chunks <
@@ -482,7 +496,7 @@ impl Service {
482496 index_id : & str ,
483497 task_id : & str ,
484498 api_key : & str ,
485- ) -> ( String , HnswIndex ) {
499+ ) -> Result < ( String , HnswIndex ) , IndexError > {
486500 let id = create_index_name ( & domain, & commit) ;
487501 let mut hnsw = self
488502 . load_hnsw_for_indexing ( IndexIdentifier {
@@ -500,14 +514,14 @@ impl Service {
500514 structs,
501515 api_key,
502516 )
503- . await ;
504- hnsw = start_indexing_from_operations ( hnsw, new_ops) . unwrap ( ) ;
517+ . await ? ;
518+ hnsw = start_indexing_from_operations ( hnsw, new_ops) ? ;
505519 }
506520 self . set_task_status ( task_id. to_string ( ) , TaskStatus :: Pending ( 0.8 ) )
507521 . await ;
508522 let path = self . path . clone ( ) ;
509- serialize_index ( path, index_id, hnsw. clone ( ) ) . unwrap ( ) ;
510- ( id, hnsw)
523+ serialize_index ( path, index_id, hnsw. clone ( ) ) ? ;
524+ Ok ( ( id, hnsw) )
511525 }
512526
513527 async fn get_start_index (
@@ -526,7 +540,7 @@ impl Service {
526540
527541 async fn get ( self : Arc < Self > , req : Request < Body > ) -> Result < Response < Body > , Infallible > {
528542 let uri = req. uri ( ) ;
529- match dbg ! ( uri_to_spec( uri) ) {
543+ match uri_to_spec ( uri) {
530544 Ok ( ResourceSpec :: StartIndex {
531545 domain,
532546 commit,
@@ -557,8 +571,9 @@ impl Service {
557571 TaskStatus :: Pending ( f) => {
558572 Ok ( Response :: builder ( ) . body ( format ! ( "{}" , f) . into ( ) ) . unwrap ( ) )
559573 }
560- TaskStatus :: Error => Ok ( Response :: builder ( )
561- . body ( format ! ( "{:?}" , state) . into ( ) )
574+ TaskStatus :: Error ( msg) => Ok ( Response :: builder ( )
575+ . status ( StatusCode :: INTERNAL_SERVER_ERROR )
576+ . body ( format ! ( "{:?}" , msg) . into ( ) )
562577 . unwrap ( ) ) ,
563578 TaskStatus :: Completed => {
564579 Ok ( Response :: builder ( ) . body ( format ! ( "{}" , 1.0 ) . into ( ) ) . unwrap ( ) )
0 commit comments