Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ dependencies = [

[project.optional-dependencies]
cli = [
"wheel"
]
dev = [
"check-manifest",
Expand Down
172 changes: 41 additions & 131 deletions variantlib/commands/make_variant.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@
import email.parser
import email.policy
import logging
import os
import pathlib
import re
import tempfile

import wheel.cli.pack as whl_pck
from wheel.cli.unpack import unpack as wheel_unpack
import shutil
import zipfile

from variantlib.api import VariantDescription
from variantlib.api import VariantProperty
from variantlib.api import set_variant_metadata
from variantlib.api import validate_variant
from variantlib.constants import VARIANT_HASH_LEN
from variantlib.constants import WHEEL_NAME_VALIDATION_REGEX
from variantlib.errors import ValidationError

Expand All @@ -30,88 +25,6 @@
)


def wheel_variant_pack(
directory: str | pathlib.Path,
dest_dir: str | pathlib.Path,
variant_hash: str,
build_number: str | None = None,
) -> str:
"""Repack a previously unpacked wheel directory into a new wheel file.

The .dist-info/WHEEL file must contain one or more tags so that the target
wheel file name can be determined.

This function is heavily taken from:
https://github.com/pypa/wheel/blob/main/src/wheel/_commands/pack.py#L14

Minimal changes tried to be applied to make it work with the Variant Hash.

:param directory: The unpacked wheel directory
:param dest_dir: Destination directory (defaults to the current directory)
:param variant_hash: The hash of the variant to be stored
"""

# Input Validation
variant_hash_pattern = rf"^[a-fA-F0-9]{{{VARIANT_HASH_LEN}}}$"
if not re.match(variant_hash_pattern, variant_hash):
raise ValidationError(f"Invalid Variant Hash Value `{variant_hash}` ...")

# Find the .dist-info directory
dist_info_dirs = [
fn
for fn in os.listdir(directory) # noqa: PTH208
if os.path.isdir(os.path.join(directory, fn)) and whl_pck.DIST_INFO_RE.match(fn) # noqa: PTH112, PTH118
]
if len(dist_info_dirs) > 1:
raise whl_pck.WheelError(
f"Multiple .dist-info directories found in {directory}"
)
if not dist_info_dirs:
raise whl_pck.WheelError(f"No .dist-info directories found in {directory}")

# Determine the target wheel filename
dist_info_dir = dist_info_dirs[0]
name_version = whl_pck.DIST_INFO_RE.match(dist_info_dir).group("namever")

# Read the tags and the existing build number from .dist-info/WHEEL
wheel_file_path = os.path.join(directory, dist_info_dir, "WHEEL") # noqa: PTH118
with open(wheel_file_path, "rb") as f: # noqa: PTH123
info = whl_pck.BytesParser(policy=whl_pck.email.policy.compat32).parse(f)
tags: list[str] = info.get_all("Tag", [])
existing_build_number = info.get("Build")

if not tags:
raise whl_pck.WheelError(
f"No tags present in {dist_info_dir}/WHEEL; cannot determine target "
f"wheel filename"
)

# Set the wheel file name and add/replace/remove the Build tag in .dist-info/WHEEL
build_number = build_number if build_number is not None else existing_build_number
if build_number is not None:
del info["Build"]
if build_number:
info["Build"] = build_number
name_version += "-" + build_number

if build_number != existing_build_number:
with open(wheel_file_path, "wb") as f: # noqa: PTH123
whl_pck.BytesGenerator(f, maxheaderlen=0).flatten(info)

# Reassemble the tags for the wheel file
tagline = whl_pck.compute_tagline(tags)

# Repack the wheel
wheel_path = os.path.join(dest_dir, f"{name_version}-{tagline}-{variant_hash}.whl") # noqa: PTH118
with whl_pck.WheelFile(wheel_path, "w") as wf:
logging.info(
"Repacking wheel as `%(wheel_path)s` ...", {"wheel_path": wheel_path}
)
wf.write_files(directory)

return wheel_path


def make_variant(args: list[str]) -> None:
parser = argparse.ArgumentParser(
prog="make_variant",
Expand Down Expand Up @@ -165,7 +78,7 @@ def make_variant(args: list[str]) -> None:

# Input Validation - Wheel Filename is valid and non variant already.
wheel_info = WHEEL_NAME_VALIDATION_REGEX.match(input_filepath.name)
if not wheel_info:
if wheel_info is None:
raise ValueError(f"{input_filepath.name!r} is not a valid wheel filename.")

# Transform properties into a VariantDescription
Expand All @@ -185,45 +98,42 @@ def make_variant(args: list[str]) -> None:
f"{', '.join(x.to_str() for x in vdesc_valid.unknown_properties)}"
)

with tempfile.TemporaryDirectory() as _tmpdir:
tempdir = pathlib.Path(_tmpdir)
wheel_unpack(input_filepath, tempdir)

wheel_dir = next(tempdir.iterdir())

for _dir in wheel_dir.iterdir():
if _dir.is_dir() and _dir.name.endswith(".dist-info"):
distinfo_dir = _dir
break
else:
raise FileNotFoundError("Impossible to find the .dist-info directory.")

if not (metadata_f := distinfo_dir / "METADATA").exists():
raise FileNotFoundError(metadata_f)

with metadata_f.open(mode="r+b") as file:
# Parse the metadata
metadata_parser = email.parser.BytesParser()
metadata = metadata_parser.parse(file)

# Update the metadata
set_variant_metadata(metadata, vdesc)

# Move the file pointer to the beginning
file.seek(0)

# Write back the serialized metadata
file.write(metadata.as_bytes(policy=METADATA_POLICY))

# Truncate the file to remove any remaining old content
file.truncate()

dest_whl_path = wheel_variant_pack(
directory=wheel_dir,
dest_dir=output_directory,
variant_hash=vdesc.hexdigest,
)
# Determine output wheel filename
output_filepath = (
output_directory
/ f"{wheel_info.group('base_wheel_name')}-{vdesc.hexdigest}.whl"
)

logger.info(
"Variant Wheel Created: `%s`", pathlib.Path(dest_whl_path).resolve()
)
with (
zipfile.ZipFile(input_filepath, "r") as input_zip,
zipfile.ZipFile(output_filepath, "w") as output_zip,
):
metadata_found = False
for file_info in input_zip.infolist():
components = file_info.filename.split("/", 2)
with (
input_zip.open(file_info, "r") as input_file,
output_zip.open(file_info, "w") as output_file,
):
if (
len(components) == 2
and components[0].endswith(".dist-info")
and components[1] == "METADATA"
):
# Parse the metadata
metadata_parser = email.parser.BytesParser()
metadata = metadata_parser.parse(input_file)

# Update the metadata
set_variant_metadata(metadata, vdesc)

# Write the serialized metadata
output_file.write(metadata.as_bytes(policy=METADATA_POLICY))
metadata_found = True
else:
shutil.copyfileobj(input_file, output_file)

if not metadata_found:
raise FileNotFoundError("No *.dist-info/METADATA file found in wheel")

logger.info("Variant Wheel Created: `%s`", output_filepath.resolve())
Loading