279 changes: 123 additions & 156 deletions src/basic_memory/sync/sync_service.py
@@ -335,7 +335,7 @@ async def sync(self, directory: Path, project_name: Optional[str] = None) -> Syn
# Update scan watermark after successful sync
# Use the timestamp from sync start (not end) to ensure we catch files
# created during the sync on the next iteration
current_file_count = await self._quick_count_files(directory)
current_file_count = await self._count_files(directory)
if self.entity_repository.project_id is not None:
project = await self.project_repository.find_by_id(self.entity_repository.project_id)
if project:
@@ -382,6 +382,36 @@ async def sync(self, directory: Path, project_name: Optional[str] = None) -> Syn

return report
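# Why the watermark is the sync *start* time (illustrative numbers, not PR code):
#   scan starts at t=1000.0, a note is written at t=1005.0, the sync finishes at t=1060.0
#   watermark = 1000.0 -> next incremental scan asks for files newer than 1000.0,
#                         so the note written at t=1005.0 is picked up
#   watermark = 1060.0 -> the t=1005.0 note would never be seen again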

async def _count_files(self, directory: Path) -> int:
"""Fast file count using find command.

Uses subprocess to leverage OS-level file counting which is much faster
than Python iteration, especially on network filesystems like TigrisFS.

Args:
directory: Directory to count files in

Returns:
Number of files in directory (recursive)
"""
process = await asyncio.create_subprocess_shell(
f'find "{directory}" -type f | wc -l',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()

if process.returncode != 0:
error_msg = stderr.decode().strip()
logger.error(
f"FILE COUNT FAILED: find command returned exit code {process.returncode}, "
f"error: {error_msg}"
)
logfire.metric_counter("sync.scan.file_count_failure").add(1)
raise RuntimeError(f"File count failed (exit code {process.returncode}): {error_msg}")

return int(stdout.strip())
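For contrast, a hypothetical pure-Python counter (not in the PR) showing what the single `find | wc -l` subprocess replaces; os.walk touches every directory level, and each level is a network round trip on filesystems like TigrisFS:

import os
from pathlib import Path

def count_files_python(directory: Path) -> int:
    # Hypothetical comparison helper, not part of SyncService.
    total = 0
    for _root, _dirs, files in os.walk(directory):
        total += len(files)
    return total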

@logfire.instrument()
async def scan(self, directory):
"""Smart scan using watermark and file count for large project optimization.
@@ -391,16 +421,16 @@ async def scan(directory):
- Uses `find -newermt` for incremental scanning (only changed files)
- Falls back to full scan when deletions detected (file count decreased)

Expected performance:
- No changes: 225x faster (2s vs 450s for 1,460 files on TigrisFS)
- Few changes: 84x faster (5s vs 420s)
Expected performance (with find optimization):
- Full scan: 52 minutes → 2-3 minutes for 1.4k files on TigrisFS
- Incremental scan: Already fast at 200-600ms
- Deletions: Full scan (rare, acceptable)

Architecture:
- Get current file count quickly (find | wc -l: 1.4s)
- Get current file count quickly (find | wc -l: ~1.4s)
- Compare with last_file_count to detect deletions
- If no deletions: incremental scan with find -newermt (0.2s)
- Process changed files with mtime-based comparison
- Use unified scan_directory() with optional since_timestamp filter
- Process files with mtime-based comparison
"""
scan_start_time = time.time()

Expand All @@ -416,15 +446,15 @@ async def scan(self, directory):

# Step 1: Quick file count
logger.debug("Counting files in directory")
current_count = await self._quick_count_files(directory)
current_count = await self._count_files(directory)
logger.debug(f"Found {current_count} files in directory")

# Step 2: Determine scan strategy based on watermark and file count
since_timestamp: Optional[float] = None
if project.last_file_count is None:
# First sync ever → full scan
scan_type = "full_initial"
logger.info("First sync for this project, performing full scan")
file_paths_to_scan = await self._scan_directory_full(directory)

elif current_count < project.last_file_count:
# Files deleted → need full scan to detect which ones
Expand All @@ -433,49 +463,42 @@ async def scan(self, directory):
f"File count decreased ({project.last_file_count} → {current_count}), "
f"running full scan to detect deletions"
)
file_paths_to_scan = await self._scan_directory_full(directory)

elif project.last_scan_timestamp is not None:
# Incremental scan: only files modified since last scan
scan_type = "incremental"
since_timestamp = project.last_scan_timestamp
logger.info(
f"Running incremental scan for files modified since {project.last_scan_timestamp}"
)
file_paths_to_scan = await self._scan_directory_modified_since(
directory, project.last_scan_timestamp
)
logger.info(
f"Incremental scan found {len(file_paths_to_scan)} potentially changed files"
)

else:
# Fallback to full scan (no watermark available)
scan_type = "full_fallback"
logger.warning("No scan watermark available, falling back to full scan")
file_paths_to_scan = await self._scan_directory_full(directory)

# Record scan type metric
logfire.metric_counter(f"sync.scan.{scan_type}").add(1)
logfire.metric_histogram("sync.scan.files_scanned", unit="files").record(
len(file_paths_to_scan)
)

# Step 3: Process each file with mtime-based comparison
# Step 3: Process each file with mtime-based comparison using unified scan_directory
scanned_paths: Set[str] = set()
changed_checksums: Dict[str, str] = {}
files_scanned = 0

logger.debug(f"Processing {len(file_paths_to_scan)} files with mtime-based comparison")
logger.debug(f"Starting {scan_type} scan with unified scan_directory")

for rel_path in file_paths_to_scan:
scanned_paths.add(rel_path)
async for abs_path_str, stat_info in self.scan_directory(directory, since_timestamp):
files_scanned += 1
abs_path = Path(abs_path_str)

# Get file stats
abs_path = directory / rel_path
if not abs_path.exists():
# File was deleted between scan and now (race condition)
# Convert to relative path
try:
rel_path = abs_path.relative_to(directory).as_posix()
except ValueError:
logger.warning(f"Skipping file not under directory: {abs_path}")
continue

stat_info = abs_path.stat()
scanned_paths.add(rel_path)

# Indexed lookup - single file query (not full table scan)
db_entity = await self.entity_repository.get_by_file_path(rel_path)
@@ -517,6 +540,13 @@ async def scan(directory):
# File unchanged - no checksum needed
logger.trace(f"File unchanged (mtime/size match): {rel_path}")

# Log incremental scan results
if scan_type == "incremental":
logger.info(f"Incremental scan found {files_scanned} potentially changed files")

# Record files scanned metric
logfire.metric_histogram("sync.scan.files_scanned", unit="files").record(files_scanned)

# Step 4: Detect moves (for both full and incremental scans)
# Check if any "new" files are actually moves by matching checksums
for new_path in list(report.new): # Use list() to allow modification during iteration
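The loop body is collapsed by the diff view at this point; a hedged sketch of the checksum-matching idea (report.deleted, report.moves, and the entity checksum attribute are assumptions, not the PR's exact names):

for new_path in list(report.new):
    new_checksum = changed_checksums.get(new_path)
    if not new_checksum:
        continue
    for deleted_path in list(report.deleted):  # assumed field
        db_entity = await self.entity_repository.get_by_file_path(deleted_path)
        if db_entity and db_entity.checksum == new_checksum:  # checksum attribute assumed
            report.new.discard(new_path)  # assumes set semantics
            report.deleted.discard(deleted_path)
            report.moves[deleted_path] = new_path  # assumed mapping
            break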
@@ -1019,164 +1049,101 @@ async def resolve_relations(self, entity_id: int | None = None):
# update search index
await self.search_service.index_entity(resolved_entity)

async def _quick_count_files(self, directory: Path) -> int:
"""Fast file count using find command.

Uses subprocess to leverage OS-level file counting which is much faster
than Python iteration, especially on network filesystems like TigrisFS.

Args:
directory: Directory to count files in

Returns:
Number of files in directory (recursive)
"""
process = await asyncio.create_subprocess_shell(
f'find "{directory}" -type f | wc -l',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()

if process.returncode != 0:
error_msg = stderr.decode().strip()
logger.error(
f"FILE COUNT OPTIMIZATION FAILED: find command failed with exit code {process.returncode}, "
f"error: {error_msg}. Falling back to manual count. "
f"This will slow down watermark detection!"
)
# Track optimization failures for visibility
logfire.metric_counter("sync.scan.file_count_failure").add(1)
# Fallback: count using scan_directory
count = 0
async for _ in self.scan_directory(directory):
count += 1
return count

return int(stdout.strip())
async def scan_directory(
self, directory: Path, since_timestamp: Optional[float] = None
) -> AsyncIterator[Tuple[str, os.stat_result]]:
"""Scan directory using find command (optimized for network filesystems).

async def _scan_directory_modified_since(
self, directory: Path, since_timestamp: float
) -> List[str]:
"""Use find -newermt for filesystem-level filtering of modified files.
Uses server-side `find` command with `-printf` to get path, mtime, and size
in a single network operation per directory level. This eliminates per-file
stat() calls that cause thousands of network round trips on network filesystems
like TigrisFS.

This is dramatically faster than scanning all files and comparing mtimes,
especially on network filesystems like TigrisFS where stat operations are expensive.
Performance: ~1 network operation per directory level vs 1.4k operations for
1.4k files with Python scandir().

Args:
directory: Directory to scan
since_timestamp: Unix timestamp to find files newer than
since_timestamp: Optional; when provided, only return files modified after this timestamp

Returns:
List of relative file paths modified since the timestamp (respects .bmignore)
Yields:
Tuples of (absolute_file_path, stat_info) for each file
"""
# Convert timestamp to find-compatible format
since_date = datetime.fromtimestamp(since_timestamp).strftime("%Y-%m-%d %H:%M:%S")
# Build find command with printf to get path + mtime + size in one operation
# Format: path\tmtime\tsize (tab-separated for easy parsing)
# Order matters: -newermt must precede -printf, because -printf is an action that
# always succeeds, so a test placed after it would not filter what gets printed
cmd = f'find "{directory}" -type f'
if since_timestamp:
    since_date = datetime.fromtimestamp(since_timestamp).strftime("%Y-%m-%d %H:%M:%S")
    cmd += f' -newermt "{since_date}"'
cmd += ' -printf "%p\\t%T@\\t%s\\n"'

process = await asyncio.create_subprocess_shell(
f'find "{directory}" -type f -newermt "{since_date}"',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()

if process.returncode != 0:
error_msg = stderr.decode().strip()
logger.error(
f"SCAN OPTIMIZATION FAILED: find -newermt command failed with exit code {process.returncode}, "
f"error: {error_msg}. Falling back to full scan. "
f"This will cause slow syncs on large projects!"
f"SCAN OPTIMIZATION FAILED: find command failed with exit code {process.returncode}, "
f"error: {error_msg}. This will cause sync failures!"
)
# Track optimization failures for visibility
logfire.metric_counter("sync.scan.optimization_failure").add(1)
# Fallback to full scan
return await self._scan_directory_full(directory)
logfire.metric_counter("sync.scan.find_command_failure").add(1)
raise RuntimeError(
f"Find command failed (exit code {process.returncode}): {error_msg}"
)

# Convert absolute paths to relative and filter through ignore patterns
file_paths = []
# Parse find output and yield results
for line in stdout.decode().splitlines():
if line:
try:
abs_path = Path(line)
rel_path = abs_path.relative_to(directory).as_posix()

# Apply ignore patterns (same as scan_directory)
if should_ignore_path(abs_path, directory, self._ignore_patterns):
logger.trace(f"Ignoring path per .bmignore: {rel_path}")
continue
if not line:
continue

file_paths.append(rel_path)
except ValueError:
# Path is not relative to directory, skip it
logger.warning(f"Skipping file not under directory: {line}")
try:
# Parse tab-separated values
parts = line.split("\t")
if len(parts) != 3:
logger.warning(f"Unexpected find output format: {line}")
continue

return file_paths

async def _scan_directory_full(self, directory: Path) -> List[str]:
"""Full directory scan returning all file paths.

Uses scan_directory() which respects .bmignore patterns.

Args:
directory: Directory to scan

Returns:
List of relative file paths (respects .bmignore)
"""
file_paths = []
async for file_path_str, _ in self.scan_directory(directory):
rel_path = Path(file_path_str).relative_to(directory).as_posix()
file_paths.append(rel_path)
return file_paths

async def scan_directory(self, directory: Path) -> AsyncIterator[Tuple[str, os.stat_result]]:
"""Stream files from directory using aiofiles.os.scandir() with cached stat info.

This method uses aiofiles.os.scandir() to leverage async I/O and cached stat
information from directory entries. This reduces network I/O by 50% on network
filesystems like TigrisFS by avoiding redundant stat() calls.
abs_path_str, mtime_str, size_str = parts
abs_path = Path(abs_path_str)

Args:
directory: Directory to scan

Yields:
Tuples of (absolute_file_path, stat_info) for each file
"""
try:
entries = await aiofiles.os.scandir(directory)
except PermissionError:
logger.warning(f"Permission denied scanning directory: {directory}")
return
# Apply ignore patterns (same as old scan_directory)
if should_ignore_path(abs_path, directory, self._ignore_patterns):
logger.trace(f"Ignoring path per .bmignore: {abs_path.relative_to(directory)}")
continue

results = []
subdirs = []
# Parse mtime and size
mtime = float(mtime_str)
size = int(size_str)

# Create minimal stat_result with fields we need
# We use os.stat_result to maintain compatibility with existing code
# Only st_mtime and st_size are used by sync_service
stat_info = os.stat_result(
(
0, # st_mode
0, # st_ino
0, # st_dev
0, # st_nlink
0, # st_uid
0, # st_gid
size, # st_size
0, # st_atime
mtime, # st_mtime
0, # st_ctime
)
)

for entry in entries:
entry_path = Path(entry.path)
yield (str(abs_path), stat_info)

# Check ignore patterns
if should_ignore_path(entry_path, directory, self._ignore_patterns):
logger.trace(f"Ignoring path per .bmignore: {entry_path.relative_to(directory)}")
except (ValueError, IndexError) as e:
logger.warning(f"Failed to parse find output line: {line}, error: {e}")
continue

if entry.is_dir(follow_symlinks=False):
# Collect subdirectories to recurse into
subdirs.append(entry_path)
elif entry.is_file(follow_symlinks=False):
# Get cached stat info (no extra syscall!)
stat_info = entry.stat(follow_symlinks=False)
results.append((entry.path, stat_info))

# Yield files from current directory
for file_path, stat_info in results:
yield (file_path, stat_info)

# Recurse into subdirectories
for subdir in subdirs:
async for result in self.scan_directory(subdir):
yield result


async def get_sync_service(project: Project) -> SyncService: # pragma: no cover
"""Get sync service instance with all dependencies."""