diff --git a/docs/conf.py b/docs/conf.py index d140dba6..0464e350 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -2,7 +2,6 @@ import sys sys.path.insert(0, os.path.abspath("../src")) -import python_gardenlinux_lib # Configuration file for the Sphinx documentation builder. diff --git a/docs/index.rst b/docs/index.rst index e46290b5..b3257a88 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -4,13 +4,21 @@ contain the root `toctree` directive. python-gardenlinux-lib documentation -================================ -.. automodule:: python_gardenlinux_lib.parse_features +==================================== +.. automodule:: gardenlinux :members: -.. automodule:: python_gardenlinux_lib.package_repo_info +.. automodule:: gardenlinux.apt + :members: +.. automodule:: gardenlinux.features + :members: +.. automodule:: gardenlinux.flavors + :members: +.. automodule:: gardenlinux.git + :members: +.. automodule:: gardenlinux.oci :members: .. toctree:: - :maxdepth: 2 + :maxdepth: 3 :caption: Contents: Indices and tables diff --git a/poetry.lock b/poetry.lock index a2762a14..a26f51ab 100644 --- a/poetry.lock +++ b/poetry.lock @@ -130,18 +130,18 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "boto3" -version = "1.38.27" +version = "1.38.30" description = "The AWS SDK for Python" optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "boto3-1.38.27-py3-none-any.whl", hash = "sha256:95f5fe688795303a8a15e8b7e7f255cadab35eae459d00cc281a4fd77252ea80"}, - {file = "boto3-1.38.27.tar.gz", hash = "sha256:94bd7fdd92d5701b362d4df100d21e28f8307a67ff56b6a8b0398119cf22f859"}, + {file = "boto3-1.38.30-py3-none-any.whl", hash = "sha256:949df0a0edd360f4ad60f1492622eecf98a359a2f72b1e236193d9b320c5dc8c"}, + {file = "boto3-1.38.30.tar.gz", hash = "sha256:17af769544b5743843bcc732709b43226de19f1ebff2c324a3440bbecbddb893"}, ] [package.dependencies] -botocore = ">=1.38.27,<1.39.0" +botocore = ">=1.38.30,<1.39.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.13.0,<0.14.0" @@ -150,14 +150,14 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.38.27" +version = "1.38.30" description = "Low-level, data-driven core of boto 3." optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "botocore-1.38.27-py3-none-any.whl", hash = "sha256:a785d5e9a5eda88ad6ab9ed8b87d1f2ac409d0226bba6ff801c55359e94d91a8"}, - {file = "botocore-1.38.27.tar.gz", hash = "sha256:9788f7efe974328a38cbade64cc0b1e67d27944b899f88cb786ae362973133b6"}, + {file = "botocore-1.38.30-py3-none-any.whl", hash = "sha256:530e40a6e91c8a096cab17fcc590d0c7227c8347f71a867576163a44d027a714"}, + {file = "botocore-1.38.30.tar.gz", hash = "sha256:7836c5041c5f249431dbd5471c61db17d4053f72a1d6e3b2197c07ca0839588b"}, ] [package.dependencies] @@ -744,26 +744,17 @@ extra = ["lxml (>=4.6)", "pydot (>=3.0.1)", "pygraphviz (>=1.14)", "sympy (>=1.1 test = ["pytest (>=7.2)", "pytest-cov (>=4.0)", "pytest-xdist (>=3.0)"] test-extras = ["pytest-mpl", "pytest-randomly"] -[[package]] -name = "opencontainers" -version = "0.0.14" -description = "Python module for oci specifications" -optional = false -python-versions = "*" -groups = ["main", "dev"] -files = [ - {file = "opencontainers-0.0.14.tar.gz", hash = "sha256:fde3b8099b56b5c956415df8933e2227e1914e805a277b844f2f9e52341738f2"}, -] - [[package]] name = "oras" -version = "0.2.0" +version = "0.2.33" description = "OCI Registry as Storage Python SDK" optional = false python-versions = "*" groups = ["main"] -files = [] -develop = false +files = [ + {file = "oras-0.2.33-py3-none-any.whl", hash = "sha256:832a08b7b4d0bf6f962b1e644f58bf3277a71154c759c825eaf87d28aa6e904a"}, + {file = "oras-0.2.33.tar.gz", hash = "sha256:ecb7e55970c864c23f9412b788ec4a5f1935dbf742d8af2135fdbd1a0293e638"}, +] [package.dependencies] jsonschema = "*" @@ -774,12 +765,6 @@ all = ["docker (==5.0.1)", "jsonschema", "pytest (>=4.6.2)", "requests"] docker = ["docker (==5.0.1)"] tests = ["pytest (>=4.6.2)"] -[package.source] -type = "git" -url = "https://github.com/oras-project/oras-py.git" -reference = "caf8db5b279382335fbb1f6d7402ed9b73618d37" -resolved_reference = "caf8db5b279382335fbb1f6d7402ed9b73618d37" - [[package]] name = "packaging" version = "25.0" @@ -882,24 +867,25 @@ windows-terminal = ["colorama (>=0.4.6)"] [[package]] name = "pytest" -version = "8.3.5" +version = "8.4.0" description = "pytest: simple powerful testing with Python" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" groups = ["main"] files = [ - {file = "pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820"}, - {file = "pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845"}, + {file = "pytest-8.4.0-py3-none-any.whl", hash = "sha256:f40f825768ad76c0977cbacdf1fd37c6f7a468e460ea6a0636078f8972d4517e"}, + {file = "pytest-8.4.0.tar.gz", hash = "sha256:14d920b48472ea0dbf68e45b96cd1ffda4705f33307dcc86c676c1b5104838a6"}, ] [package.dependencies] -colorama = {version = "*", markers = "sys_platform == \"win32\""} -iniconfig = "*" -packaging = "*" +colorama = {version = ">=0.4", markers = "sys_platform == \"win32\""} +iniconfig = ">=1" +packaging = ">=20" pluggy = ">=1.5,<2" +pygments = ">=2.7.2" [package.extras] -dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "requests", "setuptools", "xmlschema"] [[package]] name = "python-dateutil" @@ -1459,4 +1445,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.1" python-versions = "^3.13" -content-hash = "1853f81edad02bab1fe0810a988b4a61751964fa3d0b1158010c71a84c0584b3" +content-hash = "c05db2b717268441581f70a1e8dc43d3a9cec39afc77ff8f32a1d4cccc6e00ab" diff --git a/pyproject.toml b/pyproject.toml index aa59f1a4..b6d4d19c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,11 @@ [tool.poetry] name = "gardenlinux" -version = "0.6.0" +version = "0.7.0" description = "Contains tools to work with the features directory of gardenlinux, for example deducting dependencies from feature sets or validating cnames" authors = ["Garden Linux Maintainers "] license = "Apache-2.0" readme = "README.md" -packages = [{include = "gardenlinux", from="src"}, {include = "python_gardenlinux_lib", from="src"}] +packages = [{include = "gardenlinux", from="src"}] [tool.poetry.dependencies] python = "^3.13" @@ -15,18 +15,16 @@ pytest = "^8.3.2" gitpython = "^3.1.44" apt-repo = "^0.5" jsonschema = "^4.23.0" -oras = { git = "https://github.com/oras-project/oras-py.git", rev="caf8db5b279382335fbb1f6d7402ed9b73618d37" } +oras = "^0.2.33" python-dotenv = "^1.0.1" cryptography = "^44.0.0" boto3 = "*" click = "^8.2.0" pygments = "^2.19.1" -opencontainers = "^0.0.14" [tool.poetry.group.dev.dependencies] bandit = "^1.8.3" black = "^24.8.0" -opencontainers = "^0.0.14" [tool.poetry.group.docs.dependencies] sphinx-rtd-theme = "^2.0.0" @@ -36,7 +34,6 @@ gl-cname = "gardenlinux.features.cname_main:main" gl-features-parse = "gardenlinux.features.__main__:main" gl-flavors-parse = "gardenlinux.flavors.__main__:main" gl-oci = "gardenlinux.oci.__main__:main" -flavors-parse = "gardenlinux.flavors.__main__:main" [tool.pytest.ini_options] pythonpath = ["src"] diff --git a/src/gardenlinux/apt/__init__.py b/src/gardenlinux/apt/__init__.py index d4c5261e..3e0ca58c 100644 --- a/src/gardenlinux/apt/__init__.py +++ b/src/gardenlinux/apt/__init__.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +""" +APT module +""" + from .debsource import Debsrc, DebsrcFile -__all__ = ["Parser"] +__all__ = ["Debsrc", "DebsrcFile"] diff --git a/src/gardenlinux/apt/debsource.py b/src/gardenlinux/apt/debsource.py index f6f74d6d..aeaf733e 100644 --- a/src/gardenlinux/apt/debsource.py +++ b/src/gardenlinux/apt/debsource.py @@ -1,27 +1,63 @@ # -*- coding: utf-8 -*- -# SPDX-License-Identifier: MIT -# Based on code from glvd https://github.com/gardenlinux/glvd/blob/7ca2ff54e01da5e9eae61d1cd565eaf75f3c62ce/src/glvd/data/debsrc.py#L1 - -from __future__ import annotations +""" +deb sources +""" import re from typing import TextIO class Debsrc: + """ + Class to reflect deb sources. + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: apt + :since: 0.7.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ + def __init__(self, deb_source, deb_version): - self.deb_source = deb_source - self.deb_version = deb_version + """ + Constructor __init__(Debsrc) + + :param deb_source: Source name + :param deb_version: Source version - deb_source: str - deb_version: str + :since: 0.7.0 + """ + + self.deb_source: str = deb_source + self.deb_version: str = deb_version def __repr__(self) -> str: + """ + python.org: Called by the repr() built-in function to compute the "official" string representation of an object. + + :return: (str) String representation + :since: 0.7.0 + """ + return f"{self.deb_source} {self.deb_version}" class DebsrcFile(dict[str, Debsrc]): + """ + Class to represent deb sources loaded and parsed as dict. + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: apt + :since: 0.7.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ + __re = re.compile( r""" ^(?: @@ -43,31 +79,42 @@ class DebsrcFile(dict[str, Debsrc]): re.VERBOSE, ) - def _read_source(self, source: str, version: str) -> None: - self[source] = Debsrc( - deb_source=source, - deb_version=version, - ) - def read(self, f: TextIO) -> None: - current_source = current_version = None + """ + Read and parse the given TextIO data to extract deb sources. + + :param f: TextIO data to parse + + :since: 0.7.0 + """ - def finish(): - if current_source and current_version: - self._read_source(current_source, current_version) + parsed_source = parsed_version = None for line in f.readlines(): if match := self.__re.match(line): if i := match["source"]: - current_source = i + parsed_source = i elif i := match["version"]: - current_version = i + parsed_version = i elif match["eso"]: - current_source = current_version = None + parsed_source = parsed_version = None elif match["eoe"] is not None: - finish() - current_source = current_version = None + self._set_source(parsed_source, parsed_version) + parsed_source = parsed_version = None else: raise RuntimeError(f"Unable to read line: {line}") - finish() + self._set_source(parsed_source, parsed_version) + + def _set_source(self, source: str, version: str) -> None: + """ + Sets the dict value based on the given source key. + + :since: 0.7.0 + """ + + if source and version: + self[source] = Debsrc( + deb_source=source, + deb_version=version, + ) diff --git a/src/gardenlinux/apt/package_repo_info.py b/src/gardenlinux/apt/package_repo_info.py index ae758e0a..0cb197f6 100644 --- a/src/gardenlinux/apt/package_repo_info.py +++ b/src/gardenlinux/apt/package_repo_info.py @@ -1,16 +1,42 @@ # -*- coding: utf-8 -*- +""" +APT repositories +""" + from apt_repo import APTRepository from typing import Optional class GardenLinuxRepo(APTRepository): + """ + Class to reflect APT based GardenLinux repositories. + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: apt + :since: 0.7.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ + def __init__( self, dist: str, url: Optional[str] = "http://packages.gardenlinux.io/gardenlinux", components: Optional[list[str]] = ["main"], - ) -> None: + ): + """ + Constructor __init__(GardenLinuxRepo) + + :param dist: Repository dist + :param url: Repository url + :param components: Repository components provided + + :since: 0.7.0 + """ + self.components = components self.url = url self.dist = dist @@ -18,38 +44,67 @@ def __init__( def get_package_version_by_name(self, name: str) -> list[tuple[str, str]]: """ - :param str name: name of package to find - :returns: packages matching the input name + Returns the package version matching the given name. + + :param name: name of package to find + + :return: (list) Packages matching the input name + :since: 0.7.0 """ + return [ (package.package, package.version) for package in self.repo.get_packages_by_name(name) ] - def get_packages_versions(self): + def get_packages_versions(self) -> list[tuple[str, str]]: """ Returns list of (package, version) tuples + + :return: (list) Packages versions + :since: 0.7.0 """ + return [(p.package, p.version) for p in self.repo.packages] -def compare_gardenlinux_repo_version(version_a: str, version_b: str): +def compare_gardenlinux_repo_version( + version_a: str, version_b: str +) -> list[tuple[str, str, str]]: """ - :param str version_a: Version of first Garden Linux repo - :param str version_b: Version of first Garden Linux repo + Compares differences between repository versions given. Example: print(compare_gardenlinux_repo_version("1443.2", "1443.1")) + + :param version_a: Version of first Garden Linux repo + :param version_b: Version of first Garden Linux repo + + :return: (list) Differences between repo a and repo b + :since: 0.7.0 """ + return compare_repo(GardenLinuxRepo(version_a), GardenLinuxRepo(version_b)) def compare_repo( a: GardenLinuxRepo, b: GardenLinuxRepo, available_in_both: Optional[bool] = False -): +) -> list[tuple[str, str, str]]: """ - :param a GardenLinuxRepo: first repo to compare - :param b GardenLinuxRepo: second repo to compare - :returns: differences between repo a and repo b + Compares differences between repositories given. + + Example: + gl_repo = GardenLinuxRepo("today") + gl_repo_1592 = GardenLinuxRepo("1592.0") + deb_testing = GardenLinuxRepo("testing", "https://deb.debian.org/debian/") + print(compare_repo(gl_repo, gl_repo_1592, available_in_both=True)) + print(compare_repo(gl_repo, deb_testing, available_in_both=False)) + + :param a GardenLinuxRepo: First repo to compare + :param b GardenLinuxRepo: Second repo to compare + :param available_in_both: Compare packages available in both repos only + + :return: (list) Differences between repo a and repo b + :since: 0.7.0 """ packages_a = dict(a.get_packages_versions()) @@ -69,15 +124,3 @@ def compare_repo( ) or (name not in packages_b or name not in packages_a) ] - - -# EXAMPLE USAGE. -# print(compare_gardenlinux_repo_version("1443.2", "1443.1")) - -# gl_repo = GardenLinuxRepo("today") -# gl_repo_1592 = GardenLinuxRepo("1592.0") -# deb_testing = GardenLinuxRepo("testing", "https://deb.debian.org/debian/") -# print(compare_repo(gl_repo, gl_repo_1592, available_in_both=True)) -# print(compare_repo(gl_repo, deb_testing, available_in_both=True)) -# # print(gl_repo.get_packages_versions()) -# print(gl_repo.get_package_version_by_name("wget")) diff --git a/src/gardenlinux/constants.py b/src/gardenlinux/constants.py index 50521bb3..1fc515e2 100644 --- a/src/gardenlinux/constants.py +++ b/src/gardenlinux/constants.py @@ -166,3 +166,4 @@ OCI_ANNOTATION_SIGNATURE_KEY = "io.gardenlinux.oci.signature" OCI_ANNOTATION_SIGNED_STRING_KEY = "io.gardenlinux.oci.signed-string" +OCI_IMAGE_INDEX_MEDIA_TYPE = "application/vnd.oci.image.index.v1+json" diff --git a/src/gardenlinux/features/__init__.py b/src/gardenlinux/features/__init__.py index a04d4bdd..a6e7692e 100644 --- a/src/gardenlinux/features/__init__.py +++ b/src/gardenlinux/features/__init__.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +""" +Features module +""" + from .cname import CName from .parser import Parser diff --git a/src/gardenlinux/features/__main__.py b/src/gardenlinux/features/__main__.py index 839e48cd..8c70098d 100644 --- a/src/gardenlinux/features/__main__.py +++ b/src/gardenlinux/features/__main__.py @@ -1,15 +1,20 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from .cname import CName -from .parser import Parser +""" +gl-features-parse main entrypoint +""" -from functools import reduce -from os import path import argparse import os import re import sys +from functools import reduce +from os import path +from typing import Any, List, Set + +from .cname import CName +from .parser import Parser _ARGS_TYPE_ALLOWED = [ @@ -25,7 +30,13 @@ ] -def main(): +def main() -> None: + """ + gl-features-parse main() + + :since: 0.7.0 + """ + parser = argparse.ArgumentParser() parser.add_argument("--arch", dest="arch") @@ -152,13 +163,31 @@ def main(): print(f"{version}-{commit_id}") -def get_cname_base(sorted_features): +def get_cname_base(sorted_features: Set[str]): + """ + Get the base cname for the feature set given. + + :param sorted_features: Sorted feature set + + :return: (str) Base cname + :since: 0.7.0 + """ + return reduce( lambda a, b: a + ("-" if not b.startswith("_") else "") + b, sorted_features ) -def get_version_and_commit_id_from_files(gardenlinux_root): +def get_version_and_commit_id_from_files(gardenlinux_root: str) -> tuple[str, str]: + """ + Returns the version and commit ID based on files in the GardenLinux root directory. + + :param gardenlinux_root: GardenLinux root directory + + :return: (tuple) Version and commit ID if readable + :since: 0.7.0 + """ + commit_id = None version = None @@ -173,25 +202,53 @@ def get_version_and_commit_id_from_files(gardenlinux_root): return (version, commit_id) -def get_minimal_feature_set(graph): +def get_minimal_feature_set(graph: Any) -> Set[str]: + """ + Returns the minimal set of features described by the given graph. + + :param graph: networkx.Digraph + + :return: (set) Minimal set of features + :since: 0.7.0 + """ + return set([node for (node, degree) in graph.in_degree() if degree == 0]) -def graph_as_mermaid_markup(flavor, graph): +def graph_as_mermaid_markup(flavor: str, graph: Any) -> str: """ Generates a mermaid.js representation of the graph. This is helpful to identify dependencies between features. Syntax docs: https://mermaid.js.org/syntax/flowchart.html?id=flowcharts-basic-syntax + + :param flavor: Flavor name + :param graph: networkx.Digraph + + :return: (str) mermaid.js representation + :since: 0.7.0 """ + markup = f"---\ntitle: Dependency Graph for Feature {flavor}\n---\ngraph TD;\n" + for u, v in graph.edges: markup += f" {u}-->{v};\n" + return markup -def sort_subset(input_set, order_list): +def sort_subset(input_set: Set[str], order_list: List[str]) -> List[str]: + """ + Returns items from `order_list` if given in `input_set`. + + :param input_set: Set of values + :param order_list: networkx.Digraph + + :return: (str) mermaid.js representation + :since: 0.7.0 + """ + return [item for item in order_list if item in input_set] diff --git a/src/gardenlinux/features/cname.py b/src/gardenlinux/features/cname.py index 7eb6e21a..9085da21 100644 --- a/src/gardenlinux/features/cname.py +++ b/src/gardenlinux/features/cname.py @@ -1,13 +1,41 @@ # -*- coding: utf-8 -*- +""" +Canonical name (cname) +""" + from typing import Optional import re from ..constants import ARCHS +from .parser import Parser + class CName(object): + """ + Class to represent a canonical name (cname). + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: features + :since: 0.7.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ + def __init__(self, cname, arch=None, version=None): + """ + Constructor __init__(CName) + + :param cname: Canonical name to represent + :param arch: Architecture if not part of cname + :param version: Version if not part of cname + + :since: 0.7.0 + """ + self._arch = None self._flavor = None self._commit_id = None @@ -44,10 +72,22 @@ def __init__(self, cname, arch=None, version=None): @property def arch(self) -> Optional[str]: + """ + Returns the architecture for the cname parsed. + + :return: (str) CName architecture + """ + return self._arch @property def cname(self) -> str: + """ + Returns the cname parsed. + + :return: (str) CName + """ + cname = self._flavor if self._arch is not None: @@ -60,18 +100,52 @@ def cname(self) -> str: @property def commit_id(self) -> Optional[str]: + """ + Returns the commit ID if part of the cname parsed. + + :return: (str) Commit ID + """ + return self._commit_id @property def flavor(self) -> str: + """ + Returns the flavor for the cname parsed. + + :return: (str) Flavor + """ + return self._flavor + @property + def feature_set(self) -> str: + """ + Returns the feature set for the cname parsed. + + :return: (str) Feature set of the cname + """ + + return Parser().filter_as_string(self.flavor) + @property def version(self) -> Optional[str]: + """ + Returns the version if part of the cname parsed. + + :return: (str) Version + """ + return self._version @property def version_and_commit_id(self) -> Optional[str]: + """ + Returns the version and commit ID if part of the cname parsed. + + :return: (str) Version and commit ID + """ + if self._commit_id is None: return None diff --git a/src/gardenlinux/features/cname_main.py b/src/gardenlinux/features/cname_main.py index 6f80f641..e4ec807e 100644 --- a/src/gardenlinux/features/cname_main.py +++ b/src/gardenlinux/features/cname_main.py @@ -1,22 +1,33 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +""" +gl-cname main entrypoint +""" + from functools import reduce from os.path import basename, dirname import argparse import re +from .cname import CName +from .parser import Parser + from .__main__ import ( get_cname_base, get_minimal_feature_set, get_version_and_commit_id_from_files, sort_subset, ) -from .cname import CName -from .parser import Parser def main(): + """ + gl-cname main() + + :since: 0.7.0 + """ + parser = argparse.ArgumentParser() parser.add_argument("--arch", dest="arch") diff --git a/src/gardenlinux/features/parser.py b/src/gardenlinux/features/parser.py index 4b5cdb37..a3795043 100644 --- a/src/gardenlinux/features/parser.py +++ b/src/gardenlinux/features/parser.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +""" +Features parser based on networkx.Digraph +""" + from glob import glob from typing import Callable, Optional import logging @@ -14,16 +18,47 @@ BARE_FLAVOR_FEATURE_CONTENT, BARE_FLAVOR_LIBC_FEATURE_CONTENT, ) + from ..logger import LoggerSetup class Parser(object): + """ + Parser for GardenLinux features. + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: features + :since: 0.7.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ + + _GARDENLINUX_ROOT: str = "." + """ + Default GardenLinux root directory + """ + def __init__( self, - gardenlinux_root: str = ".", - feature_dir_name: str = "features", + gardenlinux_root: Optional[str] = None, + feature_dir_name: Optional[str] = "features", logger: Optional[logging.Logger] = None, ): + """ + Constructor __init__(Parser) + + :param gardenlinux_root: GardenLinux root directory + :param feature_dir_name: Name of the features directory + :param logger: Logger instance + + :since: 0.7.0 + """ + + if gardenlinux_root is None: + gardenlinux_root = Parser._GARDENLINUX_ROOT + feature_base_dir = os.path.join(gardenlinux_root, feature_dir_name) if not os.access(feature_base_dir, os.R_OK): @@ -45,6 +80,13 @@ def __init__( @property def graph(self) -> networkx.Graph: + """ + Returns the features graph based on the GardenLinux features directory. + + :return: (networkx.Graph) Features graph + :since: 0.7.0 + """ + if self._graph is None: feature_yaml_files = glob("{0}/*/info.yaml".format(self._feature_base_dir)) features = [self._read_feature_yaml(i) for i in feature_yaml_files] @@ -84,6 +126,17 @@ def filter( ignore_excludes: bool = False, additional_filter_func: Optional[Callable[(str,), bool]] = None, ) -> networkx.Graph: + """ + Filters the features graph. + + :param cname: Canonical name to filter + :param ignore_excludes: Ignore `exclude` feature files + :param additional_filter_func: Additional filter function + + :return: (networkx.Graph) Filtered features graph + :since: 0.7.0 + """ + input_features = Parser.get_cname_as_feature_set(cname) filter_set = input_features.copy() @@ -120,10 +173,14 @@ def filter_as_dict( additional_filter_func: Optional[Callable[(str,), bool]] = None, ) -> dict: """ - :param str cname: the target cname to get the feature dict for - :param str gardenlinux_root: path of garden linux src root + Filters the features graph and returns it as a dict. + + :param cname: Canonical name to filter + :param ignore_excludes: Ignore `exclude` feature files + :param additional_filter_func: Additional filter function - :return: dict with list of features for a given cname, split into platform, element and flag + :return: (dict) List of features for a given cname, split into platform, element and flag + :since: 0.7.0 """ graph = self.filter(cname, ignore_excludes, additional_filter_func) @@ -148,10 +205,14 @@ def filter_as_list( additional_filter_func: Optional[Callable[(str,), bool]] = None, ) -> list: """ - :param str cname: the target cname to get the feature dict for - :param str gardenlinux_root: path of garden linux src root + Filters the features graph and returns it as a list. - :return: list of features for a given cname + :param cname: Canonical name to filter + :param ignore_excludes: Ignore `exclude` feature files + :param additional_filter_func: Additional filter function + + :return: (list) Features list for a given cname + :since: 0.7.0 """ graph = self.filter(cname, ignore_excludes, additional_filter_func) @@ -164,10 +225,14 @@ def filter_as_string( additional_filter_func: Optional[Callable[(str,), bool]] = None, ) -> str: """ - :param str cname: the target cname to get the feature set for - :param str gardenlinux_root: path of garden linux src root + Filters the features graph and returns it as a string. + + :param cname: Canonical name to filter + :param ignore_excludes: Ignore `exclude` feature files + :param additional_filter_func: Additional filter function - :return: a comma separated string with the expanded feature set for the cname + :return: (str) Comma separated string with the expanded feature set for the cname + :since: 0.7.0 """ graph = self.filter(cname, ignore_excludes, additional_filter_func) @@ -176,6 +241,15 @@ def filter_as_string( return ",".join(features) def _exclude_from_filter_set(graph, input_features, filter_set): + """ + Removes the given `filter_set` out of `input_features`. + + :param input_features: Features + :param filter_set: Set to filter out + + :since: 0.7.0 + """ + exclude_graph_view = Parser._get_graph_view_for_attr(graph, "exclude") exclude_list = [] @@ -197,16 +271,25 @@ def _exclude_from_filter_set(graph, input_features, filter_set): raise ValueError("Including explicitly excluded feature") def _get_node_features(self, node): + """ + Returns the features for a given features node. + + :param node: Graph node + + :return: (dict) Features content dictionary + :since: 0.7.0 + """ + return node.get("content", {}).get("features", {}) def _read_feature_yaml(self, feature_yaml_file: str): """ - Legacy function copied from gardenlinux/builder + Reads and returns the content of the given features file. - extracts the feature name from the feature_yaml_file param, - reads the info.yaml into a dict and outputs a dict containing the cname and the info yaml + :param feature_yaml_file: Features file to read - :param str feature_yaml_file: path to the target info.yaml that must be read + :return: (dict) Features content dictionary + :since: 0.7.0 """ name = os.path.basename(os.path.dirname(feature_yaml_file)) @@ -218,11 +301,30 @@ def _read_feature_yaml(self, feature_yaml_file: str): @staticmethod def get_cname_as_feature_set(cname): + """ + Returns the features of a given canonical name. + + :param cname: Canonical name + + :return: (set) Features of the cname + :since: 0.7.0 + """ + cname = cname.replace("_", "-_") return set(cname.split("-")) @staticmethod def _get_filter_set_callable(filter_set, additional_filter_func): + """ + Returns the filter function used for the graph. + + :param filter_set: Filter set + :param additional_filter_func: Additional filter function to apply + + :return: (callable) Filter function + :since: 0.7.0 + """ + def filter_func(node): additional_filter_result = ( True if additional_filter_func is None else additional_filter_func(node) @@ -233,12 +335,32 @@ def filter_func(node): @staticmethod def _get_graph_view_for_attr(graph, attr): + """ + Returns a graph view to return `attr` data. + + :param filter_set: Filter set + :param additional_filter_func: Additional filter function to apply + + :return: (object) networkx view + :since: 0.7.0 + """ + return networkx.subgraph_view( graph, filter_edge=Parser._get_graph_view_for_attr_callable(graph, attr) ) @staticmethod def _get_graph_view_for_attr_callable(graph, attr): + """ + Returns the filter function used to filter for `attr` data. + + :param graph: Graph to filter + :param attr: Graph edge attribute to filter for + + :return: (callable) Filter function + :since: 0.7.0 + """ + def filter_func(a, b): return graph.get_edge_data(a, b)["attr"] == attr @@ -246,10 +368,40 @@ def filter_func(a, b): @staticmethod def _get_graph_node_type(node): + """ + Returns the node feature type. + + :param node: Graph node + + :return: (str) Feature type + :since: 0.7.0 + """ + return node.get("content", {}).get("type") + @staticmethod + def set_default_gardenlinux_root_dir(root_dir): + """ + Sets the default GardenLinux root directory used. + + :param root_dir: GardenLinux root directory + + :since: 0.7.0 + """ + + Parser._GARDENLINUX_ROOT = root_dir + @staticmethod def sort_graph_nodes(graph): + """ + Sorts graph nodes by feature type. + + :param graph: Graph to sort + + :return: (list) Sorted feature set + :since: 0.7.0 + """ + def key_function(node): prefix_map = {"platform": "0", "element": "1", "flag": "2"} node_type = Parser._get_graph_node_type(graph.nodes.get(node, {})) @@ -261,4 +413,13 @@ def key_function(node): @staticmethod def sort_reversed_graph_nodes(graph): + """ + Sorts graph nodes by feature type. + + :param graph: Graph to reverse and sort + + :return: (list) Reversed and sorted feature set + :since: 0.7.0 + """ + return Parser.sort_graph_nodes(graph.reverse()) diff --git a/src/gardenlinux/flavors/__init__.py b/src/gardenlinux/flavors/__init__.py index bff333bd..8fdf747f 100644 --- a/src/gardenlinux/flavors/__init__.py +++ b/src/gardenlinux/flavors/__init__.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +""" +Flavors module +""" + from .parser import Parser __all__ = ["Parser"] diff --git a/src/gardenlinux/flavors/__main__.py b/src/gardenlinux/flavors/__main__.py index bff775be..9d7997ac 100644 --- a/src/gardenlinux/flavors/__main__.py +++ b/src/gardenlinux/flavors/__main__.py @@ -1,17 +1,31 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +""" +gl-flavors-parse main entrypoint +""" + from argparse import ArgumentParser -from git import Git import json import os import sys +from ..git import Git + from .parser import Parser def generate_markdown_table(combinations, no_arch): - """Generate a markdown table of platforms and their flavors.""" + """ + Generate a markdown table of platforms and their flavors. + + :param combinations: List of tuples of architectures and flavors + :param no_arch: Noop + + :return: (str) Markdown table + :since: 0.7.0 + """ + table = "| Platform | Architecture | Flavor |\n" table += "|------------|--------------------|------------------------------------------|\n" @@ -25,6 +39,13 @@ def generate_markdown_table(combinations, no_arch): def parse_args(): + """ + Parses arguments used for main() + + :return: (object) Parsed argparse.ArgumentParser namespace + :since: 0.7.0 + """ + parser = ArgumentParser(description="Parse flavors.yaml and generate combinations.") parser.add_argument( @@ -91,10 +112,15 @@ def parse_args(): def main(): + """ + gl-flavors-parse main() + + :since: 0.7.0 + """ + args = parse_args() - repo_path = Git(".").rev_parse("--show-superproject-working-tree") - flavors_file = os.path.join(repo_path, "flavors.yaml") + flavors_file = os.path.join(Git().root, "flavors.yaml") if not os.path.isfile(flavors_file): sys.exit(f"Error: {flavors_file} does not exist.") diff --git a/src/gardenlinux/flavors/parser.py b/src/gardenlinux/flavors/parser.py index fb35b696..5ac69c8b 100644 --- a/src/gardenlinux/flavors/parser.py +++ b/src/gardenlinux/flavors/parser.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +""" +Flavors parser +""" + from jsonschema import validate as jsonschema_validate import fnmatch import yaml @@ -9,7 +13,28 @@ class Parser(object): + """ + Parser for GardenLinux `flavors.yaml`. + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: flavors + :since: 0.7.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ + def __init__(self, data, logger=None): + """ + Constructor __init__(Parser) + + :param data: Flavors data to parse + :param logger: Logger instance + + :since: 0.7.0 + """ + flavors_data = yaml.safe_load(data) if isinstance(data, str) else data jsonschema_validate(instance=flavors_data, schema=GL_FLAVORS_SCHEMA) @@ -34,7 +59,22 @@ def filter( filter_categories=[], exclude_categories=[], ): - """Parses the flavors.yaml file and generates combinations.""" + """ + Filters flavors data and generates combinations. + + :param include_only_patterns: Include pattern list + :param wildcard_excludes: Exclude wildcard list + :param only_build: Return only build-enabled flavors + :param only_test: Return only test-enabled flavors + :param only_test_platform: Return only platform-test-enabled flavors + :param only_publish: Return only flavors to be published + :param filter_categories: List of categories to include + :param exclude_categories: List of categories to exclude + + :return: (list) Filtered flavors + :since: 0.7.0 + """ + self._logger.debug("flavors.Parser filtering with {0}".format(locals())) combinations = [] # Use a list for consistent order @@ -92,7 +132,15 @@ def filter( @staticmethod def group_by_arch(combinations): - """Groups combinations by architecture into a JSON dictionary.""" + """ + Groups combinations by architecture into a dictionary. + + :param combinations: Flavor combinations to group + + :return: (list) Grouped flavor combinations + :since: 0.7.0 + """ + arch_dict = {} for arch, combination in combinations: arch_dict.setdefault(arch, []).append(combination) @@ -102,7 +150,15 @@ def group_by_arch(combinations): @staticmethod def remove_arch(combinations): - """Removes the architecture from combinations.""" + """ + Removes the architecture from combinations. + + :param combinations: Flavor combinations to remove the architecture + + :return: (list) Changed flavor combinations + :since: 0.7.0 + """ + return [ combination.replace(f"-{arch}", "") for arch, combination in combinations ] @@ -111,7 +167,15 @@ def remove_arch(combinations): def should_exclude(combination, excludes, wildcard_excludes): """ Checks if a combination should be excluded based on exact match or wildcard patterns. + + :param combinations: Flavor combinations + :param excludes: List of features to exclude + :param wildcard_excludes: List of feature wildcards to exclude + + :return: (bool) True if excluded + :since: 0.7.0 """ + # Exclude if in explicit excludes if combination in excludes: return True @@ -125,7 +189,14 @@ def should_include_only(combination, include_only_patterns): """ Checks if a combination should be included based on `--include-only` wildcard patterns. If no patterns are provided, all combinations are included by default. + + :param combinations: Flavor combinations + :param include_only_patterns: List of features to include + + :return: (bool) True if included + :since: 0.7.0 """ + if not include_only_patterns: return True return any( diff --git a/src/gardenlinux/git/__init__.py b/src/gardenlinux/git/__init__.py index 7a9e04ae..5776ad0d 100644 --- a/src/gardenlinux/git/__init__.py +++ b/src/gardenlinux/git/__init__.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +""" +Git module +""" + from .git import Git __all__ = ["Git"] diff --git a/src/gardenlinux/git/git.py b/src/gardenlinux/git/git.py index fcae4c8e..8bf44a9e 100755 --- a/src/gardenlinux/git/git.py +++ b/src/gardenlinux/git/git.py @@ -1,30 +1,83 @@ # -*- coding: utf-8 -*- +import sys +from git import Repo from git import Git as _Git +from os import PathLike from pathlib import Path -import sys from ..logger import LoggerSetup -class Git: - """Git operations handler.""" +class Git(object): + """ + Git operations handler based on the given Git directory. + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: git + :since: 0.7.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ - def __init__(self, logger=None): - """Initialize Git handler. + def __init__(self, git_directory=".", logger=None): + """ + Constructor __init__(Git) + + :param git_directory: Git directory + :param logger: Logger instance - Args: - logger: Optional logger instance + :since: 0.7.0 """ if logger is None or not logger.hasHandlers(): logger = LoggerSetup.get_logger("gardenlinux.git") + if not isinstance(git_directory, PathLike): + git_directory = Path(git_directory) + + if not git_directory.is_dir(): + raise RuntimeError(f"Git directory given is invalid: {git_directory}") + + self._git_directory = git_directory self._logger = logger - def get_root(self): - """Get the root directory of the current Git repository.""" - root_dir = Git(".").rev_parse("--show-superproject-working-tree") - self.log.debug(f"Git root directory: {root_dir}") + @property + def commit_id(self): + """ + Returns the commit ID for Git `HEAD`. + + :return: (str) Git commit ID + :since: 0.7.0 + """ + + return str(self.root_repo.head.commit) + @property + def root(self): + """ + Returns the root directory of the current Git repository. + + :return: (object) Git root directory + :since: 0.7.0 + """ + + root_dir = _Git(self._git_directory).rev_parse( + "--show-superproject-working-tree" + ) + + self._logger.debug(f"Git root directory: {root_dir}") return Path(root_dir) + + @property + def root_repo(self): + """ + Returns the root Git `Repo` instance. + + :return: (object) Git root Git `Repo` instance + :since: 0.7.0 + """ + + return Repo(self.root) diff --git a/src/gardenlinux/oci/__init__.py b/src/gardenlinux/oci/__init__.py index 40a96afc..4102b543 100644 --- a/src/gardenlinux/oci/__init__.py +++ b/src/gardenlinux/oci/__init__.py @@ -1 +1,12 @@ # -*- coding: utf-8 -*- + +""" +OCI module +""" + +from .container import Container +from .index import Index +from .layer import Layer +from .manifest import Manifest + +__all__ = ["Container", "Index", "Layer", "Manifest"] diff --git a/src/gardenlinux/oci/__main__.py b/src/gardenlinux/oci/__main__.py index 4830f6dc..a0ee136f 100755 --- a/src/gardenlinux/oci/__main__.py +++ b/src/gardenlinux/oci/__main__.py @@ -1,15 +1,25 @@ #!/usr/bin/env python3 +""" +gl-oci main entrypoint +""" + import os import click from pygments.lexer import default -from .registry import GlociRegistry +from .container import Container @click.group() def cli(): + """ + gl-oci click argument entrypoint + + :since: 0.7.0 + """ + pass @@ -26,6 +36,13 @@ def cli(): type=click.Path(), help="Version of image", ) +@click.option( + "--commit", + required=False, + type=click.Path(), + default=None, + help="Commit of image", +) @click.option( "--arch", required=True, @@ -60,6 +77,7 @@ def cli(): def push_manifest( container, version, + commit, arch, cname, directory, @@ -68,18 +86,25 @@ def push_manifest( insecure, additional_tag, ): - """push artifacts from a dir to a registry, get the index-entry for the manifest in return""" - container_name = f"{container}:{version}" - registry = GlociRegistry( - container_name=container_name, - token=os.getenv("GL_CLI_REGISTRY_TOKEN"), + """ + Push artifacts and the manifest from a directory to a registry. + + :since: 0.7.0 + """ + + container = Container( + f"{container}:{version}", insecure=insecure, ) - digest = registry.push_from_dir( - arch, version, cname, directory, manifest_file, additional_tag + + manifest = container.read_or_generate_manifest(cname, arch, version, commit) + + container.push_manifest_and_artifacts_from_directory( + manifest, directory, manifest_file, additional_tag ) + if cosign_file: - print(digest, file=open(cosign_file, "w")) + print(manifest.digest, file=open(cosign_file, "w")) @cli.command() @@ -114,18 +139,27 @@ def push_manifest( help="Additional tag to push the index with", ) def update_index(container, version, manifest_folder, insecure, additional_tag): - """push a index entry from a list of files to an index""" - container_name = f"{container}:{version}" - registry = GlociRegistry( - container_name=container_name, - token=os.getenv("GL_CLI_REGISTRY_TOKEN"), + """ + Push a list of files from the `manifest_folder` to an index. + + :since: 0.7.0 + """ + + container = Container( + f"{container}:{version}", insecure=insecure, ) - registry.update_index(manifest_folder, additional_tag) + + container.push_index_from_directory(manifest_folder, additional_tag) def main(): - """Entry point for the gl-oci command.""" + """ + gl-oci main() + + :since: 0.7.0 + """ + cli() diff --git a/src/gardenlinux/oci/checksum.py b/src/gardenlinux/oci/checksum.py deleted file mode 100644 index abfa909f..00000000 --- a/src/gardenlinux/oci/checksum.py +++ /dev/null @@ -1,18 +0,0 @@ -# -*- coding: utf-8 -*- - -import hashlib - - -def verify_sha256(checksum: str, data: bytes): - data_checksum = f"sha256:{hashlib.sha256(data).hexdigest()}" - if checksum != data_checksum: - raise ValueError(f"Invalid checksum. {checksum} != {data_checksum}") - - -def calculate_sha256(file_path: str) -> str: - """Calculate the SHA256 checksum of a file.""" - sha256_hash = hashlib.sha256() - with open(file_path, "rb") as f: - for byte_block in iter(lambda: f.read(4096), b""): - sha256_hash.update(byte_block) - return sha256_hash.hexdigest() diff --git a/src/gardenlinux/oci/container.py b/src/gardenlinux/oci/container.py new file mode 100644 index 00000000..392769e0 --- /dev/null +++ b/src/gardenlinux/oci/container.py @@ -0,0 +1,603 @@ +# -*- coding: utf-8 -*- + +""" +OCI container +""" + +import json +import jsonschema +import logging +from base64 import b64encode +from configparser import ConfigParser, UNNAMED_SECTION +from collections.abc import Sequence +from hashlib import sha256 +from oras.container import Container as OrasContainer +from oras.defaults import unknown_config_media_type as UNKNOWN_CONFIG_MEDIA_TYPE +from oras.provider import Registry +from oras.utils import make_targz, extract_targz +from os import fdopen, getenv, PathLike +from pathlib import Path +from requests import Response +from tempfile import mkstemp +from typing import Optional +from urllib.parse import urlsplit + +from ..constants import GL_MEDIA_TYPE_LOOKUP, OCI_IMAGE_INDEX_MEDIA_TYPE +from ..features.cname import CName +from ..logger import LoggerSetup + +from .index import Index +from .layer import Layer +from .manifest import Manifest +from .schemas import index as IndexSchema + + +class Container(Registry): + """ + OCI container instance to provide methods for interaction. + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: flavors + :since: 0.7.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ + + def __init__( + self, + container_url: str, + insecure: bool = False, + token: Optional[str] = None, + logger: Optional[logging.Logger] = None, + ): + """ + Constructor __init__(Container) + + :param container_url: OCI container URL + :param insecure: True if access is provided via HTTP without encryption + :param token: OCI access token + :param logger: Logger instance + + :since: 0.7.0 + """ + + if "://" in container_url: + container_data = container_url.rsplit(":", 2) + + if len(container_data) < 3: + raise RuntimeError("Container name given is invalid") + + self._container_url = f"{container_data[0]}:{container_data[1]}" + self._container_version = container_data[2] + else: + container_data = container_url.rsplit(":", 1) + + if len(container_data) < 2: + raise RuntimeError("Container name given is invalid") + + scheme = "http" if insecure else "https" + + self._container_url = f"{scheme}://{container_data[0]}" + self._container_version = container_data[1] + + container_url_data = urlsplit(self._container_url) + + Registry.__init__( + self, + hostname=container_url_data.netloc, + auth_backend="token", + insecure=insecure, + ) + + if logger is None or not logger.hasHandlers(): + logger = LoggerSetup.get_logger("gardenlinux.oci") + + self._container_name = container_url_data.path[1:] + self._logger = logger + + if token is None: + token = getenv("GL_CLI_REGISTRY_TOKEN") + + if token is not None: + self._token = b64encode(token.encode("utf-8")).decode("utf-8") + self.auth.set_token_auth(self._token) + else: + # Authentication credentials from environment + username = getenv("GL_CLI_REGISTRY_USERNAME") + password = getenv("GL_CLI_REGISTRY_PASSWORD") + + # Login to registry if credentials are provided + if username and password: + self._logger.debug(f"Logging in with username/password") + + try: + self.login(username, password) + except Exception as login_error: + self._logger.error(f"Login error: {str(login_error)}") + + def generate_index(self): + """ + Generates an OCI image index + + :return: (object) OCI image index + :since: 0.7.0 + """ + + return Index() + + def generate_manifest( + self, + cname: str, + architecture: Optional[str] = None, + version: Optional[str] = None, + commit: Optional[str] = None, + feature_set: Optional[str] = None, + ): + """ + Generates an OCI image manifest + + :param cname: Canonical name of the manifest + :param architecture: Target architecture of the manifest + :param version: Artifacts version of the manifest + :param commit: The commit hash of the manifest + :param feature_set: The expanded list of the included features of this manifest + + :return: (object) OCI image manifest + :since: 0.7.0 + """ + + cname_object = CName(cname, architecture, version) + + if architecture is None: + architecture = cname_object.arch + if version is None: + version = cname_object.version + if commit is None: + commit = cname_object.commit_id + if feature_set is None: + feature_set = cname_object.feature_set + + if commit is None: + commit = "" + + manifest = Manifest() + + manifest["annotations"] = {} + manifest["annotations"]["version"] = version + manifest["annotations"]["cname"] = cname + manifest["annotations"]["architecture"] = architecture + manifest["annotations"]["feature_set"] = feature_set + manifest["annotations"]["flavor"] = f"{cname_object.flavor}-{architecture}" + manifest["annotations"]["commit"] = commit + + description = ( + f"Image: {cname} " + f"Flavor: {cname_object.flavor} " + f"Architecture: {architecture} " + f"Features: {feature_set} " + f"Commit: {commit} " + ) + + manifest["annotations"]["org.opencontainers.image.description"] = description + + manifest.config_from_dict( + {}, + {"cname": cname, "architecture": architecture}, + ) + + return manifest + + def _get_index_without_response_parsing(self): + """ + Return the response of an OCI image index request. + + :return: (object) OCI image index request response + :since: 0.7.0 + """ + + manifest_url = self.get_container( + f"{self._container_name}:{self._container_version}" + ).manifest_url() + + return self.do_request( + f"{self.prefix}://{manifest_url}", + headers={"Accept": OCI_IMAGE_INDEX_MEDIA_TYPE}, + ) + + def _get_manifest_without_response_parsing(self, reference): + """ + Return the response of an OCI image manifest request. + + :return: (object) OCI image manifest request response + :since: 0.7.0 + """ + + return self.do_request( + f"{self.prefix}://{self.hostname}/v2/{self._container_name}/manifests/{reference}", + headers={"Accept": "application/vnd.oci.image.manifest.v1+json"}, + ) + + def push_index_from_directory( + self, manifests_dir: PathLike | str, additional_tags: list = None + ): + """ + Replaces an old manifest entries with new ones + + :param manifests_dir: Directory where the manifest entries are read from + :param additional_tags: Additional tags to push the index with + + :since: 0.7.0 + """ + + if not isinstance(manifests_dir, PathLike): + manifests_dir = Path(manifests_dir) + + index = self.read_or_generate_index() + + # Ensure mediaType is set for existing indices + if "mediaType" not in index: + index["mediaType"] = OCI_IMAGE_INDEX_MEDIA_TYPE + + new_entries = 0 + + for file_path_name in manifests_dir.iterdir(): + with open(file_path_name, "r") as fp: + manifest = json.loads(fp.read()) + + if manifest["annotations"]["cname"] in index.manifests_as_dict: + existing_manifest = index.manifests_as_dict[ + manifest["annotations"]["cname"] + ] + + if manifest["digest"] == existing_manifest["digest"]: + self._logger.debug( + f"Skipping manifest with digest {manifest["digest"]} - already exists" + ) + + continue + + index.append_manifest(manifest) + + self._logger.info( + f"Index appended locally {manifest["annotations"]["cname"]}" + ) + + new_entries += 1 + + self._check_200_response(self._upload_index(index)) + self._logger.info(f"Index pushed with {new_entries} new entries") + + if isinstance(additional_tags, Sequence) and len(additional_tags) > 0: + self._logger.info(f"Processing {len(additional_tags)} additional tags") + + self.push_index_for_tags( + index, + additional_tags, + ) + + def push_index_for_tags(self, index, tags): + """ + Push tags for an given OCI image index. + + :param index: OCI image index + :param tags: List of tags to push the index for + + :since: 0.7.0 + """ + + # For each additional tag, push the manifest using Registry.upload_manifest + for tag in tags: + self._check_200_response(self._upload_index(index, tag)) + + def push_manifest( + self, + manifest: Manifest, + manifest_file: Optional[str] = None, + additional_tags: Optional[list] = None, + ) -> Manifest: + """ + Pushes an OCI image manifest. + + :param manifest: OCI image manifest + :param artifacts_with_metadata: A list of file names and their artifacts metadata + :param manifest_file: File name where the modified manifest is written to + :param additional_tags: Additional tags to push the manifest with + + :return: (object) OCI image manifest + :since: 0.7.0 + """ + + if not isinstance(manifest, Manifest): + raise RuntimeError("Artifacts image manifest given is invalid") + + container_name = f"{self._container_name}:{self._container_version}" + + fd, config_file = mkstemp() + + try: + with fdopen(fd, mode="wb") as fp: + fp.write(manifest.config_json) + + self._check_200_response( + self.upload_blob(config_file, container_name, manifest["config"]) + ) + + self._logger.debug(f"Successfully pushed config for {container_name}") + finally: + Path(config_file).unlink() + + manifest_container = OrasContainer( + f"{self._container_url}:{self._container_version}-{manifest.cname}-{manifest.arch}" + ) + + self._check_200_response(self.upload_manifest(manifest, manifest_container)) + + self._logger.info( + f"Successfully pushed {manifest_container} ({manifest.digest})" + ) + + if isinstance(additional_tags, Sequence) and len(additional_tags) > 0: + self._logger.info(f"Processing {len(additional_tags)} additional tags") + + self.push_manifest_for_tags( + manifest, + additional_tags, + ) + + if manifest_file is not None: + manifest.write_metadata_file(manifest_file) + self._logger.info(f"Index entry written to {manifest_file}") + + return manifest + + def push_manifest_and_artifacts( + self, + manifest: Manifest, + artifacts_with_metadata: list[dict], + artifacts_dir: Optional[PathLike | str] = ".build", + manifest_file: Optional[str] = None, + additional_tags: Optional[list] = None, + ) -> Manifest: + """ + Pushes an OCI image manifest and its artifacts. + + :param manifest: OCI image manifest + :param artifacts_with_metadata: A list of file names and their artifacts metadata + :param artifacts_dir: Path of the image artifacts + :param manifest_file: File name where the modified manifest is written to + :param additional_tags: Additional tags to push the manifest with + + :return: (object) OCI image manifest + :since: 0.7.0 + """ + + if not isinstance(manifest, Manifest): + raise RuntimeError("Artifacts image manifest given is invalid") + + if not isinstance(artifacts_dir, PathLike): + artifacts_dir = Path(artifacts_dir) + + container_name = f"{self._container_name}:{self._container_version}" + + # For each file, create sign, attach and push a layer + for artifact in artifacts_with_metadata: + file_path_name = artifacts_dir.joinpath(artifact["file_name"]) + + layer = Layer(file_path_name, artifact["media_type"]) + + if artifact["annotations"]: + layer["annotations"].update(artifact["annotations"]) + + if not file_path_name.exists(): + raise ValueError(f"{file_path_name} does not exist.") + + cleanup_blob = False + + try: + if file_path_name.is_dir(): + file_path_name = Path(make_targz(file_path_name)) + cleanup_blob = True + + manifest.append_layer(layer) + layer_dict = layer.to_dict() + + self._logger.debug(f"Layer: {layer_dict}") + + self._check_200_response( + self.upload_blob(file_path_name, container_name, layer_dict) + ) + + self._logger.info( + f"Pushed {artifact["file_name"]}: {layer_dict["digest"]}" + ) + finally: + if cleanup_blob and file_path_name.exists(): + file_path_name.unlink() + + self.push_manifest(manifest, manifest_file, additional_tags) + + return manifest + + def push_manifest_and_artifacts_from_directory( + self, + manifest: Manifest, + artifacts_dir: Optional[PathLike | str] = ".build", + manifest_file: Optional[str] = None, + additional_tags: Optional[list] = None, + ) -> Manifest: + """ + Pushes an OCI image manifest and its artifacts from the given directory. + + :param manifest: OCI image manifest + :param artifacts_dir: Path of the image artifacts + :param manifest_file: File name where the modified manifest is written to + :param additional_tags: Additional tags to push the manifest with + + :return: (object) OCI image manifest + :since: 0.7.0 + """ + + if not isinstance(artifacts_dir, PathLike): + artifacts_dir = Path(artifacts_dir) + + if not isinstance(manifest, Manifest): + raise RuntimeError("Artifacts image manifest given is invalid") + + files = [ + file_name for file_name in artifacts_dir.iterdir() if file_name.is_file() + ] + + # Scan and extract nested artifacts + for file_path_name in files: + if file_path_name.match("*.pxe.tar.gz"): + self._logger.info(f"Found nested artifact {file_path_name}") + extract_targz(file_path_name, artifacts_dir) + + artifacts_with_metadata = Container.get_artifacts_metadata_from_files( + files, manifest.arch + ) + + for artifact in artifacts_with_metadata: + if artifact["media_type"] == "application/io.gardenlinux.release": + artifact_config = ConfigParser(allow_unnamed_section=True) + artifact_config.read(artifacts_dir.joinpath(artifact["file_name"])) + + if artifact_config.has_option(UNNAMED_SECTION, "GARDENLINUX_FEATURES"): + manifest.feature_set = artifact_config.get( + UNNAMED_SECTION, "GARDENLINUX_FEATURES" + ) + + if manifest.commit == "" and artifact_config.has_option( + UNNAMED_SECTION, "GARDENLINUX_COMMIT_ID" + ): + manifest.commit = artifact_config.get( + UNNAMED_SECTION, "GARDENLINUX_COMMIT_ID" + ) + + return self.push_manifest_and_artifacts( + manifest, + artifacts_with_metadata, + artifacts_dir, + manifest_file, + additional_tags, + ) + + def push_manifest_for_tags(self, manifest, tags): + """ + Push tags for an given OCI image manifest. + + :param manifest: OCI image manifest + :param tags: List of tags to push the index for + + :since: 0.7.0 + """ + + # For each additional tag, push the manifest using Registry.upload_manifest + for tag in tags: + manifest_container = OrasContainer(f"{self._container_url}:{tag}") + + self._check_200_response(self.upload_manifest(manifest, manifest_container)) + + def read_or_generate_index(self): + """ + Reads from registry or generates the OCI image index. + + :return: OCI image manifest + :since: 0.7.0 + """ + + response = self._get_index_without_response_parsing() + + if response.ok: + index = Index(**response.json()) + elif response.status_code == 404: + index = self.generate_index() + else: + response.raise_for_status() + + return index + + def read_or_generate_manifest( + self, + cname: str, + architecture: Optional[str] = None, + version: Optional[str] = None, + commit: Optional[str] = None, + feature_set: Optional[str] = None, + ) -> Manifest: + """ + Reads from registry or generates the OCI image manifest. + + :param cname: Canonical name of the manifest + :param architecture: Target architecture of the manifest + :param version: Artifacts version of the manifest + :param commit: The Git commit ID of the manifest + :param feature_set: The expanded list of the included features of this manifest + + :return: OCI image manifest + :since: 0.7.0 + """ + + if architecture is None: + architecture = CName(cname, architecture, version).arch + + response = self._get_manifest_without_response_parsing( + f"{self._container_version}-{cname}-{architecture}" + ) + + if response.ok: + manifest = Manifest(**response.json()) + elif response.status_code == 404: + manifest = self.generate_manifest( + cname, architecture, version, commit, feature_set + ) + else: + response.raise_for_status() + + return manifest + + def _upload_index(self, index: dict, reference: Optional[str] = None) -> Response: + """ + Uploads the given OCI image index and returns the response. + + :param index: OCI image index + :param reference: OCI container reference (tag) to push to + + :return: (object) OCI image index put response + :since: 0.7.0 + """ + + jsonschema.validate(index, schema=IndexSchema) + + if reference is None: + reference = self._container_version + + return self.do_request( + f"{self.prefix}://{self.hostname}/v2/{self._container_name}/manifests/{reference}", + "PUT", + headers={"Content-Type": OCI_IMAGE_INDEX_MEDIA_TYPE}, + data=index.json, + ) + + @staticmethod + def get_artifacts_metadata_from_files(files: list, arch: str) -> list: + """ + Returns OCI layer metadata for the given list of files. + + :param files: a list of filenames (not paths) to set oci_metadata for + :param arch: arch of the target image + + :return: (list) List of dicts, where each dict represents a layer + :since: 0.7.0 + """ + + artifacts_with_metadata = [] + + for file_name in files: + artifacts_with_metadata.append( + Layer.generate_metadata_from_file_name(file_name, arch) + ) + + return artifacts_with_metadata diff --git a/src/gardenlinux/oci/index.py b/src/gardenlinux/oci/index.py new file mode 100644 index 00000000..bd125981 --- /dev/null +++ b/src/gardenlinux/oci/index.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +import json +from copy import deepcopy + +from .schemas import EmptyIndex + + +class Index(dict): + def __init__(self, *args, **kwargs): + dict.__init__(self) + + self.update(deepcopy(EmptyIndex)) + self.update(*args) + self.update(**kwargs) + + @property + def json(self): + return json.dumps(self).encode("utf-8") + + @property + def manifests_as_dict(self): + manifests = {} + + for manifest in self["manifests"]: + if "cname" not in manifest.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing annotation 'cname' found" + ) + + manifests[manifest["annotations"]["cname"]] = manifest + + return manifests + + def append_manifest(self, manifest): + if not isinstance(manifest, dict): + raise RuntimeError("Unexpected manifest type given") + + if "cname" not in manifest.get("annotations", {}): + raise RuntimeError("Unexpected layer with missing annotation 'cname' found") + + cname = manifest["annotations"]["cname"] + existing_manifest_index = 0 + + for existing_manifest in self["manifests"]: + if "cname" not in existing_manifest.get("annotations", {}): + raise RuntimeError( + "Unexpected layer with missing annotation 'cname' found" + ) + + if cname == existing_manifest["annotations"]["cname"]: + break + + existing_manifest_index += 1 + + if len(self["manifests"]) > existing_manifest_index: + self["manifests"].pop(existing_manifest_index) + + self["manifests"].append(manifest) diff --git a/src/gardenlinux/oci/layer.py b/src/gardenlinux/oci/layer.py new file mode 100644 index 00000000..23b24fe0 --- /dev/null +++ b/src/gardenlinux/oci/layer.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- + +from collections.abc import Mapping +from os import PathLike +from oras.defaults import annotation_title as ANNOTATION_TITLE +from oras.oci import Layer as _Layer +from pathlib import Path +from typing import Optional + +from ..constants import GL_MEDIA_TYPE_LOOKUP, GL_MEDIA_TYPES + +_SUPPORTED_MAPPING_KEYS = ("annotations",) + + +class Layer(_Layer, Mapping): + def __init__( + self, + blob_path: PathLike | str, + media_type: Optional[str] = None, + is_dir: bool = False, + ): + if not isinstance(blob_path, PathLike): + blob_path = Path(blob_path) + + _Layer.__init__(self, blob_path, media_type, is_dir) + + self._annotations = { + ANNOTATION_TITLE: blob_path.name, + } + + def __delitem__(self, key): + """ + python.org: Called to implement deletion of self[key]. + + :param key: Mapping key + """ + + if key == "annotations": + self._annotations.clear() + + raise KeyError( + f"'{self.__class__.__name__}' object is not subscriptable except for keys: {_SUPPORTED_MAPPING_KEYS}" + ) + + def __getitem__(self, key): + """ + python.org: Called to implement evaluation of self[key]. + + :param key: Mapping key + + :return: (mixed) Mapping key value + """ + + if key == "annotations": + return self._annotations + + raise KeyError( + f"'{self.__class__.__name__}' object is not subscriptable except for keys: {_SUPPORTED_MAPPING_KEYS}" + ) + + def __iter__(self): + """ + python.org: Return an iterator object. + + :return: (object) Iterator object + """ + + iter(_SUPPORTED_MAPPING_KEYS) + + def __len__(self): + """ + python.org: Called to implement the built-in function len(). + + :return: (int) Number of database instance attributes + """ + + return len(_SUPPORTED_MAPPING_KEYS) + + def __setitem__(self, key, value): + """ + python.org: Called to implement assignment to self[key]. + + :param key: Mapping key + :param value: self[key] value + """ + + if key == "annotations": + self._annotations = value + + raise KeyError( + f"'{self.__class__.__name__}' object is not subscriptable except for keys: {_SUPPORTED_MAPPING_KEYS}" + ) + + def to_dict(self): + """ + Return a dictionary representation of the layer + """ + layer = _Layer.to_dict(self) + layer["annotations"] = self._annotations + + return layer + + @staticmethod + def generate_metadata_from_file_name(file_name: PathLike | str, arch: str) -> dict: + """ + :param str file_name: file_name of the blob + :param str arch: the arch of the target image + :return: dict of oci layer metadata for a given layer file + """ + + if not isinstance(file_name, PathLike): + file_name = Path(file_name) + + media_type = Layer.lookup_media_type_for_file_name(file_name) + + return { + "file_name": file_name.name, + "media_type": media_type, + "annotations": {"io.gardenlinux.image.layer.architecture": arch}, + } + + @staticmethod + def lookup_media_type_for_file_name(file_name: str) -> str: + """ + :param str file_name: file_name of the target layer + :return: mediatype + """ + + if not isinstance(file_name, PathLike): + file_name = Path(file_name) + + for suffix in GL_MEDIA_TYPES: + if file_name.match(f"*.{suffix}"): + return GL_MEDIA_TYPE_LOOKUP[suffix] + + raise ValueError( + f"Media type for {file_name} is not defined. You may want to add the definition to parse_features_lib" + ) diff --git a/src/gardenlinux/oci/manifest.py b/src/gardenlinux/oci/manifest.py new file mode 100644 index 00000000..438172a6 --- /dev/null +++ b/src/gardenlinux/oci/manifest.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- + +import json +from copy import deepcopy +from hashlib import sha256 +from oras.defaults import unknown_config_media_type as UNKNOWN_CONFIG_MEDIA_TYPE +from oras.oci import EmptyManifest, Layer +from os import PathLike +from pathlib import Path + +from ..features import CName + +from .platform import NewPlatform +from .schemas import EmptyManifestMetadata + + +class Manifest(dict): + def __init__(self, *args, **kwargs): + dict.__init__(self) + + self._config_bytes = b"{}" + + self.update(deepcopy(EmptyManifest)) + self.update(*args) + self.update(**kwargs) + + @property + def arch(self): + if "architecture" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'architecture' found" + ) + + return self["annotations"]["architecture"] + + @arch.setter + def arch(self, value): + self._ensure_annotations_dict() + self["annotations"]["architecture"] = value + + @property + def cname(self): + if "cname" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'cname' found" + ) + + return self["annotations"]["cname"] + + @cname.setter + def cname(self, value): + self._ensure_annotations_dict() + self["annotations"]["cname"] = value + + @property + def commit(self): + if "commit" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'commit' found" + ) + + return self["annotations"]["commit"] + + @commit.setter + def commit(self, value): + self._ensure_annotations_dict() + self["annotations"]["commit"] = value + + @property + def config_json(self): + return self._config_bytes + + @property + def digest(self): + digest = sha256(self.json).hexdigest() + return f"sha256:{digest}" + + @property + def feature_set(self): + if "feature_set" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'feature_set' found" + ) + + return self["annotations"]["feature_set"] + + @feature_set.setter + def feature_set(self, value): + self._ensure_annotations_dict() + self["annotations"]["feature_set"] = value + + @property + def flavor(self): + return CName(self.cname).flavor + + @property + def json(self): + return json.dumps(self).encode("utf-8") + + @property + def layers_as_dict(self): + layers = {} + + for layer in self["layers"]: + if "org.opencontainers.image.title" not in layer.get("annotations", {}): + raise RuntimeError( + "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" + ) + + layers[layer["annotations"]["org.opencontainers.image.title"]] = layer + + return layers + + @property + def size(self): + return len(self.json) + + @property + def version(self): + if "version" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'version' found" + ) + + return self["annotations"]["version"] + + @version.setter + def version(self, value): + self._ensure_annotations_dict() + self["annotations"]["version"] = value + + def config_from_dict(self, config: dict, annotations: dict): + """ + Write a new OCI configuration to file, and generate oci metadata for it + For reference see https://github.com/opencontainers/image-spec/blob/main/config.md + annotations, mediatype, size, digest are not part of digest and size calculation, + and therefore must be attached to the output dict and not written to the file. + + :param config: dict with custom configuration (the payload of the configuration) + :param annotations: dict with custom annotations to be attached to metadata part of config + + """ + + self._config_bytes = json.dumps(config).encode("utf-8") + + config["annotations"] = annotations + config["mediaType"] = UNKNOWN_CONFIG_MEDIA_TYPE + config["size"] = len(self._config_bytes) + config["digest"] = f"sha256:{sha256(self._config_bytes).hexdigest()}" + + self["config"] = config + + def append_layer(self, layer): + if not isinstance(layer, Layer): + raise RuntimeError("Unexpected layer type given") + + layer_dict = layer.to_dict() + + if "org.opencontainers.image.title" not in layer_dict.get("annotations", {}): + raise RuntimeError( + "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" + ) + + image_title = layer_dict["annotations"]["org.opencontainers.image.title"] + existing_layer_index = 0 + + for existing_layer in self["layers"]: + if "org.opencontainers.image.title" not in existing_layer.get( + "annotations", {} + ): + raise RuntimeError( + "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" + ) + + if ( + image_title + == existing_layer["annotations"]["org.opencontainers.image.title"] + ): + break + + existing_layer_index += 1 + + if len(self["layers"]) > existing_layer_index: + self["layers"].pop(existing_layer_index) + + self["layers"].append(layer_dict) + + def _ensure_annotations_dict(self): + if "annotations" not in self: + self["annotations"] = {} + + def write_metadata_file(self, manifest_file_path_name): + if not isinstance(manifest_file_path_name, PathLike): + manifest_file_path_name = Path(manifest_file_path_name) + + metadata_annotations = { + "cname": self.cname, + "architecture": self.arch, + "feature_set": self.feature_set, + } + + metadata = deepcopy(EmptyManifestMetadata) + metadata["mediaType"] = "application/vnd.oci.image.manifest.v1+json" + metadata["digest"] = self.digest + metadata["size"] = self.size + metadata["annotations"] = metadata_annotations + metadata["platform"] = NewPlatform(self.arch, self.version) + + with open(manifest_file_path_name, "w") as fp: + fp.write(json.dumps(metadata)) diff --git a/src/gardenlinux/oci/platform.py b/src/gardenlinux/oci/platform.py new file mode 100644 index 00000000..f4085695 --- /dev/null +++ b/src/gardenlinux/oci/platform.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- + +from copy import deepcopy + +from .schemas import EmptyPlatform + + +def NewPlatform(architecture: str, version: str) -> dict: + platform = deepcopy(EmptyPlatform) + platform["architecture"] = architecture + platform["os.version"] = version + + return platform diff --git a/src/gardenlinux/oci/registry.py b/src/gardenlinux/oci/registry.py deleted file mode 100644 index 8923e824..00000000 --- a/src/gardenlinux/oci/registry.py +++ /dev/null @@ -1,861 +0,0 @@ -# -*- coding: utf-8 -*- - -import base64 -import configparser -import copy -import hashlib -import json -import logging -import os -import shutil -import sys -import tarfile -import tempfile -import uuid -from enum import Enum, auto -from platform import architecture -from typing import Optional, Tuple - -import jsonschema -import oras.auth -import oras.client -import oras.defaults -import oras.oci -import oras.provider -import oras.utils -import requests -from oras.container import Container as OrasContainer -from oras.decorator import ensure_container -from oras.provider import Registry -from oras.schemas import manifest as oras_manifest_schema - -from ..constants import OCI_ANNOTATION_SIGNATURE_KEY, OCI_ANNOTATION_SIGNED_STRING_KEY -from ..features import CName -from .checksum import ( - calculate_sha256, - verify_sha256, -) -from .wrapper import retry_on_error -from python_gardenlinux_lib.features.parse_features import get_oci_metadata_from_fileset -from .schemas import ( - EmptyIndex, - EmptyManifestMetadata, - EmptyPlatform, -) -from .schemas import index as indexSchema - - -class ManifestState(Enum): - Incomplete = auto() - Complete = auto() - Final = auto() - - -logger = logging.getLogger(__name__) -logging.basicConfig(stream=sys.stdout, level=logging.INFO) - - -def get_image_state(manifest: dict) -> str: - if "annotations" not in manifest: - logger.warning("No annotations set for manifest.") - return "UNDEFINED" - if "image_state" not in manifest["annotations"]: - logger.warning("No image_state set for manifest.") - return "UNDEFINED" - return manifest["annotations"]["image_state"] - - -def NewPlatform(architecture: str, version: str) -> dict: - platform = copy.deepcopy(EmptyPlatform) - platform["architecture"] = architecture - platform["os.version"] = version - return platform - - -def NewManifestMetadata( - digest: str, size: int, annotations: dict, platform_data: dict -) -> dict: - manifest_meta_data = copy.deepcopy(EmptyManifestMetadata) - manifest_meta_data["mediaType"] = "application/vnd.oci.image.manifest.v1+json" - manifest_meta_data["digest"] = digest - manifest_meta_data["size"] = size - manifest_meta_data["annotations"] = annotations - manifest_meta_data["platform"] = platform_data - return manifest_meta_data - - -def NewIndex() -> dict: - index = copy.deepcopy(EmptyIndex) - index["mediaType"] = "application/vnd.oci.image.index.v1+json" - return index - - -def create_config_from_dict(conf: dict, annotations: dict) -> Tuple[dict, str]: - """ - Write a new OCI configuration to file, and generate oci metadata for it - For reference see https://github.com/opencontainers/image-spec/blob/main/config.md - annotations, mediatype, size, digest are not part of digest and size calculation, - and therefore must be attached to the output dict and not written to the file. - - :param conf: dict with custom configuration (the payload of the configuration) - :param annotations: dict with custom annotations to be attached to metadata part of config - - """ - config_path = os.path.join(os.path.curdir, str(uuid.uuid4())) - with open(config_path, "w") as fp: - json.dump(conf, fp) - conf["annotations"] = annotations - conf["mediaType"] = oras.defaults.unknown_config_media_type - conf["size"] = oras.utils.get_size(config_path) - conf["digest"] = f"sha256:{oras.utils.get_file_hash(config_path)}" - return conf, config_path - - -def construct_manifest_entry_signed_data_string( - cname: str, version: str, new_manifest_metadata: dict, architecture: str -) -> str: - data_to_sign = ( - f"version:{version} cname:{cname} architecture:{architecture} manifest-size" - f":{new_manifest_metadata['size']} manifest-digest:{new_manifest_metadata['digest']}" - ) - return data_to_sign - - -def construct_layer_signed_data_string( - cname: str, version: str, architecture: str, media_type: str, checksum_sha256: str -) -> str: - data_to_sign = f"version:{version} cname:{cname} architecture:{architecture} media_type:{media_type} digest:{checksum_sha256}" - return data_to_sign - - -class GlociRegistry(Registry): - def __init__( - self, - container_name: str, - insecure: bool = False, - token: Optional[str] = None, - config_path: Optional[str] = None, - ): - super().__init__(auth_backend="token", insecure=insecure) - self.container = OrasContainer(container_name) - self.container_name = container_name - self.registry_url = self.container.registry - self.config_path = config_path - if not token: - logger.info("No Token provided.") - else: - self.token = base64.b64encode(token.encode("utf-8")).decode("utf-8") - self.auth.set_token_auth(self.token) - - @ensure_container - def get_manifest_json( - self, container: OrasContainer, allowed_media_type: Optional[list[str]] = None - ): - if not allowed_media_type: - default_image_index_media_type = ( - "application/vnd.oci.image.manifest.v1+json" - ) - allowed_media_type = [default_image_index_media_type] - # self.load_configs(container) - headers = {"Accept": ";".join(allowed_media_type)} - headers.update(self.headers) - get_manifest = f"{self.prefix}://{container.manifest_url()}" - response = self.do_request(get_manifest, "GET", headers=headers) - self._check_200_response(response) - return response - - @ensure_container - def get_manifest_size( - self, container: OrasContainer, allowed_media_type: Optional[list[str]] = None - ): - response = self.get_manifest_json(container, allowed_media_type) - if response is None: - return 0 - return len(response.content) - - @ensure_container - def get_digest( - self, container: OrasContainer, allowed_media_type: Optional[list[str]] = None - ): - response = self.get_manifest_json(container, allowed_media_type) - if response is None: - return "" - return f"sha256:{hashlib.sha256(response.content).hexdigest()}" - - def get_index(self, allowed_media_type: Optional[list[str]] = None): - """ - Returns the manifest for a cname+arch combination of a container - Will return None if no result was found - - TODO: refactor: use get_manifest_json and call it with index mediatype. - """ - if not allowed_media_type: - default_image_index_media_type = "application/vnd.oci.image.index.v1+json" - allowed_media_type = [default_image_index_media_type] - - headers = {"Accept": ";".join(allowed_media_type)} - manifest_url = f"{self.prefix}://{self.container.manifest_url()}" - response = self.do_request(manifest_url, "GET", headers=headers) - try: - self._check_200_response(response) - index = response.json() - return index - - except ValueError: - logger.info("Index not found, creating new Index!") - return NewIndex() - - @ensure_container - def get_manifest_meta_data_by_cname( - self, - container: OrasContainer, - cname: str, - version: str, - arch: str, - allowed_media_type: Optional[list[str]] = None, - ): - """ - Returns the manifest for a cname+arch combination of a container - Will return None if no result was found - """ - index = self.get_index(allowed_media_type=allowed_media_type) - - if "manifests" not in index: - logger.debug("Index is empty") - return None - - for manifest_meta in index["manifests"]: - # Annotations are optional: - # https://github.com/opencontainers/image-spec/blob/v1.0.1/descriptor.md#properties - - if "annotations" in manifest_meta: - if ( - "cname" in manifest_meta["annotations"] - and "architecture" in manifest_meta["annotations"] - and "os.version" in manifest_meta["platform"] - and manifest_meta["annotations"]["cname"] == cname - and manifest_meta["annotations"]["architecture"] == arch - and manifest_meta["platform"]["os.version"] == version - ): - return manifest_meta - - return None - - @ensure_container - def get_manifest_by_digest( - self, - container: OrasContainer, - digest: str, - allowed_media_type: Optional[list[str]] = None, - ): - if not allowed_media_type: - default_image_manifest_media_type = ( - "application/vnd.oci.image.manifest.v1+json" - ) - allowed_media_type = [default_image_manifest_media_type] - - manifest_url = f"{self.prefix}://{container.get_blob_url(digest)}".replace( - "/blobs/", "/manifests/" - ) - headers = {"Accept": ";".join(allowed_media_type)} - response = self.do_request(manifest_url, "GET", headers=headers, stream=False) - self._check_200_response(response) - manifest = response.json() - verify_sha256(digest, response.content) - jsonschema.validate(manifest, schema=oras_manifest_schema) - return manifest - - @ensure_container - def get_manifest_by_cname( - self, - container: OrasContainer, - cname: str, - version: str, - arch: str, - allowed_media_type: Optional[list[str]] = None, - ): - """ - Returns the manifest for a cname+arch combination of a container - Will return None if no result was found - """ - if not allowed_media_type: - default_image_manifest_media_type = ( - "application/vnd.oci.image.manifest.v1+json" - ) - allowed_media_type = [default_image_manifest_media_type] - manifest_meta = self.get_manifest_meta_data_by_cname( - container, cname, version, arch - ) - if manifest_meta is None: - logger.error(f"No manifest found for {cname}-{arch}") - return None - if "digest" not in manifest_meta: - logger.error("No digest found in metadata!") - manifest_digest = manifest_meta["digest"] - return self.get_manifest_by_digest( - container, manifest_digest, allowed_media_type=allowed_media_type - ) - - def change_state(self, cname: str, version: str, architecture: str, new_state: str): - manifest_container = OrasContainer( - f"{self.container_name}-{cname}-{architecture}" - ) - manifest = self.get_manifest_by_cname( - manifest_container, cname, version, architecture - ) - - if "annotations" not in manifest: - logger.warning("No annotations found in manifest, init annotations now.") - manifest["annotations"] = {} - - def attach_layer( - self, - cname: str, - version: str, - architecture: str, - file_path: str, - media_type: str, - ): - if not os.path.exists(file_path): - exit(f"{file_path} does not exist.") - - manifest_container = OrasContainer( - f"{self.container_name}-{cname}-{architecture}" - ) - - manifest = self.get_manifest_by_cname( - self.container, cname, version, architecture - ) - - layer = self.create_layer(file_path, cname, version, architecture, media_type) - self._check_200_response(self.upload_blob(file_path, self.container, layer)) - - manifest["layers"].append(layer) - - old_manifest_digest = self.get_digest(manifest_container) - self._check_200_response(self.upload_manifest(manifest, manifest_container)) - - new_manifest_metadata = self.get_manifest_meta_data_by_cname( - self.container, cname, version, architecture - ) - new_manifest_metadata["digest"] = self.get_digest(manifest_container) - new_manifest_metadata["size"] = self.get_manifest_size(manifest_container) - new_manifest_metadata["platform"] = NewPlatform(architecture, version) - - new_index = self.update_index(old_manifest_digest, new_manifest_metadata) - self._check_200_response(self.upload_index(new_index)) - - print(f"Successfully attached {file_path} to {manifest_container}") - - def sign_manifest_entry( - self, new_manifest_metadata: dict, version: str, architecture: str, cname: str - ): - data_to_sign = construct_manifest_entry_signed_data_string( - cname, version, new_manifest_metadata, architecture - ) - signature = self.signer.sign_data(data_to_sign) - new_manifest_metadata["annotations"].update( - { - OCI_ANNOTATION_SIGNATURE_KEY: signature, - OCI_ANNOTATION_SIGNED_STRING_KEY: data_to_sign, - } - ) - - def sign_layer( - self, - layer: dict, - cname: str, - version: str, - architecture: str, - checksum_sha256: str, - media_type: str, - ): - data_to_sign = construct_layer_signed_data_string( - cname, version, architecture, media_type, checksum_sha256 - ) - signature = self.signer.sign_data(data_to_sign) - layer["annotations"].update( - { - OCI_ANNOTATION_SIGNATURE_KEY: signature, - OCI_ANNOTATION_SIGNED_STRING_KEY: data_to_sign, - } - ) - - def verify_manifest_meta_signature(self, manifest_meta: dict): - if "annotations" not in manifest_meta: - raise ValueError("manifest does not contain annotations") - if OCI_ANNOTATION_SIGNATURE_KEY not in manifest_meta["annotations"]: - raise ValueError("manifest is not signed") - if OCI_ANNOTATION_SIGNED_STRING_KEY not in manifest_meta["annotations"]: - raise ValueError("manifest is not signed") - signature = manifest_meta["annotations"][OCI_ANNOTATION_SIGNATURE_KEY] - signed_data = manifest_meta["annotations"][OCI_ANNOTATION_SIGNED_STRING_KEY] - cname = manifest_meta["annotations"]["cname"] - version = manifest_meta["platform"]["os.version"] - architecture = manifest_meta["annotations"]["architecture"] - signed_data_expected = construct_manifest_entry_signed_data_string( - cname, version, manifest_meta, architecture - ) - if signed_data_expected != signed_data: - raise ValueError( - f"Signed data does not match expected signed data.\n{signed_data} != {signed_data_expected}" - ) - self.signer.verify_signature(signed_data, signature) - - def verify_manifest_signature(self, manifest: dict): - if "layers" not in manifest: - raise ValueError("manifest does not contain layers") - if "annotations" not in manifest: - raise ValueError("manifest does not contain annotations") - - cname = manifest["annotations"]["cname"] - version = manifest["annotations"]["version"] - architecture = manifest["annotations"]["architecture"] - for layer in manifest["layers"]: - if "annotations" not in layer: - raise ValueError(f"layer does not contain annotations. layer: {layer}") - if OCI_ANNOTATION_SIGNATURE_KEY not in layer["annotations"]: - raise ValueError(f"layer is not signed. layer: {layer}") - if OCI_ANNOTATION_SIGNED_STRING_KEY not in layer["annotations"]: - raise ValueError(f"layer is not signed. layer: {layer}") - media_type = layer["mediaType"] - checksum_sha256 = layer["digest"].removeprefix("sha256:") - signature = layer["annotations"][OCI_ANNOTATION_SIGNATURE_KEY] - signed_data = layer["annotations"][OCI_ANNOTATION_SIGNED_STRING_KEY] - signed_data_expected = construct_layer_signed_data_string( - cname, version, architecture, media_type, checksum_sha256 - ) - if signed_data_expected != signed_data: - raise ValueError( - f"Signed data does not match expected signed data. {signed_data} != {signed_data_expected}" - ) - self.signer.verify_signature(signed_data, signature) - - @ensure_container - def remove_container(self, container: OrasContainer): - self.delete_tag(container.manifest_url()) - - def status_all(self): - """ - Validate if container is valid - - all manifests require a info.yaml in the layers - - info.yaml needs to be signed (TODO) - - all layers listed in info.yaml must exist - - all mediatypes of layers listed in info.yaml must be set correctly - """ - index = self.get_index() - - if "manifests" not in index: - logger.info("No manifests in index") - return - for manifest_meta in index["manifests"]: - manifest_digest = manifest_meta["digest"] - manifest = self.get_manifest_by_digest(self.container, manifest_digest) - image_state = get_image_state(manifest) - print(f"{manifest_digest}:\t{image_state}") - - def upload_index(self, index: dict) -> requests.Response: - jsonschema.validate(index, schema=indexSchema) - headers = { - "Content-Type": "application/vnd.oci.image.index.v1+json", - "Content-Length": str(len(index)), - } - tag = self.container.digest or self.container.tag - - index_url = ( - f"{self.container.registry}/v2/{self.container.api_prefix}/manifests/{tag}" - ) - response = self.do_request( - f"{self.prefix}://{index_url}", # noqa - "PUT", - headers=headers, - json=index, - ) - return response - - def push_image_manifest( - self, - architecture: str, - cname: str, - version: str, - build_artifacts_dir: str, - oci_metadata: list, - feature_set: str, - manifest_file: str, - commit: Optional[str] = None, - ): - """ - creates and pushes an image manifest - - :param oci_metadata: a list of filenames and their OCI metadata, can be constructed with - parse_features.get_oci_metadata or parse_features.get_oci_metadata_from_fileset - :param str architecture: target architecture of the image - :param str cname: canonical name of the target image - :param str build_artifacts_dir: directory where the build artifacts are located - :param str feature_set: the expanded list of the included features of this manifest. It will be set in the - manifest itself and in the index entry for this manifest - :param str commit: the commit hash of the image - :param str manifest_file: the file where the manifest is written to - :returns the digest of the pushed manifest - """ - - # Handle null commit value - if commit is None: - commit = "" - - # TODO: construct oci_artifacts default data - - manifest_image = oras.oci.NewManifest() - total_size = 0 - - # For each file, create sign, attach and push a layer - for artifact in oci_metadata: - annotations_input = artifact["annotations"] - media_type = artifact["media_type"] - file_path = os.path.join(build_artifacts_dir, artifact["file_name"]) - - if not os.path.exists(file_path): - raise ValueError(f"{file_path} does not exist.") - - cleanup_blob = False - if os.path.isdir(file_path): - file_path = oras.utils.make_targz(file_path) - cleanup_blob = True - - # Create and sign layer information - layer = self.create_layer( - file_path, cname, version, architecture, media_type - ) - total_size += int(layer["size"]) - - if annotations_input: - layer["annotations"].update(annotations_input) - # Attach this layer to the manifest that is currently created (and pushed later) - manifest_image["layers"].append(layer) - logger.debug(f"Layer: {layer}") - # Push - response = self.upload_blob(file_path, self.container, layer) - self._check_200_response(response) - logger.info(f"Pushed {artifact["file_name"]} {layer["digest"]}") - if cleanup_blob and os.path.exists(file_path): - os.remove(file_path) - # This ends up in the manifest - parsed_cname = CName(cname, arch=architecture) - architecture = parsed_cname.arch - flavor = parsed_cname.flavor - manifest_image["annotations"] = {} - manifest_image["annotations"]["version"] = version - manifest_image["annotations"]["cname"] = cname - manifest_image["annotations"]["architecture"] = architecture - manifest_image["annotations"]["feature_set"] = feature_set - manifest_image["annotations"]["flavor"] = f"${flavor}-${architecture}" - manifest_image["annotations"]["commit"] = commit - description = ( - f"Image: {cname} " - f"Flavor: {flavor} " - f"Architecture: {architecture} " - f"Features: {feature_set} " - f"Commit: {commit} " - ) - manifest_image["annotations"][ - "org.opencontainers.image.description" - ] = description - - config_annotations = {"cname": cname, "architecture": architecture} - conf, config_file = create_config_from_dict(dict(), config_annotations) - - response = self.upload_blob(config_file, self.container, conf) - - os.remove(config_file) - self._check_200_response(response) - - manifest_image["config"] = conf - - manifest_container = OrasContainer( - f"{self.container_name}-{cname}-{architecture}" - ) - - local_digest = f"sha256:{hashlib.sha256(json.dumps(manifest_image).encode('utf-8')).hexdigest()}" - - self._check_200_response( - self.upload_manifest(manifest_image, manifest_container) - ) - logger.info(f"Successfully pushed {self.container} {local_digest}") - - # This ends up in the index-entry for the manifest - metadata_annotations = {"cname": cname, "architecture": architecture} - metadata_annotations["feature_set"] = feature_set - manifest_digest = self.get_digest(manifest_container) - if manifest_digest != local_digest: - raise ValueError("local and remotely calculated digests do not match") - manifest_index_metadata = NewManifestMetadata( - manifest_digest, - self.get_manifest_size(manifest_container), - metadata_annotations, - NewPlatform(architecture, version), - ) - - print(json.dumps(manifest_index_metadata), file=open(manifest_file, "w")) - logger.info(f"Index entry written to {manifest_file}") - - return local_digest - - def update_index(self, manifest_folder, additional_tags: list = None): - """ - replaces an old manifest entry with a new manifest entry - - :param str manifest_folder: the folder where the manifest entries are read from - :param list additional_tags: the additional tags to push the index with - """ - index = self.get_index() - # Ensure mediaType is set for existing indices - if "mediaType" not in index: - index["mediaType"] = "application/vnd.oci.image.index.v1+json" - - new_entries = 0 - - for file in os.listdir(manifest_folder): - manifest_metadata = json.loads( - open(manifest_folder + "/" + file, "r").read() - ) - # Skip if manifest with same digest already exists - found = False - for entry in index["manifests"]: - if entry["digest"] == manifest_metadata["digest"]: - found = True - break - if found: - logger.info( - f"Skipping manifest with digest {manifest_metadata["digest"]} - already exists" - ) - continue - index["manifests"].append(manifest_metadata) - logger.info( - f"Index appended locally {manifest_metadata["annotations"]["cname"]}" - ) - new_entries += 1 - - self._check_200_response(self.upload_index(index)) - logger.info(f"Index pushed with {new_entries} new entries") - - for tag in additional_tags: - self.container.digest = None - self.container.tag = tag - self.upload_index(index) - logger.info(f"Index pushed with additional tag {tag}") - - def create_layer( - self, - file_path: str, - cname: str, - version: str, - architecture: str, - media_type: str, - ): - checksum_sha256 = calculate_sha256(file_path) - layer = oras.oci.NewLayer(file_path, media_type, is_dir=False) - layer["annotations"] = { - oras.defaults.annotation_title: os.path.basename(file_path), - } - return layer - - @retry_on_error(max_retries=3, initial_delay=2, backoff_factor=2) - def upload_blob(self, file_path, container, metadata=None): - """ - Upload a blob to the registry with retry logic for network errors. - - Args: - file_path: Path to the file to upload - container: Container object - metadata: Optional metadata for the blob - - Returns: - Response from the upload - """ - # Call the parent class's upload_blob method - return super().upload_blob(file_path, container, metadata) - - def push_from_dir( - self, - architecture: str, - version: str, - cname: str, - directory: str, - manifest_file: str, - additional_tags: list = None, - ): - """ - Push artifacts from a directory to a registry - - Args: - architecture: Target architecture of the image - version: Version tag for the image - cname: Canonical name of the image - directory: Directory containing the artifacts - manifest_file: File to write the manifest index entry to - additional_tags: Additional tags to push the manifest with - - Returns: - The digest of the pushed manifest - """ - if additional_tags is None: - additional_tags = [] - - try: - # scan and extract nested artifacts - for file in os.listdir(directory): - try: - if file.endswith(".pxe.tar.gz"): - logger.info(f"Found nested artifact {file}") - nested_tar_obj = tarfile.open(f"{directory}/{file}") - nested_tar_obj.extractall(filter="data", path=directory) - nested_tar_obj.close() - except (OSError, tarfile.FilterError, tarfile.TarError) as e: - print(f"Failed to extract nested artifact {file}", e) - exit(1) - - # Get metadata from files - oci_metadata = get_oci_metadata_from_fileset( - os.listdir(directory), architecture - ) - - features = "" - commit = "" - for artifact in oci_metadata: - if artifact["media_type"] == "application/io.gardenlinux.release": - try: - file_path = f"{directory}/{artifact['file_name']}" - - config = configparser.ConfigParser(allow_unnamed_section=True) - config.read(file_path) - - if config.has_option( - configparser.UNNAMED_SECTION, "GARDENLINUX_FEATURES" - ): - features = config.get( - configparser.UNNAMED_SECTION, "GARDENLINUX_FEATURES" - ) - if config.has_option( - configparser.UNNAMED_SECTION, "GARDENLINUX_COMMIT_ID" - ): - commit = config.get( - configparser.UNNAMED_SECTION, "GARDENLINUX_COMMIT_ID" - ) - - except (configparser.Error, IOError) as e: - logger.error( - f"Error reading config file {artifact['file_name']}: {e}" - ) - break - - # Push the image manifest - digest = self.push_image_manifest( - architecture, - cname, - version, - directory, - oci_metadata, - features, - manifest_file, - commit=commit, - ) - - # Process additional tags if provided - if additional_tags and len(additional_tags) > 0: - print(f"DEBUG: Processing {len(additional_tags)} additional tags") - logger.info(f"Processing {len(additional_tags)} additional tags") - - self.push_additional_tags_manifest( - architecture, - cname, - version, - additional_tags, - container=self.container, - ) - - return digest - except Exception as e: - print("Error: ", e) - exit(1) - - def push_additional_tags_manifest( - self, architecture, cname, version, additional_tags, container - ): - """ - Push additional tags for an existing manifest using ORAS Registry methods - - Args: - architecture: Target architecture of the image - cname: Canonical name of the image - version: Version tag for the image - additional_tags: List of additional tags to push - container: Container object - """ - try: - # Source tag is the tag containing the version-cname-architecture combination - source_tag = f"{version}-{cname}-{architecture}" - source_container = copy.deepcopy(container) - source_container.tag = source_tag - - # Authentication credentials from environment - token = os.getenv("GL_CLI_REGISTRY_TOKEN") - username = os.getenv("GL_CLI_REGISTRY_USERNAME") - password = os.getenv("GL_CLI_REGISTRY_PASSWORD") - - # Login to registry if credentials are provided - if username and password: - logger.debug(f"Logging in with username/password") - try: - self.login(username, password) - except Exception as login_error: - logger.error(f"Login error: {str(login_error)}") - elif token: - # If token is provided, set it directly on the Registry instance - logger.debug(f"Using token authentication") - self.token = base64.b64encode(token.encode("utf-8")).decode("utf-8") - self.auth.set_token_auth(self.token) - - # Get the manifest from the source container - try: - logger.debug(f"Getting manifest from {source_container}") - manifest = self.get_manifest(source_container) - if not manifest: - logger.error(f"Failed to get manifest for {source_container}") - return - logger.info( - f"Successfully retrieved manifest: {manifest['mediaType'] if 'mediaType' in manifest else 'unknown'}" - ) - except Exception as get_error: - logger.error(f"Error getting manifest: {str(get_error)}") - return - - # For each additional tag, push the manifest using Registry.upload_manifest - for tag in additional_tags: - try: - logger.debug(f"Pushing additional tag: {tag}") - - # Create a new container for this tag - tag_container = copy.deepcopy(container) - tag_container.tag = tag - - logger.debug(f"Pushing to container: {tag_container}") - - # Upload the manifest to the new tag - response = self.upload_manifest(manifest, tag_container) - - if response and response.status_code in [200, 201]: - logger.info(f"Successfully pushed tag {tag} for manifest") - else: - status_code = getattr(response, "status_code", "unknown") - response_text = getattr(response, "text", "No response text") - logger.error( - f"Failed to push tag {tag} for manifest: {status_code}" - ) - - except Exception as tag_error: - logger.error( - f"Error pushing tag {tag} for manifest: {str(tag_error)}" - ) - - except Exception as e: - logger.error(f"Error in push_additional_tags_manifest: {str(e)}") diff --git a/src/python_gardenlinux_lib/__init__.py b/src/python_gardenlinux_lib/__init__.py deleted file mode 100644 index cc733797..00000000 --- a/src/python_gardenlinux_lib/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .version import Version - -__all__ = ["Version"] diff --git a/src/python_gardenlinux_lib/features/package_repo_info.py b/src/python_gardenlinux_lib/features/package_repo_info.py deleted file mode 100644 index a6b0462b..00000000 --- a/src/python_gardenlinux_lib/features/package_repo_info.py +++ /dev/null @@ -1,81 +0,0 @@ -from apt_repo import APTRepository -from typing import Optional - - -class GardenLinuxRepo(APTRepository): - def __init__( - self, - dist: str, - url: Optional[str] = "http://packages.gardenlinux.io/gardenlinux", - components: Optional[list[str]] = ["main"], - ) -> None: - self.components = components - self.url = url - self.dist = dist - self.repo = APTRepository(self.url, self.dist, self.components) - - def get_package_version_by_name(self, name: str) -> list[tuple[str, str]]: - """ - :param str name: name of package to find - :returns: packages matching the input name - """ - return [ - (package.package, package.version) - for package in self.repo.get_packages_by_name(name) - ] - - def get_packages_versions(self): - """ - Returns list of (package, version) tuples - """ - return [(p.package, p.version) for p in self.repo.packages] - - -def compare_gardenlinux_repo_version(version_a: str, version_b: str): - """ - :param str version_a: Version of first Garden Linux repo - :param str version_b: Version of first Garden Linux repo - - Example: print(compare_gardenlinux_repo_version("1443.2", "1443.1")) - """ - return compare_repo(GardenLinuxRepo(version_a), GardenLinuxRepo(version_b)) - - -def compare_repo( - a: GardenLinuxRepo, b: GardenLinuxRepo, available_in_both: Optional[bool] = False -): - """ - :param a GardenLinuxRepo: first repo to compare - :param b GardenLinuxRepo: second repo to compare - :returns: differences between repo a and repo b - """ - - packages_a = dict(a.get_packages_versions()) - packages_b = dict(b.get_packages_versions()) - if available_in_both: - all_names = set(packages_a.keys()).intersection(set(packages_b.keys())) - else: - all_names = set(packages_a.keys()).union(set(packages_b.keys())) - - return [ - (name, packages_a.get(name), packages_b.get(name)) - for name in all_names - if ( - name in packages_a - and name in packages_b - and packages_a[name] != packages_b[name] - ) - or (name not in packages_b or name not in packages_a) - ] - - -# EXAMPLE USAGE. -# print(compare_gardenlinux_repo_version("1443.2", "1443.1")) - -# gl_repo = GardenLinuxRepo("today") -# gl_repo_1592 = GardenLinuxRepo("1592.0") -# deb_testing = GardenLinuxRepo("testing", "https://deb.debian.org/debian/") -# print(compare_repo(gl_repo, gl_repo_1592, available_in_both=True)) -# print(compare_repo(gl_repo, deb_testing, available_in_both=True)) -# # print(gl_repo.get_packages_versions()) -# print(gl_repo.get_package_version_by_name("wget")) diff --git a/src/python_gardenlinux_lib/features/parse_features.py b/src/python_gardenlinux_lib/features/parse_features.py deleted file mode 100644 index eca20ff4..00000000 --- a/src/python_gardenlinux_lib/features/parse_features.py +++ /dev/null @@ -1,229 +0,0 @@ -from gardenlinux.constants import GL_MEDIA_TYPE_LOOKUP, GL_MEDIA_TYPES - -from gardenlinux.features import Parser -from typing import Optional -import networkx -import os -import re -import subprocess -import yaml - - -def get_gardenlinux_commit(gardenlinux_root: str, limit: Optional[int] = None) -> str: - """ - :param str gardenlinux_root: path to garden linux src - :return: output of get_commit script from gardenlinux src - """ - get_commit_process_result = subprocess.run( - [f"{gardenlinux_root}/get_commit"], - capture_output=True, - ) - - if not get_commit_process_result.stdout: - raise ValueError( - f"{gardenlinux_root}/get_commit did not return any output, could not determine correct commit hash" - ) - - commit_str = get_commit_process_result.stdout.decode("UTF-8").strip("\n") - - if commit_str.count("\n") > 1: - raise ValueError(f"{commit_str} contains multiple lines") - - if limit: - if limit >= len(commit_str): - return commit_str - return commit_str[:limit] - else: - return commit_str - - -def construct_layer_metadata( - filetype: str, cname: str, version: str, arch: str, commit: str -) -> dict: - """ - :param str filetype: filetype of blob - :param str cname: the cname of the target image - :param str version: the version of the target image - :param str arch: the arch of the target image - :param str commit: commit of the garden linux source - :return: dict of oci layer metadata for a given layer file - """ - media_type = lookup_media_type_for_filetype(filetype) - return { - "file_name": f"{cname}-{arch}-{version}-{commit}.{filetype}", - "media_type": media_type, - "annotations": {"io.gardenlinux.image.layer.architecture": arch}, - } - - -def construct_layer_metadata_from_filename(filename: str, arch: str) -> dict: - """ - :param str filename: filename of the blob - :param str arch: the arch of the target image - :return: dict of oci layer metadata for a given layer file - """ - media_type = lookup_media_type_for_file(filename) - return { - "file_name": filename, - "media_type": media_type, - "annotations": {"io.gardenlinux.image.layer.architecture": arch}, - } - - -def get_file_set_from_cname(cname: str, version: str, arch: str, gardenlinux_root: str): - """ - :param str cname: the target cname of the image - :param str version: the version of the target image - :param str arch: the arch of the target image - :param str gardenlinux_root: path of garden linux src root - :return: set of file names for a given cname - """ - file_set = set() - features_by_type = Parser(gardenlinux_root).filter_as_dict(cname) - commit_str = get_gardenlinux_commit(gardenlinux_root, 8) - - if commit_str == "local": - raise ValueError("Using local commit. Refusing to upload to OCI Registry") - for platform in features_by_type["platform"]: - image_file_types = deduce_filetypes(f"{gardenlinux_root}/features/{platform}") - for ft in image_file_types: - file_set.add( - f"{cname}-{arch}-{version}-{commit_str}.{ft}", - ) - return file_set - - -def get_oci_metadata_from_fileset(fileset: list, arch: str): - """ - :param str arch: arch of the target image - :param set fileset: a list of filenames (not paths) to set oci_metadata for - :return: list of dicts, where each dict represents a layer - """ - oci_layer_metadata_list = list() - - for file in fileset: - oci_layer_metadata_list.append( - construct_layer_metadata_from_filename(file, arch) - ) - - return oci_layer_metadata_list - - -def get_oci_metadata(cname: str, version: str, arch: str, gardenlinux_root: str): - """ - :param str cname: the target cname of the image - :param str version: the target version of the image - :param str arch: arch of the target image - :param str gardenlinux_root: path of garden linux src root - :return: list of dicts, where each dict represents a layer - """ - - # This is the feature deduction approach (glcli oci push) - file_set = get_file_set_from_cname(cname, version, arch, gardenlinux_root) - - # This is the tarball extraction approach (glcli oci push-tarball) - oci_layer_metadata_list = list() - - for file in file_set: - oci_layer_metadata_list.append( - construct_layer_metadata_from_filename(file, arch) - ) - - return oci_layer_metadata_list - - -def lookup_media_type_for_filetype(filetype: str) -> str: - """ - :param str filetype: filetype of the target layer - :return: mediatype - """ - if filetype in GL_MEDIA_TYPE_LOOKUP: - return GL_MEDIA_TYPE_LOOKUP[filetype] - else: - raise ValueError( - f"media type for {filetype} is not defined. You may want to add the definition to parse_features_lib" - ) - - -def lookup_media_type_for_file(filename: str) -> str: - """ - :param str filename: filename of the target layer - :return: mediatype - """ - for suffix in GL_MEDIA_TYPES: - if filename.endswith(suffix): - return GL_MEDIA_TYPE_LOOKUP[suffix] - else: - raise ValueError( - f"media type for {filename} is not defined. You may want to add the definition to parse_features_lib" - ) - - -def deduce_feature_name(feature_dir: str): - """ - :param str feature_dir: Directory of single Feature - :return: string of feature name - """ - parsed = parse_feature_yaml(feature_dir) - if "name" not in parsed: - raise ValueError("Expected name from parse_feature_yaml function to be set") - return parsed["name"] - - -def deduce_archive_filetypes(feature_dir): - """ - :param str feature_dir: Directory of single Feature - :return: str list of filetype for archive - """ - return deduce_filetypes_from_string(feature_dir, "image") - - -def deduce_image_filetypes(feature_dir): - """ - :param str feature_dir: Directory of single Feature - :return: str list of filetype for image - """ - return deduce_filetypes_from_string(feature_dir, "convert") - - -def deduce_filetypes(feature_dir): - """ - :param str feature_dir: Directory of single Feature - :return: str list of filetypes for the feature - """ - image_file_types = deduce_image_filetypes(feature_dir) - archive_file_types = deduce_archive_filetypes(feature_dir) - if not image_file_types: - image_file_types.append("raw") - if not archive_file_types: - archive_file_types.append("tar") - image_file_types.extend(archive_file_types) - return image_file_types - - -def deduce_filetypes_from_string(feature_dir: str, script_base_name: str): - """ - Garden Linux features can optionally have an image. or convert. script, - where the indicates the target filetype. - - image. script converts the .tar to archive. - - convert. script converts the .raw to image type. - - Both scripts are bash scripts invoked by the builder, but this function does only read the file ending - - :param str feature_dir: feature directory of garden linux - :param str script_base_name: typically it is either image or convert - :return: list of available filetypes deduced on the available script_base_name. - """ - result = list() - - for filename in os.listdir(feature_dir): - if re.search(f"{script_base_name}.*", filename): - result.append(filename.split(f"{script_base_name}.")[1]) - - return sorted(result) - - -def sort_set(input_set, order_list): - return [item for item in order_list if item in input_set] diff --git a/src/python_gardenlinux_lib/flavors/__init__.py b/src/python_gardenlinux_lib/flavors/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/python_gardenlinux_lib/oras/__init__.py b/src/python_gardenlinux_lib/oras/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/python_gardenlinux_lib/oras/crypto.py b/src/python_gardenlinux_lib/oras/crypto.py deleted file mode 100644 index 68784cad..00000000 --- a/src/python_gardenlinux_lib/oras/crypto.py +++ /dev/null @@ -1,16 +0,0 @@ -import hashlib - - -def verify_sha256(checksum: str, data: bytes): - data_checksum = f"sha256:{hashlib.sha256(data).hexdigest()}" - if checksum != data_checksum: - raise ValueError(f"Invalid checksum. {checksum} != {data_checksum}") - - -def calculate_sha256(file_path: str) -> str: - """Calculate the SHA256 checksum of a file.""" - sha256_hash = hashlib.sha256() - with open(file_path, "rb") as f: - for byte_block in iter(lambda: f.read(4096), b""): - sha256_hash.update(byte_block) - return sha256_hash.hexdigest() diff --git a/src/python_gardenlinux_lib/oras/defaults.py b/src/python_gardenlinux_lib/oras/defaults.py deleted file mode 100644 index 9184536a..00000000 --- a/src/python_gardenlinux_lib/oras/defaults.py +++ /dev/null @@ -1,2 +0,0 @@ -annotation_signature_key = "io.gardenlinux.oci.signature" -annotation_signed_string_key = "io.gardenlinux.oci.signed-string" diff --git a/src/python_gardenlinux_lib/oras/helper.py b/src/python_gardenlinux_lib/oras/helper.py deleted file mode 100644 index de8894db..00000000 --- a/src/python_gardenlinux_lib/oras/helper.py +++ /dev/null @@ -1,24 +0,0 @@ -import re -import json -import os - - -def write_dict_to_json_file(input, output_path): - if os.path.exists(output_path): - raise ValueError(f"{output_path} already exists") - with open(output_path, "w") as fp: - json.dump(input, fp) - - -def get_uri_for_digest(uri, digest): - """ - Given a URI for an image, return a URI for the related digest. - - URI may be in any of the following forms: - - ghcr.io/homebrew/core/hello - ghcr.io/homebrew/core/hello:2.10 - ghcr.io/homebrew/core/hello@sha256:ff81...47a - """ - base_uri = re.split(r"[@:]", uri, maxsplit=1)[0] - return f"{base_uri}@{digest}" diff --git a/src/python_gardenlinux_lib/oras/registry.py b/src/python_gardenlinux_lib/oras/registry.py deleted file mode 100644 index f84d0f4b..00000000 --- a/src/python_gardenlinux_lib/oras/registry.py +++ /dev/null @@ -1,732 +0,0 @@ -import base64 -import copy -import hashlib -import json -import logging -import os -import shutil -import sys -import tarfile -import tempfile -import uuid -from enum import Enum, auto -from typing import Optional, Tuple - -import jsonschema -import oras.auth -import oras.client -import oras.defaults -import oras.oci -import oras.provider -import oras.utils -from python_gardenlinux_lib.features.parse_features import ( - get_oci_metadata_from_fileset, -) -import requests -from oras.container import Container as OrasContainer -from oras.decorator import ensure_container -from oras.provider import Registry -from oras.schemas import manifest as oras_manifest_schema - -from python_gardenlinux_lib.oras.crypto import ( - calculate_sha256, - verify_sha256, -) -from python_gardenlinux_lib.oras.defaults import ( - annotation_signature_key, - annotation_signed_string_key, -) -from python_gardenlinux_lib.oras.schemas import ( - EmptyIndex, - EmptyManifestMetadata, - EmptyPlatform, -) -from python_gardenlinux_lib.oras.schemas import index as indexSchema - - -class ManifestState(Enum): - Incomplete = auto() - Complete = auto() - Final = auto() - - -logger = logging.getLogger(__name__) -# logging.basicConfig(filename="gl-oci.log", level=logging.DEBUG) -logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) - - -def attach_state(d: dict, state: str): - d["image_state"] = state - - -def get_image_state(manifest: dict) -> str: - if "annotations" not in manifest: - logger.warning("No annotations set for manifest.") - return "UNDEFINED" - if "image_state" not in manifest["annotations"]: - logger.warning("No image_state set for manifest.") - return "UNDEFINED" - return manifest["annotations"]["image_state"] - - -def NewPlatform(architecture: str, version: str) -> dict: - platform = copy.deepcopy(EmptyPlatform) - platform["architecture"] = architecture - platform["os.version"] = version - return platform - - -def NewManifestMetadata( - digest: str, size: int, annotations: dict, platform_data: dict -) -> dict: - manifest_meta_data = copy.deepcopy(EmptyManifestMetadata) - manifest_meta_data["mediaType"] = "application/vnd.oci.image.manifest.v1+json" - manifest_meta_data["digest"] = digest - manifest_meta_data["size"] = size - manifest_meta_data["annotations"] = annotations - manifest_meta_data["platform"] = platform_data - manifest_meta_data["artifactType"] = "" - return manifest_meta_data - - -def NewIndex() -> dict: - return copy.deepcopy(EmptyIndex) - - -def create_config_from_dict(conf: dict, annotations: dict) -> Tuple[dict, str]: - """ - Write a new OCI configuration to file, and generate oci metadata for it - For reference see https://github.com/opencontainers/image-spec/blob/main/config.md - annotations, mediatype, size, digest are not part of digest and size calculation, - and therefore must be attached to the output dict and not written to the file. - - :param conf: dict with custom configuration (the payload of the configuration) - :param annotations: dict with custom annotations to be attached to metadata part of config - - """ - config_path = os.path.join(os.path.curdir, str(uuid.uuid4())) - with open(config_path, "w") as fp: - json.dump(conf, fp) - conf["annotations"] = annotations - conf["mediaType"] = oras.defaults.unknown_config_media_type - conf["size"] = oras.utils.get_size(config_path) - conf["digest"] = f"sha256:{oras.utils.get_file_hash(config_path)}" - return conf, config_path - - -def construct_manifest_entry_signed_data_string( - cname: str, version: str, new_manifest_metadata: dict, architecture: str -) -> str: - data_to_sign = ( - f"version:{version} cname:{cname} architecture:{architecture} manifest-size" - f":{new_manifest_metadata['size']} manifest-digest:{new_manifest_metadata['digest']}" - ) - return data_to_sign - - -def construct_layer_signed_data_string( - cname: str, version: str, architecture: str, media_type: str, checksum_sha256: str -) -> str: - data_to_sign = f"version:{version} cname:{cname} architecture:{architecture} media_type:{media_type} digest:{checksum_sha256}" - return data_to_sign - - -class GlociRegistry(Registry): - def __init__( - self, - container_name: str, - insecure: bool = False, - token: Optional[str] = None, - config_path: Optional[str] = None, - ): - super().__init__(auth_backend="token", insecure=insecure) - self.container = OrasContainer(container_name) - self.container_name = container_name - self.registry_url = self.container.registry - self.config_path = config_path - if not token: - logger.info("No Token provided.") - else: - self.token = base64.b64encode(token.encode("utf-8")).decode("utf-8") - self.auth.set_token_auth(self.token) - - @ensure_container - def get_manifest_json( - self, container: OrasContainer, allowed_media_type: Optional[list[str]] = None - ): - if not allowed_media_type: - default_image_index_media_type = ( - "application/vnd.oci.image.manifest.v1+json" - ) - allowed_media_type = [default_image_index_media_type] - # self.load_configs(container) - headers = {"Accept": ";".join(allowed_media_type)} - headers.update(self.headers) - get_manifest = f"{self.prefix}://{container.manifest_url()}" - response = self.do_request(get_manifest, "GET", headers=headers) - self._check_200_response(response) - return response - - @ensure_container - def get_manifest_size( - self, container: OrasContainer, allowed_media_type: Optional[list[str]] = None - ): - response = self.get_manifest_json(container, allowed_media_type) - if response is None: - return 0 - return len(response.content) - - @ensure_container - def get_digest( - self, container: OrasContainer, allowed_media_type: Optional[list[str]] = None - ): - response = self.get_manifest_json(container, allowed_media_type) - if response is None: - return "" - return f"sha256:{hashlib.sha256(response.content).hexdigest()}" - - def get_index(self, allowed_media_type: Optional[list[str]] = None): - """ - Returns the manifest for a cname+arch combination of a container - Will return None if no result was found - - TODO: refactor: use get_manifest_json and call it with index mediatype. - """ - if not allowed_media_type: - default_image_index_media_type = "application/vnd.oci.image.index.v1+json" - allowed_media_type = [default_image_index_media_type] - - headers = {"Accept": ";".join(allowed_media_type)} - manifest_url = f"{self.prefix}://{self.container.manifest_url()}" - response = self.do_request(manifest_url, "GET", headers=headers) - try: - self._check_200_response(response) - index = response.json() - return index - - except ValueError: - logger.debug("Index not found, creating new Index!") - return NewIndex() - - @ensure_container - def get_manifest_meta_data_by_cname( - self, - container: OrasContainer, - cname: str, - version: str, - arch: str, - allowed_media_type: Optional[list[str]] = None, - ): - """ - Returns the manifest for a cname+arch combination of a container - Will return None if no result was found - """ - index = self.get_index(allowed_media_type=allowed_media_type) - - if "manifests" not in index: - logger.debug("Index is empty") - return None - - for manifest_meta in index["manifests"]: - # Annotations are optional: - # https://github.com/opencontainers/image-spec/blob/v1.0.1/descriptor.md#properties - - if "annotations" in manifest_meta: - if ( - "cname" in manifest_meta["annotations"] - and "architecture" in manifest_meta["annotations"] - and "os.version" in manifest_meta["platform"] - and manifest_meta["annotations"]["cname"] == cname - and manifest_meta["annotations"]["architecture"] == arch - and manifest_meta["platform"]["os.version"] == version - ): - return manifest_meta - - return None - - @ensure_container - def get_manifest_by_digest( - self, - container: OrasContainer, - digest: str, - allowed_media_type: Optional[list[str]] = None, - ): - if not allowed_media_type: - default_image_manifest_media_type = ( - "application/vnd.oci.image.manifest.v1+json" - ) - allowed_media_type = [default_image_manifest_media_type] - - manifest_url = f"{self.prefix}://{container.get_blob_url(digest)}".replace( - "/blobs/", "/manifests/" - ) - headers = {"Accept": ";".join(allowed_media_type)} - response = self.do_request(manifest_url, "GET", headers=headers, stream=False) - self._check_200_response(response) - manifest = response.json() - verify_sha256(digest, response.content) - jsonschema.validate(manifest, schema=oras_manifest_schema) - return manifest - - @ensure_container - def get_manifest_by_cname( - self, - container: OrasContainer, - cname: str, - version: str, - arch: str, - allowed_media_type: Optional[list[str]] = None, - ): - """ - Returns the manifest for a cname+arch combination of a container - Will return None if no result was found - """ - if not allowed_media_type: - default_image_manifest_media_type = ( - "application/vnd.oci.image.manifest.v1+json" - ) - allowed_media_type = [default_image_manifest_media_type] - manifest_meta = self.get_manifest_meta_data_by_cname( - container, cname, version, arch - ) - if manifest_meta is None: - logger.error(f"No manifest found for {cname}-{arch}") - return None - if "digest" not in manifest_meta: - logger.error("No digest found in metadata!") - manifest_digest = manifest_meta["digest"] - return self.get_manifest_by_digest( - container, manifest_digest, allowed_media_type=allowed_media_type - ) - - def update_index(self, old_digest: Optional[str], manifest_meta_data: dict): - """ - replaces an old manifest entry with a new manifest entry - """ - index = self.get_index() - - if "manifests" not in index: - logger.debug("Index is empty") - updated = False - if old_digest is not None: - for i, manifest in enumerate(index["manifests"]): - if manifest["digest"] == old_digest: - logger.debug("Found old manifest entry") - index["manifests"][i] = manifest_meta_data - updated = True - break - if not updated: - logger.debug("Did NOT find old manifest entry") - index["manifests"].append(manifest_meta_data) - - return index - - def change_state(self, cname: str, version: str, architecture: str, new_state: str): - manifest_container = OrasContainer( - f"{self.container_name}-{cname}-{architecture}" - ) - manifest = self.get_manifest_by_cname( - manifest_container, cname, version, architecture - ) - - if "annotations" not in manifest: - logger.warning("No annotations found in manifest, init annotations now.") - manifest["annotations"] = {} - attach_state(manifest["annotations"], new_state) - - def attach_layer( - self, - cname: str, - version: str, - architecture: str, - file_path: str, - media_type: str, - ): - if not os.path.exists(file_path): - exit(f"{file_path} does not exist.") - - manifest_container = OrasContainer( - f"{self.container_name}-{cname}-{architecture}" - ) - - manifest = self.get_manifest_by_cname( - self.container, cname, version, architecture - ) - - layer = self.create_layer(file_path, cname, version, architecture, media_type) - self._check_200_response(self.upload_blob(file_path, self.container, layer)) - - manifest["layers"].append(layer) - - old_manifest_digest = self.get_digest(manifest_container) - self._check_200_response(self.upload_manifest(manifest, manifest_container)) - - new_manifest_metadata = self.get_manifest_meta_data_by_cname( - self.container, cname, version, architecture - ) - new_manifest_metadata["digest"] = self.get_digest(manifest_container) - new_manifest_metadata["size"] = self.get_manifest_size(manifest_container) - new_manifest_metadata["platform"] = NewPlatform(architecture, version) - - new_index = self.update_index(old_manifest_digest, new_manifest_metadata) - self._check_200_response(self.upload_index(new_index)) - - print(f"Successfully attached {file_path} to {manifest_container}") - - def sign_manifest_entry( - self, new_manifest_metadata: dict, version: str, architecture: str, cname: str - ): - data_to_sign = construct_manifest_entry_signed_data_string( - cname, version, new_manifest_metadata, architecture - ) - signature = self.signer.sign_data(data_to_sign) - new_manifest_metadata["annotations"].update( - { - annotation_signature_key: signature, - annotation_signed_string_key: data_to_sign, - } - ) - - def sign_layer( - self, - layer: dict, - cname: str, - version: str, - architecture: str, - checksum_sha256: str, - media_type: str, - ): - data_to_sign = construct_layer_signed_data_string( - cname, version, architecture, media_type, checksum_sha256 - ) - signature = self.signer.sign_data(data_to_sign) - layer["annotations"].update( - { - annotation_signature_key: signature, - annotation_signed_string_key: data_to_sign, - } - ) - - def verify_manifest_meta_signature(self, manifest_meta: dict): - if "annotations" not in manifest_meta: - raise ValueError("manifest does not contain annotations") - if annotation_signature_key not in manifest_meta["annotations"]: - raise ValueError("manifest is not signed") - if annotation_signed_string_key not in manifest_meta["annotations"]: - raise ValueError("manifest is not signed") - signature = manifest_meta["annotations"][annotation_signature_key] - signed_data = manifest_meta["annotations"][annotation_signed_string_key] - cname = manifest_meta["annotations"]["cname"] - version = manifest_meta["platform"]["os.version"] - architecture = manifest_meta["annotations"]["architecture"] - signed_data_expected = construct_manifest_entry_signed_data_string( - cname, version, manifest_meta, architecture - ) - if signed_data_expected != signed_data: - raise ValueError( - f"Signed data does not match expected signed data.\n{signed_data} != {signed_data_expected}" - ) - self.signer.verify_signature(signed_data, signature) - - def verify_manifest_signature(self, manifest: dict): - if "layers" not in manifest: - raise ValueError("manifest does not contain layers") - if "annotations" not in manifest: - raise ValueError("manifest does not contain annotations") - - cname = manifest["annotations"]["cname"] - version = manifest["annotations"]["version"] - architecture = manifest["annotations"]["architecture"] - for layer in manifest["layers"]: - if "annotations" not in layer: - raise ValueError(f"layer does not contain annotations. layer: {layer}") - if annotation_signature_key not in layer["annotations"]: - raise ValueError(f"layer is not signed. layer: {layer}") - if annotation_signed_string_key not in layer["annotations"]: - raise ValueError(f"layer is not signed. layer: {layer}") - media_type = layer["mediaType"] - checksum_sha256 = layer["digest"].removeprefix("sha256:") - signature = layer["annotations"][annotation_signature_key] - signed_data = layer["annotations"][annotation_signed_string_key] - signed_data_expected = construct_layer_signed_data_string( - cname, version, architecture, media_type, checksum_sha256 - ) - if signed_data_expected != signed_data: - raise ValueError( - f"Signed data does not match expected signed data. {signed_data} != {signed_data_expected}" - ) - self.signer.verify_signature(signed_data, signature) - - @ensure_container - def remove_container(self, container: OrasContainer): - self.delete_tag(container.manifest_url()) - - def status_all(self): - """ - Validate if container is valid - - all manifests require a info.yaml in the layers - - info.yaml needs to be signed (TODO) - - all layers listed in info.yaml must exist - - all mediatypes of layers listed in info.yaml must be set correctly - """ - index = self.get_index() - - if "manifests" not in index: - logger.info("No manifests in index") - return - for manifest_meta in index["manifests"]: - manifest_digest = manifest_meta["digest"] - manifest = self.get_manifest_by_digest(self.container, manifest_digest) - image_state = get_image_state(manifest) - print(f"{manifest_digest}:\t{image_state}") - - def upload_index(self, index: dict) -> requests.Response: - jsonschema.validate(index, schema=indexSchema) - headers = { - "Content-Type": "application/vnd.oci.image.index.v1+json", - "Content-Length": str(len(index)), - } - tag = self.container.digest or self.container.tag - - index_url = ( - f"{self.container.registry}/v2/{self.container.api_prefix}/manifests/{tag}" - ) - response = self.do_request( - f"{self.prefix}://{index_url}", # noqa - "PUT", - headers=headers, - json=index, - ) - return response - - def push_image_manifest( - self, - architecture: str, - cname: str, - version: str, - build_artifacts_dir: str, - oci_metadata: list, - feature_set: str, - ): - """ - creates and pushes an image manifest - - :param oci_metadata: a list of filenames and their OCI metadata, can be constructed with - parse_features.get_oci_metadata or parse_features.get_oci_metadata_from_fileset - :param str architecture: target architecture of the image - :param str cname: canonical name of the target image - :param str build_artifacts_dir: directory where the build artifacts are located - :param str feature_set: the expanded list of the included features of this manifest. It will be set in the - manifest itself and in the index entry for this manifest - :returns the digest of the pushed manifest - """ - - # TODO: construct oci_artifacts default data - - manifest_image = oras.oci.NewManifest() - total_size = 0 - - # For each file, create sign, attach and push a layer - for artifact in oci_metadata: - annotations_input = artifact["annotations"] - media_type = artifact["media_type"] - file_path = os.path.join(build_artifacts_dir, artifact["file_name"]) - - if not os.path.exists(file_path): - raise ValueError(f"{file_path} does not exist.") - - cleanup_blob = False - if os.path.isdir(file_path): - file_path = oras.utils.make_targz(file_path) - cleanup_blob = True - - # Create and sign layer information - layer = self.create_layer( - file_path, cname, version, architecture, media_type - ) - total_size += int(layer["size"]) - - if annotations_input: - layer["annotations"].update(annotations_input) - # Attach this layer to the manifest that is currently created (and pushed later) - manifest_image["layers"].append(layer) - logger.debug(f"Layer: {layer}") - # Push - response = self.upload_blob(file_path, self.container, layer) - self._check_200_response(response) - if cleanup_blob and os.path.exists(file_path): - os.remove(file_path) - # This ends up in the manifest - manifest_image["annotations"] = {} - manifest_image["annotations"]["version"] = version - manifest_image["annotations"]["cname"] = cname - manifest_image["annotations"]["architecture"] = architecture - manifest_image["annotations"]["feature_set"] = feature_set - description = ( - f"Garden Linux: {cname} " - f"Architecture: {architecture} " - f"Features: {feature_set}" - ) - manifest_image["annotations"][ - "org.opencontainers.image.description" - ] = description - attach_state(manifest_image["annotations"], "") - - config_annotations = {"cname": cname, "architecture": architecture} - conf, config_file = create_config_from_dict(dict(), config_annotations) - - response = self.upload_blob(config_file, self.container, conf) - - os.remove(config_file) - self._check_200_response(response) - - manifest_image["config"] = conf - - manifest_container = OrasContainer( - f"{self.container_name}-{cname}-{architecture}" - ) - - local_digest = f"sha256:{hashlib.sha256(json.dumps(manifest_image).encode('utf-8')).hexdigest()}" - - self._check_200_response( - self.upload_manifest(manifest_image, manifest_container) - ) - - # This ends up in the index-entry for the manifest - metadata_annotations = {"cname": cname, "architecture": architecture} - attach_state(metadata_annotations, "") - metadata_annotations["feature_set"] = feature_set - manifest_digest = self.get_digest(manifest_container) - if manifest_digest != local_digest: - raise ValueError("local and remotely calculated digests do not match") - manifest_index_metadata = NewManifestMetadata( - manifest_digest, - self.get_manifest_size(manifest_container), - metadata_annotations, - NewPlatform(architecture, version), - ) - - old_manifest_meta_data = self.get_manifest_meta_data_by_cname( - self.container, cname, version, architecture - ) - if old_manifest_meta_data is not None: - new_index = self.update_index( - old_manifest_meta_data["digest"], manifest_index_metadata - ) - else: - new_index = self.update_index(None, manifest_index_metadata) - - self._check_200_response(self.upload_index(new_index)) - - print(f"Successfully pushed {self.container}") - return local_digest - - def create_layer( - self, - file_path: str, - cname: str, - version: str, - architecture: str, - media_type: str, - ): - checksum_sha256 = calculate_sha256(file_path) - layer = oras.oci.NewLayer(file_path, media_type, is_dir=False) - layer["annotations"] = { - oras.defaults.annotation_title: os.path.basename(file_path), - } - return layer - - def push_from_tar(self, architecture: str, version: str, cname: str, tar: str): - tmpdir = tempfile.mkdtemp() - extract_tar(tar, tmpdir) - - try: - oci_metadata = get_oci_metadata_from_fileset( - os.listdir(tmpdir), architecture - ) - - features = "" - for artifact in oci_metadata: - if artifact["media_type"] == "application/io.gardenlinux.release": - file = open(f"{tmpdir}/{artifact["file_name"]}", "r") - lines = file.readlines() - for line in lines: - if line.strip().startswith("GARDENLINUX_FEATURES="): - features = line.strip().removeprefix( - "GARDENLINUX_FEATURES=" - ) - break - file.close() - - digest = self.push_image_manifest( - architecture, cname, version, tmpdir, oci_metadata, features - ) - except Exception as e: - print("Error: ", e) - shutil.rmtree(tmpdir, ignore_errors=True) - print("removed tmp files.") - exit(1) - shutil.rmtree(tmpdir, ignore_errors=True) - print("removed tmp files.") - return digest - - def push_from_dir(self, architecture: str, version: str, cname: str, dir: str): - # Step 1 scan and extract nested artifacts: - for file in os.listdir(dir): - try: - if file.endswith(".pxe.tar.gz"): - logger.info(f"Found nested artifact {file}") - nested_tar_obj = tarfile.open(f"{dir}/{file}") - nested_tar_obj.extractall(filter="data", path=dir) - nested_tar_obj.close() - except (OSError, tarfile.FilterError, tarfile.TarError) as e: - print(f"Failed to extract nested artifact {file}", e) - exit(1) - - try: - oci_metadata = get_oci_metadata_from_fileset(os.listdir(dir), architecture) - - features = "" - for artifact in oci_metadata: - if artifact["media_type"] == "application/io.gardenlinux.release": - file = open(f"{dir}/{artifact["file_name"]}", "r") - lines = file.readlines() - for line in lines: - if line.strip().startswith("GARDENLINUX_FEATURES="): - features = line.strip().removeprefix( - "GARDENLINUX_FEATURES=" - ) - break - file.close() - - digest = self.push_image_manifest( - architecture, cname, version, dir, oci_metadata, features - ) - except Exception as e: - print("Error: ", e) - exit(1) - return digest - - -def extract_tar(tar: str, tmpdir: str): - """ - Extracts the contents of the tarball to the specified tmp directory. In case - a nested artifact is found (.pxe.tar.gz) its contents are extracted as well - :param tar: str the full path to the tarball - :param tmpdir: str the tmp directory to extract to - """ - try: - tar_obj = tarfile.open(tar) - tar_obj.extractall(filter="data", path=tmpdir) - tar_obj.close() - for file in os.listdir(tmpdir): - if file.endswith(".pxe.tar.gz"): - logger.info(f"Found nested artifact {file}") - nested_tar_obj = tarfile.open(f"{tmpdir}/{file}") - nested_tar_obj.extractall(filter="data", path=tmpdir) - nested_tar_obj.close() - - except (OSError, tarfile.FilterError, tarfile.TarError) as e: - print("Failed to extract tarball", e) - shutil.rmtree(tmpdir, ignore_errors=True) - exit(1) diff --git a/src/python_gardenlinux_lib/oras/schemas.py b/src/python_gardenlinux_lib/oras/schemas.py deleted file mode 100644 index 51626e17..00000000 --- a/src/python_gardenlinux_lib/oras/schemas.py +++ /dev/null @@ -1,59 +0,0 @@ -# for reference: -# https://json-schema.org/understanding-json-schema/reference/object - -# TODO: add more schemas, and validate dicts with schemas before accessing them. - -schema_url = "http://json-schema.org/draft-07/schema" - -platformProperties = { - "architecture": {"type": "string"}, - "os": {"type": "string"}, - "os.version": {"type": "string"}, - "variant": {"type": "string"}, -} - -manifestMetaProperties = { - "mediaType": {"type": "string"}, - "platform": {"type": "object", "properties": platformProperties}, -} - -indexProperties = { - "schemaVersion": {"type": "number"}, - "mediaType": {"type": "string"}, - "subject": {"type": ["null", "object"]}, - "manifests": {"type": "array", "items": manifestMetaProperties}, - "annotations": {"type": ["object", "null", "array"]}, -} - - -index = { - "$schema": schema_url, - "title": "Index Schema", - "type": "object", - "required": [ - "schemaVersion", - "manifests", - ], - "properties": indexProperties, - "additionalProperties": True, -} - -EmptyPlatform = { - "architecture": "", - "os": "gardenlinux", - "os.version": "experimental", -} - -EmptyManifestMetadata = { - "mediaType": "application/vnd.oci.image.manifest.v1+json", - "digest": "", - "size": 0, - "annotations": {}, - "artifactType": "", -} - -EmptyIndex = { - "schemaVersion": 2, - "mediaType": "application/vnd.oci.image.index.v1+json", - "manifests": [], -} diff --git a/src/python_gardenlinux_lib/s3/__init__.py b/src/python_gardenlinux_lib/s3/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/python_gardenlinux_lib/s3/s3.py b/src/python_gardenlinux_lib/s3/s3.py deleted file mode 100755 index 760484a7..00000000 --- a/src/python_gardenlinux_lib/s3/s3.py +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/env python -import argparse -import json -import logging -import os -import re -import sys -import time - -import boto3 -import yaml - -# Create a null logger as default -null_logger = logging.getLogger("gardenlinux.lib.s3") -null_logger.addHandler(logging.NullHandler()) - - -def get_s3_client(): - """Get or create an S3 client.""" - if not hasattr(get_s3_client, "_client"): - get_s3_client._client = boto3.client("s3") - return get_s3_client._client - - -def fetch_s3_bucket_contents(bucket_name, prefix="", logger=null_logger): - """ - Fetch contents of an S3 bucket with prefix. - - Args: - bucket_name (str): Name of the S3 bucket - prefix (str): Prefix for filtering objects - logger (logging.Logger): Logger instance to use - - Returns: - list: List of S3 object contents - """ - s3_client = get_s3_client() - objects = [] - - try: - paginator = s3_client.get_paginator("list_objects_v2") - - for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix): - if "Contents" in page: - objects.extend(page["Contents"]) - - return objects - except Exception as e: - logger.error(f"Error fetching objects from bucket {bucket_name}: {e}") - return [] - - -def get_s3_artifacts( - bucket_name, - prefix, - cache_file=".artifacts_cache.json", - cache_ttl=3600, - logger=null_logger, -): - """ - Get and cache S3 artifacts list with indexed searching. - - Args: - bucket_name (str): Name of the S3 bucket - prefix (str): Prefix for S3 objects - cache_file (str): Path to cache file - cache_ttl (int): Cache time-to-live in seconds - logger (logging.Logger): Logger instance to use - - Returns: - dict: Dictionary containing 'index' and 'artifacts' keys - """ - index_file = ".artifacts_index.json" - - try: - # Check if cache files exist and are fresh - if os.path.exists(cache_file) and os.path.exists(index_file): - cache_age = time.time() - os.path.getmtime(cache_file) - if cache_age < cache_ttl: - try: - with open(cache_file, "r") as f: - artifacts = json.load(f) - with open(index_file, "r") as f: - index = json.load(f) - logger.debug("Using cached artifacts data") - return {"index": index, "artifacts": artifacts} - except json.JSONDecodeError: - logger.warning("Cache files corrupted, will fetch fresh data") - try: - os.remove(cache_file) - os.remove(index_file) - except OSError: - pass - - # If no cache or expired, fetch from artifacts bucket - logger.info(f"Fetching artifacts from s3://{bucket_name}/{prefix}") - objects = fetch_s3_bucket_contents(bucket_name, prefix, logger) - - if not objects: - logger.warning("No objects found in artifacts bucket") - return {"index": {}, "artifacts": []} - - # Extract just the keys - artifacts = [obj["Key"] for obj in objects] - logger.debug(f"Found {len(artifacts)} artifacts") - - # Build the index - index = build_artifact_index(artifacts) - logger.debug(f"Built index with {len(index)} entries") - - # Save to cache files - try: - with open(cache_file, "w") as f: - json.dump(artifacts, f) - with open(index_file, "w") as f: - json.dump(index, f) - logger.debug("Saved artifacts and index to cache") - except Exception as e: - logger.warning(f"Failed to save cache files: {e}") - - return {"index": index, "artifacts": artifacts} - - except Exception as e: - logger.error(f"Error getting S3 artifacts: {e}") - return {"index": {}, "artifacts": []} - - -def build_artifact_index(artifacts): - """ - Build an index mapping version-commit to flavors for faster lookups. - - Args: - artifacts (list): List of artifact paths - - Returns: - dict: Dictionary mapping version-commit to list of flavors - """ - index = {} - - for artifact in artifacts: - try: - # Split path into components - parts = artifact.split("/") - if len(parts) < 3: - continue - - # Extract version and commit from path - version_commit = parts[0] # Format: "version-commit" - flavor = parts[1] # The flavor name - - # Skip if flavor is actually a version number - if flavor and not re.match(r"^\d+\.\d+", flavor): - # Add to index - if version_commit not in index: - index[version_commit] = [] - if flavor not in index[version_commit]: # Avoid duplicates - index[version_commit].append(flavor) - - except Exception: - continue - - return index diff --git a/src/python_gardenlinux_lib/version.py b/src/python_gardenlinux_lib/version.py deleted file mode 100644 index 949cd2c6..00000000 --- a/src/python_gardenlinux_lib/version.py +++ /dev/null @@ -1,168 +0,0 @@ -import re -import subprocess -from datetime import datetime, timezone -import requests -from pathlib import Path - -from gardenlinux.logger import LoggerSetup - - -class Version: - """Handles version-related operations for Garden Linux.""" - - def __init__(self, git_root: Path, logger=None): - """Initialize Version handler. - - Args: - git_root: Path to the Git repository root - logger: Optional logger instance - """ - self.git_root = git_root - self.log = logger or LoggerSetup.get_logger("gardenlinux.version") - self.start_date = "Mar 31 00:00:00 UTC 2020" - - def get_minor_from_repo(self, major): - """Check repo.gardenlinux.io for highest available suite minor for given major. - - Args: - major: major version - Returns: - minor version - """ - minor = 0 - limit = 100 # Hard limit the search - repo_url = f"https://repo.gardenlinux.io/gardenlinux/dists/{major}.{{}}/Release" - - while minor <= limit: - try: - check_url = repo_url.format(minor) - response = requests.get(check_url) - if response.status_code != 200: - # No more versions found, return last successful minor - return minor - 1 - minor += 1 - except requests.RequestException as e: - self.log.debug(f"Error checking repo URL {check_url}: {e}") - return minor - 1 - - # If we hit the limit, return the last minor - return minor - 1 - - def get_version(self): - """Get version using same logic as garden-version bash script. - - Args: - version: version string - Returns: - version string - """ - - try: - # Check VERSION file - version_file = self.git_root / "VERSION" - if version_file.exists(): - version = version_file.read_text().strip() - # Remove comments and empty lines - version = re.sub(r"#.*$", "", version, flags=re.MULTILINE) - version = "\n".join( - line for line in version.splitlines() if line.strip() - ) - version = version.strip() - else: - version = "today" - - if not version: - version = "today" - - # Handle numeric versions (e.g., "27.1") - if re.match(r"^[0-9\.]*$", version): - major = version.split(".")[0] - if int(major) < 10000000: # Sanity check for major version - if "." in version: - return version # Return full version if minor is specified - else: - # Get latest minor version from repo - minor = self.get_minor_from_repo(major) - return f"{major}.{minor}" - - # Handle 'today' or 'experimental' - if version in ["today", "experimental"]: - # Calculate days since start date - start_timestamp = datetime.strptime( - self.start_date, "%b %d %H:%M:%S %Z %Y" - ).timestamp() - today_timestamp = datetime.now(timezone.utc).timestamp() - major = int((today_timestamp - start_timestamp) / (24 * 60 * 60)) - return version - - # Handle date input - try: - # Try to parse as date - input_date = datetime.strptime(version, "%Y%m%d") - start_date = datetime.strptime(self.start_date, "%b %d %H:%M:%S %Z %Y") - days_diff = (input_date - start_date).days - return f"{days_diff}.0" - except ValueError: - pass - - return version - - except Exception as e: - self.log.error(f"Error determining version: {e}") - return "local" - - def get_short_commit(self): - """Get short commit using same logic as the get_commit bash script. - - Returns: - short commit string - """ - try: - # Check if COMMIT file exists in git root - commit_file = self.git_root / "COMMIT" - if commit_file.exists(): - return commit_file.read_text().strip() - - # Check if git repo is clean - status_output = ( - subprocess.check_output( - ["git", "status", "--porcelain"], stderr=subprocess.DEVNULL - ) - .decode() - .strip() - ) - - if status_output: - self.log.info(f"git status:\n {status_output}") - # Dirty repo or not a git repo - return "local" - else: - # Clean repo - use git commit hash - return ( - subprocess.check_output( - ["git", "rev-parse", "--short", "HEAD"], - stderr=subprocess.DEVNULL, - ) - .decode() - .strip() - ) - - except subprocess.CalledProcessError: - return "local" - - def get_cname(self, platform, features, arch): - """Get canonical name (cname) for Garden Linux image. - - Args: - platform: Platform identifier (e.g., 'kvm', 'aws') - features: List of features - arch: Architecture ('amd64' or 'arm64') - - Returns: - Generated cname string - """ - # Get version and commit - version = self.get_version() - commit = self.get_short_commit() - - return f"{platform}-{features}-{arch}-{version}-{commit}" diff --git a/src/python_gardenlinux_lib/features/__init__.py b/tests/apt/__init__.py similarity index 100% rename from src/python_gardenlinux_lib/features/__init__.py rename to tests/apt/__init__.py diff --git a/tests/test_parse_debsource.py b/tests/apt/test_debsource.py similarity index 100% rename from tests/test_parse_debsource.py rename to tests/apt/test_debsource.py diff --git a/tests/conftest.py b/tests/conftest.py index 7c30dad4..5dbd3ceb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,14 +4,15 @@ import subprocess import sys import tempfile -from datetime import datetime, timedelta - import pytest + from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa +from datetime import datetime, timedelta from dotenv import load_dotenv +from gardenlinux.features import Parser from .constants import ( TEST_DATA_DIR, @@ -161,10 +162,13 @@ def zot_session(): def pytest_sessionstart(session): generate_test_certificates() + # Replace the bash script call with our Python function create_test_data() os.makedirs("./manifests", exist_ok=True) + Parser.set_default_gardenlinux_root_dir(GL_ROOT_DIR) + def pytest_sessionfinish(session): if os.path.isfile(CERT_DIR + "/oci-sign.crt"): diff --git a/tests/constants.py b/tests/constants.py index 549a3d0f..390c7d9c 100644 --- a/tests/constants.py +++ b/tests/constants.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -from python_gardenlinux_lib.features.parse_features import get_gardenlinux_commit +from gardenlinux.git import Git TEST_DATA_DIR = "test-data" GL_ROOT_DIR = f"{TEST_DATA_DIR}/gardenlinux" @@ -17,6 +17,6 @@ TEST_ARCHITECTURES = ["arm64", "amd64"] TEST_FEATURE_STRINGS_SHORT = ["gardener_prod"] TEST_FEATURE_SET = "_slim,base,container" -TEST_COMMIT = get_gardenlinux_commit(GL_ROOT_DIR, 8) +TEST_COMMIT = Git(GL_ROOT_DIR).commit_id[:8] TEST_VERSION = "1000.0" TEST_VERSION_STABLE = "1000" diff --git a/tests/features/test_cname.py b/tests/features/test_cname.py index c77071f1..f286d205 100644 --- a/tests/features/test_cname.py +++ b/tests/features/test_cname.py @@ -18,6 +18,10 @@ "metal_pxe", "metal_pxe", ), + ( + "container-amd64", + "container", + ), ], ) def test_cname_flavor(input_cname: str, expected_output: dict): diff --git a/tests/test_deduce_image_type.py b/tests/test_deduce_image_type.py deleted file mode 100644 index 393440a7..00000000 --- a/tests/test_deduce_image_type.py +++ /dev/null @@ -1,39 +0,0 @@ -from python_gardenlinux_lib.features.parse_features import ( - deduce_archive_filetypes, - deduce_image_filetypes, -) -import pytest - -from .constants import GL_ROOT_DIR - - -@pytest.mark.parametrize( - "feature_name, expected_file_type", - [ - ("ali", ["qcow2"]), - ("azure", ["vhd"]), - ("gcp", ["gcpimage.tar.gz"]), - ("gdch", ["gcpimage.tar.gz"]), - ("openstack", ["qcow2", "vmdk"]), - ("openstackbaremetal", ["qcow2", "vmdk"]), - ("vmware", ["ova"]), - ], -) -def test_deduce_image_type(feature_name, expected_file_type): - file_type = deduce_image_filetypes(f"{GL_ROOT_DIR}/features/{feature_name}") - assert sorted(expected_file_type) == file_type - - -@pytest.mark.parametrize( - "feature_name, expected_file_type", - [ - ("_bfpxe", ["pxe.tar.gz"]), - ("_iso", ["iso"]), - ("_pxe", ["pxe.tar.gz"]), - ("container", ["oci"]), - ("firecracker", ["firecracker.tar.gz"]), - ], -) -def test_deduce_archive_type(feature_name, expected_file_type): - file_type = deduce_archive_filetypes(f"{GL_ROOT_DIR}/features/{feature_name}") - assert sorted(expected_file_type) == file_type diff --git a/tests/test_get_oci_metadata.py b/tests/test_get_oci_metadata.py deleted file mode 100644 index aba9142d..00000000 --- a/tests/test_get_oci_metadata.py +++ /dev/null @@ -1,47 +0,0 @@ -import pytest - -from python_gardenlinux_lib.features.parse_features import get_oci_metadata -from .constants import GL_ROOT_DIR - - -@pytest.mark.parametrize( - "input_cname, version, arch", - [ - # ("aws-gardener_prod", "today"), - # ("openstack-gardener_prod", "today"), - ("openstack-gardener_pxe", "1443.9", "amd64"), - ], -) -def test_get_oci_metadata(input_cname: str, version: str, arch: str): - """ - Work in Progess: currently only used to see what get_oci_metadata returns - """ - metadata = get_oci_metadata(input_cname, version, arch, GL_ROOT_DIR) - expected = [ - { - "file_name": "openstack-gardener_pxe-amd64-1443.9-c81fcc9f.qcow2", - "media_type": "application/io.gardenlinux.image.format.qcow2", - "annotations": {"io.gardenlinux.image.layer.architecture": "amd64"}, - }, - { - "file_name": "openstack-gardener_pxe-amd64-1443.9-c81fcc9f.vmdk", - "media_type": "application/io.gardenlinux.image.format.vmdk", - "annotations": {"io.gardenlinux.image.layer.architecture": "amd64"}, - }, - { - "file_name": "openstack-gardener_pxe-amd64-1443.9-c81fcc9f.tar", - "media_type": "application/io.gardenlinux.image.archive.format.tar", - "annotations": {"io.gardenlinux.image.layer.architecture": "amd64"}, - }, - ] - for elem in metadata: - print( - elem["file_name"], - "\tmedia-type:", - elem["media_type"], - "\t annotations", - elem["annotations"], - "\tkeys:", - elem.keys(), - ) - # assert metadata == expected diff --git a/tests/test_oci.py b/tests/test_oci.py index 471c5b50..521b3c07 100644 --- a/tests/test_oci.py +++ b/tests/test_oci.py @@ -6,20 +6,17 @@ import logging # Import reggie library correctly -from opencontainers.distribution.reggie import ( - NewClient, - WithDebug, - WithName, - WithReference, - WithUserAgent, -) +from oras.provider import Registry sys.path.append("src") from gardenlinux.oci.__main__ import cli as gl_oci + from .constants import ( CONTAINER_NAME_ZOT_EXAMPLE, GARDENLINUX_ROOT_DIR_EXAMPLE, + REGISTRY, + REGISTRY_URL, TEST_COMMIT, TEST_FEATURE_SET, TEST_VERSION, @@ -104,8 +101,7 @@ def update_index(runner, version, additional_tags=None): def get_catalog(client): """Get catalog from registry and return repositories list""" - catalog_req = client.NewRequest("GET", "/v2/_catalog") - catalog_resp = client.Do(catalog_req) + catalog_resp = client.do_request(f"{REGISTRY_URL}/v2/_catalog") assert ( catalog_resp.status_code == 200 @@ -117,8 +113,7 @@ def get_catalog(client): def get_tags(client, repo): """Get tags for a repository""" - tags_req = client.NewRequest("GET", f"/v2/{repo}/tags/list") - tags_resp = client.Do(tags_req) + tags_resp = client.do_request(f"{REGISTRY_URL}/v2/{repo}/tags/list") assert ( tags_resp.status_code == 200 @@ -131,16 +126,13 @@ def get_tags(client, repo): def get_manifest(client, repo, reference): """Get manifest and digest for a repository reference""" # Create a simple request for the manifest - manifest_req = client.NewRequest("GET", f"/v2/{repo}/manifests/{reference}") - - # Set the headers for accept types - use headers (with an 's') instead of header - manifest_req.headers.update( - { + manifest_resp = client.do_request( + f"{REGISTRY_URL}/v2/{repo}/manifests/{reference}", + headers={ "Accept": "application/vnd.oci.image.manifest.v1+json,application/vnd.docker.distribution.manifest.v2+json,application/vnd.oci.image.index.v1+json" - } + }, ) - manifest_resp = client.Do(manifest_req) assert ( manifest_resp.status_code == 200 ), f"Failed to get manifest for {repo}:{reference}, status: {manifest_resp.status_code}" @@ -215,15 +207,13 @@ def verify_additional_tags( print(f"Verifying additional tag: {tag}") try: # Create a simple request for the manifest - tag_req = client.NewRequest("GET", f"/v2/{repo}/manifests/{tag}") - tag_req.headers.update( - { + tag_resp = client.do_request( + f"{REGISTRY_URL}/v2/{repo}/manifests/{tag}", + headers={ "Accept": "application/vnd.oci.image.manifest.v1+json,application/vnd.docker.distribution.manifest.v2+json,application/vnd.oci.image.index.v1+json" - } + }, ) - tag_resp = client.Do(tag_req) - if tag_resp.status_code != 200: print( f"✗ Could not find additional tag {tag}: status {tag_resp.status_code}" @@ -287,7 +277,6 @@ def test_push_manifest_and_index( ): print(f"\n\n=== Starting test for {cname} {arch} {version} ===") runner = CliRunner() - registry_url = "http://127.0.0.1:18081" repo_name = "gardenlinux-example" combined_tag = f"{version}-{cname}-{arch}" @@ -305,9 +294,7 @@ def test_push_manifest_and_index( print(f"\n=== Verifying registry for {cname} {arch} {version} ===") # Initialize reggie client - client = NewClient( - registry_url, WithDebug(True), WithUserAgent("gardenlinux-test-client/1.0") - ) + client = Registry(hostname=REGISTRY, insecure=True) # Get repositories and verify main repo exists print("\nFetching catalog...")