Skip to content

Commit 0ed57bf

Browse files
authored
Refactor file operations to use pathlib (#285)
1 parent f9aeadf commit 0ed57bf

File tree

1 file changed

+93
-88
lines changed

1 file changed

+93
-88
lines changed

platform.py

Lines changed: 93 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
import sys
3535
import shutil
3636
import logging
37-
from typing import Optional, Dict, List, Any
37+
from pathlib import Path
38+
from typing import Optional, Dict, List, Any, Union
3839

3940
from platformio.compat import IS_WINDOWS
4041
from platformio.public import PlatformBase, to_unix_path
@@ -95,8 +96,8 @@
9596
raise SystemExit(1)
9697

9798
# Set IDF_TOOLS_PATH to Pio core_dir
98-
PROJECT_CORE_DIR=ProjectConfig.get_instance().get("platformio", "core_dir")
99-
IDF_TOOLS_PATH=os.path.join(PROJECT_CORE_DIR)
99+
PROJECT_CORE_DIR = ProjectConfig.get_instance().get("platformio", "core_dir")
100+
IDF_TOOLS_PATH = PROJECT_CORE_DIR
100101
os.environ["IDF_TOOLS_PATH"] = IDF_TOOLS_PATH
101102
os.environ['IDF_PATH'] = ""
102103

@@ -130,51 +131,62 @@ def wrapper(*args, **kwargs):
130131

131132

132133
@safe_file_operation
133-
def safe_remove_file(path: str) -> bool:
134-
"""Safely remove a file with error handling."""
135-
if os.path.exists(path) and os.path.isfile(path):
136-
os.remove(path)
134+
def safe_remove_file(path: Union[str, Path]) -> bool:
135+
"""Safely remove a file with error handling using pathlib."""
136+
path = Path(path)
137+
if path.is_file() or path.is_symlink():
138+
path.unlink()
137139
logger.debug(f"File removed: {path}")
138140
return True
139141

140142

141143
@safe_file_operation
142-
def safe_remove_directory(path: str) -> bool:
143-
"""Safely remove directories with error handling."""
144-
if os.path.exists(path) and os.path.isdir(path):
144+
def safe_remove_directory(path: Union[str, Path]) -> bool:
145+
"""Safely remove directories with error handling using pathlib."""
146+
path = Path(path)
147+
if not path.exists():
148+
return True
149+
if path.is_symlink():
150+
path.unlink()
151+
elif path.is_dir():
145152
shutil.rmtree(path)
146153
logger.debug(f"Directory removed: {path}")
147154
return True
148155

149156

150157
@safe_file_operation
151-
def safe_remove_directory_pattern(base_path: str, pattern: str) -> bool:
152-
"""Safely remove directories matching a pattern with error handling."""
153-
if not os.path.exists(base_path):
158+
def safe_remove_directory_pattern(base_path: Union[str, Path], pattern: str) -> bool:
159+
"""Safely remove directories matching a pattern with error handling using pathlib."""
160+
base_path = Path(base_path)
161+
if not base_path.exists():
154162
return True
155163
# Find all directories matching the pattern in the base directory
156-
for item in os.listdir(base_path):
157-
item_path = os.path.join(base_path, item)
158-
if os.path.isdir(item_path) and fnmatch.fnmatch(item, pattern):
159-
shutil.rmtree(item_path)
160-
logger.debug(f"Directory removed: {item_path}")
164+
for item in base_path.rglob("*"):
165+
if fnmatch.fnmatch(item.name, pattern):
166+
if item.is_symlink():
167+
item.unlink()
168+
elif item.is_dir():
169+
shutil.rmtree(item)
170+
logger.debug(f"Directory removed: {item}")
161171
return True
162172

163173

164174
@safe_file_operation
165-
def safe_copy_file(src: str, dst: str) -> bool:
166-
"""Safely copy files with error handling."""
167-
os.makedirs(os.path.dirname(dst), exist_ok=True)
168-
shutil.copyfile(src, dst)
175+
def safe_copy_file(src: Union[str, Path], dst: Union[str, Path]) -> bool:
176+
"""Safely copy files with error handling using pathlib."""
177+
src, dst = Path(src), Path(dst)
178+
dst.parent.mkdir(parents=True, exist_ok=True)
179+
shutil.copy2(src, dst)
169180
logger.debug(f"File copied: {src} -> {dst}")
170181
return True
171182

172183

173184
@safe_file_operation
174-
def safe_copy_directory(src: str, dst: str) -> bool:
175-
"""Safely copy directories with error handling."""
176-
os.makedirs(os.path.dirname(dst), exist_ok=True)
177-
shutil.copytree(src, dst, dirs_exist_ok=True)
185+
def safe_copy_directory(src: Union[str, Path], dst: Union[str, Path]) -> bool:
186+
"""Safely copy directories with error handling using pathlib."""
187+
src, dst = Path(src), Path(dst)
188+
dst.parent.mkdir(parents=True, exist_ok=True)
189+
shutil.copytree(src, dst, dirs_exist_ok=True, copy_function=shutil.copy2, symlinks=True)
178190
logger.debug(f"Directory copied: {src} -> {dst}")
179191
return True
180192

@@ -190,11 +202,11 @@ def __init__(self, *args, **kwargs):
190202
self._mcu_config_cache = {}
191203

192204
@property
193-
def packages_dir(self) -> str:
205+
def packages_dir(self) -> Path:
194206
"""Get cached packages directory path."""
195207
if self._packages_dir is None:
196208
config = ProjectConfig.get_instance()
197-
self._packages_dir = config.get("platformio", "packages_dir")
209+
self._packages_dir = Path(config.get("platformio", "packages_dir"))
198210
return self._packages_dir
199211

200212
def _check_tl_install_version(self) -> bool:
@@ -213,10 +225,10 @@ def _check_tl_install_version(self) -> bool:
213225
return True
214226

215227
# Check if tool is already installed
216-
tl_install_path = os.path.join(self.packages_dir, tl_install_name)
217-
package_json_path = os.path.join(tl_install_path, "package.json")
228+
tl_install_path = self.packages_dir / tl_install_name
229+
package_json_path = tl_install_path / "package.json"
218230

219-
if not os.path.exists(package_json_path):
231+
if not package_json_path.exists():
220232
logger.info(f"{tl_install_name} not installed, installing version {required_version}")
221233
return self._install_tl_install(required_version)
222234

@@ -300,16 +312,16 @@ def _install_tl_install(self, version: str) -> bool:
300312
Returns:
301313
bool: True if installation successful, False otherwise
302314
"""
303-
tl_install_path = os.path.join(self.packages_dir, tl_install_name)
304-
old_tl_install_path = os.path.join(self.packages_dir, "tl-install")
315+
tl_install_path = Path(self.packages_dir) / tl_install_name
316+
old_tl_install_path = Path(self.packages_dir) / "tl-install"
305317

306318
try:
307-
old_tl_install_exists = os.path.exists(old_tl_install_path)
319+
old_tl_install_exists = old_tl_install_path.exists()
308320
if old_tl_install_exists:
309321
# remove outdated tl-install
310322
safe_remove_directory(old_tl_install_path)
311323

312-
if os.path.exists(tl_install_path):
324+
if tl_install_path.exists():
313325
logger.info(f"Removing old {tl_install_name} installation")
314326
safe_remove_directory(tl_install_path)
315327

@@ -318,10 +330,10 @@ def _install_tl_install(self, version: str) -> bool:
318330
self.packages[tl_install_name]["version"] = version
319331
pm.install(version)
320332
# Ensure backward compatibility by removing pio install status indicator
321-
tl_piopm_path = os.path.join(tl_install_path, ".piopm")
333+
tl_piopm_path = tl_install_path / ".piopm"
322334
safe_remove_file(tl_piopm_path)
323335

324-
if os.path.exists(os.path.join(tl_install_path, "package.json")):
336+
if (tl_install_path / "package.json").exists():
325337
logger.info(f"{tl_install_name} successfully installed and verified")
326338
self.packages[tl_install_name]["optional"] = True
327339

@@ -349,51 +361,48 @@ def _cleanup_versioned_tool_directories(self, tool_name: str) -> None:
349361
Args:
350362
tool_name: Name of the tool to clean up
351363
"""
352-
if not os.path.exists(self.packages_dir) or not os.path.isdir(self.packages_dir):
364+
packages_path = Path(self.packages_dir)
365+
if not packages_path.exists() or not packages_path.is_dir():
353366
return
354367

355368
try:
356369
# Remove directories with '@' in their name (e.g., tool-name@version, tool-name@src)
357-
safe_remove_directory_pattern(self.packages_dir, f"{tool_name}@*")
370+
safe_remove_directory_pattern(packages_path, f"{tool_name}@*")
358371

359372
# Remove directories with version suffixes (e.g., tool-name.12345)
360-
safe_remove_directory_pattern(self.packages_dir, f"{tool_name}.*")
373+
safe_remove_directory_pattern(packages_path, f"{tool_name}.*")
361374

362375
# Also check for any directory that starts with tool_name and contains '@'
363-
for item in os.listdir(self.packages_dir):
364-
if item.startswith(tool_name) and '@' in item:
365-
item_path = os.path.join(self.packages_dir, item)
366-
if os.path.isdir(item_path):
367-
safe_remove_directory(item_path)
368-
logger.debug(f"Removed versioned directory: {item_path}")
376+
for item in packages_path.iterdir():
377+
if item.name.startswith(tool_name) and '@' in item.name and item.is_dir():
378+
safe_remove_directory(item)
379+
logger.debug(f"Removed versioned directory: {item}")
369380

370-
except OSError as e:
371-
logger.error(f"Error cleaning up versioned directories for {tool_name}: {e}")
381+
except OSError:
382+
logger.exception(f"Error cleaning up versioned directories for {tool_name}")
372383

373384
def _get_tool_paths(self, tool_name: str) -> Dict[str, str]:
374385
"""Get centralized path calculation for tools with caching."""
375386
if tool_name not in self._tools_cache:
376-
tool_path = os.path.join(self.packages_dir, tool_name)
387+
tool_path = Path(self.packages_dir) / tool_name
377388

378389
self._tools_cache[tool_name] = {
379-
'tool_path': tool_path,
380-
'package_path': os.path.join(tool_path, "package.json"),
381-
'tools_json_path': os.path.join(tool_path, "tools.json"),
382-
'piopm_path': os.path.join(tool_path, ".piopm"),
383-
'idf_tools_path': os.path.join(
384-
self.packages_dir, tl_install_name, "tools", "idf_tools.py"
385-
)
390+
'tool_path': str(tool_path),
391+
'package_path': str(tool_path / "package.json"),
392+
'tools_json_path': str(tool_path / "tools.json"),
393+
'piopm_path': str(tool_path / ".piopm"),
394+
'idf_tools_path': str(Path(self.packages_dir) / tl_install_name / "tools" / "idf_tools.py")
386395
}
387396
return self._tools_cache[tool_name]
388397

389398
def _check_tool_status(self, tool_name: str) -> Dict[str, bool]:
390399
"""Check the installation status of a tool."""
391400
paths = self._get_tool_paths(tool_name)
392401
return {
393-
'has_idf_tools': os.path.exists(paths['idf_tools_path']),
394-
'has_tools_json': os.path.exists(paths['tools_json_path']),
395-
'has_piopm': os.path.exists(paths['piopm_path']),
396-
'tool_exists': os.path.exists(paths['tool_path'])
402+
'has_idf_tools': Path(paths['idf_tools_path']).exists(),
403+
'has_tools_json': Path(paths['tools_json_path']).exists(),
404+
'has_piopm': Path(paths['piopm_path']).exists(),
405+
'tool_exists': Path(paths['tool_path']).exists()
397406
}
398407

399408
def _run_idf_tools_install(self, tools_json_path: str, idf_tools_path: str) -> bool:
@@ -493,16 +502,14 @@ def _install_with_idf_tools(self, tool_name: str, paths: Dict[str, str]) -> bool
493502
return False
494503

495504
# Copy tool files
496-
target_package_path = os.path.join(
497-
IDF_TOOLS_PATH, "tools", tool_name, "package.json"
498-
)
505+
target_package_path = Path(IDF_TOOLS_PATH) / "tools" / tool_name / "package.json"
499506

500507
if not safe_copy_file(paths['package_path'], target_package_path):
501508
return False
502509

503510
safe_remove_directory(paths['tool_path'])
504511

505-
tl_path = f"file://{os.path.join(IDF_TOOLS_PATH, 'tools', tool_name)}"
512+
tl_path = f"file://{Path(IDF_TOOLS_PATH) / 'tools' / tool_name}"
506513
pm.install(tl_path)
507514

508515
logger.info(f"Tool {tool_name} successfully installed")
@@ -597,7 +604,7 @@ def _configure_mcu_toolchains(
597604
self.install_tool(toolchain)
598605

599606
# ULP toolchain if ULP directory exists
600-
if mcu_config.get("ulp_toolchain") and os.path.isdir("ulp"):
607+
if mcu_config.get("ulp_toolchain") and Path("ulp").is_dir():
601608
for toolchain in mcu_config["ulp_toolchain"]:
602609
self.install_tool(toolchain)
603610

@@ -616,16 +623,14 @@ def _configure_installer(self) -> None:
616623
return
617624

618625
# Remove pio install marker to avoid issues when switching versions
619-
old_tl_piopm_path = os.path.join(self.packages_dir, "tl-install", ".piopm")
620-
if os.path.exists(old_tl_piopm_path):
626+
old_tl_piopm_path = Path(self.packages_dir) / "tl-install" / ".piopm"
627+
if old_tl_piopm_path.exists():
621628
safe_remove_file(old_tl_piopm_path)
622629

623630
# Check if idf_tools.py is available
624-
installer_path = os.path.join(
625-
self.packages_dir, tl_install_name, "tools", "idf_tools.py"
626-
)
631+
installer_path = Path(self.packages_dir) / tl_install_name / "tools" / "idf_tools.py"
627632

628-
if os.path.exists(installer_path):
633+
if installer_path.exists():
629634
logger.debug(f"{tl_install_name} is available and ready")
630635
self.packages[tl_install_name]["optional"] = True
631636
else:
@@ -653,42 +658,42 @@ def _configure_check_tools(self, variables: Dict) -> None:
653658

654659
def _ensure_mklittlefs_version(self) -> None:
655660
"""Ensure correct mklittlefs version is installed."""
656-
piopm_path = os.path.join(self.packages_dir, "tool-mklittlefs", ".piopm")
661+
piopm_path = Path(self.packages_dir) / "tool-mklittlefs" / ".piopm"
657662

658-
if os.path.exists(piopm_path):
663+
if piopm_path.exists():
659664
try:
660665
with open(piopm_path, 'r', encoding='utf-8') as f:
661666
package_data = json.load(f)
662667
version = package_data.get('version', '')
663668
if not version.startswith("3."):
664-
os.remove(piopm_path)
669+
safe_remove_file(piopm_path)
665670
logger.info(f"Incompatible mklittlefs version {version} removed (required: 3.x)")
666-
except (json.JSONDecodeError, KeyError) as e:
667-
logger.error(f"Error reading mklittlefs package {e}")
671+
except (json.JSONDecodeError, KeyError):
672+
logger.exception("Error reading mklittlefs package metadata")
668673

669674
def _setup_mklittlefs_for_download(self) -> None:
670675
"""Setup mklittlefs for download functionality with version 4.x."""
671-
mklittlefs_dir = os.path.join(self.packages_dir, "tool-mklittlefs")
672-
mklittlefs4_dir = os.path.join(
673-
self.packages_dir, "tool-mklittlefs4"
674-
)
676+
mklittlefs_dir = Path(self.packages_dir) / "tool-mklittlefs"
677+
mklittlefs4_dir = Path(self.packages_dir) / "tool-mklittlefs4"
675678

676679
# Ensure mklittlefs 3.x is installed
677-
if not os.path.exists(mklittlefs_dir):
680+
if not mklittlefs_dir.exists():
678681
self.install_tool("tool-mklittlefs")
679-
if os.path.exists(os.path.join(mklittlefs_dir, "tools.json")):
682+
if (mklittlefs_dir / "tools.json").exists():
680683
self.install_tool("tool-mklittlefs")
681684

682685
# Install mklittlefs 4.x
683-
if not os.path.exists(mklittlefs4_dir):
686+
if not mklittlefs4_dir.exists():
684687
self.install_tool("tool-mklittlefs4")
685-
if os.path.exists(os.path.join(mklittlefs4_dir, "tools.json")):
688+
if (mklittlefs4_dir / "tools.json").exists():
686689
self.install_tool("tool-mklittlefs4")
687690

688691
# Copy mklittlefs 4.x over 3.x
689-
if os.path.exists(mklittlefs4_dir):
690-
package_src = os.path.join(mklittlefs_dir, "package.json")
691-
package_dst = os.path.join(mklittlefs4_dir, "package.json")
692+
if mklittlefs4_dir.exists():
693+
# Copy 3.x package.json into 4.x before mirroring 4.x -> 3.x,
694+
# so 3.x dir ends up with 4.x binaries and 3.x metadata.
695+
package_src = mklittlefs_dir / "package.json"
696+
package_dst = mklittlefs4_dir / "package.json"
692697
safe_copy_file(package_src, package_dst)
693698
shutil.copytree(mklittlefs4_dir, mklittlefs_dir, dirs_exist_ok=True)
694699
self.packages.pop("tool-mkfatfs", None)
@@ -899,7 +904,7 @@ def configure_debug_session(self, debug_config):
899904
ignore_conds = [
900905
debug_config.load_cmds != ["load"],
901906
not flash_images,
902-
not all([os.path.isfile(item["path"]) for item in flash_images]),
907+
not all([Path(item["path"]).is_file() for item in flash_images]),
903908
]
904909

905910
if any(ignore_conds):

0 commit comments

Comments
 (0)