4 | 4 | import logging |
5 | 5 | import sqlite3 |
6 | 6 | import time |
| 7 | +from itertools import islice |
7 | 8 | from pathlib import Path |
8 | | -from typing import Dict, List, Optional |
| 9 | +from typing import Dict, Generator, List, Optional, Tuple |
9 | 10 |
10 | 11 | try: |
11 | 12 | from codexlens.semantic import SEMANTIC_AVAILABLE |
23 | 24 | EMBEDDING_BATCH_SIZE = 64 # Increased from 8 for better performance |
24 | 25 |
25 | 26 |
| 27 | +def _generate_chunks_from_cursor( |
| 28 | + cursor, |
| 29 | + chunker, |
| 30 | + path_column: str, |
| 31 | + file_batch_size: int, |
| 32 | + failed_files: List[Tuple[str, str]], |
| 33 | +) -> Generator[Tuple, None, Tuple[int, int]]: |
| 34 | + """Generator that yields chunks from a database cursor in a streaming fashion. |
| 35 | +
| 36 | + This avoids loading all chunks into memory at once, significantly reducing |
| 37 | + peak memory usage for large codebases. |
| 38 | +
| 39 | + Args: |
| 40 | + cursor: SQLite cursor with file data |
| 41 | + chunker: Chunker instance for splitting files |
| 42 | + path_column: Column name for file path |
| 43 | + file_batch_size: Number of files to fetch at a time |
| 44 | +        failed_files: List that (file_path, error) tuples are appended to on failure |
| 45 | +
| 46 | + Yields: |
| 47 | + (chunk, file_path) tuples |
| 48 | +
| 49 | + Returns: |
| 50 | + (total_files_processed, batch_count) after iteration completes |
| 51 | + """ |
| 52 | + total_files = 0 |
| 53 | + batch_count = 0 |
| 54 | + |
| 55 | + while True: |
| 56 | + file_batch = cursor.fetchmany(file_batch_size) |
| 57 | + if not file_batch: |
| 58 | + break |
| 59 | + |
| 60 | + batch_count += 1 |
| 61 | + |
| 62 | + for file_row in file_batch: |
| 63 | + file_path = file_row[path_column] |
| 64 | + content = file_row["content"] |
| 65 | + language = file_row["language"] or "python" |
| 66 | + |
| 67 | + try: |
| 68 | + chunks = chunker.chunk_sliding_window( |
| 69 | + content, |
| 70 | + file_path=file_path, |
| 71 | + language=language |
| 72 | + ) |
| 73 | + if chunks: |
| 74 | + total_files += 1 |
| 75 | + for chunk in chunks: |
| 76 | + yield (chunk, file_path) |
| 77 | + except Exception as e: |
| 78 | + logger.error(f"Failed to chunk {file_path}: {e}") |
| 79 | + failed_files.append((file_path, str(e))) |
| 80 | +    return total_files, batch_count |
| 81 | + |
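
Side note on the consumption pattern: `itertools.islice(gen, n)` pulls at most `n` items per call and yields nothing once the generator is exhausted, which is what lets the loop further down process fixed-size embedding batches without ever materializing the full chunk list. A minimal, self-contained sketch with toy data (not the real chunker):

```python
from itertools import islice

def toy_chunks():
    # Stand-in for _generate_chunks_from_cursor: yields ten fake chunk tuples.
    for i in range(10):
        yield (f"chunk-{i}", f"file-{i % 3}.py")

gen = toy_chunks()
while True:
    batch = list(islice(gen, 4))  # at most 4 items per pull, like EMBEDDING_BATCH_SIZE
    if not batch:
        break  # generator exhausted
    print(len(batch))  # prints 4, then 4, then 2
```
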
26 | 82 | def _get_path_column(conn: sqlite3.Connection) -> str: |
27 | 83 | """Detect whether files table uses 'path' or 'full_path' column. |
28 | 84 |
@@ -199,7 +255,9 @@ def generate_embeddings( |
199 | 255 | try: |
200 | 256 | # Initialize embedder (singleton, reused throughout the function) |
201 | 257 | embedder = get_embedder(profile=model_profile) |
202 | | - chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size)) |
| 258 | + # skip_token_count=True: Use fast estimation (len/4) instead of expensive tiktoken |
| 259 | + # This significantly reduces CPU usage with minimal impact on metadata accuracy |
| 260 | + chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size, skip_token_count=True)) |
203 | 261 |
204 | 262 | if progress_callback: |
205 | 263 | progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)") |
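
A note on the `skip_token_count=True` comment above: the "fast estimation (len/4)" it refers to is the common 4-characters-per-token heuristic. A hypothetical stand-in for what the flag avoids computing exactly (`estimate_tokens` is an illustrative name, not the real `Chunker` internals):

```python
def estimate_tokens(text: str) -> int:
    """Cheap token estimate: roughly 4 characters per token for English/code."""
    return max(1, len(text) // 4)

# The exact alternative loads a tokenizer (e.g. tiktoken) and encodes every
# chunk: far more CPU for a metadata field that only needs to be indicative.
print(estimate_tokens("def add(a, b):\n    return a + b\n"))  # 8
```
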
@@ -238,78 +296,50 @@ def generate_embeddings( |
238 | 296 | progress_callback(f"Processing {total_files} files for embeddings in batches of {FILE_BATCH_SIZE}...") |
239 | 297 |
240 | 298 | cursor = conn.execute(f"SELECT {path_column}, content, language FROM files") |
| 299 | + |
| 300 | + # --- STREAMING GENERATOR APPROACH --- |
| 301 | +        # Instead of accumulating all chunks for an entire file batch up front, we use |
| 302 | +        # a generator that yields chunks on demand, keeping peak memory low and bounded. |
| 303 | + chunk_generator = _generate_chunks_from_cursor( |
| 304 | + cursor, chunker, path_column, FILE_BATCH_SIZE, failed_files |
| 305 | + ) |
| 306 | + |
241 | 307 | batch_number = 0 |
| 308 | + files_seen = set() |
242 | 309 |
243 | 310 | while True: |
244 | | - # Fetch a batch of files (streaming, not fetchall) |
245 | | - file_batch = cursor.fetchmany(FILE_BATCH_SIZE) |
246 | | - if not file_batch: |
| 311 | + # Get a small batch of chunks from the generator (EMBEDDING_BATCH_SIZE at a time) |
| 312 | + chunk_batch = list(islice(chunk_generator, EMBEDDING_BATCH_SIZE)) |
| 313 | + if not chunk_batch: |
247 | 314 | break |
248 | 315 |
249 | 316 | batch_number += 1 |
250 | | - batch_chunks_with_paths = [] |
251 | | - files_in_batch_with_chunks = set() |
252 | | - |
253 | | - # Step 1: Chunking for the current file batch |
254 | | - for file_row in file_batch: |
255 | | - file_path = file_row[path_column] |
256 | | - content = file_row["content"] |
257 | | - language = file_row["language"] or "python" |
258 | | - |
259 | | - try: |
260 | | - chunks = chunker.chunk_sliding_window( |
261 | | - content, |
262 | | - file_path=file_path, |
263 | | - language=language |
264 | | - ) |
265 | | - if chunks: |
266 | | - for chunk in chunks: |
267 | | - batch_chunks_with_paths.append((chunk, file_path)) |
268 | | - files_in_batch_with_chunks.add(file_path) |
269 | | - except Exception as e: |
270 | | - logger.error(f"Failed to chunk {file_path}: {e}") |
271 | | - failed_files.append((file_path, str(e))) |
272 | | - |
273 | | - if not batch_chunks_with_paths: |
274 | | - continue |
275 | 317 |
276 | | - batch_chunk_count = len(batch_chunks_with_paths) |
277 | | - if progress_callback: |
278 | | - progress_callback(f" Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks") |
| 318 | + # Track unique files for progress |
| 319 | + for _, file_path in chunk_batch: |
| 320 | + files_seen.add(file_path) |
279 | 321 |
280 | | - # Step 2: Generate embeddings for this batch (use memory-efficient numpy method) |
281 | | - batch_embeddings = [] |
| 322 | + # Generate embeddings directly to numpy (no tolist() conversion) |
282 | 323 | try: |
283 | | - for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE): |
284 | | - batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count) |
285 | | - batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]] |
286 | | - # Use embed_to_numpy() to avoid unnecessary list conversion |
287 | | - embeddings_numpy = embedder.embed_to_numpy(batch_contents) |
288 | | - # Convert to list only for storage (VectorStore expects list format) |
289 | | - embeddings = [emb.tolist() for emb in embeddings_numpy] |
290 | | - batch_embeddings.extend(embeddings) |
291 | | - # Explicit cleanup of intermediate data |
292 | | - del batch_contents, embeddings_numpy |
293 | | - except Exception as e: |
294 | | - logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}") |
295 | | - failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch]) |
296 | | - continue |
| 324 | + batch_contents = [chunk.content for chunk, _ in chunk_batch] |
| 325 | + embeddings_numpy = embedder.embed_to_numpy(batch_contents) |
297 | 326 |
298 | | - # Step 3: Assign embeddings to chunks |
299 | | - for (chunk, _), embedding in zip(batch_chunks_with_paths, batch_embeddings): |
300 | | - chunk.embedding = embedding |
| 327 | + # Use add_chunks_batch_numpy to avoid numpy->list->numpy roundtrip |
| 328 | + vector_store.add_chunks_batch_numpy(chunk_batch, embeddings_numpy) |
301 | 329 |
302 | | - # Step 4: Store this batch to database (ANN update deferred in bulk_insert mode) |
303 | | - try: |
304 | | - vector_store.add_chunks_batch(batch_chunks_with_paths) |
305 | | - total_chunks_created += batch_chunk_count |
306 | | - total_files_processed += len(files_in_batch_with_chunks) |
307 | | - except Exception as e: |
308 | | - logger.error(f"Failed to store batch {batch_number}: {str(e)}") |
309 | | - failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch]) |
| 330 | + total_chunks_created += len(chunk_batch) |
| 331 | + total_files_processed = len(files_seen) |
| 332 | + |
| 333 | + if progress_callback and batch_number % 10 == 0: |
| 334 | + progress_callback(f" Batch {batch_number}: {total_chunks_created} chunks, {total_files_processed} files") |
310 | 335 |
311 | | - # Release batch references (let Python GC handle cleanup naturally) |
312 | | - del batch_chunks_with_paths, batch_embeddings |
| 336 | + # Cleanup intermediate data |
| 337 | + del batch_contents, embeddings_numpy, chunk_batch |
| 338 | + |
| 339 | + except Exception as e: |
| 340 | + logger.error(f"Failed to process embedding batch {batch_number}: {str(e)}") |
| 341 | +                failed_files.extend((fp, str(e)) for fp in {p for _, p in chunk_batch}) |
| 342 | +                continue  # move on to the next batch instead of failing entirely |
313 | 343 |
314 | 344 | # Notify before ANN index finalization (happens when bulk_insert context exits) |
315 | 345 | if progress_callback: |
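
On the deferred ANN finalization mentioned in this comment: the pattern is to suppress per-batch index updates inside the bulk-insert context and pay for a single rebuild when it exits. A hypothetical sketch of that pattern (`defer_index_updates` and `rebuild_ann_index` are illustrative names, not the actual `VectorStore` API):

```python
from contextlib import contextmanager

@contextmanager
def bulk_insert(store):
    """Hypothetical: insert batches freely, run the ANN rebuild once on exit."""
    store.defer_index_updates = True   # illustrative flag, not the real API
    try:
        yield store
    finally:
        store.defer_index_updates = False
        store.rebuild_ann_index()      # single finalization when the context exits
```
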