diff --git a/src/manage/commands.py b/src/manage/commands.py index ec57c09..f17722a 100644 --- a/src/manage/commands.py +++ b/src/manage/commands.py @@ -351,6 +351,10 @@ class BaseCommand: show_help = False def __init__(self, args, root=None): + # Storage for command-specific data per-command execution. + # All data should use a unique key. + self.scratch = {} + cmd_args = { k: v for k, v in [*CLI_SCHEMA.items(), *CLI_SCHEMA.get(self.CMD, {}).items()] diff --git a/src/manage/install_command.py b/src/manage/install_command.py index eaff783..ac67e61 100644 --- a/src/manage/install_command.py +++ b/src/manage/install_command.py @@ -223,13 +223,14 @@ def _if_exists(launcher, plat): return launcher -def _write_alias(cmd, install, alias, target): +def _write_alias(cmd, install, alias, target, _link=os.link): p = (cmd.global_dir / alias["name"]) + target = Path(target) ensure_tree(p) - unlink(p) launcher = cmd.launcher_exe if alias.get("windowed"): launcher = cmd.launcherw_exe or launcher + plat = install["tag"].rpartition("-")[-1] if plat: LOGGER.debug("Checking for launcher for platform -%s", plat) @@ -247,8 +248,71 @@ def _write_alias(cmd, install, alias, target): else: LOGGER.debug("Skipping %s alias because the launcher template was not found.", alias["name"]) return - p.write_bytes(launcher.read_bytes()) - p.with_name(p.name + ".__target__").write_text(str(target), encoding="utf-8") + + try: + launcher_bytes = launcher.read_bytes() + except OSError: + warnings_shown = cmd.scratch.setdefault("install_command._write_alias.warnings_shown", set()) + if str(launcher) not in warnings_shown: + LOGGER.warn("Failed to read launcher template at %s.", launcher) + warnings_shown.add(str(launcher)) + LOGGER.debug("Failed to read %s", launcher, exc_info=True) + return + + existing_bytes = b'' + try: + with open(p, 'rb') as f: + existing_bytes = f.read(len(launcher_bytes) + 1) + except FileNotFoundError: + pass + except OSError: + LOGGER.debug("Failed to read existing alias launcher.") + + launcher_remap = cmd.scratch.setdefault("install_command._write_alias.launcher_remap", {}) + + if existing_bytes == launcher_bytes: + # Valid existing launcher, so save its path in case we need it later + # for a hard link. + launcher_remap.setdefault(launcher.name, p) + else: + # First try and create a hard link + unlink(p) + try: + _link(launcher, p) + LOGGER.debug("Created %s as hard link to %s", p.name, launcher.name) + except OSError as ex: + if ex.winerror != 17: + # Report errors other than cross-drive links + LOGGER.debug("Failed to create hard link for command.", exc_info=True) + launcher2 = launcher_remap.get(launcher.name) + if launcher2: + try: + _link(launcher2, p) + LOGGER.debug("Created %s as hard link to %s", p.name, launcher2.name) + except FileNotFoundError: + raise + except OSError: + LOGGER.debug("Failed to create hard link to fallback launcher") + launcher2 = None + if not launcher2: + try: + p.write_bytes(launcher_bytes) + LOGGER.debug("Created %s as copy of %s", p.name, launcher.name) + launcher_remap[launcher.name] = p + except OSError: + LOGGER.error("Failed to create global command %s.", alias["name"]) + LOGGER.debug(exc_info=True) + + p_target = p.with_name(p.name + ".__target__") + try: + if target.match(p_target.read_text(encoding="utf-8")): + return + except FileNotFoundError: + pass + except (OSError, UnicodeDecodeError): + LOGGER.debug("Failed to read existing target path.", exc_info=True) + + p_target.write_text(str(target), encoding="utf-8") def _create_shortcut_pep514(cmd, install, shortcut): diff --git a/tests/test_install_command.py b/tests/test_install_command.py index 2e0f445..ec48de2 100644 --- a/tests/test_install_command.py +++ b/tests/test_install_command.py @@ -22,6 +22,7 @@ class Cmd: default_platform = "-64" def __init__(self, platform=None): + self.scratch = {} if platform: self.default_platform = platform @@ -81,19 +82,19 @@ def check_warm64(self, cmd, tag, name): def test_write_alias_tag_with_platform(alias_checker): - alias_checker.check_32(alias_checker.Cmd, "1.0-32", "testA") - alias_checker.check_w32(alias_checker.Cmd, "1.0-32", "testB") - alias_checker.check_64(alias_checker.Cmd, "1.0-64", "testC") - alias_checker.check_w64(alias_checker.Cmd, "1.0-64", "testD") - alias_checker.check_arm64(alias_checker.Cmd, "1.0-arm64", "testE") - alias_checker.check_warm64(alias_checker.Cmd, "1.0-arm64", "testF") + alias_checker.check_32(alias_checker.Cmd(), "1.0-32", "testA") + alias_checker.check_w32(alias_checker.Cmd(), "1.0-32", "testB") + alias_checker.check_64(alias_checker.Cmd(), "1.0-64", "testC") + alias_checker.check_w64(alias_checker.Cmd(), "1.0-64", "testD") + alias_checker.check_arm64(alias_checker.Cmd(), "1.0-arm64", "testE") + alias_checker.check_warm64(alias_checker.Cmd(), "1.0-arm64", "testF") def test_write_alias_default_platform(alias_checker): alias_checker.check_32(alias_checker.Cmd("-32"), "1.0", "testA") alias_checker.check_w32(alias_checker.Cmd("-32"), "1.0", "testB") - alias_checker.check_64(alias_checker.Cmd, "1.0", "testC") - alias_checker.check_w64(alias_checker.Cmd, "1.0", "testD") + alias_checker.check_64(alias_checker.Cmd(), "1.0", "testC") + alias_checker.check_w64(alias_checker.Cmd(), "1.0", "testD") alias_checker.check_arm64(alias_checker.Cmd("-arm64"), "1.0", "testE") alias_checker.check_warm64(alias_checker.Cmd("-arm64"), "1.0", "testF") @@ -103,6 +104,120 @@ def test_write_alias_fallback_platform(alias_checker): alias_checker.check_w64(alias_checker.Cmd("-spam"), "1.0", "testB") +def test_write_alias_launcher_missing(fake_config, assert_log, tmp_path): + fake_config.launcher_exe = tmp_path / "non-existent.exe" + fake_config.default_platform = '-32' + fake_config.global_dir = tmp_path / "bin" + IC._write_alias( + fake_config, + {"tag": "test"}, + {"name": "test.exe"}, + tmp_path / "target.exe", + ) + assert_log( + "Checking for launcher.*", + "Checking for launcher.*", + "Checking for launcher.*", + "Create %s linking to %s", + "Skipping %s alias because the launcher template was not found.", + assert_log.end_of_log(), + ) + + +def test_write_alias_launcher_unreadable(fake_config, assert_log, tmp_path): + class FakeLauncherPath: + stem = "test" + suffix = ".exe" + parent = tmp_path + + @staticmethod + def is_file(): + return True + + @staticmethod + def read_bytes(): + raise OSError("no reading for the test") + + fake_config.scratch = {} + fake_config.launcher_exe = FakeLauncherPath + fake_config.default_platform = '-32' + fake_config.global_dir = tmp_path / "bin" + IC._write_alias( + fake_config, + {"tag": "test"}, + {"name": "test.exe"}, + tmp_path / "target.exe", + ) + assert_log( + "Checking for launcher.*", + "Create %s linking to %s", + "Failed to read launcher template at %s\\.", + "Failed to read %s", + assert_log.end_of_log(), + ) + + +def test_write_alias_launcher_unlinkable(fake_config, assert_log, tmp_path): + def fake_link(x, y): + raise OSError("Error for testing") + + fake_config.scratch = {} + fake_config.launcher_exe = tmp_path / "launcher.txt" + fake_config.launcher_exe.write_bytes(b'Arbitrary contents') + fake_config.default_platform = '-32' + fake_config.global_dir = tmp_path / "bin" + IC._write_alias( + fake_config, + {"tag": "test"}, + {"name": "test.exe"}, + tmp_path / "target.exe", + _link=fake_link + ) + assert_log( + "Checking for launcher.*", + "Create %s linking to %s", + "Failed to create hard link.+", + "Created %s as copy of %s", + assert_log.end_of_log(), + ) + + +def test_write_alias_launcher_unlinkable_remap(fake_config, assert_log, tmp_path): + # This is for the fairly expected case of the PyManager install being on one + # drive, but the global commands directory being on another. In this + # situation, we can't hard link directly into the app files, and will need + # to copy. But we only need to copy once, so if a launcher_remap has been + # set (in the current process), then we have an available copy already and + # can link to that. + + def fake_link(x, y): + if x.match("launcher.txt"): + raise OSError(17, "Error for testing") + + fake_config.scratch = { + "install_command._write_alias.launcher_remap": {"launcher.txt": tmp_path / "actual_launcher.txt"}, + } + fake_config.launcher_exe = tmp_path / "launcher.txt" + fake_config.launcher_exe.write_bytes(b'Arbitrary contents') + (tmp_path / "actual_launcher.txt").write_bytes(b'Arbitrary contents') + fake_config.default_platform = '-32' + fake_config.global_dir = tmp_path / "bin" + IC._write_alias( + fake_config, + {"tag": "test"}, + {"name": "test.exe"}, + tmp_path / "target.exe", + _link=fake_link + ) + assert_log( + "Checking for launcher.*", + "Create %s linking to %s", + "Failed to create hard link.+", + ("Created %s as hard link to %s", ("test.exe", "actual_launcher.txt")), + assert_log.end_of_log(), + ) + + @pytest.mark.parametrize("default", [1, 0]) def test_write_alias_default(alias_checker, monkeypatch, tmp_path, default): prefix = Path(tmp_path) / "runtime" @@ -110,6 +225,7 @@ def test_write_alias_default(alias_checker, monkeypatch, tmp_path, default): class Cmd: global_dir = Path(tmp_path) / "bin" launcher_exe = None + scratch = {} def get_installs(self): return [ { @@ -146,6 +262,7 @@ def write_alias(*a): def test_print_cli_shortcuts(patched_installs, assert_log, monkeypatch, tmp_path): class Cmd: + scratch = {} global_dir = Path(tmp_path) def get_installs(self): return installs.get_installs(None) @@ -163,6 +280,7 @@ def get_installs(self): def test_print_path_warning(patched_installs, assert_log, tmp_path): class Cmd: + scratch = {} global_dir = Path(tmp_path) def get_installs(self): return installs.get_installs(None)