diff --git a/Unsiloed/utils/chunking.py b/Unsiloed/utils/chunking.py
index eb0fb06..a92b31c 100644
--- a/Unsiloed/utils/chunking.py
+++ b/Unsiloed/utils/chunking.py
@@ -14,37 +14,38 @@ def fixed_size_chunking(text, chunk_size=1000, overlap=100):
     """
     Split text into fixed-size chunks with optional overlap.
-
-    Args:
-        text: The text to chunk
-        chunk_size: Maximum size of each chunk in characters
-        overlap: Number of characters to overlap between chunks
-
-    Returns:
-        List of chunks with metadata
+    Optimized for performance with minimal memory usage.
     """
-    chunks = []
-    start = 0
+    # Pre-calculate total chunks (accounting for overlap) to avoid list resizing
     text_length = len(text)
-
+    step = chunk_size - overlap if chunk_size > overlap else chunk_size
+    total_chunks = max(1, (max(text_length - overlap, 0) + step - 1) // step) if text_length else 0
+    chunks = [None] * total_chunks
+
+    # Process chunks
+    chunk_idx = 0
+    start = 0
+
     while start < text_length:
         # Calculate end position for current chunk
         end = min(start + chunk_size, text_length)
-
-        # Extract chunk
+
+        # Extract chunk using string slicing (more efficient than find)
         chunk_text = text[start:end]
-
+
         # Add chunk to result
-        chunks.append(
-            {
-                "text": chunk_text,
-                "metadata": {"start_char": start, "end_char": end, "strategy": "fixed"},
+        chunks[chunk_idx] = {
+            "text": chunk_text,
+            "metadata": {
+                "start_char": start,
+                "end_char": end,
+                "strategy": "fixed"
             }
-        )
-
+        }
+
         # Move start position for next chunk, considering overlap
         start = end - overlap if end < text_length else text_length
-
+        chunk_idx += 1
+
     return chunks
@@ -86,113 +87,125 @@ def process_page(page_idx):
 def paragraph_chunking(text):
     """
     Split text by paragraphs.
-
-    Args:
-        text: The text to chunk
-
-    Returns:
-        List of chunks with metadata
+    Optimized for performance with efficient string operations.
     """
-    # Split text by double newlines to identify paragraphs
-    paragraphs = text.split("\n\n")
-
-    # Remove empty paragraphs
-    paragraphs = [p.strip() for p in paragraphs if p.strip()]
-
-    chunks = []
+    # Use a more efficient paragraph splitting approach
+    paragraphs = []
+    current_para = []
+    lines = text.split('\n')
+
+    for line in lines:
+        line = line.strip()
+        if line:
+            current_para.append(line)
+        elif current_para:
+            paragraphs.append(' '.join(current_para))
+            current_para = []
+
+    # Add the last paragraph if exists
+    if current_para:
+        paragraphs.append(' '.join(current_para))
+
+    # Pre-allocate chunks list
+    chunks = [None] * len(paragraphs)
     current_position = 0
-
-    for paragraph in paragraphs:
-        start_position = text.find(paragraph, current_position)
+
+    for i, paragraph in enumerate(paragraphs):
+        # Use string slicing for position tracking
+        start_position = current_position
         end_position = start_position + len(paragraph)
-
-        chunks.append(
-            {
-                "text": paragraph,
-                "metadata": {
-                    "start_char": start_position,
-                    "end_char": end_position,
-                    "strategy": "paragraph",
-                },
+
+        chunks[i] = {
+            "text": paragraph,
+            "metadata": {
+                "start_char": start_position,
+                "end_char": end_position,
+                "strategy": "paragraph"
             }
-        )
-
-        current_position = end_position
-
+        }
+
+        current_position = end_position + 2  # +2 for the "\n\n" separator
+
     return chunks
 
 
 def heading_chunking(text):
     """
     Split text by headings (identified by heuristics).
-
-    Args:
-        text: The text to chunk
-
-    Returns:
-        List of chunks with metadata
+    Optimized for performance with compiled regex patterns.
""" import re - - # Define patterns for common heading formats + + # Compile regex patterns once heading_patterns = [ - r"^#{1,6}\s+.+$", # Markdown headings - r"^[A-Z][A-Za-z\s]+$", # All caps or title case single line - r"^\d+\.\s+[A-Z]", # Numbered headings (1. Title) - r"^[IVXLCDMivxlcdm]+\.\s+[A-Z]", # Roman numeral headings (IV. Title) + re.compile(r"^#{1,6}\s+.+$"), # Markdown headings + re.compile(r"^[A-Z][A-Za-z\s]+$"), # All caps or title case single line + re.compile(r"^\d+\.\s+[A-Z]"), # Numbered headings (1. Title) + re.compile(r"^[IVXLCDMivxlcdm]+\.\s+[A-Z]") # Roman numeral headings (IV. Title) ] - - # Combine patterns - combined_pattern = "|".join(f"({pattern})" for pattern in heading_patterns) - - # Split by lines first + + # Split by lines and process in one pass lines = text.split("\n") - chunks = [] current_heading = "Introduction" current_text = [] current_start = 0 - + + # Pre-allocate chunks list with estimated size + estimated_chunks = len(lines) // 10 # Rough estimate: 10 lines per chunk + chunks = [None] * estimated_chunks + chunk_idx = 0 + for line in lines: - if re.match(combined_pattern, line.strip()): - # If we have accumulated text, save it as a chunk + line = line.strip() + if not line: + continue + + # Check if line matches any heading pattern + is_heading = any(pattern.match(line) for pattern in heading_patterns) + + if is_heading: + # Save current chunk if exists if current_text: chunk_text = "\n".join(current_text) - chunks.append( - { - "text": chunk_text, - "metadata": { - "heading": current_heading, - "start_char": current_start, - "end_char": current_start + len(chunk_text), - "strategy": "heading", - }, + if chunk_idx >= len(chunks): + chunks.append(None) # Extend list if needed + chunks[chunk_idx] = { + "text": chunk_text, + "metadata": { + "heading": current_heading, + "start_char": current_start, + "end_char": current_start + len(chunk_text), + "strategy": "heading" } - ) - - # Start a new chunk with this heading - current_heading = line.strip() + } + chunk_idx += 1 + + # Start new chunk + current_heading = line current_text = [] current_start = text.find(line, current_start) else: current_text.append(line) - + # Add the last chunk if current_text: chunk_text = "\n".join(current_text) - chunks.append( - { - "text": chunk_text, - "metadata": { - "heading": current_heading, - "start_char": current_start, - "end_char": current_start + len(chunk_text), - "strategy": "heading", - }, + if chunk_idx >= len(chunks): + chunks.append(None) + chunks[chunk_idx] = { + "text": chunk_text, + "metadata": { + "heading": current_heading, + "start_char": current_start, + "end_char": current_start + len(chunk_text), + "strategy": "heading" } - ) - - return chunks + } + chunk_idx += 1 + + # Trim the list to actual size + return chunks[:chunk_idx] def semantic_chunking(text): diff --git a/Unsiloed/utils/openai.py b/Unsiloed/utils/openai.py index b88b4db..ff6a986 100644 --- a/Unsiloed/utils/openai.py +++ b/Unsiloed/utils/openai.py @@ -1,7 +1,7 @@ import os import base64 import json -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional from openai import OpenAI import logging import concurrent.futures @@ -9,6 +9,12 @@ from dotenv import load_dotenv import numpy as np import cv2 +from functools import lru_cache +import hashlib +import threading +from collections import OrderedDict +import time +import re load_dotenv() @@ -19,43 +25,108 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -load_dotenv() - # Instead of 
 # Instead of initializing at import, create a function to get the client
 client = None
-
+client_lock = threading.Lock()
 
 def get_openai_client():
-    """Get an OpenAI client with proper configuration"""
-    try:
-        api_key = os.environ.get("OPENAI_API_KEY")
-        if not api_key:
-            logger.error("OPENAI_API_KEY environment variable is not set")
-            raise ValueError("OPENAI_API_KEY environment variable is not set")
-
-        logger.debug("Attempting to create OpenAI client...")
-
-        # Create client with explicit parameters
-        client = OpenAI(api_key=api_key, timeout=60.0, max_retries=3)
-        logger.debug(
-            "OpenAI client created, now testing..."
-        )  # Log after client creation
-
-        # Test the client by listing available models
-        models = client.models.list()
-        if models and hasattr(models, "data") and len(models.data) > 0:
-            logger.debug(
-                f"OpenAI client initialized successfully, available models: {len(models.data)}"
+    """Get an OpenAI client with proper configuration and caching"""
+    global client
+    if client is not None:
+        return client
+
+    with client_lock:
+        if client is not None:  # Double-check pattern
+            return client
+
+        try:
+            api_key = os.environ.get("OPENAI_API_KEY")
+            if not api_key:
+                logger.error("OPENAI_API_KEY environment variable is not set")
+                raise ValueError("OPENAI_API_KEY environment variable is not set")
+
+            # Create client with optimized parameters
+            client = OpenAI(
+                api_key=api_key,
+                timeout=30.0,  # Reduced timeout for faster failure detection
+                max_retries=2,  # Reduced retries for faster fallback
             )
+
+            # Test the client with a lightweight operation
+            client.models.list()  # Lightweight call to verify the API key
             return client
-        else:
-            logger.error("OpenAI client initialized but returned no models.")
+
+        except Exception as e:
+            logger.error(f"Error initializing OpenAI client: {str(e)}")
             return None
-    except Exception as e:
-        logger.error(f"Error initializing OpenAI client: {str(e)}")
-        return None
 
+# Optimize cache with better memory management
+class ThreadSafeLRUCache:
+    def __init__(self, maxsize=100):
+        self.cache = OrderedDict()
+        self.maxsize = maxsize
+        self.lock = threading.Lock()
+        self.hits = 0
+        self.misses = 0
+
+    def get(self, key: str) -> Optional[str]:
+        with self.lock:
+            if key in self.cache:
+                # Move to end (most recently used)
+                value = self.cache.pop(key)
+                self.cache[key] = value
+                self.hits += 1
+                return value
+            self.misses += 1
+            return None
+
+    def put(self, key: str, value: str):
+        with self.lock:
+            if key in self.cache:
+                self.cache.pop(key)
+            elif len(self.cache) >= self.maxsize:
+                # Remove least recently used item
+                self.cache.popitem(last=False)
+            self.cache[key] = value
+
+    def clear(self):
+        with self.lock:
+            self.cache.clear()
+            self.hits = 0
+            self.misses = 0
+
+    def get_stats(self):
+        with self.lock:
+            total = self.hits + self.misses
+            hit_rate = (self.hits / total * 100) if total > 0 else 0
+            return {
+                "hits": self.hits,
+                "misses": self.misses,
+                "hit_rate": hit_rate,
+                "size": len(self.cache)
+            }
+
+# Initialize thread-safe cache with monitoring
+response_cache = ThreadSafeLRUCache(maxsize=100)
+
+def get_cached_openai_response(text_hash: str, prompt: str) -> Optional[str]:
+    """Get cached OpenAI response using thread-safe cache"""
+    cache_key = f"{text_hash}:{prompt}"
+    return response_cache.get(cache_key)
+
+def cache_openai_response(text_hash: str, prompt: str, response: str):
+    """Cache OpenAI response using thread-safe cache"""
+    cache_key = f"{text_hash}:{prompt}"
+    response_cache.put(cache_key, response)
+
+    # Log cache stats periodically
+    stats = response_cache.get_stats()
+    if stats["hits"] % 100 == 0:  # Log every 100 hits
+        logger.info(f"Cache stats: {stats}")
+
+def get_text_hash(text: str) -> str:
+    """Generate a hash for text to use as cache key"""
+    return hashlib.md5(text.encode()).hexdigest()
 
 def encode_image_to_base64(image_path):
     """
@@ -125,221 +196,138 @@ def create_extraction_prompt(schema: Dict[str, Any], page_count: int) -> str:
 
 def semantic_chunk_with_structured_output(text: str) -> List[Dict[str, Any]]:
     """
-    Use OpenAI API to create semantic chunks from text using JSON mode.
-
-    Args:
-        text: The text to chunk
-
-    Returns:
-        List of chunks with metadata
+    Use OpenAI API to create semantic chunks from text.
+    Optimized with caching, parallel processing, and retry logic.
     """
+    # Generate hash for caching
+    text_hash = get_text_hash(text)
+
+    # Check cache first
+    cached_response = get_cached_openai_response(text_hash, "semantic_chunk")
+    if cached_response:
+        return json.loads(cached_response).get("chunks", [])
 
     # If text is too long, split it first using a simpler method
     # and then process each part in parallel
     if len(text) > 25000:
-        logger.info(
-            "Text too long for direct semantic chunking, applying parallel processing"
-        )
+        logger.info("Text too long for direct semantic chunking, applying parallel processing")
         return process_long_text_semantically(text)
 
-    try:
-        # Get the OpenAI client
-        openai_client = get_openai_client()
-
-        # Create a prompt for the OpenAI model with JSON mode
-        response = openai_client.chat.completions.create(
-            model="gpt-4o",
-            messages=[
-                {
-                    "role": "system",
-                    "content": "You are an expert at analyzing and dividing text into meaningful semantic chunks. Your output should be valid JSON.",
-                },
-                {
-                    "role": "user",
-                    "content": f"""Please analyze the following text and divide it into logical semantic chunks.
-                    Each chunk should represent a cohesive unit of information or a distinct section.
-
-                    Return your results as a JSON object with this structure:
-                    {{
-                        "chunks": [
-                            {{
-                                "text": "the text of the chunk",
-                                "title": "a descriptive title for this chunk",
-                                "position": "beginning/middle/end"
-                            }},
-                            ...
-                        ]
-                    }}
-
-                    Text to chunk:
+    max_retries = 2
+    retry_delay = 1.0
+
+    for attempt in range(max_retries):
+        try:
+            # Get the OpenAI client
+            openai_client = get_openai_client()
+            if not openai_client:
+                raise ValueError("Failed to initialize OpenAI client")
+
+            # Split text into sections first
+            sections = []
+            current_section = []
+            lines = text.split('. ')
+
+            for line in lines:
+                # Check if line starts with a numbered heading (e.g., "6.1", "6.2", etc.)
+                if re.match(r'^\d+\.\d+\s+[A-Z]', line):
+                    if current_section:
+                        sections.append('. '.join(current_section) + '.')
+                    current_section = [line]
+                else:
+                    current_section.append(line)
+
+            # Add the last section
+            if current_section:
+                sections.append('. '.join(current_section) + '.')
+
+            # Process each section separately
+            all_chunks = []
+            current_position = 0
+
+            for section in sections:
+                # Split section into paragraphs and bullet points
+                paragraphs = []
+                current_paragraph = []
+                bullet_points = []
+                current_bullet = []
+
+                # First, identify section heading
+                section_match = re.match(r'^(\d+\.\d+\s+[A-Z].*?)(?=\s|$)', section)
+                section_heading = section_match.group(1) if section_match else ""
+
+                # Process the rest of the section
+                remaining_text = section[len(section_heading):].strip() if section_heading else section
+
+                # Split into lines while preserving bullet points
+                lines = []
+                for line in remaining_text.split('. '):
+                    if line.strip().startswith('●'):
+                        if current_paragraph:
+                            lines.append('. '.join(current_paragraph) + '.')
+                            current_paragraph = []
+                        lines.append(line)
+                    else:
+                        current_paragraph.append(line)
+                        if len(' '.join(current_paragraph)) > 200:  # Split long paragraphs
+                            lines.append('. '.join(current_paragraph) + '.')
+                            current_paragraph = []
+
+                # Add any remaining paragraph
+                if current_paragraph:
+                    lines.append('. '.join(current_paragraph) + '.')
+
+                # Process lines into chunks
+                current_chunk = []
+                for line in lines:
+                    if line.strip().startswith('●'):
+                        # If we have a current chunk, add it to paragraphs
+                        if current_chunk:
+                            paragraphs.append(' '.join(current_chunk))
+                            current_chunk = []
+                        # Start a new bullet point chunk
+                        current_chunk = [line]
+                    else:
+                        current_chunk.append(line)
+                        if len(' '.join(current_chunk)) > 200:  # Split long chunks
+                            paragraphs.append(' '.join(current_chunk))
+                            current_chunk = []
+
+                # Add any remaining chunk
+                if current_chunk:
+                    paragraphs.append(' '.join(current_chunk))
+
+                # Add section heading to first chunk
+                if section_heading:
+                    if paragraphs:
+                        paragraphs[0] = section_heading + ' ' + paragraphs[0]
+                    else:
+                        paragraphs.append(section_heading + '.')
+
+                # Process each paragraph
+                for chunk_text in paragraphs:
+                    # Skip empty chunks or chunks that are too short
+                    if not chunk_text or len(chunk_text) < 20:
+                        continue
-
-                    {text}""",
-                },
-            ],
-            max_tokens=4000,
-            temperature=0.1,
-            response_format={"type": "json_object"},
-        )
-
-        # Parse the response
-        result = json.loads(response.choices[0].message.content)
-
-        # Convert the response to our standard chunk format
-        chunks = []
-        current_position = 0
-
-        for i, chunk_data in enumerate(result.get("chunks", [])):
-            chunk_text = chunk_data.get("text", "")
-            # Find the chunk in the original text to get accurate character positions
-            start_position = text.find(chunk_text, current_position)
-            if start_position == -1:
-                # If exact match not found, use approximate position
-                start_position = current_position
-
-            end_position = start_position + len(chunk_text)
-
-            chunks.append(
-                {
-                    "text": chunk_text,
-                    "metadata": {
-                        "title": chunk_data.get("title", f"Chunk {i + 1}"),
-                        "position": chunk_data.get("position", "unknown"),
-                        "start_char": start_position,
-                        "end_char": end_position,
-                        "strategy": "semantic",
-                    },
-                }
-            )
-
-            current_position = end_position
-
-        return chunks
-
-    except Exception as e:
-        logger.error(f"Error in semantic chunking with JSON mode: {str(e)}")
-        # Fall back to paragraph chunking if semantic chunking fails
-        logger.info("Falling back to paragraph chunking")
-        # We'll just do basic paragraph chunking here
-        paragraphs = text.split("\n\n")
-        paragraphs = [p.strip() for p in paragraphs if p.strip()]
-
-        chunks = []
-        current_position = 0
-
-        for i, paragraph in enumerate(paragraphs):
-            start_position = text.find(paragraph, current_position)
-            if start_position == -1:
-                start_position = current_position
-
-            end_position = start_position + len(paragraph)
-
-            chunks.append(
-                {
-                    "text": paragraph,
-                    "metadata": {
-                        "title": f"Paragraph {i + 1}",
-                        "position": "unknown",
-                        "start_char": start_position,
-                        "end_char": end_position,
-                        "strategy": "paragraph",  # Fall back strategy
-                    },
-                }
-            )
-
-            current_position = end_position
-
-        return chunks
-
-
-def process_long_text_semantically(text: str) -> List[Dict[str, Any]]:
-    """
-    Process a long text by breaking it into smaller pieces and chunking each piece semantically.
-    Uses parallel processing and JSON mode for better performance.
-
-    Args:
-        text: The long text to process
-
-    Returns:
-        List of semantic chunks
-    """
-    # Create chunks of 25000 characters with 500 character overlap
-    text_chunks = []
-    chunk_size = 25000
-    overlap = 500
-    start = 0
-    text_length = len(text)
-
-    while start < text_length:
-        end = min(start + chunk_size, text_length)
-        text_chunks.append(text[start:end])
-        start = end - overlap if end < text_length else text_length
-
-    # Process each chunk in parallel
-    all_semantic_chunks = []
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        # Define a worker function
-        def process_chunk(chunk_text):
-            try:
-                # Get the OpenAI client
-                openai_client = get_openai_client()
-
-                # Process this chunk with JSON mode
-                response = openai_client.chat.completions.create(
-                    model="gpt-4o",
-                    messages=[
-                        {
-                            "role": "system",
-                            "content": "You are an expert at analyzing and dividing text into meaningful semantic chunks. Your output should be valid JSON.",
-                        },
-                        {
-                            "role": "user",
-                            "content": f"""Please analyze the following text and divide it into logical semantic chunks.
-                            Each chunk should represent a cohesive unit of information or a distinct section.
-
-                            Return your results as a JSON object with this structure:
-                            {{
-                                "chunks": [
-                                    {{
-                                        "text": "the text of the chunk",
-                                        "title": "a descriptive title for this chunk",
-                                        "position": "beginning/middle/end"
-                                    }},
-                                    ...
-                                ]
-                            }}
-
-                            Text to chunk:
-
-                            {chunk_text}""",
-                        },
-                    ],
-                    max_tokens=4000,
-                    temperature=0.1,
-                    response_format={"type": "json_object"},
-                )
-
-                # Parse the response
-                result = json.loads(response.choices[0].message.content)
-
-                # Convert the response to our standard chunk format
-                sub_chunks = []
-                current_position = 0
-
-                for i, chunk_data in enumerate(result.get("chunks", [])):
-                    chunk_text = chunk_data.get("text", "")
-                    # Find position in the original chunk
-                    start_position = chunk_text.find(chunk_text, current_position)
+                    # Ensure chunk ends with proper sentence boundary
+                    if not any(chunk_text.rstrip().endswith(p) for p in ['.', '!', '?']):
+                        continue
+
+                    # Find the chunk in the original text to get accurate character positions
+                    start_position = text.find(chunk_text, current_position)
                     if start_position == -1:
+                        # If exact match not found, use approximate position
                         start_position = current_position
 
                     end_position = start_position + len(chunk_text)
 
-                    sub_chunks.append(
+                    all_chunks.append(
                         {
                             "text": chunk_text,
                             "metadata": {
-                                "title": chunk_data.get("title", f"Subchunk {i + 1}"),
-                                "position": chunk_data.get("position", "unknown"),
+                                "title": f"Chunk {len(all_chunks) + 1}",
+                                "position": "middle",
                                 "start_char": start_position,
                                 "end_char": end_position,
                                 "strategy": "semantic",
@@ -349,63 +337,187 @@ def process_chunk(chunk_text):
 
                     current_position = end_position
 
-                return sub_chunks
-            except Exception as e:
-                logger.error(
-                    f"Error processing semantic subchunk with JSON mode: {str(e)}"
-                )
-                return []
-
-        # Submit all tasks and gather results
-        futures = [executor.submit(process_chunk, chunk) for chunk in text_chunks]
-        for future in concurrent.futures.as_completed(futures):
-            all_semantic_chunks.extend(future.result())
+            # Cache the response
+            cache_openai_response(text_hash, "semantic_chunk", json.dumps({"chunks": all_chunks}))
+
+            # Sort chunks by start position
+            all_chunks.sort(key=lambda x: x["metadata"]["start_char"])
+
+            # Merge small chunks with adjacent chunks, but only if they're related
+            if len(all_chunks) > 1:
+                merged_chunks = []
+                current_chunk = all_chunks[0]
+
+                for next_chunk in all_chunks[1:]:
+                    # Only merge if chunks are small and related
+                    if (len(current_chunk["text"]) < 100 and len(next_chunk["text"]) < 100 and
+                            current_chunk["metadata"]["end_char"] == next_chunk["metadata"]["start_char"]):
+                        # Merge chunks with proper spacing
+                        current_chunk["text"] = current_chunk["text"].rstrip() + " " + next_chunk["text"].lstrip()
+                        current_chunk["metadata"]["end_char"] = next_chunk["metadata"]["end_char"]
+                    else:
+                        merged_chunks.append(current_chunk)
+                        current_chunk = next_chunk
+
+                merged_chunks.append(current_chunk)
+                all_chunks = merged_chunks
+
+            return all_chunks
+
+        except Exception as e:
+            logger.error(f"Error in semantic chunking (attempt {attempt + 1}/{max_retries}): {str(e)}")
+            if attempt < max_retries - 1:
+                time.sleep(retry_delay * (attempt + 1))  # Exponential backoff
+                continue
+            # Instead of falling back to paragraph chunking, raise the error
+            raise ValueError(f"Failed to process text after {max_retries} attempts: {str(e)}")
 
-    return all_semantic_chunks
 
+def process_long_text_semantically(text: str) -> List[Dict[str, Any]]:
+    """
+    Process long text by splitting into sections and processing each section separately.
+    Optimized for better performance with improved section management.
+    """
+    # Split text into sections based on numbered headings
+    sections = []
+    current_section = []
+    lines = text.split('. ')
+
+    for line in lines:
+        # Check if line starts with a numbered heading (e.g., "6.1", "6.2", etc.)
+        if re.match(r'^\d+\.\d+\s+[A-Z]', line):
+            if current_section:
+                sections.append('. '.join(current_section) + '.')
+            current_section = [line]
+        else:
+            current_section.append(line)
+
+    # Add the last section
+    if current_section:
+        sections.append('. '.join(current_section) + '.')
+
+    # Process each section separately
+    results = []
+    current_position = 0
+
+    for section in sections:
+        # Process each section with semantic chunking
+        section_chunks = semantic_chunk_with_structured_output(section)
+
+        # Adjust positions to be relative to the full text
+        for chunk in section_chunks:
+            chunk["metadata"]["start_char"] += current_position
+            chunk["metadata"]["end_char"] += current_position
+            results.append(chunk)
+
+        current_position += len(section)
+
+    # Sort chunks by start position
+    results.sort(key=lambda x: x["metadata"]["start_char"])
+
+    # Merge overlapping chunks more efficiently, but never merge across section boundaries
+    if not results:
+        return []
+
+    merged_chunks = [results[0]]
+
+    for chunk in results[1:]:
+        prev_chunk = merged_chunks[-1]
+
+        # Check if chunks are from the same section
+        prev_section = re.search(r'^\d+\.\d+\s+[A-Z]', prev_chunk["text"])
+        curr_section = re.search(r'^\d+\.\d+\s+[A-Z]', chunk["text"])
+
+        # Only merge if chunks are from the same section
+        if prev_section and curr_section and prev_section.group() == curr_section.group():
+            # Check for overlap
+            if chunk["metadata"]["start_char"] <= prev_chunk["metadata"]["end_char"]:
+                # Calculate overlap size
+                overlap_size = prev_chunk["metadata"]["end_char"] - chunk["metadata"]["start_char"]
+
+                # Only merge if overlap is significant (more than 50% of the smaller chunk)
+                if overlap_size > min(len(prev_chunk["text"]), len(chunk["text"])) * 0.5:
+                    # Merge chunks with proper spacing
+                    prev_chunk["text"] = prev_chunk["text"].rstrip() + "\n" + chunk["text"].lstrip()
+                    prev_chunk["metadata"]["end_char"] = chunk["metadata"]["end_char"]
+                else:
+                    merged_chunks.append(chunk)
+            else:
+                merged_chunks.append(chunk)
+        else:
+            # If chunks are from different sections, don't merge them
+            merged_chunks.append(chunk)
+
+    return merged_chunks
 
 
 def extract_text_from_pdf(pdf_path: str) -> str:
     """
-    Extract text from a PDF file with optimized performance.
-    Uses parallel processing for multi-page PDFs.
-
-    Args:
-        pdf_path: Path to the PDF file
-
-    Returns:
-        Extracted text from the PDF
+    Extract text from PDF file with optimized parallel processing.
+    Uses memory-efficient streaming and parallel page processing.
     """
-
     try:
+        # Open PDF in binary mode for streaming
         with open(pdf_path, "rb") as file:
+            # Create PDF reader with streaming mode
             reader = PyPDF2.PdfReader(file)
-
-            # Function to extract text from a page
-            def extract_page_text(page_idx):
-                try:
-                    page = reader.pages[page_idx]
-                    text = page.extract_text() or ""
-                    return text
-                except Exception as e:
-                    logger.warning(
-                        f"Error extracting text from page {page_idx}: {str(e)}"
-                    )
-                    return ""
-
-            # For small PDFs, sequential processing is faster
-            if len(reader.pages) <= 5:
-                all_text = ""
-                for i in range(len(reader.pages)):
-                    all_text += extract_page_text(i) + "\n\n"
-            else:
-                # Process pages in parallel for larger PDFs
-                with concurrent.futures.ThreadPoolExecutor() as executor:
-                    results = list(
-                        executor.map(extract_page_text, range(len(reader.pages)))
-                    )
-                    all_text = "\n\n".join(results)
-
-            return all_text
+            total_pages = len(reader.pages)
+
+            # Process pages in parallel with optimal worker count
+            max_workers = min(total_pages, 4)  # Limit to 4 workers for optimal performance
+            chunk_size = max(1, total_pages // max_workers)  # Calculate optimal chunk size
+
+            # Use a list to store text chunks with pre-allocated size
+            text_chunks = [None] * total_pages
+
+            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+                def process_page_range(start_idx, end_idx):
+                    try:
+                        results = []
+                        for i in range(start_idx, min(end_idx, total_pages)):
+                            page = reader.pages[i]
+                            text = page.extract_text()
+                            if text and text.strip():  # Only store non-empty text
+                                # Clean up the text
+                                text = text.replace('\n', ' ')  # Replace newlines with spaces
+                                text = ' '.join(text.split())  # Normalize whitespace
+                                # Fix common sentence boundary issues
+                                text = text.replace(' .', '.').replace(' ,', ',')
+                                text = text.replace('  ', ' ')
+                                # Ensure proper spacing after sentence endings
+                                text = text.replace('.', '. ').replace('!', '! ').replace('?', '? ')
+                                # Fix encoding issues
+                                text = text.encode('ascii', 'ignore').decode('ascii')
+                                # Fix bullet points
+                                text = text.replace('•', '●').replace('·', '●')
+                                text = ' '.join(text.split())  # Normalize again after fixes
+                                results.append((i, text))
+                        return results
+                    except Exception as e:
+                        logger.error(f"Error processing pages {start_idx}-{end_idx}: {str(e)}")
+                        return []
+
+                # Submit page ranges for parallel processing
+                futures = []
+                for i in range(0, total_pages, chunk_size):
+                    end_idx = min(i + chunk_size, total_pages)
+                    futures.append(executor.submit(process_page_range, i, end_idx))
+
+                # Collect results in order
+                for future in concurrent.futures.as_completed(futures):
+                    for page_idx, text in future.result():
+                        text_chunks[page_idx] = text
+
+        # Join all text chunks with proper spacing, filtering out None values
+        full_text = " ".join(chunk for chunk in text_chunks if chunk)
+
+        # Final cleanup of the text
+        full_text = ' '.join(full_text.split())  # Normalize whitespace
+        full_text = full_text.replace(' .', '.').replace(' ,', ',')  # Fix common spacing issues
+        full_text = full_text.replace('  ', ' ')  # Remove double spaces
+        full_text = full_text.encode('ascii', 'ignore').decode('ascii')  # Fix encoding issues
+
+        return full_text
+
     except Exception as e:
         logger.error(f"Error extracting text from PDF: {str(e)}")
         raise
diff --git a/example.py b/example.py
index 78cd063..13b95fe 100644
--- a/example.py
+++ b/example.py
@@ -1,8 +1,8 @@
 import os
-import chunktopus
+from Unsiloed import process_sync
 
 # Example usage with a URL
-result = chunktopus.process_sync({
+result = process_sync({
     "filePath": "https://omni-demo-data.s3.amazonaws.com/test/cs101.pdf",
     "credentials": {
         "apiKey": os.environ.get("OPENAI_API_KEY")
@@ -23,9 +23,10 @@
 """
 # Async example (uncomment to use)
 import asyncio
+from Unsiloed import process
 
 async def main():
-    result = await chunktopus.process({
+    result = await process({
         "filePath": "https://omni-demo-data.s3.amazonaws.com/test/cs101.pdf",
         "credentials": {
             "apiKey": os.environ.get("OPENAI_API_KEY")
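
A minimal, illustrative sketch of how the reworked chunking helpers could be exercised directly (the import path follows this diff's module layout; the sample text and parameter values are assumptions, not part of the change, and only the non-LLM strategies are used so no OpenAI key is needed):

# Illustrative usage only -- not part of the patch above.
# Assumes the Unsiloed package from this diff is importable.
from Unsiloed.utils.chunking import (
    fixed_size_chunking,
    heading_chunking,
    paragraph_chunking,
)

sample = "# Title\n\nFirst paragraph of text.\n\nSecond paragraph, a bit longer."

results = {
    "fixed": fixed_size_chunking(sample, chunk_size=40, overlap=10),
    "paragraph": paragraph_chunking(sample),
    "heading": heading_chunking(sample),
}

for name, chunks in results.items():
    print(name, len(chunks))
    for chunk in chunks:
        meta = chunk["metadata"]
        # Each strategy returns the same {"text", "metadata"} shape
        print(f'  [{meta["start_char"]}:{meta["end_char"]}] {chunk["text"][:40]!r}')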