Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ examples/*.html
# UV.lock
uv.lock

# filelock artifacts from tests (mock paths like <MagicMock ...>.lock)
*MagicMock*.lock

# Root directory markdown files (except README.md and AGENTS.md)
*.md
!README.md
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,48 @@ def _resolve_embedding_adapter(
)


def _handle_ingestion_error(
exc: Exception,
collection: str,
doc_id: Optional[str],
parse_hash: Optional[str],
current_step: str,
completed_steps: List[IngestionStepResult],
chunk_count: int,
embedding_count: int,
vector_count: int,
warnings: List[str],
user_id: Optional[int] = None,
) -> IngestionResult:
"""Unify error handling for the ingestion pipeline."""
logger.exception(
"Document ingestion pipeline failed at step '%s': %s", current_step, exc
)

status = "partial" if completed_steps else "error"
_record_ingestion_status(
collection,
doc_id,
status=DocumentProcessingStatus.FAILED,
message=str(exc),
parse_hash=parse_hash,
user_id=user_id,
)

return IngestionResult(
status=status,
doc_id=doc_id,
parse_hash=parse_hash,
chunk_count=chunk_count if status == "partial" else 0,
embedding_count=embedding_count if status == "partial" else 0,
vector_count=vector_count if status == "partial" else 0,
completed_steps=completed_steps,
failed_step=current_step,
message=str(exc),
warnings=warnings,
)


def process_document(
collection: str,
source_path: str,
Expand Down Expand Up @@ -1013,51 +1055,18 @@ def process_document(
warnings=warnings,
)

except RagCoreException as exc:
logger.exception("Document ingestion pipeline failed: %s", exc)
status = "partial" if completed_steps else "error"
except (RagCoreException, Exception) as exc:
progress_manager.complete_task(task_id, success=False)
_record_ingestion_status(
collection,
doc_id,
status=DocumentProcessingStatus.FAILED,
message=str(exc),
parse_hash=parse_hash,
user_id=user_id,
)
return IngestionResult(
status=status,
return _handle_ingestion_error(
exc=exc,
collection=collection,
doc_id=doc_id,
parse_hash=parse_hash,
chunk_count=chunk_count if status == "partial" else 0,
embedding_count=embedding_count if status == "partial" else 0,
vector_count=vector_count if status == "partial" else 0,
current_step=current_step,
completed_steps=completed_steps,
failed_step=current_step,
message=str(exc),
chunk_count=chunk_count,
embedding_count=embedding_count,
vector_count=vector_count,
warnings=warnings,
)
except Exception as exc:
logger.exception("Document ingestion pipeline failed: %s", exc)
status = "partial" if completed_steps else "error"
progress_manager.complete_task(task_id, success=False)
_record_ingestion_status(
collection,
doc_id,
status=DocumentProcessingStatus.FAILED,
message=str(exc),
parse_hash=parse_hash,
user_id=user_id,
)
return IngestionResult(
status=status,
doc_id=doc_id,
parse_hash=parse_hash,
chunk_count=chunk_count if status == "partial" else 0,
embedding_count=embedding_count if status == "partial" else 0,
vector_count=vector_count if status == "partial" else 0,
completed_steps=completed_steps,
failed_step=current_step,
message=str(exc),
warnings=warnings,
)
52 changes: 26 additions & 26 deletions src/xagent/core/tools/core/RAG_tools/pipelines/document_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,27 @@ def _coerce_search_config(
return SearchConfig.model_validate(payload)


def _handle_search_error(
exc: Exception,
current_step: str,
search_type: SearchType,
warnings: List[str],
) -> SearchPipelineResult:
"""Unify error handling for the search pipeline."""
logger.exception(
"Document search pipeline failed at step '%s': %s", current_step, exc
)
return SearchPipelineResult(
status="error",
search_type=search_type,
results=[],
result_count=0,
warnings=warnings + [f"{current_step}: {exc}"],
message=f"{current_step} failed: {exc}",
used_rerank=False,
)


def search_documents(
collection: str,
query_text: str,
Expand Down Expand Up @@ -722,33 +743,12 @@ def search_documents(
cfg=cfg,
)

except RagCoreException as exc:
logger.exception(
"Document search pipeline failed at step '%s': %s", current_step, exc
)
return SearchPipelineResult(
status="error",
search_type=cfg.search_type,
results=[],
result_count=0,
warnings=warnings + [f"{current_step}: {exc}"],
message=f"{current_step} failed: {exc}",
used_rerank=False,
)
except Exception as exc: # noqa: BLE001
logger.exception(
"Document search pipeline encountered unexpected error at step '%s': %s",
current_step,
exc,
)
return SearchPipelineResult(
status="error",
except (RagCoreException, Exception) as exc:
return _handle_search_error(
exc=exc,
current_step=current_step,
search_type=cfg.search_type,
results=[],
result_count=0,
warnings=warnings + [f"{current_step}: {exc}"],
message=f"{current_step} failed: {exc}",
used_rerank=False,
warnings=warnings,
)


Expand Down
Loading
Loading