"""
HTML chunker module.

This module splits HTML content into chunks based on semantic boundaries.
"""

from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from html import escape
from bs4 import BeautifulSoup, Tag, NavigableString
import warnings
from tokenizer import count_html_tokens
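# ``count_html_tokens`` comes from the sibling ``tokenizer`` module; it is
# assumed to return an int token count for an HTML string, counting tag
# markup only when its second argument is True.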

# Constants
DEFAULT_CHARS_PER_TOKEN_RATIO = 3.5

@dataclass
class ChunkingOptions:
    max_token_limit: int = 500
    count_tag_tokens: bool = True

@dataclass
class Chunk:
    """A dataclass to hold a chunk's text and its associated metadata."""
    text: str
    metadata: Dict[str, Any]


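def _format_attrs(attrs: Dict[str, Any]) -> str:
    """Serialize a BeautifulSoup attrs dict back to HTML attribute syntax.

    BeautifulSoup stores multi-valued attributes such as ``class`` as lists,
    so naive interpolation would emit Python list literals like
    ``class="['a', 'b']"``. The result carries a leading space when non-empty,
    so callers can write f"<table{_format_attrs(...)}>" directly.
    """
    parts = " ".join(f'{k}="{" ".join(v) if isinstance(v, list) else v}"' for k, v in attrs.items())
    return f" {parts}" if parts else ""

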
def find_first_anchor(chunk_soup: BeautifulSoup) -> Optional[str]:
    """Return the id of the first structural or heading tag in the soup,
    falling back to the first tag of any kind that carries an id."""
    for tag_name in ["section", "div", "h1", "h2", "h3", "h4", "h5", "h6"]:
        first_tag = chunk_soup.find(tag_name, id=True)
        if first_tag:
            return first_tag.get('id')
    first_tag_with_id = chunk_soup.find(id=True)
    if first_tag_with_id:
        return first_tag_with_id.get('id')
    return None


def chunk_html(
    html_content: str,
    source_url: str,
    max_token_limit: int = 500,
    count_tag_tokens: bool = True,
    **kwargs
) -> List[Chunk]:
    """
    Chunks the given HTML content and generates metadata with source URLs and anchors.

    Args:
        html_content: The HTML content to be chunked.
        source_url: The original public URL of the HTML document.
        max_token_limit: The maximum number of tokens allowed per chunk.
        count_tag_tokens: Whether to count HTML tags as tokens.
        **kwargs: Accepted and ignored, so that extra options do not raise TypeError.

    Returns:
        A list of Chunk objects, each containing text and metadata.
    """
    options = ChunkingOptions(
        max_token_limit=max_token_limit,
        count_tag_tokens=count_tag_tokens
    )

    try:
        if count_html_tokens(html_content, options.count_tag_tokens) <= options.max_token_limit:
            return [Chunk(text=html_content, metadata={"source": source_url})]
    except Exception as e:
        warnings.warn(f"Could not pre-calculate total tokens: {e}. Proceeding with chunking.")

    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        body = soup.body or soup
        string_chunks = _split_element_by_children(body, options)
    except Exception as e:
        warnings.warn(f"A critical error occurred during semantic chunking: {e}. Falling back to linear splitting.")
        string_chunks = _linear_split(html_content, options)

    # Post-process string chunks: carry the last seen anchor forward so chunks
    # without their own id still link to the nearest preceding section.
    final_chunks = []
    last_seen_anchor = None
    for s_chunk in string_chunks:
        if not s_chunk.strip():
            continue

        chunk_soup = BeautifulSoup(s_chunk, 'html.parser')
        current_anchor = find_first_anchor(chunk_soup)

        if current_anchor:
            last_seen_anchor = current_anchor

        full_source_url = f"{source_url}#{last_seen_anchor}" if last_seen_anchor else source_url
        metadata = {"source": full_source_url}
        final_chunks.append(Chunk(text=s_chunk, metadata=metadata))

    return final_chunks if final_chunks else [Chunk(text=html_content, metadata={"source": source_url})]


def _split_element_by_children(element: Tag, options: ChunkingOptions) -> List[str]:
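    """Group sibling elements into chunks within the token limit, keeping a
    heading (or a lead-in paragraph ending with ':') in the same chunk as the
    element that immediately follows it."""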
    chunks, current_chunk_elements, current_tokens = [], [], 0
    children = [child for child in element.children if not (isinstance(child, NavigableString) and not child.strip())]

    i = 0
    while i < len(children):
        child, processed_elements = children[i], 1
        child_html = str(child)

        is_heading = isinstance(child, Tag) and child.name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']
        is_p_tag = isinstance(child, Tag) and child.name == 'p'

        if is_heading and i + 1 < len(children):
            child_html += str(children[i+1])
            processed_elements = 2
        elif is_p_tag and child.get_text(strip=True).endswith(':') and i + 1 < len(children):
            next_child = children[i+1]
            if isinstance(next_child, Tag) and (next_child.name in ['table', 'rh-table', 'ol', 'ul'] or ('class' in next_child.attrs and 'variablelist' in next_child.attrs['class'])):
                child_html += str(next_child)
                processed_elements = 2

        try:
            child_tokens = count_html_tokens(child_html, options.count_tag_tokens)
            is_oversized = child_tokens > options.max_token_limit
        except Exception:
            child_tokens, is_oversized = options.max_token_limit + 1, True

        if is_oversized:
            if current_chunk_elements: chunks.append("".join(current_chunk_elements))
            child_soup = BeautifulSoup(child_html, 'html.parser')
            root_to_split = child_soup.body or child_soup
            chunks.extend(_split_element_by_children_no_grouping(root_to_split, options))
            current_chunk_elements, current_tokens = [], 0
        elif current_chunk_elements and current_tokens + child_tokens > options.max_token_limit:
            chunks.append("".join(current_chunk_elements))
            current_chunk_elements, current_tokens = [child_html], child_tokens
        else:
            current_chunk_elements.append(child_html)
            current_tokens += child_tokens

        i += processed_elements

    if current_chunk_elements: chunks.append("".join(current_chunk_elements))
    return chunks

def _split_element_by_children_no_grouping(element: Tag, options: ChunkingOptions) -> List[str]:
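    """Split an oversized element child-by-child, dispatching tables, lists,
    code blocks, and definition lists to their specialized splitters."""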
    chunks, current_chunk_elements, current_tokens = [], [], 0
    children = [child for child in element.children if not (isinstance(child, NavigableString) and not child.strip())]

    for child in children:
        child_html = str(child)
        try:
            child_tokens = count_html_tokens(child_html, options.count_tag_tokens)
            is_oversized = child_tokens > options.max_token_limit
        except Exception:
            child_tokens, is_oversized = options.max_token_limit + 1, True

        if is_oversized:
            if current_chunk_elements: chunks.append("".join(current_chunk_elements))
            if isinstance(child, Tag):
                if child.name in ['table', 'rh-table']: chunks.extend(_split_table(child, options))
                elif child.name in ['ol', 'ul']: chunks.extend(_split_list(child, options))
                elif child.name == 'pre': chunks.extend(_split_code(child, options))
                elif child.name == 'div' and 'class' in child.attrs and 'variablelist' in child.attrs['class']: chunks.extend(_split_definition_list(child, options))
                else: chunks.extend(_split_element_by_children_no_grouping(child, options))
            else: chunks.extend(_linear_split(child_html, options))
            current_chunk_elements, current_tokens = [], 0
            continue

        if current_chunk_elements and current_tokens + child_tokens > options.max_token_limit:
            chunks.append("".join(current_chunk_elements))
            current_chunk_elements, current_tokens = [child_html], child_tokens
        else:
            current_chunk_elements.append(child_html)
            current_tokens += child_tokens

    if current_chunk_elements: chunks.append("".join(current_chunk_elements))
    return chunks

def _split_definition_list(div_element: Tag, options: ChunkingOptions) -> List[str]:
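    """Split a <div class="variablelist"> definition list into chunks of whole
    <dt>/<dd> pairs, re-wrapping each chunk in the original list markup."""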
    dl = div_element.find('dl')
    if not dl: return _split_element_by_children(div_element, options)
    chunks, current_chunk_pairs_html, current_tokens = [], [], 0
    pairs, children, i = [], list(dl.children), 0
    while i < len(children):
        child = children[i]
        if isinstance(child, Tag) and child.name == 'dt':
            term_html = str(child)
            def_html = ""
            if i + 1 < len(children) and isinstance(children[i+1], Tag) and children[i+1].name == 'dd':
                def_html = str(children[i+1]); i += 1
            pairs.append(term_html + def_html)
        i += 1
    for pair_html in pairs:
        pair_tokens = count_html_tokens(pair_html, options.count_tag_tokens)
        if current_chunk_pairs_html and current_tokens + pair_tokens > options.max_token_limit:
            chunks.append(f'<div class="variablelist"><dl>{"".join(current_chunk_pairs_html)}</dl></div>')
            current_chunk_pairs_html, current_tokens = [pair_html], pair_tokens
        else:
            current_chunk_pairs_html.append(pair_html); current_tokens += pair_tokens
    if current_chunk_pairs_html: chunks.append(f'<div class="variablelist"><dl>{"".join(current_chunk_pairs_html)}</dl></div>')
    return chunks if chunks else [str(div_element)]

def _split_table(table: Tag, options: ChunkingOptions) -> List[str]:
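    """Split a table into row groups that fit the token limit, repeating the
    <thead> (when present) at the top of every chunk."""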
    chunks, header = [], table.find('thead')
    rows = table.find_all('tr')
    header_rows_ids = set(id(r) for r in header.find_all('tr')) if header else set()
    body_rows = [row for row in rows if id(row) not in header_rows_ids]
    table_open, table_close = f"<table{_format_attrs(table.attrs)}>", "</table>"
    header_html = str(header) if header else ""
    base_tokens = count_html_tokens(table_open + header_html + table_close, options.count_tag_tokens)
    current_chunk_rows, current_tokens = [], base_tokens
    for row in body_rows:
        row_html, row_tokens = str(row), count_html_tokens(str(row), options.count_tag_tokens)
        if row_tokens + base_tokens > options.max_token_limit:
            if current_chunk_rows: chunks.append(table_open + header_html + "".join(current_chunk_rows) + table_close)
            chunks.extend(_split_oversized_row(row, table_open, header_html, table_close, options))
            current_chunk_rows, current_tokens = [], base_tokens
            continue
        if current_chunk_rows and (current_tokens + row_tokens > options.max_token_limit):
            chunks.append(table_open + header_html + "".join(current_chunk_rows) + table_close)
            current_chunk_rows, current_tokens = [row_html], base_tokens + row_tokens
        else:
            current_chunk_rows.append(row_html); current_tokens += row_tokens
    if current_chunk_rows: chunks.append(table_open + header_html + "".join(current_chunk_rows) + table_close)
    return chunks if chunks else [str(table)]

def _split_oversized_row(row: Tag, table_open: str, header_html: str, table_close: str, options: ChunkingOptions) -> List[str]:
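    """Split a single row that exceeds the token limit by chunking each cell's
    content and zipping the pieces into a sequence of synthetic rows."""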
    row_chunks, cells = [], row.find_all(['td', 'th'], recursive=False)
    cell_sub_chunks = [_split_element_by_children(cell, options) for cell in cells]
    max_len = max(len(c) for c in cell_sub_chunks) if cell_sub_chunks else 0
    if max_len == 0: return [table_open + header_html + str(row) + table_close]
    for i in range(max_len):
        new_row_html = "<tr>"
        for j, cell in enumerate(cells):
            content = cell_sub_chunks[j][i] if i < len(cell_sub_chunks[j]) else ""
            new_row_html += f"<{cell.name}{_format_attrs(cell.attrs)}>{content}</{cell.name}>"
        new_row_html += "</tr>"
        row_chunks.append(table_open + header_html + new_row_html + table_close)
    return row_chunks

def _split_list(list_element: Tag, options: ChunkingOptions) -> List[str]:
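    """Split an <ol>/<ul> into chunks of whole <li> items, re-wrapping each
    chunk in the original list tag; oversized items are split recursively."""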
    chunks, items = [], list_element.find_all('li', recursive=False)
    list_open, list_close = f"<{list_element.name}{_format_attrs(list_element.attrs)}>", f"</{list_element.name}>"
    base_tokens = count_html_tokens(list_open + list_close, options.count_tag_tokens)
    current_chunk_items, current_tokens = [], base_tokens
    for item in items:
        item_html, item_tokens = str(item), count_html_tokens(str(item), options.count_tag_tokens)
        if item_tokens + base_tokens > options.max_token_limit:
            if current_chunk_items: chunks.append(list_open + "".join(current_chunk_items) + list_close)
            item_soup = BeautifulSoup(item_html, 'html.parser').li
            if item_soup:
                sub_chunks = _split_element_by_children(item_soup, options)
                for sub_chunk in sub_chunks: chunks.append(list_open + f"<li>{sub_chunk}</li>" + list_close)
            else: chunks.append(list_open + item_html + list_close)
            current_chunk_items, current_tokens = [], base_tokens
            continue
        if current_chunk_items and (current_tokens + item_tokens > options.max_token_limit):
            chunks.append(list_open + "".join(current_chunk_items) + list_close)
            current_chunk_items, current_tokens = [item_html], base_tokens + item_tokens
        else:
            current_chunk_items.append(item_html); current_tokens += item_tokens
    if current_chunk_items: chunks.append(list_open + "".join(current_chunk_items) + list_close)
    return chunks if chunks else [str(list_element)]

def _split_code(pre_element: Tag, options: ChunkingOptions) -> List[str]:
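    """Split a <pre> block line-by-line into chunks that fit the token limit,
    re-wrapping each chunk in the original <pre> tag."""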
    chunks, code_text = [], pre_element.get_text()
    lines = code_text.split('\n')
    open_tag, close_tag = f"<pre{_format_attrs(pre_element.attrs)}>", "</pre>"
    base_tokens = count_html_tokens(open_tag + close_tag, options.count_tag_tokens)
    current_chunk_lines, current_tokens = [], base_tokens
    for line in lines:
        line_tokens = count_html_tokens(line + '\n', options.count_tag_tokens)
        if current_chunk_lines and current_tokens + line_tokens > options.max_token_limit:
            # get_text() unescapes entities, so re-escape before re-wrapping the
            # raw code in <pre>; otherwise a literal '<' would break the markup.
            chunks.append(open_tag + escape("\n".join(current_chunk_lines)) + close_tag)
            current_chunk_lines, current_tokens = [line], base_tokens + line_tokens
        else:
            current_chunk_lines.append(line); current_tokens += line_tokens
    if current_chunk_lines: chunks.append(open_tag + escape("\n".join(current_chunk_lines)) + close_tag)
    return chunks if chunks else [str(pre_element)]

def _linear_split(html_content: str, options: ChunkingOptions) -> List[str]:
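    """Last-resort fallback: split raw HTML into fixed-size character windows,
    sized by the assumed DEFAULT_CHARS_PER_TOKEN_RATIO chars-per-token average."""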
    warnings.warn("Using linear character split as a fallback for an oversized, indivisible chunk.")
    chars_per_chunk = int(options.max_token_limit * DEFAULT_CHARS_PER_TOKEN_RATIO)
    return [html_content[i:i + chars_per_chunk] for i in range(0, len(html_content), chars_per_chunk)]
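

# A minimal usage sketch (the sample markup and URL below are illustrative;
# running it assumes the local ``tokenizer`` module is importable):
if __name__ == "__main__":
    sample = (
        '<body><h2 id="intro">Intro</h2><p>Short paragraph.</p>'
        '<h2 id="usage">Usage</h2><p>Another short paragraph.</p></body>'
    )
    for chunk in chunk_html(sample, "https://example.com/doc.html", max_token_limit=20):
        print(chunk.metadata["source"], "->", chunk.text[:60])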