diff --git a/surfsense_backend/alembic/versions/38_add_webcrawler_connector_enum.py b/surfsense_backend/alembic/versions/38_add_webcrawler_connector_enum.py new file mode 100644 index 00000000..1b33c31b --- /dev/null +++ b/surfsense_backend/alembic/versions/38_add_webcrawler_connector_enum.py @@ -0,0 +1,59 @@ +"""Add Webcrawler connector enums + +Revision ID: 38 +Revises: 37 +Create Date: 2025-11-17 17:00:00.000000 + +""" + +from collections.abc import Sequence + +from alembic import op + +revision: str = "38" +down_revision: str | None = "37" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Safely add 'WEBCRAWLER_CONNECTOR' to enum types if missing.""" + + # Add to searchsourceconnectortype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'WEBCRAWLER_CONNECTOR' + ) THEN + ALTER TYPE searchsourceconnectortype ADD VALUE 'WEBCRAWLER_CONNECTOR'; + END IF; + END + $$; + """ + ) + + # Add to documenttype enum + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_enum e ON t.oid = e.enumtypid + WHERE t.typname = 'documenttype' AND e.enumlabel = 'CRAWLED_URL' + ) THEN + ALTER TYPE documenttype ADD VALUE 'CRAWLED_URL'; + END IF; + END + $$; + """ + ) + + +def downgrade() -> None: + """Remove 'WEBCRAWLER_CONNECTOR' from enum types.""" + pass diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py index 7b0e18a1..8269f08d 100644 --- a/surfsense_backend/app/agents/researcher/nodes.py +++ b/surfsense_backend/app/agents/researcher/nodes.py @@ -667,7 +667,7 @@ async def fetch_relevant_documents( } ) - elif connector == "CRAWLED_URL": + elif connector == "WEBCRAWLER_CONNECTOR": ( source_object, crawled_urls_chunks, @@ -689,7 +689,7 @@ async def fetch_relevant_documents( writer( { "yield_value": streaming_service.format_terminal_info_delta( - f"🌐 Found {len(crawled_urls_chunks)} Web Pages chunks related to your query" + f"🌐 Found {len(crawled_urls_chunks)} Web Page chunks related to your query" ) } ) diff --git a/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py b/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py index 18ad1668..7b5d251f 100644 --- a/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py +++ b/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py @@ -17,7 +17,6 @@ {chat_history_section} - EXTENSION: "Web content saved via SurfSense browser extension" (personal browsing history) -- CRAWLED_URL: "Webpages indexed by SurfSense web crawler" (personally selected websites) - FILE: "User-uploaded documents (PDFs, Word, etc.)" (personal files) - SLACK_CONNECTOR: "Slack conversations and shared content" (personal workspace communications) - NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management) @@ -35,6 +34,7 @@ - TAVILY_API: "Tavily search API results" (personalized search results) - LINKUP_API: "Linkup search API results" (personalized search results) - LUMA_CONNECTOR: "Luma events" +- WEBCRAWLER_CONNECTOR: "Webpages indexed by SurfSense" (personally selected websites) diff --git a/surfsense_backend/app/agents/researcher/utils.py b/surfsense_backend/app/agents/researcher/utils.py index a2c211f2..41d5a1f5 100644 --- a/surfsense_backend/app/agents/researcher/utils.py +++ 
b/surfsense_backend/app/agents/researcher/utils.py @@ -19,7 +19,6 @@ def get_connector_emoji(connector_name: str) -> str: connector_emojis = { "YOUTUBE_VIDEO": "📹", "EXTENSION": "🧩", - "CRAWLED_URL": "🌐", "FILE": "📄", "SLACK_CONNECTOR": "💬", "NOTION_CONNECTOR": "📘", @@ -34,6 +33,7 @@ def get_connector_emoji(connector_name: str) -> str: "AIRTABLE_CONNECTOR": "🗃️", "LUMA_CONNECTOR": "✨", "ELASTICSEARCH_CONNECTOR": "⚡", + "WEBCRAWLER_CONNECTOR": "🌐", } return connector_emojis.get(connector_name, "🔎") @@ -43,7 +43,6 @@ def get_connector_friendly_name(connector_name: str) -> str: connector_friendly_names = { "YOUTUBE_VIDEO": "YouTube", "EXTENSION": "Browser Extension", - "CRAWLED_URL": "Web Pages", "FILE": "Files", "SLACK_CONNECTOR": "Slack", "NOTION_CONNECTOR": "Notion", @@ -59,6 +58,7 @@ def get_connector_friendly_name(connector_name: str) -> str: "AIRTABLE_CONNECTOR": "Airtable", "LUMA_CONNECTOR": "Luma", "ELASTICSEARCH_CONNECTOR": "Elasticsearch", + "WEBCRAWLER_CONNECTOR": "Web Pages", } return connector_friendly_names.get(connector_name, connector_name) diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 7d06643e..efa51d2d 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -208,9 +208,6 @@ class Config: # LlamaCloud API Key LLAMA_CLOUD_API_KEY = os.getenv("LLAMA_CLOUD_API_KEY") - # Firecrawl API Key - FIRECRAWL_API_KEY = os.getenv("FIRECRAWL_API_KEY", None) - # Litellm TTS Configuration TTS_SERVICE = os.getenv("TTS_SERVICE") TTS_SERVICE_API_BASE = os.getenv("TTS_SERVICE_API_BASE") diff --git a/surfsense_backend/app/connectors/webcrawler_connector.py b/surfsense_backend/app/connectors/webcrawler_connector.py new file mode 100644 index 00000000..871b4d4b --- /dev/null +++ b/surfsense_backend/app/connectors/webcrawler_connector.py @@ -0,0 +1,191 @@ +""" +WebCrawler Connector Module + +A module for crawling web pages and extracting content using Firecrawl or AsyncChromiumLoader. +Provides a unified interface for web scraping. +""" + +from typing import Any + +import validators +from firecrawl import AsyncFirecrawlApp +from langchain_community.document_loaders import AsyncChromiumLoader + + +class WebCrawlerConnector: + """Class for crawling web pages and extracting content.""" + + def __init__(self, firecrawl_api_key: str | None = None): + """ + Initialize the WebCrawlerConnector class. + + Args: + firecrawl_api_key: Firecrawl API key (optional, will use AsyncChromiumLoader if not provided) + """ + self.firecrawl_api_key = firecrawl_api_key + self.use_firecrawl = bool(firecrawl_api_key) + + def set_api_key(self, api_key: str) -> None: + """ + Set the Firecrawl API key and enable Firecrawl usage. + + Args: + api_key: Firecrawl API key + """ + self.firecrawl_api_key = api_key + self.use_firecrawl = True + + async def crawl_url( + self, url: str, formats: list[str] | None = None + ) -> tuple[dict[str, Any] | None, str | None]: + """ + Crawl a single URL and extract its content. + + Args: + url: URL to crawl + formats: List of formats to extract (e.g., ["markdown", "html"]) - only for Firecrawl + + Returns: + Tuple containing (crawl result dict, error message or None) + Result dict contains: + - content: Extracted content (markdown or HTML) + - metadata: Page metadata (title, description, etc.) 
+ - source: Original URL + - crawler_type: Type of crawler used + """ + try: + # Validate URL + if not validators.url(url): + return None, f"Invalid URL: {url}" + + if self.use_firecrawl: + result = await self._crawl_with_firecrawl(url, formats) + else: + result = await self._crawl_with_chromium(url) + + return result, None + + except Exception as e: + return None, f"Error crawling URL {url}: {e!s}" + + async def _crawl_with_firecrawl( + self, url: str, formats: list[str] | None = None + ) -> dict[str, Any]: + """ + Crawl URL using Firecrawl. + + Args: + url: URL to crawl + formats: List of formats to extract + + Returns: + Dict containing crawled content and metadata + + Raises: + ValueError: If Firecrawl scraping fails + """ + if not self.firecrawl_api_key: + raise ValueError("Firecrawl API key not set. Call set_api_key() first.") + + firecrawl_app = AsyncFirecrawlApp(api_key=self.firecrawl_api_key) + + # Default to markdown format + if formats is None: + formats = ["markdown"] + + scrape_result = await firecrawl_app.scrape_url(url=url, formats=formats) + + if not scrape_result or not scrape_result.success: + error_msg = ( + scrape_result.error + if scrape_result and hasattr(scrape_result, "error") + else "Unknown error" + ) + raise ValueError(f"Firecrawl failed to scrape URL: {error_msg}") + + # Extract content based on format + content = scrape_result.markdown or scrape_result.html or "" + + # Extract metadata + metadata = scrape_result.metadata if scrape_result.metadata else {} + + return { + "content": content, + "metadata": { + "source": url, + "title": metadata.get("title", url), + "description": metadata.get("description", ""), + "language": metadata.get("language", ""), + "sourceURL": metadata.get("sourceURL", url), + **metadata, + }, + "crawler_type": "firecrawl", + } + + async def _crawl_with_chromium(self, url: str) -> dict[str, Any]: + """ + Crawl URL using AsyncChromiumLoader. + + Args: + url: URL to crawl + + Returns: + Dict containing crawled content and metadata + + Raises: + Exception: If crawling fails + """ + crawl_loader = AsyncChromiumLoader(urls=[url], headless=True) + documents = await crawl_loader.aload() + + if not documents: + raise ValueError(f"Failed to load content from {url}") + + doc = documents[0] + + # Extract basic metadata from the document + metadata = doc.metadata if doc.metadata else {} + + return { + "content": doc.page_content, + "metadata": { + "source": url, + "title": metadata.get("title", url), + **metadata, + }, + "crawler_type": "chromium", + } + + def format_to_structured_document(self, crawl_result: dict[str, Any]) -> str: + """ + Format crawl result as a structured document. 
+ + Args: + crawl_result: Result from crawl_url method + + Returns: + Structured document string + """ + metadata = crawl_result["metadata"] + content = crawl_result["content"] + + document_parts = ["", ""] + + # Add all metadata fields + for key, value in metadata.items(): + document_parts.append(f"{key.upper()}: {value}") + + document_parts.extend( + [ + "", + "", + "FORMAT: markdown", + "TEXT_START", + content, + "TEXT_END", + "", + "", + ] + ) + + return "\n".join(document_parts) diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 4ad31b50..06abb7a3 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -73,6 +73,7 @@ class SearchSourceConnectorType(str, Enum): AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" LUMA_CONNECTOR = "LUMA_CONNECTOR" ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR" + WEBCRAWLER_CONNECTOR = "WEBCRAWLER_CONNECTOR" class ChatType(str, Enum): diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index 344a2503..ae9df0cf 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -65,13 +65,6 @@ async def create_documents( process_extension_document_task.delay( document_dict, request.search_space_id, str(user.id) ) - elif request.document_type == DocumentType.CRAWLED_URL: - from app.tasks.celery_tasks.document_tasks import process_crawled_url_task - - for url in request.content: - process_crawled_url_task.delay( - url, request.search_space_id, str(user.id) - ) elif request.document_type == DocumentType.YOUTUBE_VIDEO: from app.tasks.celery_tasks.document_tasks import process_youtube_video_task diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 4e62035f..6bf7a97d 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -49,6 +49,7 @@ index_luma_events, index_notion_pages, index_slack_messages, + index_crawled_urls, ) from app.users import current_active_user from app.utils.check_ownership import check_ownership @@ -482,6 +483,7 @@ async def index_connector_content( - DISCORD_CONNECTOR: Indexes messages from all accessible Discord channels - LUMA_CONNECTOR: Indexes events from Luma - ELASTICSEARCH_CONNECTOR: Indexes documents from Elasticsearch + - WEBCRAWLER_CONNECTOR: Indexes web pages from crawled websites Args: connector_id: ID of the connector to use @@ -688,6 +690,17 @@ async def index_connector_content( ) response_message = "Elasticsearch indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: + from app.tasks.celery_tasks.connector_tasks import index_crawled_urls_task + + logger.info( + f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + index_crawled_urls_task.delay( + connector_id, search_space_id, str(user.id), indexing_from, indexing_to + ) + response_message = "Web page indexing started in the background." 
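A minimal usage sketch of the new WebCrawlerConnector wired up above. Only the class and its crawl_url / format_to_structured_document methods come from this diff; the example URL, the absent API key, and the demo wrapper are illustrative assumptions.

import asyncio

from app.connectors.webcrawler_connector import WebCrawlerConnector


async def demo() -> None:
    # No Firecrawl key here, so the connector falls back to AsyncChromiumLoader;
    # passing firecrawl_api_key="fc-..." (or calling set_api_key) switches it to Firecrawl.
    crawler = WebCrawlerConnector(firecrawl_api_key=None)

    crawl_result, error = await crawler.crawl_url("https://example.com")
    if error or crawl_result is None:
        print(f"Crawl failed: {error}")
        return

    print(crawl_result["crawler_type"])           # "chromium" or "firecrawl"
    print(crawl_result["metadata"].get("title"))  # page title, falling back to the URL

    # The structured string that the indexer later hashes, summarizes, and chunks.
    print(crawler.format_to_structured_document(crawl_result)[:200])


asyncio.run(demo())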
+ else: raise HTTPException( status_code=400, @@ -1523,3 +1536,63 @@ async def run_elasticsearch_indexing( f"Critical error in run_elasticsearch_indexing for connector {connector_id}: {e}", exc_info=True, ) + +# Add new helper functions for crawled web page indexing +async def run_web_page_indexing_with_new_session( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """ + Create a new session and run the Web page indexing task. + This prevents session leaks by creating a dedicated session for the background task. + """ + async with async_session_maker() as session: + await run_web_page_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + + +async def run_web_page_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """ + Background task to run Web page indexing. + Args: + session: Database session + connector_id: ID of the webcrawler connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + try: + documents_processed, error_or_warning = await index_crawled_urls( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + update_last_indexed=False, # Don't update timestamp in the indexing function + ) + + # Only update last_indexed_at if indexing was successful (either new docs or updated docs) + if documents_processed > 0: + await update_connector_last_indexed(session, connector_id) + logger.info( + f"Web page indexing completed successfully: {documents_processed} documents processed" + ) + else: + logger.error( + f"Web page indexing failed or no documents processed: {error_or_warning}" + ) + except Exception as e: + logger.error(f"Error in background Web page indexing task: {e!s}") \ No newline at end of file diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 28f70d28..add938e8 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -70,6 +70,13 @@ async def search_crawled_urls( """ Search for crawled URLs and return both the source information and langchain documents + Args: + user_query: The user's query + user_id: The user's ID + search_space_id: The search space ID to search in + top_k: Maximum number of results to return + search_mode: Search mode (CHUNKS or DOCUMENTS) + Returns: tuple: (sources_info, langchain_documents) """ @@ -109,15 +116,41 @@ async def search_crawled_urls( document = chunk.get("document", {}) metadata = document.get("metadata", {}) - # Create a source entry + # Extract webcrawler-specific metadata + url = metadata.get("source", metadata.get("url", "")) + title = document.get("title", metadata.get("title", "Untitled Document")) + description = metadata.get("description", "") + language = metadata.get("language", "") + last_crawled_at = metadata.get("last_crawled_at", "") + + # Build description with crawler info + content_preview = chunk.get("content", "") + if not description and content_preview: + # Use content preview if no description + description = content_preview[:200] + if len(content_preview) > 200: + description += "..." 
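An illustrative example of the source entry finalized just below: the keys mirror the surrounding code, while every value here is invented.

# Illustrative shape of one "source" entry emitted by search_crawled_urls after this change.
example_source = {
    "id": 17,  # chunk_id, or the running source_id_counter fallback
    "title": "Getting Started - Example Docs",
    "description": (
        "Install the CLI and run the setup wizard..."
        " | Language: en | Last crawled: 2025-11-17 17:00:00"
    ),
    "url": "https://docs.example.com/getting-started",
    "language": "en",
    "last_crawled_at": "2025-11-17 17:00:00",
}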
+ + # Add crawler metadata to description if available + info_parts = [] + if language: + info_parts.append(f"Language: {language}") + if last_crawled_at: + info_parts.append(f"Last crawled: {last_crawled_at}") + + if info_parts: + if description: + description += f" | {' | '.join(info_parts)}" + else: + description = " | ".join(info_parts) + source = { "id": chunk.get("chunk_id", self.source_id_counter), - "title": document.get("title", "Untitled Document"), - "description": metadata.get( - "og:description", - metadata.get("ogDescription", chunk.get("content", "")), - ), - "url": metadata.get("url", ""), + "title": title, + "description": description, + "url": url, + "language": language, + "last_crawled_at": last_crawled_at, } self.source_id_counter += 1 @@ -2540,4 +2573,4 @@ async def search_elasticsearch( "sources": sources_list, } - return result_object, elasticsearch_chunks + return result_object, elasticsearch_chunks \ No newline at end of file diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 5e690749..b735741f 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -600,3 +600,46 @@ async def _index_elasticsearch_documents( await run_elasticsearch_indexing( session, connector_id, search_space_id, user_id, start_date, end_date ) + + +@celery_app.task(name="index_crawled_urls", bind=True) +def index_crawled_urls_task( + self, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Celery task to index Web page Urls.""" + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _index_crawled_urls( + connector_id, search_space_id, user_id, start_date, end_date + ) + ) + finally: + loop.close() + + +async def _index_crawled_urls( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Index Web page Urls with new session.""" + from app.routes.search_source_connectors_routes import ( + run_web_page_indexing, + ) + + async with get_celery_session_maker()() as session: + await run_web_page_indexing( + session, connector_id, search_space_id, user_id, start_date, end_date + ) diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 73af2155..5cf5a662 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -9,7 +9,6 @@ from app.config import config from app.services.task_logging_service import TaskLoggingService from app.tasks.document_processors import ( - add_crawled_url_document, add_extension_received_document, add_youtube_video_document, ) @@ -120,71 +119,6 @@ class IndividualDocument(BaseModel): raise -@celery_app.task(name="process_crawled_url", bind=True) -def process_crawled_url_task(self, url: str, search_space_id: int, user_id: str): - """ - Celery task to process crawled URL. 
- - Args: - url: URL to crawl and process - search_space_id: ID of the search space - user_id: ID of the user - """ - import asyncio - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - loop.run_until_complete(_process_crawled_url(url, search_space_id, user_id)) - finally: - loop.close() - - -async def _process_crawled_url(url: str, search_space_id: int, user_id: str): - """Process crawled URL with new session.""" - async with get_celery_session_maker()() as session: - task_logger = TaskLoggingService(session, search_space_id) - - log_entry = await task_logger.log_task_start( - task_name="process_crawled_url", - source="document_processor", - message=f"Starting URL crawling and processing for: {url}", - metadata={"document_type": "CRAWLED_URL", "url": url, "user_id": user_id}, - ) - - try: - result = await add_crawled_url_document( - session, url, search_space_id, user_id - ) - - if result: - await task_logger.log_task_success( - log_entry, - f"Successfully crawled and processed URL: {url}", - { - "document_id": result.id, - "title": result.title, - "content_hash": result.content_hash, - }, - ) - else: - await task_logger.log_task_success( - log_entry, - f"URL document already exists (duplicate): {url}", - {"duplicate_detected": True}, - ) - except Exception as e: - await task_logger.log_task_failure( - log_entry, - f"Failed to crawl URL: {url}", - str(e), - {"error_type": type(e).__name__}, - ) - logger.error(f"Error processing crawled URL: {e!s}") - raise - - @celery_app.task(name="process_youtube_video", bind=True) def process_youtube_video_task(self, url: str, search_space_id: int, user_id: str): """ diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index 39d6bf84..05a74723 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -77,6 +77,7 @@ async def _check_and_trigger_schedules(): index_luma_events_task, index_notion_pages_task, index_slack_messages_task, + index_crawled_urls_task ) # Map connector types to their tasks @@ -94,6 +95,7 @@ async def _check_and_trigger_schedules(): SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task, SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task, } # Trigger indexing for each due connector diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py b/surfsense_backend/app/tasks/connector_indexers/__init__.py index 766506f7..f6273967 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -17,6 +17,7 @@ - Google Gmail: Index messages from Google Gmail - Google Calendar: Index events from Google Calendar - Luma: Index events from Luma +- Webcrawler: Index crawled URLs - Elasticsearch: Index documents from Elasticsearch instances """ @@ -41,6 +42,7 @@ # Documentation and knowledge management from .notion_indexer import index_notion_pages from .slack_indexer import index_slack_messages +from .webcrawler_indexer import index_crawled_urls __all__ = [ # noqa: RUF022 "index_airtable_records", @@ -58,6 +60,7 @@ "index_linear_issues", # Documentation and knowledge management "index_notion_pages", + "index_crawled_urls", # Communication platforms 
"index_slack_messages", "index_google_gmail_messages", diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py new file mode 100644 index 00000000..6d16ae40 --- /dev/null +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -0,0 +1,439 @@ +""" +Webcrawler connector indexer. +""" + +from datetime import datetime + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import config +from app.connectors.webcrawler_connector import WebCrawlerConnector +from app.db import Document, DocumentType, SearchSourceConnectorType +from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, + generate_unique_identifier_hash, +) + +from .base import ( + check_document_by_unique_identifier, + get_connector_by_id, + logger, + update_connector_last_indexed, +) + + +async def index_crawled_urls( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str | None = None, + end_date: str | None = None, + update_last_indexed: bool = True, +) -> tuple[int, str | None]: + """ + Index web page URLs. + + Args: + session: Database session + connector_id: ID of the webcrawler connector + search_space_id: ID of the search space to store documents in + user_id: User ID + start_date: Start date for filtering (YYYY-MM-DD format) - optional + end_date: End date for filtering (YYYY-MM-DD format) - optional + update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + + Returns: + Tuple containing (number of documents indexed, error message or None) + """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="crawled_url_indexing", + source="connector_indexing_task", + message=f"Starting web page URL indexing for connector {connector_id}", + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + "start_date": start_date, + "end_date": end_date, + }, + ) + + try: + # Get the connector + await task_logger.log_task_progress( + log_entry, + f"Retrieving webcrawler connector {connector_id} from database", + {"stage": "connector_retrieval"}, + ) + + # Get the connector from the database + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.WEBCRAWLER_CONNECTOR + ) + + if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a webcrawler connector", + "Connector not found", + {"error_type": "ConnectorNotFound"}, + ) + return ( + 0, + f"Connector with ID {connector_id} not found or is not a webcrawler connector", + ) + + # Get the Firecrawl API key from the connector config (optional) + api_key = connector.config.get("FIRECRAWL_API_KEY") + + # Get URLs from connector config + initial_urls = connector.config.get("INITIAL_URLS", "") + if isinstance(initial_urls, str): + urls = [url.strip() for url in initial_urls.split("\n") if url.strip()] + elif isinstance(initial_urls, list): + urls = [url.strip() for url in initial_urls if url.strip()] + else: + urls = [] + + logger.info( + f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs" + ) + + # Initialize 
webcrawler client + await task_logger.log_task_progress( + log_entry, + f"Initializing webcrawler client for connector {connector_id}", + { + "stage": "client_initialization", + "use_firecrawl": bool(api_key), + }, + ) + + crawler = WebCrawlerConnector(firecrawl_api_key=api_key) + + # Validate URLs + if not urls: + await task_logger.log_task_failure( + log_entry, + "No URLs provided for indexing", + "Empty URL list", + {"error_type": "ValidationError"}, + ) + return 0, "No URLs provided for indexing" + + await task_logger.log_task_progress( + log_entry, + f"Starting to crawl {len(urls)} URLs", + { + "stage": "crawling", + "total_urls": len(urls), + }, + ) + + documents_indexed = 0 + documents_updated = 0 + documents_skipped = 0 + failed_urls = [] + + for idx, url in enumerate(urls, 1): + try: + logger.info(f"Processing URL {idx}/{len(urls)}: {url}") + + await task_logger.log_task_progress( + log_entry, + f"Crawling URL {idx}/{len(urls)}: {url}", + { + "stage": "crawling_url", + "url_index": idx, + "url": url, + }, + ) + + # Crawl the URL + crawl_result, error = await crawler.crawl_url(url) + + if error or not crawl_result: + logger.warning(f"Failed to crawl URL {url}: {error}") + failed_urls.append((url, error or "Unknown error")) + continue + + # Extract content and metadata + content = crawl_result.get("content", "") + metadata = crawl_result.get("metadata", {}) + crawler_type = crawl_result.get("crawler_type", "unknown") + + if not content.strip(): + logger.warning(f"Skipping URL with no content: {url}") + failed_urls.append((url, "No content extracted")) + documents_skipped += 1 + continue + + # Format content as structured document + structured_document = crawler.format_to_structured_document(crawl_result) + + # Generate unique identifier hash for this URL + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.CRAWLED_URL, url, search_space_id + ) + + # Generate content hash + content_hash = generate_content_hash(structured_document, search_space_id) + + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + # Extract useful metadata + title = metadata.get("title", url) + description = metadata.get("description", "") + language = metadata.get("language", "") + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info(f"Document for URL {url} unchanged. Skipping.") + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info(f"Content changed for URL {url}. 
Updating document.") + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "url": url, + "title": title, + "description": description, + "language": language, + "document_type": "Crawled URL", + "crawler_type": crawler_type, + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + structured_document, user_llm, document_metadata + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Crawled URL: {title}\n\n" + summary_content += f"URL: {url}\n" + if description: + summary_content += f"Description: {description}\n" + if language: + summary_content += f"Language: {language}\n" + summary_content += f"Crawler: {crawler_type}\n\n" + + # Add content preview + content_preview = content[:1000] + if len(content) > 1000: + content_preview += "..." + summary_content += f"Content Preview:\n{content_preview}\n" + + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(content) + + # Update existing document + existing_document.title = title + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + **metadata, + "crawler_type": crawler_type, + "last_crawled_at": datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + existing_document.chunks = chunks + + documents_updated += 1 + logger.info(f"Successfully updated URL {url}") + continue + + # Document doesn't exist - create new one + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "url": url, + "title": title, + "description": description, + "language": language, + "document_type": "Crawled URL", + "crawler_type": crawler_type, + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + structured_document, user_llm, document_metadata + ) + else: + # Fallback to simple summary if no LLM configured + summary_content = f"Crawled URL: {title}\n\n" + summary_content += f"URL: {url}\n" + if description: + summary_content += f"Description: {description}\n" + if language: + summary_content += f"Language: {language}\n" + summary_content += f"Crawler: {crawler_type}\n\n" + + # Add content preview + content_preview = content[:1000] + if len(content) > 1000: + content_preview += "..." 
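# A representative connector config consumed by this indexer (keys come from this PR:
# they are read near the top of the function and validated in app/utils/validators.py
# further down in the diff). Both keys are optional and the values below are invented.
example_webcrawler_config = {
    # If present it must start with "fc-"; if absent, AsyncChromiumLoader is used.
    "FIRECRAWL_API_KEY": "fc-xxxxxxxxxxxxxxxx",
    # Newline-separated string (as typed in the UI Textarea) or a list of URLs; both forms are accepted.
    "INITIAL_URLS": "https://example.com\nhttps://docs.example.com/getting-started",
}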
+ summary_content += f"Content Preview:\n{content_preview}\n" + + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(content) + + document = Document( + search_space_id=search_space_id, + title=title, + document_type=DocumentType.CRAWLED_URL, + document_metadata={ + **metadata, + "crawler_type": crawler_type, + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + }, + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + embedding=summary_embedding, + chunks=chunks, + ) + + session.add(document) + documents_indexed += 1 + logger.info(f"Successfully indexed new URL {url}") + + # Batch commit every 10 documents + if (documents_indexed + documents_updated) % 10 == 0: + logger.info( + f"Committing batch: {documents_indexed + documents_updated} URLs processed so far" + ) + await session.commit() + + except Exception as e: + logger.error( + f"Error processing URL {url}: {e!s}", + exc_info=True, + ) + failed_urls.append((url, str(e))) + continue + + total_processed = documents_indexed + documents_updated + + if total_processed > 0: + await update_connector_last_indexed(session, connector, update_last_indexed) + + # Final commit for any remaining documents not yet committed in batches + logger.info( + f"Final commit: Total {documents_indexed} new, {documents_updated} updated URLs processed" + ) + await session.commit() + + # Build result message + result_message = None + if failed_urls: + failed_summary = "; ".join([f"{url}: {error}" for url, error in failed_urls[:5]]) + if len(failed_urls) > 5: + failed_summary += f" (and {len(failed_urls) - 5} more)" + result_message = f"Completed with {len(failed_urls)} failures: {failed_summary}" + + await task_logger.log_task_success( + log_entry, + f"Successfully completed crawled web page indexing for connector {connector_id}", + { + "urls_processed": total_processed, + "documents_indexed": documents_indexed, + "documents_updated": documents_updated, + "documents_skipped": documents_skipped, + "failed_urls_count": len(failed_urls), + }, + ) + + logger.info( + f"Web page indexing completed: {documents_indexed} new, " + f"{documents_updated} updated, {documents_skipped} skipped, " + f"{len(failed_urls)} failed" + ) + return total_processed, result_message + + except SQLAlchemyError as db_error: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Database error during web page indexing for connector {connector_id}", + str(db_error), + {"error_type": "SQLAlchemyError"}, + ) + logger.error(f"Database error: {db_error!s}", exc_info=True) + return 0, f"Database error: {db_error!s}" + except Exception as e: + await session.rollback() + await task_logger.log_task_failure( + log_entry, + f"Failed to index web page URLs for connector {connector_id}", + str(e), + {"error_type": type(e).__name__}, + ) + logger.error(f"Failed to index web page URLs: {e!s}", exc_info=True) + return 0, f"Failed to index web page URLs: {e!s}" + + +async def get_crawled_url_documents( + session: AsyncSession, + search_space_id: int, + connector_id: int | None = None, +) -> list[Document]: + """ + Get all crawled URL documents for a search space. 
+ + Args: + session: Database session + search_space_id: ID of the search space + connector_id: Optional connector ID to filter by + + Returns: + List of Document objects + """ + from sqlalchemy import select + + query = select(Document).filter( + Document.search_space_id == search_space_id, + Document.document_type == DocumentType.CRAWLED_URL, + ) + + if connector_id: + # Filter by connector if needed - you might need to add a connector_id field to Document + # or filter by some other means depending on your schema + pass + + result = await session.execute(query) + documents = result.scalars().all() + return list(documents) \ No newline at end of file diff --git a/surfsense_backend/app/tasks/document_processors/__init__.py b/surfsense_backend/app/tasks/document_processors/__init__.py index a238ac87..e70c41cb 100644 --- a/surfsense_backend/app/tasks/document_processors/__init__.py +++ b/surfsense_backend/app/tasks/document_processors/__init__.py @@ -6,7 +6,6 @@ processing task in the background. Available processors: -- URL crawler: Process web pages from URLs - Extension processor: Handle documents from browser extension - Markdown processor: Process markdown files - File processors: Handle files using different ETL services (Unstructured, LlamaCloud, Docling) @@ -26,14 +25,11 @@ # Markdown processor from .markdown_processor import add_received_markdown_file_document -from .url_crawler import add_crawled_url_document # YouTube processor from .youtube_processor import add_youtube_video_document __all__ = [ - # URL processing - "add_crawled_url_document", # Extension processing "add_extension_received_document", "add_received_file_document_using_docling", diff --git a/surfsense_backend/app/tasks/document_processors/url_crawler.py b/surfsense_backend/app/tasks/document_processors/url_crawler.py deleted file mode 100644 index ce276489..00000000 --- a/surfsense_backend/app/tasks/document_processors/url_crawler.py +++ /dev/null @@ -1,330 +0,0 @@ -""" -URL crawler document processor. -""" - -import logging - -import validators -from firecrawl import AsyncFirecrawlApp -from langchain_community.document_loaders import AsyncChromiumLoader -from langchain_core.documents import Document as LangchainDocument -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.ext.asyncio import AsyncSession - -from app.config import config -from app.db import Document, DocumentType -from app.services.llm_service import get_user_long_context_llm -from app.services.task_logging_service import TaskLoggingService -from app.utils.document_converters import ( - create_document_chunks, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) - -from .base import ( - check_document_by_unique_identifier, - md, -) - - -async def add_crawled_url_document( - session: AsyncSession, url: str, search_space_id: int, user_id: str -) -> Document | None: - """ - Process and store a document from a crawled URL. 
- - Args: - session: Database session - url: URL to crawl - search_space_id: ID of the search space - user_id: ID of the user - - Returns: - Document object if successful, None if failed - """ - task_logger = TaskLoggingService(session, search_space_id) - - # Log task start - log_entry = await task_logger.log_task_start( - task_name="crawl_url_document", - source="background_task", - message=f"Starting URL crawling process for: {url}", - metadata={"url": url, "user_id": str(user_id)}, - ) - - try: - # URL validation step - await task_logger.log_task_progress( - log_entry, f"Validating URL: {url}", {"stage": "validation"} - ) - - if not validators.url(url): - raise ValueError(f"Url {url} is not a valid URL address") - - # Set up crawler - await task_logger.log_task_progress( - log_entry, - f"Setting up crawler for URL: {url}", - { - "stage": "crawler_setup", - "firecrawl_available": bool(config.FIRECRAWL_API_KEY), - }, - ) - - use_firecrawl = bool(config.FIRECRAWL_API_KEY) - - if use_firecrawl: - # Use Firecrawl SDK directly - firecrawl_app = AsyncFirecrawlApp(api_key=config.FIRECRAWL_API_KEY) - else: - crawl_loader = AsyncChromiumLoader(urls=[url], headless=True) - - # Perform crawling - await task_logger.log_task_progress( - log_entry, - f"Crawling URL content: {url}", - { - "stage": "crawling", - "crawler_type": "AsyncFirecrawlApp" - if use_firecrawl - else "AsyncChromiumLoader", - }, - ) - - if use_firecrawl: - # Use async Firecrawl SDK with v1 API - properly awaited - scrape_result = await firecrawl_app.scrape_url( - url=url, formats=["markdown"] - ) - - # scrape_result is a Pydantic ScrapeResponse object - # Access attributes directly - if scrape_result and scrape_result.success: - # Extract markdown content - markdown_content = scrape_result.markdown or "" - - # Extract metadata - this is a DICT - metadata = scrape_result.metadata if scrape_result.metadata else {} - - # Convert to LangChain Document format - url_crawled = [ - LangchainDocument( - page_content=markdown_content, - metadata={ - "source": url, - "title": metadata.get("title", url), - "description": metadata.get("description", ""), - "language": metadata.get("language", ""), - "sourceURL": metadata.get("sourceURL", url), - **metadata, # Include all other metadata fields - }, - ) - ] - content_in_markdown = url_crawled[0].page_content - else: - error_msg = ( - scrape_result.error - if scrape_result and hasattr(scrape_result, "error") - else "Unknown error" - ) - raise ValueError(f"Firecrawl failed to scrape URL: {error_msg}") - else: - # Use AsyncChromiumLoader as fallback - url_crawled = await crawl_loader.aload() - content_in_markdown = md.transform_documents(url_crawled)[0].page_content - - # Format document - await task_logger.log_task_progress( - log_entry, - f"Processing crawled content from: {url}", - {"stage": "content_processing", "content_length": len(content_in_markdown)}, - ) - - # Format document metadata in a more maintainable way - metadata_sections = [ - ( - "METADATA", - [ - f"{key.upper()}: {value}" - for key, value in url_crawled[0].metadata.items() - ], - ), - ( - "CONTENT", - ["FORMAT: markdown", "TEXT_START", content_in_markdown, "TEXT_END"], - ), - ] - - # Build the document string more efficiently - document_parts = [] - document_parts.append("") - - for section_title, section_content in metadata_sections: - document_parts.append(f"<{section_title}>") - document_parts.extend(section_content) - document_parts.append(f"{section_title}>") - - document_parts.append("") - combined_document_string = 
"\n".join(document_parts) - - # Generate unique identifier hash for this URL - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.CRAWLED_URL, url, search_space_id - ) - - # Generate content hash - content_hash = generate_content_hash(combined_document_string, search_space_id) - - # Check if document with this unique identifier already exists - await task_logger.log_task_progress( - log_entry, - f"Checking for existing URL: {url}", - {"stage": "duplicate_check", "url": url}, - ) - - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - await task_logger.log_task_success( - log_entry, - f"URL document unchanged: {url}", - { - "duplicate_detected": True, - "existing_document_id": existing_document.id, - }, - ) - logging.info(f"Document for URL {url} unchanged. Skipping.") - return existing_document - else: - # Content has changed - update the existing document - logging.info(f"Content changed for URL {url}. Updating document.") - await task_logger.log_task_progress( - log_entry, - f"Updating URL document: {url}", - {"stage": "document_update", "url": url}, - ) - - # Get LLM for summary generation (needed for both create and update) - await task_logger.log_task_progress( - log_entry, - f"Preparing for summary generation: {url}", - {"stage": "llm_setup"}, - ) - - # Get user's long context LLM - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - if not user_llm: - raise RuntimeError( - f"No long context LLM configured for user {user_id} in search space {search_space_id}" - ) - - # Generate summary - await task_logger.log_task_progress( - log_entry, - f"Generating summary for URL content: {url}", - {"stage": "summary_generation"}, - ) - - # Generate summary with metadata - document_metadata = { - "url": url, - "title": url_crawled[0].metadata.get("title", url), - "document_type": "Crawled URL Document", - "crawler_type": "FirecrawlApp" if use_firecrawl else "AsyncChromiumLoader", - } - summary_content, summary_embedding = await generate_document_summary( - combined_document_string, user_llm, document_metadata - ) - - # Process chunks - await task_logger.log_task_progress( - log_entry, - f"Processing content chunks for URL: {url}", - {"stage": "chunk_processing"}, - ) - - chunks = await create_document_chunks(content_in_markdown) - - # Update or create document - if existing_document: - # Update existing document - await task_logger.log_task_progress( - log_entry, - f"Updating document in database for URL: {url}", - {"stage": "document_update", "chunks_count": len(chunks)}, - ) - - existing_document.title = url_crawled[0].metadata.get( - "title", url_crawled[0].metadata.get("source", url) - ) - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = url_crawled[0].metadata - existing_document.chunks = chunks - - document = existing_document - else: - # Create new document - await task_logger.log_task_progress( - log_entry, - f"Creating document in database for URL: {url}", - {"stage": "document_creation", "chunks_count": len(chunks)}, - ) - - document = Document( - search_space_id=search_space_id, - title=url_crawled[0].metadata.get( - "title", url_crawled[0].metadata.get("source", url) - ), - document_type=DocumentType.CRAWLED_URL, - 
document_metadata=url_crawled[0].metadata, - content=summary_content, - embedding=summary_embedding, - chunks=chunks, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - ) - - session.add(document) - await session.commit() - await session.refresh(document) - - # Log success - await task_logger.log_task_success( - log_entry, - f"Successfully crawled and processed URL: {url}", - { - "document_id": document.id, - "title": document.title, - "content_hash": content_hash, - "chunks_count": len(chunks), - "summary_length": len(summary_content), - }, - ) - - return document - - except SQLAlchemyError as db_error: - await session.rollback() - await task_logger.log_task_failure( - log_entry, - f"Database error while processing URL: {url}", - str(db_error), - {"error_type": "SQLAlchemyError"}, - ) - raise db_error - except Exception as e: - await session.rollback() - await task_logger.log_task_failure( - log_entry, - f"Failed to crawl URL: {url}", - str(e), - {"error_type": type(e).__name__}, - ) - raise RuntimeError(f"Failed to crawl URL: {e!s}") from e diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py index 22542571..7ee8acf0 100644 --- a/surfsense_backend/app/utils/periodic_scheduler.py +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -31,6 +31,7 @@ SearchSourceConnectorType.DISCORD_CONNECTOR: "index_discord_messages", SearchSourceConnectorType.LUMA_CONNECTOR: "index_luma_events", SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: "index_elasticsearch_documents", + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: "index_crawled_urls", } @@ -79,6 +80,7 @@ def create_periodic_schedule( index_luma_events_task, index_notion_pages_task, index_slack_messages_task, + index_crawled_urls_task, ) # Map connector type to task @@ -96,6 +98,7 @@ def create_periodic_schedule( SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task, SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task, } # Trigger the first run immediately diff --git a/surfsense_backend/app/utils/validators.py b/surfsense_backend/app/utils/validators.py index a8460cd1..d0bdd792 100644 --- a/surfsense_backend/app/utils/validators.py +++ b/surfsense_backend/app/utils/validators.py @@ -468,6 +468,25 @@ def validate_list_field(key: str, field_name: str) -> None: value = config.get(key) if not isinstance(value, list) or not value: raise ValueError(f"{field_name} must be a non-empty list of strings") + + def validate_firecrawl_api_key_format() -> None: + """Validate Firecrawl API key format if provided.""" + api_key = config.get("FIRECRAWL_API_KEY", "") + if api_key and api_key.strip() and not api_key.strip().startswith("fc-"): + raise ValueError( + "Firecrawl API key should start with 'fc-'. Please verify your API key." 
+ ) + + + def validate_initial_urls() -> None: + initial_urls = config.get("INITIAL_URLS", "") + if initial_urls and initial_urls.strip(): + urls = [url.strip() for url in initial_urls.split("\n") if url.strip()] + for url in urls: + if not validators.url(url): + raise ValueError( + f"Invalid URL format in INITIAL_URLS: {url}" + ) # Lookup table for connector validation rules connector_rules = { @@ -550,6 +569,14 @@ def validate_list_field(key: str, field_name: str) -> None: # "validators": {} # }, "LUMA_CONNECTOR": {"required": ["LUMA_API_KEY"], "validators": {}}, + "WEBCRAWLER_CONNECTOR": { + "required": [], # No required fields - API key is optional + "optional": ["FIRECRAWL_API_KEY", "INITIAL_URLS"], + "validators": { + "FIRECRAWL_API_KEY": lambda: validate_firecrawl_api_key_format(), + "INITIAL_URLS": lambda: validate_initial_urls(), + }, + }, } rules = connector_rules.get(connector_type_str) diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx index f0906952..5756278c 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/edit/page.tsx @@ -18,7 +18,16 @@ import { CardHeader, CardTitle, } from "@/components/ui/card"; -import { Form } from "@/components/ui/form"; +import { + Form, + FormControl, + FormDescription, + FormField, + FormItem, + FormLabel, + FormMessage, +} from "@/components/ui/form"; +import { Textarea } from "@/components/ui/textarea"; import { getConnectorIcon } from "@/contracts/enums/connectorIcons"; import { useConnectorEditPage } from "@/hooks/use-connector-edit-page"; // Import Utils, Types, Hook, and Components @@ -282,6 +291,40 @@ export default function EditConnectorPage() { placeholder="Your Elasticsearch API Key" /> )} + + {/* == Webcrawler == */} + {connector.connector_type === "WEBCRAWLER_CONNECTOR" && ( + + + ( + + URLs to Crawl + + + + + Enter URLs to crawl (one per line). These URLs will be indexed when you trigger indexing. 
+ + + + )} + /> + + )} + diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/page.tsx index d1803325..07578b87 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/[connector_id]/page.tsx @@ -55,6 +55,7 @@ const getConnectorTypeDisplay = (type: string): string => { AIRTABLE_CONNECTOR: "Airtable Connector", LUMA_CONNECTOR: "Luma Connector", ELASTICSEARCH_CONNECTOR: "Elasticsearch Connector", + WEBCRAWLER_CONNECTOR: "Web Page Connector", // Add other connector types here as needed }; return typeMap[type] || type; @@ -75,6 +76,7 @@ const getApiKeyFieldName = (connectorType: string): string => { LINKUP_API: "LINKUP_API_KEY", LUMA_CONNECTOR: "LUMA_API_KEY", ELASTICSEARCH_CONNECTOR: "ELASTICSEARCH_API_KEY", + WEBCRAWLER_CONNECTOR: "FIRECRAWL_API_KEY", }; return fieldMap[connectorType] || ""; }; diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/add/webcrawler-connector/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/webcrawler-connector/page.tsx new file mode 100644 index 00000000..ce91106f --- /dev/null +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/add/webcrawler-connector/page.tsx @@ -0,0 +1,334 @@ +"use client"; + +import { zodResolver } from "@hookform/resolvers/zod"; +import { ArrowLeft, Check, Globe, Loader2 } from "lucide-react"; +import { motion } from "motion/react"; +import Link from "next/link"; +import { useParams, useRouter } from "next/navigation"; +import { useEffect, useState } from "react"; +import { useForm } from "react-hook-form"; +import { toast } from "sonner"; +import * as z from "zod"; +import { Button } from "@/components/ui/button"; +import { + Card, + CardContent, + CardDescription, + CardFooter, + CardHeader, + CardTitle, +} from "@/components/ui/card"; +import { + Form, + FormControl, + FormDescription, + FormField, + FormItem, + FormLabel, + FormMessage, +} from "@/components/ui/form"; +import { Input } from "@/components/ui/input"; +import { Textarea } from "@/components/ui/textarea"; +import { EnumConnectorName } from "@/contracts/enums/connector"; +import { getConnectorIcon } from "@/contracts/enums/connectorIcons"; +import { + type SearchSourceConnector, + useSearchSourceConnectors, +} from "@/hooks/use-search-source-connectors"; + +// Define the form schema with Zod +const webcrawlerConnectorFormSchema = z.object({ + name: z.string().min(3, { + message: "Connector name must be at least 3 characters.", + }), + api_key: z.string().optional(), + initial_urls: z.string().optional(), +}); + +// Define the type for the form values +type WebcrawlerConnectorFormValues = z.infer; + +export default function WebcrawlerConnectorPage() { + const router = useRouter(); + const params = useParams(); + const searchSpaceId = params.search_space_id as string; + const [isSubmitting, setIsSubmitting] = useState(false); + const [doesConnectorExist, setDoesConnectorExist] = useState(false); + + const { fetchConnectors, createConnector } = useSearchSourceConnectors( + true, + parseInt(searchSpaceId) + ); + + // Initialize the form + const form = useForm({ + resolver: zodResolver(webcrawlerConnectorFormSchema), + defaultValues: { + name: "Web Pages", + api_key: "", + initial_urls: "", + }, + }); + + useEffect(() => { + fetchConnectors(parseInt(searchSpaceId)) + .then((data) => { + if (data && 
Array.isArray(data)) { + const connector = data.find( + (c: SearchSourceConnector) => c.connector_type === EnumConnectorName.WEBCRAWLER_CONNECTOR + ); + if (connector) { + setDoesConnectorExist(true); + } + } + }) + .catch((error) => { + console.error("Error fetching connectors:", error); + }); + }, [fetchConnectors, searchSpaceId]); + + // Handle form submission + const onSubmit = async (values: WebcrawlerConnectorFormValues) => { + setIsSubmitting(true); + try { + const config: Record = {}; + + // Only add API key to config if provided + if (values.api_key && values.api_key.trim()) { + config.FIRECRAWL_API_KEY = values.api_key; + } + + // Parse initial URLs if provided + if (values.initial_urls && values.initial_urls.trim()) { + config.INITIAL_URLS = values.initial_urls; + } + + await createConnector( + { + name: values.name, + connector_type: EnumConnectorName.WEBCRAWLER_CONNECTOR, + config: config, + is_indexable: true, + last_indexed_at: null, + periodic_indexing_enabled: false, + indexing_frequency_minutes: null, + next_scheduled_at: null, + }, + parseInt(searchSpaceId) + ); + + toast.success("Webcrawler connector created successfully!"); + + // Navigate back to connectors page + router.push(`/dashboard/${searchSpaceId}/connectors`); + } catch (error) { + console.error("Error creating connector:", error); + toast.error(error instanceof Error ? error.message : "Failed to create connector"); + } finally { + setIsSubmitting(false); + } + }; + + return ( + + + {/* Header */} + + + + Back to connectors + + + + {getConnectorIcon(EnumConnectorName.WEBCRAWLER_CONNECTOR, "h-6 w-6")} + + + Connect Web Pages + Crawl and index web pages for search. + + + + + {/* Connection Card */} + {!doesConnectorExist ? ( + + + Set Up Web Page crawler + + Configure your web page crawler to index web pages. Optionally add a Firecrawl API key + for enhanced crawling capabilities. + + + + + + ( + + Connector Name + + + + + A friendly name to identify this connector. + + + + )} + /> + + ( + + Firecrawl API Key (Optional) + + + + + Add a Firecrawl API key for enhanced crawling. If not provided, will use + AsyncChromiumLoader as fallback. + + + + )} + /> + + ( + + Initial URLs (Optional) + + + + + Enter URLs to crawl (one per line). You can add more URLs later. + + + + )} + /> + + + + + Crawl any public web page + + + + Extract markdown content automatically + + + + Detect content changes and update documents + + + + Works with or without Firecrawl API key + + + + + router.push(`/dashboard/${searchSpaceId}/connectors/add`)} + > + Cancel + + + {isSubmitting ? ( + <> + + Setting up... + > + ) : ( + <> + + Create Crawler + > + )} + + + + + + ) : ( + /* Success Card */ + + + ✅ Your web page crawler is successfully set up! + + You can now add URLs to crawl from the connector management page. + + + + )} + + {/* Help Section */} + {!doesConnectorExist && ( + + + How It Works + + + + 1. Choose Your Crawler Method + + With Firecrawl (Recommended): Get your API key from{" "} + + firecrawl.dev + {" "} + for faster, more reliable crawling with better content extraction. + + + Without Firecrawl: The crawler will use AsyncChromiumLoader as a + free fallback option. This works well for most websites but may be slower. + + + + 2. Add URLs to Crawl (Optional) + + You can add initial URLs now or add them later from the connector management page. + Enter one URL per line. + + + + 3. Manage Your Crawler + + After setup, you can add more URLs, trigger manual crawls, or set up periodic + indexing to keep your content up-to-date. 
+ + + + + )} + + + ); +} \ No newline at end of file diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/webpage/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/webpage/page.tsx deleted file mode 100644 index b24e1dba..00000000 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/webpage/page.tsx +++ /dev/null @@ -1,201 +0,0 @@ -"use client"; - -import { type Tag, TagInput } from "emblor"; -import { Globe, Loader2 } from "lucide-react"; -import { useParams, useRouter } from "next/navigation"; -import { useTranslations } from "next-intl"; -import { useState } from "react"; -import { toast } from "sonner"; -import { Button } from "@/components/ui/button"; -import { - Card, - CardContent, - CardDescription, - CardFooter, - CardHeader, - CardTitle, -} from "@/components/ui/card"; -import { Label } from "@/components/ui/label"; - -// URL validation regex -const urlRegex = /^(https?:\/\/)?([\da-z.-]+)\.([a-z.]{2,6})([/\w .-]*)*\/?$/; - -export default function WebpageCrawler() { - const t = useTranslations("add_webpage"); - const params = useParams(); - const router = useRouter(); - const search_space_id = params.search_space_id as string; - - const [urlTags, setUrlTags] = useState([]); - const [activeTagIndex, setActiveTagIndex] = useState(null); - const [isSubmitting, setIsSubmitting] = useState(false); - const [error, setError] = useState(null); - - // Function to validate a URL - const isValidUrl = (url: string): boolean => { - return urlRegex.test(url); - }; - - // Function to handle URL submission - const handleSubmit = async () => { - // Validate that we have at least one URL - if (urlTags.length === 0) { - setError(t("error_no_url")); - return; - } - - // Validate all URLs - const invalidUrls = urlTags.filter((tag) => !isValidUrl(tag.text)); - if (invalidUrls.length > 0) { - setError(t("error_invalid_urls", { urls: invalidUrls.map((tag) => tag.text).join(", ") })); - return; - } - - setError(null); - setIsSubmitting(true); - - try { - toast(t("crawling_toast"), { - description: t("crawling_toast_desc"), - }); - - // Extract URLs from tags - const urls = urlTags.map((tag) => tag.text); - - // Make API call to backend - const response = await fetch( - `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents`, - { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${localStorage.getItem("surfsense_bearer_token")}`, - }, - body: JSON.stringify({ - document_type: "CRAWLED_URL", - content: urls, - search_space_id: parseInt(search_space_id), - }), - } - ); - - if (!response.ok) { - throw new Error("Failed to crawl URLs"); - } - - await response.json(); - - toast(t("success_toast"), { - description: t("success_toast_desc"), - }); - - // Redirect to documents page - router.push(`/dashboard/${search_space_id}/documents`); - } catch (error: any) { - setError(error.message || t("error_generic")); - toast(t("error_toast"), { - description: `${t("error_toast_desc")}: ${error.message}`, - }); - } finally { - setIsSubmitting(false); - } - }; - - // Function to add a new URL tag - const handleAddTag = (text: string) => { - // Basic URL validation - if (!isValidUrl(text)) { - toast(t("invalid_url_toast"), { - description: t("invalid_url_toast_desc"), - }); - return; - } - - // Check for duplicates - if (urlTags.some((tag) => tag.text === text)) { - toast(t("duplicate_url_toast"), { - description: t("duplicate_url_toast_desc"), - }); - return; - } - - // Add the new tag - const newTag: Tag = { - id: 
Date.now().toString(), - text: text, - }; - - setUrlTags([...urlTags, newTag]); - }; - - return ( - - - - - - {t("title")} - - {t("subtitle")} - - - - - {t("label")} - - {t("hint")} - - - {error && {error}} - - - {t("tips_title")} - - {t("tip_1")} - {t("tip_2")} - {t("tip_3")} - {t("tip_4")} - - - - - - router.push(`/dashboard/${search_space_id}/documents`)} - > - {t("cancel")} - - - {isSubmitting ? ( - <> - - {t("submitting")} - > - ) : ( - t("submit") - )} - - - - - ); -} diff --git a/surfsense_web/components/dashboard-breadcrumb.tsx b/surfsense_web/components/dashboard-breadcrumb.tsx index 0324ee1b..3e2e5199 100644 --- a/surfsense_web/components/dashboard-breadcrumb.tsx +++ b/surfsense_web/components/dashboard-breadcrumb.tsx @@ -138,6 +138,7 @@ export function DashboardBreadcrumb() { "linkup-api": "LinkUp API", "luma-connector": "Luma", "elasticsearch-connector": "Elasticsearch", + "webcrawler-connector": "Web Pages", }; const connectorLabel = connectorLabels[connectorType] || connectorType; diff --git a/surfsense_web/components/editConnector/types.ts b/surfsense_web/components/editConnector/types.ts index 8a9ef29d..f61eaaf5 100644 --- a/surfsense_web/components/editConnector/types.ts +++ b/surfsense_web/components/editConnector/types.ts @@ -52,5 +52,7 @@ export const editConnectorSchema = z.object({ GOOGLE_CALENDAR_CALENDAR_IDS: z.string().optional(), LUMA_API_KEY: z.string().optional(), ELASTICSEARCH_API_KEY: z.string().optional(), + FIRECRAWL_API_KEY: z.string().optional(), + INITIAL_URLS: z.string().optional() }); export type EditConnectorFormValues = z.infer; diff --git a/surfsense_web/components/homepage/integrations.tsx b/surfsense_web/components/homepage/integrations.tsx index 6d8433ac..afa09db6 100644 --- a/surfsense_web/components/homepage/integrations.tsx +++ b/surfsense_web/components/homepage/integrations.tsx @@ -29,6 +29,7 @@ const INTEGRATIONS: Integration[] = [ // Documentation & Knowledge { name: "Confluence", icon: "https://cdn.simpleicons.org/confluence/172B4D" }, { name: "Notion", icon: "https://cdn.simpleicons.org/notion/000000/ffffff" }, + { name: "Web Pages", icon: "https://cdn.jsdelivr.net/npm/lucide-static@0.294.0/icons/globe.svg"}, // Cloud Storage { name: "Google Drive", icon: "https://cdn.simpleicons.org/googledrive/4285F4" }, diff --git a/surfsense_web/components/sources/connector-data.tsx b/surfsense_web/components/sources/connector-data.tsx index 7f8f6bdf..e96db2a4 100644 --- a/surfsense_web/components/sources/connector-data.tsx +++ b/surfsense_web/components/sources/connector-data.tsx @@ -138,6 +138,13 @@ export const connectorCategories: ConnectorCategory[] = [ icon: getConnectorIcon(EnumConnectorName.LUMA_CONNECTOR, "h-6 w-6"), status: "available", }, + { + id: "webcrawler-connector", + title: "Web Pages", + description: "webcrawler_desc", + icon: getConnectorIcon(EnumConnectorName.WEBCRAWLER_CONNECTOR, "h-6 w-6"), + status: "available", + }, ], }, { diff --git a/surfsense_web/content/docs/docker-installation.mdx b/surfsense_web/content/docs/docker-installation.mdx index 507003a5..46ef4128 100644 --- a/surfsense_web/content/docs/docker-installation.mdx +++ b/surfsense_web/content/docs/docker-installation.mdx @@ -97,7 +97,7 @@ Before you begin, ensure you have: | STT_SERVICE | Speech-to-Text API provider for Audio Files (e.g., `local/base`, `openai/whisper-1`). 
See [supported providers](https://docs.litellm.ai/docs/audio_transcription#supported-providers) | | STT_SERVICE_API_KEY | (Optional if local) API key for the Speech-to-Text service | | STT_SERVICE_API_BASE | (Optional) Custom API base URL for the Speech-to-Text service | -| FIRECRAWL_API_KEY | API key for Firecrawl service for web crawling | +| FIRECRAWL_API_KEY | API key for Firecrawl service for web crawling | | ETL_SERVICE | Document parsing service: `UNSTRUCTURED` (supports 34+ formats), `LLAMACLOUD` (supports 50+ formats including legacy document types), or `DOCLING` (local processing, supports PDF, Office docs, images, HTML, CSV) | | UNSTRUCTURED_API_KEY | API key for Unstructured.io service for document parsing (required if ETL_SERVICE=UNSTRUCTURED) | | LLAMA_CLOUD_API_KEY | API key for LlamaCloud service for document parsing (required if ETL_SERVICE=LLAMACLOUD) | diff --git a/surfsense_web/contracts/enums/connector.ts b/surfsense_web/contracts/enums/connector.ts index 50486c92..5fd6fb72 100644 --- a/surfsense_web/contracts/enums/connector.ts +++ b/surfsense_web/contracts/enums/connector.ts @@ -17,4 +17,5 @@ export enum EnumConnectorName { AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR", LUMA_CONNECTOR = "LUMA_CONNECTOR", ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR", + WEBCRAWLER_CONNECTOR = "WEBCRAWLER_CONNECTOR", } diff --git a/surfsense_web/contracts/enums/connectorIcons.tsx b/surfsense_web/contracts/enums/connectorIcons.tsx index 66e9edfc..a2b7e0b1 100644 --- a/surfsense_web/contracts/enums/connectorIcons.tsx +++ b/surfsense_web/contracts/enums/connectorIcons.tsx @@ -59,11 +59,13 @@ export const getConnectorIcon = (connectorType: EnumConnectorName | string, clas return ; case EnumConnectorName.ELASTICSEARCH_CONNECTOR: return ; + case EnumConnectorName.WEBCRAWLER_CONNECTOR: + return ; // Additional cases for non-enum connector types - case "YOUTUBE_VIDEO": - return ; case "CRAWLED_URL": return ; + case "YOUTUBE_VIDEO": + return ; case "FILE": return ; case "EXTENSION": diff --git a/surfsense_web/hooks/use-connector-edit-page.ts b/surfsense_web/hooks/use-connector-edit-page.ts index 870a87dc..cd2ba805 100644 --- a/surfsense_web/hooks/use-connector-edit-page.ts +++ b/surfsense_web/hooks/use-connector-edit-page.ts @@ -97,6 +97,8 @@ export function useConnectorEditPage(connectorId: number, searchSpaceId: string) JIRA_API_TOKEN: "", LUMA_API_KEY: "", ELASTICSEARCH_API_KEY: "", + FIRECRAWL_API_KEY: "", + INITIAL_URLS: "" }, }); @@ -142,6 +144,8 @@ export function useConnectorEditPage(connectorId: number, searchSpaceId: string) JIRA_API_TOKEN: config.JIRA_API_TOKEN || "", LUMA_API_KEY: config.LUMA_API_KEY || "", ELASTICSEARCH_API_KEY: config.ELASTICSEARCH_API_KEY || "", + FIRECRAWL_API_KEY: config.FIRECRAWL_API_KEY || "", + INITIAL_URLS: config.INITIAL_URLS || "" }); if (currentConnector.connector_type === "GITHUB_CONNECTOR") { const savedRepos = config.repo_full_names || []; @@ -469,6 +473,31 @@ export function useConnectorEditPage(connectorId: number, searchSpaceId: string) newConfig = { ELASTICSEARCH_API_KEY: formData.ELASTICSEARCH_API_KEY }; } break; + case "WEBCRAWLER_CONNECTOR": + if ( + formData.FIRECRAWL_API_KEY !== originalConfig.FIRECRAWL_API_KEY || + formData.INITIAL_URLS !== originalConfig.INITIAL_URLS + ) { + newConfig = {}; + + if (formData.FIRECRAWL_API_KEY && formData.FIRECRAWL_API_KEY.trim()) { + if (!formData.FIRECRAWL_API_KEY.startsWith("fc-")) { + toast.warning("Firecrawl API keys typically start with 'fc-'. 
Please verify your key."); + } + newConfig.FIRECRAWL_API_KEY = formData.FIRECRAWL_API_KEY.trim(); + } else if (originalConfig.FIRECRAWL_API_KEY) { + toast.info("Firecrawl API key removed. Web crawler will use AsyncChromiumLoader as fallback."); + } + + if (formData.INITIAL_URLS !== undefined) { + if (formData.INITIAL_URLS && formData.INITIAL_URLS.trim()) { + newConfig.INITIAL_URLS = formData.INITIAL_URLS.trim(); + } else if (originalConfig.INITIAL_URLS) { + toast.info("URLs removed from crawler configuration."); + } + } + } + break; } if (newConfig !== null) { @@ -562,6 +591,9 @@ export function useConnectorEditPage(connectorId: number, searchSpaceId: string) "ELASTICSEARCH_API_KEY", newlySavedConfig.ELASTICSEARCH_API_KEY || "" ); + } else if (connector.connector_type == "WEBCRAWLER_CONNECTOR") { + editForm.setValue("FIRECRAWL_API_KEY",newlySavedConfig.FIRECRAWL_API_KEY || ""); + editForm.setValue("INITIAL_URLS", newlySavedConfig.INITIAL_URLS || ""); } } if (connector.connector_type === "GITHUB_CONNECTOR") { diff --git a/surfsense_web/lib/connectors/utils.ts b/surfsense_web/lib/connectors/utils.ts index f2052900..c921bd1a 100644 --- a/surfsense_web/lib/connectors/utils.ts +++ b/surfsense_web/lib/connectors/utils.ts @@ -18,6 +18,7 @@ export const getConnectorTypeDisplay = (type: string): string => { AIRTABLE_CONNECTOR: "Airtable", LUMA_CONNECTOR: "Luma", ELASTICSEARCH_CONNECTOR: "Elasticsearch", + WEBCRAWLER_CONNECTOR: "Web Pages", }; return typeMap[type] || type; }; diff --git a/surfsense_web/messages/en.json b/surfsense_web/messages/en.json index ee1bea40..55e25bee 100644 --- a/surfsense_web/messages/en.json +++ b/surfsense_web/messages/en.json @@ -331,7 +331,8 @@ "luma_desc": "Connect to Luma to search events", "calendar_desc": "Connect to Google Calendar to search events, meetings and schedules.", "gmail_desc": "Connect to your Gmail account to search through your emails.", - "zoom_desc": "Connect to Zoom to access meeting recordings and transcripts." + "zoom_desc": "Connect to Zoom to access meeting recordings and transcripts.", + "webcrawler_desc": "Crawl web pages" }, "upload_documents": { "title": "Upload Documents",