From 689bae9e9f55f1026af8b3e1fb1576974d444c71 Mon Sep 17 00:00:00 2001 From: Petr Date: Mon, 10 Nov 2025 21:48:54 -0800 Subject: [PATCH] fix(security): prevent path traversal and add memory-efficient file handling Fixes one security issue and one performance issue in directory file handling: 1. Path Traversal Vulnerability (HIGH): - files_from_dir/async_files_from_dir did not check for symlinks - Symlinks could point outside the root directory (e.g., to /etc/passwd) - Attackers could read sensitive files while reporting harmless paths - Fix: Resolve all paths and verify they stay within root directory 2. Memory Exhaustion (MEDIUM): - All files in a directory tree were loaded into memory simultaneously - Large directories (hundreds of MB) could exhaust available RAM - Fix: Add streaming iterator versions (files_from_dir_iter, async_files_from_dir_iter) that yield files one at a time - Original functions preserved for backwards compatibility Implementation details: - Added _is_path_within_root() helper for Python 3.8 compatibility (Path.is_relative_to() was added in Python 3.9) - Fixed async functions to handle Path/anyio.Path conversion correctly (anyio.Path.resolve() returns an anyio.Path, so the result is wrapped in pathlib.Path before the synchronous containment check) - Added error handling for OSError (including PermissionError) to prevent DoS via unreadable files or broken symlinks - All path checks use resolve() + relative_to() for safe containment Changed: - src/anthropic/lib/_files.py: * Added _is_path_within_root() for Python 3.8 compatibility * Added path.resolve() and containment checks to all functions * Fixed async functions to properly handle Path types * Added error handling for permission errors * Added files_from_dir_iter() for streaming * Added async_files_from_dir_iter() for async streaming --- src/anthropic/lib/_files.py | 159 ++++++++++++++++++++++++++++++++---- 1 file changed, 145 insertions(+), 14 deletions(-) diff --git a/src/anthropic/lib/_files.py b/src/anthropic/lib/_files.py index 558619fb..a5d44e71 100644 --- 
a/src/anthropic/lib/_files.py +++ b/src/anthropic/lib/_files.py @@ -2,41 +2,172 @@ import os from pathlib import Path +from typing import Iterator, AsyncIterator import anyio from .._types import FileTypes +def _is_path_within_root(path: Path, root: Path) -> bool: + """ + Check if a path is within root directory. + Python 3.8 compatible alternative to Path.is_relative_to(). + """ + try: + path.relative_to(root) + return True + except ValueError: + return False + + def files_from_dir(directory: str | os.PathLike[str]) -> list[FileTypes]: - path = Path(directory) + path = Path(directory).resolve() files: list[FileTypes] = [] - _collect_files(path, path.parent, files) + _collect_files(path, path.parent, files, root=path) return files -def _collect_files(directory: Path, relative_to: Path, files: list[FileTypes]) -> None: - for path in directory.iterdir(): - if path.is_dir(): - _collect_files(path, relative_to, files) - continue +def _collect_files(directory: Path, relative_to: Path, files: list[FileTypes], root: Path) -> None: + try: + items = list(directory.iterdir()) + except (PermissionError, OSError): + # Skip directories we can't read + return - files.append((str(path.relative_to(relative_to)), path.read_bytes())) + for path in items: + try: + # Resolve symlinks and check they don't escape the root directory + resolved_path = path.resolve() + if not _is_path_within_root(resolved_path, root): + continue + + if resolved_path.is_dir(): + _collect_files(resolved_path, relative_to, files, root=root) + continue + + with open(resolved_path, 'rb') as f: + content = f.read() + files.append((str(path.relative_to(relative_to)), content)) + except (PermissionError, OSError): + # Skip files/symlinks we can't read or resolve + continue async def async_files_from_dir(directory: str | os.PathLike[str]) -> list[FileTypes]: path = anyio.Path(directory) + # Resolve to get absolute path - returns a standard Path, not anyio.Path + resolved_root = Path(await path.resolve()) files: 
list[FileTypes] = [] - await _async_collect_files(path, path.parent, files) + await _async_collect_files(path, path.parent, files, root=resolved_root) return files -async def _async_collect_files(directory: anyio.Path, relative_to: anyio.Path, files: list[FileTypes]) -> None: - async for path in directory.iterdir(): - if await path.is_dir(): - await _async_collect_files(path, relative_to, files) +async def _async_collect_files( + directory: anyio.Path, relative_to: anyio.Path, files: list[FileTypes], root: Path +) -> None: + try: + items = [item async for item in directory.iterdir()] + except (PermissionError, OSError): + # Skip directories we can't read + return + + for path in items: + try: + # Resolve symlinks - this returns a standard Path, not anyio.Path + resolved_path = Path(await path.resolve()) + + # Check containment using standard Path methods (no await needed) + if not _is_path_within_root(resolved_path, root): + continue + + if resolved_path.is_dir(): + # Convert back to anyio.Path for recursive call + await _async_collect_files(anyio.Path(resolved_path), relative_to, files, root=root) + continue + + async with await anyio.open_file(resolved_path, 'rb') as f: + content = await f.read() + files.append((str(path.relative_to(relative_to)), content)) + except (PermissionError, OSError): + # Skip files/symlinks we can't read or resolve continue - files.append((str(path.relative_to(relative_to)), await path.read_bytes())) + +def files_from_dir_iter(directory: str | os.PathLike[str]) -> Iterator[FileTypes]: + """ + Memory-efficient streaming version of files_from_dir. + Yields files one at a time instead of loading all into memory. 
+ """ + path = Path(directory).resolve() + yield from _collect_files_iter(path, path.parent, root=path) + + +def _collect_files_iter(directory: Path, relative_to: Path, root: Path) -> Iterator[FileTypes]: + try: + items = list(directory.iterdir()) + except (PermissionError, OSError): + # Skip directories we can't read + return + + for path in items: + try: + # Resolve symlinks and check they don't escape the root directory + resolved_path = path.resolve() + if not _is_path_within_root(resolved_path, root): + continue + + if resolved_path.is_dir(): + yield from _collect_files_iter(resolved_path, relative_to, root=root) + continue + + with open(resolved_path, 'rb') as f: + content = f.read() + yield (str(path.relative_to(relative_to)), content) + except (PermissionError, OSError): + # Skip files/symlinks we can't read or resolve + continue + + +async def async_files_from_dir_iter(directory: str | os.PathLike[str]) -> AsyncIterator[FileTypes]: + """ + Memory-efficient streaming version of async_files_from_dir. + Yields files one at a time instead of loading all into memory. 
+ """ + path = anyio.Path(directory) + resolved_root = Path(await path.resolve()) + async for file in _async_collect_files_iter(path, path.parent, root=resolved_root): + yield file + + +async def _async_collect_files_iter( + directory: anyio.Path, relative_to: anyio.Path, root: Path +) -> AsyncIterator[FileTypes]: + try: + items = [item async for item in directory.iterdir()] + except (PermissionError, OSError): + # Skip directories we can't read + return + + for path in items: + try: + # Resolve symlinks - returns standard Path + resolved_path = Path(await path.resolve()) + + # Check containment (no await - it's a standard Path method) + if not _is_path_within_root(resolved_path, root): + continue + + if resolved_path.is_dir(): + async for file in _async_collect_files_iter(anyio.Path(resolved_path), relative_to, root=root): + yield file + continue + + async with await anyio.open_file(resolved_path, 'rb') as f: + content = await f.read() + yield (str(path.relative_to(relative_to)), content) + except (PermissionError, OSError): + # Skip files/symlinks we can't read or resolve + continue