@@ -173,7 +173,9 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
173173 )
174174 if attempt_id :
175175 task_logger .info (
176- f"Indexing queued: cc_pair={ cc_pair .id } index_attempt={ attempt_id } "
176+ f"Indexing queued: index_attempt={ attempt_id } "
177+ f"cc_pair={ cc_pair .id } "
178+ f"search_settings={ search_settings_instance .id } "
177179 )
178180 tasks_created += 1
179181 except SoftTimeLimitExceeded :
@@ -489,7 +491,7 @@ def connector_indexing_task(
489491 f"search_settings={ search_settings_id } "
490492 )
491493
492- attempt = None
494+ attempt_found = False
493495 n_final_progress : int | None = None
494496
495497 redis_connector = RedisConnector (tenant_id , cc_pair_id )
@@ -529,6 +531,13 @@ def connector_indexing_task(
529531 sleep (1 )
530532 continue
531533
534+ if payload .index_attempt_id != index_attempt_id :
535+ raise ValueError (
536+ f"connector_indexing_task - id mismatch. Task may be left over from previous run.: "
537+ f"task_index_attempt={ index_attempt_id } "
538+ f"payload_index_attempt={ payload .index_attempt_id } "
539+ )
540+
532541 logger .info (
533542 f"connector_indexing_task - Fence found, continuing...: fence={ redis_connector_index .fence_key } "
534543 )
@@ -557,6 +566,7 @@ def connector_indexing_task(
557566 raise ValueError (
558567 f"Index attempt not found: index_attempt={ index_attempt_id } "
559568 )
569+ attempt_found = True
560570
561571 cc_pair = get_connector_credential_pair_from_id (
562572 cc_pair_id = cc_pair_id ,
@@ -576,44 +586,43 @@ def connector_indexing_task(
576586 f"Credential not found: cc_pair={ cc_pair_id } credential={ cc_pair .credential_id } "
577587 )
578588
579- # define a callback class
580- callback = RunIndexingCallback (
581- redis_connector .stop .fence_key ,
582- redis_connector_index .generator_progress_key ,
583- lock ,
584- r ,
585- )
589+ # define a callback class
590+ callback = RunIndexingCallback (
591+ redis_connector .stop .fence_key ,
592+ redis_connector_index .generator_progress_key ,
593+ lock ,
594+ r ,
595+ )
586596
587- logger .info (
588- f"Indexing spawned task running entrypoint: attempt={ index_attempt_id } "
589- f"tenant={ tenant_id } "
590- f"cc_pair={ cc_pair_id } "
591- f"search_settings={ search_settings_id } "
592- )
597+ logger .info (
598+ f"Indexing spawned task running entrypoint: attempt={ index_attempt_id } "
599+ f"tenant={ tenant_id } "
600+ f"cc_pair={ cc_pair_id } "
601+ f"search_settings={ search_settings_id } "
602+ )
593603
594- run_indexing_entrypoint (
595- index_attempt_id ,
596- tenant_id ,
597- cc_pair_id ,
598- is_ee ,
599- callback = callback ,
600- )
604+ run_indexing_entrypoint (
605+ index_attempt_id ,
606+ tenant_id ,
607+ cc_pair_id ,
608+ is_ee ,
609+ callback = callback ,
610+ )
601611
602- # get back the total number of indexed docs and return it
603- n_final_progress = redis_connector_index .get_progress ()
604- redis_connector_index .set_generator_complete (HTTPStatus .OK .value )
612+ # get back the total number of indexed docs and return it
613+ n_final_progress = redis_connector_index .get_progress ()
614+ redis_connector_index .set_generator_complete (HTTPStatus .OK .value )
605615 except Exception as e :
606616 logger .exception (
607617 f"Indexing spawned task failed: attempt={ index_attempt_id } "
608618 f"tenant={ tenant_id } "
609619 f"cc_pair={ cc_pair_id } "
610620 f"search_settings={ search_settings_id } "
611621 )
612- if attempt :
622+ if attempt_found :
613623 with get_session_with_tenant (tenant_id ) as db_session :
614- mark_attempt_failed (attempt , db_session , failure_reason = str (e ))
624+ mark_attempt_failed (index_attempt_id , db_session , failure_reason = str (e ))
615625
616- redis_connector_index .reset ()
617626 raise e
618627 finally :
619628 if lock .owned ():
0 commit comments