
Commit 4fd6d0c

fix: Optimize sync memory usage to prevent OOM on large projects (#380)
Signed-off-by: Claude <[email protected]>
Signed-off-by: phernandez <[email protected]>
1 parent e6c8e36 commit 4fd6d0c

2 files changed: +41 -10 lines changed


src/basic_memory/config.py

Lines changed: 7 additions & 1 deletion
@@ -84,7 +84,13 @@ class BasicMemoryConfig(BaseSettings):
 
     sync_thread_pool_size: int = Field(
         default=4,
-        description="Size of thread pool for file I/O operations in sync service",
+        description="Size of thread pool for file I/O operations in sync service. Default of 4 is optimized for cloud deployments with 1-2GB RAM.",
+        gt=0,
+    )
+
+    sync_max_concurrent_files: int = Field(
+        default=10,
+        description="Maximum number of files to process concurrently during sync. Limits memory usage on large projects (2000+ files). Lower values reduce memory consumption.",
         gt=0,
     )
 

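For context on how these two settings interact (not part of the diff itself): peak memory held by in-flight reads is roughly bounded by sync_max_concurrent_files times the size of the largest files being synced. A minimal sketch, assuming pydantic BaseSettings keyword overrides on BasicMemoryConfig; the values and arithmetic in the comments are illustrative only:

```python
# Hypothetical usage sketch -- field names come from the diff above; passing
# overrides to the constructor assumes standard pydantic BaseSettings behavior,
# not an API documented by this commit.
from basic_memory.config import BasicMemoryConfig

config = BasicMemoryConfig(
    sync_thread_pool_size=4,       # threads doing blocking file I/O
    sync_max_concurrent_files=5,   # lower than the default 10 => smaller memory peak
)

# Rough upper bound on memory held by concurrent file reads:
#   sync_max_concurrent_files * max_file_size
# e.g. 5 concurrent files * ~20 MB each ~= 100 MB of file content in memory.
```
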
src/basic_memory/sync/sync_service.py

Lines changed: 34 additions & 9 deletions
@@ -3,6 +3,7 @@
 import asyncio
 import os
 import time
+from collections import OrderedDict
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass, field
 from datetime import datetime
@@ -133,15 +134,27 @@ def __init__(
         # Load ignore patterns once at initialization for performance
         self._ignore_patterns = load_bmignore_patterns()
         # Circuit breaker: track file failures to prevent infinite retry loops
-        self._file_failures: Dict[str, FileFailureInfo] = {}
+        # Use OrderedDict for LRU behavior with bounded size to prevent unbounded memory growth
+        self._file_failures: OrderedDict[str, FileFailureInfo] = OrderedDict()
+        self._max_tracked_failures = 100  # Limit failure cache size
+        # Semaphore to limit concurrent file operations and prevent OOM on large projects
+        # Limits peak memory usage by processing files in batches rather than all at once
+        self._file_semaphore = asyncio.Semaphore(app_config.sync_max_concurrent_files)
 
     async def _read_file_async(self, file_path: Path) -> str:
-        """Read file content in thread pool to avoid blocking the event loop."""
-        loop = asyncio.get_event_loop()
-        return await loop.run_in_executor(self._thread_pool, file_path.read_text, "utf-8")
+        """Read file content in thread pool to avoid blocking the event loop.
+
+        Uses semaphore to limit concurrent file reads and prevent OOM on large projects.
+        """
+        async with self._file_semaphore:
+            loop = asyncio.get_event_loop()
+            return await loop.run_in_executor(self._thread_pool, file_path.read_text, "utf-8")
 
     async def _compute_checksum_async(self, path: str) -> str:
-        """Compute file checksum in thread pool to avoid blocking the event loop."""
+        """Compute file checksum in thread pool to avoid blocking the event loop.
+
+        Uses semaphore to limit concurrent file reads and prevent OOM on large projects.
+        """
 
         def _sync_compute_checksum(path_str: str) -> str:
             # Synchronous version for thread pool execution
@@ -161,8 +174,9 @@ def _sync_compute_checksum(path_str: str) -> str:
                 content_bytes = content
             return hashlib.sha256(content_bytes).hexdigest()
 
-        loop = asyncio.get_event_loop()
-        return await loop.run_in_executor(self._thread_pool, _sync_compute_checksum, path)
+        async with self._file_semaphore:
+            loop = asyncio.get_event_loop()
+            return await loop.run_in_executor(self._thread_pool, _sync_compute_checksum, path)
 
     def __del__(self):
         """Cleanup thread pool when service is destroyed."""
@@ -212,6 +226,8 @@ async def _should_skip_file(self, path: str) -> bool:
     async def _record_failure(self, path: str, error: str) -> None:
         """Record a file sync failure for circuit breaker tracking.
 
+        Uses LRU cache with bounded size to prevent unbounded memory growth.
+
         Args:
             path: File path that failed
             error: Error message from the failure
@@ -226,12 +242,13 @@ async def _record_failure(self, path: str, error: str) -> None:
             checksum = ""
 
         if path in self._file_failures:
-            # Update existing failure record
-            failure_info = self._file_failures[path]
+            # Update existing failure record and move to end (most recently used)
+            failure_info = self._file_failures.pop(path)
             failure_info.count += 1
             failure_info.last_failure = now
             failure_info.last_error = error
             failure_info.last_checksum = checksum
+            self._file_failures[path] = failure_info
 
             logger.warning(
                 f"File sync failed (attempt {failure_info.count}/{MAX_CONSECUTIVE_FAILURES}): "
@@ -255,6 +272,14 @@ async def _record_failure(self, path: str, error: str) -> None:
             )
             logger.debug(f"Recording first failure for {path}: {error}")
 
+        # Enforce cache size limit - remove oldest entry if over limit
+        if len(self._file_failures) > self._max_tracked_failures:
+            removed_path, removed_info = self._file_failures.popitem(last=False)
+            logger.debug(
+                f"Evicting oldest failure record from cache: path={removed_path}, "
+                f"failures={removed_info.count}"
+            )
+
     def _clear_failure(self, path: str) -> None:
         """Clear failure tracking for a file after successful sync.
 
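With these last two hunks the failure tracker behaves like a small LRU map: an updated entry is popped and re-inserted so it moves to the most-recently-used end of the OrderedDict, and popitem(last=False) evicts the oldest entry once the cap of 100 is exceeded. A standalone sketch of that pattern (the BoundedFailureCache class and its fields are illustrative, not the service's API):

```python
from collections import OrderedDict

class BoundedFailureCache:
    """LRU-style failure counter capped at max_entries, mirroring the OrderedDict pattern above."""

    def __init__(self, max_entries: int = 100):
        self._entries: OrderedDict[str, int] = OrderedDict()
        self._max_entries = max_entries

    def record(self, path: str) -> int:
        if path in self._entries:
            # Pop and re-insert so the key moves to the "most recently used" end.
            count = self._entries.pop(path) + 1
        else:
            count = 1
        self._entries[path] = count

        # Evict the least recently used entry once the cap is exceeded,
        # keeping memory bounded no matter how many distinct paths fail.
        if len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)
        return count

# cache = BoundedFailureCache(max_entries=100)
# cache.record("notes/big-file.md")  # -> 1
```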