diff --git a/haystack/components/preprocessors/markdown_header_splitter.py b/haystack/components/preprocessors/markdown_header_splitter.py
new file mode 100644
index 0000000000..4e837a8db5
--- /dev/null
+++ b/haystack/components/preprocessors/markdown_header_splitter.py
@@ -0,0 +1,324 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import re
+from typing import Literal, Optional
+
+from haystack import Document, component, logging
+from haystack.components.preprocessors import DocumentSplitter
+
+logger = logging.getLogger(__name__)
+
+
+@component
+class MarkdownHeaderSplitter:
+    """
+    Split documents at ATX-style Markdown headers (#), with optional secondary splitting.
+
+    This component processes text documents by:
+    - Splitting them into chunks at Markdown headers (e.g., '#', '##', etc.), preserving header hierarchy as metadata.
+    - Optionally applying a secondary split (by word, passage, period, or line) to each chunk
+      (using Haystack's DocumentSplitter).
+    - Preserving and propagating metadata such as parent headers, page numbers, and split IDs.
+    """
+
+    def __init__(
+        self,
+        *,
+        page_break_character: str = "\f",
+        keep_headers: bool = True,
+        secondary_split: Optional[Literal["word", "passage", "period", "line"]] = None,
+        split_length: int = 200,
+        split_overlap: int = 0,
+        split_threshold: int = 0,
+        skip_empty_documents: bool = True,
+    ):
+        """
+        Initialize the MarkdownHeaderSplitter.
+
+        :param page_break_character: Character used to identify page breaks. Defaults to form feed ("\\f").
+        :param keep_headers: If True, headers are kept in the content. If False, headers are moved to metadata.
+            Defaults to True.
+        :param secondary_split: Optional secondary split condition after header splitting.
+            Options are None, "word", "passage", "period", "line". Defaults to None.
+        :param split_length: The maximum number of units in each split when using secondary splitting. Defaults to 200.
+        :param split_overlap: The number of overlapping units for each split when using secondary splitting.
+            Defaults to 0.
+        :param split_threshold: The minimum number of units per split when using secondary splitting. Defaults to 0.
+        :param skip_empty_documents: If True, skip documents with empty content. If False, process empty documents.
+            Defaults to True.
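+
+        ### Usage example
+
+        A minimal sketch of how the component can be used (illustrative; the exact
+        metadata written to each split depends on the settings above):
+
+        ```python
+        from haystack import Document
+        from haystack.components.preprocessors.markdown_header_splitter import MarkdownHeaderSplitter
+
+        splitter = MarkdownHeaderSplitter(keep_headers=False)
+        result = splitter.run(documents=[Document(content="# Title\nSome body text.")])
+
+        for doc in result["documents"]:
+            # with keep_headers=False, the header line moves to doc.meta
+            print(doc.meta["header"], "->", doc.content)
+        ```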
+ """ + self.page_break_character = page_break_character + self.secondary_split = secondary_split + self.split_length = split_length + self.split_overlap = split_overlap + self.split_threshold = split_threshold + self.skip_empty_documents = skip_empty_documents + self.keep_headers = keep_headers + self._header_pattern = re.compile(r"(?m)^(#{1,6}) (.+)$") # ATX-style .md-headers + + # initialize secondary_splitter only if needed + if self.secondary_split: + self.secondary_splitter = DocumentSplitter( + split_by=self.secondary_split, + split_length=self.split_length, + split_overlap=self.split_overlap, + split_threshold=self.split_threshold, + ) + + def _split_text_by_markdown_headers(self, text: str, doc_id: str) -> list[dict]: + """Split text by ATX-style headers (#) and create chunks with appropriate metadata.""" + logger.debug("Splitting text by markdown headers") + + # find headers + matches = list(re.finditer(self._header_pattern, text)) + + # return unsplit if no headers found + if not matches: + logger.info( + "No headers found in document {doc_id}; returning full document as single chunk.", doc_id=doc_id + ) + return [{"content": text, "meta": {}}] + + # process headers and build chunks + chunks: list[dict] = [] + header_stack: list[Optional[str]] = [None] * 6 + active_parents: list[str] = [] # track active parent headers + pending_headers: list[str] = [] # store empty headers to prepend to next content + has_content = False # flag to track if any header has content + + for i, match in enumerate(matches): + # extract header info + header_prefix = match.group(1) + header_text = match.group(2).strip() + level = len(header_prefix) + + # get content + start = match.end() + end = matches[i + 1].start() if i + 1 < len(matches) else len(text) + content = text[start:end].strip() + + # update header stack to track nesting + header_stack[level - 1] = header_text + for j in range(level, 6): + header_stack[j] = None + + # prepare header_line if keep_headers + header_line = f"{header_prefix} {header_text}" + + # skip splits w/o content + if not content: + # add as parent for subsequent headers + active_parents = [h for h in header_stack[: level - 1] if h is not None] + active_parents.append(header_text) + if self.keep_headers: + pending_headers.append(header_line) + continue + + has_content = True # at least one header has content + parent_headers = list(active_parents) + + logger.debug( + "Creating chunk for header '{header_text}' at level {level}", header_text=header_text, level=level + ) + + if self.keep_headers: + # add pending & current header to content + chunk_content = "" + if pending_headers: + chunk_content += "\n".join(pending_headers) + "\n" + chunk_content += f"{header_line}\n{content}" + chunks.append( + { + "content": chunk_content, + "meta": {} if self.keep_headers else {"header": header_text, "parent_headers": parent_headers}, + } + ) + pending_headers = [] # reset pending headers + else: + chunks.append({"content": content, "meta": {"header": header_text, "parent_headers": parent_headers}}) + + # reset active parents + active_parents = [h for h in header_stack[: level - 1] if h is not None] + + # return doc unchunked if no headers have content + if not has_content: + logger.info( + "Document {doc_id} contains only headers with no content; returning original document.", doc_id=doc_id + ) + return [{"content": text, "meta": {}}] + + return chunks + + def _apply_secondary_splitting(self, documents: list[Document]) -> list[Document]: + """ + Apply secondary splitting while 
preserving header metadata and structure. + + Ensures page counting is maintained across splits. + """ + result_docs = [] + + for doc in documents: + if doc.content is None: + result_docs.append(doc) + continue + + content_for_splitting: str = doc.content + + if not self.keep_headers: # skip header extraction if keep_headers + # extract header information + header_match = re.search(self._header_pattern, doc.content) + if header_match: + content_for_splitting = doc.content[header_match.end() :] + + if not content_for_splitting or not content_for_splitting.strip(): # skip empty content + result_docs.append(doc) + continue + + # track page from meta + current_page = doc.meta.get("page_number", 1) + + secondary_splits = self.secondary_splitter.run( + documents=[Document(content=content_for_splitting, meta=doc.meta)] + )["documents"] + + # split processing + for i, split in enumerate(secondary_splits): + # calculate page number for this split + if i > 0 and secondary_splits[i - 1].content: + current_page = self._update_page_number_with_breaks(secondary_splits[i - 1].content, current_page) + + # set page number to meta + split.meta["page_number"] = current_page + + # preserve header metadata if we're not keeping headers in content + if not self.keep_headers: + for key in ["header", "parent_headers"]: + if key in doc.meta: + split.meta[key] = doc.meta[key] + + result_docs.append(split) + + logger.debug( + "Secondary splitting complete. Final count: {final_count} documents.", final_count=len(result_docs) + ) + return result_docs + + def _update_page_number_with_breaks(self, content: str, current_page: int) -> int: + """ + Update page number based on page breaks in content. + + :param content: Content to check for page breaks + :param current_page: Current page number + :return: New current page number + """ + if not isinstance(content, str): + return current_page + + page_breaks = content.count(self.page_break_character) + new_page_number = current_page + page_breaks + + if page_breaks > 0: + logger.debug( + "Found {page_breaks} page breaks, page number updated: {old} → {new}", + page_breaks=page_breaks, + old=current_page, + new=new_page_number, + ) + + return new_page_number + + def _split_documents_by_markdown_headers(self, documents: list[Document]) -> list[Document]: + """Split a list of documents by markdown headers, preserving metadata.""" + + result_docs = [] + for doc in documents: + logger.debug("Splitting document with id={doc_id}", doc_id=doc.id) + # mypy: doc.content is Optional[str], so we must check for None before passing to splitting method + if doc.content is None: + continue + splits = self._split_text_by_markdown_headers(doc.content, doc.id) + docs = [] + + current_page = doc.meta.get("page_number", 1) if doc.meta else 1 + total_pages = doc.content.count(self.page_break_character) + 1 + logger.debug( + "Processing page number: {current_page} out of {total_pages}", + current_page=current_page, + total_pages=total_pages, + ) + for split in splits: + meta = {} + if doc.meta: + meta = doc.meta.copy() + meta.update({"source_id": doc.id, "page_number": current_page}) + if split.get("meta"): + meta.update(split["meta"]) + current_page = self._update_page_number_with_breaks(split["content"], current_page) + docs.append(Document(content=split["content"], meta=meta)) + logger.debug( + "Split into {num_docs} documents for id={doc_id}, final page: {current_page}", + num_docs=len(docs), + doc_id=doc.id, + current_page=current_page, + ) + result_docs.extend(docs) + return result_docs + + 
+    @component.output_types(documents=list[Document])
+    def run(self, documents: list[Document]) -> dict[str, list[Document]]:
+        """
+        Run the markdown header splitter with optional secondary splitting.
+
+        :param documents: List of documents to split.
+
+        :returns: A dictionary with the following key:
+            - `documents`: List of documents with the split texts. Each document includes:
+                - A metadata field `source_id` to track the original document.
+                - A metadata field `page_number` to track the original page number.
+                - A metadata field `split_id` to uniquely identify each split chunk.
+                - All other metadata copied from the original document.
+        """
+        # validate input documents
+        for doc in documents:
+            if doc.content is None:
+                raise ValueError(
+                    "MarkdownHeaderSplitter only works with text documents but content for document ID"
+                    f" {doc.id} is None."
+                )
+            if not isinstance(doc.content, str):
+                raise ValueError("MarkdownHeaderSplitter only works with text documents (str content).")
+
+        processed_documents = []
+        for doc in documents:
+            # handle empty documents
+            if not doc.content or not doc.content.strip():
+                if self.skip_empty_documents:
+                    logger.warning("Document ID {doc_id} has empty content. Skipping this document.", doc_id=doc.id)
+                    continue
+                # keep empty documents
+                processed_documents.append(doc)
+                logger.warning(
+                    "Document ID {doc_id} has empty content. Keeping this document as per configuration.",
+                    doc_id=doc.id,
+                )
+                continue
+
+            processed_documents.append(doc)
+
+        if not processed_documents:
+            return {"documents": []}
+
+        header_split_docs = self._split_documents_by_markdown_headers(processed_documents)
+
+        # secondary splitting if configured
+        final_docs = self._apply_secondary_splitting(header_split_docs) if self.secondary_split else header_split_docs
+
+        # assign split_id to all output documents
+        for idx, doc in enumerate(final_docs):
+            doc.meta["split_id"] = idx
+
+        return {"documents": final_docs}
diff --git a/releasenotes/notes/add-md-header-splitter-df5c024a6ddd2718.yaml b/releasenotes/notes/add-md-header-splitter-df5c024a6ddd2718.yaml
new file mode 100644
index 0000000000..bb5cbec612
--- /dev/null
+++ b/releasenotes/notes/add-md-header-splitter-df5c024a6ddd2718.yaml
@@ -0,0 +1,9 @@
+---
+features:
+  - |
+    Introduced the `MarkdownHeaderSplitter` component:
+    - Splits documents into chunks at Markdown headers (`#`, `##`, etc.), preserving header hierarchy as metadata.
+    - Supports secondary splitting (by word, passage, period, or line) for further chunking after header-based splitting using Haystack's `DocumentSplitter`.
+    - Preserves and propagates metadata such as parent headers and page numbers.
+    - Handles edge cases such as documents with no headers, empty content, and non-text documents.
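+
+    A minimal usage sketch (illustrative):
+
+    ```python
+    from haystack import Document
+    from haystack.components.preprocessors.markdown_header_splitter import MarkdownHeaderSplitter
+
+    splitter = MarkdownHeaderSplitter(secondary_split="word", split_length=50)
+    result = splitter.run(documents=[Document(content="# Title\nSome longer body text.")])
+    ```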
diff --git a/test/components/preprocessors/test_markdown_header_splitter.py b/test/components/preprocessors/test_markdown_header_splitter.py
new file mode 100644
index 0000000000..8d5694e6cd
--- /dev/null
+++ b/test/components/preprocessors/test_markdown_header_splitter.py
@@ -0,0 +1,324 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from unittest.mock import ANY
+
+import pytest
+
+from haystack import Document
+from haystack.components.preprocessors.markdown_header_splitter import MarkdownHeaderSplitter
+
+
+# Fixtures
+@pytest.fixture
+def sample_text():
+    return (
+        "# Header 1\n"
+        "Content under header 1.\n"
+        "## Header 1.1\n"
+        "### Subheader 1.1.1\n"
+        "Content under sub-header 1.1.1\n"
+        "## Header 1.2\n"
+        "### Subheader 1.2.1\n"
+        "Content under header 1.2.1.\n"
+        "### Subheader 1.2.2\n"
+        "Content under header 1.2.2.\n"
+        "### Subheader 1.2.3\n"
+        "Content under header 1.2.3."
+    )
+
+
+# Basic splitting and structure
+def test_basic_split(sample_text):
+    splitter = MarkdownHeaderSplitter(keep_headers=False)
+    docs = [Document(content=sample_text)]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+
+    # Should split into all headers with content
+    headers = [doc.meta["header"] for doc in split_docs]
+    assert "Header 1" in headers
+    assert "Subheader 1.1.1" in headers
+    assert "Subheader 1.2.1" in headers
+    assert "Subheader 1.2.2" in headers
+    assert "Subheader 1.2.3" in headers
+
+    # Check that content is present and correct
+    # Test first split: with keep_headers=False the header line is in meta, not content
+    header1_doc = split_docs[0]
+    assert header1_doc.meta["header"] == "Header 1"
+    assert header1_doc.meta["split_id"] == 0
+    assert header1_doc.meta["page_number"] == 1
+    assert header1_doc.meta["parent_headers"] == []
+    assert header1_doc.content == "Content under header 1."
+
+    subheader111_doc = next(doc for doc in split_docs if doc.meta["header"] == "Subheader 1.1.1")
+    assert "Content under sub-header 1.1.1" in subheader111_doc.content
+
+    subheader121_doc = next(doc for doc in split_docs if doc.meta["header"] == "Subheader 1.2.1")
+    assert "Content under header 1.2.1." in subheader121_doc.content
+
+    subheader122_doc = next(doc for doc in split_docs if doc.meta["header"] == "Subheader 1.2.2")
+    assert "Content under header 1.2.2." in subheader122_doc.content
+
+    subheader123_doc = next(doc for doc in split_docs if doc.meta["header"] == "Subheader 1.2.3")
+    assert "Content under header 1.2.3." in subheader123_doc.content
+
+
+def test_split_parentheaders(sample_text):
+    splitter = MarkdownHeaderSplitter(keep_headers=False)
+    docs = [Document(content=sample_text), Document(content="# H1\n## H2\n### H3\nContent")]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+    # Check parent_headers for both a deep subheader and a simple one
+    subheader_doc = next(doc for doc in split_docs if doc.meta["header"] == "Subheader 1.2.2")
+    assert "Header 1" in subheader_doc.meta["parent_headers"]
+    assert "Header 1.2" in subheader_doc.meta["parent_headers"]
+    h3_doc = next((doc for doc in split_docs if doc.meta["header"] == "H3"), None)
+    assert h3_doc.meta["parent_headers"] == ["H1", "H2"]
+
+
+def test_split_no_headers():
+    splitter = MarkdownHeaderSplitter()
+    docs = [Document(content="No headers here."), Document(content="Just some text without headers.")]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+    # Should return one doc per input, and no header key in meta
+    assert len(split_docs) == 2
+    for doc in split_docs:
+        assert "header" not in doc.meta
+    # Sanity checks
+    assert split_docs[0].content == docs[0].content
+    assert split_docs[1].content == docs[1].content
+
+
+def test_split_multiple_documents(sample_text):
+    splitter = MarkdownHeaderSplitter(keep_headers=False)
+    docs = [
+        Document(content=sample_text),
+        Document(content="# Another Header\nSome content."),
+        Document(content="# H1\nA"),
+        Document(content="# H2\nB"),
+    ]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+
+    assert len(split_docs) == 8
+
+    headers = {doc.meta["header"] for doc in split_docs}
+    assert {"Another Header", "H1", "H2"}.issubset(headers)
+
+    # Verify that all documents have a split_id and they're sequential
+    split_ids = [doc.meta.get("split_id") for doc in split_docs]
+    assert all(split_id is not None for split_id in split_ids)
+    assert split_ids == list(range(len(split_ids)))
+
+
+def test_split_only_headers():
+    text = "# H1\n# H2\n# H3"
+    splitter = MarkdownHeaderSplitter()
+    docs = [Document(content=text)]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+    # Return doc without content unchunked
+    assert len(split_docs) == 1
+    assert split_docs[0].content == text
+
+
+# Metadata preservation
+def test_preserve_document_metadata():
+    """Test that document metadata is preserved through splitting."""
+    splitter = MarkdownHeaderSplitter(keep_headers=False)
+    docs = [Document(content="# Header\nContent", meta={"source": "test", "importance": "high", "custom_field": 123})]
+
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+
+    # Original metadata should be preserved
+    assert split_docs[0].meta["source"] == "test"
+    assert split_docs[0].meta["importance"] == "high"
+    assert split_docs[0].meta["custom_field"] == 123
+
+    # New metadata should be added
+    assert "header" in split_docs[0].meta
+    assert split_docs[0].meta["header"] == "Header"
+    assert "split_id" in split_docs[0].meta
+    assert split_docs[0].meta["split_id"] == 0
+
+
+# Error and edge case handling
+def test_non_text_document():
+    """Test that the component correctly handles non-text documents."""
+    splitter = MarkdownHeaderSplitter()
+    docs = [Document(content=None)]
+
+    # Should raise ValueError about text documents
+    with pytest.raises(ValueError, match="only works with text documents"):
+        splitter.run(documents=docs)
+
+
+def test_empty_document_list():
+    """Test handling of an empty document list."""
+    splitter = MarkdownHeaderSplitter()
+    result = splitter.run(documents=[])
+    assert result["documents"] == []
+
+
+def test_invalid_secondary_split_at_init():
+    """Test that an invalid secondary split type raises an error at initialization time."""
+    with pytest.raises(ValueError, match="split_by must be one of"):
+        MarkdownHeaderSplitter(secondary_split="invalid_split_type")
+
+
+def test_invalid_split_parameters_at_init():
+    """Test invalid split parameter validation at initialization time."""
+    # Test split_length validation
+    with pytest.raises(ValueError, match="split_length must be greater than 0"):
+        MarkdownHeaderSplitter(secondary_split="word", split_length=0)
+
+    # Test split_overlap validation
+    with pytest.raises(ValueError, match="split_overlap must be greater than or equal to 0"):
+        MarkdownHeaderSplitter(secondary_split="word", split_overlap=-1)
+
+
+def test_empty_content_handling():
+    """Test handling of documents with empty content."""
+    splitter_skip = MarkdownHeaderSplitter()  # skip empty documents by default
+    docs = [Document(content="")]
+    result = splitter_skip.run(documents=docs)
+    assert len(result["documents"]) == 0
+
+    splitter_no_skip = MarkdownHeaderSplitter(skip_empty_documents=False)
+    docs = [Document(content="")]
+    result = splitter_no_skip.run(documents=docs)
+    assert len(result["documents"]) == 1
+
+
+def test_split_id_sequentiality_primary_and_secondary(sample_text):
+    # Test primary splitting
+    splitter = MarkdownHeaderSplitter(keep_headers=False)
+    docs = [Document(content=sample_text)]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+
+    # Test number of documents
+    assert len(split_docs) == 5
+
+    # Check that split_ids are sequential
+    split_ids = [doc.meta["split_id"] for doc in split_docs]
+    assert split_ids == list(range(len(split_ids)))
+
+    # Test secondary splitting
+    splitter = MarkdownHeaderSplitter(secondary_split="word", split_length=3)
+    docs = [Document(content=sample_text)]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+
+    # Test number of documents
+    assert len(split_docs) == 12
+
+    split_ids = [doc.meta["split_id"] for doc in split_docs]
+    assert split_ids == list(range(len(split_ids)))
+
+    # Test with multiple input documents
+    docs = [Document(content=sample_text), Document(content="# Another Header\nSome more content here.")]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+
+    # Test number of documents
+    assert len(split_docs) == 14
+
+    split_ids = [doc.meta["split_id"] for doc in split_docs]
+    assert split_ids == list(range(len(split_ids)))
+
+
+def test_secondary_split_with_overlap():
+    text = (
+        "# Introduction\n"
+        "This is the introduction section with some words for testing overlap splitting. "
+        "It should be split into chunks with overlap.\n"
+        "## Details\n"
+        "Here are more details about the topic. "
+        "Splitting should work across multiple headers and content blocks.\n"
+        "### Subsection\n"
+        "This subsection contains additional information and should also be split with overlap."
+    )
+    splitter = MarkdownHeaderSplitter(secondary_split="word", split_length=4, split_overlap=2, keep_headers=False)
+    docs = [Document(content=text)]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+    assert len(split_docs) == 21
+
+    for i in range(1, len(split_docs)):
+        prev_doc = split_docs[i - 1]
+        curr_doc = split_docs[i]
+        if prev_doc.meta["header"] == curr_doc.meta["header"]:  # only check overlap within same header
+            prev_words = prev_doc.content.split()
+            curr_words = curr_doc.content.split()
+            assert prev_words[-2:] == curr_words[:2]
+
+
+def test_secondary_split_with_threshold():
+    text = "# Header\n" + " ".join([f"word{i}" for i in range(1, 11)])
+    splitter = MarkdownHeaderSplitter(secondary_split="word", split_length=3, split_threshold=2, keep_headers=False)
+    docs = [Document(content=text)]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+    for doc in split_docs[:-1]:
+        assert len(doc.content.split()) == 3
+    # The last chunk should have at least 2 words (threshold)
+    assert len(split_docs[-1].content.split()) >= 2
+
+
+def test_page_break_handling_in_secondary_split():
+    text = "# Header\nFirst page\fSecond page\fThird page"
+    splitter = MarkdownHeaderSplitter(secondary_split="word", split_length=1)
+    docs = [Document(content=text)]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+    page_numbers = [doc.meta.get("page_number") for doc in split_docs]
+    # Should start at 1 and increment at each \f
+    assert page_numbers[0] == 1
+    assert max(page_numbers) == 3
+
+
+def test_page_break_handling_with_keep_headers():
+    text = "# Header\nFirst page\f Second page\f Third page"
+    splitter = MarkdownHeaderSplitter(secondary_split="word", split_length=1, keep_headers=True)
+    docs = [Document(content=text)]
+    result = splitter.run(documents=docs)
+    split_docs = result["documents"]
+    assert len(split_docs) == 7
+
+    # Split 1
+    assert split_docs[0].content == "# "
+    assert split_docs[0].meta == {"source_id": ANY, "page_number": 1, "split_id": 0, "split_idx_start": 0}
+
+    # Split 2
+    assert split_docs[1].content == "Header\nFirst "
+    assert split_docs[1].meta == {"source_id": ANY, "page_number": 1, "split_id": 1, "split_idx_start": 2}
+
+    # Split 3
+    assert split_docs[2].content == "page\f "
+    assert split_docs[2].meta == {"source_id": ANY, "page_number": 1, "split_id": 2, "split_idx_start": 15}
+
+    # Split 4
+    assert split_docs[3].content == "Second "
+    assert split_docs[3].meta == {"source_id": ANY, "page_number": 2, "split_id": 3, "split_idx_start": 21}
+
+    # Split 5
+    assert split_docs[4].content == "page\f "
+    assert split_docs[4].meta == {"source_id": ANY, "page_number": 2, "split_id": 4, "split_idx_start": 28}
+
+    # Split 6
+    assert split_docs[5].content == "Third "
+    assert split_docs[5].meta == {"source_id": ANY, "page_number": 3, "split_id": 5, "split_idx_start": 34}
+
+    # Split 7
+    assert split_docs[6].content == "page"
+    assert split_docs[6].meta == {"source_id": ANY, "page_number": 3, "split_id": 6, "split_idx_start": 40}
+
+    # Check reconstruction
+    reconstructed_text = "".join(doc.content for doc in split_docs)
+    assert reconstructed_text == text
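+
+
+# Additional behavior sketches: these assert only properties that follow directly
+# from the header-splitting logic above (header retention and empty-header
+# prepending), not exact chunk counts.
+def test_keep_headers_retains_header_line_in_content():
+    splitter = MarkdownHeaderSplitter()  # keep_headers defaults to True
+    result = splitter.run(documents=[Document(content="# Title\nBody text.")])
+    split_docs = result["documents"]
+    assert len(split_docs) == 1
+    # the header line stays in the content and is not duplicated into meta
+    assert split_docs[0].content.startswith("# Title")
+    assert "header" not in split_docs[0].meta
+
+
+def test_header_without_content_is_prepended_to_next_chunk():
+    splitter = MarkdownHeaderSplitter()
+    result = splitter.run(documents=[Document(content="# Empty\n## Child\nBody.")])
+    split_docs = result["documents"]
+    # "# Empty" has no body of its own, so it is carried into the first chunk with content
+    assert len(split_docs) == 1
+    assert split_docs[0].content == "# Empty\n## Child\nBody."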