diff --git a/README.md b/README.md index eac04b6..f7bdeef 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,390 @@ -# AI powered Web crawler +# πŸ•·οΈ AI-Powered Web Crawler with Knowledge Graph Generation -### README Coming soon! +An advanced web crawling system that automatically extracts content from websites, generates knowledge graphs using AI (Google Gemini), and stores them in Neo4j for intelligent querying and decision-making. -![Hacker Cat](https://media.tenor.com/qMH5o_XizbcAAAAM/but-here%27s-the-coder.gif) +![Python](https://img.shields.io/badge/Python-3.13-blue) +![FastAPI](https://img.shields.io/badge/FastAPI-0.119-green) +![LangChain](https://img.shields.io/badge/LangChain-1.0-orange) +![Neo4j](https://img.shields.io/badge/Neo4j-5.22-blue) +![MongoDB](https://img.shields.io/badge/MongoDB-Atlas-green) + +## πŸ“‹ Table of Contents + +- [Features](#-features) +- [Architecture](#-architecture) +- [Tech Stack](#-tech-stack) +- [Installation](#-installation) +- [Configuration](#-configuration) +- [API Endpoints](#-api-endpoints) +- [How It Works](#-how-it-works) +- [Project Structure](#-project-structure) +- [Usage Examples](#-usage-examples) + +## ✨ Features + +- **🌐 Intelligent Web Crawling**: Scrapy-based crawler with smart link following and social media extraction +- **πŸ€– AI-Powered Knowledge Graph Generation**: Uses Google Gemini 2.5 Flash to extract entities and relationships +- **πŸ“Š Neo4j Graph Database**: Stores knowledge graphs for complex relationship queries +- **πŸ”„ Automatic Content Chunking**: Handles large content by splitting into manageable chunks +- **πŸ’Ύ MongoDB Storage**: Stores crawled content, keywords, and summaries +- **🧠 AI Reasoning Agent**: Query the knowledge graph using natural language +- **πŸ“ Auto-Summarization**: Generates AI summaries of crawled content +- **πŸ”— Incremental KG Updates**: MERGE mode adds to existing graphs without losing data + +## πŸ—οΈ Architecture + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ USER REQUEST β”‚ +β”‚ (keyword, optional URLs) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + ↓ +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ FASTAPI BACKEND β”‚ +β”‚ main.py β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + ↓ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + ↓ ↓ ↓ +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ MONGODB β”‚ β”‚ WEB CRAWLER β”‚ β”‚ NEO4J β”‚ +β”‚ β”‚ β”‚ (Scrapy) β”‚ β”‚ Knowledge β”‚ +β”‚ β€’ Keywords β”‚ β”‚ β”‚ β”‚ Graph β”‚ +β”‚ β€’ Site Data β”‚ β”‚ β€’ Extract text β”‚ β”‚ 
β”‚ +β”‚ β€’ Summaries β”‚ β”‚ β€’ Follow links β”‚ β”‚ β€’ Nodes β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β€’ Get images β”‚ β”‚ β€’ Relationships β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + ↓ ↑ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ + β”‚ LANGGRAPH β”‚ β”‚ + β”‚ AI AGENTS β”‚β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ + β”‚ β€’ getCrawlContent + β”‚ β€’ createKG β”‚ + β”‚ β€’ queryNeo4J β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + ↓ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ GOOGLE GEMINI β”‚ + β”‚ 2.5 Flash LLM β”‚ + β”‚ β”‚ + β”‚ β€’ Entity Extractβ”‚ + β”‚ β€’ KG Generation β”‚ + β”‚ β€’ Summarization β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +## πŸ› οΈ Tech Stack + +| Category | Technology | +|----------|------------| +| **Backend Framework** | FastAPI | +| **Web Crawling** | Scrapy | +| **AI/LLM** | Google Gemini 2.5 Flash | +| **Agent Framework** | LangChain + LangGraph | +| **Graph Database** | Neo4j | +| **Document Database** | MongoDB Atlas | +| **HTML Parsing** | BeautifulSoup4 | +| **Async Support** | Motor (MongoDB), asyncio | + +## πŸ“¦ Installation + +### Prerequisites + +- Python 3.13+ +- MongoDB Atlas account +- Neo4j Aura account (or local Neo4j) +- Google AI API key + +### Setup + +1. **Clone the repository** +```bash +git clone https://github.com/dinethjanitha/webcrawl.git +cd webcrawl +``` + +2. **Create virtual environment** +```bash +python -m venv . +# Windows +Scripts\activate +# Linux/Mac +source bin/activate +``` + +3. **Install dependencies** +```bash +pip install -r requirements.txt +``` + +4. **Configure environment variables** +Create a `.env` file in the root directory: +```env +CONNECTION_STRING=mongodb+srv://your-username:your-password@cluster.mongodb.net/ +GOOGLE_API_KEY=your-google-ai-api-key +NEO4J_URI=neo4j+s://your-instance.databases.neo4j.io +NEO4J_USERNAME=neo4j +NEO4J_PASSWORD=your-neo4j-password +``` + +5. **Run the application** +```bash +fastapi dev main.py +``` + +The API will be available at `http://localhost:8000` + +## βš™οΈ Configuration + +### Chunking Settings (crawlProcess.py) +```python +MAX_CHUNK_SIZE = 5000 # Characters per chunk +CHUNK_OVERLAP = 500 # Overlap between chunks +``` + +### Crawler Settings (web_spider_new.py) +```python +custom_settings = { + 'ROBOTSTXT_OBEY': False, + 'CONCURRENT_REQUESTS': 1, + 'DOWNLOAD_DELAY': 1, + 'DOWNLOAD_TIMEOUT': 30, + 'CLOSESPIDER_PAGECOUNT': 3, # Max pages per crawl +} +``` + +## πŸ”Œ API Endpoints + +### Crawl a Website +```http +POST /api/v1/crawl?keyword={url}&url_list={additional_urls} +``` +Crawls the specified URL(s), generates knowledge graph, and creates summary. + +**Parameters:** +- `keyword` (string): Main URL/domain to crawl +- `url_list` (array): Additional URLs to include + +**Response:** +```json +{ + "status": "success", + "keyword_id": "507f1f77bcf86cd799439011", + "urls_crawled": 5, + "urls": ["https://example.com", ...], + "summary": "## Summary\n..." +} +``` + +### Query Knowledge Graph +```http +GET /api/v1/dicission?keywordId={id}&user_prompt={question} +``` +Ask questions about the crawled data using natural language. 
+ +**Parameters:** +- `keywordId` (string): MongoDB ObjectId of the keyword +- `user_prompt` (string): Your question in natural language + +**Response:** +```json +{ + "status": "success", + "message": "Based on the knowledge graph analysis..." +} +``` + +### Get Full Details +```http +GET /api/v1/keyword/full?keyword={id} +``` +Returns all crawled data, content, and summary for a keyword. + +### Get All Keywords +```http +GET /api/v1/keyword/all +``` +Returns list of all previously crawled keywords. + +### Delete Crawl Data +```http +DELETE /api/v1/keyword/{id} +``` +Deletes all data associated with a keyword ID. + +### Health Check +```http +GET /api/v1/test +``` +Returns `{"status": 200}` if API is running. + +## πŸ”„ How It Works + +### Complete Workflow + +``` +1. User Request (keyword + optional URLs) + ↓ +2. Store/Check Keyword in MongoDB + ↓ +3. Web Crawl (Scrapy subprocess) + β€’ Extract text content + β€’ Collect images + β€’ Find social media links + β€’ Store in MongoDB (sitesData) + ↓ +4. AI Agent Processing (LangGraph) + β”œβ”€β†’ getCrawlContent(): Fetch from MongoDB + β”‚ └─→ If content > 5000 chars: CHUNK + β”‚ β”œβ”€β†’ Split into overlapping chunks + β”‚ β”œβ”€β†’ Process each chunk with LLM + β”‚ β”œβ”€β†’ Merge partial KGs + β”‚ └─→ Save to Neo4j + β”‚ + └─→ createKG(): Generate Knowledge Graph + └─→ If small content: Direct LLM processing + └─→ Save to Neo4j + ↓ +5. Generate AI Summary + ↓ +6. Return Results to User +``` + +### Knowledge Graph Structure + +**Nodes:** +```json +{ + "label": "Company", + "name": "SLT Mobitel", + "properties": { + "type": "Telecommunications", + "country": "Sri Lanka" + } +} +``` + +**Edges:** +```json +{ + "from": "SLT Mobitel", + "type": "PROVIDES", + "to": "Fiber Internet", + "properties": {} +} +``` + +## πŸ“ Project Structure + +``` +webcrawl/ +β”œβ”€β”€ main.py # FastAPI application & endpoints +β”œβ”€β”€ crawlProcess.py # Core processing logic & AI agents +β”œβ”€β”€ web_crawl_runner.py # Scrapy subprocess runner +β”œβ”€β”€ requirements.txt # Python dependencies +β”œβ”€β”€ .env # Environment variables +β”‚ +β”œβ”€β”€ connection/ +β”‚ β”œβ”€β”€ database.py # MongoDB connection +β”‚ └── mongocon.py # MongoDB utilities +β”‚ +β”œβ”€β”€ model/ +β”‚ β”œβ”€β”€ keyword.py # Keyword collection +β”‚ β”œβ”€β”€ siteData.py # Site data collection +β”‚ └── summary.py # Summary collection +β”‚ +β”œβ”€β”€ schema/ +β”‚ β”œβ”€β”€ keywordSchema.py # Pydantic models for keywords +β”‚ β”œβ”€β”€ fullDetailsSchema.py # Full response schema +β”‚ β”œβ”€β”€ sitesDataSchema.py # Site data schema +β”‚ └── summarySchema.py # Summary schema +β”‚ +β”œβ”€β”€ service/ +β”‚ └── privousChats.py # Data retrieval services +β”‚ +β”œβ”€β”€ config/ +β”‚ β”œβ”€β”€ objectIdConterver.py # ObjectId converter +β”‚ └── get_schema.py # Schema utilities +β”‚ +β”œβ”€β”€ webscrapy/ +β”‚ └── webscrapy/ +β”‚ └── spiders/ +β”‚ β”œβ”€β”€ web_spider.py # Basic spider +β”‚ └── web_spider_new.py # Advanced spider with link following +β”‚ +└── googlesearchmethod/ + └── googlesearch.py # Google search integration +``` + +## πŸ’‘ Usage Examples + +### 1. Crawl a Company Website +```bash +curl -X POST "http://localhost:8000/api/v1/crawl?keyword=https://www.slt.lk" \ + -H "Content-Type: application/json" \ + -d '[]' +``` + +### 2. Crawl with Additional URLs +```bash +curl -X POST "http://localhost:8000/api/v1/crawl?keyword=https://example.com" \ + -H "Content-Type: application/json" \ + -d '["https://example.com/about", "https://example.com/products"]' +``` + +### 3. 
Query the Knowledge Graph +```bash +curl "http://localhost:8000/api/v1/dicission?keywordId=507f1f77bcf86cd799439011&user_prompt=What%20services%20does%20this%20company%20offer?" +``` + +### 4. Get Crawl Summary +```bash +curl "http://localhost:8000/api/v1/keyword/full?keyword=507f1f77bcf86cd799439011" +``` + +## 🐳 Docker Support + +```dockerfile +FROM python:3.13-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install -r requirements.txt +COPY . . +EXPOSE 8000 +CMD ["fastapi", "run", "main.py", "--host", "0.0.0.0"] +``` + +Build and run: +```bash +docker build -t webcrawl . +docker run -p 8000:8000 --env-file .env webcrawl +``` + +## πŸ”’ Security Notes + +- Never commit `.env` file to version control +- Use environment variables for all sensitive data +- The crawler respects `ROBOTSTXT_OBEY` setting (currently disabled for flexibility) +- Rate limiting is built-in via `DOWNLOAD_DELAY` + +## 🀝 Contributing + +1. Fork the repository +2. Create a feature branch (`git checkout -b feature/amazing-feature`) +3. Commit changes (`git commit -m 'Add amazing feature'`) +4. Push to branch (`git push origin feature/amazing-feature`) +5. Open a Pull Request + +## πŸ“„ License + +This project is licensed under the MIT License. + +## πŸ‘¨β€πŸ’» Author + +**Dineth Janitha** +- GitHub: [@dinethjanitha](https://github.com/dinethjanitha) + +--- + +⭐ Star this repo if you find it useful! diff --git a/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/data_level0.bin b/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/data_level0.bin new file mode 100644 index 0000000..63480db Binary files /dev/null and b/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/data_level0.bin differ diff --git a/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/header.bin b/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/header.bin new file mode 100644 index 0000000..bb54792 Binary files /dev/null and b/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/header.bin differ diff --git a/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/length.bin b/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/length.bin new file mode 100644 index 0000000..cb3e162 Binary files /dev/null and b/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/length.bin differ diff --git a/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/link_lists.bin b/chroma_db/74d2496a-d8f8-4990-a426-bd5ae932021e/link_lists.bin new file mode 100644 index 0000000..e69de29 diff --git a/chroma_db/chroma.sqlite3 b/chroma_db/chroma.sqlite3 new file mode 100644 index 0000000..d1607af Binary files /dev/null and b/chroma_db/chroma.sqlite3 differ diff --git a/crawlProcess.py b/crawlProcess.py index d6bbebb..c3187ed 100644 --- a/crawlProcess.py +++ b/crawlProcess.py @@ -25,137 +25,538 @@ from model.siteData import siteDataCollection from model.summary import summaryCollection -from neo4j import GraphDatabase +# ChromaDB for vector storage (replaces Neo4j) +import chromadb +from chromadb.config import Settings +from service.vector_embedding import generate_embeddings import subprocess import sys import json import re +import asyncio load_dotenv("./env") -URI = os.getenv("NEO4J_URI") -AUTH = (os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD")) - -# print(URI) -# print(AUTH) +CHROMA_KEY = os.getenv("CHROMA_API_KEY") +# Initialize ChromaDB client (persistent storage) +# CHROMA_PERSIST_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db") +chroma_client = chromadb.CloudClient( + api_key=CHROMA_KEY, + tenant='43f271a6-f843-4f7c-99ff-8e69900c3341', + database='Development' +) + +# Get or create collection for 
knowledge graph data +kg_collection = chroma_client.get_or_create_collection( + name="web_chats", + metadata={"description": "Web crawl content storage for semantic search"} +) llm = init_chat_model("gemini-2.5-flash", model_provider="google_genai") -# Agent to access neo4j -@tool -def queryNeo4J(cypher_query:str) -> dict: +# Error tracking for model/agent errors +error_log = [] + +def trackError(component: str, error_type: str, error_message: str, keywordId: str = None, details: dict = None): + """ + Track errors that occur during model/agent execution + + Args: + component: Where the error occurred (e.g., 'createKG', 'FullAutoAgent', 'LLM') + error_type: Type of error (e.g., 'JSONParseError', 'ValidationError', 'TimeoutError') + error_message: The error message + keywordId: Associated keyword ID if applicable + details: Additional details about the error + """ + error_entry = { + "timestamp": datetime.utcnow().isoformat(), + "component": component, + "error_type": error_type, + "error_message": str(error_message), + "keywordId": keywordId, + "details": details or {} + } + + error_log.append(error_entry) + + # Print formatted error + print("\n" + "πŸ”΄" * 40) + print(f" ERROR TRACKED:") + print(f" Component: {component}") + print(f" Type: {error_type}") + print(f" Message: {error_message}") + if keywordId: + print(f" Keyword ID: {keywordId}") + if details: + print(f" Details: {json.dumps(details, indent=2)}") + print("πŸ”΄" * 40 + "\n") + + return error_entry + + +def getErrorLog(component: str = None, keywordId: str = None): + """ + Retrieve error logs with optional filtering + + Args: + component: Filter by component name + keywordId: Filter by keyword ID + + Returns: + List of error entries + """ + filtered_errors = error_log + + if component: + filtered_errors = [e for e in filtered_errors if e["component"] == component] - """Get KG from Neo4j""" + if keywordId: + filtered_errors = [e for e in filtered_errors if e["keywordId"] == keywordId] + + return filtered_errors + + +def getErrorSummary(): + """ + Get a summary of all tracked errors + + Returns: + Dictionary with error statistics and recent errors + """ + if not error_log: + return { + "total_errors": 0, + "message": "No errors tracked" + } + + # Count by component + component_counts = {} + error_type_counts = {} + + for error in error_log: + comp = error["component"] + err_type = error["error_type"] + + component_counts[comp] = component_counts.get(comp, 0) + 1 + error_type_counts[err_type] = error_type_counts.get(err_type, 0) + 1 + + return { + "total_errors": len(error_log), + "errors_by_component": component_counts, + "errors_by_type": error_type_counts, + "recent_errors": error_log[-5:], # Last 5 errors + "all_errors": error_log + } + +import hashlib + +def hash_text(text: str) -> str: + """Generate SHA256 hash for deduplication""" + return hashlib.sha256(text.encode()).hexdigest() + + +def semantic_chunk(text: str, max_size=1200, overlap=150): + """ + Recursive character-based semantic chunking. 
+ - max_size: ~1000-1500 chars best for RAG + - overlap: 100-200 chars for context retention + """ + + # Normalize whitespace + text = re.sub(r'\s+', ' ', text).strip() + + if len(text) <= max_size: + return [text] + + chunks = [] + start = 0 + while start < len(text): + end = start + max_size + + # Try to split at nearest sentence boundary + if end < len(text): + period_pos = text.rfind('.', start, end) + if period_pos != -1: + end = period_pos + 1 + + chunk = text[start:end].strip() + chunks.append(chunk) + + start = end - overlap # Overlap window + + return chunks + + +def saveContentToChromaDB(keywordId: str, content: str, source_url: str = None): + """ + Production RAG ingestion: + - Semantic chunking + - Overlap + - Hash deduplication + - Manual vector embeddings + """ + + print("\n" + "=" * 80) + print("STEP: Saving content to ChromaDB (Production Mode)") + print("=" * 80) + + if not content or len(content.strip()) < 10: + print("Content too short, skipping") + return + + try: + # ----------------------------- + # 1. Semantic Chunking + # ----------------------------- + chunks = semantic_chunk(content, max_size=1200, overlap=150) + print(f"πŸ“¦ Total semantic chunks generated: {len(chunks)}") + + ids = [] + documents = [] + metadatas = [] + vectors = [] # embeddings will go here + + for i, chunk in enumerate(chunks): + chunk_hash = hash_text(keywordId + "_" + chunk) + + # Skip if this hash already exists (dedup) + existing = kg_collection.get(ids=[chunk_hash]) + if existing and existing.get("documents"): + print(f"⏭️ Skipping duplicate chunk {i}") + continue + + # --------------------------------- + # 2. Generate embedding manually + # --------------------------------- + # πŸ‘‡πŸ‘‡πŸ‘‡ Replace this with YOUR embedding model + vector = generate_embeddings(chunk) # <--- YOU MUST IMPLEMENT THIS + # Example: vector = embedding_client.embed_text(chunk) + + # --------------------------------- + # 3. Prepare for insertion + # --------------------------------- + ids.append(chunk_hash) + documents.append(chunk) + vectors.append(vector) + + + metadatas.append({ + "keywordId": keywordId, + "chunk_index": i, + "source_urls": source_url, + "hash": chunk_hash, + "embedding_model": "voyage-3.5" + }) + + # --------------------------------- + # 4. Insert into Chroma + # --------------------------------- + if documents: + print(f"πŸš€ Saving {len(documents)} chunks into ChromaDB...") + kg_collection.add( + ids=ids, + documents=documents, + metadatas=metadatas, + embeddings=vectors + ) + print("βœ… Completed ChromaDB save!") + else: + print("πŸ” No new chunks to save (all duplicates).") + + return {"chunks_saved": len(documents)} + + except Exception as e: + print(f"Error saving to ChromaDB: {e}") + import traceback + traceback.print_exc() + raise + + +# ChromaDB Query Tool - Semantic search on stored content +@tool +def queryKnowledgeGraph(query: str, keywordId: str = None, n_results: int = 10) -> dict: + """ + Search the stored content using semantic similarity. 
+ + Args: + query: Search query (natural language) + keywordId: Optional - filter by specific keyword ID + n_results: Number of results to return (default 10) + + Returns: + Relevant content chunks matching the query + """ print("\n" + "=" * 80) - print("STEP 1.*: Getting details from Neo4j...") + print("STEP: Querying ChromaDB for relevant content...") print("=" * 80) + print(f" Query: {query}") + print(f" KeywordId filter: {keywordId or 'None'}") - # print("NeoStart") - with GraphDatabase.driver(URI, auth=AUTH) as driver: - with driver.session() as session: - result = session.run(cypher_query) - records = [record.data() for record in result] - return records + try: + # Build where filter if keywordId provided + where_filter = None + if keywordId: + where_filter = {"keywordId": keywordId} + + # Generate query embedding using the same model as storage + query_vector = generate_embeddings(query) + + # Query ChromaDB with semantic search using embedding + results = kg_collection.query( + query_embeddings=[query_vector], # Use embedding instead of text + n_results=n_results, + where=where_filter, + include=["documents", "metadatas", "distances"] + ) + + if not results or not results.get("documents") or not results["documents"][0]: + print(" ⚠️ No results found") + return {"results": [], "message": "No matching content found"} + + # Process results + processed_results = [] + documents = results["documents"][0] + metadatas = results["metadatas"][0] + distances = results.get("distances", [[]])[0] + + for i, (doc, meta) in enumerate(zip(documents, metadatas)): + # Calculate relevance score (1 = most relevant, 0 = least) + relevance = 1 - distances[i] if i < len(distances) else 0 + + result = { + "content": doc, + "relevance_score": round(relevance, 3), + "keywordId": meta.get("keywordId", ""), + "chunk_index": meta.get("chunk_index", 0), + "total_chunks": meta.get("total_chunks", 1) + } + + # Add source URLs if available + source_urls = meta.get("source_urls", "[]") + try: + result["source_urls"] = json.loads(source_urls) + except: + result["source_urls"] = [] + + processed_results.append(result) + + print(f" βœ… Found {len(processed_results)} relevant content chunks") + return {"results": processed_results} + + except Exception as e: + error_msg = str(e) + print(f" ❌ ChromaDB error: {error_msg}") + return {"error": f"Query error: {error_msg}"} + + +@tool +def getFullKnowledgeGraph(keywordId: str) -> dict: + """ + Get all stored content for a specific keyword ID. 
+ + Args: + keywordId: The keyword ID to retrieve content for + + Returns: + All content chunks stored for this keyword + """ + print("\n" + "=" * 80) + print(f"STEP: Getting all content for keywordId: {keywordId}") + print("=" * 80) + + try: + # Query all documents for this keywordId + all_results = kg_collection.get( + where={"keywordId": keywordId}, + include=["documents", "metadatas"] + ) + + if not all_results or not all_results.get("documents"): + print(" ⚠️ No content found for this keywordId") + return {"content": "", "chunks": [], "message": "No content found"} + + # Sort chunks by index and combine + chunks_with_meta = [] + for doc, meta in zip(all_results["documents"], all_results["metadatas"]): + chunks_with_meta.append({ + "content": doc, + "chunk_index": meta.get("chunk_index", 0), + "source_urls": json.loads(meta.get("source_urls", "[]")) + }) + + # Sort by chunk index + chunks_with_meta.sort(key=lambda x: x["chunk_index"]) + + # Combine all content + full_content = "\n\n".join([c["content"] for c in chunks_with_meta]) + + # Get unique source URLs + all_urls = [] + for chunk in chunks_with_meta: + all_urls.extend(chunk.get("source_urls", [])) + unique_urls = list(set(all_urls)) + + print(f" βœ… Found {len(chunks_with_meta)} chunks, {len(full_content)} total characters") + return { + "content": full_content, + "chunks": chunks_with_meta, + "total_chunks": len(chunks_with_meta), + "source_urls": unique_urls + } + + except Exception as e: + error_msg = str(e) + print(f" ❌ Error: {error_msg}") + return {"error": f"Failed to get content: {error_msg}"} # Agent for make decision @tool -def makeDecisionFromKG(query: str) -> str: +def makeDecisionFromKG(query: str, context: str = "") -> str: """ - Ask the LLM to make a decision based on knowledge graph data. + Ask the LLM to analyze knowledge graph data and make a decision. + + Args: + query: The question to answer + context: Optional context/data from knowledge graph """ reasoning_prompt = f""" - You are an intelligent analyst with Neo4j. + You are an intelligent analyst helping answer questions based on knowledge graph data. Question: {query} + + Context/Data: + {context if context else "No additional context provided."} - Analyze the relationships, infer insights, and give a concise, logical answer. + Analyze the information, infer insights, and give a clear, helpful answer. + If you don't have enough information, say so clearly. """ print("\n" + "=" * 80) - print("STEP 1.*: Making Decision From Neo4j KG...") + print("STEP 1.*: Making Decision from KG data...") print("=" * 80) - # print("In here nowwww") - response = llm.invoke([HumanMessage(content=reasoning_prompt)]) return response.content +# ⚑ FAST Direct Query - No Agent overhead (single LLM call) +async def fast_query(keywordId: str, user_prompt: str) -> str: + """ + Fast decision making - bypasses agent for speed. + 1. Query ChromaDB directly + 2. 
Single LLM call to answer + + ~2-5 seconds instead of 15-30 seconds with agent + """ + print("\n" + "=" * 80) + print("⚑ FAST QUERY MODE") + print("=" * 80) + + import time + start_time = time.time() + + # Step 1: Query ChromaDB directly (fast - no LLM) + print("πŸ“Š Step 1: Querying ChromaDB...") + try: + # Generate query embedding using the same model as storage + query_vector = generate_embeddings(user_prompt) + + results = kg_collection.query( + query_embeddings=[query_vector], # Use embedding instead of text + n_results=5, + where={"keywordId": keywordId}, + include=["documents", "metadatas", "distances"] + ) + + if not results or not results.get("documents") or not results["documents"][0]: + return "No relevant content found for this query." + + # Combine relevant content + documents = results["documents"][0] + context = "\n\n---\n\n".join(documents[:5]) # Top 5 results + + print(f" βœ… Found {len(documents)} relevant chunks") + + except Exception as e: + print(f" ❌ ChromaDB error: {e}") + return f"Error querying content: {str(e)}" + + query_time = time.time() - start_time + print(f" ⏱️ Query time: {query_time:.2f}s") + + # Step 2: Single LLM call to answer (fast model) + print("πŸ€– Step 2: Generating answer...") + llm_start = time.time() + + prompt = f"""Based on the following crawled web content, answer the user's question. + +USER QUESTION: {user_prompt} + +RELEVANT CONTENT: +{context} + +INSTRUCTIONS: +- Answer based ONLY on the provided content +- Be concise and helpful +- Use markdown formatting for readability +- If the content doesn't contain the answer, say so clearly + +ANSWER:""" + + try: + response = llm.invoke([HumanMessage(content=prompt)]) + answer = response.content + except Exception as e: + print(f" ❌ LLM error: {e}") + return f"Error generating answer: {str(e)}" + + llm_time = time.time() - llm_start + total_time = time.time() - start_time + + print(f" ⏱️ LLM time: {llm_time:.2f}s") + print(f" ⏱️ Total time: {total_time:.2f}s") + print("=" * 80) + + print("answer") + print("answer") + print("answer") + print("answer") + print("answer") + print(answer) + + return answer + + async def ReasoningAgent(): SYSTEM_PROMPT = """ - You are an intelligent AI reasoning agent connected to a Neo4j Knowledge Graph. - Your capabilities: - - Discover schema elements (labels, relationship types, property keys) when the user doesn't know exact KG keywords. - - Generate Cypher queries that use fuzzy/partial matching to find relevant nodes and relationships. - - Analyze query results and make decisions or summaries using the tool `makeDecisionFromKG`. - - Link node by similarity and check again with that and find how relation in it neo4j if you look it as another type make query with it - + You are an intelligent AI reasoning agent that answers questions using crawled web content stored in ChromaDB. + Tools available: - 1. queryNeo4J(query: str) β€” Execute Cypher queries on Neo4j and return results. - 2. makeDecisionFromKG(data: dict) β€” Analyze Neo4j query results and make a decision or summary. - - High-level rules: - - Always start by discovering schema candidates relevant to the user's query (labels, relationship types, property keys) before issuing content queries. - - NEVER hallucinate labels, relationship types, or properties that are not discoverable in the graph. Use actual results from Neo4j to decide. - - Prefer safe, read-only Cypher (MATCH, RETURN, CALL db.*) unless explicitly asked to write. 
- - Use fuzzy matching (`CONTAINS`, `toLower()`, or case-insensitive regex) when matching user terms to schema elements or data values. - - If no matches are found, report that clearly and provide suggested alternative search terms, synonyms, or explain how the user could rephrase. - - Schema-discovery queries (Neo4j-native): - - List all relationship types: - CALL db.relationshipTypes() YIELD relationshipType RETURN relationshipType; - - List all labels: - CALL db.labels() YIELD label RETURN label; - - List all property keys: - CALL db.propertyKeys() YIELD propertyKey RETURN propertyKey; - - Fuzzy-search templates (replace ): - - Find relationship types matching a user term: - MATCH ()-[r]-() - WHERE toLower(type(r)) CONTAINS toLower('') - RETURN DISTINCT type(r) AS relType, count(r) AS occurrences - ORDER BY occurrences DESC; - - Find labels that match a user term: - CALL db.labels() YIELD label - WHERE toLower(label) CONTAINS toLower('') - RETURN label; - - Find nodes whose properties match a user term: - MATCH (n) - WHERE any(k IN keys(n) WHERE toString(n[k]) =~ '(?i).*.*') - RETURN labels(n) AS labels, n AS node, size(keys(n)) AS propertyCount; - - Once a candidate relationship or label is found, fetch content nodes: - MATCH (a)-[r:``]->(b) - RETURN labels(a) AS fromLabels, a.name AS fromName, - type(r) AS rel, - labels(b) AS toLabels, b.name AS toName; - - When you find candidate relationship types or labels: - - Return a short ranked list of best matches (relType or label, count of occurrences). - - Automatically run a follow-up content query on the top candidates and summarize results using `makeDecisionFromKG`. - - Fallback behavior: - - If no schema or data matches are found for the user term: - - Return: "No matching labels or relationship types found for '' in the knowledge graph." - - Provide 2–4 suggested synonyms or alternate search terms the user might try. - - Suggest an explicit schema-discovery run (CALL db.* queries) if permitted by the user. - - Safety and precision: - - Always put the user term into safe, parameterized Cypher or escape user input properly to avoid syntax issues. - - Prefer `toLower(... ) CONTAINS toLower(...)` for robust partial matching. Use regex `=~ '(?i).*term.*'` only when needed. - - Response style: - - Be clear, structured, and logical. - - For schema discovery steps, show the query used and the succinct ranked results (up to 5 candidates). - - For content queries, summarize findings and pass the raw results to `makeDecisionFromKG` for final interpretation. - """ - - tools = [queryNeo4J, makeDecisionFromKG] + 1. queryKnowledgeGraph(query, keywordId, n_results) β€” Semantic search for relevant content + 2. getFullKnowledgeGraph(keywordId) β€” Get all content for a specific crawl session + 3. makeDecisionFromKG(query, context) β€” Analyze data and provide insights + + Workflow: + 1. Use queryKnowledgeGraph to search for relevant information based on the user's question + 2. If you need more complete context, use getFullKnowledgeGraph with the keywordId + 3. Use makeDecisionFromKG to analyze the retrieved content and formulate your answer + + Tips: + - Use natural language queries - ChromaDB performs semantic similarity search + - Filter by keywordId when you know the specific crawl session to query + - The content returned is the actual crawled web text, use it to answer questions + - Combine multiple search results for comprehensive answers + + Never mention internal keywordId in your responses to users. 
+ Be helpful, accurate, and provide clear, well-structured answers based on the crawled content. + """ + + tools = [queryKnowledgeGraph, getFullKnowledgeGraph, makeDecisionFromKG] agent = create_agent( model=llm, @@ -300,141 +701,172 @@ async def test_decision(keywordId: str , user_prompt:str): @tool -async def getCrawlContent(keywordId:str) -> str: - - """Fetch crawl text data by keyword ID (string). Returns all combined text content for that keyword.""" +async def getCrawlContent(keywordId: str): + """ + Fetch crawl text data by keyword ID. + Each crawled document (URL) is saved separately to ChromaDB. + """ print("\n" + "=" * 80) - print("STEP 5.*: Getting crawling content from database...") + print("STEP 5: Loading crawled pages and saving individually to ChromaDB...") print("=" * 80) now = datetime.utcnow() - ten_minutes_ago = now - timedelta(minutes=10) - - siteDataResults = await siteDataCollection.find({'keywordId' : ObjectId(keywordId) , 'createdAt': {'$gte': ten_minutes_ago} }).to_list(length=None) - - content = [] - for document in siteDataResults: - content.append(document['content']) - print("content") - print(len(content)) - if len(content) > 0 : - joinAllContent = "".join(content) - print(f"Total content length: {len(joinAllContent)} characters") - return joinAllContent - else : - return "" - + ten_minutes_ago = now - timedelta(minutes=6) -@tool -def createKG(content:str , keywordId:str) -> object: - """After get crawl content create Knowledge Graph and return Knowledge Graph JSON format """ + # Fetch all relevant site data + siteDataResults = await siteDataCollection.find({ + 'keywordId': ObjectId(keywordId), + 'createdAt': {'$gte': ten_minutes_ago, '$lte': now} + }).to_list(None) - print("\n" + "=" * 80) - print("STEP 5.*: Creating Knowledge Graph...") - print("=" * 80) + print(f"πŸ“Š Found {len(siteDataResults)} documents in DB") - prompt_template = """ - You are an expert in extracting structured knowledge from text. - - Input: {crawl_text} - - Task: - - Identify all nodes (entities) and relationships (edges) mentioned in the text. - - Output ONLY valid JSON in this format: - - All letters should be simple letters - - {{ - "nodes": [ - {{ - "label": "", - "name": "", - "properties": {{"key": "value"}} - }} - ], - "edges": [ - {{ - "from": "", - "type": "", - "to": "", - "properties": {{"key": "value"}} - }} - ] - }} - """ - - - prompt = PromptTemplate( - input_variables=["crawl_text"], - template=prompt_template, - ) + if not siteDataResults: + print("⚠️ No crawl documents found.") + return [] - full_prompt = prompt.format_prompt( - crawl_text=content - ) + all_contents_preview = [] - try: - print("Generating JSON schema for create knowledge graph... 
") - llm_response = llm.invoke(full_prompt) - - clean_text = re.sub(r"^```json\s*|\s*```$", "", llm_response.content.strip()) + for doc_index, document in enumerate(siteDataResults): - json_out = json.loads(clean_text) - except Exception as e: - print(e) - raise HTTPException(status_code=500, detail="Internal server error!") - - print(llm_response.content) + content = document.get("content") + url = document.get("siteUrl") + + if not content: + continue + + print(f"\n--- Document {doc_index+1}/{len(siteDataResults)} ---") + print(f"🌐 URL: {url}") + print(f"πŸ“ Raw content length: {len(content)} chars") - saveKGToNeo4j(keywordId , json_out) - return json_out + all_contents_preview.append(content[:200]) + try: + saveContentToChromaDB( + keywordId=keywordId, + content=content, + source_url=url # per-document metadata + ) + print("Saved to ChromaDB") + except Exception as e: + print(f"Error saving to ChromaDB: {e}") + trackError( + component="getCrawlContent->saveContentToChromaDB", + error_type=type(e).__name__, + error_message=str(e), + keywordId=keywordId, + details={"url": url, "content_length": len(content)} + ) + + return { + "documents_processed": len(siteDataResults), + "previews": all_contents_preview + } + + + +@tool +def createKG(content:str , keywordId:str) -> object: + """Process content and confirm it's stored in ChromaDB. Content is automatically saved by getCrawlContent.""" -def saveKGToNeo4j(keywordId: str, kg_json: dict): print("\n" + "=" * 80) - print("STEP 5.*: Saving KG in Neo4j...") + print("STEP 6: Content Processing Confirmation") print("=" * 80) + print(f"πŸ€– Agent called createKG tool for keywordId: {keywordId}") + + # Content is already saved to ChromaDB by getCrawlContent + # This tool now just confirms the storage and returns status + + if not content or len(content.strip()) < 10: + print("⚠️ Content is empty or too short") + return { + "status": "no_content", + "message": "No content available to process", + "keywordId": keywordId + } + + content_length = len(content) + print(f"βœ… Content processed: {content_length} characters") + print(f" Preview: {content[:200]}...") + + # Verify content is in ChromaDB + try: + results = kg_collection.get( + where={"keywordId": keywordId}, + include=["metadatas"] + ) + doc_count = len(results.get("ids", [])) + print(f"βœ… Verified {doc_count} documents in ChromaDB for keywordId: {keywordId}") + + return { + "status": "success", + "message": f"Content stored and indexed in ChromaDB", + "keywordId": keywordId, + "content_length": content_length, + "documents_stored": doc_count + } + except Exception as e: + print(f"⚠️ Could not verify ChromaDB storage: {str(e)}") + return { + "status": "stored", + "message": "Content was saved (verification failed)", + "keywordId": keywordId, + "content_length": content_length + } - with GraphDatabase.driver(URI, auth=AUTH) as driver: - with driver.session() as session: - try: - # Delete old graph for this keyword - session.run("MATCH (n {keywordId: $id}) DETACH DELETE n", {"id": keywordId}) - - # Create all nodes - for node in kg_json["nodes"]: - label = node["label"] - name = node["name"] - properties = node.get("properties", {}) - properties.update({"name": name, "keywordId": keywordId}) - prop_str = ", ".join([f"{k}: ${k}" for k in properties.keys()]) - session.run(f"CREATE (n:{label} {{ {prop_str} }})", properties) - - # Create relationships - for edge in kg_json["edges"]: - rel_type = re.sub(r"[^A-Za-z0-9_]", "_", edge["type"]).upper() - props = edge.get("properties", {}) - 
props["keywordId"] = keywordId - props["from"] = edge["from"] - props["to"] = edge["to"] - - session.run(f""" - MATCH (a {{name: $from, keywordId: $keywordId}}), - (b {{name: $to, keywordId: $keywordId}}) - CREATE (a)-[r:{rel_type} {{keywordId: $keywordId}}]->(b) - """, props) - - except Exception as e: - print(" Neo4j error:", e) - raise HTTPException(status_code=500, detail=f"Neo4j error: {e}") + +def deleteKGFromChromaDB(keywordId: str): + """ + Delete all KG data for a specific keywordId from ChromaDB. + """ + print(f"\nπŸ—‘οΈ Deleting KG data for keywordId: {keywordId}") + + try: + # Get all documents with this keywordId + results = kg_collection.get( + where={"keywordId": keywordId}, + include=["metadatas"] + ) + + if results and results.get("ids"): + ids_to_delete = results["ids"] + kg_collection.delete(ids=ids_to_delete) + print(f" βœ… Deleted {len(ids_to_delete)} documents") + return {"deleted": len(ids_to_delete)} + else: + print(f" ⚠️ No documents found for this keywordId") + return {"deleted": 0} + + except Exception as e: + print(f" ❌ Delete error: {str(e)}") + return {"error": str(e)} async def MyAgent(): SYSTEM_PROMPT = """ - You are an intelligent agent that can gather crawl data by keyword and create knowledge graphs automatically. - You have access to two tools: - - getCrawlContent: fetches all crawl text for a given keyword ID. - - createKG: converts raw text into a structured knowledge graph. + You are an intelligent agent that processes and stores crawled web content. + + YOUR WORKFLOW (SIMPLE 2-STEP PROCESS): + 1. First, call getCrawlContent(keywordId) to fetch and store the crawled text data + - This automatically saves content to ChromaDB for semantic search + 2. Then, call createKG(content, keywordId) to confirm storage completion + + IMPORTANT RULES: + - Always use BOTH tools in sequence + - Pass the keywordId as a STRING (not ObjectId) + - Pass the full content text to createKG + - Report when each step is completed + + AVAILABLE TOOLS: + - getCrawlContent(keywordId: str) -> Fetches crawled text AND saves to ChromaDB + - createKG(content: str, keywordId: str) -> Confirms storage and returns status + + Example flow for keywordId "507f1f77bcf86cd799439011": + 1. Call: getCrawlContent("507f1f77bcf86cd799439011") + 2. Receive: "SLT Mobitel offers fiber internet..." + 3. Call: createKG("SLT Mobitel offers fiber internet...", "507f1f77bcf86cd799439011") + 4. 
Report: "Content stored in ChromaDB successfully" """ checkpointer = InMemorySaver() @@ -444,38 +876,98 @@ async def MyAgent(): model=llm, system_prompt=SYSTEM_PROMPT, tools=tools, - # checkpointer=checkpointer + checkpointer=checkpointer ) - return agent # Run Agent -async def FullAutoAgent(keywordId: str): - +async def FullAutoAgent(keywordId): + """ + Run agent to create Knowledge Graph with error tracking + """ + keywordId_str = str(keywordId) + print("\n" + "=" * 80) - print("STEP 5.1: Calling Agents") + print(f"STEP 5.1: Calling Agents for keywordId: {keywordId_str}") print("=" * 80) - agent_executor = await MyAgent() - - print("keywordId") - print(keywordId) - - # Step 1 + 2 + 3: Crawl content β†’ Create KG - response = await agent_executor.ainvoke( - { - "messages": [ - {"role": "user", "content": f"Generate a knowledge graph for keyword ID {keywordId}"} - ] - }, - config={"configurable": {"thread_id": "kg_1"}} - ) - - - # Step 4: Save to Neo4j - print("Knowledge Graph saved to Neo4j successfully.") + + try: + agent_executor = await MyAgent() + + print(f"πŸ€– Invoking agent with keywordId: {keywordId_str}") + print(f" Agent will: 1) Get crawl content, 2) Create KG (with auto-chunking if needed)") + + # Step 1 + 2 + 3: Crawl content β†’ Create KG + response = await agent_executor.ainvoke( + { + "messages": [ + {"role": "user", "content": f"Generate a knowledge graph for keyword ID {keywordId_str}"} + ] + }, + config={"configurable": {"thread_id": f"kg_{keywordId_str}"}} + ) - return response + print(response) + # Check if response is valid + if not response or "messages" not in response: + error_msg = "Agent returned invalid response structure" + trackError( + component="FullAutoAgent", + error_type="InvalidAgentResponse", + error_message=error_msg, + keywordId=keywordId_str, + details={ + "response_type": type(response).__name__, + "response_keys": list(response.keys()) if isinstance(response, dict) else "Not a dict" + } + ) + return { + "status": "failed", + "reason": error_msg, + "keywordId": keywordId_str + } + + # Log successful execution + messages = response.get("messages", []) + print(f"\n Agent completed successfully with {len(messages)} messages") + + return response + + except TimeoutError as e: + trackError( + component="FullAutoAgent", + error_type="TimeoutError", + error_message=f"Agent execution timed out: {str(e)}", + keywordId=keywordId_str, + details={"timeout_duration": "unknown"} + ) + print(f" Agent timeout for keywordId: {keywordId_str}") + return { + "status": "failed", + "reason": "Agent execution timed out", + "keywordId": keywordId_str + } + + except Exception as e: + trackError( + component="FullAutoAgent", + error_type=type(e).__name__, + error_message=str(e), + keywordId=keywordId_str, + details={ + "exception_type": type(e).__name__, + "traceback": __import__('traceback').format_exc() + } + ) + print(f" Error in FullAutoAgent: {e}") + import traceback + traceback.print_exc() + return { + "status": "failed", + "reason": str(e), + "keywordId": keywordId_str + } # Stored Keyword in mongoDB @@ -536,40 +1028,39 @@ async def getKeywordByDomain(url): # Add urls to keyword document -async def storeRelevantUrls(keywordId , urls_list): +# async def storeRelevantUrls(keywordId): - try: - keywordDetails = await getKeywordById(keywordId) +# try: +# keywordDetails = await getKeywordById(keywordId) - keyword = keywordDetails["keyword"] - # siteDomain = keywordDetails["siteDomain"] +# keyword = keywordDetails["keyword"] - # results = googlesearch(keyword , siteDomain) 
+# results = googlesearch(keyword) - # urlList = [] +# urlList = [] - # for item in results.get("items", []): - # print(f"Title: {item['title']}") - # urlList.append(item['link']) - # print(f"Link: {item['link']}\n") +# for item in results.get("items", []): +# print(f"Title: {item['title']}") +# urlList.append(item['link']) +# print(f"Link: {item['link']}\n") - print(urls_list) +# # print(urls_list) - updatedValues = await keyword_collection.update_one( - {"_id": ObjectId(keywordId)}, - {"$push": {"urls": {"$each": urls_list}}} - ) - print("Updated Values") - print(updatedValues) +# updatedValues = await keyword_collection.update_one( +# {"_id": ObjectId(keywordId)}, +# {"$push": {"urls": {"$each": urlList}}} +# ) +# print("Updated Values") +# print(updatedValues) - if updatedValues.acknowledged: - print("Update successful!") - result = keywordId - return result - return None - except Exception as e: - print(e) - return None +# if updatedValues.acknowledged: +# print("Update successful!") +# result = keywordId +# return result +# return None +# except Exception as e: +# print(e) +# return None # Crawl web data using subprocess @@ -716,11 +1207,16 @@ async def exec(keyword , url_list): resultMongo = await getKeywordById(storedKeywordId) keywordId = resultMongo["_id"] # Step 3: Fetch Google URLs - # print("\n" + "=" * 80) - # print("STEP 3: Fetching Google search URLs") - # print("=" * 80) - # updatedKey = await storeRelevantUrls(storedKeyword.inserted_id) + + + # if not url_list or len(url_list) == 0: + # print("\n" + "=" * 80) + # print("STEP 3: Fetching Social media data from google search URLs") + # print("=" * 80) + # print("Finding in here!") + # await storeRelevantUrls(keywordId) + # if not keywordId: # print("ERROR: Failed to store URLs") # return {"error": "Failed to fetch URLs from Google"} @@ -736,8 +1232,14 @@ async def exec(keyword , url_list): # return {"error": "No URLs found in Google search results"} url = updatedDetails["keyword"] - # url_list = updatedDetails["urls"] + # url_list_search = updatedDetails["urls"] + + urls = [url] + + # if url_list_search and len(url_list_search) > 0: + # urls += url_list_search + if url_list and len(url_list) > 0: print("\n" + "=" * 80) print("STEP 3.1: Manual added url fined crawling started with it!") @@ -771,7 +1273,7 @@ async def exec(keyword , url_list): resultAgent = await FullAutoAgent(keywordId) print("------------------------\n Result Agent\n------------------------") - print(resultAgent) + # print(resultAgent) # Step 5: Summarize (only if crawl succeeded) print("\n" + "=" * 80) print("STEP 6: Generating AI summary") diff --git a/googlesearchmethod/googlesearch.py b/googlesearchmethod/googlesearch.py index 0ca07af..166e5bb 100644 --- a/googlesearchmethod/googlesearch.py +++ b/googlesearchmethod/googlesearch.py @@ -8,17 +8,17 @@ api_key = os.getenv("GOOGLE_SEARCH_API_KEY") cse_id = os.getenv("CUSTOM_SEARCH_ENGIN_ID") -def googlesearch(keyword , siteDomain): - if siteDomain == "" or siteDomain == None: - siteDomain = "com" +def googlesearch(keyword): + + query = f'{keyword} site:facebook.com OR site:instagram.com OR site:linkedin.com OR site:twitter.com OR site:x.com OR site:youtube.com OR site:tiktok.com OR site:threads.net' + try : service = build("customsearch", "v1", developerKey=api_key) results = service.cse().list( - q=keyword, + q=query, cx=cse_id, - num=10, # Number of results to return (max 10) + num=5, # Number of results to return (max 10) cr="sri lanka", - siteSearch=siteDomain ).execute() except Exception as e : print(e) 
diff --git a/main.py b/main.py index 610386e..bd1b804 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,7 @@ from typing import Union from fastapi import FastAPI , HTTPException, status from fastapi.middleware.cors import CORSMiddleware -from crawlProcess import exec ,test_decision # Assuming these are in other files +from crawlProcess import exec, test_decision, fast_query # Added fast_query from service.privousChats import getAllDetailsById , getAllPreviousKeywords , deletePreviousCrawl # from testdb import getKeywordAll , getKeywordById # Assuming these are in other files from schema.keywordSchema import Keyword , KeywordOut # Assuming these are in other files @@ -98,13 +98,29 @@ async def testTwo(keyword: str, url_list: list[str]): return result -@app.get("/api/v1/dicission") +@app.get("/api/v1/query") async def testDesi(keywordId:str , user_prompt:str): result = await test_decision(keywordId, user_prompt) return result + +# ⚑ FAST endpoint - 3-5x faster than /dicission +@app.get("/api/v1/dicission") +async def fastQuery(keywordId: str, user_prompt: str): + """ + Fast query endpoint - bypasses agent for speed. + Use this for quick Q&A on crawled content. + """ + result = await fast_query(keywordId, user_prompt) + return { + "status" : "success", + "message" : result + } + + + @app.get("/test/{id}") def read(id:int , q: Union[str,None] = None): return {"item_id" : id , "q" : q} diff --git a/requirements.txt b/requirements.txt index 5d17cfc..cf27824 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/service/privousChats.py b/service/privousChats.py index 4c4a568..492e867 100644 --- a/service/privousChats.py +++ b/service/privousChats.py @@ -4,6 +4,9 @@ from bson import ObjectId from fastapi.responses import JSONResponse +# Import ChromaDB cleanup function +from crawlProcess import deleteKGFromChromaDB + @@ -80,18 +83,22 @@ async def getAllPreviousKeywords(): async def deletePreviousCrawl(id): try : + # Delete from MongoDB await keyword_collection.delete_many({"_id" : ObjectId(id)}) await siteDataCollection.delete_many({"keywordId" : ObjectId(id)}) await summaryCollection.delete_many({"keywordId" : ObjectId(id)}) + # Delete from ChromaDB + deleteKGFromChromaDB(id) + return JSONResponse( status_code=200, - content={"status" : "success"} + content={"status" : "success", "message": "Deleted from MongoDB and ChromaDB"} ) except Exception as e : print(e) return JSONResponse( status_code=400, - content={"status" : "fail"} + content={"status" : "fail", "error": str(e)} ) \ No newline at end of file diff --git a/service/vector_embedding.py b/service/vector_embedding.py new file mode 100644 index 0000000..fb1748f --- /dev/null +++ b/service/vector_embedding.py @@ -0,0 +1,25 @@ +import voyageai +from dotenv import load_dotenv +import time + +load_dotenv("./../.env") + + +def generate_embeddings(data): + """ + Generate embeddings for a list of text items using VoyageAI. + Each item should be a dict with 'text', 'id', and 'type'. 
+ """ + try: + + vo = voyageai.Client() + + + result = vo.embed(data, model="voyage-3.5") + + # print(result.embeddings[0]) + + return result.embeddings[0] + except Exception as error: + print("Error occurred while generating embeddings!") + raise error diff --git a/webscrapy/webscrapy/spiders/web_spider_new.py b/webscrapy/webscrapy/spiders/web_spider_new.py index c6e0f68..b4814ba 100644 --- a/webscrapy/webscrapy/spiders/web_spider_new.py +++ b/webscrapy/webscrapy/spiders/web_spider_new.py @@ -8,6 +8,19 @@ import datetime from urllib.parse import urlparse, urljoin, urlunparse +""" +WebCrawSpider - Simple Web Crawler with Social Media Link Extraction + +Single URL Mode: +- Starts from the exact URL you provide +- Follows internal links found on that page (up to 100 pages) +- Extracts social media links but does NOT crawl them + +Multiple URL Mode: +- Crawls only the URLs you provide +- Does NOT follow any links +""" + load_dotenv("./.env") CONNECTION_STRING = os.getenv("CONNECTION_STRING") @@ -18,15 +31,15 @@ class WebCrawSpider(scrapy.Spider): # Custom settings custom_settings = { - 'ROBOTSTXT_OBEY': False, # Bypass robots.txt blocking - 'CONCURRENT_REQUESTS': 1, # Process URLs one at a time - 'DOWNLOAD_DELAY': 1, # 1 second between requests - 'DOWNLOAD_TIMEOUT': 30, # 30 second timeout per URL + 'ROBOTSTXT_OBEY': False, + 'CONCURRENT_REQUESTS': 1, + 'DOWNLOAD_DELAY': 1, + 'DOWNLOAD_TIMEOUT': 30, 'RETRY_ENABLED': True, 'RETRY_TIMES': 2, 'LOG_LEVEL': 'INFO', 'CLOSESPIDER_TIMEOUT': 0, - 'CLOSESPIDER_PAGECOUNT': 5, # Stop after 100 pages + 'CLOSESPIDER_PAGECOUNT': 3, # Disabled, manual limit } def __init__(self, start_urls=None, keywordId=None, *args, **kwargs): @@ -43,91 +56,29 @@ def __init__(self, start_urls=None, keywordId=None, *args, **kwargs): if not start_urls[i].startswith("https://") and not start_urls[i].startswith("http://"): start_urls[i] = "https://" + start_urls[i] + self.start_urls = start_urls self.keywordId = keywordId self.client = pymongo.MongoClient(CONNECTION_STRING) self.db = self.client['webcrawl'] self.collection = self.db['sitesData'] - self.collection2 = self.db['keyword'] - - # Get previously crawled URLs using aggregate - aggregate = [ - { - '$match': { - '_id': ObjectId(keywordId) - } - }, { - '$lookup': { - 'from': 'sitesData', - 'localField': '_id', - 'foreignField': 'keywordId', - 'as': 'sitesInfo' - } - }, { - '$project': { - '_id': 1, - 'keyword': 1, - 'urls': '$sitesInfo.siteUrl' - } - } - ] - - self.previous_crawled_urls = set() - - try: - result = list(self.collection2.aggregate(aggregate)) - - if result and len(result) > 0: - previous_crawled_urls_list = result[0].get("urls", []) - self.previous_crawled_urls = set(previous_crawled_urls_list) - print(f"Loaded {len(self.previous_crawled_urls)} previously crawled URLs") - else: - print("No previous crawled URLs found (new keyword)") - - except Exception as e: - print(f"Error loading previous URLs: {e}") - import traceback - traceback.print_exc() - self.previous_crawled_urls = set() - # remember original provided urls (before filtering) - original_count = len(start_urls) - original_start_urls = start_urls.copy() - - # Filter out already crawled URLs from start_urls - self.start_urls = [url for url in start_urls if url not in self.previous_crawled_urls] - skipped_count = original_count - len(self.start_urls) - - # === FIX: if user provided exactly ONE URL, don't silently drop it just because it exists in DB. - # We want single-url mode to run and follow internal links even if that URL was previously crawled. 
- if original_count == 1 and len(self.start_urls) == 0: - # restore the original single URL so spider runs in single_url_with_links mode - self.start_urls = [original_start_urls[0]] - # adjust skipped_count accordingly (we still mark it as previously crawled) - skipped_count = 1 if original_start_urls[0] in self.previous_crawled_urls else 0 - # === end fix - # Track progress and visited URLs self.processed_count = 0 self.success_count = 0 self.fail_count = 0 - self.skipped_count = skipped_count - self.visited_urls = set(self.previous_crawled_urls) + self.visited_urls = set() # URLs visited in THIS session only self.queued_urls = set() - self.max_pages = 100 + self.max_pages = 3 # Determine crawl mode - if len(self.start_urls) == 0: - self.crawl_mode = "none" - elif len(self.start_urls) == 1: + if len(self.start_urls) == 1: self.crawl_mode = "single_url_with_links" else: self.crawl_mode = "multiple_urls" # Extract allowed domains (include www and non-www versions) - # NOTE: use original_start_urls so domain checks align with the provided URL(s), - # even if we restored a single URL above. self.allowed_domains = set() - for url in original_start_urls: + for url in self.start_urls: parsed = urlparse(url) domain = parsed.netloc if domain: @@ -147,27 +98,15 @@ def __init__(self, start_urls=None, keywordId=None, *args, **kwargs): elif self.crawl_mode == "single_url_with_links": print(f" Single URL: Follow internal links (max {self.max_pages} pages)") else: - print(f" No URLs to crawl (all already processed)") + print(f" Multiple URLs: Parse only provided URLs") print(f"Allowed domains: {', '.join(sorted(self.allowed_domains))}") - print(f"Total URLs provided: {original_count}") - print(f"Already crawled (skipped): {skipped_count}") - print(f"New URLs to crawl: {len(self.start_urls)}") - - if len(self.start_urls) > 1 : - if skipped_count > 0: - print(f"\nSkipped URLs (already crawled):") - for url in original_start_urls: - if url in self.previous_crawled_urls: - print(f" {url}") - - if len(self.start_urls) > 0: - print(f"\nNew URLs to crawl:") - for i, url in enumerate(self.start_urls, 1): - print(f" [{i}] {url}") - else: - print("\nNo new URLs to crawl - all URLs have been previously processed!") - + print(f"Maximum pages to crawl: {self.max_pages if self.crawl_mode == 'single_url_with_links' else len(self.start_urls)}") + print(f"Initial URLs to crawl: {len(self.start_urls)}") + for i, url in enumerate(self.start_urls, 1): + print(f" [{i}] {url}") + print(f"\nNote: Will check database for each URL before saving") + print(f" Already crawled URLs will be skipped but links extracted") print("=" * 80) def normalize_url(self, url): @@ -191,26 +130,16 @@ def parse(self, response): normalized_url = self.normalize_url(response.url) - # Skip if already visited - # if normalized_url in self.visited_urls: - # print(f"[SKIP] Already visited: {normalized_url}") - # return - # === FIX: allow single-url-with-links mode to crawl even if URL was previously crawled. 
- # if normalized_url in self.previous_crawled_urls and self.crawl_mode != "single_url_with_links": - # print(f"[SKIP] Already visited (previously crawled): {normalized_url}") - # return - # === end fix - # Check if max pages reached if self.crawl_mode == "single_url_with_links" and self.processed_count >= self.max_pages: - print(f"\n[STOP] Max pages limit ({self.max_pages}) reached.") - print(f"\n[STOP] Max pages limit ({self.max_pages}) reached.") - print(f"\n[STOP] Max pages limit ({self.max_pages}) reached.") - print(f"\n[STOP] Max pages limit ({self.max_pages}) reached.") - print(f"\n[STOP] Max pages limit ({self.max_pages}) reached.") print(f"\n[STOP] Max pages limit ({self.max_pages}) reached.") return + # Skip if already visited in this session + if normalized_url in self.visited_urls: + print(f"[SKIP] Already visited in this session: {normalized_url}") + return + self.visited_urls.add(normalized_url) self.processed_count += 1 @@ -229,6 +158,43 @@ def parse(self, response): image_urls = list(set(image_urls)) + # Extract social media links + social_media_domains = { + 'facebook.com', 'fb.com', 'fb.me', 'www.facebook.com', + 'twitter.com', 'x.com', 'www.twitter.com', 'www.x.com', + 'instagram.com', 'www.instagram.com', + 'linkedin.com', 'www.linkedin.com', + 'youtube.com', 'youtu.be', 'www.youtube.com', + 'tiktok.com', 'www.tiktok.com', + 'pinterest.com', 'www.pinterest.com', + 'reddit.com', 'www.reddit.com', + 'snapchat.com', 'www.snapchat.com', + 'whatsapp.com', 'wa.me', + 'telegram.org', 't.me' + } + + social_media_links = [] + + for link in soup.find_all('a', href=True): + href = link['href'].strip() + + if not href or href.startswith(('#', 'javascript:', 'mailto:', 'tel:')): + continue + + try: + absolute_url = urljoin(response.url, href) + link_domain = urlparse(absolute_url).netloc.lower() + + # Check if it's a social media link + for sm_domain in social_media_domains: + if sm_domain in link_domain: + social_media_links.append(absolute_url) + break + except: + continue + + social_media_links = list(set(social_media_links)) + # Remove unwanted tags for tag in soup(["script", "style", "noscript", "header", "footer", "svg", "meta"]): tag.decompose() @@ -236,30 +202,46 @@ def parse(self, response): # Extract clean text body_text = " ".join(soup.get_text(separator=" ").split()) - # Prepare data - data = { + # Check if URL already exists in database + existing_doc = self.collection.find_one({ "keywordId": ObjectId(self.keywordId), - "siteUrl": normalized_url, - "content": body_text, - "imageUrls": image_urls, - "createdAt" : datetime.datetime.utcnow() - } - - # Save to MongoDB - result = self.collection.insert_one(data) - self.success_count += 1 + "siteUrl": response.url + }) - print(f"[SUCCESS] Saved to MongoDB") - print(f" Document ID: {result.inserted_id}") - print(f" Content: {len(body_text)} chars") - print(f" Images: {len(image_urls)}") + if existing_doc: + print(f"[SKIP] Already in database - not saving again") + print(f" Existing Document ID: {existing_doc['_id']}") + print(f" Will continue to extract links from this page") + else: + # Prepare data + data = { + "keywordId": ObjectId(self.keywordId), + "siteUrl": response.url, + "content": body_text, + "imageUrls": image_urls, + # "socialMediaLinks": social_media_links, + "createdAt" : datetime.datetime.utcnow() + } + + # Save to MongoDB + result = self.collection.insert_one(data) + self.success_count += 1 + + print(f"[SUCCESS] Saved to MongoDB") + print(f" Document ID: {result.inserted_id}") + print(f" URL Saved: 
{response.url}") + print(f" Content: {len(body_text)} chars") + print(f" Images: {len(image_urls)}") + print(f" Social Media Links: {len(social_media_links)}") + if social_media_links: + print(f" Social Media Found:") + for sm_link in social_media_links[:3]: + print(f" - {sm_link}") + if len(social_media_links) > 3: + print(f" ... and {len(social_media_links) - 3} more") # Follow links ONLY if single URL mode - - print("Before start here") - - if len(self.start_urls) == 1 : - print("Start here") + if self.crawl_mode == "single_url_with_links": if self.processed_count < self.max_pages: links_found = 0 links_queued = 0 @@ -269,11 +251,24 @@ def parse(self, response): print(f"\n [EXTRACTING LINKS]") + # Social media domains to skip (don't crawl these) + social_media_domains = { + 'facebook.com', 'fb.com', 'fb.me', 'www.facebook.com', + 'twitter.com', 'x.com', 'www.twitter.com', 'www.x.com', + 'instagram.com', 'www.instagram.com', + 'linkedin.com', 'www.linkedin.com', + 'youtube.com', 'youtu.be', 'www.youtube.com', + 'tiktok.com', 'www.tiktok.com', + 'pinterest.com', 'www.pinterest.com', + 'reddit.com', 'www.reddit.com', + 'snapchat.com', 'www.snapchat.com', + 'whatsapp.com', 'wa.me', + 'telegram.org', 't.me' + } + for link in soup.find_all('a', href=True): href = link['href'].strip() - print("Href") - print(href) # Skip invalid links if (not href or href.startswith('#') or @@ -290,7 +285,13 @@ def parse(self, response): links_skipped_invalid += 1 continue - link_domain = urlparse(absolute_url).netloc + link_domain = urlparse(absolute_url).netloc.lower() + + # Skip social media links (already collected separately) + is_social_media = any(sm_domain in link_domain for sm_domain in social_media_domains) + if is_social_media: + links_skipped_external += 1 + continue links_found += 1 @@ -352,10 +353,10 @@ def closed(self, reason): if self.crawl_mode == "single_url_with_links": print(f" Max pages limit: {self.max_pages}") print(f" Total Processed: {self.processed_count}") - print(f" Success: {self.success_count}") + print(f" New Documents Saved: {self.success_count}") + print(f" Already in DB (skipped): {self.processed_count - self.success_count - self.fail_count}") print(f" Failed: {self.fail_count}") print(f" Unique URLs visited: {len(self.visited_urls)}") - print(f" URLs queued: {len(self.queued_urls)}") print("=" * 80) if hasattr(self, 'client'):
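A property the new ingestion path leans on: because chunk IDs are SHA-256 hashes of `keywordId + "_" + chunk`, re-ingesting pages that are already stored should be a no-op. Below is a small self-contained check of that invariant, sketched with a throwaway local Chroma client and made-up text and embeddings in place of the patch's `CloudClient` and VoyageAI embeddings:

```python
# Hedged idempotency check for the hash-keyed dedup used by saveContentToChromaDB():
# ingesting identical content for the same keywordId twice must not grow the collection.
import hashlib

import chromadb

client = chromadb.EphemeralClient()              # throwaway in-memory instance for the check
col = client.get_or_create_collection("dedup_check")

def ingest(keyword_id: str, chunk: str, vector: list[float]) -> None:
    chunk_id = hashlib.sha256((keyword_id + "_" + chunk).encode()).hexdigest()
    if col.get(ids=[chunk_id]).get("documents"): # same existence test as the patch
        return                                   # duplicate chunk -> skip
    col.add(ids=[chunk_id], documents=[chunk],
            metadatas=[{"keywordId": keyword_id}], embeddings=[vector])

ingest("kw1", "SLT Mobitel offers fiber internet.", [0.1, 0.2, 0.3])
ingest("kw1", "SLT Mobitel offers fiber internet.", [0.1, 0.2, 0.3])  # re-crawl of same page
assert col.count() == 1                          # second ingest was a no-op
```

The existence check mirrors the `kg_collection.get(ids=[chunk_hash])` guard added in `crawlProcess.py`, so repeated crawls of the same keyword only pay for embedding genuinely new chunks.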