diff --git a/agentic_rag/README.md b/agentic_rag/README.md index e421048..c046acd 100644 --- a/agentic_rag/README.md +++ b/agentic_rag/README.md @@ -10,7 +10,7 @@ The system has the following features: - Intelligent query routing - PDF processing using Docling for accurate text extraction and chunking -- Persistent vector storage with ChromaDB (PDF and Websites) +- Persistent vector storage with ChromaDB and Oracle Database 23ai (PDF and Websites) - Smart context retrieval and response generation - FastAPI-based REST API for document upload and querying - Support for both OpenAI-based agents or local, transformer-based agents (`Mistral-7B` by default) @@ -116,9 +116,10 @@ python main.py The API will be available at `http://localhost:8000`. You can then use the API endpoints as described in the API Endpoints section below. -### 2. Using the Gradio Interface +### 2. Using the Gradio Interface (Recommended) The system provides a user-friendly web interface using Gradio, which allows you to: +- Select and pull `ollama` models directly from the interface - Upload and process PDF documents - Process web content from URLs - Chat with your documents using either local or OpenAI models @@ -357,7 +358,7 @@ The system consists of several key components: 1. **PDF Processor**: we use `docling` to extract and chunk text from PDF documents 2. **Web Processor**: we use `trafilatura` to extract and chunk text from websites 3. **GitHub Repository Processor**: we use `gitingest` to extract and chunk text from repositories -4. **Vector Store**: Manages document embeddings and similarity search using `ChromaDB` +4. **Vector Store**: Manages document embeddings and similarity search using `ChromaDB` and `Oracle Database 23ai` 5. 
**RAG Agent**: Makes intelligent decisions about query routing and response generation - OpenAI Agent: Uses `gpt-4-turbo-preview` for high-quality responses, but requires an OpenAI API key - Local Agent: Uses `Mistral-7B` as an open-source alternative diff --git a/agentic_rag/articles/kubernetes_rag.md b/agentic_rag/articles/kubernetes_rag.md index 48cd03d..f95c6d9 100644 --- a/agentic_rag/articles/kubernetes_rag.md +++ b/agentic_rag/articles/kubernetes_rag.md @@ -146,6 +146,13 @@ Then, we can start setting up the solution in our cluster by following these ste kubectl apply -n agentic-rag -f local-deployment/service.yaml ``` + If the pod cannot be scheduled after applying these because the nodes have a `NoSchedule` taint, you can remove the taint from the nodes and try again: + + ```bash + kubectl taint nodes -l node.kubernetes.io/instance-type=VM.GPU.A10.1 nvidia.com/gpu:NoSchedule- + # make sure to select your own instance shape if you're using a different type than A10 GPU. + ``` + 5. Monitor the Deployment With the following commands, we can check the status of our pod: diff --git a/agentic_rag/gradio_app.py b/agentic_rag/gradio_app.py index 6a8f98e..1aa74dd 100644 --- a/agentic_rag/gradio_app.py +++ b/agentic_rag/gradio_app.py @@ -91,6 +91,17 @@ def chat(message: str, history: List[List[str]], agent_type: str, use_cot: bool, # Skip analysis for General Knowledge or when using standard chat interface (not CoT) skip_analysis = collection == "General Knowledge" or not use_cot + # Map collection names to actual collection names in vector store + collection_mapping = { + "PDF Collection": "pdf_documents", + "Repository Collection": "repository_documents", + "Web Knowledge Base": "web_documents", + "General Knowledge": "general_knowledge" + } + + # Get the actual collection name + actual_collection = collection_mapping.get(collection, "pdf_documents") + # Parse agent type to determine model and quantization quantization = None model_name = None @@ -354,10 +365,17 @@ def 
create_interface(): repo_button = gr.Button("Process Repository") repo_output = gr.Textbox(label="Repository Processing Output") + # Define collection choices once to ensure consistency + collection_choices = [ + "PDF Collection", + "Repository Collection", + "Web Knowledge Base", + "General Knowledge" + ] + with gr.Tab("Standard Chat Interface"): with gr.Row(): with gr.Column(scale=1): - # Create model choices with quantization options standard_agent_dropdown = gr.Dropdown( choices=model_choices, value=model_choices[0] if model_choices else None, @@ -365,15 +383,17 @@ def create_interface(): ) with gr.Column(scale=1): standard_collection_dropdown = gr.Dropdown( - choices=["PDF Collection", "Repository Collection", "General Knowledge"], - value="PDF Collection", - label="Knowledge Collection" + choices=collection_choices, + value=collection_choices[0], + label="Select Knowledge Base", + info="Choose which knowledge base to use for answering questions" ) gr.Markdown(""" > **Collection Selection**: > - This interface ALWAYS uses the selected collection without performing query analysis. > - "PDF Collection": Will ALWAYS search the PDF documents regardless of query type. > - "Repository Collection": Will ALWAYS search the repository code regardless of query type. + > - "Web Knowledge Base": Will ALWAYS search web content regardless of query type. > - "General Knowledge": Will ALWAYS use the model's built-in knowledge without searching collections. 
""") standard_chatbot = gr.Chatbot(height=400) @@ -385,7 +405,6 @@ def create_interface(): with gr.Tab("Chain of Thought Chat Interface"): with gr.Row(): with gr.Column(scale=1): - # Create model choices with quantization options cot_agent_dropdown = gr.Dropdown( choices=model_choices, value=model_choices[0] if model_choices else None, @@ -393,15 +412,17 @@ def create_interface(): ) with gr.Column(scale=1): cot_collection_dropdown = gr.Dropdown( - choices=["PDF Collection", "Repository Collection", "General Knowledge"], - value="PDF Collection", - label="Knowledge Collection" + choices=collection_choices, + value=collection_choices[0], + label="Select Knowledge Base", + info="Choose which knowledge base to use for answering questions" ) gr.Markdown(""" > **Collection Selection**: > - When a specific collection is selected, the system will ALWAYS use that collection without analysis: > - "PDF Collection": Will ALWAYS search the PDF documents. > - "Repository Collection": Will ALWAYS search the repository code. + > - "Web Knowledge Base": Will ALWAYS search web content. > - "General Knowledge": Will ALWAYS use the model's built-in knowledge. > - This interface shows step-by-step reasoning and may perform query analysis when needed. """) @@ -485,6 +506,7 @@ def create_interface(): - Select which knowledge collection to query: - **PDF Collection**: Always searches PDF documents - **Repository Collection**: Always searches code repositories + - **Web Knowledge Base**: Always searches web content - **General Knowledge**: Uses the model's built-in knowledge without searching collections 3. 
**Chain of Thought Chat Interface**: diff --git a/agentic_rag/img/architecture.png b/agentic_rag/img/architecture.png index a7b1dcc..e22de00 100644 Binary files a/agentic_rag/img/architecture.png and b/agentic_rag/img/architecture.png differ diff --git a/agentic_rag/local_rag_agent.py b/agentic_rag/local_rag_agent.py index 4742160..3e539ec 100644 --- a/agentic_rag/local_rag_agent.py +++ b/agentic_rag/local_rag_agent.py @@ -245,126 +245,132 @@ def process_query(self, query: str) -> Dict[str, Any]: else: return self._generate_general_response(query) else: - # For PDF or Repository collections, use context-based processing + # For PDF, Repository, or Web collections, use context-based processing if self.use_cot: return self._process_query_with_cot(query) else: return self._process_query_standard(query) def _process_query_with_cot(self, query: str) -> Dict[str, Any]: - """Process query using Chain of Thought reasoning with multiple agents""" - logger.info("Processing query with Chain of Thought reasoning") - - # Get initial context based on selected collection - initial_context = [] - if self.collection == "PDF Collection": - logger.info(f"Retrieving context from PDF Collection for query: '{query}'") - pdf_context = self.vector_store.query_pdf_collection(query) - initial_context.extend(pdf_context) - logger.info(f"Retrieved {len(pdf_context)} chunks from PDF Collection") - # Don't log individual sources to keep console clean - elif self.collection == "Repository Collection": - logger.info(f"Retrieving context from Repository Collection for query: '{query}'") - repo_context = self.vector_store.query_repo_collection(query) - initial_context.extend(repo_context) - logger.info(f"Retrieved {len(repo_context)} chunks from Repository Collection") - # Don't log individual sources to keep console clean - # For General Knowledge, no context is needed - else: - logger.info("Using General Knowledge collection, no context retrieval needed") - + """Process query using Chain of 
Thought reasoning""" try: - # Step 1: Planning - logger.info("Step 1: Planning") - if not self.agents or "planner" not in self.agents: - logger.warning("No planner agent available, using direct response") - return self._generate_general_response(query) + # Get context based on collection type + if self.collection == "PDF Collection": + context = self.vector_store.query_pdf_collection(query) + elif self.collection == "Repository Collection": + context = self.vector_store.query_repo_collection(query) + elif self.collection == "Web Knowledge Base": + context = self.vector_store.query_web_collection(query) + else: + context = [] + + # Log number of chunks retrieved + logger.info(f"Retrieved {len(context)} chunks from {self.collection}") - plan = self.agents["planner"].plan(query, initial_context) - logger.info(f"Generated plan:\n{plan}") + # Create agents if not already created + if not self.agents: + self.agents = create_agents(self.llm, self.vector_store) - # Step 2: Research each step (if researcher is available) - logger.info("Step 2: Research") + # Get planning step + try: + planning_result = self.agents["planner"].plan(query, context) + logger.info("Planning step completed") + except Exception as e: + logger.error(f"Error in planning step: {str(e)}") + logger.info("Falling back to general response") + return self._generate_general_response(query) + + # Get research step research_results = [] - if self.agents.get("researcher") is not None and initial_context: - for step in plan.split("\n"): + if self.agents.get("researcher") is not None and context: + for step in planning_result.split("\n"): if not step.strip(): continue - step_research = self.agents["researcher"].research(query, step) - research_results.append({"step": step, "findings": step_research}) - # Don't log source indices to keep console clean - logger.info(f"Research for step: {step}") + try: + step_research = self.agents["researcher"].research(query, step) + # Extract findings from research result + 
findings = step_research.get("findings", []) if isinstance(step_research, dict) else [] + research_results.append({"step": step, "findings": findings}) + + # Log which sources were used for this step + try: + source_indices = [context.index(finding) + 1 for finding in findings if finding in context] + logger.info(f"Research for step: {step}\nUsing sources: {source_indices}") + except ValueError as ve: + logger.warning(f"Could not find some findings in initial context: {str(ve)}") + except Exception as e: + logger.error(f"Error during research for step '{step}': {str(e)}") + research_results.append({"step": step, "findings": []}) else: # If no researcher or no context, use the steps directly - research_results = [{"step": step, "findings": []} for step in plan.split("\n") if step.strip()] + research_results = [{"step": step, "findings": []} for step in planning_result.split("\n") if step.strip()] logger.info("No research performed (no researcher agent or no context available)") - # Step 3: Reasoning about each step - logger.info("Step 3: Reasoning") + # Get reasoning step + reasoning_steps = [] if not self.agents.get("reasoner"): logger.warning("No reasoner agent available, using direct response") return self._generate_general_response(query) - reasoning_steps = [] for result in research_results: - step_reasoning = self.agents["reasoner"].reason( - query, - result["step"], - result["findings"] if result["findings"] else [{"content": "Using general knowledge", "metadata": {"source": "General Knowledge"}}] - ) - reasoning_steps.append(step_reasoning) - # Log just the step, not the full reasoning - logger.info(f"Reasoning for step: {result['step']}") + try: + step_reasoning = self.agents["reasoner"].reason( + query, + result["step"], + result["findings"] if result["findings"] else [{"content": "Using general knowledge", "metadata": {"source": "General Knowledge"}}] + ) + reasoning_steps.append(step_reasoning) + logger.info(f"Reasoning for step: 
{result['step']}\n{step_reasoning}") + except Exception as e: + logger.error(f"Error in reasoning for step '{result['step']}': {str(e)}") + reasoning_steps.append(f"Error in reasoning for this step: {str(e)}") - # Step 4: Synthesize final answer - logger.info("Step 4: Synthesis") + # Get synthesis step if not self.agents.get("synthesizer"): logger.warning("No synthesizer agent available, using direct response") return self._generate_general_response(query) - final_answer = self.agents["synthesizer"].synthesize(query, reasoning_steps) - logger.info("Final answer synthesized successfully") + try: + synthesis_result = self.agents["synthesizer"].synthesize(query, reasoning_steps) + logger.info("Synthesis step completed") + except Exception as e: + logger.error(f"Error in synthesis step: {str(e)}") + logger.info("Falling back to general response") + return self._generate_general_response(query) return { - "answer": final_answer, - "context": initial_context, - "reasoning_steps": reasoning_steps + "answer": synthesis_result["answer"], + "reasoning_steps": reasoning_steps, + "context": context } + except Exception as e: logger.error(f"Error in CoT processing: {str(e)}") - logger.info("Falling back to general response") - return self._generate_general_response(query) + raise def _process_query_standard(self, query: str) -> Dict[str, Any]: - """Process query using standard approach without Chain of Thought""" - # Initialize context variables - pdf_context = [] - repo_context = [] - - # Get context based on selected collection - if self.collection == "PDF Collection": - logger.info(f"Retrieving context from PDF Collection for query: '{query}'") - pdf_context = self.vector_store.query_pdf_collection(query) - logger.info(f"Retrieved {len(pdf_context)} chunks from PDF Collection") - # Don't log individual sources to keep console clean - elif self.collection == "Repository Collection": - logger.info(f"Retrieving context from Repository Collection for query: '{query}'") - 
repo_context = self.vector_store.query_repo_collection(query) - logger.info(f"Retrieved {len(repo_context)} chunks from Repository Collection") - # Don't log individual sources to keep console clean - - # Combine all context - all_context = pdf_context + repo_context - - # Generate response using context if available, otherwise use general knowledge - if all_context: - logger.info(f"Generating response using {len(all_context)} context chunks") - response = self._generate_response(query, all_context) - else: - logger.info("No context found, using general knowledge") - response = self._generate_general_response(query) - - return response + """Process query using standard RAG approach""" + try: + # Get context based on collection type + if self.collection == "PDF Collection": + context = self.vector_store.query_pdf_collection(query) + elif self.collection == "Repository Collection": + context = self.vector_store.query_repo_collection(query) + elif self.collection == "Web Knowledge Base": + context = self.vector_store.query_web_collection(query) + else: + context = [] + + # Log number of chunks retrieved + logger.info(f"Retrieved {len(context)} chunks from {self.collection}") + + # Generate response using context + response = self._generate_response(query, context) + return response + + except Exception as e: + logger.error(f"Error in standard processing: {str(e)}") + raise def _generate_text(self, prompt: str, max_length: int = 512) -> str: """Generate text using the local model""" @@ -456,7 +462,7 @@ def main(): parser.add_argument("--model", default="mistralai/Mistral-7B-Instruct-v0.2", help="Model to use") parser.add_argument("--quiet", action="store_true", help="Disable verbose logging") parser.add_argument("--use-cot", action="store_true", help="Enable Chain of Thought reasoning") - parser.add_argument("--collection", choices=["PDF Collection", "Repository Collection", "General Knowledge"], + parser.add_argument("--collection", choices=["PDF Collection", 
"Repository Collection", "General Knowledge", "Web Knowledge Base"], help="Specify which collection to query") parser.add_argument("--skip-analysis", action="store_true", help="Skip query analysis step") parser.add_argument("--verbose", action="store_true", help="Show full content of sources") diff --git a/agentic_rag/rag_agent.py b/agentic_rag/rag_agent.py index 8a47fba..5d36a54 100644 --- a/agentic_rag/rag_agent.py +++ b/agentic_rag/rag_agent.py @@ -78,6 +78,19 @@ def _process_query_with_cot(self, query: str) -> Dict[str, Any]: # Only log content preview at debug level content_preview = chunk["content"][:150] + "..." if len(chunk["content"]) > 150 else chunk["content"] logger.debug(f"Content preview for source [{i+1}]: {content_preview}") + elif self.collection == "Web Knowledge Base": + logger.info(f"Retrieving context from Web Knowledge Base for query: '{query}'") + web_context = self.vector_store.query_web_collection(query) + initial_context.extend(web_context) + logger.info(f"Retrieved {len(web_context)} chunks from Web Knowledge Base") + # Log each chunk with citation number but not full content + for i, chunk in enumerate(web_context): + source = chunk["metadata"].get("source", "Unknown") + title = chunk["metadata"].get("title", "Unknown") + logger.info(f"Source [{i+1}]: {source} (title: {title})") + # Only log content preview at debug level + content_preview = chunk["content"][:150] + "..." 
if len(chunk["content"]) > 150 else chunk["content"] + logger.debug(f"Content preview for source [{i+1}]: {content_preview}") # For General Knowledge, no context is needed else: logger.info("Using General Knowledge collection, no context retrieval needed") @@ -89,8 +102,13 @@ def _process_query_with_cot(self, query: str) -> Dict[str, Any]: logger.warning("No planner agent available, using direct response") return self._generate_general_response(query) - plan = self.agents["planner"].plan(query, initial_context) - logger.info(f"Generated plan:\n{plan}") + try: + plan = self.agents["planner"].plan(query, initial_context) + logger.info(f"Generated plan:\n{plan}") + except Exception as e: + logger.error(f"Error in planning step: {str(e)}") + logger.info("Falling back to general response") + return self._generate_general_response(query) # Step 2: Research each step (if researcher is available) logger.info("Step 2: Research") @@ -99,11 +117,21 @@ def _process_query_with_cot(self, query: str) -> Dict[str, Any]: for step in plan.split("\n"): if not step.strip(): continue - step_research = self.agents["researcher"].research(query, step) - research_results.append({"step": step, "findings": step_research}) - # Log which sources were used for this step - source_indices = [initial_context.index(finding) + 1 for finding in step_research if finding in initial_context] - logger.info(f"Research for step: {step}\nUsing sources: {source_indices}") + try: + step_research = self.agents["researcher"].research(query, step) + # Extract findings from research result + findings = step_research.get("findings", []) if isinstance(step_research, dict) else [] + research_results.append({"step": step, "findings": findings}) + + # Log which sources were used for this step + try: + source_indices = [initial_context.index(finding) + 1 for finding in findings if finding in initial_context] + logger.info(f"Research for step: {step}\nUsing sources: {source_indices}") + except ValueError as ve: + 
logger.warning(f"Could not find some findings in initial context: {str(ve)}") + except Exception as e: + logger.error(f"Error during research for step '{step}': {str(e)}") + research_results.append({"step": step, "findings": []}) else: # If no researcher or no context, use the steps directly research_results = [{"step": step, "findings": []} for step in plan.split("\n") if step.strip()] @@ -117,13 +145,17 @@ def _process_query_with_cot(self, query: str) -> Dict[str, Any]: reasoning_steps = [] for result in research_results: - step_reasoning = self.agents["reasoner"].reason( - query, - result["step"], - result["findings"] if result["findings"] else [{"content": "Using general knowledge", "metadata": {"source": "General Knowledge"}}] - ) - reasoning_steps.append(step_reasoning) - logger.info(f"Reasoning for step: {result['step']}\n{step_reasoning}") + try: + step_reasoning = self.agents["reasoner"].reason( + query, + result["step"], + result["findings"] if result["findings"] else [{"content": "Using general knowledge", "metadata": {"source": "General Knowledge"}}] + ) + reasoning_steps.append(step_reasoning) + logger.info(f"Reasoning for step: {result['step']}\n{step_reasoning}") + except Exception as e: + logger.error(f"Error in reasoning for step '{result['step']}': {str(e)}") + reasoning_steps.append(f"Error in reasoning for this step: {str(e)}") # Step 4: Synthesize final answer logger.info("Step 4: Synthesis") @@ -131,8 +163,13 @@ def _process_query_with_cot(self, query: str) -> Dict[str, Any]: logger.warning("No synthesizer agent available, using direct response") return self._generate_general_response(query) - final_answer = self.agents["synthesizer"].synthesize(query, reasoning_steps) - logger.info(f"Final synthesized answer:\n{final_answer}") + try: + final_answer = self.agents["synthesizer"].synthesize(query, reasoning_steps) + logger.info(f"Final synthesized answer:\n{final_answer}") + except Exception as e: + logger.error(f"Error in synthesis step: 
{str(e)}") + logger.info("Falling back to general response") + return self._generate_general_response(query) return { "answer": final_answer, @@ -140,23 +177,22 @@ def _process_query_with_cot(self, query: str) -> Dict[str, Any]: "reasoning_steps": reasoning_steps } except Exception as e: - logger.error(f"Error in CoT processing: {str(e)}") + logger.error(f"Error in CoT processing: {str(e)}", exc_info=True) logger.info("Falling back to general response") return self._generate_general_response(query) def _process_query_standard(self, query: str) -> Dict[str, Any]: """Process query using standard approach without Chain of Thought""" # Initialize context variables - pdf_context = [] - repo_context = [] + context = [] # Get context based on selected collection if self.collection == "PDF Collection": logger.info(f"Retrieving context from PDF Collection for query: '{query}'") - pdf_context = self.vector_store.query_pdf_collection(query) - logger.info(f"Retrieved {len(pdf_context)} chunks from PDF Collection") + context = self.vector_store.query_pdf_collection(query) + logger.info(f"Retrieved {len(context)} chunks from PDF Collection") # Log each chunk with citation number but not full content - for i, chunk in enumerate(pdf_context): + for i, chunk in enumerate(context): source = chunk["metadata"].get("source", "Unknown") pages = chunk["metadata"].get("page_numbers", []) logger.info(f"Source [{i+1}]: {source} (pages: {pages})") @@ -165,24 +201,33 @@ def _process_query_standard(self, query: str) -> Dict[str, Any]: logger.debug(f"Content preview for source [{i+1}]: {content_preview}") elif self.collection == "Repository Collection": logger.info(f"Retrieving context from Repository Collection for query: '{query}'") - repo_context = self.vector_store.query_repo_collection(query) - logger.info(f"Retrieved {len(repo_context)} chunks from Repository Collection") + context = self.vector_store.query_repo_collection(query) + logger.info(f"Retrieved {len(context)} chunks from 
Repository Collection") # Log each chunk with citation number but not full content - for i, chunk in enumerate(repo_context): + for i, chunk in enumerate(context): source = chunk["metadata"].get("source", "Unknown") file_path = chunk["metadata"].get("file_path", "Unknown") logger.info(f"Source [{i+1}]: {source} (file: {file_path})") # Only log content preview at debug level content_preview = chunk["content"][:150] + "..." if len(chunk["content"]) > 150 else chunk["content"] logger.debug(f"Content preview for source [{i+1}]: {content_preview}") - - # Combine all context - all_context = pdf_context + repo_context + elif self.collection == "Web Knowledge Base": + logger.info(f"Retrieving context from Web Knowledge Base for query: '{query}'") + context = self.vector_store.query_web_collection(query) + logger.info(f"Retrieved {len(context)} chunks from Web Knowledge Base") + # Log each chunk with citation number but not full content + for i, chunk in enumerate(context): + source = chunk["metadata"].get("source", "Unknown") + title = chunk["metadata"].get("title", "Unknown") + logger.info(f"Source [{i+1}]: {source} (title: {title})") + # Only log content preview at debug level + content_preview = chunk["content"][:150] + "..." if len(chunk["content"]) > 150 else chunk["content"] + logger.debug(f"Content preview for source [{i+1}]: {content_preview}") # Generate response using context if available, otherwise use general knowledge - if all_context: - logger.info(f"Generating response using {len(all_context)} context chunks") - response = self._generate_response(query, all_context) + if context: + logger.info(f"Generating response using {len(context)} context chunks") + response = self._generate_response(query, context) else: logger.info("No context found, using general knowledge") response = self._generate_general_response(query)