Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/manage/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ class BaseCommand:
show_help = False

def __init__(self, args, root=None):
# Storage for command-specific data per-command execution.
# All data should use a unique key.
self.scratch = {}

cmd_args = {
k: v for k, v in
[*CLI_SCHEMA.items(), *CLI_SCHEMA.get(self.CMD, {}).items()]
Expand Down
72 changes: 68 additions & 4 deletions src/manage/install_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,14 @@ def _if_exists(launcher, plat):
return launcher


def _write_alias(cmd, install, alias, target):
def _write_alias(cmd, install, alias, target, _link=os.link):
p = (cmd.global_dir / alias["name"])
target = Path(target)
ensure_tree(p)
unlink(p)
launcher = cmd.launcher_exe
if alias.get("windowed"):
launcher = cmd.launcherw_exe or launcher

plat = install["tag"].rpartition("-")[-1]
if plat:
LOGGER.debug("Checking for launcher for platform -%s", plat)
Expand All @@ -247,8 +248,71 @@ def _write_alias(cmd, install, alias, target):
else:
LOGGER.debug("Skipping %s alias because the launcher template was not found.", alias["name"])
return
p.write_bytes(launcher.read_bytes())
p.with_name(p.name + ".__target__").write_text(str(target), encoding="utf-8")

try:
launcher_bytes = launcher.read_bytes()
except OSError:
warnings_shown = cmd.scratch.setdefault("install_command._write_alias.warnings_shown", set())
if str(launcher) not in warnings_shown:
LOGGER.warn("Failed to read launcher template at %s.", launcher)
warnings_shown.add(str(launcher))
LOGGER.debug("Failed to read %s", launcher, exc_info=True)
return

existing_bytes = b''
try:
with open(p, 'rb') as f:
existing_bytes = f.read(len(launcher_bytes) + 1)
except FileNotFoundError:
pass
except OSError:
LOGGER.debug("Failed to read existing alias launcher.")

launcher_remap = cmd.scratch.setdefault("install_command._write_alias.launcher_remap", {})

if existing_bytes == launcher_bytes:
# Valid existing launcher, so save its path in case we need it later
# for a hard link.
launcher_remap.setdefault(launcher.name, p)
else:
# First try and create a hard link
unlink(p)
try:
_link(launcher, p)
LOGGER.debug("Created %s as hard link to %s", p.name, launcher.name)
except OSError as ex:
if ex.winerror != 17:
# Report errors other than cross-drive links
LOGGER.debug("Failed to create hard link for command.", exc_info=True)
launcher2 = launcher_remap.get(launcher.name)
if launcher2:
try:
_link(launcher2, p)
LOGGER.debug("Created %s as hard link to %s", p.name, launcher2.name)
except FileNotFoundError:
raise
except OSError:
LOGGER.debug("Failed to create hard link to fallback launcher")
launcher2 = None
if not launcher2:
try:
p.write_bytes(launcher_bytes)
LOGGER.debug("Created %s as copy of %s", p.name, launcher.name)
launcher_remap[launcher.name] = p
except OSError:
LOGGER.error("Failed to create global command %s.", alias["name"])
LOGGER.debug(exc_info=True)

p_target = p.with_name(p.name + ".__target__")
try:
if target.match(p_target.read_text(encoding="utf-8")):
return
except FileNotFoundError:
pass
except (OSError, UnicodeDecodeError):
LOGGER.debug("Failed to read existing target path.", exc_info=True)

p_target.write_text(str(target), encoding="utf-8")


def _create_shortcut_pep514(cmd, install, shortcut):
Expand Down
134 changes: 126 additions & 8 deletions tests/test_install_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Cmd:
default_platform = "-64"

def __init__(self, platform=None):
self.scratch = {}
if platform:
self.default_platform = platform

Expand Down Expand Up @@ -81,19 +82,19 @@ def check_warm64(self, cmd, tag, name):


def test_write_alias_tag_with_platform(alias_checker):
alias_checker.check_32(alias_checker.Cmd, "1.0-32", "testA")
alias_checker.check_w32(alias_checker.Cmd, "1.0-32", "testB")
alias_checker.check_64(alias_checker.Cmd, "1.0-64", "testC")
alias_checker.check_w64(alias_checker.Cmd, "1.0-64", "testD")
alias_checker.check_arm64(alias_checker.Cmd, "1.0-arm64", "testE")
alias_checker.check_warm64(alias_checker.Cmd, "1.0-arm64", "testF")
alias_checker.check_32(alias_checker.Cmd(), "1.0-32", "testA")
alias_checker.check_w32(alias_checker.Cmd(), "1.0-32", "testB")
alias_checker.check_64(alias_checker.Cmd(), "1.0-64", "testC")
alias_checker.check_w64(alias_checker.Cmd(), "1.0-64", "testD")
alias_checker.check_arm64(alias_checker.Cmd(), "1.0-arm64", "testE")
alias_checker.check_warm64(alias_checker.Cmd(), "1.0-arm64", "testF")


def test_write_alias_default_platform(alias_checker):
alias_checker.check_32(alias_checker.Cmd("-32"), "1.0", "testA")
alias_checker.check_w32(alias_checker.Cmd("-32"), "1.0", "testB")
alias_checker.check_64(alias_checker.Cmd, "1.0", "testC")
alias_checker.check_w64(alias_checker.Cmd, "1.0", "testD")
alias_checker.check_64(alias_checker.Cmd(), "1.0", "testC")
alias_checker.check_w64(alias_checker.Cmd(), "1.0", "testD")
alias_checker.check_arm64(alias_checker.Cmd("-arm64"), "1.0", "testE")
alias_checker.check_warm64(alias_checker.Cmd("-arm64"), "1.0", "testF")

Expand All @@ -103,13 +104,128 @@ def test_write_alias_fallback_platform(alias_checker):
alias_checker.check_w64(alias_checker.Cmd("-spam"), "1.0", "testB")


def test_write_alias_launcher_missing(fake_config, assert_log, tmp_path):
fake_config.launcher_exe = tmp_path / "non-existent.exe"
fake_config.default_platform = '-32'
fake_config.global_dir = tmp_path / "bin"
IC._write_alias(
fake_config,
{"tag": "test"},
{"name": "test.exe"},
tmp_path / "target.exe",
)
assert_log(
"Checking for launcher.*",
"Checking for launcher.*",
"Checking for launcher.*",
"Create %s linking to %s",
"Skipping %s alias because the launcher template was not found.",
assert_log.end_of_log(),
)


def test_write_alias_launcher_unreadable(fake_config, assert_log, tmp_path):
class FakeLauncherPath:
stem = "test"
suffix = ".exe"
parent = tmp_path

@staticmethod
def is_file():
return True

@staticmethod
def read_bytes():
raise OSError("no reading for the test")

fake_config.scratch = {}
fake_config.launcher_exe = FakeLauncherPath
fake_config.default_platform = '-32'
fake_config.global_dir = tmp_path / "bin"
IC._write_alias(
fake_config,
{"tag": "test"},
{"name": "test.exe"},
tmp_path / "target.exe",
)
assert_log(
"Checking for launcher.*",
"Create %s linking to %s",
"Failed to read launcher template at %s\\.",
"Failed to read %s",
assert_log.end_of_log(),
)


def test_write_alias_launcher_unlinkable(fake_config, assert_log, tmp_path):
def fake_link(x, y):
raise OSError("Error for testing")

fake_config.scratch = {}
fake_config.launcher_exe = tmp_path / "launcher.txt"
fake_config.launcher_exe.write_bytes(b'Arbitrary contents')
fake_config.default_platform = '-32'
fake_config.global_dir = tmp_path / "bin"
IC._write_alias(
fake_config,
{"tag": "test"},
{"name": "test.exe"},
tmp_path / "target.exe",
_link=fake_link
)
assert_log(
"Checking for launcher.*",
"Create %s linking to %s",
"Failed to create hard link.+",
"Created %s as copy of %s",
assert_log.end_of_log(),
)


def test_write_alias_launcher_unlinkable_remap(fake_config, assert_log, tmp_path):
# This is for the fairly expected case of the PyManager install being on one
# drive, but the global commands directory being on another. In this
# situation, we can't hard link directly into the app files, and will need
# to copy. But we only need to copy once, so if a launcher_remap has been
# set (in the current process), then we have an available copy already and
# can link to that.

def fake_link(x, y):
if x.match("launcher.txt"):
raise OSError(17, "Error for testing")

fake_config.scratch = {
"install_command._write_alias.launcher_remap": {"launcher.txt": tmp_path / "actual_launcher.txt"},
}
fake_config.launcher_exe = tmp_path / "launcher.txt"
fake_config.launcher_exe.write_bytes(b'Arbitrary contents')
(tmp_path / "actual_launcher.txt").write_bytes(b'Arbitrary contents')
fake_config.default_platform = '-32'
fake_config.global_dir = tmp_path / "bin"
IC._write_alias(
fake_config,
{"tag": "test"},
{"name": "test.exe"},
tmp_path / "target.exe",
_link=fake_link
)
assert_log(
"Checking for launcher.*",
"Create %s linking to %s",
"Failed to create hard link.+",
("Created %s as hard link to %s", ("test.exe", "actual_launcher.txt")),
assert_log.end_of_log(),
)


@pytest.mark.parametrize("default", [1, 0])
def test_write_alias_default(alias_checker, monkeypatch, tmp_path, default):
prefix = Path(tmp_path) / "runtime"

class Cmd:
global_dir = Path(tmp_path) / "bin"
launcher_exe = None
scratch = {}
def get_installs(self):
return [
{
Expand Down Expand Up @@ -146,6 +262,7 @@ def write_alias(*a):

def test_print_cli_shortcuts(patched_installs, assert_log, monkeypatch, tmp_path):
class Cmd:
scratch = {}
global_dir = Path(tmp_path)
def get_installs(self):
return installs.get_installs(None)
Expand All @@ -163,6 +280,7 @@ def get_installs(self):

def test_print_path_warning(patched_installs, assert_log, tmp_path):
class Cmd:
scratch = {}
global_dir = Path(tmp_path)
def get_installs(self):
return installs.get_installs(None)
Expand Down