7 changes: 4 additions & 3 deletions agentic_rag/README.md
@@ -10,7 +10,7 @@ The system has the following features:

- Intelligent query routing
- PDF processing using Docling for accurate text extraction and chunking
- Persistent vector storage with ChromaDB (PDF and Websites)
- Persistent vector storage with ChromaDB and Oracle Database 23ai (PDF and Websites)
- Smart context retrieval and response generation
- FastAPI-based REST API for document upload and querying
- Support for either OpenAI-based agents or local, transformer-based agents (`Mistral-7B` by default)
@@ -116,9 +116,10 @@ python main.py

The API will be available at `http://localhost:8000`. You can then use the API endpoints as described in the API Endpoints section below.
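For example, a quick smoke test with `curl` might look like the following. The endpoint paths and payload fields here are assumptions for illustration only; check the API Endpoints section for the actual routes:

```bash
# Hypothetical endpoints -- verify the real paths in the API Endpoints section.
# Upload a PDF for processing:
curl -X POST "http://localhost:8000/upload/pdf" -F "file=@mydocument.pdf"

# Ask a question against the indexed documents:
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"query": "What does the document say about vector storage?"}'
```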

### 2. Using the Gradio Interface
### 2. Using the Gradio Interface (Recommended)

The system provides a user-friendly web interface using Gradio, which allows you to:
- Select and pull `ollama` models directly from the interface
- Upload and process PDF documents
- Process web content from URLs
- Chat with your documents using either local or OpenAI models
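To start the interface, run the Gradio app directly. A minimal sketch: the entry point `gradio_app.py` matches the file changed in this PR, while the port is Gradio's default and may be configured differently in this app:

```bash
# Launch the Gradio web UI (serves on port 7860 by default).
python gradio_app.py
```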
@@ -357,7 +358,7 @@ The system consists of several key components:
1. **PDF Processor**: we use `docling` to extract and chunk text from PDF documents
2. **Web Processor**: we use `trafilatura` to extract and chunk text from websites
3. **GitHub Repository Processor**: we use `gitingest` to extract and chunk text from repositories
4. **Vector Store**: Manages document embeddings and similarity search using `ChromaDB`
4. **Vector Store**: Manages document embeddings and similarity search using `ChromaDB` and `Oracle Database 23ai`
5. **RAG Agent**: Makes intelligent decisions about query routing and response generation
- OpenAI Agent: Uses `gpt-4-turbo-preview` for high-quality responses, but requires an OpenAI API key
- Local Agent: Uses `Mistral-7B` as an open-source alternative
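As a rough illustration of the routing decision (a sketch, not the repository's actual implementation; the real logic lives in `local_rag_agent.py`), the agent dispatches each query to the retriever for the selected collection and falls back to the model's built-in knowledge when no collection applies:

```python
# Illustrative sketch only; the method names mirror those visible in this
# PR's diff (query_pdf_collection, query_repo_collection, query_web_collection).
def route_query(agent, query: str):
    if agent.collection == "PDF Collection":
        context = agent.vector_store.query_pdf_collection(query)
    elif agent.collection == "Repository Collection":
        context = agent.vector_store.query_repo_collection(query)
    elif agent.collection == "Web Knowledge Base":
        context = agent.vector_store.query_web_collection(query)
    else:
        context = []  # General Knowledge: answer without retrieval
    return context
```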
7 changes: 7 additions & 0 deletions agentic_rag/articles/kubernetes_rag.md
@@ -146,6 +146,13 @@ Then, we can start setting up the solution in our cluster by following these steps:
kubectl apply -n agentic-rag -f local-deployment/service.yaml
```

If, after applying these manifests, a `NoSchedule` taint prevents the pod from being scheduled, you can untaint the nodes and try again:

```bash
kubectl taint nodes -l node.kubernetes.io/instance-type=VM.GPU.A10.1 nvidia.com/gpu:NoSchedule-
# Make sure to select your own instance shape if you're using a GPU type other than the A10.
```
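To confirm the taint is gone before retrying the deployment, you can inspect the nodes:

```bash
# List any remaining taints on the GPU nodes; the output should no longer
# include nvidia.com/gpu:NoSchedule.
kubectl describe nodes -l node.kubernetes.io/instance-type=VM.GPU.A10.1 | grep -i taint
```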

5. Monitor the Deployment

With the following commands, we can check the status of our pod:
38 changes: 30 additions & 8 deletions agentic_rag/gradio_app.py
@@ -91,6 +91,17 @@ def chat(message: str, history: List[List[str]], agent_type: str, use_cot: bool,
# Skip analysis for General Knowledge or when using standard chat interface (not CoT)
skip_analysis = collection == "General Knowledge" or not use_cot

# Map collection names to actual collection names in vector store
collection_mapping = {
"PDF Collection": "pdf_documents",
"Repository Collection": "repository_documents",
"Web Knowledge Base": "web_documents",
"General Knowledge": "general_knowledge"
}

# Get the actual collection name
actual_collection = collection_mapping.get(collection, "pdf_documents")

# Parse agent type to determine model and quantization
quantization = None
model_name = None
@@ -354,26 +365,35 @@ def create_interface():
repo_button = gr.Button("Process Repository")
repo_output = gr.Textbox(label="Repository Processing Output")

# Define collection choices once to ensure consistency
collection_choices = [
"PDF Collection",
"Repository Collection",
"Web Knowledge Base",
"General Knowledge"
]

with gr.Tab("Standard Chat Interface"):
with gr.Row():
with gr.Column(scale=1):
# Create model choices with quantization options
standard_agent_dropdown = gr.Dropdown(
choices=model_choices,
value=model_choices[0] if model_choices else None,
label="Select Agent"
)
with gr.Column(scale=1):
standard_collection_dropdown = gr.Dropdown(
choices=["PDF Collection", "Repository Collection", "General Knowledge"],
value="PDF Collection",
label="Knowledge Collection"
choices=collection_choices,
value=collection_choices[0],
label="Select Knowledge Base",
info="Choose which knowledge base to use for answering questions"
)
gr.Markdown("""
> **Collection Selection**:
> - This interface ALWAYS uses the selected collection without performing query analysis.
> - "PDF Collection": Will ALWAYS search the PDF documents regardless of query type.
> - "Repository Collection": Will ALWAYS search the repository code regardless of query type.
> - "Web Knowledge Base": Will ALWAYS search web content regardless of query type.
> - "General Knowledge": Will ALWAYS use the model's built-in knowledge without searching collections.
""")
standard_chatbot = gr.Chatbot(height=400)
@@ -385,23 +405,24 @@ def create_interface():
with gr.Tab("Chain of Thought Chat Interface"):
with gr.Row():
with gr.Column(scale=1):
# Create model choices with quantization options
cot_agent_dropdown = gr.Dropdown(
choices=model_choices,
value=model_choices[0] if model_choices else None,
label="Select Agent"
)
with gr.Column(scale=1):
cot_collection_dropdown = gr.Dropdown(
choices=["PDF Collection", "Repository Collection", "General Knowledge"],
value="PDF Collection",
label="Knowledge Collection"
choices=collection_choices,
value=collection_choices[0],
label="Select Knowledge Base",
info="Choose which knowledge base to use for answering questions"
)
gr.Markdown("""
> **Collection Selection**:
> - When a specific collection is selected, the system will ALWAYS use that collection without analysis:
> - "PDF Collection": Will ALWAYS search the PDF documents.
> - "Repository Collection": Will ALWAYS search the repository code.
> - "Web Knowledge Base": Will ALWAYS search web content.
> - "General Knowledge": Will ALWAYS use the model's built-in knowledge.
> - This interface shows step-by-step reasoning and may perform query analysis when needed.
""")
@@ -485,6 +506,7 @@ def create_interface():
- Select which knowledge collection to query:
- **PDF Collection**: Always searches PDF documents
- **Repository Collection**: Always searches code repositories
- **Web Knowledge Base**: Always searches web content
- **General Knowledge**: Uses the model's built-in knowledge without searching collections

3. **Chain of Thought Chat Interface**:
Binary file modified agentic_rag/img/architecture.png
182 changes: 94 additions & 88 deletions agentic_rag/local_rag_agent.py
@@ -245,126 +245,132 @@ def process_query(self, query: str) -> Dict[str, Any]:
else:
return self._generate_general_response(query)
else:
# For PDF or Repository collections, use context-based processing
# For PDF, Repository, or Web collections, use context-based processing
if self.use_cot:
return self._process_query_with_cot(query)
else:
return self._process_query_standard(query)

def _process_query_with_cot(self, query: str) -> Dict[str, Any]:
"""Process query using Chain of Thought reasoning with multiple agents"""
logger.info("Processing query with Chain of Thought reasoning")

# Get initial context based on selected collection
initial_context = []
if self.collection == "PDF Collection":
logger.info(f"Retrieving context from PDF Collection for query: '{query}'")
pdf_context = self.vector_store.query_pdf_collection(query)
initial_context.extend(pdf_context)
logger.info(f"Retrieved {len(pdf_context)} chunks from PDF Collection")
# Don't log individual sources to keep console clean
elif self.collection == "Repository Collection":
logger.info(f"Retrieving context from Repository Collection for query: '{query}'")
repo_context = self.vector_store.query_repo_collection(query)
initial_context.extend(repo_context)
logger.info(f"Retrieved {len(repo_context)} chunks from Repository Collection")
# Don't log individual sources to keep console clean
# For General Knowledge, no context is needed
else:
logger.info("Using General Knowledge collection, no context retrieval needed")

"""Process query using Chain of Thought reasoning"""
try:
# Step 1: Planning
logger.info("Step 1: Planning")
if not self.agents or "planner" not in self.agents:
logger.warning("No planner agent available, using direct response")
return self._generate_general_response(query)
# Get context based on collection type
if self.collection == "PDF Collection":
context = self.vector_store.query_pdf_collection(query)
elif self.collection == "Repository Collection":
context = self.vector_store.query_repo_collection(query)
elif self.collection == "Web Knowledge Base":
context = self.vector_store.query_web_collection(query)
else:
context = []

# Log number of chunks retrieved
logger.info(f"Retrieved {len(context)} chunks from {self.collection}")

plan = self.agents["planner"].plan(query, initial_context)
logger.info(f"Generated plan:\n{plan}")
# Create agents if not already created
if not self.agents:
self.agents = create_agents(self.llm, self.vector_store)

# Step 2: Research each step (if researcher is available)
logger.info("Step 2: Research")
# Get planning step
try:
planning_result = self.agents["planner"].plan(query, context)
logger.info("Planning step completed")
except Exception as e:
logger.error(f"Error in planning step: {str(e)}")
logger.info("Falling back to general response")
return self._generate_general_response(query)

# Get research step
research_results = []
if self.agents.get("researcher") is not None and initial_context:
for step in plan.split("\n"):
if self.agents.get("researcher") is not None and context:
for step in planning_result.split("\n"):
if not step.strip():
continue
step_research = self.agents["researcher"].research(query, step)
research_results.append({"step": step, "findings": step_research})
# Don't log source indices to keep console clean
logger.info(f"Research for step: {step}")
try:
step_research = self.agents["researcher"].research(query, step)
# Extract findings from research result
findings = step_research.get("findings", []) if isinstance(step_research, dict) else []
research_results.append({"step": step, "findings": findings})

# Log which sources were used for this step
try:
source_indices = [context.index(finding) + 1 for finding in findings if finding in context]
logger.info(f"Research for step: {step}\nUsing sources: {source_indices}")
except ValueError as ve:
logger.warning(f"Could not find some findings in initial context: {str(ve)}")
except Exception as e:
logger.error(f"Error during research for step '{step}': {str(e)}")
research_results.append({"step": step, "findings": []})
else:
# If no researcher or no context, use the steps directly
research_results = [{"step": step, "findings": []} for step in plan.split("\n") if step.strip()]
research_results = [{"step": step, "findings": []} for step in planning_result.split("\n") if step.strip()]
logger.info("No research performed (no researcher agent or no context available)")

# Step 3: Reasoning about each step
logger.info("Step 3: Reasoning")
# Get reasoning step
reasoning_steps = []
if not self.agents.get("reasoner"):
logger.warning("No reasoner agent available, using direct response")
return self._generate_general_response(query)

reasoning_steps = []
for result in research_results:
step_reasoning = self.agents["reasoner"].reason(
query,
result["step"],
result["findings"] if result["findings"] else [{"content": "Using general knowledge", "metadata": {"source": "General Knowledge"}}]
)
reasoning_steps.append(step_reasoning)
# Log just the step, not the full reasoning
logger.info(f"Reasoning for step: {result['step']}")
try:
step_reasoning = self.agents["reasoner"].reason(
query,
result["step"],
result["findings"] if result["findings"] else [{"content": "Using general knowledge", "metadata": {"source": "General Knowledge"}}]
)
reasoning_steps.append(step_reasoning)
logger.info(f"Reasoning for step: {result['step']}\n{step_reasoning}")
except Exception as e:
logger.error(f"Error in reasoning for step '{result['step']}': {str(e)}")
reasoning_steps.append(f"Error in reasoning for this step: {str(e)}")

# Step 4: Synthesize final answer
logger.info("Step 4: Synthesis")
# Get synthesis step
if not self.agents.get("synthesizer"):
logger.warning("No synthesizer agent available, using direct response")
return self._generate_general_response(query)

final_answer = self.agents["synthesizer"].synthesize(query, reasoning_steps)
logger.info("Final answer synthesized successfully")
try:
synthesis_result = self.agents["synthesizer"].synthesize(query, reasoning_steps)
logger.info("Synthesis step completed")
except Exception as e:
logger.error(f"Error in synthesis step: {str(e)}")
logger.info("Falling back to general response")
return self._generate_general_response(query)

return {
"answer": final_answer,
"context": initial_context,
"reasoning_steps": reasoning_steps
"answer": synthesis_result["answer"],
"reasoning_steps": reasoning_steps,
"context": context
}

except Exception as e:
logger.error(f"Error in CoT processing: {str(e)}")
logger.info("Falling back to general response")
return self._generate_general_response(query)
raise

def _process_query_standard(self, query: str) -> Dict[str, Any]:
"""Process query using standard approach without Chain of Thought"""
# Initialize context variables
pdf_context = []
repo_context = []

# Get context based on selected collection
if self.collection == "PDF Collection":
logger.info(f"Retrieving context from PDF Collection for query: '{query}'")
pdf_context = self.vector_store.query_pdf_collection(query)
logger.info(f"Retrieved {len(pdf_context)} chunks from PDF Collection")
# Don't log individual sources to keep console clean
elif self.collection == "Repository Collection":
logger.info(f"Retrieving context from Repository Collection for query: '{query}'")
repo_context = self.vector_store.query_repo_collection(query)
logger.info(f"Retrieved {len(repo_context)} chunks from Repository Collection")
# Don't log individual sources to keep console clean

# Combine all context
all_context = pdf_context + repo_context

# Generate response using context if available, otherwise use general knowledge
if all_context:
logger.info(f"Generating response using {len(all_context)} context chunks")
response = self._generate_response(query, all_context)
else:
logger.info("No context found, using general knowledge")
response = self._generate_general_response(query)

return response
"""Process query using standard RAG approach"""
try:
# Get context based on collection type
if self.collection == "PDF Collection":
context = self.vector_store.query_pdf_collection(query)
elif self.collection == "Repository Collection":
context = self.vector_store.query_repo_collection(query)
elif self.collection == "Web Knowledge Base":
context = self.vector_store.query_web_collection(query)
else:
context = []

# Log number of chunks retrieved
logger.info(f"Retrieved {len(context)} chunks from {self.collection}")

# Generate response using context
response = self._generate_response(query, context)
return response

except Exception as e:
logger.error(f"Error in standard processing: {str(e)}")
raise

def _generate_text(self, prompt: str, max_length: int = 512) -> str:
"""Generate text using the local model"""
@@ -456,7 +462,7 @@ def main():
parser.add_argument("--model", default="mistralai/Mistral-7B-Instruct-v0.2", help="Model to use")
parser.add_argument("--quiet", action="store_true", help="Disable verbose logging")
parser.add_argument("--use-cot", action="store_true", help="Enable Chain of Thought reasoning")
parser.add_argument("--collection", choices=["PDF Collection", "Repository Collection", "General Knowledge"],
parser.add_argument("--collection", choices=["PDF Collection", "Repository Collection", "General Knowledge", "Web Knowledge Base"],
help="Specify which collection to query")
parser.add_argument("--skip-analysis", action="store_true", help="Skip query analysis step")
parser.add_argument("--verbose", action="store_true", help="Show full content of sources")