
Commit 5849f75

catlog22 and claude committed
fix: Fix memory leak in embedding generation and optimize performance
- HNSW index: reduce pre-allocation from 1M to 50K elements, add dynamic expansion and controllable saving
- Embedder: add embed_to_numpy() to avoid .tolist() conversion, improve cache cleanup
- embedding_manager: recreate the embedder instance every 10 batches, call gc.collect() explicitly
- VectorStore: add a bulk_insert() context manager supporting numpy batch writes
- Chunker: add a skip_token_count lightweight mode using a char/4 estimate (~9x speedup)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 45f92fe commit 5849f75

5 files changed (+420 -34 lines changed)

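Only two of the five changed files are reproduced below; the VectorStore bulk_insert() and Chunker skip_token_count changes appear only in the commit message. To make the char/4 estimate concrete, here is a minimal sketch of that technique; the function name, flag, and tokenizer fallback are assumptions for illustration, not the actual Chunker code:

```python
# Hypothetical sketch of the commit's char/4 token estimation - not the
# actual Chunker implementation, which is not shown in this diff.

def estimate_tokens(text: str, skip_token_count: bool = False) -> int:
    """Estimate the token count of a chunk.

    With skip_token_count=True, use the rough heuristic that one token
    spans ~4 characters, skipping the tokenizer entirely; the commit
    message reports roughly a 9x chunking speedup from this mode.
    """
    if skip_token_count:
        return len(text) // 4  # cheap O(1) estimate, no tokenizer pass
    # Accurate path: a real tokenizer (tiktoken used here as an example)
    import tiktoken
    return len(tiktoken.get_encoding("cl100k_base").encode(text))


# The estimate is coarse but cheap: 480 characters report as ~120 tokens
# without ever loading a tokenizer.
print(estimate_tokens("x = 1\n" * 80, skip_token_count=True))  # -> 120
```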

codex-lens/src/codexlens/cli/embedding_manager.py

Lines changed: 25 additions & 5 deletions
```diff
@@ -1,5 +1,6 @@
 """Embedding Manager - Manage semantic embeddings for code indexes."""
 
+import gc
 import logging
 import sqlite3
 import time
@@ -9,14 +10,17 @@
 try:
     from codexlens.semantic import SEMANTIC_AVAILABLE
     if SEMANTIC_AVAILABLE:
-        from codexlens.semantic.embedder import Embedder, get_embedder
+        from codexlens.semantic.embedder import Embedder, get_embedder, clear_embedder_cache
         from codexlens.semantic.vector_store import VectorStore
         from codexlens.semantic.chunker import Chunker, ChunkConfig
 except ImportError:
     SEMANTIC_AVAILABLE = False
 
 logger = logging.getLogger(__name__)
 
+# Periodic embedder recreation interval to prevent memory accumulation
+EMBEDDER_RECREATION_INTERVAL = 10  # Recreate embedder every N batches
+
 
 def _get_path_column(conn: sqlite3.Connection) -> str:
     """Detect whether files table uses 'path' or 'full_path' column.
@@ -192,12 +196,13 @@ def generate_embeddings(
 
     # Initialize components
     try:
-        # Use cached embedder (singleton) for performance
+        # Initialize embedder (will be periodically recreated to prevent memory leaks)
        embedder = get_embedder(profile=model_profile)
         chunker = Chunker(config=ChunkConfig(max_chunk_size=chunk_size))
 
         if progress_callback:
             progress_callback(f"Using model: {embedder.model_name} ({embedder.embedding_dim} dimensions)")
+            progress_callback(f"Memory optimization: Embedder will be recreated every {EMBEDDER_RECREATION_INTERVAL} batches")
 
     except Exception as e:
         return {
@@ -242,6 +247,14 @@ def generate_embeddings(
             batch_chunks_with_paths = []
             files_in_batch_with_chunks = set()
 
+            # Periodic embedder recreation to prevent memory accumulation
+            if batch_number % EMBEDDER_RECREATION_INTERVAL == 0:
+                if progress_callback:
+                    progress_callback(f"  [Memory optimization] Recreating embedder at batch {batch_number}")
+                clear_embedder_cache()
+                embedder = get_embedder(profile=model_profile)
+                gc.collect()
+
             # Step 1: Chunking for the current file batch
             for file_row in file_batch:
                 file_path = file_row[path_column]
@@ -269,14 +282,19 @@ def generate_embeddings(
             if progress_callback:
                 progress_callback(f"  Batch {batch_number}: {len(file_batch)} files, {batch_chunk_count} chunks")
 
-            # Step 2: Generate embeddings for this batch
+            # Step 2: Generate embeddings for this batch (use memory-efficient numpy method)
             batch_embeddings = []
             try:
                 for i in range(0, batch_chunk_count, EMBEDDING_BATCH_SIZE):
                     batch_end = min(i + EMBEDDING_BATCH_SIZE, batch_chunk_count)
                     batch_contents = [chunk.content for chunk, _ in batch_chunks_with_paths[i:batch_end]]
-                    embeddings = embedder.embed(batch_contents)
+                    # Use embed_to_numpy() to avoid unnecessary list conversion
+                    embeddings_numpy = embedder.embed_to_numpy(batch_contents)
+                    # Convert to list only for storage (VectorStore expects list format)
+                    embeddings = [emb.tolist() for emb in embeddings_numpy]
                     batch_embeddings.extend(embeddings)
+                    # Explicit cleanup of intermediate data
+                    del batch_contents, embeddings_numpy
             except Exception as e:
                 logger.error(f"Failed to generate embeddings for batch {batch_number}: {str(e)}")
                 failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
@@ -295,7 +313,9 @@ def generate_embeddings(
                 logger.error(f"Failed to store batch {batch_number}: {str(e)}")
                 failed_files.extend([(file_row[path_column], str(e)) for file_row in file_batch])
 
-            # Memory is released here as batch_chunks_with_paths and batch_embeddings go out of scope
+            # Explicit memory cleanup after each batch
+            del batch_chunks_with_paths, batch_embeddings
+            gc.collect()
 
     except Exception as e:
         return {"success": False, "error": f"Failed to read or process files: {str(e)}"}
```
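A note on why the embed_to_numpy() change matters: a float32 ndarray stores 4 bytes per value, while .tolist() boxes every value into a Python float object (roughly 24 bytes each, plus list pointer overhead), so keeping embeddings in numpy until the storage boundary cuts peak memory several-fold. Below is a self-contained sketch of the batch pattern above, with a stub standing in for the Embedder, whose internals are not part of this diff:

```python
import gc
import numpy as np

# Stub standing in for the real Embedder; only the return type matters here.
class StubEmbedder:
    dim = 384

    def embed_to_numpy(self, texts: list[str]) -> np.ndarray:
        # The real implementation runs the model; shape (n, dim), float32
        return np.zeros((len(texts), self.dim), dtype=np.float32)


def process_batches(batches: list[list[str]]) -> int:
    """Mirror the loop above: keep embeddings as numpy as long as possible,
    convert to lists only at the storage boundary, and free intermediates
    eagerly after each batch."""
    embedder = StubEmbedder()
    total = 0
    for texts in batches:
        embeddings_numpy = embedder.embed_to_numpy(texts)  # 4 bytes/value
        rows = [emb.tolist() for emb in embeddings_numpy]  # boxed only here
        total += len(rows)
        del rows, embeddings_numpy                         # drop references now
        gc.collect()                                       # reclaim promptly
    return total


print(process_batches([["def f(): pass"], ["x = 1", "y = 2"]]))  # -> 3
```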

codex-lens/src/codexlens/semantic/ann_index.py

Lines changed: 116 additions & 12 deletions
```diff
@@ -13,6 +13,7 @@
 
 from __future__ import annotations
 
+import logging
 import threading
 from pathlib import Path
 from typing import List, Optional, Tuple
@@ -24,6 +25,8 @@
 if SEMANTIC_AVAILABLE:
     import numpy as np
 
+logger = logging.getLogger(__name__)
+
 # Try to import hnswlib (optional dependency)
 try:
     import hnswlib
@@ -48,16 +51,26 @@ class ANNIndex:
     - ef: 50 (search width during query - higher = better recall)
     """
 
-    def __init__(self, index_path: Path, dim: int) -> None:
+    def __init__(
+        self,
+        index_path: Path,
+        dim: int,
+        initial_capacity: int = 50000,
+        auto_save: bool = False,
+        expansion_threshold: float = 0.8,
+    ) -> None:
         """Initialize ANN index.
 
         Args:
             index_path: Path to SQLite database (index will be saved as _vectors.hnsw)
             dim: Dimension of embedding vectors
+            initial_capacity: Initial maximum elements capacity (default: 50000)
+            auto_save: Whether to automatically save index after operations (default: False)
+            expansion_threshold: Capacity threshold to trigger auto-expansion (default: 0.8)
 
         Raises:
             ImportError: If required dependencies are not available
-            ValueError: If dimension is invalid
+            ValueError: If dimension or capacity is invalid
         """
         if not SEMANTIC_AVAILABLE:
             raise ImportError(
@@ -74,6 +87,14 @@ def __init__(self, index_path: Path, dim: int) -> None:
         if dim <= 0:
             raise ValueError(f"Invalid dimension: {dim}")
 
+        if initial_capacity <= 0:
+            raise ValueError(f"Invalid initial capacity: {initial_capacity}")
+
+        if not 0.0 < expansion_threshold < 1.0:
+            raise ValueError(
+                f"Invalid expansion threshold: {expansion_threshold}. Must be between 0 and 1."
+            )
+
         self.index_path = Path(index_path)
         self.dim = dim
 
@@ -89,14 +110,23 @@ def __init__(self, index_path: Path, dim: int) -> None:
         self.ef_construction = 200  # Build-time search width (higher = better quality)
         self.ef = 50  # Query-time search width (higher = better recall)
 
+        # Memory management parameters
+        self._auto_save = auto_save
+        self._expansion_threshold = expansion_threshold
+
         # Thread safety
         self._lock = threading.RLock()
 
         # HNSW index instance
         self._index: Optional[hnswlib.Index] = None
-        self._max_elements = 1000000  # Initial capacity (auto-resizes)
+        self._max_elements = initial_capacity  # Initial capacity (reduced from 1M to 50K)
         self._current_count = 0  # Track number of vectors
 
+        logger.info(
+            f"Initialized ANNIndex with capacity={initial_capacity}, "
+            f"auto_save={auto_save}, expansion_threshold={expansion_threshold}"
+        )
+
     def _ensure_index(self) -> None:
         """Ensure HNSW index is initialized (lazy initialization)."""
         if self._index is None:
@@ -108,6 +138,33 @@ def _ensure_index(self) -> None:
             )
             self._index.set_ef(self.ef)
             self._current_count = 0
+            logger.debug(f"Created new HNSW index with capacity {self._max_elements}")
+
+    def _auto_expand_if_needed(self, additional_count: int) -> None:
+        """Auto-expand index capacity if threshold is reached.
+
+        Args:
+            additional_count: Number of vectors to be added
+
+        Note:
+            This is called internally by add_vectors and is thread-safe.
+        """
+        usage_ratio = (self._current_count + additional_count) / self._max_elements
+
+        if usage_ratio >= self._expansion_threshold:
+            # Calculate new capacity (2x current or enough to fit new vectors)
+            new_capacity = max(
+                self._max_elements * 2,
+                self._current_count + additional_count,
+            )
+
+            logger.info(
+                f"Expanding index capacity: {self._max_elements} -> {new_capacity} "
+                f"(usage: {usage_ratio:.1%}, threshold: {self._expansion_threshold:.1%})"
+            )
+
+            self._index.resize_index(new_capacity)
+            self._max_elements = new_capacity
 
     def add_vectors(self, ids: List[int], vectors: np.ndarray) -> None:
         """Add vectors to the index.
@@ -137,14 +194,8 @@ def add_vectors(self, ids: List[int], vectors: np.ndarray) -> None:
         try:
             self._ensure_index()
 
-            # Resize index if needed
-            if self._current_count + len(ids) > self._max_elements:
-                new_max = max(
-                    self._max_elements * 2,
-                    self._current_count + len(ids)
-                )
-                self._index.resize_index(new_max)
-                self._max_elements = new_max
+            # Auto-expand if threshold reached
+            self._auto_expand_if_needed(len(ids))
 
             # Ensure vectors are C-contiguous float32 (hnswlib requirement)
             if not vectors.flags['C_CONTIGUOUS'] or vectors.dtype != np.float32:
@@ -154,6 +205,15 @@ def add_vectors(self, ids: List[int], vectors: np.ndarray) -> None:
             self._index.add_items(vectors, ids)
             self._current_count += len(ids)
 
+            logger.debug(
+                f"Added {len(ids)} vectors to index "
+                f"(total: {self._current_count}/{self._max_elements})"
+            )
+
+            # Auto-save if enabled
+            if self._auto_save:
+                self.save()
+
         except Exception as e:
             raise StorageError(f"Failed to add vectors to ANN index: {e}")
 
@@ -178,13 +238,21 @@ def remove_vectors(self, ids: List[int]) -> None:
                 return  # Nothing to remove
 
             # Mark vectors as deleted
+            deleted_count = 0
             for vec_id in ids:
                 try:
                     self._index.mark_deleted(vec_id)
+                    deleted_count += 1
                 except RuntimeError:
                     # ID not found - ignore (idempotent deletion)
                     pass
 
+            logger.debug(f"Marked {deleted_count}/{len(ids)} vectors as deleted")
+
+            # Auto-save if enabled
+            if self._auto_save and deleted_count > 0:
+                self.save()
+
         except Exception as e:
             raise StorageError(f"Failed to remove vectors from ANN index: {e}")
 
@@ -248,6 +316,7 @@ def save(self) -> None:
         with self._lock:
             try:
                 if self._index is None or self._current_count == 0:
+                    logger.debug("Skipping save: index is empty")
                     return  # Nothing to save
 
                 # Ensure parent directory exists
@@ -256,6 +325,11 @@ def save(self) -> None:
                 # Save index
                 self._index.save_index(str(self.hnsw_path))
 
+                logger.debug(
+                    f"Saved index to {self.hnsw_path} "
+                    f"({self._current_count} vectors, capacity: {self._max_elements})"
+                )
+
             except Exception as e:
                 raise StorageError(f"Failed to save ANN index: {e}")
 
@@ -271,20 +345,28 @@ def load(self) -> bool:
         with self._lock:
             try:
                 if not self.hnsw_path.exists():
+                    logger.debug(f"Index file not found: {self.hnsw_path}")
                     return False  # Index file doesn't exist (not an error)
 
                 # Create fresh index object for loading (don't call init_index first)
                 self._index = hnswlib.Index(space=self.space, dim=self.dim)
 
                 # Load index from disk
+                # Note: max_elements here is just for initial allocation, can expand later
                 self._index.load_index(str(self.hnsw_path), max_elements=self._max_elements)
 
-                # Update count from loaded index
+                # Update count and capacity from loaded index
                 self._current_count = self._index.get_current_count()
+                self._max_elements = self._index.get_max_elements()
 
                 # Set query-time ef parameter
                 self._index.set_ef(self.ef)
 
+                logger.info(
+                    f"Loaded index from {self.hnsw_path} "
+                    f"({self._current_count} vectors, capacity: {self._max_elements})"
+                )
+
                 return True
 
             except Exception as e:
@@ -299,6 +381,28 @@ def count(self) -> int:
         with self._lock:
             return self._current_count
 
+    @property
+    def capacity(self) -> int:
+        """Get current maximum capacity of the index.
+
+        Returns:
+            Maximum number of vectors the index can hold before expansion
+        """
+        with self._lock:
+            return self._max_elements
+
+    @property
+    def usage_ratio(self) -> float:
+        """Get current usage ratio (count / capacity).
+
+        Returns:
+            Usage ratio between 0.0 and 1.0
+        """
+        with self._lock:
+            if self._max_elements == 0:
+                return 0.0
+            return self._current_count / self._max_elements
+
     @property
     def is_loaded(self) -> bool:
         """Check if index is loaded and ready for use.
```

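Putting the new ANNIndex parameters together: with initial_capacity=50000 and expansion_threshold=0.8, the first resize fires once the pending vector count reaches 40,000, doubling capacity to 100,000 (or to the exact required count, if larger). A minimal usage sketch, assuming hnswlib and numpy are installed and the codexlens package is importable; the database path and dimension are illustrative:

```python
import numpy as np
from pathlib import Path

from codexlens.semantic.ann_index import ANNIndex

# Start small (50K) instead of the old fixed 1M pre-allocation; memory
# grows only when the 80% usage threshold is actually reached.
index = ANNIndex(
    Path("project_index.db"),  # illustrative path; saved as *_vectors.hnsw
    dim=384,
    initial_capacity=50_000,
    auto_save=False,           # batch writers call save() once at the end
    expansion_threshold=0.8,
)

vectors = np.random.rand(1_000, 384).astype(np.float32)
index.add_vectors(ids=list(range(1_000)), vectors=vectors)

print(index.capacity)     # 50000 - no expansion yet (usage 2% < 80%)
print(index.usage_ratio)  # 0.02

index.save()  # explicit save, since auto_save=False
```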