diff --git a/servers/fai/poetry.lock b/servers/fai/poetry.lock index 4fd3984197..58cb6621af 100644 --- a/servers/fai/poetry.lock +++ b/servers/fai/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. [[package]] name = "aioboto3" @@ -446,6 +446,29 @@ files = [ {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"}, ] +[[package]] +name = "beautifulsoup4" +version = "4.14.2" +description = "Screen-scraping library" +optional = false +python-versions = ">=3.7.0" +groups = ["main"] +files = [ + {file = "beautifulsoup4-4.14.2-py3-none-any.whl", hash = "sha256:5ef6fa3a8cbece8488d66985560f97ed091e22bbc4e9c2338508a9d5de6d4515"}, + {file = "beautifulsoup4-4.14.2.tar.gz", hash = "sha256:2a98ab9f944a11acee9cc848508ec28d9228abfd522ef0fad6a02a72e0ded69e"}, +] + +[package.dependencies] +soupsieve = ">1.2" +typing-extensions = ">=4.0.0" + +[package.extras] +cchardet = ["cchardet"] +chardet = ["chardet"] +charset-normalizer = ["charset-normalizer"] +html5lib = ["html5lib"] +lxml = ["lxml"] + [[package]] name = "black" version = "24.10.0" @@ -736,14 +759,14 @@ python-dotenv = "*" [[package]] name = "fastapi" -version = "0.120.1" +version = "0.120.4" description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" optional = false python-versions = ">=3.8" groups = ["main"] files = [ - {file = "fastapi-0.120.1-py3-none-any.whl", hash = "sha256:0e8a2c328e96c117272d8c794d3a97d205f753cc2e69dd7ee387b7488a75601f"}, - {file = "fastapi-0.120.1.tar.gz", hash = "sha256:b5c6217e9ddca6dfcf54c97986180d4a1955e10c693d74943fc5327700178bff"}, + {file = "fastapi-0.120.4-py3-none-any.whl", hash = "sha256:9bdf192308676480d3593e10fd05094e56d6fdc7d9283db26053d8104d5f82a0"}, + {file = "fastapi-0.120.4.tar.gz", hash = "sha256:2d856bc847893ca4d77896d4504ffdec0fb04312b705065fca9104428eca3868"}, ] [package.dependencies] @@ -1376,6 +1399,22 @@ babel = ["Babel"] lingua = ["lingua"] testing = ["pytest"] +[[package]] +name = "markdownify" +version = "1.2.0" +description = "Convert HTML to markdown." +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "markdownify-1.2.0-py3-none-any.whl", hash = "sha256:48e150a1c4993d4d50f282f725c0111bd9eb25645d41fa2f543708fd44161351"}, + {file = "markdownify-1.2.0.tar.gz", hash = "sha256:f6c367c54eb24ee953921804dfe6d6575c5e5b42c643955e7242034435de634c"}, +] + +[package.dependencies] +beautifulsoup4 = ">=4.9,<5" +six = ">=1.15,<2" + [[package]] name = "markupsafe" version = "3.0.2" @@ -2774,6 +2813,18 @@ files = [ {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, ] +[[package]] +name = "soupsieve" +version = "2.8" +description = "A modern CSS selector implementation for Beautiful Soup." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "soupsieve-2.8-py3-none-any.whl", hash = "sha256:0cc76456a30e20f5d7f2e14a98a4ae2ee4e5abdc7c5ea0aafe795f344bc7984c"}, + {file = "soupsieve-2.8.tar.gz", hash = "sha256:e2dd4a40a628cb5f28f6d4b0db8800b8f581b65bb380b97de22ba5ca8d72572f"}, +] + [[package]] name = "sqlalchemy" version = "2.0.41" @@ -2872,14 +2923,14 @@ sqlcipher = ["sqlcipher3_binary"] [[package]] name = "starlette" -version = "0.49.1" +version = "0.47.3" description = "The little ASGI library that shines." 
optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "starlette-0.49.1-py3-none-any.whl", hash = "sha256:d92ce9f07e4a3caa3ac13a79523bd18e3bc0042bb8ff2d759a8e7dd0e1859875"}, - {file = "starlette-0.49.1.tar.gz", hash = "sha256:481a43b71e24ed8c43b11ea02f5353d77840e01480881b8cb5a26b8cae64a8cb"}, + {file = "starlette-0.47.3-py3-none-any.whl", hash = "sha256:89c0778ca62a76b826101e7c709e70680a1699ca7da6b44d38eb0a7e61fe4b51"}, + {file = "starlette-0.47.3.tar.gz", hash = "sha256:6bc94f839cc176c4858894f1f8908f0ab79dfec1a6b8402f6da9be26ebea52e9"}, ] [package.dependencies] @@ -2889,6 +2940,22 @@ typing-extensions = {version = ">=4.10.0", markers = "python_version < \"3.13\"" [package.extras] full = ["httpx (>=0.27.0,<0.29.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.18)", "pyyaml"] +[[package]] +name = "tenacity" +version = "8.5.0" +description = "Retry code until it succeeds" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "tenacity-8.5.0-py3-none-any.whl", hash = "sha256:b594c2a5945830c267ce6b79a166228323ed52718f30302c1359836112346687"}, + {file = "tenacity-8.5.0.tar.gz", hash = "sha256:8bc6c0c8a09b31e6cad13c47afbed1a567518250a9a171418582ed8d9c20ca78"}, +] + +[package.extras] +doc = ["reno", "sphinx"] +test = ["pytest", "tornado (>=4.5)", "typeguard"] + [[package]] name = "threadpoolctl" version = "3.6.0" @@ -3392,4 +3459,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = ">=3.11,<4.0" -content-hash = "5b183a3843cbd75eeeda0df5b84053ea7ed4d504fc6d26b41983f18d81a78e7f" +content-hash = "79d4a0540677519f17be45c1b0d80e0416c22dfe3663cc375a06979e5425bced" diff --git a/servers/fai/pyproject.toml b/servers/fai/pyproject.toml index 00e3a9630d..1765a4eac2 100644 --- a/servers/fai/pyproject.toml +++ b/servers/fai/pyproject.toml @@ -45,6 +45,9 @@ slack-sdk = "^3.36.0" python-multipart = "^0.0.20" upstash-redis = "^1.4.0" aioboto3 = "^13.0.0" +markdownify = "^1.2.0" +tenacity = "^8.2.0" +beautifulsoup4 = "^4.12.0" [tool.poetry.scripts] start = "fai.main:start" diff --git a/servers/fai/src/fai/utils/website/__init__.py b/servers/fai/src/fai/utils/website/__init__.py new file mode 100644 index 0000000000..6ccd22ed5a --- /dev/null +++ b/servers/fai/src/fai/utils/website/__init__.py @@ -0,0 +1,6 @@ +from fai.utils.website.chunker import MarkdownChunker +from fai.utils.website.crawler import DocumentationCrawler +from fai.utils.website.extractor import ContentExtractor +from fai.utils.website.models import DocumentChunk + +__all__ = ["DocumentChunk", "ContentExtractor", "MarkdownChunker", "DocumentationCrawler"] diff --git a/servers/fai/src/fai/utils/website/chunker.py b/servers/fai/src/fai/utils/website/chunker.py new file mode 100644 index 0000000000..1de87081e3 --- /dev/null +++ b/servers/fai/src/fai/utils/website/chunker.py @@ -0,0 +1,161 @@ +import re + +from fai.utils.website.models import DocumentChunk + + +class MarkdownChunker: + def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200, min_chunk_size: int = 100): + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap + self.min_chunk_size = min_chunk_size + + def chunk_document( + self, markdown_content: str, title: str, metadata: dict[str, str | list[str] | None] + ) -> list[DocumentChunk]: + chunks: list[DocumentChunk] = [] + sections = self._split_by_headers(markdown_content) + + for section in sections: + section_chunks = self._chunk_section(section, title, metadata, markdown_content) + chunks.extend(section_chunks) + + return 
chunks + + def _split_by_headers(self, markdown: str) -> list[dict[str, str | int | None]]: + sections: list[dict[str, str | int | None]] = [] + lines = markdown.split("\n") + + current_lines: list[str] = [] + current_heading: str | None = None + current_level: int = 0 + + for line in lines: + header_match = re.match(r"^(#{1,6})\s+(.+)$", line) + + if header_match: + if current_lines: + sections.append( + {"heading": current_heading, "level": current_level, "content": "\n".join(current_lines)} + ) + + current_level = len(header_match.group(1)) + current_heading = header_match.group(2).strip() + current_lines = [] + else: + current_lines.append(line) + + if current_lines: + sections.append({"heading": current_heading, "level": current_level, "content": "\n".join(current_lines)}) + + if not sections and markdown.strip(): + sections.append({"heading": None, "level": 0, "content": markdown}) + + return sections + + def _chunk_section( + self, + section: dict[str, str | int | None], + doc_title: str, + base_metadata: dict[str, str | list[str] | None], + full_document: str, + ) -> list[DocumentChunk]: + chunks: list[DocumentChunk] = [] + heading_val = section["heading"] + level_val = section["level"] + content_val = section["content"] + + heading: str | None = heading_val if isinstance(heading_val, str) or heading_val is None else None + level: int = level_val if isinstance(level_val, int) else 0 + content: str = content_val.strip() if isinstance(content_val, str) else "" + + if not content or len(content) < self.min_chunk_size: + return chunks + + if len(content) <= self.chunk_size: + chunk_content = content + + if heading: + chunk_content = f"# {heading}\n\n{chunk_content}" + + chunks.append( + DocumentChunk( + content=chunk_content, + metadata={ + "document_title": doc_title, + "section_heading": heading, + "heading_level": level, + "chunk_type": "section", + **base_metadata, + }, + full_document=full_document, + ) + ) + else: + text_chunks = self._split_with_overlap(content) + + filtered_chunks = [(i, chunk_text) for i, chunk_text in enumerate(text_chunks) + if len(chunk_text.strip()) >= self.min_chunk_size] + + total_filtered = len(filtered_chunks) + + for part_num, (original_index, chunk_text) in enumerate(filtered_chunks, start=1): + if heading and original_index == 0: + chunk_content = f"# {heading}\n\n{chunk_text}" + elif heading: + chunk_content = f"[Continuing from: {heading}]\n\n{chunk_text}" + else: + chunk_content = chunk_text + + chunks.append( + DocumentChunk( + content=chunk_content, + metadata={ + "document_title": doc_title, + "section_heading": heading, + "heading_level": level, + "chunk_type": "section_part", + "part_number": part_num, + "total_parts": total_filtered, + **base_metadata, + }, + full_document=full_document, + ) + ) + + return chunks + + def _split_with_overlap(self, text: str) -> list[str]: + if len(text) <= self.chunk_size: + return [text] + + chunks: list[str] = [] + paragraphs = re.split(r"\n\n+", text) + current_chunk: list[str] = [] + current_length = 0 + + for para in paragraphs: + para_length = len(para) + + if current_length + para_length > self.chunk_size and current_chunk: + chunks.append("\n\n".join(current_chunk)) + + overlap_paras: list[str] = [] + overlap_length = 0 + + for p in reversed(current_chunk): + if overlap_length + len(p) <= self.chunk_overlap: + overlap_paras.insert(0, p) + overlap_length += len(p) + else: + break + + current_chunk = overlap_paras + current_length = overlap_length + + current_chunk.append(para) + current_length += 
para_length + + if current_chunk: + chunks.append("\n\n".join(current_chunk)) + + return chunks diff --git a/servers/fai/src/fai/utils/website/crawler.py b/servers/fai/src/fai/utils/website/crawler.py new file mode 100644 index 0000000000..640277dab6 --- /dev/null +++ b/servers/fai/src/fai/utils/website/crawler.py @@ -0,0 +1,255 @@ +import json +import re +import time +from collections import deque +from urllib.parse import ( + parse_qs, + urlencode, + urljoin, + urlparse, +) + +import requests +from bs4 import BeautifulSoup +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, +) + +from fai.utils.website.chunker import MarkdownChunker +from fai.utils.website.extractor import ContentExtractor +from fai.utils.website.models import DocumentChunk + + +class DocumentationCrawler: + def __init__( + self, + start_url: str, + domain_filter: str | None = None, + path_filter: str | None = None, + url_pattern: str | None = None, + chunk_size: int = 1000, + chunk_overlap: int = 200, + min_content_length: int = 100, + request_timeout: int = 15, + max_retries: int = 3, + user_agent: str = "Mozilla/5.0 (Documentation Scraper)", + ): + self.start_url = start_url + self.domain_filter = domain_filter or urlparse(start_url).netloc + self.path_filter = path_filter + self.url_pattern = re.compile(url_pattern) if url_pattern else None + self.min_content_length = min_content_length + self.request_timeout = request_timeout + self.max_retries = max_retries + self.user_agent = user_agent + + self.visited: set[str] = set() + self.to_visit: deque[str] = deque([start_url]) + + self.extractor = ContentExtractor() + self.chunker = MarkdownChunker(chunk_size, chunk_overlap) + + self.all_chunks: list[DocumentChunk] = [] + self.failed_urls: list[dict[str, str | int | None]] = [] + + def _fetch_page(self, url: str) -> requests.Response: + @retry( + stop=stop_after_attempt(self.max_retries), + wait=wait_exponential(multiplier=1, min=1, max=10), + reraise=True, + ) + def _fetch_with_retry() -> requests.Response: + response = requests.get(url, timeout=self.request_timeout, headers={"User-Agent": self.user_agent}) + response.raise_for_status() + return response + + return _fetch_with_retry() + + def crawl(self, max_pages: int | None = None, delay: float = 1.0, verbose: bool = True) -> list[DocumentChunk]: + pages_crawled = 0 + + while self.to_visit and (max_pages is None or pages_crawled < max_pages): + url = self.to_visit.popleft() + + if url in self.visited: + continue + + if verbose: + print(f"Crawling [{pages_crawled + 1}]: {url}") + + try: + response = self._fetch_page(url) + + if response.encoding is None: + response.encoding = response.apparent_encoding or "utf-8" + elif response.encoding.lower() not in ["utf-8", "utf8"]: + response.encoding = "utf-8" + + self.visited.add(url) + extracted = self.extractor.extract_content(response.text, url) + + if len(extracted["markdown_content"]) < self.min_content_length: + if verbose: + print(" ⚠ Skipped (insufficient content)") + continue + + chunks = self.chunker.chunk_document( + extracted["markdown_content"], extracted["title"], extracted["metadata"] + ) + + self.all_chunks.extend(chunks) + + if verbose: + print(f" ✓ '{extracted['title']}' → {len(chunks)} chunks") + + new_links = self._extract_links(response.text, url) + for link in new_links: + if link not in self.visited: + self.to_visit.append(link) + + pages_crawled += 1 + time.sleep(delay) + + except requests.exceptions.HTTPError as e: + self.failed_urls.append({"url": url, "status": 
e.response.status_code if e.response else None}) + if verbose: + print(f" ✗ HTTP Error: {e}") + + except requests.exceptions.RequestException as e: + self.failed_urls.append({"url": url, "error": str(e)}) + if verbose: + print(f" ✗ Request Error: {str(e)}") + + except Exception as e: + self.failed_urls.append({"url": url, "error": str(e)}) + if verbose: + print(f" ✗ Error: {str(e)}") + + return self.all_chunks + + def _extract_links(self, html: str, current_url: str) -> set[str]: + soup = BeautifulSoup(html, "html.parser") + links = set() + + for a_tag in soup.find_all("a", href=True): + href = a_tag["href"] + full_url = urljoin(current_url, href) + full_url = self._normalize_url(full_url) + + if self._is_valid_url(full_url): + links.add(full_url) + + return links + + def _is_valid_url(self, url: str) -> bool: + parsed = urlparse(url) + + if parsed.netloc != self.domain_filter: + return False + + if self.path_filter and not parsed.path.startswith(self.path_filter): + return False + + if self.url_pattern and not self.url_pattern.match(url): + return False + + excluded_extensions = [ + ".pdf", + ".zip", + ".tar", + ".gz", + ".rar", + ".jpg", + ".jpeg", + ".png", + ".gif", + ".svg", + ".ico", + ".css", + ".js", + ".json", + ".xml", + ".mp4", + ".mp3", + ".avi", + ".mov", + ] + if any(parsed.path.lower().endswith(ext) for ext in excluded_extensions): + return False + + excluded_patterns = ["/search", "/print", "/download", "/login", "/signup"] + if any(pattern in parsed.path.lower() for pattern in excluded_patterns): + return False + + return True + + def _normalize_url(self, url: str) -> str: + url = url.split("#")[0] + url = url.rstrip("/") + + parsed = urlparse(url) + if parsed.query: + params = parse_qs(parsed.query) + keep_params = {k: v for k, v in params.items() if k in ["version", "v", "lang", "language"]} + if keep_params: + query = urlencode(keep_params, doseq=True) + url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{query}" + else: + url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" + + return url + + def save_chunks(self, filename: str = "chunks.jsonl") -> None: + with open(filename, "w", encoding="utf-8") as f: + for chunk in self.all_chunks: + json.dump(chunk.to_dict(), f, ensure_ascii=False) + f.write("\n") + + print(f"\n✓ Saved {len(self.all_chunks)} chunks to {filename}") + + def save_markdown_docs(self, output_dir: str = "markdown_docs") -> None: + import os + + os.makedirs(output_dir, exist_ok=True) + + docs_by_url: dict[str, dict[str, str | list[str]]] = {} + for chunk in self.all_chunks: + url_val = chunk.metadata.get("url") + title_val = chunk.metadata.get("document_title") + + url = str(url_val) if url_val else "unknown" + title = str(title_val) if title_val else "Untitled" + + if url not in docs_by_url: + docs_by_url[url] = {"title": title, "chunks": []} + + chunks_list = docs_by_url[url]["chunks"] + if isinstance(chunks_list, list): + chunks_list.append(chunk.content) + + for i, (url, doc_info) in enumerate(docs_by_url.items()): + filename = f"{output_dir}/doc_{i:04d}.md" + title_val = doc_info["title"] + chunks_val = doc_info["chunks"] + + title_str = str(title_val) if isinstance(title_val, str) else "Untitled" + + with open(filename, "w", encoding="utf-8") as f: + f.write(f"# {title_str}\n\n") + f.write(f"Source: {url}\n\n") + f.write("---\n\n") + if isinstance(chunks_val, list): + f.write("\n\n".join(chunks_val)) + + print(f"✓ Saved {len(docs_by_url)} markdown documents to {output_dir}/") + + def get_statistics(self) -> dict[str, int | float]: + return { 
+ "total_pages": len(self.visited), + "total_chunks": len(self.all_chunks), + "failed_urls": len(self.failed_urls), + "avg_chunks_per_page": len(self.all_chunks) / len(self.visited) if self.visited else 0, + "urls_in_queue": len(self.to_visit), + } diff --git a/servers/fai/src/fai/utils/website/extractor.py b/servers/fai/src/fai/utils/website/extractor.py new file mode 100644 index 0000000000..3da1f2b979 --- /dev/null +++ b/servers/fai/src/fai/utils/website/extractor.py @@ -0,0 +1,226 @@ +import re +from typing import Any +from urllib.parse import urlparse + +from bs4 import ( + BeautifulSoup, + Comment, +) +from markdownify import markdownify as md + + +class ContentExtractor: + CODE_LANGUAGES = [ + "python", + "javascript", + "java", + "bash", + "shell", + "sql", + "json", + "yaml", + "xml", + "html", + "css", + "typescript", + "go", + "rust", + "ruby", + "php", + "c", + "cpp", + "csharp", + ] + + NOISE_SELECTORS = [ + "nav", + "header", + "footer", + "aside", + ".sidebar", + ".navigation", + ".nav", + ".menu", + ".navbar", + ".breadcrumb", + ".breadcrumbs", + '[role="navigation"]', + '[role="banner"]', + '[role="contentinfo"]', + ".toc", + ".table-of-contents", + "#toc", + "#table-of-contents", + ".edit-page", + ".edit-link", + ".github-link", + ".page-edit", + ".feedback", + ".rating", + ".social-share", + ".share-buttons", + ".advertisement", + ".ad", + ".ads", + ".banner-ad", + "script", + "style", + "noscript", + "iframe", + ".cookie-banner", + ".popup", + ".modal", + ".overlay", + ".newsletter-signup", + ".subscription-form", + ".search", + ".search-box", + ".filter", + ".sort", + ".print-only", + ".no-web", + ] + + NOISE_ATTRIBUTES = [ + ("role", "navigation"), + ("role", "banner"), + ("role", "contentinfo"), + ("aria-hidden", "true"), + ("hidden", ""), + ("style", re.compile(r"display:\s*none", re.I)), + ] + + def extract_content(self, html: str, url: str) -> dict[str, Any]: + soup = BeautifulSoup(html, "html.parser") + title = self._extract_title(soup) + metadata = self._extract_metadata(soup, url) + self._remove_noise(soup) + content_root = soup.find("body") or soup + markdown_content = self._html_to_markdown(content_root) + markdown_content = self._clean_markdown(markdown_content) + + return {"title": title, "markdown_content": markdown_content, "metadata": metadata} + + def _extract_title(self, soup: BeautifulSoup) -> str: + title_tag = soup.find("title") + if title_tag and title_tag.text.strip(): + title = title_tag.text.strip() + title = re.split(r"\s*[|\-–—]\s*", title)[0] + return title.strip() + + h1 = soup.find("h1") + if h1: + return h1.get_text(strip=True) + + og_title = soup.find("meta", property="og:title") + if og_title and og_title.get("content"): + return og_title["content"] + + return "Untitled" + + def _remove_noise(self, soup: BeautifulSoup) -> None: + for selector in self.NOISE_SELECTORS: + elements = soup.select(selector) + for element in elements: + if element and element.parent: + element.decompose() + + for attr_name, attr_value in self.NOISE_ATTRIBUTES: + if isinstance(attr_value, re.Pattern): + elements = [el for el in soup.find_all() if hasattr(el, "get")] + for element in elements: + try: + attr = element.get(attr_name) + if attr and attr_value.search(str(attr)): + if element.parent: + element.decompose() + except (AttributeError, TypeError): + continue + else: + elements = soup.find_all(attrs={attr_name: attr_value}) + for element in elements: + if element and element.parent: + element.decompose() + + comments = soup.find_all(string=lambda text: 
isinstance(text, Comment)) + for comment in comments: + try: + comment.extract() + except (AttributeError, ValueError): + continue + + def _html_to_markdown(self, soup: BeautifulSoup) -> str: + markdown = md( + str(soup), + heading_style="ATX", + bullets="-", + code_language_callback=self._extract_code_language, + strip=["a"], + ) + + return markdown + + def _extract_code_language(self, element: Any) -> str: + classes = element.get("class", []) + for cls in classes: + if cls.startswith("language-"): + return cls.replace("language-", "") + elif cls.startswith("lang-"): + return cls.replace("lang-", "") + elif cls in self.CODE_LANGUAGES: + return cls + return "" + + def _clean_markdown(self, markdown: str) -> str: + import unicodedata + + markdown = unicodedata.normalize("NFKD", markdown) + + replacements = { + "\u00a0": " ", + "\u2018": "'", + "\u2019": "'", + "\u201c": '"', + "\u201d": '"', + "\u2013": "-", + "\u2014": "-", + "\u2026": "...", + "\u200b": "", + "\ufeff": "", + } + + for old, new in replacements.items(): + markdown = markdown.replace(old, new) + + markdown = re.sub(r"\n{3,}", "\n\n", markdown) + markdown = markdown.strip() + markdown = markdown.replace("\\_", "_") + markdown = markdown.replace("\\*", "*") + markdown = re.sub(r"\[\]\(\)", "", markdown) + + return markdown + + def _extract_metadata(self, soup: BeautifulSoup, url: str) -> dict[str, str | list[str]]: + metadata: dict[str, str | list[str]] = {"url": url} + + description = soup.find("meta", attrs={"name": "description"}) + if description and description.get("content"): + metadata["description"] = description["content"] + + og_description = soup.find("meta", property="og:description") + if og_description and og_description.get("content") and "description" not in metadata: + metadata["description"] = og_description["content"] + + keywords = soup.find("meta", attrs={"name": "keywords"}) + if keywords and keywords.get("content"): + metadata["keywords"] = keywords["content"] + + canonical = soup.find("link", rel="canonical") + if canonical and canonical.get("href"): + metadata["canonical_url"] = canonical["href"] + + path_parts = [p for p in urlparse(url).path.split("/") if p] + if path_parts: + metadata["url_path"] = path_parts + + return metadata diff --git a/servers/fai/src/fai/utils/website/models.py b/servers/fai/src/fai/utils/website/models.py new file mode 100644 index 0000000000..5aaa033e2a --- /dev/null +++ b/servers/fai/src/fai/utils/website/models.py @@ -0,0 +1,11 @@ +from dataclasses import dataclass + + +@dataclass +class DocumentChunk: + content: str + metadata: dict[str, str | int | list[str] | None] + full_document: str + + def to_dict(self) -> dict[str, str | dict[str, str | int | list[str] | None]]: + return {"content": self.content, "metadata": self.metadata, "full_document": self.full_document} diff --git a/servers/fai/tests/utils/website/__init__.py b/servers/fai/tests/utils/website/__init__.py new file mode 100644 index 0000000000..76a5a736a2 --- /dev/null +++ b/servers/fai/tests/utils/website/__init__.py @@ -0,0 +1 @@ +"""Tests for website indexing utilities.""" diff --git a/servers/fai/tests/utils/website/test_content_extractor.py b/servers/fai/tests/utils/website/test_content_extractor.py new file mode 100644 index 0000000000..d4c8b3d3be --- /dev/null +++ b/servers/fai/tests/utils/website/test_content_extractor.py @@ -0,0 +1,316 @@ +import pytest +from bs4 import ( + BeautifulSoup, + Comment, +) + +from fai.utils.website.extractor import ContentExtractor + + +class TestContentExtractor: + 
+    @pytest.fixture
+    def extractor(self) -> ContentExtractor:
+        return ContentExtractor()
+
+    @pytest.fixture
+    def sample_html_with_title(self) -> str:
+        return """
+        <html>
+        <head>
+            <title>Test Page | Site Name</title>
+        </head>
+        <body>
+            <main>
+                <h1>Main Heading</h1>
+                <p>Content here</p>
+            </main>
+        </body>
+        </html>
+        """
+
+    @pytest.fixture
+    def sample_html_with_noise(self) -> str:
+        return """
+        <html>
+        <head>
+            <title>Clean Content Test</title>
+        </head>
+        <body>
+            <nav><a href="/">Home</a></nav>
+            <header>
+                <h1>Site Header</h1>
+            </header>
+            <aside class="sidebar">Table of Contents</aside>
+            <main>
+                <article>
+                    <h1>Article Title</h1>
+                    <p>This is the actual content we want to keep.</p>
+                    <pre><code>print("hello")</code></pre>
+                </article>
+            </main>
+            <footer>Copyright</footer>
+            <script>var x = 1;</script>
+        </body>
+        </html>
+        """
+
+    @pytest.fixture
+    def sample_html_with_metadata(self) -> str:
+        return """
+        <html>
+        <head>
+            <title>Test Page</title>
+            <meta name="description" content="This is a test page description">
+            <meta name="keywords" content="test, sample, page">
+            <link rel="canonical" href="https://example.com/canonical">
+        </head>
+        <body>
+            <p>Content</p>
+        </body>
+        </html>
+        """
+
+    @pytest.fixture
+    def sample_html_with_code_blocks(self) -> str:
+        return """
+        <html>
+        <body>
+            <h1>Code Examples</h1>
+            <pre><code class="language-python">def hello(): pass</code></pre>
+            <pre><code class="lang-javascript">const x = 1;</code></pre>
+            <pre><code class="python">print("test")</code></pre>
+            <pre><code>plain code</code></pre>
+        </body>
+        </html>
+        """
+
+    def test_extract_title_from_title_tag(self, extractor: ContentExtractor) -> None:
+        html = "<html><head><title>Page Title | Site Name</title></head></html>"
+        soup = BeautifulSoup(html, "html.parser")
+        title = extractor._extract_title(soup)
+        assert title == "Page Title"
+
+    def test_extract_title_from_title_tag_with_dash(self, extractor: ContentExtractor) -> None:
+        html = "<html><head><title>Page Title - Site Name</title></head></html>"
+        soup = BeautifulSoup(html, "html.parser")
+        title = extractor._extract_title(soup)
+        assert title == "Page Title"
+
+    def test_extract_title_from_h1_fallback(self, extractor: ContentExtractor) -> None:
+        html = "<html><body><h1>H1 Title</h1></body></html>
" + soup = BeautifulSoup(html, "html.parser") + title = extractor._extract_title(soup) + assert title == "H1 Title" + + def test_extract_title_from_og_title_fallback(self, extractor: ContentExtractor) -> None: + html = '' + soup = BeautifulSoup(html, "html.parser") + title = extractor._extract_title(soup) + assert title == "OG Title" + + def test_extract_title_returns_untitled_when_missing(self, extractor: ContentExtractor) -> None: + html = "

<html><body><p>No title here</p></body></html>

" + soup = BeautifulSoup(html, "html.parser") + title = extractor._extract_title(soup) + assert title == "Untitled" + + def test_remove_noise_removes_navigation(self, extractor: ContentExtractor) -> None: + html = "
<nav>Site navigation</nav><main>Content</main>
" + soup = BeautifulSoup(html, "html.parser") + extractor._remove_noise(soup) + assert soup.find("nav") is None + assert soup.find("main") is not None + + def test_remove_noise_removes_scripts_and_styles(self, extractor: ContentExtractor) -> None: + html = '

<script>var x = 1;</script><style>.hidden { display: none; }</style><p>Content</p>

' + soup = BeautifulSoup(html, "html.parser") + extractor._remove_noise(soup) + assert soup.find("script") is None + assert soup.find("style") is None + assert soup.find("p") is not None + + def test_remove_noise_removes_hidden_elements(self, extractor: ContentExtractor) -> None: + html = '
<div aria-hidden="true">Hidden</div><div>Visible</div>
' + soup = BeautifulSoup(html, "html.parser") + extractor._remove_noise(soup) + hidden_div = soup.find("div", {"aria-hidden": "true"}) + assert hidden_div is None + + def test_remove_noise_removes_comments(self, extractor: ContentExtractor) -> None: + html = "

<html><body><!-- This is a comment --><p>Content</p></body></html>

" + soup = BeautifulSoup(html, "html.parser") + extractor._remove_noise(soup) + # Check that no Comment objects remain in the soup + comments = soup.find_all(string=lambda text: isinstance(text, Comment)) + assert len(comments) == 0 + + def test_extract_code_language_with_language_prefix(self, extractor: ContentExtractor) -> None: + # Use BeautifulSoup to create a real element + html = '' + soup = BeautifulSoup(html, "html.parser") + element = soup.find("code") + language = extractor._extract_code_language(element) + assert language == "python" + + def test_extract_code_language_with_lang_prefix(self, extractor: ContentExtractor) -> None: + html = '' + soup = BeautifulSoup(html, "html.parser") + element = soup.find("code") + language = extractor._extract_code_language(element) + assert language == "javascript" + + def test_extract_code_language_direct_match(self, extractor: ContentExtractor) -> None: + html = '' + soup = BeautifulSoup(html, "html.parser") + element = soup.find("code") + language = extractor._extract_code_language(element) + assert language == "python" + + def test_extract_code_language_no_match(self, extractor: ContentExtractor) -> None: + html = '' + soup = BeautifulSoup(html, "html.parser") + element = soup.find("code") + language = extractor._extract_code_language(element) + assert language == "" + + def test_clean_markdown_removes_excess_newlines(self, extractor: ContentExtractor) -> None: + markdown = "Line 1\n\n\n\n\nLine 2" + cleaned = extractor._clean_markdown(markdown) + assert "\n\n\n" not in cleaned + assert "Line 1\n\nLine 2" == cleaned + + def test_clean_markdown_normalizes_unicode(self, extractor: ContentExtractor) -> None: + markdown = "Hello\u00a0World\u2019s\u201cquoted\u201d" + cleaned = extractor._clean_markdown(markdown) + assert "\u00a0" not in cleaned # Non-breaking space removed + assert "'" in cleaned # Smart quote converted + assert '"' in cleaned # Smart quotes converted + + def test_clean_markdown_fixes_escaped_characters(self, extractor: ContentExtractor) -> None: + markdown = r"This is \_escaped\_ and \*also\* escaped" + cleaned = extractor._clean_markdown(markdown) + assert r"\_" not in cleaned + assert r"\*" not in cleaned + assert "_escaped_" in cleaned + assert "*also*" in cleaned + + def test_clean_markdown_removes_empty_links(self, extractor: ContentExtractor) -> None: + markdown = "Text with []() empty link" + cleaned = extractor._clean_markdown(markdown) + assert "[]()" not in cleaned + + def test_clean_markdown_strips_whitespace(self, extractor: ContentExtractor) -> None: + markdown = "\n\n Content here \n\n" + cleaned = extractor._clean_markdown(markdown) + assert cleaned == "Content here" + + def test_extract_metadata_extracts_description(self, extractor: ContentExtractor) -> None: + html = '' + soup = BeautifulSoup(html, "html.parser") + metadata = extractor._extract_metadata(soup, "https://example.com/page") + assert metadata["description"] == "Test description" + + def test_extract_metadata_prefers_description_over_og_description(self, extractor: ContentExtractor) -> None: + html = """ + + + """ + soup = BeautifulSoup(html, "html.parser") + metadata = extractor._extract_metadata(soup, "https://example.com/page") + assert metadata["description"] == "Meta description" + + def test_extract_metadata_uses_og_description_fallback(self, extractor: ContentExtractor) -> None: + html = '' + soup = BeautifulSoup(html, "html.parser") + metadata = extractor._extract_metadata(soup, "https://example.com/page") + assert metadata["description"] == 
"OG description" + + def test_extract_metadata_extracts_keywords(self, extractor: ContentExtractor) -> None: + html = '' + soup = BeautifulSoup(html, "html.parser") + metadata = extractor._extract_metadata(soup, "https://example.com/page") + assert metadata["keywords"] == "python, testing, tutorial" + + def test_extract_metadata_extracts_canonical_url(self, extractor: ContentExtractor) -> None: + html = '' + soup = BeautifulSoup(html, "html.parser") + metadata = extractor._extract_metadata(soup, "https://example.com/page") + assert metadata["canonical_url"] == "https://example.com/canonical" + + def test_extract_metadata_extracts_url_path(self, extractor: ContentExtractor) -> None: + html = "" + soup = BeautifulSoup(html, "html.parser") + metadata = extractor._extract_metadata(soup, "https://example.com/docs/getting-started/") + assert metadata["url_path"] == ["docs", "getting-started"] + + def test_extract_metadata_includes_url(self, extractor: ContentExtractor) -> None: + html = "" + soup = BeautifulSoup(html, "html.parser") + metadata = extractor._extract_metadata(soup, "https://example.com/page") + assert metadata["url"] == "https://example.com/page" + + def test_extract_content_full_integration(self, extractor: ContentExtractor, sample_html_with_noise: str) -> None: + result = extractor.extract_content(sample_html_with_noise, "https://example.com/test") + + # Check title is extracted + assert result["title"] == "Clean Content Test" + + # Check that noise elements are not in markdown + markdown = result["markdown_content"] + assert "Home" not in markdown # Nav removed + assert "Site Header" not in markdown # Header removed + assert "Copyright" not in markdown # Footer removed + assert "Table of Contents" not in markdown # Sidebar removed + + # Check that actual content is present + assert "Article Title" in markdown + assert "actual content" in markdown + + # Check that code blocks are preserved + assert "print" in markdown or "hello" in markdown # Code content should be present + + # Check metadata + assert result["metadata"]["url"] == "https://example.com/test" + + def test_extract_content_with_metadata(self, extractor: ContentExtractor, sample_html_with_metadata: str) -> None: + result = extractor.extract_content(sample_html_with_metadata, "https://example.com/docs/guide") + + metadata = result["metadata"] + assert metadata["description"] == "This is a test page description" + assert metadata["keywords"] == "test, sample, page" + assert metadata["canonical_url"] == "https://example.com/canonical" + assert metadata["url_path"] == ["docs", "guide"] + + def test_html_to_markdown_preserves_headings(self, extractor: ContentExtractor) -> None: + html = "

<h1>Title</h1><h2>Subtitle</h2><p>Text</p>

" + soup = BeautifulSoup(html, "html.parser") + markdown = extractor._html_to_markdown(soup) + + assert "# Title" in markdown + assert "## Subtitle" in markdown + + def test_html_to_markdown_preserves_lists(self, extractor: ContentExtractor) -> None: + html = "" + soup = BeautifulSoup(html, "html.parser") + markdown = extractor._html_to_markdown(soup) + + assert "- Item 1" in markdown or "* Item 1" in markdown + assert "- Item 2" in markdown or "* Item 2" in markdown + + def test_extract_content_handles_empty_body(self, extractor: ContentExtractor) -> None: + html = "Empty" + result = extractor.extract_content(html, "https://example.com/empty") + + assert result["title"] == "Empty" + assert result["markdown_content"] == "" + assert result["metadata"]["url"] == "https://example.com/empty" diff --git a/servers/fai/tests/utils/website/test_documentation_crawler.py b/servers/fai/tests/utils/website/test_documentation_crawler.py new file mode 100644 index 0000000000..a4bd2ce280 --- /dev/null +++ b/servers/fai/tests/utils/website/test_documentation_crawler.py @@ -0,0 +1,397 @@ +from collections import deque +from unittest.mock import ( + Mock, + patch, +) + +import pytest +import requests + +from fai.utils.website.crawler import DocumentationCrawler + + +class TestDocumentationCrawler: + @pytest.fixture + def basic_crawler(self) -> DocumentationCrawler: + return DocumentationCrawler( + start_url="https://docs.example.com/guide", + domain_filter="docs.example.com", + chunk_size=1000, + chunk_overlap=200, + ) + + @pytest.fixture + def sample_html_page(self) -> str: + return """ + + + + Documentation Guide + + +

+            <h1>Getting Started</h1>
+            <p>Welcome to our documentation with enough content to pass minimum length requirements.</p>
+            <a href="/guide/intro">Introduction</a>
+            <a href="/guide/advanced">Advanced Topics</a>
+            <a href="https://external.com/resource">External Link</a>
+        </body>
+        </html>
+        """
+
+    @pytest.fixture
+    def sample_html_with_content(self) -> str:
+        return """
+        <html>
+        <head>
+            <title>API Reference | Example Docs</title>
+        </head>
+        <body>
+            <nav>Site navigation</nav>
+            <main>
+                <h1>API Reference</h1>
+
+                <h2>Authentication</h2>
+                <p>Use API keys for authentication. Here's how to get started with our authentication system.</p>
+
+                <h2>Endpoints</h2>
+                <p>Available endpoints for the API with detailed descriptions and examples.</p>
+
+                <pre><code class="language-python">
+import requests
+response = requests.get("https://api.example.com/data")
+                </code></pre>
+            </main>
+ + + """ + + def test_initialization(self, basic_crawler: DocumentationCrawler) -> None: + assert basic_crawler.start_url == "https://docs.example.com/guide" + assert basic_crawler.domain_filter == "docs.example.com" + assert basic_crawler.chunker.chunk_size == 1000 + assert basic_crawler.chunker.chunk_overlap == 200 + assert len(basic_crawler.visited) == 0 + assert len(basic_crawler.to_visit) == 1 + to_visit_list = list(basic_crawler.to_visit) + assert basic_crawler.start_url in to_visit_list + + def test_initialization_auto_domain_filter(self) -> None: + crawler = DocumentationCrawler(start_url="https://docs.example.com/path") + assert crawler.domain_filter == "docs.example.com" + + def test_is_valid_url_same_domain(self, basic_crawler: DocumentationCrawler) -> None: + assert basic_crawler._is_valid_url("https://docs.example.com/guide/intro") + assert basic_crawler._is_valid_url("https://docs.example.com/other/path") + + def test_is_valid_url_different_domain(self, basic_crawler: DocumentationCrawler) -> None: + assert not basic_crawler._is_valid_url("https://other-site.com/guide") + assert not basic_crawler._is_valid_url("https://example.com/guide") + + def test_is_valid_url_with_path_filter(self) -> None: + crawler = DocumentationCrawler(start_url="https://docs.example.com/en/guide", path_filter="/en/") + + assert crawler._is_valid_url("https://docs.example.com/en/tutorial") + assert not crawler._is_valid_url("https://docs.example.com/fr/tutorial") + + def test_is_valid_url_with_pattern(self) -> None: + crawler = DocumentationCrawler(start_url="https://docs.example.com/v1/guide", url_pattern=r".*\/v\d+\/.*") + + assert crawler._is_valid_url("https://docs.example.com/v1/guide") + assert crawler._is_valid_url("https://docs.example.com/v2/tutorial") + assert not crawler._is_valid_url("https://docs.example.com/latest/guide") + + def test_is_valid_url_excludes_file_extensions(self, basic_crawler: DocumentationCrawler) -> None: + assert not basic_crawler._is_valid_url("https://docs.example.com/guide.pdf") + assert not basic_crawler._is_valid_url("https://docs.example.com/image.png") + assert not basic_crawler._is_valid_url("https://docs.example.com/script.js") + assert not basic_crawler._is_valid_url("https://docs.example.com/data.json") + + def test_is_valid_url_excludes_utility_pages(self, basic_crawler: DocumentationCrawler) -> None: + assert not basic_crawler._is_valid_url("https://docs.example.com/search?q=test") + assert not basic_crawler._is_valid_url("https://docs.example.com/print/guide") + assert not basic_crawler._is_valid_url("https://docs.example.com/download/pdf") + assert not basic_crawler._is_valid_url("https://docs.example.com/login") + + def test_normalize_url_removes_fragment(self, basic_crawler: DocumentationCrawler) -> None: + url = "https://docs.example.com/guide#section-2" + normalized = basic_crawler._normalize_url(url) + assert "#section-2" not in normalized + assert normalized == "https://docs.example.com/guide" + + def test_normalize_url_removes_trailing_slash(self, basic_crawler: DocumentationCrawler) -> None: + url = "https://docs.example.com/guide/" + normalized = basic_crawler._normalize_url(url) + assert not normalized.endswith("/") + assert normalized == "https://docs.example.com/guide" + + def test_normalize_url_keeps_version_params(self, basic_crawler: DocumentationCrawler) -> None: + url = "https://docs.example.com/guide?version=2.0&lang=en&utm_source=google" + normalized = basic_crawler._normalize_url(url) + + assert "version=2.0" in normalized + assert 
"lang=en" in normalized + assert "utm_source" not in normalized + + def test_normalize_url_removes_tracking_params(self, basic_crawler: DocumentationCrawler) -> None: + url = "https://docs.example.com/guide?utm_source=twitter&ref=homepage" + normalized = basic_crawler._normalize_url(url) + + assert normalized == "https://docs.example.com/guide" + + def test_extract_links_finds_valid_links(self, basic_crawler: DocumentationCrawler, sample_html_page: str) -> None: + current_url = "https://docs.example.com/guide" + links = basic_crawler._extract_links(sample_html_page, current_url) + + assert "https://docs.example.com/guide/intro" in links + assert "https://docs.example.com/guide/advanced" in links + + assert "https://external.com/resource" not in links + + def test_extract_links_converts_relative_urls(self, basic_crawler: DocumentationCrawler) -> None: + html = """ + + Intro + Other + Page + + """ + current_url = "https://docs.example.com/guide/start" + links = basic_crawler._extract_links(html, current_url) + + for link in links: + assert link.startswith("https://") + + def test_extract_links_normalizes_urls(self, basic_crawler: DocumentationCrawler) -> None: + html = """ + + Link with fragment + Link with slash + + """ + current_url = "https://docs.example.com/" + links = basic_crawler._extract_links(html, current_url) + + assert "https://docs.example.com/guide" in links + assert all("#" not in link for link in links) + + @patch("fai.utils.website.crawler.requests.get") + def test_fetch_page_success( + self, mock_get: Mock, basic_crawler: DocumentationCrawler, sample_html_with_content: str + ) -> None: + mock_response = Mock() + mock_response.text = sample_html_with_content + mock_response.status_code = 200 + mock_response.encoding = "utf-8" + mock_get.return_value = mock_response + + response = basic_crawler._fetch_page("https://docs.example.com/page") + + assert response.text == sample_html_with_content + mock_get.assert_called_once() + + @patch("fai.utils.website.crawler.requests.get") + def test_fetch_page_sets_user_agent(self, mock_get: Mock, basic_crawler: DocumentationCrawler) -> None: + mock_response = Mock() + mock_response.text = "" + mock_response.status_code = 200 + mock_get.return_value = mock_response + + basic_crawler._fetch_page("https://docs.example.com/page") + + call_kwargs = mock_get.call_args[1] + assert "User-Agent" in call_kwargs["headers"] + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_single_page( + self, mock_get: Mock, basic_crawler: DocumentationCrawler, sample_html_with_content: str + ) -> None: + mock_response = Mock() + mock_response.text = sample_html_with_content + mock_response.status_code = 200 + mock_response.encoding = "utf-8" + mock_get.return_value = mock_response + + chunks = basic_crawler.crawl(max_pages=1, delay=0, verbose=False) + + assert len(basic_crawler.visited) == 1 + assert len(chunks) > 0 + assert basic_crawler.start_url in basic_crawler.visited + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_respects_max_pages(self, mock_get: Mock, sample_html_with_content: str) -> None: + html_with_links = """ + + Page + +

Content with enough text to pass minimum length requirements for chunking and processing properly.

+ Page 1 + Page 2 + Page 3 + + + """ + + mock_response = Mock() + mock_response.text = html_with_links + mock_response.status_code = 200 + mock_response.encoding = "utf-8" + mock_get.return_value = mock_response + + crawler = DocumentationCrawler(start_url="https://docs.example.com/start") + crawler.crawl(max_pages=2, delay=0, verbose=False) + + assert len(crawler.visited) == 2 + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_skips_visited_urls(self, mock_get: Mock, basic_crawler: DocumentationCrawler) -> None: + test_url = "https://docs.example.com/guide" + basic_crawler.visited.add(test_url) + basic_crawler.to_visit = deque([test_url]) + + basic_crawler.crawl(max_pages=1, delay=0, verbose=False) + + mock_get.assert_not_called() + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_skips_insufficient_content(self, mock_get: Mock, basic_crawler: DocumentationCrawler) -> None: + short_html = "Short

Hi

" + + mock_response = Mock() + mock_response.text = short_html + mock_response.status_code = 200 + mock_response.encoding = "utf-8" + mock_get.return_value = mock_response + + chunks = basic_crawler.crawl(max_pages=1, delay=0, verbose=False) + + assert len(basic_crawler.visited) == 1 + assert len(chunks) == 0 + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_handles_http_errors(self, mock_get: Mock, basic_crawler: DocumentationCrawler) -> None: + mock_response = Mock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + mock_get.return_value.raise_for_status.side_effect = requests.exceptions.HTTPError(response=mock_response) + + basic_crawler.crawl(max_pages=1, delay=0, verbose=False) + + assert len(basic_crawler.failed_urls) == 1 + assert basic_crawler.failed_urls[0]["url"] == basic_crawler.start_url + assert basic_crawler.failed_urls[0]["status"] == 404 + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_handles_request_exceptions(self, mock_get: Mock, basic_crawler: DocumentationCrawler) -> None: + mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed") + + basic_crawler.crawl(max_pages=1, delay=0, verbose=False) + + assert len(basic_crawler.failed_urls) == 1 + assert "Connection failed" in basic_crawler.failed_urls[0]["error"] + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_discovers_new_links(self, mock_get: Mock, basic_crawler: DocumentationCrawler) -> None: + html_with_links = """ + + Start Page + +

Start page with enough content to meet minimum requirements for processing and chunking properly.

+ Page 1 + Page 2 + + + """ + + mock_response = Mock() + mock_response.text = html_with_links + mock_response.status_code = 200 + mock_response.encoding = "utf-8" + mock_get.return_value = mock_response + + basic_crawler.crawl(max_pages=1, delay=0, verbose=False) + + assert len(basic_crawler.visited) == 1 + to_visit_list = list(basic_crawler.to_visit) + assert ( + "https://docs.example.com/page1" in to_visit_list + or "https://docs.example.com/page1" in basic_crawler.visited + ) + assert ( + "https://docs.example.com/page2" in to_visit_list + or "https://docs.example.com/page2" in basic_crawler.visited + ) + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_produces_chunks_with_metadata( + self, mock_get: Mock, basic_crawler: DocumentationCrawler, sample_html_with_content: str + ) -> None: + mock_response = Mock() + mock_response.text = sample_html_with_content + mock_response.status_code = 200 + mock_response.encoding = "utf-8" + mock_get.return_value = mock_response + + chunks = basic_crawler.crawl(max_pages=1, delay=0, verbose=False) + + assert len(chunks) > 0 + for chunk in chunks: + assert hasattr(chunk, "content") + assert hasattr(chunk, "metadata") + assert "document_title" in chunk.metadata + assert "url" in chunk.metadata + + def test_get_statistics(self, basic_crawler: DocumentationCrawler) -> None: + basic_crawler.visited = {"url1", "url2", "url3"} + basic_crawler.all_chunks = [Mock(), Mock(), Mock(), Mock(), Mock()] + basic_crawler.failed_urls = [{"url": "failed1"}] + basic_crawler.to_visit = deque(["url4", "url5"]) + + stats = basic_crawler.get_statistics() + + assert stats["total_pages"] == 3 + assert stats["total_chunks"] == 5 + assert stats["failed_urls"] == 1 + assert stats["avg_chunks_per_page"] == 5 / 3 + assert stats["urls_in_queue"] == 2 + + def test_get_statistics_empty_crawler(self, basic_crawler: DocumentationCrawler) -> None: + basic_crawler.to_visit.clear() + + stats = basic_crawler.get_statistics() + + assert stats["total_pages"] == 0 + assert stats["total_chunks"] == 0 + assert stats["failed_urls"] == 0 + assert stats["avg_chunks_per_page"] == 0 + assert stats["urls_in_queue"] == 0 + + @patch("fai.utils.website.crawler.requests.get") + def test_crawl_uses_apparent_encoding_when_missing( + self, mock_get: Mock, basic_crawler: DocumentationCrawler + ) -> None: + html_content = ( + "Test

Content here with sufficient text.

" + ) + + mock_response = Mock() + mock_response.text = html_content + mock_response.status_code = 200 + mock_response.encoding = None + mock_response.apparent_encoding = "utf-8" + mock_get.return_value = mock_response + + basic_crawler.crawl(max_pages=1, delay=0, verbose=False) + + assert len(basic_crawler.visited) == 1 + + def test_multiple_crawlers_independent(self) -> None: + crawler1 = DocumentationCrawler(start_url="https://site1.com/docs") + crawler2 = DocumentationCrawler(start_url="https://site2.com/docs") + + crawler1.visited.add("https://site1.com/page1") + crawler2.visited.add("https://site2.com/page2") + + assert len(crawler1.visited) == 1 + assert len(crawler2.visited) == 1 + assert "https://site1.com/page1" in crawler1.visited + assert "https://site1.com/page1" not in crawler2.visited diff --git a/servers/fai/tests/utils/website/test_markdown_chunker.py b/servers/fai/tests/utils/website/test_markdown_chunker.py new file mode 100644 index 0000000000..5fbb95f7c7 --- /dev/null +++ b/servers/fai/tests/utils/website/test_markdown_chunker.py @@ -0,0 +1,417 @@ +import pytest + +from fai.utils.website.chunker import MarkdownChunker + + +class TestMarkdownChunker: + @pytest.fixture + def chunker(self) -> MarkdownChunker: + return MarkdownChunker(chunk_size=1000, chunk_overlap=200, min_chunk_size=100) + + @pytest.fixture + def small_chunker(self) -> MarkdownChunker: + return MarkdownChunker(chunk_size=100, chunk_overlap=20, min_chunk_size=10) + + @pytest.fixture + def sample_markdown_with_headers(self) -> str: + return """# Introduction + +This is the introduction paragraph with some content that provides enough detail to meet minimum chunk size +requirements. + +## Getting Started + +Here's how to get started with the project. Follow these detailed instructions carefully to ensure proper setup. + +### Installation + +Run the following command to install the software. Make sure you have all prerequisites installed first. + +### Configuration + +Configure your settings here with the appropriate values. Review the configuration guide for more details. + +## Advanced Topics + +This section covers advanced topics including performance tuning, security, and deployment strategies for production. +""" + + @pytest.fixture + def sample_markdown_no_headers(self) -> str: + return """This is plain text without any headers. +Just paragraphs of content that should be treated as a single section. + +Another paragraph here with more information.""" + + @pytest.fixture + def sample_long_section(self) -> str: + paragraphs = [f"This is paragraph number {i}." for i in range(50)] + return "# Long Section\n\n" + "\n\n".join(paragraphs) + + def test_split_by_headers_single_header(self, chunker: MarkdownChunker) -> None: + markdown = "# Title\n\nSome content here." + sections = chunker._split_by_headers(markdown) + + assert len(sections) == 1 + assert sections[0]["heading"] == "Title" + assert sections[0]["level"] == 1 + assert "Some content here" in sections[0]["content"] + + def test_split_by_headers_multiple_headers(self, chunker: MarkdownChunker) -> None: + markdown = """# H1 Title + +Content for H1. + +## H2 Subtitle + +Content for H2. + +### H3 Section + +Content for H3. 
+""" + sections = chunker._split_by_headers(markdown) + + assert len(sections) == 3 + assert sections[0]["heading"] == "H1 Title" + assert sections[0]["level"] == 1 + assert sections[1]["heading"] == "H2 Subtitle" + assert sections[1]["level"] == 2 + assert sections[2]["heading"] == "H3 Section" + assert sections[2]["level"] == 3 + + def test_split_by_headers_no_headers(self, chunker: MarkdownChunker) -> None: + markdown = "Just plain text\n\nWith multiple paragraphs." + sections = chunker._split_by_headers(markdown) + + assert len(sections) == 1 + assert sections[0]["heading"] is None + assert sections[0]["level"] == 0 + assert "plain text" in sections[0]["content"] + + def test_split_by_headers_empty_string(self, chunker: MarkdownChunker) -> None: + markdown = "" + sections = chunker._split_by_headers(markdown) + + # Empty string should return no sections or one empty section + assert len(sections) <= 1 + if len(sections) == 1: + assert sections[0]["heading"] is None + assert sections[0]["level"] == 0 + + def test_split_by_headers_whitespace_only_content(self, chunker: MarkdownChunker) -> None: + markdown = " \n\n " # Whitespace only + sections = chunker._split_by_headers(markdown) + + # The implementation treats whitespace as content and returns one section + assert len(sections) == 1 + assert sections[0]["heading"] is None + assert sections[0]["level"] == 0 + + def test_split_by_headers_headers_only(self, chunker: MarkdownChunker) -> None: + markdown = "# Title\n## Subtitle\n" + sections = chunker._split_by_headers(markdown) + + # Headers without content after them get only the last one with empty content + # The implementation only adds sections when there are content lines + assert len(sections) == 1 + assert sections[0]["heading"] == "Subtitle" + assert sections[0]["content"].strip() == "" + + def test_split_with_overlap_short_text(self, chunker: MarkdownChunker) -> None: + text = "This is a short text that fits in one chunk." + chunks = chunker._split_with_overlap(text) + + assert len(chunks) == 1 + assert chunks[0] == text + + def test_split_with_overlap_long_text(self, small_chunker: MarkdownChunker) -> None: + # Create text longer than chunk_size (100 chars) + paragraphs = [f"Paragraph {i} with some text." for i in range(10)] + text = "\n\n".join(paragraphs) + + chunks = small_chunker._split_with_overlap(text) + + # Should have multiple chunks + assert len(chunks) > 1 + + # Check that chunks overlap + for i in range(len(chunks) - 1): + # Some content from end of chunk[i] should appear at start of chunk[i+1] + # This is a simplified check - actual overlap is at paragraph boundaries + assert len(chunks[i]) <= small_chunker.chunk_size + 50 # Allow some flexibility + + def test_split_with_overlap_respects_paragraph_boundaries(self, small_chunker: MarkdownChunker) -> None: + text = "Para 1.\n\nPara 2.\n\nPara 3.\n\nPara 4.\n\nPara 5." 
+ chunks = small_chunker._split_with_overlap(text) + + # Each chunk should contain complete paragraphs (not split mid-paragraph) + for chunk in chunks: + # Chunks should not contain single newlines (only double \n\n paragraph separators) + # Single newlines would indicate a paragraph was split in the middle + lines = chunk.split("\n\n") + for line in lines: + # Each paragraph should not contain internal newlines + assert "\n" not in line.strip() or line.strip() == "" + + def test_chunk_document_small_content(self, chunker: MarkdownChunker) -> None: + markdown = ( + "# Simple Document\n\nThis is a simple document with enough content to meet the minimum " + "chunk size requirements. We need at least 100 characters to pass the minimum threshold." + ) + title = "Test Document" + metadata = {"url": "https://example.com/test"} + + chunks = chunker.chunk_document(markdown, title, metadata) + + assert len(chunks) == 1 + assert chunks[0].metadata["document_title"] == title + assert chunks[0].metadata["section_heading"] == "Simple Document" + assert chunks[0].metadata["chunk_type"] == "section" + assert "Simple Document" in chunks[0].content + + def test_chunk_document_respects_min_chunk_size(self, chunker: MarkdownChunker) -> None: + # Create markdown with very short section + markdown = "# Tiny\n\nSmall." + title = "Test" + metadata = {"url": "https://example.com/test"} + + chunks = chunker.chunk_document(markdown, title, metadata) + + # Should be filtered out due to min_chunk_size=100 + assert len(chunks) == 0 + + def test_chunk_document_multiple_sections( + self, chunker: MarkdownChunker, sample_markdown_with_headers: str + ) -> None: + title = "Documentation Guide" + metadata = {"url": "https://example.com/guide"} + + chunks = chunker.chunk_document(sample_markdown_with_headers, title, metadata) + + # Should have multiple chunks for different sections + assert len(chunks) > 1 + + # Check that sections are properly labeled + headings = [chunk.metadata.get("section_heading") for chunk in chunks] + assert "Introduction" in headings + assert "Getting Started" in headings + + def test_chunk_document_preserves_heading_in_content(self, chunker: MarkdownChunker) -> None: + markdown = ( + "# Important Section\n\nThis is the content of the section with enough text to meet " + "minimum requirements for chunking and processing properly with all necessary details." 
+ ) + title = "Test Document" + metadata = {"url": "https://example.com/test"} + + chunks = chunker.chunk_document(markdown, title, metadata) + + assert len(chunks) == 1 + # Heading should be included in the content + assert "# Important Section" in chunks[0].content + assert "content of the section" in chunks[0].content + + def test_chunk_document_large_section_split(self, small_chunker: MarkdownChunker, sample_long_section: str) -> None: + title = "Long Document" + metadata = {"url": "https://example.com/long"} + + chunks = small_chunker.chunk_document(sample_long_section, title, metadata) + + # Should be split into multiple chunks + assert len(chunks) > 1 + + # Check part numbering + for chunk in chunks: + if chunk.metadata["chunk_type"] == "section_part": + assert "part_number" in chunk.metadata + assert "total_parts" in chunk.metadata + assert chunk.metadata["part_number"] <= chunk.metadata["total_parts"] + + def test_chunk_document_continuation_markers( + self, small_chunker: MarkdownChunker, sample_long_section: str + ) -> None: + title = "Long Document" + metadata = {"url": "https://example.com/long"} + + chunks = small_chunker.chunk_document(sample_long_section, title, metadata) + + # Find chunks that are continuations + continuation_chunks = [ + c for c in chunks if c.metadata.get("chunk_type") == "section_part" and c.metadata.get("part_number", 1) > 1 + ] + + # Continuation chunks should have continuation marker + for chunk in continuation_chunks: + assert "[Continuing from:" in chunk.content + + def test_chunk_document_metadata_propagation(self, chunker: MarkdownChunker) -> None: + markdown = "# Section 1\n\nContent 1.\n\n# Section 2\n\nContent 2." + title = "Test Document" + base_metadata = { + "url": "https://example.com/test", + "description": "Test description", + "url_path": ["docs", "test"], + } + + chunks = chunker.chunk_document(markdown, title, base_metadata) + + # All chunks should have base metadata + for chunk in chunks: + assert chunk.metadata["url"] == "https://example.com/test" + assert chunk.metadata["description"] == "Test description" + assert chunk.metadata["url_path"] == ["docs", "test"] + assert chunk.metadata["document_title"] == title + + def test_chunk_document_no_headers(self, chunker: MarkdownChunker, sample_markdown_no_headers: str) -> None: + title = "Plain Document" + metadata = {"url": "https://example.com/plain"} + + chunks = chunker.chunk_document(sample_markdown_no_headers, title, metadata) + + # Should create chunk with no section heading + assert len(chunks) == 1 + assert chunks[0].metadata["section_heading"] is None + assert chunks[0].metadata["heading_level"] == 0 + + def test_chunk_section_includes_heading_level(self, chunker: MarkdownChunker) -> None: + section = { + "heading": "Test Section", + "level": 2, + "content": ( + "Content here with enough text to meet minimum chunk size requirements. " + "Adding more text to ensure we pass the threshold." + ), + } + title = "Document" + metadata = {"url": "https://example.com"} + full_document = ( + "## Test Section\n\nContent here with enough text to meet minimum chunk size " + "requirements. Adding more text to ensure we pass the threshold." 
+ ) + + chunks = chunker._chunk_section(section, title, metadata, full_document) + + assert len(chunks) == 1 + assert chunks[0].metadata["heading_level"] == 2 + + def test_chunk_section_empty_content(self, chunker: MarkdownChunker) -> None: + section = {"heading": "Empty Section", "level": 1, "content": ""} + title = "Document" + metadata = {"url": "https://example.com"} + full_document = "# Empty Section\n\n" + + chunks = chunker._chunk_section(section, title, metadata, full_document) + + assert len(chunks) == 0 + + def test_chunk_section_content_below_min_size(self, chunker: MarkdownChunker) -> None: + section = {"heading": "Tiny", "level": 1, "content": "Too small."} + title = "Document" + metadata = {"url": "https://example.com"} + full_document = "# Tiny\n\nToo small." + + chunks = chunker._chunk_section(section, title, metadata, full_document) + + # Should be filtered out (min_chunk_size=100) + assert len(chunks) == 0 + + def test_document_chunk_to_dict(self, chunker: MarkdownChunker) -> None: + markdown = ( + "# Test\n\nContent here with enough text to create a proper chunk that meets " + "minimum size requirements for the chunker." + ) + title = "Test" + metadata = {"url": "https://example.com"} + + chunks = chunker.chunk_document(markdown, title, metadata) + assert len(chunks) > 0, "Should have at least one chunk" + chunk_dict = chunks[0].to_dict() + + assert "content" in chunk_dict + assert "metadata" in chunk_dict + assert isinstance(chunk_dict["content"], str) + assert isinstance(chunk_dict["metadata"], dict) + + def test_chunk_size_parameter_respected(self) -> None: + # Create very small chunk size + small_chunker = MarkdownChunker(chunk_size=200, chunk_overlap=50, min_chunk_size=10) + + # Create content longer than chunk_size to force splitting + paragraphs = [f"This is a detailed paragraph number {i} with substantial content." for i in range(20)] + long_content = "\n\n".join(paragraphs) + markdown = f"# Section\n\n{long_content}" + + chunks = small_chunker.chunk_document(markdown, "Test", {"url": "https://example.com"}) + + # Should be split into multiple chunks + assert len(chunks) > 1 + + def test_chunk_overlap_parameter_respected(self) -> None: + chunker = MarkdownChunker(chunk_size=100, chunk_overlap=50, min_chunk_size=10) + + # Create content that will be split + paragraphs = [f"Para {i}. 
" * 5 for i in range(10)] + text = "\n\n".join(paragraphs) + + chunks = chunker._split_with_overlap(text) + + # Should have multiple chunks + assert len(chunks) > 1 + + # Verify overlap: content from end of chunk[i] should appear at start of chunk[i+1] + for i in range(len(chunks) - 1): + # Get the last paragraph(s) of current chunk + current_paragraphs = chunks[i].split("\n\n") + chunks[i + 1].split("\n\n") + + # At least one paragraph from current chunk should appear in next chunk + overlap_found = False + for para in current_paragraphs[-3:]: # Check last few paragraphs + if para.strip() and para in chunks[i + 1]: + overlap_found = True + break + + assert overlap_found, f"No overlap found between chunk {i} and {i+1}" + + def test_part_numbers_sequential_after_filtering(self) -> None: + chunker = MarkdownChunker(chunk_size=100, chunk_overlap=20, min_chunk_size=40) + markdown = "# Test\n\n" + "A" * 90 + "\n\n" + "B" * 30 + "\n\n" + "C" * 90 + + chunks = chunker.chunk_document(markdown, "Test", {"url": "test"}) + section_parts = [c for c in chunks if c.metadata.get("chunk_type") == "section_part"] + + assert len(section_parts) == 2, f"Expected 2 chunks after filtering, got {len(section_parts)}" + + part_numbers = [c.metadata["part_number"] for c in section_parts] + expected_part_numbers = list(range(1, len(section_parts) + 1)) + assert part_numbers == expected_part_numbers, ( + f"Part numbers {part_numbers} are not sequential. Expected {expected_part_numbers}" + ) + + for chunk in section_parts: + assert chunk.metadata["total_parts"] == len(section_parts), ( + f"total_parts is {chunk.metadata['total_parts']} but actual chunks: {len(section_parts)}" + ) + + def test_heading_hierarchy_preserved_in_metadata(self, chunker: MarkdownChunker) -> None: + markdown = """# H1 +Content for H1 with enough text to meet the minimum chunk size requirements for proper testing and validation. + +## H2 +Content for H2 with enough text to meet the minimum chunk size requirements for proper testing and validation. + +### H3 +Content for H3 with enough text to meet the minimum chunk size requirements for proper testing and validation. + +#### H4 +Content for H4 with enough text to meet the minimum chunk size requirements for proper testing and validation. +""" + chunks = chunker.chunk_document(markdown, "Test", {"url": "https://example.com"}) + + # Check that levels are correctly assigned + levels = [chunk.metadata["heading_level"] for chunk in chunks] + assert 1 in levels # H1 + assert 2 in levels # H2 + assert 3 in levels # H3 + assert 4 in levels # H4