Skip to content

Commit 93ec2a6

Browse files
committed
abort mismatched tasks
1 parent 62c4127 commit 93ec2a6

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

backend/danswer/background/celery/tasks/indexing/tasks.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
173173
)
174174
if attempt_id:
175175
task_logger.info(
176-
f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}"
176+
f"Indexing queued: index_attempt={attempt_id} "
177+
f"cc_pair={cc_pair.id} "
178+
f"search_settings={search_settings_instance.id} "
177179
)
178180
tasks_created += 1
179181
except SoftTimeLimitExceeded:
@@ -529,6 +531,13 @@ def connector_indexing_task(
529531
sleep(1)
530532
continue
531533

534+
if payload.index_attempt_id != index_attempt_id:
535+
raise ValueError(
536+
f"connector_indexing_task - id mismatch. Task may be left over from previous run.: "
537+
f"task_index_attempt={index_attempt_id} "
538+
f"payload_index_attempt={payload.index_attempt_id}"
539+
)
540+
532541
logger.info(
533542
f"connector_indexing_task - Fence found, continuing...: fence={redis_connector_index.fence_key}"
534543
)
@@ -614,7 +623,6 @@ def connector_indexing_task(
614623
with get_session_with_tenant(tenant_id) as db_session:
615624
mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e))
616625

617-
redis_connector_index.reset()
618626
raise e
619627
finally:
620628
if lock.owned():

backend/danswer/background/celery/tasks/vespa/tasks.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -690,12 +690,17 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
690690

691691
for a in attempts:
692692
# if attempts exist in the db but we don't detect them in redis, mark them as failed
693-
failure_reason = f"Unknown index attempt {a.id}. Might be left over from a process restart."
694-
if not r.exists(
695-
RedisConnectorIndex.fence_key_with_ids(
696-
a.connector_credential_pair_id, a.search_settings_id
693+
fence_key = RedisConnectorIndex.fence_key_with_ids(
694+
a.connector_credential_pair_id, a.search_settings_id
695+
)
696+
if not r.exists(fence_key):
697+
failure_reason = (
698+
f"Unknown index attempt. Might be left over from a process restart: "
699+
f"index_attempt={a.id} "
700+
f"cc_pair={a.connector_credential_pair_id} "
701+
f"search_settings={a.search_settings_id}"
697702
)
698-
):
703+
task_logger.warning(failure_reason)
699704
mark_attempt_failed(a.id, db_session, failure_reason=failure_reason)
700705

701706
lock_beat.reacquire()

0 commit comments

Comments
 (0)