diff --git a/examples/kademlia/kademlia.py b/examples/kademlia/kademlia.py index 5daa70d76..faaa66be7 100644 --- a/examples/kademlia/kademlia.py +++ b/examples/kademlia/kademlia.py @@ -41,6 +41,7 @@ from libp2p.tools.utils import ( info_from_p2p_addr, ) +from libp2p.utils.paths import get_script_dir, join_paths # Configure logging logging.basicConfig( @@ -53,8 +54,8 @@ # Configure DHT module loggers to inherit from the parent logger # This ensures all kademlia-example.* loggers use the same configuration # Get the directory where this script is located -SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -SERVER_ADDR_LOG = os.path.join(SCRIPT_DIR, "server_node_addr.txt") +SCRIPT_DIR = get_script_dir(__file__) +SERVER_ADDR_LOG = join_paths(SCRIPT_DIR, "server_node_addr.txt") # Set the level for all child loggers for module in [ diff --git a/libp2p/utils/logging.py b/libp2p/utils/logging.py index 3458a41e7..b23136f51 100644 --- a/libp2p/utils/logging.py +++ b/libp2p/utils/logging.py @@ -1,7 +1,4 @@ import atexit -from datetime import ( - datetime, -) import logging import logging.handlers import os @@ -21,6 +18,9 @@ # Store the current listener to stop it on exit _current_listener: logging.handlers.QueueListener | None = None +# Store the handlers for proper cleanup +_current_handlers: list[logging.Handler] = [] + # Event to track when the listener is ready _listener_ready = threading.Event() @@ -95,7 +95,7 @@ def setup_logging() -> None: - Child loggers inherit their parent's level unless explicitly set - The root libp2p logger controls the default level """ - global _current_listener, _listener_ready + global _current_listener, _listener_ready, _current_handlers # Reset the event _listener_ready.clear() @@ -105,6 +105,12 @@ def setup_logging() -> None: _current_listener.stop() _current_listener = None + # Close and clear existing handlers + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.close() + _current_handlers.clear() + # Get the log level from environment variable debug_str = os.environ.get("LIBP2P_DEBUG", "") @@ -148,13 +154,10 @@ def setup_logging() -> None: log_path = Path(log_file) log_path.parent.mkdir(parents=True, exist_ok=True) else: - # Default log file with timestamp and unique identifier - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") - unique_id = os.urandom(4).hex() # Add a unique identifier to prevent collisions - if os.name == "nt": # Windows - log_file = f"C:\\Windows\\Temp\\py-libp2p_{timestamp}_{unique_id}.log" - else: # Unix-like - log_file = f"/tmp/py-libp2p_{timestamp}_{unique_id}.log" + # Use cross-platform temp file creation + from libp2p.utils.paths import create_temp_file + + log_file = str(create_temp_file(prefix="py-libp2p_", suffix=".log")) # Print the log file path so users know where to find it print(f"Logging to: {log_file}", file=sys.stderr) @@ -195,6 +198,9 @@ def setup_logging() -> None: logger.setLevel(level) logger.propagate = False # Prevent message duplication + # Store handlers globally for cleanup + _current_handlers.extend(handlers) + # Start the listener AFTER configuring all loggers _current_listener = logging.handlers.QueueListener( log_queue, *handlers, respect_handler_level=True @@ -209,7 +215,13 @@ def setup_logging() -> None: @atexit.register def cleanup_logging() -> None: """Clean up logging resources on exit.""" - global _current_listener + global _current_listener, _current_handlers if _current_listener is not None: _current_listener.stop() _current_listener = None + + # Close all file handlers to ensure proper cleanup on Windows + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.close() + _current_handlers.clear() diff --git a/libp2p/utils/paths.py b/libp2p/utils/paths.py new file mode 100644 index 000000000..23f10dc6f --- /dev/null +++ b/libp2p/utils/paths.py @@ -0,0 +1,267 @@ +""" +Cross-platform path utilities for py-libp2p. + +This module provides standardized path operations to ensure consistent +behavior across Windows, macOS, and Linux platforms. +""" + +import os +from pathlib import Path +import sys +import tempfile +from typing import Union + +PathLike = Union[str, Path] + + +def get_temp_dir() -> Path: + """ + Get cross-platform temporary directory. + + Returns: + Path: Platform-specific temporary directory path + + """ + return Path(tempfile.gettempdir()) + + +def get_project_root() -> Path: + """ + Get the project root directory. + + Returns: + Path: Path to the py-libp2p project root + + """ + # Navigate from libp2p/utils/paths.py to project root + return Path(__file__).parent.parent.parent + + +def join_paths(*parts: PathLike) -> Path: + """ + Cross-platform path joining. + + Args: + *parts: Path components to join + + Returns: + Path: Joined path using platform-appropriate separator + + """ + return Path(*parts) + + +def ensure_dir_exists(path: PathLike) -> Path: + """ + Ensure directory exists, create if needed. + + Args: + path: Directory path to ensure exists + + Returns: + Path: Path object for the directory + + """ + path_obj = Path(path) + path_obj.mkdir(parents=True, exist_ok=True) + return path_obj + + +def get_config_dir() -> Path: + """ + Get user config directory (cross-platform). + + Returns: + Path: Platform-specific config directory + + """ + if os.name == "nt": # Windows + appdata = os.environ.get("APPDATA", "") + if appdata: + return Path(appdata) / "py-libp2p" + else: + # Fallback to user home directory + return Path.home() / "AppData" / "Roaming" / "py-libp2p" + else: # Unix-like (Linux, macOS) + return Path.home() / ".config" / "py-libp2p" + + +def get_script_dir(script_path: PathLike | None = None) -> Path: + """ + Get the directory containing a script file. + + Args: + script_path: Path to the script file. If None, uses __file__ + + Returns: + Path: Directory containing the script + + Raises: + RuntimeError: If script path cannot be determined + + """ + if script_path is None: + # This will be the directory of the calling script + import inspect + + frame = inspect.currentframe() + if frame and frame.f_back: + script_path = frame.f_back.f_globals.get("__file__") + else: + raise RuntimeError("Could not determine script path") + + if script_path is None: + raise RuntimeError("Script path is None") + + return Path(script_path).parent.absolute() + + +def create_temp_file(prefix: str = "py-libp2p_", suffix: str = ".log") -> Path: + """ + Create a temporary file with a unique name. + + Args: + prefix: File name prefix + suffix: File name suffix + + Returns: + Path: Path to the created temporary file + + """ + temp_dir = get_temp_dir() + # Create a unique filename using timestamp and random bytes + import secrets + import time + + timestamp = time.strftime("%Y%m%d_%H%M%S") + microseconds = f"{time.time() % 1:.6f}"[2:] # Get microseconds as string + unique_id = secrets.token_hex(4) + filename = f"{prefix}{timestamp}_{microseconds}_{unique_id}{suffix}" + + temp_file = temp_dir / filename + # Create the file by touching it + temp_file.touch() + return temp_file + + +def resolve_relative_path(base_path: PathLike, relative_path: PathLike) -> Path: + """ + Resolve a relative path from a base path. + + Args: + base_path: Base directory path + relative_path: Relative path to resolve + + Returns: + Path: Resolved absolute path + + """ + base = Path(base_path).resolve() + relative = Path(relative_path) + + if relative.is_absolute(): + return relative + else: + return (base / relative).resolve() + + +def normalize_path(path: PathLike) -> Path: + """ + Normalize a path, resolving any symbolic links and relative components. + + Args: + path: Path to normalize + + Returns: + Path: Normalized absolute path + + """ + return Path(path).resolve() + + +def get_venv_path() -> Path | None: + """ + Get virtual environment path if active. + + Returns: + Path: Virtual environment path if active, None otherwise + + """ + venv_path = os.environ.get("VIRTUAL_ENV") + if venv_path: + return Path(venv_path) + return None + + +def get_python_executable() -> Path: + """ + Get current Python executable path. + + Returns: + Path: Path to the current Python executable + + """ + return Path(sys.executable) + + +def find_executable(name: str) -> Path | None: + """ + Find executable in system PATH. + + Args: + name: Name of the executable to find + + Returns: + Path: Path to executable if found, None otherwise + + """ + # Check if name already contains path + if os.path.dirname(name): + path = Path(name) + if path.exists() and os.access(path, os.X_OK): + return path + return None + + # Search in PATH + for path_dir in os.environ.get("PATH", "").split(os.pathsep): + if not path_dir: + continue + path = Path(path_dir) / name + if path.exists() and os.access(path, os.X_OK): + return path + + return None + + +def get_script_binary_path() -> Path: + """ + Get path to script's binary directory. + + Returns: + Path: Directory containing the script's binary + + """ + return get_python_executable().parent + + +def get_binary_path(binary_name: str) -> Path | None: + """ + Find binary in PATH or virtual environment. + + Args: + binary_name: Name of the binary to find + + Returns: + Path: Path to binary if found, None otherwise + + """ + # First check in virtual environment if active + venv_path = get_venv_path() + if venv_path: + venv_bin = venv_path / "bin" if os.name != "nt" else venv_path / "Scripts" + binary_path = venv_bin / binary_name + if binary_path.exists() and os.access(binary_path, os.X_OK): + return binary_path + + # Fall back to system PATH + return find_executable(binary_name) diff --git a/newsfragments/886.bugfix.rst b/newsfragments/886.bugfix.rst new file mode 100644 index 000000000..1ebf38d1b --- /dev/null +++ b/newsfragments/886.bugfix.rst @@ -0,0 +1,2 @@ +Fixed cross-platform path handling by replacing hardcoded OS-specific +paths with standardized utilities in core modules and examples. diff --git a/scripts/audit_paths.py b/scripts/audit_paths.py new file mode 100644 index 000000000..80df11f87 --- /dev/null +++ b/scripts/audit_paths.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 +""" +Audit script to identify path handling issues in the py-libp2p codebase. + +This script scans for patterns that should be migrated to use the new +cross-platform path utilities. +""" + +import argparse +from pathlib import Path +import re +from typing import Any + + +def scan_for_path_issues(directory: Path) -> dict[str, list[dict[str, Any]]]: + """ + Scan for path handling issues in the codebase. + + Args: + directory: Root directory to scan + + Returns: + Dictionary mapping issue types to lists of found issues + + """ + issues = { + "hard_coded_slash": [], + "os_path_join": [], + "temp_hardcode": [], + "os_path_dirname": [], + "os_path_abspath": [], + "direct_path_concat": [], + } + + # Patterns to search for + patterns = { + "hard_coded_slash": r'["\'][^"\']*\/[^"\']*["\']', + "os_path_join": r"os\.path\.join\(", + "temp_hardcode": r'["\']\/tmp\/|["\']C:\\\\', + "os_path_dirname": r"os\.path\.dirname\(", + "os_path_abspath": r"os\.path\.abspath\(", + "direct_path_concat": r'["\'][^"\']*["\']\s*\+\s*["\'][^"\']*["\']', + } + + # Files to exclude + exclude_patterns = [ + r"__pycache__", + r"\.git", + r"\.pytest_cache", + r"\.mypy_cache", + r"\.ruff_cache", + r"env/", + r"venv/", + r"\.venv/", + ] + + for py_file in directory.rglob("*.py"): + # Skip excluded files + if any(re.search(pattern, str(py_file)) for pattern in exclude_patterns): + continue + + try: + content = py_file.read_text(encoding="utf-8") + except UnicodeDecodeError: + print(f"Warning: Could not read {py_file} (encoding issue)") + continue + + for issue_type, pattern in patterns.items(): + matches = re.finditer(pattern, content, re.MULTILINE) + for match in matches: + line_num = content[: match.start()].count("\n") + 1 + line_content = content.split("\n")[line_num - 1].strip() + + issues[issue_type].append( + { + "file": py_file, + "line": line_num, + "content": match.group(), + "full_line": line_content, + "relative_path": py_file.relative_to(directory), + } + ) + + return issues + + +def generate_migration_suggestions(issues: dict[str, list[dict[str, Any]]]) -> str: + """ + Generate migration suggestions for found issues. + + Args: + issues: Dictionary of found issues + + Returns: + Formatted string with migration suggestions + + """ + suggestions = [] + + for issue_type, issue_list in issues.items(): + if not issue_list: + continue + + suggestions.append(f"\n## {issue_type.replace('_', ' ').title()}") + suggestions.append(f"Found {len(issue_list)} instances:") + + for issue in issue_list[:10]: # Show first 10 examples + suggestions.append(f"\n### {issue['relative_path']}:{issue['line']}") + suggestions.append("```python") + suggestions.append("# Current code:") + suggestions.append(f"{issue['full_line']}") + suggestions.append("```") + + # Add migration suggestion based on issue type + if issue_type == "os_path_join": + suggestions.append("```python") + suggestions.append("# Suggested fix:") + suggestions.append("from libp2p.utils.paths import join_paths") + suggestions.append( + "# Replace os.path.join(a, b, c) with join_paths(a, b, c)" + ) + suggestions.append("```") + elif issue_type == "temp_hardcode": + suggestions.append("```python") + suggestions.append("# Suggested fix:") + suggestions.append( + "from libp2p.utils.paths import get_temp_dir, create_temp_file" + ) + temp_fix_msg = ( + "# Replace hard-coded temp paths with get_temp_dir() or " + "create_temp_file()" + ) + suggestions.append(temp_fix_msg) + suggestions.append("```") + elif issue_type == "os_path_dirname": + suggestions.append("```python") + suggestions.append("# Suggested fix:") + suggestions.append("from libp2p.utils.paths import get_script_dir") + script_dir_fix_msg = ( + "# Replace os.path.dirname(os.path.abspath(__file__)) with " + "get_script_dir(__file__)" + ) + suggestions.append(script_dir_fix_msg) + suggestions.append("```") + + if len(issue_list) > 10: + suggestions.append(f"\n... and {len(issue_list) - 10} more instances") + + return "\n".join(suggestions) + + +def generate_summary_report(issues: dict[str, list[dict[str, Any]]]) -> str: + """ + Generate a summary report of all found issues. + + Args: + issues: Dictionary of found issues + + Returns: + Formatted summary report + + """ + total_issues = sum(len(issue_list) for issue_list in issues.values()) + + report = [ + "# Cross-Platform Path Handling Audit Report", + "", + "## Summary", + f"Total issues found: {total_issues}", + "", + "## Issue Breakdown:", + ] + + for issue_type, issue_list in issues.items(): + if issue_list: + issue_title = issue_type.replace("_", " ").title() + instances_count = len(issue_list) + report.append(f"- **{issue_title}**: {instances_count} instances") + + report.append("") + report.append("## Priority Matrix:") + report.append("") + report.append("| Priority | Issue Type | Risk Level | Impact |") + report.append("|----------|------------|------------|---------|") + + priority_map = { + "temp_hardcode": ( + "šŸ”“ P0", + "HIGH", + "Core functionality fails on different platforms", + ), + "os_path_join": ("🟔 P1", "MEDIUM", "Examples and utilities may break"), + "os_path_dirname": ("🟔 P1", "MEDIUM", "Script location detection issues"), + "hard_coded_slash": ("🟢 P2", "LOW", "Future-proofing and consistency"), + "os_path_abspath": ("🟢 P2", "LOW", "Path resolution consistency"), + "direct_path_concat": ("🟢 P2", "LOW", "String concatenation issues"), + } + + for issue_type, issue_list in issues.items(): + if issue_list: + priority, risk, impact = priority_map.get( + issue_type, ("🟢 P2", "LOW", "General improvement") + ) + issue_title = issue_type.replace("_", " ").title() + report.append(f"| {priority} | {issue_title} | {risk} | {impact} |") + + return "\n".join(report) + + +def main(): + """Main function to run the audit.""" + parser = argparse.ArgumentParser( + description="Audit py-libp2p codebase for path handling issues" + ) + parser.add_argument( + "--directory", + default=".", + help="Directory to scan (default: current directory)", + ) + parser.add_argument("--output", help="Output file for detailed report") + parser.add_argument( + "--summary-only", action="store_true", help="Only show summary report" + ) + + args = parser.parse_args() + + directory = Path(args.directory) + if not directory.exists(): + print(f"Error: Directory {directory} does not exist") + return 1 + + print("šŸ” Scanning for path handling issues...") + issues = scan_for_path_issues(directory) + + # Generate and display summary + summary = generate_summary_report(issues) + print(summary) + + if not args.summary_only: + # Generate detailed suggestions + suggestions = generate_migration_suggestions(issues) + + if args.output: + with open(args.output, "w", encoding="utf-8") as f: + f.write(summary) + f.write(suggestions) + print(f"\nšŸ“„ Detailed report saved to {args.output}") + else: + print(suggestions) + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py index 603af5e1c..06be05c78 100644 --- a/tests/utils/test_logging.py +++ b/tests/utils/test_logging.py @@ -15,6 +15,7 @@ import trio from libp2p.utils.logging import ( + _current_handlers, _current_listener, _listener_ready, log_queue, @@ -24,13 +25,19 @@ def _reset_logging(): """Reset all logging state.""" - global _current_listener, _listener_ready + global _current_listener, _listener_ready, _current_handlers # Stop existing listener if any if _current_listener is not None: _current_listener.stop() _current_listener = None + # Close all file handlers to ensure proper cleanup on Windows + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.close() + _current_handlers.clear() + # Reset the event _listener_ready = threading.Event() @@ -174,6 +181,15 @@ async def test_custom_log_file(clean_env): if _current_listener is not None: _current_listener.stop() + # Give a moment for the listener to fully stop + await trio.sleep(0.05) + + # Close all file handlers to release the file + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.flush() # Ensure all writes are flushed + handler.close() + # Check if the file exists and contains our message assert log_file.exists() content = log_file.read_text() @@ -185,16 +201,15 @@ async def test_default_log_file(clean_env): """Test logging to the default file path.""" os.environ["LIBP2P_DEBUG"] = "INFO" - with patch("libp2p.utils.logging.datetime") as mock_datetime: - # Mock the timestamp to have a predictable filename - mock_datetime.now.return_value.strftime.return_value = "20240101_120000" + with patch("libp2p.utils.paths.create_temp_file") as mock_create_temp: + # Mock the temp file creation to return a predictable path + mock_temp_file = ( + Path(tempfile.gettempdir()) / "test_py-libp2p_20240101_120000.log" + ) + mock_create_temp.return_value = mock_temp_file # Remove the log file if it exists - if os.name == "nt": # Windows - log_file = Path("C:/Windows/Temp/20240101_120000_py-libp2p.log") - else: # Unix-like - log_file = Path("/tmp/20240101_120000_py-libp2p.log") - log_file.unlink(missing_ok=True) + mock_temp_file.unlink(missing_ok=True) setup_logging() @@ -211,9 +226,18 @@ async def test_default_log_file(clean_env): if _current_listener is not None: _current_listener.stop() - # Check the default log file - if log_file.exists(): # Only check content if we have write permission - content = log_file.read_text() + # Give a moment for the listener to fully stop + await trio.sleep(0.05) + + # Close all file handlers to release the file + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.flush() # Ensure all writes are flushed + handler.close() + + # Check the mocked temp file + if mock_temp_file.exists(): + content = mock_temp_file.read_text() assert "Test message" in content diff --git a/tests/utils/test_paths.py b/tests/utils/test_paths.py new file mode 100644 index 000000000..421fc557a --- /dev/null +++ b/tests/utils/test_paths.py @@ -0,0 +1,290 @@ +""" +Tests for cross-platform path utilities. +""" + +import os +from pathlib import Path +import tempfile + +import pytest + +from libp2p.utils.paths import ( + create_temp_file, + ensure_dir_exists, + find_executable, + get_binary_path, + get_config_dir, + get_project_root, + get_python_executable, + get_script_binary_path, + get_script_dir, + get_temp_dir, + get_venv_path, + join_paths, + normalize_path, + resolve_relative_path, +) + + +class TestPathUtilities: + """Test cross-platform path utilities.""" + + def test_get_temp_dir(self): + """Test that temp directory is accessible and exists.""" + temp_dir = get_temp_dir() + assert isinstance(temp_dir, Path) + assert temp_dir.exists() + assert temp_dir.is_dir() + # Should match system temp directory + assert temp_dir == Path(tempfile.gettempdir()) + + def test_get_project_root(self): + """Test that project root is correctly determined.""" + project_root = get_project_root() + assert isinstance(project_root, Path) + assert project_root.exists() + # Should contain pyproject.toml + assert (project_root / "pyproject.toml").exists() + # Should contain libp2p directory + assert (project_root / "libp2p").exists() + + def test_join_paths(self): + """Test cross-platform path joining.""" + # Test with strings + result = join_paths("a", "b", "c") + expected = Path("a") / "b" / "c" + assert result == expected + + # Test with mixed types + result = join_paths("a", Path("b"), "c") + expected = Path("a") / "b" / "c" + assert result == expected + + # Test with absolute path + result = join_paths("/absolute", "path") + expected = Path("/absolute") / "path" + assert result == expected + + def test_ensure_dir_exists(self, tmp_path): + """Test directory creation and existence checking.""" + # Test creating new directory + new_dir = tmp_path / "new_dir" + result = ensure_dir_exists(new_dir) + assert result == new_dir + assert new_dir.exists() + assert new_dir.is_dir() + + # Test creating nested directory + nested_dir = tmp_path / "parent" / "child" / "grandchild" + result = ensure_dir_exists(nested_dir) + assert result == nested_dir + assert nested_dir.exists() + assert nested_dir.is_dir() + + # Test with existing directory + result = ensure_dir_exists(new_dir) + assert result == new_dir + assert new_dir.exists() + + def test_get_config_dir(self): + """Test platform-specific config directory.""" + config_dir = get_config_dir() + assert isinstance(config_dir, Path) + + if os.name == "nt": # Windows + # Should be in AppData/Roaming or user home + assert "AppData" in str(config_dir) or "py-libp2p" in str(config_dir) + else: # Unix-like + # Should be in ~/.config + assert ".config" in str(config_dir) + assert "py-libp2p" in str(config_dir) + + def test_get_script_dir(self): + """Test script directory detection.""" + # Test with current file + script_dir = get_script_dir(__file__) + assert isinstance(script_dir, Path) + assert script_dir.exists() + assert script_dir.is_dir() + # Should contain this test file + assert (script_dir / "test_paths.py").exists() + + def test_create_temp_file(self): + """Test temporary file creation.""" + temp_file = create_temp_file() + assert isinstance(temp_file, Path) + assert temp_file.parent == get_temp_dir() + assert temp_file.name.startswith("py-libp2p_") + assert temp_file.name.endswith(".log") + + # Test with custom prefix and suffix + temp_file = create_temp_file(prefix="test_", suffix=".txt") + assert temp_file.name.startswith("test_") + assert temp_file.name.endswith(".txt") + + def test_resolve_relative_path(self, tmp_path): + """Test relative path resolution.""" + base_path = tmp_path / "base" + base_path.mkdir() + + # Test relative path + relative_path = "subdir/file.txt" + result = resolve_relative_path(base_path, relative_path) + expected = (base_path / "subdir" / "file.txt").resolve() + assert result == expected + + # Test absolute path (platform-agnostic) + if os.name == "nt": # Windows + absolute_path = "C:\\absolute\\path" + else: # Unix-like + absolute_path = "/absolute/path" + result = resolve_relative_path(base_path, absolute_path) + assert result == Path(absolute_path) + + def test_normalize_path(self, tmp_path): + """Test path normalization.""" + # Test with relative path + relative_path = tmp_path / ".." / "normalize_test" + result = normalize_path(relative_path) + assert result.is_absolute() + assert "normalize_test" in str(result) + + # Test with absolute path + absolute_path = tmp_path / "test_file" + result = normalize_path(absolute_path) + assert result.is_absolute() + assert result == absolute_path.resolve() + + def test_get_venv_path(self, monkeypatch): + """Test virtual environment path detection.""" + # Test when no virtual environment is active + # Temporarily clear VIRTUAL_ENV to test the "no venv" case + monkeypatch.delenv("VIRTUAL_ENV", raising=False) + result = get_venv_path() + assert result is None + + # Test when virtual environment is active + test_venv_path = "/path/to/venv" + monkeypatch.setenv("VIRTUAL_ENV", test_venv_path) + result = get_venv_path() + assert result == Path(test_venv_path) + + def test_get_python_executable(self): + """Test Python executable path detection.""" + result = get_python_executable() + assert isinstance(result, Path) + assert result.exists() + assert result.name.startswith("python") + + def test_find_executable(self): + """Test executable finding in PATH.""" + # Test with non-existent executable + result = find_executable("nonexistent_executable") + assert result is None + + # Test with existing executable (python should be available) + result = find_executable("python") + if result: + assert isinstance(result, Path) + assert result.exists() + + def test_get_script_binary_path(self): + """Test script binary path detection.""" + result = get_script_binary_path() + assert isinstance(result, Path) + assert result.exists() + assert result.is_dir() + + def test_get_binary_path(self, monkeypatch): + """Test binary path resolution with virtual environment.""" + # Test when no virtual environment is active + result = get_binary_path("python") + if result: + assert isinstance(result, Path) + assert result.exists() + + # Test when virtual environment is active + test_venv_path = "/path/to/venv" + monkeypatch.setenv("VIRTUAL_ENV", test_venv_path) + # This test is more complex as it depends on the actual venv structure + # We'll just verify the function doesn't crash + result = get_binary_path("python") + # Result can be None if binary not found in venv + if result: + assert isinstance(result, Path) + + +class TestCrossPlatformCompatibility: + """Test cross-platform compatibility.""" + + def test_config_dir_platform_specific_windows(self, monkeypatch): + """Test config directory respects Windows conventions.""" + import platform + + # Only run this test on Windows systems + if platform.system() != "Windows": + pytest.skip("This test only runs on Windows systems") + + monkeypatch.setattr("os.name", "nt") + monkeypatch.setenv("APPDATA", "C:\\Users\\Test\\AppData\\Roaming") + config_dir = get_config_dir() + assert "AppData" in str(config_dir) + assert "py-libp2p" in str(config_dir) + + def test_path_separators_consistent(self): + """Test that path separators are handled consistently.""" + # Test that join_paths uses platform-appropriate separators + result = join_paths("dir1", "dir2", "file.txt") + expected = Path("dir1") / "dir2" / "file.txt" + assert result == expected + + # Test that the result uses correct separators for the platform + if os.name == "nt": # Windows + assert "\\" in str(result) or "/" in str(result) + else: # Unix-like + assert "/" in str(result) + + def test_temp_file_uniqueness(self): + """Test that temporary files have unique names.""" + files = set() + for _ in range(10): + temp_file = create_temp_file() + assert temp_file not in files + files.add(temp_file) + + +class TestBackwardCompatibility: + """Test backward compatibility with existing code patterns.""" + + def test_path_operations_equivalent(self): + """Test that new path operations are equivalent to old os.path operations.""" + # Test join_paths vs os.path.join + parts = ["a", "b", "c"] + new_result = join_paths(*parts) + old_result = Path(os.path.join(*parts)) + assert new_result == old_result + + # Test get_script_dir vs os.path.dirname(os.path.abspath(__file__)) + new_script_dir = get_script_dir(__file__) + old_script_dir = Path(os.path.dirname(os.path.abspath(__file__))) + assert new_script_dir == old_script_dir + + def test_existing_functionality_preserved(self): + """Ensure no existing functionality is broken.""" + # Test that all functions return Path objects + assert isinstance(get_temp_dir(), Path) + assert isinstance(get_project_root(), Path) + assert isinstance(join_paths("a", "b"), Path) + assert isinstance(ensure_dir_exists(tempfile.gettempdir()), Path) + assert isinstance(get_config_dir(), Path) + assert isinstance(get_script_dir(__file__), Path) + assert isinstance(create_temp_file(), Path) + assert isinstance(resolve_relative_path(".", "test"), Path) + assert isinstance(normalize_path("."), Path) + assert isinstance(get_python_executable(), Path) + assert isinstance(get_script_binary_path(), Path) + + # Test optional return types + venv_path = get_venv_path() + if venv_path is not None: + assert isinstance(venv_path, Path)