VecClean is an ultra-low-latency text cleaning, deduplication, and vectorization pipeline designed for production document processing at scale. It combines the flexibility of Python with the performance of C++ to deliver high throughput for RAG pipelines.
Install with pip:

pip install vecclean

Quick start:

import asyncio
from vecclean import Pipeline

async def main():
    # Initialize the pipeline with the default configuration
    pipeline = Pipeline()

    # A sample text to process
    sample_text = """
    This is a sample document for testing.
    It contains multiple sentences that need processing.
    Some sentences might have extra whitespace.
    Others might contain HTML tags like <b>bold text</b>.
    """

    # Process the text
    result = await pipeline.process_text(sample_text)

    # Access the processed chunks
    for i, chunk in enumerate(result.chunks):
        print(f"Chunk {i}: {chunk.text}")
        print(f"Hash: {chunk.text_hash}")
        print(f"Word count: {chunk.word_count}")
        print(f"Embedding shape: {chunk.embedding.shape if chunk.embedding is not None else 'None'}")
        print("---")

# Run the async function
asyncio.run(main())
Processing multiple files:

import asyncio
from vecclean import Pipeline
from pathlib import Path

async def process_files():
    pipeline = Pipeline()

    # Process multiple files
    files = [
        "document1.pdf",
        "document2.docx",
        "document3.txt"
    ]
    result = await pipeline.process_files(files)

    print(f"Processed {len(result.chunks)} chunks from {len(files)} files")
    print(f"Processing time: {result.stats.total_processing_time:.2f} seconds")

    # Save results to JSON
    import json
    with open("processed_chunks.json", "w") as f:
        json.dump(result.to_dict(), f, indent=2)

asyncio.run(process_files())
Custom configuration:

from vecclean import Pipeline, Config

# Create a custom configuration
config = Config(
    chunking={
        "chunk_size": 512,
        "chunk_overlap": 50,
        "strategy": "sentence"
    },
    cleaning={
        "normalize_whitespace": True,
        "strip_html_tags": True,
        "remove_stopwords": True
    },
    dedup={
        "sentence_dedup": True,
        "chunk_dedup": True,
        "similarity_threshold": 0.85
    },
    embedding={
        "model_name": "all-MiniLM-L6-v2",
        "device": "auto"
    }
)

# Initialize the pipeline with the custom config
pipeline = Pipeline(config)

Chunking options:
- chunk_size: Maximum size of each chunk (default: 512)
- chunk_overlap: Overlap between chunks (default: 50)
- strategy: Chunking strategy - "sentence", "token", or "recursive" (default: "sentence")
- min_chunk_size: Minimum chunk size (default: 100)
- max_chunk_size: Maximum chunk size (default: 1000)
Cleaning options:
- normalize_unicode: Unicode normalization form (default: "NFC")
- normalize_whitespace: Normalize whitespace (default: True)
- standardize_punctuation: Standardize punctuation (default: True)
- strip_html_tags: Remove HTML tags (default: True)
- remove_stopwords: Remove stopwords (default: True)
- min_text_length: Minimum text length (default: 10)

Deduplication options:
- sentence_dedup: Enable sentence-level deduplication (default: True)
- chunk_dedup: Enable chunk-level deduplication (default: True)
- similarity_threshold: Similarity threshold for deduplication (default: 0.85)
- hash_algorithm: Hash algorithm for deduplication (default: "xxhash")

Embedding options:
- model_name: Embedding model name (default: "all-MiniLM-L6-v2")
- device: Device for embedding generation (default: "auto")
- batch_size: Batch size for embedding generation (default: 32)
- cache_embeddings: Cache embeddings (default: True)
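Groups can also be overridden selectively. The sketch below is a minimal example assuming that any group omitted from Config falls back to the documented defaults:

from vecclean import Config, Pipeline

# Sketch: override only the chunking options; cleaning, dedup, and embedding
# are assumed to keep the documented defaults when omitted.
config = Config(
    chunking={"chunk_size": 256, "chunk_overlap": 25}
)
pipeline = Pipeline(config)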
Pipeline is the main entry point for text processing.

process_text - process a single text string:

result = await pipeline.process_text("Your text here")

process_files - process multiple files:

result = await pipeline.process_files(["file1.pdf", "file2.txt"])

process_documents - process a list of document objects:

from vecclean.core.types import Document

documents = [
    Document(content="Text 1", metadata={"source": "file1.txt"}),
    Document(content="Text 2", metadata={"source": "file2.txt"})
]
result = await pipeline.process_documents(documents)

The result object returned by these methods contains the processed chunks and metadata.
Attributes:
- chunks: List of CleanedChunk objects
- stats: Processing statistics
- status: Processing status (COMPLETED, FAILED, etc.)
- errors: List of error messages
- warnings: List of warning messages

Methods:
- to_dict(): Convert to a dictionary for serialization
- to_json(): Convert to a JSON string
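As a quick illustration of these fields, the sketch below checks the status, reports any errors, and serializes the result to JSON; the ProcessingStatus import path is an assumption based on the Document import shown above:

import asyncio

from vecclean import Pipeline
from vecclean.core.types import ProcessingStatus  # assumed import path

async def show_result(text: str):
    pipeline = Pipeline()
    result = await pipeline.process_text(text)
    if result.status == ProcessingStatus.COMPLETED:
        print(f"{len(result.chunks)} chunks in {result.stats.total_processing_time:.2f}s")
        with open("result.json", "w") as f:
            f.write(result.to_json())
    else:
        print("Errors:", result.errors)
        print("Warnings:", result.warnings)

asyncio.run(show_result("Some raw text to clean and chunk."))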
CleanedChunk represents a processed text chunk.

Attributes:
- text: The cleaned text content
- text_hash: Hash of the text, used for deduplication
- embedding: Vector embedding (NumPy array)
- chunk_index: Index of the chunk within the document
- start_char: Starting character position
- end_char: Ending character position
- word_count: Number of words in the chunk
- char_count: Number of characters in the chunk
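For example, a downstream indexing step might flatten each chunk into a plain record before writing it to a vector store. The sketch below uses only the attributes listed above; the record keys themselves are illustrative, not part of the VecClean API:

def chunk_to_record(chunk) -> dict:
    # Illustrative record layout; the keys are not part of the VecClean API.
    return {
        "id": chunk.text_hash,
        "text": chunk.text,
        "vector": chunk.embedding.tolist() if chunk.embedding is not None else None,
        "index": chunk.chunk_index,
        "span": (chunk.start_char, chunk.end_char),
        "words": chunk.word_count,
        "chars": chunk.char_count,
    }

# records = [chunk_to_record(c) for c in result.chunks]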
VecClean supports processing various file formats:
- PDF (.pdf) - Text extraction with metadata
- Word Documents (.docx) - Text and formatting
- PowerPoint (.pptx) - Text from slides
- Text Files (.txt) - Plain text
- HTML (.html, .htm) - Web content
- Email (.eml) - Email messages
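To feed a whole directory through the pipeline, one approach is to filter files by suffix before calling process_files. This is a sketch only: the extension set is taken from the list above and the directory path is a placeholder:

import asyncio
from pathlib import Path

from vecclean import Pipeline

# Extensions taken from the supported-format list above.
SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".pptx", ".txt", ".html", ".htm", ".eml"}

async def process_directory(root: str):
    pipeline = Pipeline()
    files = [str(p) for p in Path(root).rglob("*") if p.suffix.lower() in SUPPORTED_EXTENSIONS]
    return await pipeline.process_files(files)

# result = asyncio.run(process_directory("./docs"))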
VecClean includes C++ optimizations for high-performance text processing:
- SIMD-optimized text cleaning
- Parallel processing with work-stealing thread pools
- Memory-efficient streaming for large files
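To gauge what the native backend buys you on your own data, a rough timing sketch like the one below can be run once against a build with the C++ extension and once against the pure-Python fallback; it relies only on the documented process_files call and result.stats.total_processing_time:

import asyncio

from vecclean import Pipeline

async def benchmark(paths):
    pipeline = Pipeline()
    result = await pipeline.process_files(paths)
    elapsed = result.stats.total_processing_time
    rate = len(result.chunks) / elapsed if elapsed else float("inf")
    print(f"{len(result.chunks)} chunks in {elapsed:.2f}s ({rate:.1f} chunks/s)")

# asyncio.run(benchmark(["document1.pdf", "document2.docx"]))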
All operations are asynchronous for better performance:
# Process multiple texts concurrently
tasks = [
    pipeline.process_text(text1),
    pipeline.process_text(text2),
    pipeline.process_text(text3)
]
results = await asyncio.gather(*tasks)

Always check the processing status and handle exceptions:

from vecclean.core.types import ProcessingStatus  # assumed import path

try:
    result = await pipeline.process_text(text)
    if result.status == ProcessingStatus.COMPLETED:
        print(f"Successfully processed {len(result.chunks)} chunks")
    else:
        print(f"Processing failed: {result.errors}")
except Exception as e:
    print(f"Error during processing: {e}")
FastAPI integration:

from fastapi import FastAPI, UploadFile, File
from vecclean import Pipeline

app = FastAPI()
pipeline = Pipeline()

@app.post("/process-text")
async def process_text(text: str):
    result = await pipeline.process_text(text)
    return {
        "chunks": [chunk.to_dict() for chunk in result.chunks],
        "stats": result.stats.to_dict()
    }

@app.post("/process-file")
async def process_file(file: UploadFile = File(...)):
    content = await file.read()
    result = await pipeline.process_text(content.decode())
    return {"chunks": len(result.chunks)}
LangChain integration (note that this wrapper's split_text is a coroutine and must be awaited, unlike LangChain's synchronous TextSplitter interface):

from langchain.text_splitter import TextSplitter
from vecclean import Pipeline

class VecCleanTextSplitter(TextSplitter):
    def __init__(self, pipeline: Pipeline):
        super().__init__()  # keep TextSplitter's default settings
        self.pipeline = pipeline

    async def split_text(self, text: str):
        result = await self.pipeline.process_text(text)
        return [chunk.text for chunk in result.chunks]

# Usage (inside an async context)
pipeline = Pipeline()
splitter = VecCleanTextSplitter(pipeline)
chunks = await splitter.split_text("Your long text here")

Best practices:
- Batch Processing: Process multiple documents together for better performance (see the sketch after this list)
- Memory Management: Use streaming for very large files
- Configuration: Tune chunking parameters based on your use case
- Error Handling: Always check the processing status and handle errors
- Caching: Enable embedding caching for repeated processing
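For the batch-processing recommendation above, a minimal sketch (the batch size of 100 is arbitrary; only the documented process_documents call is assumed):

from vecclean import Pipeline
from vecclean.core.types import Document

async def process_in_batches(documents: list[Document], batch_size: int = 100):
    # Batch size is arbitrary; tune it to your memory budget.
    pipeline = Pipeline()
    chunks = []
    for start in range(0, len(documents), batch_size):
        result = await pipeline.process_documents(documents[start:start + batch_size])
        chunks.extend(result.chunks)
    return chunks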
Common issues:
- C++ Backend Not Available: VecClean falls back to the pure-Python implementation
- Memory Issues: Reduce the batch size or use streaming for large files (see the configuration sketch below)
- Slow Processing: Check that the C++ backend is enabled and consider using a GPU for embeddings
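The last two issues usually come down to embedding settings. The sketch below lowers the documented batch_size for constrained memory and requests a GPU device; the "cuda" device string follows the usual sentence-transformers convention and is an assumption here:

from vecclean import Config, Pipeline

# Memory issues: lower the embedding batch size (documented default is 32).
low_memory = Pipeline(Config(embedding={"batch_size": 8}))

# Slow processing: move embedding generation to a GPU if one is available ("cuda" is an assumption).
gpu_accelerated = Pipeline(Config(embedding={"device": "cuda"}))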
Enable debug logging to see detailed processing information:
import logging
logging.basicConfig(level=logging.DEBUG)
pipeline = Pipeline()
# Processing will now show detailed logs

VecClean is licensed under the MIT License. See the LICENSE file for details.