67 changes: 48 additions & 19 deletions Unsiloed/services/chunking.py
@@ -1,3 +1,9 @@
import os
import requests
import tempfile
import logging
from urllib.parse import urlparse

from Unsiloed.utils.chunking import (
fixed_size_chunking,
page_based_chunking,
@@ -11,8 +17,6 @@
extract_text_from_pptx,
)


logger = logging.getLogger(__name__)


@@ -25,26 +29,14 @@ def process_document_chunking(
):
"""
Process a document file (PDF, DOCX, PPTX) with the specified chunking strategy.

Args:
file_path: Path to the document file
file_type: Type of document (pdf, docx, pptx)
strategy: Chunking strategy to use
chunk_size: Size of chunks for fixed strategy
overlap: Overlap size for fixed strategy

Returns:
Dictionary with chunking results
"""
logger.info(
f"Processing {file_type.upper()} document with {strategy} chunking strategy"
)

# Handle page-based chunking for PDFs only
if strategy == "page" and file_type == "pdf":
chunks = page_based_chunking(file_path)
else:
# Extract text based on file type
if file_type == "pdf":
text = extract_text_from_pdf(file_path)
elif file_type == "docx":
@@ -54,7 +46,6 @@ def process_document_chunking(
else:
raise ValueError(f"Unsupported file type: {file_type}")

# Apply the selected chunking strategy
if strategy == "fixed":
chunks = fixed_size_chunking(text, chunk_size, overlap)
elif strategy == "semantic":
@@ -64,28 +55,66 @@
elif strategy == "heading":
chunks = heading_chunking(text)
elif strategy == "page" and file_type != "pdf":
# For non-PDF files, fall back to paragraph chunking for page strategy
logger.warning(
f"Page-based chunking not supported for {file_type}, falling back to paragraph chunking"
)
chunks = paragraph_chunking(text)
else:
raise ValueError(f"Unknown chunking strategy: {strategy}")

# Calculate statistics
total_chunks = len(chunks)
avg_chunk_size = (
sum(len(chunk["text"]) for chunk in chunks) / total_chunks
if total_chunks > 0
else 0
)

    return {
"file_type": file_type,
"strategy": strategy,
"total_chunks": total_chunks,
"avg_chunk_size": avg_chunk_size,
"chunks": chunks,
}


def process_sync(payload: dict):
"""
Synchronous processor for documents using strategy-based chunking.
Supports remote file downloads and infers file type.
"""
    file_path = payload.get("filePath")
    if not file_path:
        raise ValueError("Payload must include a 'filePath' entry")

    strategy = payload.get("strategy", "semantic")
    chunk_size = payload.get("chunkSize", 1000)
    overlap = payload.get("overlap", 100)

    # Handle remote URLs
    parsed = urlparse(file_path)
    if parsed.scheme in ("http", "https"):
        logger.info(f"Downloading remote file from {file_path}")
        response = requests.get(file_path, timeout=60)
        response.raise_for_status()

        # Prefer the extension from the URL path, then the Content-Disposition
        # filename, and fall back to .pdf.
        _, ext = os.path.splitext(parsed.path)
        if not ext:
            content_disposition = response.headers.get("content-disposition", "")
            filename = content_disposition.rsplit("filename=", 1)[-1].strip('"; ')
            _, ext = os.path.splitext(filename)
        ext = ext or ".pdf"

        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp_file:
            tmp_file.write(response.content)
            file_path = tmp_file.name
else:
_, ext = os.path.splitext(file_path)

ext = ext.lower().lstrip(".")
file_type = {"pdf": "pdf", "docx": "docx", "pptx": "pptx"}.get(ext)

if not file_type:
raise ValueError(f"Unsupported file extension: .{ext}")

return process_document_chunking(
file_path=file_path,
file_type=file_type,
strategy=strategy,
chunk_size=chunk_size,
overlap=overlap,
)
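For reviewers, a minimal usage sketch of the new `process_sync` entry point. The payload keys (`filePath`, `strategy`, `chunkSize`, `overlap`) are the ones read above; the file path and URL are placeholders, not assets from this repository.

```python
# Usage sketch only; the paths below are hypothetical.
from Unsiloed.services.chunking import process_sync

# Local file: the type is inferred from the .docx extension.
local_result = process_sync({
    "filePath": "docs/report.docx",  # placeholder path
    "strategy": "paragraph",
})

# Remote file: downloaded to a temporary file, then chunked.
remote_result = process_sync({
    "filePath": "https://example.com/sample.pdf",  # placeholder URL
    "strategy": "fixed",
    "chunkSize": 500,
    "overlap": 50,
})

print(local_result["total_chunks"], remote_result["avg_chunk_size"])
```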
8 changes: 8 additions & 0 deletions Unsiloed/tests/test_text_cleaning.py
@@ -0,0 +1,8 @@
from Unsiloed.text_cleaning.cleaning_pipeline import TextCleaningPipeline

def test_cleaning_pipeline():
text = "fi Test “quoted”\nparagraph\n1. Numbered"
cleaned = TextCleaningPipeline().clean(text)
    assert 'ﬁ' not in cleaned
assert '“' not in cleaned
assert '\n' not in cleaned or cleaned.count('\n') < 2
Empty file.
34 changes: 34 additions & 0 deletions Unsiloed/text_cleaning/cleaning_pipeline.py
@@ -0,0 +1,34 @@
from .cleaning_utils import (
normalize_unicode, replace_ligatures, normalize_quotes,
clean_bullets, group_paragraphs, normalize_whitespace, decode_mime
)

class TextCleaningPipeline:
def __init__(self, config=None):
default_config = {
'normalize_unicode': True,
'replace_ligatures': True,
'normalize_quotes': True,
'clean_bullets': True,
'group_paragraphs': True,
'normalize_whitespace': True,
'decode_mime': True,
}
self.config = config or default_config

def clean(self, text):
if self.config.get('normalize_unicode'):
text = normalize_unicode(text)
if self.config.get('replace_ligatures'):
text = replace_ligatures(text)
if self.config.get('normalize_quotes'):
text = normalize_quotes(text)
if self.config.get('clean_bullets'):
text = clean_bullets(text)
if self.config.get('group_paragraphs'):
text = group_paragraphs(text)
if self.config.get('normalize_whitespace'):
text = normalize_whitespace(text)
if self.config.get('decode_mime'):
text = decode_mime(text)
return text
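A short usage sketch for `TextCleaningPipeline`. One design note worth flagging: `config or default_config` replaces the defaults wholesale rather than merging, so a partial config silently disables every step that is not listed.

```python
from Unsiloed.text_cleaning.cleaning_pipeline import TextCleaningPipeline

# Default configuration: every cleaning step runs.
cleaned = TextCleaningPipeline().clean("ﬁrst  “line”\nsecond line\n\nnext paragraph")

# A partial config replaces the defaults entirely (no merge), so only quote
# normalization runs here; all other steps are skipped.
quotes_only = TextCleaningPipeline(config={"normalize_quotes": True})
print(quotes_only.clean("“quoted”"))  # -> "quoted"
```

If merging with the defaults is the intended behavior, `self.config = {**default_config, **(config or {})}` would be the usual fix.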
34 changes: 34 additions & 0 deletions Unsiloed/text_cleaning/cleaning_utils.py
@@ -0,0 +1,34 @@
import re
import unicodedata
import html

def normalize_unicode(text):
return unicodedata.normalize('NFKC', text)

def replace_ligatures(text):
    ligatures = {'ﬁ': 'fi', 'ﬂ': 'fl', 'ﬃ': 'ffi'}
for lig, rep in ligatures.items():
text = text.replace(lig, rep)
return text

def normalize_quotes(text):
return text.replace('“', '"').replace('”', '"').replace("‘", "'").replace("’", "'")

def clean_bullets(text):
bullet_patterns = [r'^\s*[-*•]\s+', r'^\s*\d+\.\s+']
for pattern in bullet_patterns:
text = re.sub(pattern, '', text, flags=re.MULTILINE)
return text

def group_paragraphs(text):
return re.sub(r'(?<!\n)\n(?!\n)', ' ', text)

def normalize_whitespace(text):
text = re.sub(r'\s+', ' ', text)
return text.strip()

def decode_mime(text):
try:
return html.unescape(text)
except Exception:
return text
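Two of these helpers are easy to misread from the regexes alone, so a tiny illustration on toy strings (not project data):

```python
from Unsiloed.text_cleaning.cleaning_utils import clean_bullets, group_paragraphs

# group_paragraphs joins single (hard-wrap) newlines into spaces while keeping
# blank-line paragraph breaks intact.
wrapped = "This sentence was hard\nwrapped by a PDF extractor.\n\nNext paragraph."
print(group_paragraphs(wrapped))
# -> "This sentence was hard wrapped by a PDF extractor.\n\nNext paragraph."

# clean_bullets strips leading bullet and numbered-list markers on each line.
print(clean_bullets("- first item\n2. second item"))
# -> "first item\nsecond item"
```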
74 changes: 15 additions & 59 deletions Unsiloed/utils/chunking.py
@@ -5,67 +5,50 @@
from Unsiloed.utils.openai import (
semantic_chunk_with_structured_output,
)
from Unsiloed.text_cleaning.cleaning_pipeline import TextCleaningPipeline

logger = logging.getLogger(__name__)

ChunkingStrategy = Literal["fixed", "page", "semantic", "paragraph", "heading"]

cleaner = TextCleaningPipeline()

def fixed_size_chunking(text, chunk_size=1000, overlap=100):
"""
Split text into fixed-size chunks with optional overlap.

Args:
text: The text to chunk
chunk_size: Maximum size of each chunk in characters
overlap: Number of characters to overlap between chunks

Returns:
List of chunks with metadata
"""
text = cleaner.clean(text)

chunks = []
start = 0
text_length = len(text)

while start < text_length:
# Calculate end position for current chunk
end = min(start + chunk_size, text_length)

# Extract chunk
chunk_text = text[start:end]

# Add chunk to result
chunks.append(
{
"text": chunk_text,
"metadata": {"start_char": start, "end_char": end, "strategy": "fixed"},
}
)

# Move start position for next chunk, considering overlap
start = end - overlap if end < text_length else text_length

return chunks


def page_based_chunking(pdf_path):
"""
Split PDF by pages, with each page as a separate chunk.

Args:
pdf_path: Path to the PDF file

Returns:
List of chunks with metadata
"""
try:
chunks = []
with open(pdf_path, "rb") as file:
reader = PyPDF2.PdfReader(file)

# Use ThreadPoolExecutor to process pages in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
# Function to process a single page
def process_page(page_idx):
page = reader.pages[page_idx]
text = page.extract_text()
@@ -74,7 +57,6 @@ def process_page(page_idx):
"metadata": {"page": page_idx + 1, "strategy": "page"},
}

# Process all pages in parallel
chunks = list(executor.map(process_page, range(len(reader.pages))))

return chunks
@@ -86,19 +68,11 @@ def process_page(page_idx):
def paragraph_chunking(text):
"""
Split text by paragraphs.

Args:
text: The text to chunk

Returns:
List of chunks with metadata
"""
    text = cleaner.clean(text)

    # Split the cleaned text on double newlines and drop empty paragraphs
    paragraphs = text.split("\n\n")
paragraphs = [p.strip() for p in paragraphs if p.strip()]

chunks = []
current_position = 0

@@ -124,28 +98,19 @@ def paragraph_chunking(text):

def heading_chunking(text):
"""
Split text by headings (identified by heuristics).

Args:
text: The text to chunk

Returns:
List of chunks with metadata
"""
import re
text = cleaner.clean(text)

# Define patterns for common heading formats
heading_patterns = [
r"^#{1,6}\s+.+$", # Markdown headings
r"^[A-Z][A-Za-z\s]+$", # All caps or title case single line
r"^\d+\.\s+[A-Z]", # Numbered headings (1. Title)
r"^[IVXLCDMivxlcdm]+\.\s+[A-Z]", # Roman numeral headings (IV. Title)
r"^#{1,6}\s+.+$",
r"^[A-Z][A-Za-z\s]+$",
r"^\d+\.\s+[A-Z]",
r"^[IVXLCDMivxlcdm]+\.\s+[A-Z]",
]

# Combine patterns
combined_pattern = "|".join(f"({pattern})" for pattern in heading_patterns)

# Split by lines first
lines = text.split("\n")

chunks = []
@@ -155,7 +120,6 @@ def heading_chunking(text):

for line in lines:
if re.match(combined_pattern, line.strip()):
# If we have accumulated text, save it as a chunk
if current_text:
chunk_text = "\n".join(current_text)
chunks.append(
@@ -170,14 +134,12 @@ def heading_chunking(text):
}
)

# Start a new chunk with this heading
current_heading = line.strip()
current_text = []
current_start = text.find(line, current_start)
else:
current_text.append(line)

# Add the last chunk
if current_text:
chunk_text = "\n".join(current_text)
chunks.append(
@@ -197,13 +159,7 @@ def heading_chunking(text):

def semantic_chunking(text):
"""
Use OpenAI to identify semantic chunks in the text.

Args:
text: The text to chunk

Returns:
List of chunks with metadata
"""
# Use the optimized semantic chunking with Structured Outputs
text = cleaner.clean(text)
return semantic_chunk_with_structured_output(text)
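Since every strategy now routes through the shared `cleaner` before chunking, here is a small sketch of the fixed-size path and what the overlap arithmetic produces; the input string is a toy value.

```python
from Unsiloed.utils.chunking import fixed_size_chunking

text = "A" * 250  # toy input; real callers pass extracted document text
chunks = fixed_size_chunking(text, chunk_size=100, overlap=20)

# Consecutive chunks start chunk_size - overlap = 80 characters apart, so each
# chunk repeats the final 20 characters of the previous one.
for chunk in chunks:
    print(chunk["metadata"]["start_char"], chunk["metadata"]["end_char"])
# 0 100
# 80 180
# 160 250
```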