Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions api/db/services/document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,25 @@ def get_unfinished_docs(cls):
(cls.model.id.in_(unfinished_task_query)))) # including unfinished tasks like GraphRAG, RAPTOR and Mindmap
return list(docs.dicts())

@classmethod
@DB.connection_context()
def get_orphaned_parsing_docs(cls):
    """Find documents left in the 'Parsing' state by a crash or reboot.

    A document qualifies when it is valid, non-virtual, flagged as RUNNING,
    and its progress sits strictly between 0 and 1 — parsing started but
    never finished, typically because the backing task vanished from the
    Redis queue while MySQL still shows the document as running.

    Returns:
        list[dict]: one dict per match with keys id, kb_id, progress, run.
    """
    model = cls.model
    criteria = (
        (model.status == StatusEnum.VALID.value)
        & (model.type != FileType.VIRTUAL.value)
        & (model.run == TaskStatus.RUNNING.value)
        & (model.progress > 0)
        & (model.progress < 1)
    )
    query = model.select(model.id, model.kb_id, model.progress, model.run).where(criteria)
    return [row for row in query.dicts()]

@classmethod
@DB.connection_context()
def increment_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
Expand Down
58 changes: 58 additions & 0 deletions api/db/services/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,26 @@ def get_tasks(cls, doc_id: str):
return None
return tasks

@classmethod
@DB.connection_context()
def get_incomplete_tasks_by_doc_ids(cls, doc_ids: list[str]):
    """Fetch unfinished tasks (0 <= progress < 1) for the given documents.

    Args:
        doc_ids: document IDs to look up; an empty list short-circuits
            without touching the database.

    Returns:
        list[dict]: one dict per task with keys id, doc_id, from_page,
            to_page and priority.
    """
    if not doc_ids:
        return []
    model = cls.model
    query = model.select(
        model.id,
        model.doc_id,
        model.from_page,
        model.to_page,
        model.priority,
    ).where(
        model.doc_id.in_(doc_ids)
        & (model.progress >= 0)
        & (model.progress < 1)
    )
    return list(query.dicts())

@classmethod
@DB.connection_context()
def get_tasks_progress_by_doc_ids(cls, doc_ids: list[str]):
Expand Down Expand Up @@ -464,6 +484,44 @@ def new_task():
), "Can't access Redis. Please check the Redis' status."


def requeue_orphaned_tasks():
    """Re-queue orphaned tasks after a system crash or reboot.

    When the system crashes during document parsing, Redis loses queued tasks
    while MySQL retains the 'Parsing' state. This function detects such orphaned
    documents and re-queues their incomplete tasks into Redis. Documents that
    have no incomplete task left to re-queue are marked as failed so they do
    not remain stuck in the 'Parsing' state forever.

    Returns:
        int: the number of tasks successfully pushed back onto the queue.
    """
    orphaned_docs = DocumentService.get_orphaned_parsing_docs()
    if not orphaned_docs:
        return 0

    doc_ids = [d["id"] for d in orphaned_docs]
    incomplete_tasks = TaskService.get_incomplete_tasks_by_doc_ids(doc_ids)

    # Fail every orphaned doc that has no surviving task to resume.
    # BUG FIX: previously this reset only happened when NO doc had any
    # incomplete task, so a doc whose tasks were all lost stayed stuck in
    # 'Parsing' whenever some other doc still had tasks to re-queue.
    docs_with_tasks = {t["doc_id"] for t in incomplete_tasks}
    for doc_id in doc_ids:
        if doc_id not in docs_with_tasks:
            DocumentService.update_by_id(doc_id, {
                "run": TaskStatus.FAIL.value,
                "progress_msg": "Task lost after system restart. No incomplete tasks found to re-queue.",
            })

    if not incomplete_tasks:
        return 0

    requeued = 0
    for task in incomplete_tasks:
        # Priority may be NULL in older rows; treat missing/None as 0.
        priority = task.get("priority", 0) or 0
        queue_name = settings.get_svr_queue_name(priority)
        if REDIS_CONN.queue_product(queue_name, message=task):
            requeued += 1
        else:
            # Best-effort: log and continue so one bad push doesn't abort the rest.
            logging.warning("Failed to re-queue orphaned task %s for doc %s", task["id"], task["doc_id"])

    logging.info(
        "Orphaned task reconciliation: found %d orphaned docs, re-queued %d/%d tasks",
        len(orphaned_docs), requeued, len(incomplete_tasks),
    )
    return requeued


def reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: dict):
"""Attempt to reuse chunks from previous tasks for optimization.

Expand Down
15 changes: 14 additions & 1 deletion rag/svr/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
from api.db.services.document_service import DocumentService
from api.db.services.doc_metadata_service import DocMetadataService
from api.db.services.llm_service import LLMBundle
from api.db.services.task_service import TaskService, has_canceled, CANVAS_DEBUG_DOC_ID, GRAPH_RAPTOR_FAKE_DOC_ID
from api.db.services.task_service import TaskService, has_canceled, CANVAS_DEBUG_DOC_ID, GRAPH_RAPTOR_FAKE_DOC_ID, requeue_orphaned_tasks
from api.db.services.file2document_service import File2DocumentService
from common.versions import get_ragflow_version
from api.db.db_models import close_connection
Expand Down Expand Up @@ -1383,6 +1383,19 @@ async def main():
report_task = asyncio.create_task(report_status())
tasks = []

# Reconcile orphaned tasks on startup (only first worker to avoid duplicates)
try:
reconcile_lock = RedisDistributedLock("orphaned_task_reconciliation", timeout=60)
if reconcile_lock.acquire():
try:
requeued = requeue_orphaned_tasks()
if requeued:
logging.info(f"Re-queued {requeued} orphaned tasks from previous crash/reboot")
finally:
reconcile_lock.release()
except Exception as e:
logging.warning(f"Orphaned task reconciliation failed: {e}")

logging.info(f"RAGFlow ingestion is ready after {time.time() - start_ts}s initialization.")
try:
while not stop_event.is_set():
Expand Down