Unsiloed/utils/chunking.py (207 changes: 110 additions & 97 deletions)
def fixed_size_chunking(text, chunk_size=1000, overlap=100):
    """
    Split text into fixed-size chunks with optional overlap.

    Args:
        text: The text to chunk
        chunk_size: Maximum size of each chunk in characters
        overlap: Number of characters to overlap between chunks

    Returns:
        List of chunks with metadata

    Optimized for performance with minimal memory usage.
    """
    text_length = len(text)

    # Pre-calculate total chunks to avoid list resizing.
    # Each chunk after the first advances by (chunk_size - overlap),
    # so the naive ceil(text_length / chunk_size) would undercount
    # whenever overlap > 0 and overflow the pre-allocated list.
    # Note: assumes overlap < chunk_size; otherwise the window never advances.
    step = chunk_size - overlap
    if text_length <= chunk_size:
        total_chunks = 1 if text_length else 0
    else:
        total_chunks = (text_length - chunk_size + step - 1) // step + 1
    chunks = [None] * total_chunks

    # Process chunks
    chunk_idx = 0
    start = 0

    while start < text_length:
        # Calculate end position for current chunk
        end = min(start + chunk_size, text_length)

        # Extract chunk using string slicing
        chunk_text = text[start:end]

        chunks[chunk_idx] = {
            "text": chunk_text,
            "metadata": {
                "start_char": start,
                "end_char": end,
                "strategy": "fixed"
            }
        }

        # Move start position for next chunk, considering overlap
        start = end - overlap if end < text_length else text_length
        chunk_idx += 1

    return chunks
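A quick usage sketch (illustrative only, not part of this diff; the sample text and sizes are invented) showing how the overlap arithmetic plays out:

    doc = "x" * 2500
    parts = fixed_size_chunking(doc, chunk_size=1000, overlap=100)
    spans = [(c["metadata"]["start_char"], c["metadata"]["end_char"]) for c in parts]
    # Each chunk starts 900 characters after the previous one (1000 - 100 overlap)
    assert spans == [(0, 1000), (900, 1900), (1800, 2500)]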


# … unchanged code collapsed in the diff view (def process_page(page_idx): …) …
def paragraph_chunking(text):
    """
    Split text by paragraphs.

    Args:
        text: The text to chunk

    Returns:
        List of chunks with metadata

    Optimized for performance with efficient string operations.
    """
    # Build paragraphs in a single pass, treating blank lines as separators
    paragraphs = []
    current_para = []
    lines = text.split('\n')

    for line in lines:
        line = line.strip()
        if line:
            current_para.append(line)
        elif current_para:
            paragraphs.append(' '.join(current_para))
            current_para = []

    # Add the last paragraph if it exists
    if current_para:
        paragraphs.append(' '.join(current_para))

    # Pre-allocate chunks list
    chunks = [None] * len(paragraphs)
    current_position = 0

    for i, paragraph in enumerate(paragraphs):
        # Track positions arithmetically instead of calling text.find().
        # Offsets assume paragraphs are separated by exactly "\n\n" and are
        # approximate when the source uses other spacing.
        start_position = current_position
        end_position = start_position + len(paragraph)

        chunks[i] = {
            "text": paragraph,
            "metadata": {
                "start_char": start_position,
                "end_char": end_position,
                "strategy": "paragraph"
            }
        }
        current_position = end_position + 2  # +2 for the "\n\n" separator

    return chunks
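For reference, a hypothetical call (the sample string is invented) showing how multi-line paragraphs are re-joined with spaces while blank lines act as separators:

    sample = "First paragraph line one.\nLine two.\n\nSecond paragraph."
    parts = paragraph_chunking(sample)
    assert parts[0]["text"] == "First paragraph line one. Line two."
    assert parts[1]["metadata"]["start_char"] == 37  # exact here, since "\n\n" separates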


def heading_chunking(text):
    """
    Split text by headings (identified by heuristics).

    Args:
        text: The text to chunk

    Returns:
        List of chunks with metadata

    Optimized for performance with compiled regex patterns.
    """
    import re

    # Compile regex patterns once
    heading_patterns = [
        re.compile(r"^#{1,6}\s+.+$"),  # Markdown headings
        re.compile(r"^[A-Z][A-Za-z\s]+$"),  # All caps or title case single line
        re.compile(r"^\d+\.\s+[A-Z]"),  # Numbered headings (1. Title)
        re.compile(r"^[IVXLCDMivxlcdm]+\.\s+[A-Z]"),  # Roman numeral headings (IV. Title)
    ]

    # Split by lines and process in one pass
    lines = text.split("\n")

    current_heading = "Introduction"
    current_text = []
    current_start = 0

    # Pre-allocate chunks list with estimated size
    estimated_chunks = len(lines) // 10  # Rough estimate: 10 lines per chunk
    chunks = [None] * estimated_chunks
    chunk_idx = 0

    for line in lines:
        line = line.strip()
        if not line:
            continue

        # Check if line matches any heading pattern
        is_heading = any(pattern.match(line) for pattern in heading_patterns)

        if is_heading:
            # Save current chunk if it exists
            if current_text:
                chunk_text = "\n".join(current_text)
                if chunk_idx >= len(chunks):
                    chunks.append(None)  # Extend list if needed
                chunks[chunk_idx] = {
                    "text": chunk_text,
                    "metadata": {
                        "heading": current_heading,
                        "start_char": current_start,
                        "end_char": current_start + len(chunk_text),
                        "strategy": "heading"
                    }
                }
                chunk_idx += 1

            # Start a new chunk with this heading
            current_heading = line
            current_text = []
            current_start = text.find(line, current_start)
        else:
            current_text.append(line)

    # Add the last chunk
    if current_text:
        chunk_text = "\n".join(current_text)
        if chunk_idx >= len(chunks):
            chunks.append(None)
        chunks[chunk_idx] = {
            "text": chunk_text,
            "metadata": {
                "heading": current_heading,
                "start_char": current_start,
                "end_char": current_start + len(chunk_text),
                "strategy": "heading"
            }
        }
        chunk_idx += 1

    # Trim the list to actual size
    return chunks[:chunk_idx]
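And a small hypothetical check (sample markdown invented for illustration) of how body text is grouped under the most recent heading:

    md = "# Title\nIntro text.\n\n## Section\nBody text."
    parts = heading_chunking(md)
    assert parts[0]["metadata"]["heading"] == "# Title"
    assert parts[0]["text"] == "Intro text."
    assert parts[1]["metadata"]["heading"] == "## Section"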


def semantic_chunking(text):