From 6a24b138dd65b690ccc0e1f214d2d29a9f4c9b16 Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Mon, 1 Sep 2025 01:35:32 +0530 Subject: [PATCH 1/6] feat: Add cross-platform path utilities module --- libp2p/utils/paths.py | 162 ++++++++++++++++++++++++++++ scripts/audit_paths.py | 222 ++++++++++++++++++++++++++++++++++++++ tests/utils/test_paths.py | 213 ++++++++++++++++++++++++++++++++++++ 3 files changed, 597 insertions(+) create mode 100644 libp2p/utils/paths.py create mode 100644 scripts/audit_paths.py create mode 100644 tests/utils/test_paths.py diff --git a/libp2p/utils/paths.py b/libp2p/utils/paths.py new file mode 100644 index 000000000..27924d8fc --- /dev/null +++ b/libp2p/utils/paths.py @@ -0,0 +1,162 @@ +""" +Cross-platform path utilities for py-libp2p. + +This module provides standardized path operations to ensure consistent +behavior across Windows, macOS, and Linux platforms. +""" + +import os +import tempfile +from pathlib import Path +from typing import Union, Optional + +PathLike = Union[str, Path] + + +def get_temp_dir() -> Path: + """ + Get cross-platform temporary directory. + + Returns: + Path: Platform-specific temporary directory path + """ + return Path(tempfile.gettempdir()) + + +def get_project_root() -> Path: + """ + Get the project root directory. + + Returns: + Path: Path to the py-libp2p project root + """ + # Navigate from libp2p/utils/paths.py to project root + return Path(__file__).parent.parent.parent + + +def join_paths(*parts: PathLike) -> Path: + """ + Cross-platform path joining. + + Args: + *parts: Path components to join + + Returns: + Path: Joined path using platform-appropriate separator + """ + return Path(*parts) + + +def ensure_dir_exists(path: PathLike) -> Path: + """ + Ensure directory exists, create if needed. + + Args: + path: Directory path to ensure exists + + Returns: + Path: Path object for the directory + """ + path_obj = Path(path) + path_obj.mkdir(parents=True, exist_ok=True) + return path_obj + + +def get_config_dir() -> Path: + """ + Get user config directory (cross-platform). + + Returns: + Path: Platform-specific config directory + """ + if os.name == 'nt': # Windows + appdata = os.environ.get('APPDATA', '') + if appdata: + return Path(appdata) / 'py-libp2p' + else: + # Fallback to user home directory + return Path.home() / 'AppData' / 'Roaming' / 'py-libp2p' + else: # Unix-like (Linux, macOS) + return Path.home() / '.config' / 'py-libp2p' + + +def get_script_dir(script_path: Optional[PathLike] = None) -> Path: + """ + Get the directory containing a script file. + + Args: + script_path: Path to the script file. If None, uses __file__ + + Returns: + Path: Directory containing the script + """ + if script_path is None: + # This will be the directory of the calling script + import inspect + frame = inspect.currentframe() + if frame and frame.f_back: + script_path = frame.f_back.f_globals.get('__file__') + else: + raise RuntimeError("Could not determine script path") + + return Path(script_path).parent.absolute() + + +def create_temp_file(prefix: str = "py-libp2p_", suffix: str = ".log") -> Path: + """ + Create a temporary file with a unique name. 
+ + Args: + prefix: File name prefix + suffix: File name suffix + + Returns: + Path: Path to the created temporary file + """ + temp_dir = get_temp_dir() + # Create a unique filename using timestamp and random bytes + import time + import secrets + + timestamp = time.strftime("%Y%m%d_%H%M%S") + microseconds = f"{time.time() % 1:.6f}"[2:] # Get microseconds as string + unique_id = secrets.token_hex(4) + filename = f"{prefix}{timestamp}_{microseconds}_{unique_id}{suffix}" + + temp_file = temp_dir / filename + # Create the file by touching it + temp_file.touch() + return temp_file + + +def resolve_relative_path(base_path: PathLike, relative_path: PathLike) -> Path: + """ + Resolve a relative path from a base path. + + Args: + base_path: Base directory path + relative_path: Relative path to resolve + + Returns: + Path: Resolved absolute path + """ + base = Path(base_path).resolve() + relative = Path(relative_path) + + if relative.is_absolute(): + return relative + else: + return (base / relative).resolve() + + +def normalize_path(path: PathLike) -> Path: + """ + Normalize a path, resolving any symbolic links and relative components. + + Args: + path: Path to normalize + + Returns: + Path: Normalized absolute path + """ + return Path(path).resolve() diff --git a/scripts/audit_paths.py b/scripts/audit_paths.py new file mode 100644 index 000000000..b0079869c --- /dev/null +++ b/scripts/audit_paths.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +""" +Audit script to identify path handling issues in the py-libp2p codebase. + +This script scans for patterns that should be migrated to use the new +cross-platform path utilities. +""" + +import re +import os +from pathlib import Path +from typing import List, Dict, Any +import argparse + + +def scan_for_path_issues(directory: Path) -> Dict[str, List[Dict[str, Any]]]: + """ + Scan for path handling issues in the codebase. 
+ + Args: + directory: Root directory to scan + + Returns: + Dictionary mapping issue types to lists of found issues + """ + issues = { + 'hard_coded_slash': [], + 'os_path_join': [], + 'temp_hardcode': [], + 'os_path_dirname': [], + 'os_path_abspath': [], + 'direct_path_concat': [], + } + + # Patterns to search for + patterns = { + 'hard_coded_slash': r'["\'][^"\']*\/[^"\']*["\']', + 'os_path_join': r'os\.path\.join\(', + 'temp_hardcode': r'["\']\/tmp\/|["\']C:\\\\', + 'os_path_dirname': r'os\.path\.dirname\(', + 'os_path_abspath': r'os\.path\.abspath\(', + 'direct_path_concat': r'["\'][^"\']*["\']\s*\+\s*["\'][^"\']*["\']', + } + + # Files to exclude + exclude_patterns = [ + r'__pycache__', + r'\.git', + r'\.pytest_cache', + r'\.mypy_cache', + r'\.ruff_cache', + r'env/', + r'venv/', + r'\.venv/', + ] + + for py_file in directory.rglob("*.py"): + # Skip excluded files + if any(re.search(pattern, str(py_file)) for pattern in exclude_patterns): + continue + + try: + content = py_file.read_text(encoding='utf-8') + except UnicodeDecodeError: + print(f"Warning: Could not read {py_file} (encoding issue)") + continue + + for issue_type, pattern in patterns.items(): + matches = re.finditer(pattern, content, re.MULTILINE) + for match in matches: + line_num = content[:match.start()].count('\n') + 1 + line_content = content.split('\n')[line_num - 1].strip() + + issues[issue_type].append({ + 'file': py_file, + 'line': line_num, + 'content': match.group(), + 'full_line': line_content, + 'relative_path': py_file.relative_to(directory) + }) + + return issues + + +def generate_migration_suggestions(issues: Dict[str, List[Dict[str, Any]]]) -> str: + """ + Generate migration suggestions for found issues. + + Args: + issues: Dictionary of found issues + + Returns: + Formatted string with migration suggestions + """ + suggestions = [] + + for issue_type, issue_list in issues.items(): + if not issue_list: + continue + + suggestions.append(f"\n## {issue_type.replace('_', ' ').title()}") + suggestions.append(f"Found {len(issue_list)} instances:") + + for issue in issue_list[:10]: # Show first 10 examples + suggestions.append(f"\n### {issue['relative_path']}:{issue['line']}") + suggestions.append(f"```python") + suggestions.append(f"# Current code:") + suggestions.append(f"{issue['full_line']}") + suggestions.append(f"```") + + # Add migration suggestion based on issue type + if issue_type == 'os_path_join': + suggestions.append(f"```python") + suggestions.append(f"# Suggested fix:") + suggestions.append(f"from libp2p.utils.paths import join_paths") + suggestions.append(f"# Replace os.path.join(a, b, c) with join_paths(a, b, c)") + suggestions.append(f"```") + elif issue_type == 'temp_hardcode': + suggestions.append(f"```python") + suggestions.append(f"# Suggested fix:") + suggestions.append(f"from libp2p.utils.paths import get_temp_dir, create_temp_file") + suggestions.append(f"# Replace hard-coded temp paths with get_temp_dir() or create_temp_file()") + suggestions.append(f"```") + elif issue_type == 'os_path_dirname': + suggestions.append(f"```python") + suggestions.append(f"# Suggested fix:") + suggestions.append(f"from libp2p.utils.paths import get_script_dir") + suggestions.append(f"# Replace os.path.dirname(os.path.abspath(__file__)) with get_script_dir(__file__)") + suggestions.append(f"```") + + if len(issue_list) > 10: + suggestions.append(f"\n... 
and {len(issue_list) - 10} more instances") + + return "\n".join(suggestions) + + +def generate_summary_report(issues: Dict[str, List[Dict[str, Any]]]) -> str: + """ + Generate a summary report of all found issues. + + Args: + issues: Dictionary of found issues + + Returns: + Formatted summary report + """ + total_issues = sum(len(issue_list) for issue_list in issues.values()) + + report = [ + "# Cross-Platform Path Handling Audit Report", + "", + f"## Summary", + f"Total issues found: {total_issues}", + "", + "## Issue Breakdown:", + ] + + for issue_type, issue_list in issues.items(): + if issue_list: + report.append(f"- **{issue_type.replace('_', ' ').title()}**: {len(issue_list)} instances") + + report.append("") + report.append("## Priority Matrix:") + report.append("") + report.append("| Priority | Issue Type | Risk Level | Impact |") + report.append("|----------|------------|------------|---------|") + + priority_map = { + 'temp_hardcode': ('šŸ”“ P0', 'HIGH', 'Core functionality fails on different platforms'), + 'os_path_join': ('🟔 P1', 'MEDIUM', 'Examples and utilities may break'), + 'os_path_dirname': ('🟔 P1', 'MEDIUM', 'Script location detection issues'), + 'hard_coded_slash': ('🟢 P2', 'LOW', 'Future-proofing and consistency'), + 'os_path_abspath': ('🟢 P2', 'LOW', 'Path resolution consistency'), + 'direct_path_concat': ('🟢 P2', 'LOW', 'String concatenation issues'), + } + + for issue_type, issue_list in issues.items(): + if issue_list: + priority, risk, impact = priority_map.get(issue_type, ('🟢 P2', 'LOW', 'General improvement')) + report.append(f"| {priority} | {issue_type.replace('_', ' ').title()} | {risk} | {impact} |") + + return "\n".join(report) + + +def main(): + """Main function to run the audit.""" + parser = argparse.ArgumentParser(description="Audit py-libp2p codebase for path handling issues") + parser.add_argument("--directory", default=".", help="Directory to scan (default: current directory)") + parser.add_argument("--output", help="Output file for detailed report") + parser.add_argument("--summary-only", action="store_true", help="Only show summary report") + + args = parser.parse_args() + + directory = Path(args.directory) + if not directory.exists(): + print(f"Error: Directory {directory} does not exist") + return 1 + + print("šŸ” Scanning for path handling issues...") + issues = scan_for_path_issues(directory) + + # Generate and display summary + summary = generate_summary_report(issues) + print(summary) + + if not args.summary_only: + # Generate detailed suggestions + suggestions = generate_migration_suggestions(issues) + + if args.output: + with open(args.output, 'w', encoding='utf-8') as f: + f.write(summary) + f.write(suggestions) + print(f"\nšŸ“„ Detailed report saved to {args.output}") + else: + print(suggestions) + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/tests/utils/test_paths.py b/tests/utils/test_paths.py new file mode 100644 index 000000000..fcd4c08a5 --- /dev/null +++ b/tests/utils/test_paths.py @@ -0,0 +1,213 @@ +""" +Tests for cross-platform path utilities. 
+""" + +import os +import tempfile +from pathlib import Path +import pytest + +from libp2p.utils.paths import ( + get_temp_dir, + get_project_root, + join_paths, + ensure_dir_exists, + get_config_dir, + get_script_dir, + create_temp_file, + resolve_relative_path, + normalize_path, +) + + +class TestPathUtilities: + """Test cross-platform path utilities.""" + + def test_get_temp_dir(self): + """Test that temp directory is accessible and exists.""" + temp_dir = get_temp_dir() + assert isinstance(temp_dir, Path) + assert temp_dir.exists() + assert temp_dir.is_dir() + # Should match system temp directory + assert temp_dir == Path(tempfile.gettempdir()) + + def test_get_project_root(self): + """Test that project root is correctly determined.""" + project_root = get_project_root() + assert isinstance(project_root, Path) + assert project_root.exists() + # Should contain pyproject.toml + assert (project_root / "pyproject.toml").exists() + # Should contain libp2p directory + assert (project_root / "libp2p").exists() + + def test_join_paths(self): + """Test cross-platform path joining.""" + # Test with strings + result = join_paths("a", "b", "c") + expected = Path("a") / "b" / "c" + assert result == expected + + # Test with mixed types + result = join_paths("a", Path("b"), "c") + expected = Path("a") / "b" / "c" + assert result == expected + + # Test with absolute path + result = join_paths("/absolute", "path") + expected = Path("/absolute") / "path" + assert result == expected + + def test_ensure_dir_exists(self, tmp_path): + """Test directory creation and existence checking.""" + # Test creating new directory + new_dir = tmp_path / "new_dir" + result = ensure_dir_exists(new_dir) + assert result == new_dir + assert new_dir.exists() + assert new_dir.is_dir() + + # Test creating nested directory + nested_dir = tmp_path / "parent" / "child" / "grandchild" + result = ensure_dir_exists(nested_dir) + assert result == nested_dir + assert nested_dir.exists() + assert nested_dir.is_dir() + + # Test with existing directory + result = ensure_dir_exists(new_dir) + assert result == new_dir + assert new_dir.exists() + + def test_get_config_dir(self): + """Test platform-specific config directory.""" + config_dir = get_config_dir() + assert isinstance(config_dir, Path) + + if os.name == 'nt': # Windows + # Should be in AppData/Roaming or user home + assert "AppData" in str(config_dir) or "py-libp2p" in str(config_dir) + else: # Unix-like + # Should be in ~/.config + assert ".config" in str(config_dir) + assert "py-libp2p" in str(config_dir) + + def test_get_script_dir(self): + """Test script directory detection.""" + # Test with current file + script_dir = get_script_dir(__file__) + assert isinstance(script_dir, Path) + assert script_dir.exists() + assert script_dir.is_dir() + # Should contain this test file + assert (script_dir / "test_paths.py").exists() + + def test_create_temp_file(self): + """Test temporary file creation.""" + temp_file = create_temp_file() + assert isinstance(temp_file, Path) + assert temp_file.parent == get_temp_dir() + assert temp_file.name.startswith("py-libp2p_") + assert temp_file.name.endswith(".log") + + # Test with custom prefix and suffix + temp_file = create_temp_file(prefix="test_", suffix=".txt") + assert temp_file.name.startswith("test_") + assert temp_file.name.endswith(".txt") + + def test_resolve_relative_path(self, tmp_path): + """Test relative path resolution.""" + base_path = tmp_path / "base" + base_path.mkdir() + + # Test relative path + relative_path = "subdir/file.txt" 
+ result = resolve_relative_path(base_path, relative_path) + expected = (base_path / "subdir" / "file.txt").resolve() + assert result == expected + + # Test absolute path (platform-agnostic) + if os.name == 'nt': # Windows + absolute_path = "C:\\absolute\\path" + else: # Unix-like + absolute_path = "/absolute/path" + result = resolve_relative_path(base_path, absolute_path) + assert result == Path(absolute_path) + + def test_normalize_path(self, tmp_path): + """Test path normalization.""" + # Test with relative path + relative_path = tmp_path / ".." / "normalize_test" + result = normalize_path(relative_path) + assert result.is_absolute() + assert "normalize_test" in str(result) + + # Test with absolute path + absolute_path = tmp_path / "test_file" + result = normalize_path(absolute_path) + assert result.is_absolute() + assert result == absolute_path.resolve() + + +class TestCrossPlatformCompatibility: + """Test cross-platform compatibility.""" + + def test_config_dir_platform_specific_windows(self, monkeypatch): + """Test config directory respects Windows conventions.""" + monkeypatch.setattr('os.name', 'nt') + monkeypatch.setenv('APPDATA', 'C:\\Users\\Test\\AppData\\Roaming') + config_dir = get_config_dir() + assert "AppData" in str(config_dir) + assert "py-libp2p" in str(config_dir) + + def test_path_separators_consistent(self): + """Test that path separators are handled consistently.""" + # Test that join_paths uses platform-appropriate separators + result = join_paths("dir1", "dir2", "file.txt") + expected = Path("dir1") / "dir2" / "file.txt" + assert result == expected + + # Test that the result uses correct separators for the platform + if os.name == 'nt': # Windows + assert "\\" in str(result) or "/" in str(result) + else: # Unix-like + assert "/" in str(result) + + def test_temp_file_uniqueness(self): + """Test that temporary files have unique names.""" + files = set() + for _ in range(10): + temp_file = create_temp_file() + assert temp_file not in files + files.add(temp_file) + + +class TestBackwardCompatibility: + """Test backward compatibility with existing code patterns.""" + + def test_path_operations_equivalent(self): + """Test that new path operations are equivalent to old os.path operations.""" + # Test join_paths vs os.path.join + parts = ["a", "b", "c"] + new_result = join_paths(*parts) + old_result = Path(os.path.join(*parts)) + assert new_result == old_result + + # Test get_script_dir vs os.path.dirname(os.path.abspath(__file__)) + new_script_dir = get_script_dir(__file__) + old_script_dir = Path(os.path.dirname(os.path.abspath(__file__))) + assert new_script_dir == old_script_dir + + def test_existing_functionality_preserved(self): + """Ensure no existing functionality is broken.""" + # Test that all functions return Path objects + assert isinstance(get_temp_dir(), Path) + assert isinstance(get_project_root(), Path) + assert isinstance(join_paths("a", "b"), Path) + assert isinstance(ensure_dir_exists(tempfile.gettempdir()), Path) + assert isinstance(get_config_dir(), Path) + assert isinstance(get_script_dir(__file__), Path) + assert isinstance(create_temp_file(), Path) + assert isinstance(resolve_relative_path(".", "test"), Path) + assert isinstance(normalize_path("."), Path) From 64ccce17eb2e67a7dfa8f8a1cef97ddfa83d3235 Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Mon, 1 Sep 2025 02:03:51 +0530 Subject: [PATCH 2/6] fix(app): 882 Comprehensive cross-platform path handling utilities --- docs/conf.py | 4 +- examples/kademlia/kademlia.py | 5 +- 
libp2p/utils/logging.py | 14 +- libp2p/utils/paths.py | 163 +++++++++++++++++++---- scripts/audit_paths.py | 241 +++++++++++++++++++--------------- tests/utils/test_paths.py | 99 +++++++++++--- 6 files changed, 365 insertions(+), 161 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 446252f1f..646183590 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -27,7 +27,9 @@ import tomli as tomllib # type: ignore (In case of >3.11 Pyrefly doesnt find tomli , which is right but a false flag) # Path to pyproject.toml (assuming conf.py is in a 'docs' subdirectory) -pyproject_path = os.path.join(os.path.dirname(__file__), "..", "pyproject.toml") +from libp2p.utils.paths import get_project_root, join_paths + +pyproject_path = join_paths(get_project_root(), "pyproject.toml") with open(pyproject_path, "rb") as f: pyproject_data = tomllib.load(f) diff --git a/examples/kademlia/kademlia.py b/examples/kademlia/kademlia.py index 5daa70d76..faaa66be7 100644 --- a/examples/kademlia/kademlia.py +++ b/examples/kademlia/kademlia.py @@ -41,6 +41,7 @@ from libp2p.tools.utils import ( info_from_p2p_addr, ) +from libp2p.utils.paths import get_script_dir, join_paths # Configure logging logging.basicConfig( @@ -53,8 +54,8 @@ # Configure DHT module loggers to inherit from the parent logger # This ensures all kademlia-example.* loggers use the same configuration # Get the directory where this script is located -SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -SERVER_ADDR_LOG = os.path.join(SCRIPT_DIR, "server_node_addr.txt") +SCRIPT_DIR = get_script_dir(__file__) +SERVER_ADDR_LOG = join_paths(SCRIPT_DIR, "server_node_addr.txt") # Set the level for all child loggers for module in [ diff --git a/libp2p/utils/logging.py b/libp2p/utils/logging.py index 3458a41e7..acc673734 100644 --- a/libp2p/utils/logging.py +++ b/libp2p/utils/logging.py @@ -1,7 +1,4 @@ import atexit -from datetime import ( - datetime, -) import logging import logging.handlers import os @@ -148,13 +145,10 @@ def setup_logging() -> None: log_path = Path(log_file) log_path.parent.mkdir(parents=True, exist_ok=True) else: - # Default log file with timestamp and unique identifier - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") - unique_id = os.urandom(4).hex() # Add a unique identifier to prevent collisions - if os.name == "nt": # Windows - log_file = f"C:\\Windows\\Temp\\py-libp2p_{timestamp}_{unique_id}.log" - else: # Unix-like - log_file = f"/tmp/py-libp2p_{timestamp}_{unique_id}.log" + # Use cross-platform temp file creation + from libp2p.utils.paths import create_temp_file + + log_file = str(create_temp_file(prefix="py-libp2p_", suffix=".log")) # Print the log file path so users know where to find it print(f"Logging to: {log_file}", file=sys.stderr) diff --git a/libp2p/utils/paths.py b/libp2p/utils/paths.py index 27924d8fc..23f10dc6f 100644 --- a/libp2p/utils/paths.py +++ b/libp2p/utils/paths.py @@ -6,9 +6,10 @@ """ import os -import tempfile from pathlib import Path -from typing import Union, Optional +import sys +import tempfile +from typing import Union PathLike = Union[str, Path] @@ -16,9 +17,10 @@ def get_temp_dir() -> Path: """ Get cross-platform temporary directory. - + Returns: Path: Platform-specific temporary directory path + """ return Path(tempfile.gettempdir()) @@ -26,9 +28,10 @@ def get_temp_dir() -> Path: def get_project_root() -> Path: """ Get the project root directory. 
- + Returns: Path: Path to the py-libp2p project root + """ # Navigate from libp2p/utils/paths.py to project root return Path(__file__).parent.parent.parent @@ -37,12 +40,13 @@ def get_project_root() -> Path: def join_paths(*parts: PathLike) -> Path: """ Cross-platform path joining. - + Args: *parts: Path components to join - + Returns: Path: Joined path using platform-appropriate separator + """ return Path(*parts) @@ -50,12 +54,13 @@ def join_paths(*parts: PathLike) -> Path: def ensure_dir_exists(path: PathLike) -> Path: """ Ensure directory exists, create if needed. - + Args: path: Directory path to ensure exists - + Returns: Path: Path object for the directory + """ path_obj = Path(path) path_obj.mkdir(parents=True, exist_ok=True) @@ -65,64 +70,74 @@ def ensure_dir_exists(path: PathLike) -> Path: def get_config_dir() -> Path: """ Get user config directory (cross-platform). - + Returns: Path: Platform-specific config directory + """ - if os.name == 'nt': # Windows - appdata = os.environ.get('APPDATA', '') + if os.name == "nt": # Windows + appdata = os.environ.get("APPDATA", "") if appdata: - return Path(appdata) / 'py-libp2p' + return Path(appdata) / "py-libp2p" else: # Fallback to user home directory - return Path.home() / 'AppData' / 'Roaming' / 'py-libp2p' + return Path.home() / "AppData" / "Roaming" / "py-libp2p" else: # Unix-like (Linux, macOS) - return Path.home() / '.config' / 'py-libp2p' + return Path.home() / ".config" / "py-libp2p" -def get_script_dir(script_path: Optional[PathLike] = None) -> Path: +def get_script_dir(script_path: PathLike | None = None) -> Path: """ Get the directory containing a script file. - + Args: script_path: Path to the script file. If None, uses __file__ - + Returns: Path: Directory containing the script + + Raises: + RuntimeError: If script path cannot be determined + """ if script_path is None: # This will be the directory of the calling script import inspect + frame = inspect.currentframe() if frame and frame.f_back: - script_path = frame.f_back.f_globals.get('__file__') + script_path = frame.f_back.f_globals.get("__file__") else: raise RuntimeError("Could not determine script path") - + + if script_path is None: + raise RuntimeError("Script path is None") + return Path(script_path).parent.absolute() def create_temp_file(prefix: str = "py-libp2p_", suffix: str = ".log") -> Path: """ Create a temporary file with a unique name. - + Args: prefix: File name prefix suffix: File name suffix - + Returns: Path: Path to the created temporary file + """ temp_dir = get_temp_dir() # Create a unique filename using timestamp and random bytes - import time import secrets - + import time + timestamp = time.strftime("%Y%m%d_%H%M%S") microseconds = f"{time.time() % 1:.6f}"[2:] # Get microseconds as string unique_id = secrets.token_hex(4) filename = f"{prefix}{timestamp}_{microseconds}_{unique_id}{suffix}" - + temp_file = temp_dir / filename # Create the file by touching it temp_file.touch() @@ -132,17 +147,18 @@ def create_temp_file(prefix: str = "py-libp2p_", suffix: str = ".log") -> Path: def resolve_relative_path(base_path: PathLike, relative_path: PathLike) -> Path: """ Resolve a relative path from a base path. 
- + Args: base_path: Base directory path relative_path: Relative path to resolve - + Returns: Path: Resolved absolute path + """ base = Path(base_path).resolve() relative = Path(relative_path) - + if relative.is_absolute(): return relative else: @@ -152,11 +168,100 @@ def resolve_relative_path(base_path: PathLike, relative_path: PathLike) -> Path: def normalize_path(path: PathLike) -> Path: """ Normalize a path, resolving any symbolic links and relative components. - + Args: path: Path to normalize - + Returns: Path: Normalized absolute path + """ return Path(path).resolve() + + +def get_venv_path() -> Path | None: + """ + Get virtual environment path if active. + + Returns: + Path: Virtual environment path if active, None otherwise + + """ + venv_path = os.environ.get("VIRTUAL_ENV") + if venv_path: + return Path(venv_path) + return None + + +def get_python_executable() -> Path: + """ + Get current Python executable path. + + Returns: + Path: Path to the current Python executable + + """ + return Path(sys.executable) + + +def find_executable(name: str) -> Path | None: + """ + Find executable in system PATH. + + Args: + name: Name of the executable to find + + Returns: + Path: Path to executable if found, None otherwise + + """ + # Check if name already contains path + if os.path.dirname(name): + path = Path(name) + if path.exists() and os.access(path, os.X_OK): + return path + return None + + # Search in PATH + for path_dir in os.environ.get("PATH", "").split(os.pathsep): + if not path_dir: + continue + path = Path(path_dir) / name + if path.exists() and os.access(path, os.X_OK): + return path + + return None + + +def get_script_binary_path() -> Path: + """ + Get path to script's binary directory. + + Returns: + Path: Directory containing the script's binary + + """ + return get_python_executable().parent + + +def get_binary_path(binary_name: str) -> Path | None: + """ + Find binary in PATH or virtual environment. + + Args: + binary_name: Name of the binary to find + + Returns: + Path: Path to binary if found, None otherwise + + """ + # First check in virtual environment if active + venv_path = get_venv_path() + if venv_path: + venv_bin = venv_path / "bin" if os.name != "nt" else venv_path / "Scripts" + binary_path = venv_bin / binary_name + if binary_path.exists() and os.access(binary_path, os.X_OK): + return binary_path + + # Fall back to system PATH + return find_executable(binary_name) diff --git a/scripts/audit_paths.py b/scripts/audit_paths.py index b0079869c..80df11f87 100644 --- a/scripts/audit_paths.py +++ b/scripts/audit_paths.py @@ -6,215 +6,248 @@ cross-platform path utilities. """ -import re -import os -from pathlib import Path -from typing import List, Dict, Any import argparse +from pathlib import Path +import re +from typing import Any -def scan_for_path_issues(directory: Path) -> Dict[str, List[Dict[str, Any]]]: +def scan_for_path_issues(directory: Path) -> dict[str, list[dict[str, Any]]]: """ Scan for path handling issues in the codebase. 
- + Args: directory: Root directory to scan - + Returns: Dictionary mapping issue types to lists of found issues + """ issues = { - 'hard_coded_slash': [], - 'os_path_join': [], - 'temp_hardcode': [], - 'os_path_dirname': [], - 'os_path_abspath': [], - 'direct_path_concat': [], + "hard_coded_slash": [], + "os_path_join": [], + "temp_hardcode": [], + "os_path_dirname": [], + "os_path_abspath": [], + "direct_path_concat": [], } - + # Patterns to search for patterns = { - 'hard_coded_slash': r'["\'][^"\']*\/[^"\']*["\']', - 'os_path_join': r'os\.path\.join\(', - 'temp_hardcode': r'["\']\/tmp\/|["\']C:\\\\', - 'os_path_dirname': r'os\.path\.dirname\(', - 'os_path_abspath': r'os\.path\.abspath\(', - 'direct_path_concat': r'["\'][^"\']*["\']\s*\+\s*["\'][^"\']*["\']', + "hard_coded_slash": r'["\'][^"\']*\/[^"\']*["\']', + "os_path_join": r"os\.path\.join\(", + "temp_hardcode": r'["\']\/tmp\/|["\']C:\\\\', + "os_path_dirname": r"os\.path\.dirname\(", + "os_path_abspath": r"os\.path\.abspath\(", + "direct_path_concat": r'["\'][^"\']*["\']\s*\+\s*["\'][^"\']*["\']', } - + # Files to exclude exclude_patterns = [ - r'__pycache__', - r'\.git', - r'\.pytest_cache', - r'\.mypy_cache', - r'\.ruff_cache', - r'env/', - r'venv/', - r'\.venv/', + r"__pycache__", + r"\.git", + r"\.pytest_cache", + r"\.mypy_cache", + r"\.ruff_cache", + r"env/", + r"venv/", + r"\.venv/", ] - + for py_file in directory.rglob("*.py"): # Skip excluded files if any(re.search(pattern, str(py_file)) for pattern in exclude_patterns): continue - + try: - content = py_file.read_text(encoding='utf-8') + content = py_file.read_text(encoding="utf-8") except UnicodeDecodeError: print(f"Warning: Could not read {py_file} (encoding issue)") continue - + for issue_type, pattern in patterns.items(): matches = re.finditer(pattern, content, re.MULTILINE) for match in matches: - line_num = content[:match.start()].count('\n') + 1 - line_content = content.split('\n')[line_num - 1].strip() - - issues[issue_type].append({ - 'file': py_file, - 'line': line_num, - 'content': match.group(), - 'full_line': line_content, - 'relative_path': py_file.relative_to(directory) - }) - + line_num = content[: match.start()].count("\n") + 1 + line_content = content.split("\n")[line_num - 1].strip() + + issues[issue_type].append( + { + "file": py_file, + "line": line_num, + "content": match.group(), + "full_line": line_content, + "relative_path": py_file.relative_to(directory), + } + ) + return issues -def generate_migration_suggestions(issues: Dict[str, List[Dict[str, Any]]]) -> str: +def generate_migration_suggestions(issues: dict[str, list[dict[str, Any]]]) -> str: """ Generate migration suggestions for found issues. 
- + Args: issues: Dictionary of found issues - + Returns: Formatted string with migration suggestions + """ suggestions = [] - + for issue_type, issue_list in issues.items(): if not issue_list: continue - + suggestions.append(f"\n## {issue_type.replace('_', ' ').title()}") suggestions.append(f"Found {len(issue_list)} instances:") - + for issue in issue_list[:10]: # Show first 10 examples suggestions.append(f"\n### {issue['relative_path']}:{issue['line']}") - suggestions.append(f"```python") - suggestions.append(f"# Current code:") + suggestions.append("```python") + suggestions.append("# Current code:") suggestions.append(f"{issue['full_line']}") - suggestions.append(f"```") - + suggestions.append("```") + # Add migration suggestion based on issue type - if issue_type == 'os_path_join': - suggestions.append(f"```python") - suggestions.append(f"# Suggested fix:") - suggestions.append(f"from libp2p.utils.paths import join_paths") - suggestions.append(f"# Replace os.path.join(a, b, c) with join_paths(a, b, c)") - suggestions.append(f"```") - elif issue_type == 'temp_hardcode': - suggestions.append(f"```python") - suggestions.append(f"# Suggested fix:") - suggestions.append(f"from libp2p.utils.paths import get_temp_dir, create_temp_file") - suggestions.append(f"# Replace hard-coded temp paths with get_temp_dir() or create_temp_file()") - suggestions.append(f"```") - elif issue_type == 'os_path_dirname': - suggestions.append(f"```python") - suggestions.append(f"# Suggested fix:") - suggestions.append(f"from libp2p.utils.paths import get_script_dir") - suggestions.append(f"# Replace os.path.dirname(os.path.abspath(__file__)) with get_script_dir(__file__)") - suggestions.append(f"```") - + if issue_type == "os_path_join": + suggestions.append("```python") + suggestions.append("# Suggested fix:") + suggestions.append("from libp2p.utils.paths import join_paths") + suggestions.append( + "# Replace os.path.join(a, b, c) with join_paths(a, b, c)" + ) + suggestions.append("```") + elif issue_type == "temp_hardcode": + suggestions.append("```python") + suggestions.append("# Suggested fix:") + suggestions.append( + "from libp2p.utils.paths import get_temp_dir, create_temp_file" + ) + temp_fix_msg = ( + "# Replace hard-coded temp paths with get_temp_dir() or " + "create_temp_file()" + ) + suggestions.append(temp_fix_msg) + suggestions.append("```") + elif issue_type == "os_path_dirname": + suggestions.append("```python") + suggestions.append("# Suggested fix:") + suggestions.append("from libp2p.utils.paths import get_script_dir") + script_dir_fix_msg = ( + "# Replace os.path.dirname(os.path.abspath(__file__)) with " + "get_script_dir(__file__)" + ) + suggestions.append(script_dir_fix_msg) + suggestions.append("```") + if len(issue_list) > 10: suggestions.append(f"\n... and {len(issue_list) - 10} more instances") - + return "\n".join(suggestions) -def generate_summary_report(issues: Dict[str, List[Dict[str, Any]]]) -> str: +def generate_summary_report(issues: dict[str, list[dict[str, Any]]]) -> str: """ Generate a summary report of all found issues. 
- + Args: issues: Dictionary of found issues - + Returns: Formatted summary report + """ total_issues = sum(len(issue_list) for issue_list in issues.values()) - + report = [ "# Cross-Platform Path Handling Audit Report", "", - f"## Summary", + "## Summary", f"Total issues found: {total_issues}", "", "## Issue Breakdown:", ] - + for issue_type, issue_list in issues.items(): if issue_list: - report.append(f"- **{issue_type.replace('_', ' ').title()}**: {len(issue_list)} instances") - + issue_title = issue_type.replace("_", " ").title() + instances_count = len(issue_list) + report.append(f"- **{issue_title}**: {instances_count} instances") + report.append("") report.append("## Priority Matrix:") report.append("") report.append("| Priority | Issue Type | Risk Level | Impact |") report.append("|----------|------------|------------|---------|") - + priority_map = { - 'temp_hardcode': ('šŸ”“ P0', 'HIGH', 'Core functionality fails on different platforms'), - 'os_path_join': ('🟔 P1', 'MEDIUM', 'Examples and utilities may break'), - 'os_path_dirname': ('🟔 P1', 'MEDIUM', 'Script location detection issues'), - 'hard_coded_slash': ('🟢 P2', 'LOW', 'Future-proofing and consistency'), - 'os_path_abspath': ('🟢 P2', 'LOW', 'Path resolution consistency'), - 'direct_path_concat': ('🟢 P2', 'LOW', 'String concatenation issues'), + "temp_hardcode": ( + "šŸ”“ P0", + "HIGH", + "Core functionality fails on different platforms", + ), + "os_path_join": ("🟔 P1", "MEDIUM", "Examples and utilities may break"), + "os_path_dirname": ("🟔 P1", "MEDIUM", "Script location detection issues"), + "hard_coded_slash": ("🟢 P2", "LOW", "Future-proofing and consistency"), + "os_path_abspath": ("🟢 P2", "LOW", "Path resolution consistency"), + "direct_path_concat": ("🟢 P2", "LOW", "String concatenation issues"), } - + for issue_type, issue_list in issues.items(): if issue_list: - priority, risk, impact = priority_map.get(issue_type, ('🟢 P2', 'LOW', 'General improvement')) - report.append(f"| {priority} | {issue_type.replace('_', ' ').title()} | {risk} | {impact} |") - + priority, risk, impact = priority_map.get( + issue_type, ("🟢 P2", "LOW", "General improvement") + ) + issue_title = issue_type.replace("_", " ").title() + report.append(f"| {priority} | {issue_title} | {risk} | {impact} |") + return "\n".join(report) def main(): """Main function to run the audit.""" - parser = argparse.ArgumentParser(description="Audit py-libp2p codebase for path handling issues") - parser.add_argument("--directory", default=".", help="Directory to scan (default: current directory)") + parser = argparse.ArgumentParser( + description="Audit py-libp2p codebase for path handling issues" + ) + parser.add_argument( + "--directory", + default=".", + help="Directory to scan (default: current directory)", + ) parser.add_argument("--output", help="Output file for detailed report") - parser.add_argument("--summary-only", action="store_true", help="Only show summary report") - + parser.add_argument( + "--summary-only", action="store_true", help="Only show summary report" + ) + args = parser.parse_args() - + directory = Path(args.directory) if not directory.exists(): print(f"Error: Directory {directory} does not exist") return 1 - + print("šŸ” Scanning for path handling issues...") issues = scan_for_path_issues(directory) - + # Generate and display summary summary = generate_summary_report(issues) print(summary) - + if not args.summary_only: # Generate detailed suggestions suggestions = generate_migration_suggestions(issues) - + if args.output: - with 
open(args.output, 'w', encoding='utf-8') as f: + with open(args.output, "w", encoding="utf-8") as f: f.write(summary) f.write(suggestions) print(f"\nšŸ“„ Detailed report saved to {args.output}") else: print(suggestions) - + return 0 diff --git a/tests/utils/test_paths.py b/tests/utils/test_paths.py index fcd4c08a5..e5247cdb3 100644 --- a/tests/utils/test_paths.py +++ b/tests/utils/test_paths.py @@ -3,20 +3,24 @@ """ import os -import tempfile from pathlib import Path -import pytest +import tempfile from libp2p.utils.paths import ( - get_temp_dir, - get_project_root, - join_paths, + create_temp_file, ensure_dir_exists, + find_executable, + get_binary_path, get_config_dir, + get_project_root, + get_python_executable, + get_script_binary_path, get_script_dir, - create_temp_file, - resolve_relative_path, + get_temp_dir, + get_venv_path, + join_paths, normalize_path, + resolve_relative_path, ) @@ -84,8 +88,8 @@ def test_get_config_dir(self): """Test platform-specific config directory.""" config_dir = get_config_dir() assert isinstance(config_dir, Path) - - if os.name == 'nt': # Windows + + if os.name == "nt": # Windows # Should be in AppData/Roaming or user home assert "AppData" in str(config_dir) or "py-libp2p" in str(config_dir) else: # Unix-like @@ -120,7 +124,7 @@ def test_resolve_relative_path(self, tmp_path): """Test relative path resolution.""" base_path = tmp_path / "base" base_path.mkdir() - + # Test relative path relative_path = "subdir/file.txt" result = resolve_relative_path(base_path, relative_path) @@ -128,7 +132,7 @@ def test_resolve_relative_path(self, tmp_path): assert result == expected # Test absolute path (platform-agnostic) - if os.name == 'nt': # Windows + if os.name == "nt": # Windows absolute_path = "C:\\absolute\\path" else: # Unix-like absolute_path = "/absolute/path" @@ -149,14 +153,72 @@ def test_normalize_path(self, tmp_path): assert result.is_absolute() assert result == absolute_path.resolve() + def test_get_venv_path(self, monkeypatch): + """Test virtual environment path detection.""" + # Test when no virtual environment is active + # Temporarily clear VIRTUAL_ENV to test the "no venv" case + monkeypatch.delenv("VIRTUAL_ENV", raising=False) + result = get_venv_path() + assert result is None + + # Test when virtual environment is active + test_venv_path = "/path/to/venv" + monkeypatch.setenv("VIRTUAL_ENV", test_venv_path) + result = get_venv_path() + assert result == Path(test_venv_path) + + def test_get_python_executable(self): + """Test Python executable path detection.""" + result = get_python_executable() + assert isinstance(result, Path) + assert result.exists() + assert result.name.startswith("python") + + def test_find_executable(self): + """Test executable finding in PATH.""" + # Test with non-existent executable + result = find_executable("nonexistent_executable") + assert result is None + + # Test with existing executable (python should be available) + result = find_executable("python") + if result: + assert isinstance(result, Path) + assert result.exists() + + def test_get_script_binary_path(self): + """Test script binary path detection.""" + result = get_script_binary_path() + assert isinstance(result, Path) + assert result.exists() + assert result.is_dir() + + def test_get_binary_path(self, monkeypatch): + """Test binary path resolution with virtual environment.""" + # Test when no virtual environment is active + result = get_binary_path("python") + if result: + assert isinstance(result, Path) + assert result.exists() + + # Test when virtual 
environment is active + test_venv_path = "/path/to/venv" + monkeypatch.setenv("VIRTUAL_ENV", test_venv_path) + # This test is more complex as it depends on the actual venv structure + # We'll just verify the function doesn't crash + result = get_binary_path("python") + # Result can be None if binary not found in venv + if result: + assert isinstance(result, Path) + class TestCrossPlatformCompatibility: """Test cross-platform compatibility.""" def test_config_dir_platform_specific_windows(self, monkeypatch): """Test config directory respects Windows conventions.""" - monkeypatch.setattr('os.name', 'nt') - monkeypatch.setenv('APPDATA', 'C:\\Users\\Test\\AppData\\Roaming') + monkeypatch.setattr("os.name", "nt") + monkeypatch.setenv("APPDATA", "C:\\Users\\Test\\AppData\\Roaming") config_dir = get_config_dir() assert "AppData" in str(config_dir) assert "py-libp2p" in str(config_dir) @@ -167,9 +229,9 @@ def test_path_separators_consistent(self): result = join_paths("dir1", "dir2", "file.txt") expected = Path("dir1") / "dir2" / "file.txt" assert result == expected - + # Test that the result uses correct separators for the platform - if os.name == 'nt': # Windows + if os.name == "nt": # Windows assert "\\" in str(result) or "/" in str(result) else: # Unix-like assert "/" in str(result) @@ -211,3 +273,10 @@ def test_existing_functionality_preserved(self): assert isinstance(create_temp_file(), Path) assert isinstance(resolve_relative_path(".", "test"), Path) assert isinstance(normalize_path("."), Path) + assert isinstance(get_python_executable(), Path) + assert isinstance(get_script_binary_path(), Path) + + # Test optional return types + venv_path = get_venv_path() + if venv_path is not None: + assert isinstance(venv_path, Path) From 42c8937a8d419ae31c3b34828e841d49e08c8c9f Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Mon, 1 Sep 2025 02:53:53 +0530 Subject: [PATCH 3/6] build(app): Add fallback to os.path.join + newsfragment 886 --- docs/conf.py | 15 +++++++++------ newsfragments/886.bugfix.rst | 2 ++ 2 files changed, 11 insertions(+), 6 deletions(-) create mode 100644 newsfragments/886.bugfix.rst diff --git a/docs/conf.py b/docs/conf.py index 646183590..6f7be7095 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -16,20 +16,23 @@ # sys.path.insert(0, os.path.abspath('.')) import doctest -import os import sys from unittest.mock import MagicMock try: import tomllib -except ModuleNotFoundError: +except ImportError: # For Python < 3.11 - import tomli as tomllib # type: ignore (In case of >3.11 Pyrefly doesnt find tomli , which is right but a false flag) + import tomli as tomllib # type: ignore # Path to pyproject.toml (assuming conf.py is in a 'docs' subdirectory) -from libp2p.utils.paths import get_project_root, join_paths - -pyproject_path = join_paths(get_project_root(), "pyproject.toml") +try: + from libp2p.utils.paths import get_project_root, join_paths + pyproject_path = join_paths(get_project_root(), "pyproject.toml") +except ImportError: + # Fallback for documentation builds where libp2p is not available + import os + pyproject_path = os.path.join(os.path.dirname(__file__), "..", "pyproject.toml") with open(pyproject_path, "rb") as f: pyproject_data = tomllib.load(f) diff --git a/newsfragments/886.bugfix.rst b/newsfragments/886.bugfix.rst new file mode 100644 index 000000000..add7c4abc --- /dev/null +++ b/newsfragments/886.bugfix.rst @@ -0,0 +1,2 @@ +Fixed cross-platform path handling by replacing hardcoded OS-specific +paths with standardized utilities in core modules and examples. 
\ No newline at end of file From fcb35084b399d5c0d228b3594b73772779b74d6f Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Mon, 1 Sep 2025 03:14:09 +0530 Subject: [PATCH 4/6] fix(docs): Update tomllib import handling and streamline pyproject path resolution --- docs/conf.py | 13 ++++--------- newsfragments/886.bugfix.rst | 4 ++-- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 6f7be7095..446252f1f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -16,23 +16,18 @@ # sys.path.insert(0, os.path.abspath('.')) import doctest +import os import sys from unittest.mock import MagicMock try: import tomllib -except ImportError: +except ModuleNotFoundError: # For Python < 3.11 - import tomli as tomllib # type: ignore + import tomli as tomllib # type: ignore (In case of >3.11 Pyrefly doesnt find tomli , which is right but a false flag) # Path to pyproject.toml (assuming conf.py is in a 'docs' subdirectory) -try: - from libp2p.utils.paths import get_project_root, join_paths - pyproject_path = join_paths(get_project_root(), "pyproject.toml") -except ImportError: - # Fallback for documentation builds where libp2p is not available - import os - pyproject_path = os.path.join(os.path.dirname(__file__), "..", "pyproject.toml") +pyproject_path = os.path.join(os.path.dirname(__file__), "..", "pyproject.toml") with open(pyproject_path, "rb") as f: pyproject_data = tomllib.load(f) diff --git a/newsfragments/886.bugfix.rst b/newsfragments/886.bugfix.rst index add7c4abc..1ebf38d1b 100644 --- a/newsfragments/886.bugfix.rst +++ b/newsfragments/886.bugfix.rst @@ -1,2 +1,2 @@ -Fixed cross-platform path handling by replacing hardcoded OS-specific -paths with standardized utilities in core modules and examples. \ No newline at end of file +Fixed cross-platform path handling by replacing hardcoded OS-specific +paths with standardized utilities in core modules and examples. From 84c1a7031a39cc2efaad412c42850fc29cd6d695 Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Tue, 2 Sep 2025 01:23:12 +0530 Subject: [PATCH 5/6] Enhance logging cleanup: Introduce global handler management for proper resource cleanup on exit and during logging setup. Update tests to ensure file handlers are closed correctly across platforms. 
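For context, the cleanup pattern this commit introduces can be reduced to a small standalone sketch (names such as start_listener and _cleanup are illustrative only; the real flow lives in setup_logging/cleanup_logging in the diff below): every handler handed to the QueueListener is tracked in a module-level list, and logging.FileHandler instances are explicitly closed both when logging is reconfigured and at interpreter exit, since Windows keeps the log file locked until the handler is closed.

```python
import atexit
import logging
import logging.handlers
import queue

# Handlers currently owned by the active QueueListener, tracked for cleanup.
_current_handlers: list[logging.Handler] = []
_current_listener: logging.handlers.QueueListener | None = None


def start_listener(handlers: list[logging.Handler]) -> None:
    """Restart the queue listener, closing any file handlers left from before."""
    global _current_listener
    if _current_listener is not None:
        _current_listener.stop()
        _current_listener = None
    for handler in _current_handlers:
        if isinstance(handler, logging.FileHandler):
            handler.close()  # releases the open file (required on Windows)
    _current_handlers.clear()

    log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
    _current_handlers.extend(handlers)
    _current_listener = logging.handlers.QueueListener(
        log_queue, *handlers, respect_handler_level=True
    )
    _current_listener.start()
    # (QueueHandler wiring of loggers onto log_queue is omitted for brevity.)


@atexit.register
def _cleanup() -> None:
    """Stop the listener and close file handlers on interpreter exit."""
    if _current_listener is not None:
        _current_listener.stop()
    for handler in _current_handlers:
        if isinstance(handler, logging.FileHandler):
            handler.close()
    _current_handlers.clear()
```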
--- libp2p/utils/logging.py | 22 ++++++++++++++-- tests/utils/test_logging.py | 52 ++++++++++++++++++++++++++----------- tests/utils/test_paths.py | 8 ++++++ 3 files changed, 65 insertions(+), 17 deletions(-) diff --git a/libp2p/utils/logging.py b/libp2p/utils/logging.py index acc673734..a9da0d657 100644 --- a/libp2p/utils/logging.py +++ b/libp2p/utils/logging.py @@ -18,6 +18,9 @@ # Store the current listener to stop it on exit _current_listener: logging.handlers.QueueListener | None = None +# Store the handlers for proper cleanup +_current_handlers: list[logging.Handler] = [] + # Event to track when the listener is ready _listener_ready = threading.Event() @@ -92,7 +95,7 @@ def setup_logging() -> None: - Child loggers inherit their parent's level unless explicitly set - The root libp2p logger controls the default level """ - global _current_listener, _listener_ready + global _current_listener, _listener_ready, _current_handlers # Reset the event _listener_ready.clear() @@ -101,6 +104,12 @@ def setup_logging() -> None: if _current_listener is not None: _current_listener.stop() _current_listener = None + + # Close and clear existing handlers + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.close() + _current_handlers.clear() # Get the log level from environment variable debug_str = os.environ.get("LIBP2P_DEBUG", "") @@ -189,6 +198,9 @@ def setup_logging() -> None: logger.setLevel(level) logger.propagate = False # Prevent message duplication + # Store handlers globally for cleanup + _current_handlers.extend(handlers) + # Start the listener AFTER configuring all loggers _current_listener = logging.handlers.QueueListener( log_queue, *handlers, respect_handler_level=True @@ -203,7 +215,13 @@ def setup_logging() -> None: @atexit.register def cleanup_logging() -> None: """Clean up logging resources on exit.""" - global _current_listener + global _current_listener, _current_handlers if _current_listener is not None: _current_listener.stop() _current_listener = None + + # Close all file handlers to ensure proper cleanup on Windows + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.close() + _current_handlers.clear() diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py index 603af5e1c..05c76ec2d 100644 --- a/tests/utils/test_logging.py +++ b/tests/utils/test_logging.py @@ -15,6 +15,7 @@ import trio from libp2p.utils.logging import ( + _current_handlers, _current_listener, _listener_ready, log_queue, @@ -24,13 +25,19 @@ def _reset_logging(): """Reset all logging state.""" - global _current_listener, _listener_ready + global _current_listener, _listener_ready, _current_handlers # Stop existing listener if any if _current_listener is not None: _current_listener.stop() _current_listener = None + # Close all file handlers to ensure proper cleanup on Windows + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.close() + _current_handlers.clear() + # Reset the event _listener_ready = threading.Event() @@ -173,6 +180,15 @@ async def test_custom_log_file(clean_env): # Stop the listener to ensure all messages are written if _current_listener is not None: _current_listener.stop() + + # Give a moment for the listener to fully stop + await trio.sleep(0.05) + + # Close all file handlers to release the file + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.flush() # Ensure all writes are flushed + handler.close() # Check if the 
file exists and contains our message assert log_file.exists() @@ -185,17 +201,14 @@ async def test_default_log_file(clean_env): """Test logging to the default file path.""" os.environ["LIBP2P_DEBUG"] = "INFO" - with patch("libp2p.utils.logging.datetime") as mock_datetime: - # Mock the timestamp to have a predictable filename - mock_datetime.now.return_value.strftime.return_value = "20240101_120000" - + with patch("libp2p.utils.paths.create_temp_file") as mock_create_temp: + # Mock the temp file creation to return a predictable path + mock_temp_file = Path(tempfile.gettempdir()) / "test_py-libp2p_20240101_120000.log" + mock_create_temp.return_value = mock_temp_file + # Remove the log file if it exists - if os.name == "nt": # Windows - log_file = Path("C:/Windows/Temp/20240101_120000_py-libp2p.log") - else: # Unix-like - log_file = Path("/tmp/20240101_120000_py-libp2p.log") - log_file.unlink(missing_ok=True) - + mock_temp_file.unlink(missing_ok=True) + setup_logging() # Wait for the listener to be ready @@ -210,10 +223,19 @@ async def test_default_log_file(clean_env): # Stop the listener to ensure all messages are written if _current_listener is not None: _current_listener.stop() - - # Check the default log file - if log_file.exists(): # Only check content if we have write permission - content = log_file.read_text() + + # Give a moment for the listener to fully stop + await trio.sleep(0.05) + + # Close all file handlers to release the file + for handler in _current_handlers: + if isinstance(handler, logging.FileHandler): + handler.flush() # Ensure all writes are flushed + handler.close() + + # Check the mocked temp file + if mock_temp_file.exists(): + content = mock_temp_file.read_text() assert "Test message" in content diff --git a/tests/utils/test_paths.py b/tests/utils/test_paths.py index e5247cdb3..a8eb4ed9b 100644 --- a/tests/utils/test_paths.py +++ b/tests/utils/test_paths.py @@ -6,6 +6,8 @@ from pathlib import Path import tempfile +import pytest + from libp2p.utils.paths import ( create_temp_file, ensure_dir_exists, @@ -217,6 +219,12 @@ class TestCrossPlatformCompatibility: def test_config_dir_platform_specific_windows(self, monkeypatch): """Test config directory respects Windows conventions.""" + import platform + + # Only run this test on Windows systems + if platform.system() != "Windows": + pytest.skip("This test only runs on Windows systems") + monkeypatch.setattr("os.name", "nt") monkeypatch.setenv("APPDATA", "C:\\Users\\Test\\AppData\\Roaming") config_dir = get_config_dir() From 145727a9baae6948575a49c2d6da1a0ff63a32a9 Mon Sep 17 00:00:00 2001 From: yashksaini-coder Date: Tue, 2 Sep 2025 01:39:24 +0530 Subject: [PATCH 6/6] Refactor logging code: Remove unnecessary blank lines in logging setup and cleanup functions for improved readability. Update tests to reflect formatting changes. 
--- libp2p/utils/logging.py | 6 +++--- tests/utils/test_logging.py | 16 +++++++++------- tests/utils/test_paths.py | 4 ++-- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/libp2p/utils/logging.py b/libp2p/utils/logging.py index a9da0d657..b23136f51 100644 --- a/libp2p/utils/logging.py +++ b/libp2p/utils/logging.py @@ -104,7 +104,7 @@ def setup_logging() -> None: if _current_listener is not None: _current_listener.stop() _current_listener = None - + # Close and clear existing handlers for handler in _current_handlers: if isinstance(handler, logging.FileHandler): @@ -200,7 +200,7 @@ def setup_logging() -> None: # Store handlers globally for cleanup _current_handlers.extend(handlers) - + # Start the listener AFTER configuring all loggers _current_listener = logging.handlers.QueueListener( log_queue, *handlers, respect_handler_level=True @@ -219,7 +219,7 @@ def cleanup_logging() -> None: if _current_listener is not None: _current_listener.stop() _current_listener = None - + # Close all file handlers to ensure proper cleanup on Windows for handler in _current_handlers: if isinstance(handler, logging.FileHandler): diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py index 05c76ec2d..06be05c78 100644 --- a/tests/utils/test_logging.py +++ b/tests/utils/test_logging.py @@ -180,10 +180,10 @@ async def test_custom_log_file(clean_env): # Stop the listener to ensure all messages are written if _current_listener is not None: _current_listener.stop() - + # Give a moment for the listener to fully stop await trio.sleep(0.05) - + # Close all file handlers to release the file for handler in _current_handlers: if isinstance(handler, logging.FileHandler): @@ -203,12 +203,14 @@ async def test_default_log_file(clean_env): with patch("libp2p.utils.paths.create_temp_file") as mock_create_temp: # Mock the temp file creation to return a predictable path - mock_temp_file = Path(tempfile.gettempdir()) / "test_py-libp2p_20240101_120000.log" + mock_temp_file = ( + Path(tempfile.gettempdir()) / "test_py-libp2p_20240101_120000.log" + ) mock_create_temp.return_value = mock_temp_file - + # Remove the log file if it exists mock_temp_file.unlink(missing_ok=True) - + setup_logging() # Wait for the listener to be ready @@ -223,10 +225,10 @@ async def test_default_log_file(clean_env): # Stop the listener to ensure all messages are written if _current_listener is not None: _current_listener.stop() - + # Give a moment for the listener to fully stop await trio.sleep(0.05) - + # Close all file handlers to release the file for handler in _current_handlers: if isinstance(handler, logging.FileHandler): diff --git a/tests/utils/test_paths.py b/tests/utils/test_paths.py index a8eb4ed9b..421fc557a 100644 --- a/tests/utils/test_paths.py +++ b/tests/utils/test_paths.py @@ -220,11 +220,11 @@ class TestCrossPlatformCompatibility: def test_config_dir_platform_specific_windows(self, monkeypatch): """Test config directory respects Windows conventions.""" import platform - + # Only run this test on Windows systems if platform.system() != "Windows": pytest.skip("This test only runs on Windows systems") - + monkeypatch.setattr("os.name", "nt") monkeypatch.setenv("APPDATA", "C:\\Users\\Test\\AppData\\Roaming") config_dir = get_config_dir()
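Taken together, the series replaces ad-hoc os.path and hard-coded temp-path handling with the helpers introduced in libp2p/utils/paths.py. A brief usage sketch of that API as defined in these patches follows; the script itself is illustrative and not part of the series, and it assumes a checkout where libp2p.utils.paths from this branch is importable. It mirrors the migrations already applied above in examples/kademlia/kademlia.py and libp2p/utils/logging.py.

```python
"""Illustrative migration example for the libp2p.utils.paths helpers."""

from libp2p.utils.paths import (
    create_temp_file,
    ensure_dir_exists,
    get_config_dir,
    get_script_dir,
    join_paths,
)

# Old: SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
script_dir = get_script_dir(__file__)

# Old: os.path.join(SCRIPT_DIR, "server_node_addr.txt")
server_addr_log = join_paths(script_dir, "server_node_addr.txt")

# Old: hard-coded "/tmp/py-libp2p_*.log" or "C:\\Windows\\Temp\\py-libp2p_*.log"
# (note: this touches a real file in the system temp directory)
log_file = create_temp_file(prefix="py-libp2p_", suffix=".log")

# Platform-appropriate config directory, created if missing
config_dir = ensure_dir_exists(get_config_dir())

print(script_dir, server_addr_log, log_file, config_dir)
```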