| 
 | 1 | +import re  | 
 | 2 | + | 
 | 3 | +from fai.utils.website.models import DocumentChunk  | 
 | 4 | + | 
 | 5 | + | 
 | 6 | +class MarkdownChunker:  | 
 | 7 | +    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200, min_chunk_size: int = 100):  | 
 | 8 | +        self.chunk_size = chunk_size  | 
 | 9 | +        self.chunk_overlap = chunk_overlap  | 
 | 10 | +        self.min_chunk_size = min_chunk_size  | 
 | 11 | + | 
 | 12 | +    def chunk_document(  | 
 | 13 | +        self, markdown_content: str, title: str, metadata: dict[str, str | list[str] | None]  | 
 | 14 | +    ) -> list[DocumentChunk]:  | 
 | 15 | +        chunks: list[DocumentChunk] = []  | 
 | 16 | +        sections = self._split_by_headers(markdown_content)  | 
 | 17 | + | 
 | 18 | +        for section in sections:  | 
 | 19 | +            section_chunks = self._chunk_section(section, title, metadata, markdown_content)  | 
 | 20 | +            chunks.extend(section_chunks)  | 
 | 21 | + | 
 | 22 | +        return chunks  | 
 | 23 | + | 
 | 24 | +    def _split_by_headers(self, markdown: str) -> list[dict[str, str | int | None]]:  | 
 | 25 | +        sections: list[dict[str, str | int | None]] = []  | 
 | 26 | +        lines = markdown.split("\n")  | 
 | 27 | + | 
 | 28 | +        current_lines: list[str] = []  | 
 | 29 | +        current_heading: str | None = None  | 
 | 30 | +        current_level: int = 0  | 
 | 31 | + | 
 | 32 | +        for line in lines:  | 
 | 33 | +            header_match = re.match(r"^(#{1,6})\s+(.+)$", line)  | 
 | 34 | + | 
 | 35 | +            if header_match:  | 
 | 36 | +                if current_lines:  | 
 | 37 | +                    sections.append(  | 
 | 38 | +                        {"heading": current_heading, "level": current_level, "content": "\n".join(current_lines)}  | 
 | 39 | +                    )  | 
 | 40 | + | 
 | 41 | +                current_level = len(header_match.group(1))  | 
 | 42 | +                current_heading = header_match.group(2).strip()  | 
 | 43 | +                current_lines = []  | 
 | 44 | +            else:  | 
 | 45 | +                current_lines.append(line)  | 
 | 46 | + | 
 | 47 | +        if current_lines:  | 
 | 48 | +            sections.append({"heading": current_heading, "level": current_level, "content": "\n".join(current_lines)})  | 
 | 49 | + | 
 | 50 | +        if not sections and markdown.strip():  | 
 | 51 | +            sections.append({"heading": None, "level": 0, "content": markdown})  | 
 | 52 | + | 
 | 53 | +        return sections  | 
 | 54 | + | 
 | 55 | +    def _chunk_section(  | 
 | 56 | +        self,  | 
 | 57 | +        section: dict[str, str | int | None],  | 
 | 58 | +        doc_title: str,  | 
 | 59 | +        base_metadata: dict[str, str | list[str] | None],  | 
 | 60 | +        full_document: str,  | 
 | 61 | +    ) -> list[DocumentChunk]:  | 
 | 62 | +        chunks: list[DocumentChunk] = []  | 
 | 63 | +        heading_val = section["heading"]  | 
 | 64 | +        level_val = section["level"]  | 
 | 65 | +        content_val = section["content"]  | 
 | 66 | + | 
 | 67 | +        heading: str | None = heading_val if isinstance(heading_val, str) or heading_val is None else None  | 
 | 68 | +        level: int = level_val if isinstance(level_val, int) else 0  | 
 | 69 | +        content: str = content_val.strip() if isinstance(content_val, str) else ""  | 
 | 70 | + | 
 | 71 | +        if not content or len(content) < self.min_chunk_size:  | 
 | 72 | +            return chunks  | 
 | 73 | + | 
 | 74 | +        if len(content) <= self.chunk_size:  | 
 | 75 | +            chunk_content = content  | 
 | 76 | + | 
 | 77 | +            if heading:  | 
 | 78 | +                chunk_content = f"# {heading}\n\n{chunk_content}"  | 
 | 79 | + | 
 | 80 | +            chunks.append(  | 
 | 81 | +                DocumentChunk(  | 
 | 82 | +                    content=chunk_content,  | 
 | 83 | +                    metadata={  | 
 | 84 | +                        "document_title": doc_title,  | 
 | 85 | +                        "section_heading": heading,  | 
 | 86 | +                        "heading_level": level,  | 
 | 87 | +                        "chunk_type": "section",  | 
 | 88 | +                        **base_metadata,  | 
 | 89 | +                    },  | 
 | 90 | +                    full_document=full_document,  | 
 | 91 | +                )  | 
 | 92 | +            )  | 
 | 93 | +        else:  | 
 | 94 | +            text_chunks = self._split_with_overlap(content)  | 
 | 95 | + | 
 | 96 | +            filtered_chunks = [(i, chunk_text) for i, chunk_text in enumerate(text_chunks)  | 
 | 97 | +                             if len(chunk_text.strip()) >= self.min_chunk_size]  | 
 | 98 | + | 
 | 99 | +            total_filtered = len(filtered_chunks)  | 
 | 100 | + | 
 | 101 | +            for part_num, (original_index, chunk_text) in enumerate(filtered_chunks, start=1):  | 
 | 102 | +                if heading and original_index == 0:  | 
 | 103 | +                    chunk_content = f"# {heading}\n\n{chunk_text}"  | 
 | 104 | +                elif heading:  | 
 | 105 | +                    chunk_content = f"[Continuing from: {heading}]\n\n{chunk_text}"  | 
 | 106 | +                else:  | 
 | 107 | +                    chunk_content = chunk_text  | 
 | 108 | + | 
 | 109 | +                chunks.append(  | 
 | 110 | +                    DocumentChunk(  | 
 | 111 | +                        content=chunk_content,  | 
 | 112 | +                        metadata={  | 
 | 113 | +                            "document_title": doc_title,  | 
 | 114 | +                            "section_heading": heading,  | 
 | 115 | +                            "heading_level": level,  | 
 | 116 | +                            "chunk_type": "section_part",  | 
 | 117 | +                            "part_number": part_num,  | 
 | 118 | +                            "total_parts": total_filtered,  | 
 | 119 | +                            **base_metadata,  | 
 | 120 | +                        },  | 
 | 121 | +                        full_document=full_document,  | 
 | 122 | +                    )  | 
 | 123 | +                )  | 
 | 124 | + | 
 | 125 | +        return chunks  | 
 | 126 | + | 
 | 127 | +    def _split_with_overlap(self, text: str) -> list[str]:  | 
 | 128 | +        if len(text) <= self.chunk_size:  | 
 | 129 | +            return [text]  | 
 | 130 | + | 
 | 131 | +        chunks: list[str] = []  | 
 | 132 | +        paragraphs = re.split(r"\n\n+", text)  | 
 | 133 | +        current_chunk: list[str] = []  | 
 | 134 | +        current_length = 0  | 
 | 135 | + | 
 | 136 | +        for para in paragraphs:  | 
 | 137 | +            para_length = len(para)  | 
 | 138 | + | 
 | 139 | +            if current_length + para_length > self.chunk_size and current_chunk:  | 
 | 140 | +                chunks.append("\n\n".join(current_chunk))  | 
 | 141 | + | 
 | 142 | +                overlap_paras: list[str] = []  | 
 | 143 | +                overlap_length = 0  | 
 | 144 | + | 
 | 145 | +                for p in reversed(current_chunk):  | 
 | 146 | +                    if overlap_length + len(p) <= self.chunk_overlap:  | 
 | 147 | +                        overlap_paras.insert(0, p)  | 
 | 148 | +                        overlap_length += len(p)  | 
 | 149 | +                    else:  | 
 | 150 | +                        break  | 
 | 151 | + | 
 | 152 | +                current_chunk = overlap_paras  | 
 | 153 | +                current_length = overlap_length  | 
 | 154 | + | 
 | 155 | +            current_chunk.append(para)  | 
 | 156 | +            current_length += para_length  | 
 | 157 | + | 
 | 158 | +        if current_chunk:  | 
 | 159 | +            chunks.append("\n\n".join(current_chunk))  | 
 | 160 | + | 
 | 161 | +        return chunks  | 
0 commit comments