diff --git a/cookbook/07_knowledge/04_advanced/03_graph_rag.py b/cookbook/07_knowledge/04_advanced/03_graph_rag.py index 965fb15754..c87e97b5ff 100644 --- a/cookbook/07_knowledge/04_advanced/03_graph_rag.py +++ b/cookbook/07_knowledge/04_advanced/03_graph_rag.py @@ -1,7 +1,7 @@ """ Graph RAG: LightRAG Integration ================================= -LightRAG is a managed knowledge backend that builds a knowledge graph +LightRAG is an external knowledge provider that builds a knowledge graph from your documents. It handles its own ingestion and retrieval, providing graph-based RAG capabilities. @@ -24,10 +24,10 @@ # --------------------------------------------------------------------------- try: - from agno.vectordb.lightrag import LightRag + from agno.knowledge.external_provider import LightRagProvider knowledge = Knowledge( - vector_db=LightRag( + external_provider=LightRagProvider( server_url="http://localhost:9621", ), ) diff --git a/cookbook/07_knowledge/05_integrations/README.md b/cookbook/07_knowledge/05_integrations/README.md index 6d755603cb..53f9a96bd4 100644 --- a/cookbook/07_knowledge/05_integrations/README.md +++ b/cookbook/07_knowledge/05_integrations/README.md @@ -1,6 +1,6 @@ # Integrations -Specific reader, cloud storage, and vector database integrations. +Specific reader, cloud storage, vector database, and external provider integrations. ## Prerequisites @@ -8,6 +8,7 @@ Specific reader, cloud storage, and vector database integrations. 2. Set `OPENAI_API_KEY` environment variable 3. For cloud: set provider-specific credentials (see each file) 4. For managed DBs: install provider packages (see each file) +5. For external providers: run the provider server (see each file) ## Readers @@ -34,6 +35,15 @@ Specific reader, cloud storage, and vector database integrations. 
| [vector_dbs/02_local.py](./vector_dbs/02_local.py) | ChromaDB + LanceDB (local development) | | [vector_dbs/03_managed.py](./vector_dbs/03_managed.py) | Pinecone + PgVector (managed/production) | +## External Providers + +External providers manage their own indexing pipeline (chunking, embedding, storage). +You send documents to them and they handle everything internally. + +| File | Provider | +|------|----------| +| [external_providers/01_lightrag.py](./external_providers/01_lightrag.py) | LightRAG (graph-based RAG) | + ## Running ```bash diff --git a/cookbook/07_knowledge/05_integrations/external_providers/01_lightrag.py b/cookbook/07_knowledge/05_integrations/external_providers/01_lightrag.py new file mode 100644 index 0000000000..6156839756 --- /dev/null +++ b/cookbook/07_knowledge/05_integrations/external_providers/01_lightrag.py @@ -0,0 +1,97 @@ +""" +LightRAG: Graph-Based External Provider +========================================= +LightRAG is an external knowledge provider that manages its own +graph-based indexing pipeline. Unlike vector databases where Agno +handles chunking, embedding, and storage, LightRAG runs its own +server and handles everything internally. + +How it works: +- Documents are sent to LightRAG's HTTP API +- LightRAG extracts entities and relationships +- Queries traverse the knowledge graph for multi-hop reasoning + +This is useful when you want graph-based RAG without managing +the graph construction yourself. 
+ +Requirements: +- LightRAG server running (default: http://localhost:9621) +- pip install lightrag-agno +""" + +import asyncio +from os import getenv + +from agno.agent import Agent +from agno.knowledge.knowledge import Knowledge +from agno.models.openai import OpenAIResponses + +# --------------------------------------------------------------------------- +# Setup +# --------------------------------------------------------------------------- + +try: + from agno.knowledge.external_provider import LightRagProvider + + # LightRagProvider implements the ExternalKnowledgeProvider protocol. + # It handles ingestion, querying, and deletion via LightRAG's HTTP API. + provider = LightRagProvider( + server_url=getenv("LIGHTRAG_SERVER_URL", "http://localhost:9621"), + api_key=getenv("LIGHTRAG_API_KEY"), + ) + + # Pass the provider directly — no vector_db needed. + knowledge = Knowledge( + name="lightrag-demo", + external_provider=provider, + ) + + agent = Agent( + model=OpenAIResponses(id="gpt-4o"), + knowledge=knowledge, + search_knowledge=True, + markdown=True, + ) + +except ImportError: + provider = None + knowledge = None + agent = None + print("LightRAG not installed. Run: pip install lightrag-agno") + + +# --------------------------------------------------------------------------- +# Run Demo +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + + async def main(): + if not knowledge or not agent: + print("Skipping demo: LightRAG not installed.") + return + + # 1. Ingest a document — LightRAG builds a knowledge graph from it + print("\n--- Ingesting document into LightRAG ---\n") + await knowledge.ainsert( + url="https://agno-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf" + ) + + # 2. 
Query via the agent — retrieval goes through LightRAG's graph + print("\n" + "=" * 60) + print("LightRAG: graph-based knowledge retrieval") + print("=" * 60 + "\n") + + agent.print_response( + "What ingredients are commonly shared across Thai recipes?", + stream=True, + ) + + # 3. You can also query the provider directly for raw results + print("\n--- Direct provider query ---\n") + results = await provider.aquery("What Thai dishes use coconut milk?") + if results: + print(f"Got {len(results)} result(s)") + print(f"References: {results[0].meta_data.get('references', [])}") + + asyncio.run(main()) diff --git a/cookbook/07_knowledge/09_archive/vector_dbs/lightrag.py b/cookbook/07_knowledge/09_archive/vector_dbs/lightrag.py index 10af2a9b5b..5a8d25c031 100644 --- a/cookbook/07_knowledge/09_archive/vector_dbs/lightrag.py +++ b/cookbook/07_knowledge/09_archive/vector_dbs/lightrag.py @@ -10,14 +10,14 @@ from os import getenv from agno.agent import Agent +from agno.knowledge.external_provider import LightRagProvider from agno.knowledge.knowledge import Knowledge from agno.knowledge.reader.wikipedia_reader import WikipediaReader -from agno.vectordb.lightrag import LightRag # --------------------------------------------------------------------------- # Setup # --------------------------------------------------------------------------- -vector_db = LightRag( +provider = LightRagProvider( server_url=getenv("LIGHTRAG_SERVER_URL", "http://localhost:9621"), api_key=getenv("LIGHTRAG_API_KEY"), ) @@ -29,7 +29,7 @@ knowledge = Knowledge( name="LightRAG Knowledge Base", description="Knowledge base using LightRAG for graph-based retrieval", - vector_db=vector_db, + external_provider=provider, ) @@ -70,7 +70,7 @@ async def main() -> None: markdown=True, ) - results = await vector_db.async_search("What skills does Jordan Mitchell have?") + results = await provider.aquery("What skills does Jordan Mitchell have?") if results: doc = results[0] print(f"References: 
{doc.meta_data.get('references', [])}") diff --git a/cookbook/92_integrations/rag/agentic_rag_with_lightrag.py b/cookbook/92_integrations/rag/agentic_rag_with_lightrag.py index b45cb70063..88a1762710 100644 --- a/cookbook/92_integrations/rag/agentic_rag_with_lightrag.py +++ b/cookbook/92_integrations/rag/agentic_rag_with_lightrag.py @@ -9,19 +9,19 @@ from os import getenv from agno.agent import Agent +from agno.knowledge.external_provider import LightRagProvider from agno.knowledge.knowledge import Knowledge from agno.knowledge.reader.wikipedia_reader import WikipediaReader -from agno.vectordb.lightrag import LightRag # --------------------------------------------------------------------------- # Setup # --------------------------------------------------------------------------- -vector_db = LightRag(api_key=getenv("LIGHTRAG_API_KEY")) +provider = LightRagProvider(api_key=getenv("LIGHTRAG_API_KEY")) knowledge = Knowledge( name="My LightRag Knowledge Base", - description="Knowledge base using a LightRag vector database", - vector_db=vector_db, + description="Knowledge base using LightRAG as an external provider", + external_provider=provider, ) # --------------------------------------------------------------------------- diff --git a/libs/agno/agno/knowledge/__init__.py b/libs/agno/agno/knowledge/__init__.py index 18b5a4c82e..0521c96ee2 100644 --- a/libs/agno/agno/knowledge/__init__.py +++ b/libs/agno/agno/knowledge/__init__.py @@ -1,3 +1,4 @@ +from agno.knowledge.external_provider import ExternalKnowledgeProvider from agno.knowledge.filesystem import FileSystemKnowledge from agno.knowledge.knowledge import Knowledge from agno.knowledge.protocol import KnowledgeProtocol @@ -6,4 +7,5 @@ "FileSystemKnowledge", "Knowledge", "KnowledgeProtocol", + "ExternalKnowledgeProvider", ] diff --git a/libs/agno/agno/knowledge/external_provider/__init__.py b/libs/agno/agno/knowledge/external_provider/__init__.py new file mode 100644 index 0000000000..35601612c4 --- /dev/null +++ 
b/libs/agno/agno/knowledge/external_provider/__init__.py @@ -0,0 +1,5 @@ +from agno.knowledge.external_provider.lightrag import LightRagProvider +from agno.knowledge.external_provider.protocol import ExternalKnowledgeProvider +from agno.knowledge.external_provider.schemas import ProcessingResult + +__all__ = ["LightRagProvider", "ExternalKnowledgeProvider", "ProcessingResult"] diff --git a/libs/agno/agno/knowledge/external_provider/lightrag.py b/libs/agno/agno/knowledge/external_provider/lightrag.py new file mode 100644 index 0000000000..d491202708 --- /dev/null +++ b/libs/agno/agno/knowledge/external_provider/lightrag.py @@ -0,0 +1,389 @@ +"""LightRagProvider — ExternalKnowledgeProvider implementation for LightRAG. + +Encapsulates all HTTP communication with a LightRAG server. Pass directly +to ``Knowledge(external_provider=LightRagProvider(...))`` for graph-based RAG. + +Ingestion is fire-and-forget: the provider hands content to LightRAG and +returns immediately with a ``ProcessingResult`` containing a ``processing_id`` +(the LightRAG track_id). LightRAG processes documents asynchronously in the +background; queries will include the new content once processing completes. +The Agno contents-db tracks content as PROCESSING until it is resolved via +``get_status``/``aget_status``. 
+"""
+
+import asyncio
+import concurrent.futures
+from typing import Any, Dict, List, Optional
+
+import httpx
+
+from agno.knowledge.content import ContentStatus
+from agno.knowledge.document import Document
+from agno.knowledge.external_provider.schemas import ProcessingResult
+from agno.utils.log import log_debug, log_error, log_info, log_warning
+
+DEFAULT_SERVER_URL = "http://localhost:9621"
+
+TRACK_ID_PREFIX = "track:"
+
+
+class LightRagProvider:
+    """External knowledge provider that talks to a LightRAG HTTP server.
+
+    Args:
+        server_url: Base URL that every endpoint path is appended to.
+        api_key: Optional key; when set it is sent on every request.
+        auth_header_name: Name of the HTTP header that carries the key.
+        auth_header_format: ``str.format`` template for the header value;
+            receives the key as ``api_key``.
+    """
+
+    def __init__(
+        self,
+        server_url: str = DEFAULT_SERVER_URL,
+        api_key: Optional[str] = None,
+        auth_header_name: str = "X-API-KEY",
+        auth_header_format: str = "{api_key}",
+    ):
+        self.server_url = server_url
+        self.api_key = api_key
+        self.auth_header_name = auth_header_name
+        self.auth_header_format = auth_header_format
+
+    # ------------------------------------------------------------------
+    # Async-to-sync helper
+    # ------------------------------------------------------------------
+
+    @staticmethod
+    def _run_sync(coro):  # type: ignore[no-untyped-def]
+        """Run an async coroutine from sync code, even inside a running event loop.
+
+        Args:
+            coro: The coroutine object to drive to completion.
+
+        Returns:
+            Whatever the coroutine returns.
+
+        Raises:
+            Any exception raised by the coroutine, propagated unchanged.
+        """
+        try:
+            asyncio.get_running_loop()
+        except RuntimeError:
+            # No event loop is running in this thread, so asyncio.run() is safe.
+            return asyncio.run(coro)
+
+        # A loop is already running here and asyncio.run() would refuse to nest,
+        # so drive the coroutine on a fresh loop in a worker thread. This sits
+        # outside the try block on purpose: a RuntimeError raised by the
+        # coroutine itself must reach the caller instead of triggering a second
+        # asyncio.run() on the already-consumed coroutine.
+        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
+            return pool.submit(asyncio.run, coro).result()
+
+    # ------------------------------------------------------------------
+    # Header helpers
+    # ------------------------------------------------------------------
+
+    def _get_headers(self) -> Dict[str, str]:
+        """Return JSON request headers, plus the auth header when an API key is set."""
+        headers: Dict[str, str] = {"Content-Type": "application/json"}
+        if self.api_key:
+            headers[self.auth_header_name] = self.auth_header_format.format(api_key=self.api_key)
+        return headers
+
+    def _get_auth_headers(self) -> Dict[str, str]:
+        """Return only the auth header (no Content-Type); empty when no API key is set."""
+        headers: Dict[str, str] = {}
+        if self.api_key:
+            headers[self.auth_header_name] = self.auth_header_format.format(api_key=self.api_key)
+        return headers
+
+    #
------------------------------------------------------------------
+    # Ingestion — file bytes
+    # ------------------------------------------------------------------
+
+    def ingest_file(
+        self,
+        file_content: bytes,
+        filename: Optional[str] = None,
+        content_type: Optional[str] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> ProcessingResult:
+        """Sync wrapper around aingest_file.
+
+        Takes the same arguments and returns the same ``ProcessingResult``;
+        the coroutine is driven to completion via ``_run_sync``.
+        """
+        return self._run_sync(
+            self.aingest_file(
+                file_content=file_content,
+                filename=filename,
+                content_type=content_type,
+                metadata=metadata,
+            )
+        )
+
+    async def aingest_file(
+        self,
+        file_content: bytes,
+        filename: Optional[str] = None,
+        content_type: Optional[str] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> ProcessingResult:
+        """Upload raw file bytes to the LightRAG server.
+
+        Returns a ``ProcessingResult`` with the track_id as ``processing_id``.
+        LightRAG processes the document in the background.
+
+        Args:
+            file_content: Raw bytes of the file to upload.
+            filename: Name sent in the multipart part when provided.
+            content_type: MIME type for the part; when omitted, httpx guesses
+                it from ``filename``.
+            metadata: Currently unused; it is not sent to the server.
+
+        Returns:
+            A ``ProcessingResult`` with status PROCESSING and the server's
+            track_id, or status FAILED (empty ``processing_id``) when
+            ``file_content`` is empty.
+
+        Raises:
+            httpx.HTTPStatusError: If the server answers with an error status.
+            httpx.RequestError: If the request cannot be sent.
+            KeyError: If the response JSON has no ``track_id``.
+        """
+        if not file_content:
+            log_warning("File content is empty.")
+            return ProcessingResult(
+                processing_id="",
+                status=ContentStatus.FAILED,
+                status_message="File content is empty",
+            )
+
+        if filename:
+            # Keep the filename even when no MIME type was supplied: httpx
+            # accepts None as the third tuple element and guesses the type from
+            # the name. Passing bare bytes would make httpx send a placeholder
+            # filename instead of the real one.
+            files = {"file": (filename, file_content, content_type)}
+        else:
+            files = {"file": file_content}  # type: ignore[dict-item]
+
+        async with httpx.AsyncClient() as client:
+            response = await client.post(
+                f"{self.server_url}/documents/upload",
+                files=files,
+                headers=self._get_auth_headers(),
+            )
+            response.raise_for_status()
+            result = response.json()
+            track_id = result["track_id"]
+            log_info(f"File submitted to LightRAG, track_id: {track_id}")
+            return ProcessingResult(
+                processing_id=track_id,
+                status=ContentStatus.PROCESSING,
+            )
+
+    # ------------------------------------------------------------------
+    # Ingestion — plain text
+    # ------------------------------------------------------------------
+
+    def ingest_text(
+        self,
+        text: str,
+        source_name: Optional[str] = None,
+        metadata: Optional[Dict[str, Any]] =
None, + ) -> ProcessingResult: + """Sync wrapper around aingest_text.""" + return self._run_sync(self.aingest_text(text=text, source_name=source_name, metadata=metadata)) + + async def aingest_text( + self, + text: str, + source_name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + """Insert text into the LightRAG server. + + Returns a ``ProcessingResult`` with the track_id as ``processing_id``. + LightRAG processes the document in the background. + """ + payload: Dict[str, str] = {"text": text} + if source_name: + payload["file_source"] = source_name + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{self.server_url}/documents/text", + json=payload, + headers=self._get_headers(), + ) + response.raise_for_status() + result = response.json() + track_id = result["track_id"] + log_info(f"Text submitted to LightRAG, track_id: {track_id}") + return ProcessingResult( + processing_id=track_id, + status=ContentStatus.PROCESSING, + ) + + # ------------------------------------------------------------------ + # Query + # ------------------------------------------------------------------ + + def query( + self, + query: str, + limit: int = 10, + mode: Optional[str] = None, + filters: Optional[Dict[str, Any]] = None, + ) -> List[Document]: + """Sync wrapper around aquery.""" + result = self._run_sync(self.aquery(query=query, limit=limit, mode=mode, filters=filters)) + return result if result is not None else [] + + async def aquery( + self, + query: str, + limit: int = 10, + mode: Optional[str] = None, + filters: Optional[Dict[str, Any]] = None, + ) -> List[Document]: + """Query the LightRAG server. Returns matching documents.""" + query_mode = mode or "hybrid" + if filters is not None: + log_warning("Filters are not supported in LightRAG. 
No filters will be applied.") + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + f"{self.server_url}/query", + json={"query": query, "mode": query_mode, "include_references": True}, + headers=self._get_headers(), + ) + response.raise_for_status() + result = response.json() + return self._format_response(result, query, query_mode) + + except httpx.RequestError as e: + log_error(f"HTTP Request Error: {type(e).__name__}: {str(e)}") + return [] + except httpx.HTTPStatusError as e: + log_error(f"HTTP Status Error: {e.response.status_code} - {e.response.text}") + return [] + except Exception as e: + log_error(f"Unexpected error during LightRAG server search: {type(e).__name__}: {str(e)}") + import traceback + + log_error(f"Full traceback: {traceback.format_exc()}") + return [] + + # ------------------------------------------------------------------ + # Deletion + # ------------------------------------------------------------------ + + def delete_content( + self, + external_id: str, + ) -> bool: + """Sync wrapper around adelete_content.""" + try: + return self._run_sync(self.adelete_content(external_id)) + except Exception as e: + log_error(f"Error in sync delete_content: {e}") + return False + + async def adelete_content( + self, + external_id: str, + ) -> bool: + """Delete a document from LightRAG by its external ID. + + Accepts either a raw doc ID or a ``track:`` reference. + Track references are resolved to a doc ID on the fly. 
+ """ + try: + doc_id = await self._resolve_doc_id(external_id) + if doc_id is None: + log_warning(f"Could not resolve LightRAG doc ID from: {external_id}") + return False + + payload = {"doc_ids": [doc_id], "delete_file": False} + async with httpx.AsyncClient() as client: + response = await client.request( + method="DELETE", + url=f"{self.server_url}/documents/delete_document", + headers=self._get_headers(), + json=payload, + ) + response.raise_for_status() + return True + except Exception as e: + log_error(f"Error deleting document {external_id}: {e}") + return False + + # ------------------------------------------------------------------ + # Status polling + # ------------------------------------------------------------------ + + def get_status(self, processing_id: str) -> ProcessingResult: + """Sync wrapper around aget_status.""" + return self._run_sync(self.aget_status(processing_id)) + + async def aget_status(self, processing_id: str) -> ProcessingResult: + """Check LightRAG processing status for a track ID. + + Queries the track status endpoint and returns a ``ProcessingResult`` + with the resolved ``external_id`` (document ID) when completed. 
+ """ + try: + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.server_url}/documents/track_status/{processing_id}", + headers=self._get_headers(), + ) + response.raise_for_status() + result = response.json() + log_debug(f"Track status for {processing_id}: {result}") + + status_summary = result.get("status_summary", {}) + if status_summary.get("failed", 0) > 0: + return ProcessingResult( + processing_id=processing_id, + status=ContentStatus.FAILED, + status_message="External provider processing failed", + ) + + # Still processing if any documents are pending or in-progress + if status_summary.get("processing", 0) > 0 or status_summary.get("pending", 0) > 0: + return ProcessingResult( + processing_id=processing_id, + status=ContentStatus.PROCESSING, + ) + + documents = result.get("documents", []) + if documents and status_summary.get("completed", 0) > 0: + external_id = documents[0]["id"] + return ProcessingResult( + processing_id=processing_id, + external_id=external_id, + status=ContentStatus.COMPLETED, + ) + + return ProcessingResult( + processing_id=processing_id, + status=ContentStatus.PROCESSING, + ) + except Exception as e: + log_error(f"Error checking track status {processing_id}: {e}") + return ProcessingResult( + processing_id=processing_id, + status=ContentStatus.PROCESSING, + status_message=f"Error checking status: {e}", + ) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + async def _resolve_doc_id(self, external_id: str) -> Optional[str]: + """Resolve an external_id to a LightRAG document ID. + + If the external_id is a ``track:`` reference, queries the + track status endpoint to find the actual document ID. Otherwise + returns the external_id as-is (assumed to be a doc ID already). 
+ """ + if not external_id.startswith(TRACK_ID_PREFIX): + return external_id + + track_id = external_id[len(TRACK_ID_PREFIX) :] + try: + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.server_url}/documents/track_status/{track_id}", + headers=self._get_headers(), + ) + response.raise_for_status() + result = response.json() + + if "documents" in result and len(result["documents"]) > 0: + return result["documents"][0]["id"] + + log_warning(f"No documents found for track_id {track_id}: {result}") + return None + except Exception as e: + log_error(f"Error resolving doc ID from track_id {track_id}: {e}") + return None + + def _format_response(self, result: Any, query: str, mode: str) -> List[Document]: + """Format LightRAG server response into Document objects.""" + if isinstance(result, dict) and "response" in result: + meta_data: Dict[str, Any] = {"source": "lightrag", "query": query, "mode": mode} + if "references" in result: + meta_data["references"] = result["references"] + return [Document(content=result["response"], meta_data=meta_data)] + elif isinstance(result, list): + documents = [] + for item in result: + if isinstance(item, dict) and "content" in item: + documents.append( + Document( + content=item["content"], + meta_data=item.get("metadata", {"source": "lightrag", "query": query, "mode": mode}), + ) + ) + else: + documents.append( + Document(content=str(item), meta_data={"source": "lightrag", "query": query, "mode": mode}) + ) + return documents + else: + return [Document(content=str(result), meta_data={"source": "lightrag", "query": query, "mode": mode})] diff --git a/libs/agno/agno/knowledge/external_provider/protocol.py b/libs/agno/agno/knowledge/external_provider/protocol.py new file mode 100644 index 0000000000..d843dff4a9 --- /dev/null +++ b/libs/agno/agno/knowledge/external_provider/protocol.py @@ -0,0 +1,131 @@ +"""ExternalKnowledgeProvider — protocol for external providers that manage their own indexing. 
+ +Providers implementing this protocol handle ingestion (file/text), search, +deletion, and status polling internally, bypassing Agno's default chunk-embed-store +pipeline. LightRAG is the canonical example: it runs its own graph-based indexing +server and exposes HTTP endpoints for upload, query, and delete. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, runtime_checkable + +from agno.knowledge.document import Document + +if TYPE_CHECKING: + from agno.knowledge.external_provider.schemas import ProcessingResult + + +@runtime_checkable +class ExternalKnowledgeProvider(Protocol): + """Protocol for external knowledge providers that manage their own indexing pipeline. + + Any class that implements these methods can be passed to + ``Knowledge(external_provider=...)``. + """ + + # ------------------------------------------------------------------ + # Ingestion — file bytes + # ------------------------------------------------------------------ + + def ingest_file( + self, + file_content: bytes, + filename: Optional[str] = None, + content_type: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + """Ingest a file from raw bytes. Returns a ProcessingResult.""" + ... + + async def aingest_file( + self, + file_content: bytes, + filename: Optional[str] = None, + content_type: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + """Async variant of ingest_file.""" + ... + + # ------------------------------------------------------------------ + # Ingestion — plain text + # ------------------------------------------------------------------ + + def ingest_text( + self, + text: str, + source_name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + """Ingest plain text. Returns a ProcessingResult.""" + ... 
+ + async def aingest_text( + self, + text: str, + source_name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + """Async variant of ingest_text.""" + ... + + # ------------------------------------------------------------------ + # Query + # ------------------------------------------------------------------ + + def query( + self, + query: str, + limit: int = 10, + mode: Optional[str] = None, + filters: Optional[Dict[str, Any]] = None, + ) -> List[Document]: + """Search the provider. Returns matching documents.""" + ... + + async def aquery( + self, + query: str, + limit: int = 10, + mode: Optional[str] = None, + filters: Optional[Dict[str, Any]] = None, + ) -> List[Document]: + """Async variant of query.""" + ... + + # ------------------------------------------------------------------ + # Deletion + # ------------------------------------------------------------------ + + def delete_content( + self, + external_id: str, + ) -> bool: + """Delete a document by its external ID. Returns True on success.""" + ... + + async def adelete_content( + self, + external_id: str, + ) -> bool: + """Async variant of delete_content.""" + ... + + # ------------------------------------------------------------------ + # Status polling + # ------------------------------------------------------------------ + + def get_status( + self, + processing_id: str, + ) -> ProcessingResult: + """Get the processing status of a document by its processing ID.""" + ... + + async def aget_status( + self, + processing_id: str, + ) -> ProcessingResult: + """Async variant of get_status.""" + ... 
diff --git a/libs/agno/agno/knowledge/external_provider/schemas.py b/libs/agno/agno/knowledge/external_provider/schemas.py new file mode 100644 index 0000000000..5e98414fe2 --- /dev/null +++ b/libs/agno/agno/knowledge/external_provider/schemas.py @@ -0,0 +1,23 @@ +"""Schemas for external knowledge providers.""" + +from dataclasses import dataclass, field +from typing import Optional + +from agno.knowledge.content import ContentStatus + + +@dataclass +class ProcessingResult: + """Result from an external provider ingestion or status check. + + Attributes: + processing_id: The ID used to poll for status (e.g. LightRAG track_id). + external_id: The final, resolved external document ID (available once processing completes). + status: Current processing status. + status_message: Optional human-readable message about the status. + """ + + processing_id: str + external_id: Optional[str] = None + status: ContentStatus = field(default=ContentStatus.PROCESSING) + status_message: Optional[str] = None diff --git a/libs/agno/agno/knowledge/knowledge.py b/libs/agno/agno/knowledge/knowledge.py index 7ad340861c..30b6cda06b 100644 --- a/libs/agno/agno/knowledge/knowledge.py +++ b/libs/agno/agno/knowledge/knowledge.py @@ -14,7 +14,7 @@ ) from agno.knowledge.remote_knowledge import RemoteLoader from agno.knowledge.store.content_store import ContentStore -from agno.knowledge.utils import strip_agno_metadata +from agno.knowledge.utils import get_agno_metadata, set_agno_metadata, strip_agno_metadata from agno.utils.log import log_debug, log_info, log_warning from agno.utils.string import generate_id @@ -28,6 +28,7 @@ class Knowledge: name: Optional[str] = None description: Optional[str] = None vector_db: Optional[Any] = None + external_provider: Optional[Any] = None contents_db: Optional[Union[BaseDb, AsyncBaseDb]] = None max_results: int = 10 readers: Optional[Dict[str, Reader]] = None @@ -59,6 +60,7 @@ def __post_init__(self): reader_registry=self._reader_registry, 
knowledge_name=self.name, isolate_vector_search=self.isolate_vector_search, + external_provider=self.external_provider, ) self._remote_loader = RemoteLoader( pipeline=self._pipeline, @@ -470,6 +472,17 @@ def search( search_type: Optional[str] = None, ) -> List[Document]: """Returns relevant documents matching a query""" + _max_results = max_results or self.max_results + + # Route through external provider if detected + if self.external_provider is not None: + try: + log_debug(f"Getting {_max_results} relevant documents via external provider for query: {query}") + return self.external_provider.query(query=query, limit=_max_results) + except Exception as e: + log_warning(f"Error searching external provider: {e}") + return [] + from agno.vectordb import VectorDb from agno.vectordb.search import SearchType @@ -495,7 +508,6 @@ def search( elif isinstance(search_filters, list): search_filters = [EQ("linked_to", self.name), *search_filters] - _max_results = max_results or self.max_results log_debug(f"Getting {_max_results} relevant documents for query: {query}") return self.vector_db.search(query=query, limit=_max_results, filters=search_filters) except Exception as e: @@ -512,6 +524,17 @@ async def asearch( search_type: Optional[str] = None, ) -> List[Document]: """Returns relevant documents matching a query""" + _max_results = max_results or self.max_results + + # Route through external provider if detected + if self.external_provider is not None: + try: + log_debug(f"Getting {_max_results} relevant documents via external provider for query: {query}") + return await self.external_provider.aquery(query=query, limit=_max_results) + except Exception as e: + log_warning(f"Error searching external provider: {e}") + return [] + from agno.vectordb import VectorDb from agno.vectordb.search import SearchType @@ -536,7 +559,6 @@ async def asearch( elif isinstance(search_filters, list): search_filters = [EQ("linked_to", self.name), *search_filters] - _max_results = max_results or 
self.max_results log_debug(f"Getting {_max_results} relevant documents for query: {query}") try: return await self.vector_db.async_search(query=query, limit=_max_results, filters=search_filters) @@ -578,10 +600,100 @@ async def aget_content_by_id(self, content_id: str) -> Optional[Content]: return await self._content_store.aget_content_by_id(content_id) def get_content_status(self, content_id: str) -> Tuple[Optional[ContentStatus], Optional[str]]: - return self._content_store.get_content_status(content_id) + status, message = self._content_store.get_content_status(content_id) + if status == ContentStatus.PROCESSING and self.external_provider is not None: + status, message = self._resolve_external_status(content_id, status, message) + return status, message async def aget_content_status(self, content_id: str) -> Tuple[Optional[ContentStatus], Optional[str]]: - return await self._content_store.aget_content_status(content_id) + status, message = await self._content_store.aget_content_status(content_id) + if status == ContentStatus.PROCESSING and self.external_provider is not None: + status, message = await self._aresolve_external_status(content_id, status, message) + return status, message + + def _resolve_external_status( + self, + content_id: str, + current_status: ContentStatus, + current_message: Optional[str], + ) -> Tuple[ContentStatus, Optional[str]]: + """Check an external provider for updated processing status (sync).""" + content = self._content_store.get_content_by_id(content_id) + if content is None: + return current_status, current_message + + # Read processing_id from _agno metadata (persisted via the metadata JSON column) + polling_id = get_agno_metadata(content.metadata, "processing_id") or content.external_id + if polling_id is None: + return current_status, current_message + + try: + provider = self.external_provider + if provider is not None and hasattr(provider, "get_status"): + result = provider.get_status(polling_id) + return 
self._apply_provider_result(content, result, current_status, current_message) + except Exception as e: + log_debug(f"Could not check external provider status: {e}") + return current_status, current_message + + async def _aresolve_external_status( + self, + content_id: str, + current_status: ContentStatus, + current_message: Optional[str], + ) -> Tuple[ContentStatus, Optional[str]]: + """Check an external provider for updated processing status (async).""" + content = await self._content_store.aget_content_by_id(content_id) + if content is None: + return current_status, current_message + + # Read processing_id from _agno metadata (persisted via the metadata JSON column) + polling_id = get_agno_metadata(content.metadata, "processing_id") or content.external_id + if polling_id is None: + return current_status, current_message + + try: + provider = self.external_provider + if provider is not None and hasattr(provider, "aget_status"): + result = await provider.aget_status(polling_id) + return await self._aapply_provider_result(content, result, current_status, current_message) + except Exception as e: + log_debug(f"Could not check external provider status: {e}") + return current_status, current_message + + def _apply_provider_result( + self, + content: Content, + result: Any, + current_status: ContentStatus, + current_message: Optional[str], + ) -> Tuple[ContentStatus, Optional[str]]: + """Apply a ProcessingResult from the external provider and persist changes (sync).""" + content.status = result.status + content.status_message = result.status_message + if result.status == ContentStatus.COMPLETED and result.external_id: + content.external_id = result.external_id + content.metadata = set_agno_metadata(content.metadata, "external_id", result.external_id) + if result.status in (ContentStatus.COMPLETED, ContentStatus.FAILED): + self._content_store.update(content, vector_db=self.vector_db) + return content.status, content.status_message + + async def _aapply_provider_result( + 
self, + content: Content, + result: Any, + current_status: ContentStatus, + current_message: Optional[str], + ) -> Tuple[ContentStatus, Optional[str]]: + """Apply a ProcessingResult from the external provider and persist changes (async).""" + content.status = result.status + content.status_message = result.status_message + if result.status == ContentStatus.COMPLETED and result.external_id: + content.external_id = result.external_id + content.metadata = set_agno_metadata(content.metadata, "external_id", result.external_id) + if result.status in (ContentStatus.COMPLETED, ContentStatus.FAILED): + await self._content_store.aupdate(content, vector_db=self.vector_db) + return content.status, content.status_message def patch_content(self, content: Content) -> Optional[Dict[str, Any]]: return self._content_store.patch_content(content, vector_db=self.vector_db) @@ -594,12 +706,12 @@ def remove_content_by_id(self, content_id: str): self.vector_db = cast(VectorDb, self.vector_db) if self.vector_db is not None: - if self.vector_db.__class__.__name__ == "LightRag": + if self.external_provider is not None: content = self.get_content_by_id(content_id) if content and content.external_id: - self.vector_db.delete_by_external_id(content.external_id) # type: ignore + self.external_provider.delete_content(content.external_id) else: - log_warning(f"No external_id found for content {content_id}, cannot delete from LightRAG") + log_warning(f"No external_id found for content {content_id}, cannot delete from external provider") else: self.vector_db.delete_by_content_id(content_id) @@ -608,12 +720,12 @@ def remove_content_by_id(self, content_id: str): async def aremove_content_by_id(self, content_id: str): if self.vector_db is not None: - if self.vector_db.__class__.__name__ == "LightRag": + if self.external_provider is not None: content = await self.aget_content_by_id(content_id) if content and content.external_id: - self.vector_db.delete_by_external_id(content.external_id) # type: ignore + 
await self.external_provider.adelete_content(content.external_id) else: - log_warning(f"No external_id found for content {content_id}, cannot delete from LightRAG") + log_warning(f"No external_id found for content {content_id}, cannot delete from external provider") else: self.vector_db.delete_by_content_id(content_id) diff --git a/libs/agno/agno/knowledge/pipeline/ingestion.py b/libs/agno/agno/knowledge/pipeline/ingestion.py index 16517a18c0..c36a20297c 100644 --- a/libs/agno/agno/knowledge/pipeline/ingestion.py +++ b/libs/agno/agno/knowledge/pipeline/ingestion.py @@ -4,7 +4,6 @@ vector database. Orchestrates reading, chunking, hashing, and insertion. """ -import asyncio import hashlib import io from dataclasses import dataclass @@ -18,6 +17,7 @@ from agno.knowledge.content import Content, ContentStatus, FileData from agno.knowledge.document import Document +from agno.knowledge.external_provider.schemas import ProcessingResult from agno.knowledge.reader import Reader, ReaderFactory from agno.knowledge.reader_registry import ReaderRegistry from agno.knowledge.store.content_store import ContentStore @@ -43,6 +43,7 @@ class IngestionPipeline: reader_registry: Optional[ReaderRegistry] = None knowledge_name: Optional[str] = None isolate_vector_search: bool = False + external_provider: Optional[Any] = None # ========================================== # MAIN ENTRY POINTS @@ -121,8 +122,8 @@ def load_from_path( self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] return - if self.vector_db.__class__.__name__ == "LightRag": - self.process_lightrag_content(content, KnowledgeContentOrigin.PATH) + if self.external_provider is not None: + self._ingest_external(content, KnowledgeContentOrigin.PATH) return if content.reader: @@ -205,8 +206,8 @@ async def aload_from_path( await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] return - if self.vector_db.__class__.__name__ == "LightRag": - await 
self.aprocess_lightrag_content(content, KnowledgeContentOrigin.PATH) + if self.external_provider is not None: + await self._aingest_external(content, KnowledgeContentOrigin.PATH) return if content.reader: @@ -297,8 +298,8 @@ def load_from_url( self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] return - if self.vector_db.__class__.__name__ == "LightRag": - self.process_lightrag_content(content, KnowledgeContentOrigin.URL) + if self.external_provider is not None: + self._ingest_external(content, KnowledgeContentOrigin.URL) return try: @@ -426,8 +427,8 @@ async def aload_from_url( await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] return - if self.vector_db.__class__.__name__ == "LightRag": - await self.aprocess_lightrag_content(content, KnowledgeContentOrigin.URL) + if self.external_provider is not None: + await self._aingest_external(content, KnowledgeContentOrigin.URL) return try: @@ -568,8 +569,8 @@ def load_from_content( self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] return - if content.file_data and self.vector_db.__class__.__name__ == "LightRag": - self.process_lightrag_content(content, KnowledgeContentOrigin.CONTENT) + if content.file_data and self.external_provider is not None: + self._ingest_external(content, KnowledgeContentOrigin.CONTENT) return read_documents = [] @@ -670,8 +671,8 @@ async def aload_from_content( await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] return - if content.file_data and self.vector_db.__class__.__name__ == "LightRag": - await self.aprocess_lightrag_content(content, KnowledgeContentOrigin.CONTENT) + if content.file_data and self.external_provider is not None: + await self._aingest_external(content, KnowledgeContentOrigin.CONTENT) return read_documents = [] @@ -776,8 +777,8 @@ def load_from_topics( self.content_store.update(topic_content, vector_db=self.vector_db) # 
type: ignore[union-attr] continue - if self.vector_db.__class__.__name__ == "LightRag": - self.process_lightrag_content(topic_content, KnowledgeContentOrigin.TOPIC) + if self.external_provider is not None: + self._ingest_external(topic_content, KnowledgeContentOrigin.TOPIC) continue if self.vector_db and self.vector_db.content_hash_exists(topic_content.content_hash) and skip_if_exists: @@ -840,8 +841,8 @@ async def aload_from_topics( await self.content_store.aupdate(topic_content, vector_db=self.vector_db) # type: ignore[union-attr] continue - if self.vector_db.__class__.__name__ == "LightRag": - await self.aprocess_lightrag_content(topic_content, KnowledgeContentOrigin.TOPIC) + if self.external_provider is not None: + await self._aingest_external(topic_content, KnowledgeContentOrigin.TOPIC) continue if self.vector_db and self.vector_db.content_hash_exists(topic_content.content_hash) and skip_if_exists: @@ -1092,321 +1093,190 @@ def chunk_documents_sync(self, reader: Reader, documents: List[Document]) -> Lis return chunked_documents # ========================================== - # LIGHTRAG PROCESSING + # EXTERNAL PROVIDER INGESTION # ========================================== - async def aprocess_lightrag_content(self, content: Content, content_type: KnowledgeContentOrigin) -> None: - from agno.vectordb import VectorDb - - self.vector_db = cast(VectorDb, self.vector_db) - - await self.content_store.ainsert(content) # type: ignore[union-attr] - if content_type == KnowledgeContentOrigin.PATH: - if content.file_data is None: - log_warning("No file data provided") - - if content.path is None: - log_error("No path provided for content") - return - - path = Path(content.path) + def _ingest_external(self, content: Content, content_type: KnowledgeContentOrigin) -> None: + """Route content to the external provider (sync). Handles PATH, URL, CONTENT, and TOPIC origins. 
- log_info(f"Uploading file to LightRAG from path: {path}") - try: - with open(path, "rb") as f: - file_content = f.read() + Content stays in PROCESSING state because the external provider (e.g. LightRAG) + processes documents asynchronously. Status is resolved when polled. + """ + content.metadata = self._build_external_provider_metadata(content, content_type) + try: + result = self._do_external_ingest(content, content_type) + content.external_id = result.external_id + content.status = result.status + content.status_message = result.status_message + if result.processing_id: + content.metadata = set_agno_metadata(content.metadata, "processing_id", result.processing_id) + if result.external_id: + content.metadata = set_agno_metadata(content.metadata, "external_id", result.external_id) + except Exception as e: + log_error(f"Error ingesting via external provider: {e}") + content.status = ContentStatus.FAILED + content.status_message = f"External provider ingestion failed: {str(e)}" + self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - file_type = content.file_type or path.suffix + async def _aingest_external(self, content: Content, content_type: KnowledgeContentOrigin) -> None: + """Route content to the external provider (async). Handles PATH, URL, CONTENT, and TOPIC origins. - if self.vector_db and hasattr(self.vector_db, "insert_file_bytes"): - result = await self.vector_db.insert_file_bytes( - file_content=file_content, - filename=path.name, - content_type=file_type, - send_metadata=True, - ) + Content stays in PROCESSING state because the external provider (e.g. LightRAG) + processes documents asynchronously. Status is resolved when polled. 
+ """ + content.metadata = self._build_external_provider_metadata(content, content_type) + try: + result = await self._ado_external_ingest(content, content_type) + content.external_id = result.external_id + content.status = result.status + content.status_message = result.status_message + if result.processing_id: + content.metadata = set_agno_metadata(content.metadata, "processing_id", result.processing_id) + if result.external_id: + content.metadata = set_agno_metadata(content.metadata, "external_id", result.external_id) + except Exception as e: + log_error(f"Error ingesting via external provider: {e}") + content.status = ContentStatus.FAILED + content.status_message = f"External provider ingestion failed: {str(e)}" + await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - else: - log_error("Vector database does not support file insertion") - content.status = ContentStatus.FAILED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - content.external_id = result - content.status = ContentStatus.COMPLETED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return + def _build_external_provider_metadata( + self, + content: Content, + content_type: KnowledgeContentOrigin, + ) -> Dict[str, Any]: + """Build _agno metadata for content managed by an external provider.""" + provider = self.external_provider + assert provider is not None + + provider_class = type(provider).__name__ + metadata = content.metadata or {} + metadata = set_agno_metadata(metadata, "source_type", "external_provider") + metadata = set_agno_metadata(metadata, "provider", provider_class) + metadata = set_agno_metadata(metadata, "content_origin", content_type.value) + + if hasattr(provider, "server_url"): + metadata = set_agno_metadata(metadata, "server_url", provider.server_url) + + if content_type == KnowledgeContentOrigin.URL and content.url: + metadata = 
set_agno_metadata(metadata, "source_url", content.url) + elif content_type == KnowledgeContentOrigin.PATH and content.path: + metadata = set_agno_metadata(metadata, "source_path", content.path) + elif content_type == KnowledgeContentOrigin.TOPIC and content.topics: + metadata = set_agno_metadata(metadata, "source_topics", content.topics) + + return metadata + + def _do_external_ingest(self, content: Content, content_type: KnowledgeContentOrigin) -> ProcessingResult: + """Dispatch to the appropriate external provider ingestion method (sync).""" + provider = self.external_provider + assert provider is not None - except Exception as e: - log_error(f"Error uploading file to LightRAG: {e}") - content.status = ContentStatus.FAILED - content.status_message = f"Could not upload to LightRAG: {str(e)}" - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return + if content_type == KnowledgeContentOrigin.PATH: + if content.path is None: + raise ValueError("No path provided for content") + path = Path(content.path) + log_info(f"Ingesting file via external provider from path: {path}") + with open(path, "rb") as f: + file_content = f.read() + file_type = content.file_type or path.suffix + return provider.ingest_file(file_content=file_content, filename=path.name, content_type=file_type) elif content_type == KnowledgeContentOrigin.URL: - log_info(f"Uploading file to LightRAG from URL: {content.url}") - try: - reader = content.reader or self.reader_registry.website_reader # type: ignore[union-attr] - if reader is None: - log_error("No URL reader available") - content.status = ContentStatus.FAILED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - reader.chunk = False - read_documents = reader.read(content.url, name=content.name) - if not content.id: - content.id = generate_id(content.content_hash or "") - self.prepare_documents_for_insert(read_documents, content.id) - - if not 
read_documents: - log_error("No documents read from URL") - content.status = ContentStatus.FAILED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - if self.vector_db and hasattr(self.vector_db, "insert_text"): - result = await self.vector_db.insert_text( - file_source=content.url, - text=read_documents[0].content, - ) - else: - log_error("Vector database does not support text insertion") - content.status = ContentStatus.FAILED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - content.external_id = result - content.status = ContentStatus.COMPLETED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - except Exception as e: - log_error(f"Error uploading file to LightRAG: {e}") - content.status = ContentStatus.FAILED - content.status_message = f"Could not upload to LightRAG: {str(e)}" - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return + log_info(f"Ingesting content via external provider from URL: {content.url}") + reader = content.reader or self.reader_registry.website_reader # type: ignore[union-attr] + if reader is None: + raise ValueError("No URL reader available") + reader.chunk = False + read_documents = reader.read(content.url, name=content.name) + if not content.id: + content.id = generate_id(content.content_hash or "") + self.prepare_documents_for_insert(read_documents, content.id) + if not read_documents: + raise ValueError("No documents read from URL") + return provider.ingest_text(text=read_documents[0].content, source_name=content.url) elif content_type == KnowledgeContentOrigin.CONTENT: filename = ( content.file_data.filename if content.file_data and content.file_data.filename else "uploaded_file" ) - log_info(f"Uploading file to LightRAG: {filename}") - + log_info(f"Ingesting file via external provider: {filename}") if 
content.file_data and content.file_data.content: - if self.vector_db and hasattr(self.vector_db, "insert_file_bytes"): - result = await self.vector_db.insert_file_bytes( - file_content=content.file_data.content, - filename=filename, - content_type=content.file_data.type, - send_metadata=True, - ) - else: - log_error("Vector database does not support file insertion") - content.status = ContentStatus.FAILED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - content.external_id = result - content.status = ContentStatus.COMPLETED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] + return provider.ingest_file( + file_content=content.file_data.content, + filename=filename, + content_type=content.file_data.type, + ) else: - log_warning(f"No file data available for LightRAG upload: {content.name}") - return + raise ValueError(f"No file data available for external provider upload: {content.name}") elif content_type == KnowledgeContentOrigin.TOPIC: - log_info(f"Uploading file to LightRAG: {content.name}") - + log_info(f"Ingesting topic via external provider: {content.name}") if content.reader is None: - log_error("No reader available for topic content") - content.status = ContentStatus.FAILED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - + raise ValueError("No reader available for topic content") if not content.topics: - log_error("No topics available for content") - content.status = ContentStatus.FAILED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - + raise ValueError("No topics available for content") read_documents = content.reader.read(content.topics) if len(read_documents) > 0: - if self.vector_db and hasattr(self.vector_db, "insert_text"): - result = await self.vector_db.insert_text( - file_source=content.topics[0], - 
text=read_documents[0].content, - ) - else: - log_error("Vector database does not support text insertion") - content.status = ContentStatus.FAILED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - content.external_id = result - content.status = ContentStatus.COMPLETED - await self.content_store.aupdate(content, vector_db=self.vector_db) # type: ignore[union-attr] - return + return provider.ingest_text(text=read_documents[0].content, source_name=content.topics[0]) else: - log_warning(f"No documents found for LightRAG upload: {content.name}") - return + raise ValueError(f"No documents found for external provider upload: {content.name}") - def process_lightrag_content(self, content: Content, content_type: KnowledgeContentOrigin) -> None: - """Synchronously process LightRAG content. Uses asyncio.run() only for LightRAG-specific async methods.""" - from agno.vectordb import VectorDb + raise ValueError(f"Unsupported content type: {content_type}") - self.vector_db = cast(VectorDb, self.vector_db) + async def _ado_external_ingest(self, content: Content, content_type: KnowledgeContentOrigin) -> ProcessingResult: + """Dispatch to the appropriate external provider ingestion method (async).""" + provider = self.external_provider + assert provider is not None - self.content_store.insert(content) # type: ignore[union-attr] if content_type == KnowledgeContentOrigin.PATH: - if content.file_data is None: - log_warning("No file data provided") - if content.path is None: - log_error("No path provided for content") - return - + raise ValueError("No path provided for content") path = Path(content.path) - - log_info(f"Uploading file to LightRAG from path: {path}") - try: - with open(path, "rb") as f: - file_content = f.read() - - file_type = content.file_type or path.suffix - - if self.vector_db and hasattr(self.vector_db, "insert_file_bytes"): - result = asyncio.run( - self.vector_db.insert_file_bytes( - 
file_content=file_content, - filename=path.name, - content_type=file_type, - send_metadata=True, - ) - ) - else: - log_error("Vector database does not support file insertion") - content.status = ContentStatus.FAILED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - content.external_id = result - content.status = ContentStatus.COMPLETED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - except Exception as e: - log_error(f"Error uploading file to LightRAG: {e}") - content.status = ContentStatus.FAILED - content.status_message = f"Could not upload to LightRAG: {str(e)}" - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return + log_info(f"Ingesting file via external provider from path: {path}") + with open(path, "rb") as f: + file_content = f.read() + file_type = content.file_type or path.suffix + return await provider.aingest_file(file_content=file_content, filename=path.name, content_type=file_type) elif content_type == KnowledgeContentOrigin.URL: - log_info(f"Uploading file to LightRAG from URL: {content.url}") - try: - reader = content.reader or self.reader_registry.website_reader # type: ignore[union-attr] - if reader is None: - log_error("No URL reader available") - content.status = ContentStatus.FAILED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - reader.chunk = False - read_documents = reader.read(content.url, name=content.name) - if not content.id: - content.id = generate_id(content.content_hash or "") - self.prepare_documents_for_insert(read_documents, content.id) - - if not read_documents: - log_error("No documents read from URL") - content.status = ContentStatus.FAILED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - if self.vector_db and hasattr(self.vector_db, "insert_text"): - result = asyncio.run( - 
self.vector_db.insert_text( - file_source=content.url, - text=read_documents[0].content, - ) - ) - else: - log_error("Vector database does not support text insertion") - content.status = ContentStatus.FAILED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - content.external_id = result - content.status = ContentStatus.COMPLETED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - - except Exception as e: - log_error(f"Error uploading file to LightRAG: {e}") - content.status = ContentStatus.FAILED - content.status_message = f"Could not upload to LightRAG: {str(e)}" - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return + log_info(f"Ingesting content via external provider from URL: {content.url}") + reader = content.reader or self.reader_registry.website_reader # type: ignore[union-attr] + if reader is None: + raise ValueError("No URL reader available") + reader.chunk = False + read_documents = reader.read(content.url, name=content.name) + if not content.id: + content.id = generate_id(content.content_hash or "") + self.prepare_documents_for_insert(read_documents, content.id) + if not read_documents: + raise ValueError("No documents read from URL") + return await provider.aingest_text(text=read_documents[0].content, source_name=content.url) elif content_type == KnowledgeContentOrigin.CONTENT: filename = ( content.file_data.filename if content.file_data and content.file_data.filename else "uploaded_file" ) - log_info(f"Uploading file to LightRAG: {filename}") - + log_info(f"Ingesting file via external provider: {filename}") if content.file_data and content.file_data.content: - if self.vector_db and hasattr(self.vector_db, "insert_file_bytes"): - result = asyncio.run( - self.vector_db.insert_file_bytes( - file_content=content.file_data.content, - filename=filename, - content_type=content.file_data.type, - send_metadata=True, - 
) - ) - else: - log_error("Vector database does not support file insertion") - content.status = ContentStatus.FAILED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - content.external_id = result - content.status = ContentStatus.COMPLETED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] + return await provider.aingest_file( + file_content=content.file_data.content, + filename=filename, + content_type=content.file_data.type, + ) else: - log_warning(f"No file data available for LightRAG upload: {content.name}") - return + raise ValueError(f"No file data available for external provider upload: {content.name}") elif content_type == KnowledgeContentOrigin.TOPIC: - log_info(f"Uploading file to LightRAG: {content.name}") - + log_info(f"Ingesting topic via external provider: {content.name}") if content.reader is None: - log_error("No reader available for topic content") - content.status = ContentStatus.FAILED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - + raise ValueError("No reader available for topic content") if not content.topics: - log_error("No topics available for content") - content.status = ContentStatus.FAILED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - + raise ValueError("No topics available for content") read_documents = content.reader.read(content.topics) if len(read_documents) > 0: - if self.vector_db and hasattr(self.vector_db, "insert_text"): - result = asyncio.run( - self.vector_db.insert_text( - file_source=content.topics[0], - text=read_documents[0].content, - ) - ) - else: - log_error("Vector database does not support text insertion") - content.status = ContentStatus.FAILED - self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return - content.external_id = result - content.status = ContentStatus.COMPLETED - 
self.content_store.update(content, vector_db=self.vector_db) # type: ignore[union-attr] - return + return await provider.aingest_text(text=read_documents[0].content, source_name=content.topics[0]) else: - log_warning(f"No documents found for LightRAG upload: {content.name}") - return + raise ValueError(f"No documents found for external provider upload: {content.name}") + + raise ValueError(f"Unsupported content type: {content_type}") diff --git a/libs/agno/agno/knowledge/remote_knowledge.py b/libs/agno/agno/knowledge/remote_knowledge.py index e8bc964c2b..230aeda9aa 100644 --- a/libs/agno/agno/knowledge/remote_knowledge.py +++ b/libs/agno/agno/knowledge/remote_knowledge.py @@ -9,7 +9,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, List, Optional +from typing import TYPE_CHECKING, List, Optional if TYPE_CHECKING: from agno.knowledge.pipeline.ingestion import IngestionPipeline diff --git a/libs/agno/agno/vectordb/lightrag/__init__.py b/libs/agno/agno/vectordb/lightrag/__init__.py index 1d246bd67e..63eb4059f9 100644 --- a/libs/agno/agno/vectordb/lightrag/__init__.py +++ b/libs/agno/agno/vectordb/lightrag/__init__.py @@ -1,5 +1,5 @@ -from agno.vectordb.lightrag.lightrag import LightRag +from agno.knowledge.external_provider.lightrag import LightRagProvider __all__ = [ - "LightRag", + "LightRagProvider", ] diff --git a/libs/agno/agno/vectordb/lightrag/lightrag.py b/libs/agno/agno/vectordb/lightrag/lightrag.py deleted file mode 100644 index d4b54ad3fa..0000000000 --- a/libs/agno/agno/vectordb/lightrag/lightrag.py +++ /dev/null @@ -1,389 +0,0 @@ -import asyncio -from typing import Any, Dict, List, Optional, Union - -import httpx - -from agno.filters import FilterExpr -from agno.knowledge.document import Document -from agno.utils.log import log_debug, log_error, log_info, log_warning -from agno.vectordb.base import VectorDb - -DEFAULT_SERVER_URL = "http://localhost:9621" - - -class LightRag(VectorDb): - """ - LightRAG VectorDB implementation 
- """ - - def __init__( - self, - server_url: str = DEFAULT_SERVER_URL, - api_key: Optional[str] = None, - auth_header_name: str = "X-API-KEY", - auth_header_format: str = "{api_key}", - name: Optional[str] = None, - description: Optional[str] = None, - ): - self.server_url = server_url - self.api_key = api_key - # Initialize base class with name and description - super().__init__(name=name, description=description) - - self.auth_header_name = auth_header_name - self.auth_header_format = auth_header_format - - def _get_headers(self) -> Dict[str, str]: - """Get headers with optional API key authentication.""" - headers = {"Content-Type": "application/json"} - if self.api_key: - headers[self.auth_header_name] = self.auth_header_format.format(api_key=self.api_key) - return headers - - def _get_auth_headers(self) -> Dict[str, str]: - """Get minimal headers with just authentication (for file uploads).""" - headers = {} - if self.api_key: - headers[self.auth_header_name] = self.auth_header_format.format(api_key=self.api_key) - return headers - - def create(self) -> None: - """Create the vector database""" - pass - - async def async_create(self) -> None: - """Async create the vector database""" - pass - - def name_exists(self, name: str) -> bool: - """Check if a document with the given name exists""" - return False - - async def async_name_exists(self, name: str) -> bool: - """Async check if a document with the given name exists""" - return False - - def id_exists(self, id: str) -> bool: - """Check if a document with the given ID exists""" - return False - - def content_hash_exists(self, content_hash: str) -> bool: - """Check if content with the given hash exists""" - return False - - def insert(self, content_hash: str, documents: List[Document], filters: Optional[Dict[str, Any]] = None) -> None: - """Insert documents into the vector database""" - pass - - async def async_insert( - self, content_hash: str, documents: List[Document], filters: Optional[Dict[str, Any]] = 
None - ) -> None: - """Async insert documents into the vector database""" - pass - - def upsert(self, content_hash: str, documents: List[Document], filters: Optional[Dict[str, Any]] = None) -> None: - """Upsert documents into the vector database""" - pass - - def delete_by_content_id(self, content_id: str) -> None: - """Delete documents by content ID""" - pass - - async def async_upsert(self, documents: List[Document], filters: Optional[Dict[str, Any]] = None) -> None: - """Async upsert documents into the vector database""" - pass - - def search( - self, query: str, limit: int = 5, filters: Optional[Union[Dict[str, Any], List[FilterExpr]]] = None - ) -> List[Document]: - result = asyncio.run(self.async_search(query, limit=limit, filters=filters)) - return result if result is not None else [] - - async def async_search( - self, query: str, limit: Optional[int] = None, filters: Optional[Union[Dict[str, Any], List[FilterExpr]]] = None - ) -> Optional[List[Document]]: - mode: str = "hybrid" # Default mode, can be "local", "global", or "hybrid" - if filters is not None: - log_warning("Filters are not supported in LightRAG. 
No filters will be applied.") - try: - async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.post( - f"{self.server_url}/query", - json={"query": query, "mode": "hybrid", "include_references": True}, - headers=self._get_headers(), - ) - - response.raise_for_status() - result = response.json() - - return self._format_lightrag_response(result, query, mode) - - except httpx.RequestError as e: - log_error(f"HTTP Request Error: {type(e).__name__}: {str(e)}") - return [] - except httpx.HTTPStatusError as e: - log_error(f"HTTP Status Error: {e.response.status_code} - {e.response.text}") - return [] - except Exception as e: - log_error(f"Unexpected error during LightRAG server search: {type(e).__name__}: {str(e)}") - import traceback - - log_error(f"Full traceback: {traceback.format_exc()}") - return None - - def drop(self) -> None: - """Drop the vector database""" - asyncio.run(self.async_drop()) - - async def async_drop(self) -> None: - """Async drop the vector database""" - async with httpx.AsyncClient(timeout=30.0) as client: - await client.delete(f"{self.server_url}/documents", headers=self._get_headers()) - - async with httpx.AsyncClient(timeout=30.0) as client: - await client.post( - f"{self.server_url}/documents/clear_cache", - json={"modes": ["default", "naive"]}, - headers=self._get_headers(), - ) - - def exists(self) -> bool: - """Check if the vector database exists""" - return False - - async def async_exists(self) -> bool: - """Async check if the vector database exists""" - return False - - def delete(self) -> bool: - """Delete all documents from the vector database""" - return False - - def delete_by_id(self, id: str) -> bool: - """Delete documents by ID""" - return False - - def delete_by_name(self, name: str) -> bool: - """Delete documents by name""" - return False - - def delete_by_metadata(self, metadata: Dict[str, Any]) -> bool: - """Delete documents by metadata""" - return False - - def delete_by_external_id(self, external_id: 
str) -> bool: - """Delete documents by external ID (sync wrapper)""" - import asyncio - - try: - return asyncio.run(self.async_delete_by_external_id(external_id)) - except Exception as e: - log_error(f"Error in sync delete_by_external_id: {e}") - return False - - async def async_delete_by_external_id(self, external_id: str) -> bool: - """Delete documents by external ID""" - try: - payload = {"doc_ids": [external_id], "delete_file": False} - - async with httpx.AsyncClient() as client: - response = await client.request( - method="DELETE", - url=f"{self.server_url}/documents/delete_document", - headers=self._get_headers(), - json=payload, - ) - response.raise_for_status() - return True - except Exception as e: - log_error(f"Error deleting document {external_id}: {e}") - return False - - # We use this method when content is coming from unsupported file types that LightRAG can't process - # For these we process the content in Agno and then insert it into LightRAG using text - async def _insert_text(self, text: str) -> Dict[str, Any]: - """Insert text into the LightRAG server.""" - - async with httpx.AsyncClient() as client: - response = await client.post( - f"{self.server_url}/documents/text", - json={"text": text}, - headers=self._get_headers(), - ) - response.raise_for_status() - result = response.json() - log_debug(f"Text insertion result: {result}") - return result - - async def insert_file_bytes( - self, - file_content: bytes, - filename: Optional[str] = None, - content_type: Optional[str] = None, - send_metadata: bool = False, - skip_if_exists: bool = False, - ) -> Optional[str]: - """Insert file from raw bytes into the LightRAG server.""" - - if not file_content: - log_warning("File content is empty.") - return None - - if send_metadata and filename and content_type: - # Send with filename and content type (full UploadFile format) - files = {"file": (filename, file_content, content_type)} - else: - files = {"file": file_content} # type: ignore - - async with 
httpx.AsyncClient() as client: - response = await client.post( - f"{self.server_url}/documents/upload", - files=files, - headers=self._get_auth_headers(), - ) - response.raise_for_status() - result = response.json() - log_info(f"File insertion result: {result}") - track_id = result["track_id"] - log_info(f"Track ID: {track_id}") - result = await self._get_document_id(track_id) # type: ignore - log_info(f"Document ID: {result}") - - return result - - async def insert_text(self, file_source: str, text: str) -> Optional[str]: - """Insert text into the LightRAG server.""" - import httpx - - async with httpx.AsyncClient() as client: - response = await client.post( - f"{self.server_url}/documents/text", - json={"file_source": file_source, "text": text}, - headers=self._get_headers(), - ) - response.raise_for_status() - result = response.json() - - log_info(f"Text insertion result: {result}") - track_id = result["track_id"] - log_info(f"Track ID: {track_id}") - result = await self._get_document_id(track_id) # type: ignore - log_info(f"Document ID: {result}") - - return result - - async def _get_document_id(self, track_id: str) -> Optional[str]: - """Get the document ID from the upload ID.""" - async with httpx.AsyncClient() as client: - response = await client.get( - f"{self.server_url}/documents/track_status/{track_id}", - headers=self._get_headers(), - ) - response.raise_for_status() - result = response.json() - - log_debug(f"Document ID result: {result}") - - # Extract document ID from the documents array - if "documents" in result and len(result["documents"]) > 0: - document_id = result["documents"][0]["id"] - return document_id - else: - log_error(f"No documents found in track response: {result}") - return None - - def _is_valid_url(self, url: str) -> bool: - """Helper to check if URL is valid.""" - # TODO: Define supported extensions or implement proper URL validation - return True - - async def lightrag_knowledge_retriever( - self, - query: str, - ) -> 
Optional[List[Document]]: - """ - Custom knowledge retriever function to search the LightRAG server for relevant documents. - - Args: - query: The search query string - num_documents: Number of documents to retrieve (currently unused by LightRAG) - mode: Query mode - "local", "global", or "hybrid" - lightrag_server_url: URL of the LightRAG server - - Returns: - List of retrieved documents or None if search fails - """ - - mode: str = "hybrid" # Default mode, can be "local", "global", or "hybrid" - - try: - import httpx - - async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.post( - f"{self.server_url}/query", - json={"query": query, "mode": "hybrid", "include_references": True}, - headers=self._get_headers(), - ) - - response.raise_for_status() - result = response.json() - - return self._format_lightrag_response(result, query, mode) - - except httpx.RequestError as e: - log_error(f"HTTP Request Error: {type(e).__name__}: {str(e)}") - return None - except httpx.HTTPStatusError as e: - log_error(f"HTTP Status Error: {e.response.status_code} - {e.response.text}") - return None - except Exception as e: - log_error(f"Unexpected error during LightRAG server search: {type(e).__name__}: {str(e)}") - import traceback - - log_error(f"Full traceback: {traceback.format_exc()}") - return None - - def _format_lightrag_response(self, result: Any, query: str, mode: str) -> List[Document]: - """Format LightRAG server response to expected document format.""" - # LightRAG server returns a dict with 'response' key, but we expect a list of documents - # Convert the response to the expected format - if isinstance(result, dict) and "response" in result: - meta_data = {"source": "lightrag", "query": query, "mode": mode} - # Preserve references from LightRAG response for document citations - if "references" in result: - meta_data["references"] = result["references"] - return [Document(content=result["response"], meta_data=meta_data)] - elif isinstance(result, 
list): - # Convert list items to Document objects - documents = [] - for item in result: - if isinstance(item, dict) and "content" in item: - documents.append( - Document( - content=item["content"], - meta_data=item.get("metadata", {"source": "lightrag", "query": query, "mode": mode}), - ) - ) - else: - documents.append( - Document(content=str(item), meta_data={"source": "lightrag", "query": query, "mode": mode}) - ) - return documents - else: - # If it's a string or other format, wrap it in a Document - return [Document(content=str(result), meta_data={"source": "lightrag", "query": query, "mode": mode})] - - def update_metadata(self, content_id: str, metadata: Dict[str, Any]) -> None: - """ - Update metadata is not supported for LightRag as it manages its own graph structure. - - Args: - content_id (str): The content ID to update - metadata (Dict[str, Any]): The metadata to update - """ - raise NotImplementedError("update_metadata not supported for LightRag - use LightRag's native methods") - - def get_supported_search_types(self) -> List[str]: - """Get the supported search types for this vector database.""" - return [] # LightRag doesn't use SearchType enum diff --git a/libs/agno/tests/unit/knowledge/test_external_provider.py b/libs/agno/tests/unit/knowledge/test_external_provider.py new file mode 100644 index 0000000000..d573891eda --- /dev/null +++ b/libs/agno/tests/unit/knowledge/test_external_provider.py @@ -0,0 +1,500 @@ +"""Tests for ExternalKnowledgeProvider protocol and integration.""" + +from typing import Any, Dict, List, Optional +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agno.knowledge.content import Content, ContentStatus, FileData +from agno.knowledge.document import Document +from agno.knowledge.external_provider import ExternalKnowledgeProvider +from agno.knowledge.external_provider.lightrag import LightRagProvider +from agno.knowledge.external_provider.schemas import ProcessingResult +from agno.knowledge.knowledge 
import Knowledge +from agno.knowledge.pipeline.ingestion import KnowledgeContentOrigin +from agno.knowledge.utils import get_agno_metadata +from agno.vectordb.base import VectorDb + +# ------------------------------------------------------------------ +# Mock implementations +# ------------------------------------------------------------------ + + +class MockExternalProvider: + """A mock that satisfies the ExternalKnowledgeProvider protocol.""" + + def __init__(self): + self.ingested_files: List[str] = [] + self.ingested_texts: List[str] = [] + self.queries: List[str] = [] + self.deleted_ids: List[str] = [] + self._status_responses: Dict[str, ProcessingResult] = {} + + def ingest_file( + self, + file_content: bytes, + filename: Optional[str] = None, + content_type: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + self.ingested_files.append(filename or "unknown") + pid = f"proc-file-{len(self.ingested_files)}" + return ProcessingResult(processing_id=pid, status=ContentStatus.PROCESSING) + + async def aingest_file( + self, + file_content: bytes, + filename: Optional[str] = None, + content_type: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + self.ingested_files.append(filename or "unknown") + pid = f"proc-file-{len(self.ingested_files)}" + return ProcessingResult(processing_id=pid, status=ContentStatus.PROCESSING) + + def ingest_text( + self, + text: str, + source_name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + self.ingested_texts.append(source_name or "unknown") + pid = f"proc-text-{len(self.ingested_texts)}" + return ProcessingResult(processing_id=pid, status=ContentStatus.PROCESSING) + + async def aingest_text( + self, + text: str, + source_name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> ProcessingResult: + self.ingested_texts.append(source_name or "unknown") + pid = 
f"proc-text-{len(self.ingested_texts)}" + return ProcessingResult(processing_id=pid, status=ContentStatus.PROCESSING) + + def query( + self, + query: str, + limit: int = 10, + mode: Optional[str] = None, + filters: Optional[Dict[str, Any]] = None, + ) -> List[Document]: + self.queries.append(query) + return [Document(content=f"Result for: {query}", meta_data={"source": "mock"})] + + async def aquery( + self, + query: str, + limit: int = 10, + mode: Optional[str] = None, + filters: Optional[Dict[str, Any]] = None, + ) -> List[Document]: + self.queries.append(query) + return [Document(content=f"Result for: {query}", meta_data={"source": "mock"})] + + def delete_content(self, external_id: str) -> bool: + self.deleted_ids.append(external_id) + return True + + async def adelete_content(self, external_id: str) -> bool: + self.deleted_ids.append(external_id) + return True + + def get_status(self, processing_id: str) -> ProcessingResult: + if processing_id in self._status_responses: + return self._status_responses[processing_id] + return ProcessingResult(processing_id=processing_id, status=ContentStatus.PROCESSING) + + async def aget_status(self, processing_id: str) -> ProcessingResult: + return self.get_status(processing_id) + + +class MockVectorDb(VectorDb): + """A regular VectorDb that does NOT satisfy ExternalKnowledgeProvider.""" + + def create(self) -> None: + pass + + async def async_create(self) -> None: + pass + + def name_exists(self, name: str) -> bool: + return False + + async def async_name_exists(self, name: str) -> bool: + return False + + def id_exists(self, id: str) -> bool: + return False + + def content_hash_exists(self, content_hash: str) -> bool: + return False + + def insert(self, content_hash: str, documents: List[Document], filters=None) -> None: + pass + + async def async_insert(self, content_hash: str, documents: List[Document], filters=None) -> None: + pass + + def upsert(self, content_hash: str, documents: List[Document], filters=None) -> None: 
+ pass + + async def async_upsert(self, content_hash: str, documents: List[Document], filters=None) -> None: + pass + + def search(self, query: str, limit: int = 5, filters=None) -> List[Document]: + return [Document(content="regular-result")] + + async def async_search(self, query: str, limit: int = 5, filters=None) -> List[Document]: + return [Document(content="regular-result")] + + def drop(self) -> None: + pass + + async def async_drop(self) -> None: + pass + + def exists(self) -> bool: + return True + + async def async_exists(self) -> bool: + return True + + def delete(self) -> bool: + return True + + def delete_by_id(self, id: str) -> bool: + return True + + def delete_by_name(self, name: str) -> bool: + return True + + def delete_by_metadata(self, metadata) -> bool: + return True + + def update_metadata(self, content_id: str, metadata) -> None: + pass + + def delete_by_content_id(self, content_id: str) -> bool: + return True + + def get_supported_search_types(self) -> List[str]: + return ["vector"] + + +# ------------------------------------------------------------------ +# Protocol detection tests +# ------------------------------------------------------------------ + + +class TestProtocolDetection: + def test_mock_provider_satisfies_protocol(self): + provider = MockExternalProvider() + assert isinstance(provider, ExternalKnowledgeProvider) + + def test_lightrag_provider_satisfies_protocol(self): + provider = LightRagProvider() + assert isinstance(provider, ExternalKnowledgeProvider) + + def test_regular_vectordb_does_not_satisfy_protocol(self): + vdb = MockVectorDb() + assert not isinstance(vdb, ExternalKnowledgeProvider) + + +# ------------------------------------------------------------------ +# Knowledge external_provider field tests +# ------------------------------------------------------------------ + + +class TestKnowledgeExternalProvider: + def test_regular_vectordb_no_external_provider(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + 
assert knowledge.external_provider is None + + def test_external_provider_not_passed_to_pipeline_when_regular_vdb(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + assert knowledge._pipeline.external_provider is None + + def test_external_provider_passed_to_pipeline(self): + provider = MockExternalProvider() + knowledge = Knowledge(external_provider=provider) + assert knowledge._pipeline.external_provider is provider + + +# ------------------------------------------------------------------ +# Search routing tests +# ------------------------------------------------------------------ + + +class TestSearchRouting: + def test_search_routes_to_regular_vectordb(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + results = knowledge.search("test query") + assert len(results) == 1 + assert results[0].content == "regular-result" + + def test_search_routes_to_external_provider(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + knowledge.external_provider = provider + + results = knowledge.search("test query") + assert len(results) == 1 + assert "test query" in results[0].content + assert len(provider.queries) == 1 + + @pytest.mark.asyncio + async def test_asearch_routes_to_external_provider(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + knowledge.external_provider = provider + + results = await knowledge.asearch("async query") + assert len(results) == 1 + assert "async query" in results[0].content + assert len(provider.queries) == 1 + + +# ------------------------------------------------------------------ +# Delete routing tests +# ------------------------------------------------------------------ + + +class TestDeleteRouting: + def test_delete_routes_to_external_provider(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + knowledge.external_provider = provider + + # Mock content store to return content with external_id + 
mock_content = Content(name="test", external_id="ext-123") + knowledge._content_store.get_content_by_id = MagicMock(return_value=mock_content) + knowledge._content_store.contents_db = None + knowledge.contents_db = None + + knowledge.remove_content_by_id("content-1") + assert "ext-123" in provider.deleted_ids + + @pytest.mark.asyncio + async def test_adelete_routes_to_external_provider(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + knowledge.external_provider = provider + + mock_content = Content(name="test", external_id="ext-456") + knowledge._content_store.aget_content_by_id = AsyncMock(return_value=mock_content) + knowledge._content_store.contents_db = None + knowledge.contents_db = None + + await knowledge.aremove_content_by_id("content-2") + assert "ext-456" in provider.deleted_ids + + def test_delete_regular_vectordb_uses_content_id(self): + vdb = MockVectorDb() + vdb.delete_by_content_id = MagicMock(return_value=True) + knowledge = Knowledge(vector_db=vdb) + knowledge.contents_db = None + + knowledge.remove_content_by_id("content-3") + vdb.delete_by_content_id.assert_called_once_with("content-3") + + +# ------------------------------------------------------------------ +# Pipeline external ingestion tests +# ------------------------------------------------------------------ + + +class TestPipelineExternalIngestion: + def test_ingest_external_content_type(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + pipeline = knowledge._pipeline + pipeline.external_provider = provider + pipeline.content_store.insert = MagicMock() + pipeline.content_store.update = MagicMock() + + content = Content( + name="test.txt", + file_data=FileData(content=b"hello world", type="text/plain", filename="test.txt"), + ) + pipeline._ingest_external(content, KnowledgeContentOrigin.CONTENT) + + assert len(provider.ingested_files) == 1 + assert content.status == ContentStatus.PROCESSING + assert 
get_agno_metadata(content.metadata, "processing_id") is not None + + def test_ingest_external_topic(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + pipeline = knowledge._pipeline + pipeline.external_provider = provider + pipeline.content_store.insert = MagicMock() + pipeline.content_store.update = MagicMock() + + mock_reader = MagicMock() + mock_reader.read = MagicMock(return_value=[Document(content="topic content")]) + + content = Content(name="topic1", topics=["topic1"], reader=mock_reader) + pipeline._ingest_external(content, KnowledgeContentOrigin.TOPIC) + + assert len(provider.ingested_texts) == 1 + assert content.status == ContentStatus.PROCESSING + + @pytest.mark.asyncio + async def test_aingest_external_content_type(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + pipeline = knowledge._pipeline + pipeline.external_provider = provider + pipeline.content_store.ainsert = AsyncMock() + pipeline.content_store.aupdate = AsyncMock() + + content = Content( + name="test.txt", + file_data=FileData(content=b"hello world", type="text/plain", filename="test.txt"), + ) + await pipeline._aingest_external(content, KnowledgeContentOrigin.CONTENT) + + assert len(provider.ingested_files) == 1 + assert content.status == ContentStatus.PROCESSING + assert get_agno_metadata(content.metadata, "processing_id") is not None + + def test_ingest_external_failure_sets_status(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + provider.ingest_file = MagicMock(side_effect=Exception("Upload failed")) + pipeline = knowledge._pipeline + pipeline.external_provider = provider + pipeline.content_store.insert = MagicMock() + pipeline.content_store.update = MagicMock() + + content = Content( + name="test.txt", + file_data=FileData(content=b"hello world", type="text/plain", filename="test.txt"), + ) + pipeline._ingest_external(content, 
KnowledgeContentOrigin.CONTENT) + + assert content.status == ContentStatus.FAILED + assert "Upload failed" in (content.status_message or "") + + +# ------------------------------------------------------------------ +# Status resolution tests +# ------------------------------------------------------------------ + + +class TestStatusResolution: + def test_resolve_external_status_completed(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + provider._status_responses["proc-123"] = ProcessingResult( + processing_id="proc-123", + external_id="doc-456", + status=ContentStatus.COMPLETED, + ) + knowledge.external_provider = provider + + mock_content = Content( + id="content-1", + name="test", + metadata={"_agno": {"processing_id": "proc-123"}}, + status=ContentStatus.PROCESSING, + ) + knowledge._content_store.get_content_by_id = MagicMock(return_value=mock_content) + knowledge._content_store.update = MagicMock() + + status, message = knowledge._resolve_external_status("content-1", ContentStatus.PROCESSING, None) + assert status == ContentStatus.COMPLETED + assert mock_content.external_id == "doc-456" + knowledge._content_store.update.assert_called_once() + + def test_resolve_external_status_failed(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + provider._status_responses["proc-789"] = ProcessingResult( + processing_id="proc-789", + status=ContentStatus.FAILED, + status_message="External provider processing failed", + ) + knowledge.external_provider = provider + + mock_content = Content( + id="content-2", + name="test", + metadata={"_agno": {"processing_id": "proc-789"}}, + status=ContentStatus.PROCESSING, + ) + knowledge._content_store.get_content_by_id = MagicMock(return_value=mock_content) + knowledge._content_store.update = MagicMock() + + status, message = knowledge._resolve_external_status("content-2", ContentStatus.PROCESSING, None) + assert status == ContentStatus.FAILED + 
assert message == "External provider processing failed" + knowledge._content_store.update.assert_called_once() + + def test_resolve_external_status_still_processing(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + knowledge.external_provider = provider + + mock_content = Content( + id="content-3", + name="test", + metadata={"_agno": {"processing_id": "proc-still"}}, + status=ContentStatus.PROCESSING, + ) + knowledge._content_store.get_content_by_id = MagicMock(return_value=mock_content) + knowledge._content_store.update = MagicMock() + + status, message = knowledge._resolve_external_status("content-3", ContentStatus.PROCESSING, None) + assert status == ContentStatus.PROCESSING + # Should NOT persist when still processing + knowledge._content_store.update.assert_not_called() + + def test_resolve_external_status_no_processing_id(self): + """Falls back to external_id when processing_id is not in _agno metadata.""" + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + provider._status_responses["ext-fallback"] = ProcessingResult( + processing_id="ext-fallback", + status=ContentStatus.COMPLETED, + external_id="ext-fallback", + ) + knowledge.external_provider = provider + + mock_content = Content( + id="content-4", + name="test", + external_id="ext-fallback", + status=ContentStatus.PROCESSING, + ) + knowledge._content_store.get_content_by_id = MagicMock(return_value=mock_content) + knowledge._content_store.update = MagicMock() + + status, _ = knowledge._resolve_external_status("content-4", ContentStatus.PROCESSING, None) + assert status == ContentStatus.COMPLETED + + @pytest.mark.asyncio + async def test_aresolve_external_status_completed(self): + knowledge = Knowledge(vector_db=MockVectorDb()) + provider = MockExternalProvider() + provider._status_responses["proc-async"] = ProcessingResult( + processing_id="proc-async", + external_id="doc-async", + status=ContentStatus.COMPLETED, + ) + 
knowledge.external_provider = provider + + mock_content = Content( + id="content-5", + name="test", + metadata={"_agno": {"processing_id": "proc-async"}}, + status=ContentStatus.PROCESSING, + ) + knowledge._content_store.aget_content_by_id = AsyncMock(return_value=mock_content) + knowledge._content_store.aupdate = AsyncMock() + + status, message = await knowledge._aresolve_external_status("content-5", ContentStatus.PROCESSING, None) + assert status == ContentStatus.COMPLETED + assert mock_content.external_id == "doc-async" + knowledge._content_store.aupdate.assert_called_once() diff --git a/libs/agno/tests/unit/knowledge/test_knowledge_topic_loading.py b/libs/agno/tests/unit/knowledge/test_knowledge_topic_loading.py index fd06a7e7f4..2192aba144 100644 --- a/libs/agno/tests/unit/knowledge/test_knowledge_topic_loading.py +++ b/libs/agno/tests/unit/knowledge/test_knowledge_topic_loading.py @@ -206,15 +206,16 @@ def test_load_from_topics_all_skipped(): assert pipeline.content_store.update.call_count == 3 -def test_load_from_topics_lightrag_continues(): +def test_load_from_topics_external_provider_continues(): knowledge = Knowledge(vector_db=MockVectorDb()) - knowledge.vector_db.__class__.__name__ = "LightRag" processed_topics = [] + mock_provider = MagicMock() + mock_provider.ingest_text = MagicMock(return_value="ext-123") + pipeline = knowledge._pipeline - pipeline.process_lightrag_content = MagicMock( - side_effect=lambda content, origin: processed_topics.append(content.name) - ) + pipeline.external_provider = mock_provider + pipeline._ingest_external = MagicMock(side_effect=lambda content, origin: processed_topics.append(content.name)) pipeline.build_content_hash = MagicMock(return_value="hash") pipeline.content_store.insert = MagicMock() @@ -229,17 +230,19 @@ def test_load_from_topics_lightrag_continues(): @pytest.mark.asyncio -async def test_aload_from_topics_lightrag_continues(): +async def test_aload_from_topics_external_provider_continues(): knowledge = 
Knowledge(vector_db=MockVectorDb()) - knowledge.vector_db.__class__.__name__ = "LightRag" processed_topics = [] - async def mock_process_lightrag(content, origin): + async def mock_aingest_external(content, origin): processed_topics.append(content.name) + mock_provider = MagicMock() + pipeline = knowledge._pipeline - pipeline.aprocess_lightrag_content = mock_process_lightrag + pipeline.external_provider = mock_provider + pipeline._aingest_external = mock_aingest_external pipeline.build_content_hash = MagicMock(return_value="hash") pipeline.content_store.ainsert = AsyncMock() diff --git a/libs/agno/tests/unit/vectordb/test_lightrag.py b/libs/agno/tests/unit/vectordb/test_lightrag.py index fa91a35c7c..2e6494c61a 100644 --- a/libs/agno/tests/unit/vectordb/test_lightrag.py +++ b/libs/agno/tests/unit/vectordb/test_lightrag.py @@ -1,47 +1,43 @@ import pytest -from agno.vectordb.lightrag import LightRag +from agno.knowledge.external_provider.lightrag import LightRagProvider TEST_SERVER_URL = "http://localhost:9621" TEST_API_KEY = "test_api_key" @pytest.fixture -def lightrag_db(): - """Fixture to create a LightRag instance""" - db = LightRag( +def lightrag_provider(): + """Fixture to create a LightRagProvider instance""" + provider = LightRagProvider( server_url=TEST_SERVER_URL, api_key=TEST_API_KEY, ) - yield db + yield provider def test_initialization(): """Test basic initialization with defaults""" - db = LightRag() + provider = LightRagProvider() - assert db.server_url == "http://localhost:9621" - assert db.api_key is None + assert provider.server_url == "http://localhost:9621" + assert provider.api_key is None def test_initialization_with_params(): """Test initialization with custom parameters""" - db = LightRag( + provider = LightRagProvider( server_url="http://custom:8080", api_key="secret", - name="test_db", - description="Test database", ) - assert db.server_url == "http://custom:8080" - assert db.api_key == "secret" - assert db.name == "test_db" - assert 
db.description == "Test database" + assert provider.server_url == "http://custom:8080" + assert provider.api_key == "secret" -def test_get_headers_with_api_key(lightrag_db): +def test_get_headers_with_api_key(lightrag_provider): """Test headers include API key when configured""" - headers = lightrag_db._get_headers() + headers = lightrag_provider._get_headers() assert headers["Content-Type"] == "application/json" assert headers["X-API-KEY"] == TEST_API_KEY @@ -49,16 +45,16 @@ def test_get_headers_with_api_key(lightrag_db): def test_get_headers_without_api_key(): """Test headers without API key""" - db = LightRag(server_url=TEST_SERVER_URL) - headers = db._get_headers() + provider = LightRagProvider(server_url=TEST_SERVER_URL) + headers = provider._get_headers() assert headers["Content-Type"] == "application/json" assert "X-API-KEY" not in headers -def test_get_auth_headers(lightrag_db): +def test_get_auth_headers(lightrag_provider): """Test auth headers for file uploads""" - headers = lightrag_db._get_auth_headers() + headers = lightrag_provider._get_auth_headers() assert "Content-Type" not in headers assert headers["X-API-KEY"] == TEST_API_KEY @@ -66,18 +62,18 @@ def test_get_auth_headers(lightrag_db): def test_custom_auth_header_format(): """Test custom auth header name and format""" - db = LightRag( + provider = LightRagProvider( server_url=TEST_SERVER_URL, api_key="my_key", auth_header_name="Authorization", auth_header_format="Bearer {api_key}", ) - headers = db._get_headers() + headers = provider._get_headers() assert headers["Authorization"] == "Bearer my_key" -def test_format_response_with_references(lightrag_db): +def test_format_response_with_references(lightrag_provider): """Test that references are preserved in meta_data""" result = { "response": "Jordan Mitchell has skills in Python and JavaScript.", @@ -87,7 +83,7 @@ def test_format_response_with_references(lightrag_db): ], } - documents = lightrag_db._format_lightrag_response(result, "What skills?", 
"hybrid") + documents = lightrag_provider._format_response(result, "What skills?", "hybrid") assert len(documents) == 1 assert documents[0].content == "Jordan Mitchell has skills in Python and JavaScript." @@ -99,47 +95,47 @@ def test_format_response_with_references(lightrag_db): assert documents[0].meta_data["references"][0]["file_path"] == "cv_1.pdf" -def test_format_response_without_references(lightrag_db): +def test_format_response_without_references(lightrag_provider): """Test backward compatibility when no references in response""" result = {"response": "Some content without references."} - documents = lightrag_db._format_lightrag_response(result, "query", "local") + documents = lightrag_provider._format_response(result, "query", "local") assert len(documents) == 1 assert documents[0].content == "Some content without references." assert "references" not in documents[0].meta_data -def test_format_response_list_with_content(lightrag_db): +def test_format_response_list_with_content(lightrag_provider): """Test formatting list response with content field""" result = [ {"content": "First document", "metadata": {"source": "custom"}}, {"content": "Second document"}, ] - documents = lightrag_db._format_lightrag_response(result, "query", "global") + documents = lightrag_provider._format_response(result, "query", "global") assert len(documents) == 2 assert documents[0].content == "First document" assert documents[0].meta_data["source"] == "custom" -def test_format_response_list_plain_strings(lightrag_db): +def test_format_response_list_plain_strings(lightrag_provider): """Test formatting list response with plain strings""" result = ["plain text item 1", "plain text item 2"] - documents = lightrag_db._format_lightrag_response(result, "query", "hybrid") + documents = lightrag_provider._format_response(result, "query", "hybrid") assert len(documents) == 2 assert documents[0].content == "plain text item 1" assert documents[0].meta_data["source"] == "lightrag" -def 
test_format_response_string(lightrag_db): +def test_format_response_string(lightrag_provider): """Test formatting plain string response""" result = "Just a plain string response" - documents = lightrag_db._format_lightrag_response(result, "query", "hybrid") + documents = lightrag_provider._format_response(result, "query", "hybrid") assert len(documents) == 1 assert documents[0].content == "Just a plain string response"