Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ dependencies = [
]

[project.optional-dependencies]
user = [

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about using cli (or commands) for the optional instead?

"wheel"
]
dev = [
"check-manifest",
# Pre Commit Hooks
Expand Down Expand Up @@ -61,6 +64,7 @@ variantlib = "variantlib.commands.main:main"
analyze_wheel = "variantlib.commands.analyze_wheel:analyze_wheel"
analyze_platform = "variantlib.commands.analyze_platform:analyze_platform"
generate_index_json = "variantlib.commands.generate_index_json:generate_index_json"
make_variant = "variantlib.commands.make_variant:make_variant"

[tool.pytest.ini_options]
testpaths = ["tests/"]
Expand Down
212 changes: 212 additions & 0 deletions variantlib/commands/make_variant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
from __future__ import annotations

import argparse
import logging
import os
import pathlib
import re
import tempfile

import wheel.cli.pack as whl_pck
from wheel.cli.unpack import unpack as wheel_unpack

from variantlib.api import VariantDescription
from variantlib.api import VariantProperty
from variantlib.constants import METADATA_VARIANT_HASH_HEADER
from variantlib.constants import METADATA_VARIANT_PROPERTY_HEADER
from variantlib.constants import VARIANT_HASH_LEN
from variantlib.constants import WHEEL_NAME_VALIDATION_REGEX
from variantlib.errors import ValidationError

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def wheel_variant_pack(
directory: str | pathlib.Path,
dest_dir: str | pathlib.Path,
variant_hash: str,
build_number: str | None = None,
) -> str:
"""Repack a previously unpacked wheel directory into a new wheel file.
The .dist-info/WHEEL file must contain one or more tags so that the target
wheel file name can be determined.
This function is heavily taken from:
https://github.com/pypa/wheel/blob/main/src/wheel/_commands/pack.py#L14
Minimal changes tried to be applied to make it work with the Variant Hash.
:param directory: The unpacked wheel directory
:param dest_dir: Destination directory (defaults to the current directory)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it? Or is it required?

:param variant_hash: The hash of the variant to be stored
"""

# Input Validation
variant_hash_pattern = rf"^[a-fA-F0-9]{{{VARIANT_HASH_LEN}}}$"
if not re.match(variant_hash_pattern, variant_hash):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If wheel_variant_pack() is an internal API, then you control the variant_hash and this could probably be an assertion.

raise ValidationError(f"Invalid Variant Hash Value `{variant_hash}` ...")

# Find the .dist-info directory
dist_info_dirs = [
fn
for fn in os.listdir(directory) # noqa: PTH208
if os.path.isdir(os.path.join(directory, fn)) and whl_pck.DIST_INFO_RE.match(fn) # noqa: PTH112, PTH118
]
if len(dist_info_dirs) > 1:
raise whl_pck.WheelError(
f"Multiple .dist-info directories found in {directory}"
)
if not dist_info_dirs:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're checking the length of a list, and you're explicitly calling len() above, it's probably best to also do so here, e.g. if len(dist_info_dirs) == 0:

raise whl_pck.WheelError(f"No .dist-info directories found in {directory}")

# Determine the target wheel filename
dist_info_dir = dist_info_dirs[0]
name_version = whl_pck.DIST_INFO_RE.match(dist_info_dir).group("namever")

# Read the tags and the existing build number from .dist-info/WHEEL
wheel_file_path = os.path.join(directory, dist_info_dir, "WHEEL") # noqa: PTH118
with open(wheel_file_path, "rb") as f: # noqa: PTH123
info = whl_pck.BytesParser(policy=whl_pck.email.policy.compat32).parse(f)
tags: list[str] = info.get_all("Tag", [])
existing_build_number = info.get("Build")

if not tags:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another place where it's generally better to use if len(tags) == 0:

I generally prefer to be as explicit about the truthiness test as possible. E.g. if you know you're testing the length of a sequence, use len. If you are testing whether something is None or not, use if foo is None. Only when the truthiness or falseiness can be any number of things (e.g. None, the empty string, or a zero length sequence) do I typically use a generic if foo or if not foo. YMMV!

raise whl_pck.WheelError(
f"No tags present in {dist_info_dir}/WHEEL; cannot determine target "
f"wheel filename"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kind of surprised you didn't get a linter warning here, since the line 78 string doesn't need to be an f-string.

)

# Set the wheel file name and add/replace/remove the Build tag in .dist-info/WHEEL
build_number = build_number if build_number is not None else existing_build_number
if build_number is not None:
del info["Build"]
if build_number:
info["Build"] = build_number
name_version += "-" + build_number

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one could be an f-string: name_version += f"-{build_number}"


if build_number != existing_build_number:
with open(wheel_file_path, "wb") as f: # noqa: PTH123
whl_pck.BytesGenerator(f, maxheaderlen=0).flatten(info)

# Reassemble the tags for the wheel file
tagline = whl_pck.compute_tagline(tags)

# Repack the wheel
wheel_path = os.path.join(dest_dir, f"{name_version}-{tagline}-{variant_hash}.whl") # noqa: PTH118
with whl_pck.WheelFile(wheel_path, "w") as wf:
logging.info(
"Repacking wheel as `%(wheel_path)s` ...", {"wheel_path": wheel_path}
)
wf.write_files(directory)

return wheel_path


def make_variant(args: list[str]) -> None:
parser = argparse.ArgumentParser(
prog="make_variant",
description="Transform a normal Wheel into a Wheel Variant.",
)

parser.add_argument(
"-p",
"--property",
dest="properties",
type=VariantProperty.from_str,
required=True,
action="extend",
nargs="+",
help=(
"Variant Properties to add to the Wheel Variant, can be repeated as many "
"times as needed"
),
)

parser.add_argument(
"-f",
"--file",
dest="input_filepath",
type=pathlib.Path,
required=True,
help="Wheel file to process",
)

parser.add_argument(
"-o",
"--output-directory",
type=pathlib.Path,
required=True,
help="Output Directory to use to store the Wheel Variant",
)

parsed_args = parser.parse_args(args)

input_filepath: pathlib.Path = parsed_args.input_filepath
output_directory: pathlib.Path = parsed_args.output_directory

# Input Validation
if not input_filepath.is_file():
raise FileNotFoundError(f"Input Wheel File `{input_filepath}` does not exists.")

if not output_directory.is_dir():
raise FileNotFoundError(
f"Output Directory `{output_directory}` does not exists."
)

# Input Validation - Wheel Filename is valid and non variant already.
wheel_info = WHEEL_NAME_VALIDATION_REGEX.match(input_filepath.name)
if not wheel_info:
raise ValueError(f"{input_filepath.name!r} is not a valid wheel filename.")

# Transform properties into a VariantDescription
vdesc = VariantDescription(properties=parsed_args.properties)

with tempfile.TemporaryDirectory() as _tmpdir:
tempdir = pathlib.Path(_tmpdir)
wheel_unpack(input_filepath, tempdir)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we really need to unpack it. Unfortunately, it looks like zipfile lacks proper API to update files in place or copy compressed members (python/cpython#125718), but at least we could avoid writing files to disk and potentially altering their metadata in the process.

I.e. basically open both archives, iterate over members: if not metadata, copy between archives as-is; if metadata, read, alter and write. It will probably even be simpler than the current logic, which seems to make unnecessary changes to the archive (probably because it was adapted from the command altering tags, which we don't do here).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't disagree though it's a little complicated for a first iteration - feel free to experiment and optimize ;)


wheel_dir = next(tempdir.iterdir())

for _dir in wheel_dir.iterdir():
if _dir.is_dir() and _dir.name.endswith(".dist-info"):
distinfo_dir = _dir
break
else:
raise FileNotFoundError("Impossible to find the .dist-info directory.")

if not (metadata_f := distinfo_dir / "METADATA").exists():
raise FileNotFoundError(metadata_f)

with metadata_f.open(mode="r+") as file:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would be better to write a new file and move it into place atomically? E.g. copy lines from METADATA into METADATA.new, then os.rename() the latter to the former. The question I have is, what if this loop errors out or is interrupted in the middle of it? You can probably also avoid the seek() back to the beginning and the truncate().

You probably still need a try/except that deletes the .new file and cleans up after itself, reraising the original exception, but at least then you can still recover back to the original state if something unexpected happens.

# Read all lines
lines = file.readlines()

# Remove trailing empty lines
while lines and lines[-1].strip() == "":
lines.pop()

# Move the file pointer to the beginning
file.seek(0)

# Write back the non-empty content
file.writelines(lines)

# Truncate the file to remove any remaining old content
file.truncate()

file.write(f"{METADATA_VARIANT_HASH_HEADER}: {vdesc.hexdigest}\n")

for vprop in vdesc.properties:
file.write(f"{METADATA_VARIANT_PROPERTY_HEADER}: {vprop.to_str()}\n")

dest_whl_path = wheel_variant_pack(
directory=wheel_dir,
dest_dir=output_directory,
variant_hash=vdesc.hexdigest,
)

logger.info(
"Variant Wheel Created: `%s`", pathlib.Path(dest_whl_path).resolve()
)
13 changes: 13 additions & 0 deletions variantlib/constants.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
from __future__ import annotations

import re

VARIANT_HASH_LEN = 8
CONFIG_FILENAME = "wheelvariant.toml"

VALIDATION_NAMESPACE_REGEX = r"^[A-Za-z0-9_]+$"
VALIDATION_FEATURE_REGEX = r"^[A-Za-z0-9_]+$"
VALIDATION_VALUE_REGEX = r"^[A-Za-z0-9_.]+$"

METADATA_VARIANT_HASH_HEADER = "Variant-hash"
METADATA_VARIANT_PROPERTY_HEADER = "Variant"

WHEEL_NAME_VALIDATION_REGEX = re.compile(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A multi-line regex string with comments would help with readability here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I agree here but the regex is copied (and slightly modified) from pip (and we already have it repeated in too many places).

rf"""^(?P<namever>(?P<name>[^\s-]+?)-(?P<ver>[^\s-]*?))
((-(?P<build>\d[^-]*?))?-(?P<pyver>[^\s-]+?)-(?P<abi>[^\s-]+?)-(?P<plat>[^\s-]+?)
(-(?P<variant_hash>[0-9a-f]{{{VARIANT_HASH_LEN}}}))?
\.whl|\.dist-info)$""",
re.VERBOSE,
)
Loading