Skip to content

Commit 1091e11

Browse files
phernandezclaude
andauthored
fix: Make sync operations truly non-blocking with thread pool (#309)
Signed-off-by: phernandez <[email protected]> Co-authored-by: Claude <[email protected]>
1 parent f40ab31 commit 1091e11

File tree

4 files changed

+77
-33
lines changed

4 files changed

+77
-33
lines changed

src/basic_memory/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ class BasicMemoryConfig(BaseSettings):
8282
description="Whether to sync changes in real time. default (True)",
8383
)
8484

85+
sync_thread_pool_size: int = Field(
86+
default=4,
87+
description="Size of thread pool for file I/O operations in sync service",
88+
gt=0,
89+
)
90+
8591
kebab_filenames: bool = Field(
8692
default=False,
8793
description="Format for generated filenames. False preserves spaces and special chars, True converts them to hyphens for consistency with permalinks",

src/basic_memory/services/initialization.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,31 +101,38 @@ async def initialize_file_sync(
101101
# Get active projects
102102
active_projects = await project_repository.get_active_projects()
103103

104-
# First, sync all projects sequentially
105-
for project in active_projects:
104+
# Start sync for all projects as background tasks (non-blocking)
105+
async def sync_project_background(project):
106+
"""Sync a single project in the background."""
106107
# avoid circular imports
107108
from basic_memory.cli.commands.sync import get_sync_service
108109

109-
logger.info(f"Starting sync for project: {project.name}")
110-
sync_service = await get_sync_service(project)
111-
sync_dir = Path(project.path)
112-
110+
logger.info(f"Starting background sync for project: {project.name}")
113111
try:
112+
sync_service = await get_sync_service(project)
113+
sync_dir = Path(project.path)
114114
await sync_service.sync(sync_dir, project_name=project.name)
115-
logger.info(f"Sync completed successfully for project: {project.name}")
115+
logger.info(f"Background sync completed successfully for project: {project.name}")
116116

117117
# Mark project as watching for changes after successful sync
118118
from basic_memory.services.sync_status_service import sync_status_tracker
119119

120120
sync_status_tracker.start_project_watch(project.name)
121121
logger.info(f"Project {project.name} is now watching for changes")
122122
except Exception as e: # pragma: no cover
123-
logger.error(f"Error syncing project {project.name}: {e}")
123+
logger.error(f"Error in background sync for project {project.name}: {e}")
124124
# Mark sync as failed for this project
125125
from basic_memory.services.sync_status_service import sync_status_tracker
126126

127127
sync_status_tracker.fail_project_sync(project.name, str(e))
128-
# Continue with other projects even if one fails
128+
129+
# Create background tasks for all project syncs (non-blocking)
130+
sync_tasks = [
131+
asyncio.create_task(sync_project_background(project)) for project in active_projects
132+
]
133+
logger.info(f"Created {len(sync_tasks)} background sync tasks")
134+
135+
# Don't await the tasks - let them run in background while we continue
129136

130137
# Then start the watch service in the background
131138
logger.info("Starting watch service for all projects")

src/basic_memory/sync/sync_service.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""Service for syncing files between filesystem and database."""
22

3+
import asyncio
34
import os
45
import time
6+
from concurrent.futures import ThreadPoolExecutor
57
from dataclasses import dataclass, field
68
from datetime import datetime
79
from pathlib import Path
@@ -80,6 +82,41 @@ def __init__(
8082
self.relation_repository = relation_repository
8183
self.search_service = search_service
8284
self.file_service = file_service
85+
self._thread_pool = ThreadPoolExecutor(max_workers=app_config.sync_thread_pool_size)
86+
87+
async def _read_file_async(self, file_path: Path) -> str:
88+
"""Read file content in thread pool to avoid blocking the event loop."""
89+
loop = asyncio.get_event_loop()
90+
return await loop.run_in_executor(self._thread_pool, file_path.read_text, "utf-8")
91+
92+
async def _compute_checksum_async(self, path: str) -> str:
93+
"""Compute file checksum in thread pool to avoid blocking the event loop."""
94+
95+
def _sync_compute_checksum(path_str: str) -> str:
96+
# Synchronous version for thread pool execution
97+
path_obj = self.file_service.base_path / path_str
98+
99+
if self.file_service.is_markdown(path_str):
100+
content = path_obj.read_text(encoding="utf-8")
101+
else:
102+
content = path_obj.read_bytes()
103+
104+
# Use the synchronous version of compute_checksum
105+
import hashlib
106+
107+
if isinstance(content, str):
108+
content_bytes = content.encode("utf-8")
109+
else:
110+
content_bytes = content
111+
return hashlib.sha256(content_bytes).hexdigest()
112+
113+
loop = asyncio.get_event_loop()
114+
return await loop.run_in_executor(self._thread_pool, _sync_compute_checksum, path)
115+
116+
def __del__(self):
117+
"""Cleanup thread pool when service is destroyed."""
118+
if hasattr(self, "_thread_pool"):
119+
self._thread_pool.shutdown(wait=False)
83120

84121
async def sync(self, directory: Path, project_name: Optional[str] = None) -> SyncReport:
85122
"""Sync all files with database."""
@@ -289,7 +326,7 @@ async def sync_markdown_file(self, path: str, new: bool = True) -> Tuple[Optiona
289326
logger.debug(f"Parsing markdown file, path: {path}, new: {new}")
290327

291328
file_path = self.entity_parser.base_path / path
292-
file_content = file_path.read_text(encoding="utf-8")
329+
file_content = await self._read_file_async(file_path)
293330
file_contains_frontmatter = has_frontmatter(file_content)
294331

295332
# entity markdown will always contain front matter, so it can be used up create/update the entity
@@ -326,7 +363,7 @@ async def sync_markdown_file(self, path: str, new: bool = True) -> Tuple[Optiona
326363
# After updating relations, we need to compute the checksum again
327364
# This is necessary for files with wikilinks to ensure consistent checksums
328365
# after relation processing is complete
329-
final_checksum = await self.file_service.compute_checksum(path)
366+
final_checksum = await self._compute_checksum_async(path)
330367

331368
# set checksum
332369
await self.entity_repository.update(entity.id, {"checksum": final_checksum})
@@ -350,7 +387,7 @@ async def sync_regular_file(self, path: str, new: bool = True) -> Tuple[Optional
350387
Returns:
351388
Tuple of (entity, checksum)
352389
"""
353-
checksum = await self.file_service.compute_checksum(path)
390+
checksum = await self._compute_checksum_async(path)
354391
if new:
355392
# Generate permalink from path
356393
await self.entity_service.resolve_permalink(path)
@@ -620,7 +657,7 @@ async def scan_directory(self, directory: Path) -> ScanResult:
620657

621658
path = Path(root) / filename
622659
rel_path = path.relative_to(directory).as_posix()
623-
checksum = await self.file_service.compute_checksum(rel_path)
660+
checksum = await self._compute_checksum_async(rel_path)
624661
result.files[rel_path] = checksum
625662
result.checksums[checksum] = rel_path
626663

tests/services/test_initialization.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""Tests for the initialization service."""
22

3-
from pathlib import Path
43
from unittest.mock import patch, MagicMock, AsyncMock
54

65
import pytest
@@ -127,10 +126,11 @@ async def test_reconcile_projects_with_error_handling(mock_get_db, app_config):
127126
@patch("basic_memory.services.initialization.db.get_or_create_db")
128127
@patch("basic_memory.cli.commands.sync.get_sync_service")
129128
@patch("basic_memory.sync.WatchService")
130-
async def test_initialize_file_sync_sequential(
131-
mock_watch_service_class, mock_get_sync_service, mock_get_db, app_config
129+
@patch("basic_memory.services.initialization.asyncio.create_task")
130+
async def test_initialize_file_sync_background_tasks(
131+
mock_create_task, mock_watch_service_class, mock_get_sync_service, mock_get_db, app_config
132132
):
133-
"""Test file sync initialization with sequential project processing."""
133+
"""Test file sync initialization with background task processing."""
134134
# Setup mocks
135135
mock_session_maker = AsyncMock()
136136
mock_get_db.return_value = (None, mock_session_maker)
@@ -154,6 +154,11 @@ async def test_initialize_file_sync_sequential(
154154
mock_sync_service.sync = AsyncMock()
155155
mock_get_sync_service.return_value = mock_sync_service
156156

157+
# Mock background tasks
158+
mock_task1 = MagicMock()
159+
mock_task2 = MagicMock()
160+
mock_create_task.side_effect = [mock_task1, mock_task2]
161+
157162
# Mock the repository
158163
with patch("basic_memory.services.initialization.ProjectRepository") as mock_repo_class:
159164
mock_repo_class.return_value = mock_repository
@@ -165,22 +170,11 @@ async def test_initialize_file_sync_sequential(
165170
# Assertions
166171
mock_repository.get_active_projects.assert_called_once()
167172

168-
# Should call sync for each project sequentially
169-
assert mock_get_sync_service.call_count == 2
170-
mock_get_sync_service.assert_any_call(mock_project1)
171-
mock_get_sync_service.assert_any_call(mock_project2)
173+
# Should create background tasks for each project (non-blocking)
174+
assert mock_create_task.call_count == 2
172175

173-
# Should call sync on each project
174-
assert mock_sync_service.sync.call_count == 2
175-
mock_sync_service.sync.assert_any_call(
176-
Path(mock_project1.path), project_name=mock_project1.name
177-
)
178-
mock_sync_service.sync.assert_any_call(
179-
Path(mock_project2.path), project_name=mock_project2.name
180-
)
176+
# Verify tasks were created but not awaited (function returns immediately)
177+
assert result is None
181178

182-
# Should start the watch service
179+
# Watch service should still be started
183180
mock_watch_service.run.assert_called_once()
184-
185-
# Should return None
186-
assert result is None

0 commit comments

Comments
 (0)