diff --git a/src/gardenlinux/oci/__init__.py b/src/gardenlinux/oci/__init__.py index 4102b543..96814084 100644 --- a/src/gardenlinux/oci/__init__.py +++ b/src/gardenlinux/oci/__init__.py @@ -5,8 +5,9 @@ """ from .container import Container +from .image_manifest import ImageManifest from .index import Index from .layer import Layer from .manifest import Manifest -__all__ = ["Container", "Index", "Layer", "Manifest"] +__all__ = ["Container", "ImageManifest", "Index", "Layer", "Manifest"] diff --git a/src/gardenlinux/oci/__main__.py b/src/gardenlinux/oci/__main__.py index a0ee136f..11750555 100755 --- a/src/gardenlinux/oci/__main__.py +++ b/src/gardenlinux/oci/__main__.py @@ -31,26 +31,28 @@ def cli(): help="Container Name", ) @click.option( - "--version", - required=True, - type=click.Path(), - help="Version of image", + "--cname", required=True, type=click.Path(), help="Canonical Name of Image" ) @click.option( - "--commit", + "--arch", required=False, type=click.Path(), default=None, - help="Commit of image", + help="Target Image CPU Architecture", ) @click.option( - "--arch", - required=True, + "--version", + required=False, type=click.Path(), - help="Target Image CPU Architecture", + default=None, + help="Version of image", ) @click.option( - "--cname", required=True, type=click.Path(), help="Canonical Name of Image" + "--commit", + required=False, + type=click.Path(), + default=None, + help="Commit of image", ) @click.option("--dir", "directory", required=True, help="path to the build artifacts") @click.option( @@ -76,10 +78,10 @@ def cli(): ) def push_manifest( container, + cname, + arch, version, commit, - arch, - cname, directory, cosign_file, manifest_file, @@ -107,6 +109,76 @@ def push_manifest( print(manifest.digest, file=open(cosign_file, "w")) +@cli.command() +@click.option( + "--container", + required=True, + type=click.Path(), + help="Container Name", +) +@click.option( + "--cname", + required=False, + type=click.Path(), + default=None, + help="Canonical Name of Image", +) +@click.option( + "--arch", + required=False, + type=click.Path(), + default=None, + help="Target Image CPU Architecture", +) +@click.option( + "--version", + required=False, + type=click.Path(), + default=None, + help="Version of image", +) +@click.option( + "--commit", + required=False, + type=click.Path(), + default=None, + help="Commit of image", +) +@click.option( + "--insecure", + default=False, + help="Use HTTP to communicate with the registry", +) +@click.option( + "--tag", + required=True, + multiple=True, + help="Tag to push the manifest with", +) +def push_manifest_tags( + container, + cname, + arch, + version, + commit, + insecure, + tag, +): + """ + Push artifacts and the manifest from a directory to a registry. + + :since: 0.10.0 + """ + + container = Container( + f"{container}:{version}", + insecure=insecure, + ) + + manifest = container.read_or_generate_manifest(cname, arch, version, commit) + container.push_manifest_for_tags(manifest, tag) + + @cli.command() @click.option( "--container", diff --git a/src/gardenlinux/oci/container.py b/src/gardenlinux/oci/container.py index 8e807dbd..d64bb4d1 100644 --- a/src/gardenlinux/oci/container.py +++ b/src/gardenlinux/oci/container.py @@ -28,6 +28,7 @@ from .index import Index from .layer import Layer +from .image_manifest import ImageManifest from .manifest import Manifest from .schemas import index as IndexSchema @@ -83,11 +84,21 @@ def __init__( self._container_version = container_data[1] container_url_data = urlsplit(self._container_url) + self._token = None + + if token is None: + token = getenv("GL_CLI_REGISTRY_TOKEN") + + if token is not None: + auth_backend = "token" + self._token = b64encode(token.encode("utf-8")).decode("utf-8") + else: + auth_backend = "basic" Registry.__init__( self, hostname=container_url_data.netloc, - auth_backend="token", + auth_backend=auth_backend, insecure=insecure, ) @@ -97,11 +108,7 @@ def __init__( self._container_name = container_url_data.path[1:] self._logger = logger - if token is None: - token = getenv("GL_CLI_REGISTRY_TOKEN") - - if token is not None: - self._token = b64encode(token.encode("utf-8")).decode("utf-8") + if self._token is not None: self.auth.set_token_auth(self._token) else: # Authentication credentials from environment @@ -117,17 +124,7 @@ def __init__( except Exception as login_error: self._logger.error(f"Login error: {str(login_error)}") - def generate_index(self): - """ - Generates an OCI image index - - :return: (object) OCI image index - :since: 0.7.0 - """ - - return Index() - - def generate_manifest( + def generate_image_manifest( self, cname: str, architecture: Optional[str] = None, @@ -145,7 +142,7 @@ def generate_manifest( :param feature_set: The expanded list of the included features of this manifest :return: (object) OCI image manifest - :since: 0.7.0 + :since: 0.10.0 """ cname_object = CName(cname, architecture, version) @@ -162,15 +159,13 @@ def generate_manifest( if commit is None: commit = "" - manifest = Manifest() + manifest = ImageManifest() - manifest["annotations"] = {} - manifest["annotations"]["version"] = version - manifest["annotations"]["cname"] = cname - manifest["annotations"]["architecture"] = architecture - manifest["annotations"]["feature_set"] = feature_set - manifest["annotations"]["flavor"] = f"{cname_object.flavor}-{architecture}" - manifest["annotations"]["commit"] = commit + manifest.version = version + manifest.cname = cname + manifest.arch = architecture + manifest.feature_set = feature_set + manifest.commit = commit description = ( f"Image: {cname} " @@ -189,6 +184,43 @@ def generate_manifest( return manifest + def generate_index(self): + """ + Generates an OCI image index + + :return: (object) OCI image index + :since: 0.7.0 + """ + + return Index() + + def generate_manifest( + self, + version: Optional[str] = None, + commit: Optional[str] = None, + ): + """ + Generates an OCI manifest + + :param cname: Canonical name of the manifest + :param architecture: Target architecture of the manifest + :param version: Artifacts version of the manifest + :param commit: The commit hash of the manifest + :param feature_set: The expanded list of the included features of this manifest + + :return: (object) OCI manifest + :since: 0.9.2 + """ + + manifest = Manifest() + + manifest.version = version + manifest.commit = commit + + manifest.config_from_dict({}, {}) + + return manifest + def _get_index_without_response_parsing(self): """ Return the response of an OCI image index request. @@ -520,14 +552,14 @@ def read_or_generate_index(self): def read_or_generate_manifest( self, - cname: str, + cname: Optional[str] = None, architecture: Optional[str] = None, version: Optional[str] = None, commit: Optional[str] = None, feature_set: Optional[str] = None, ) -> Manifest: """ - Reads from registry or generates the OCI image manifest. + Reads from registry or generates the OCI manifest. :param cname: Canonical name of the manifest :param architecture: Target architecture of the manifest @@ -539,19 +571,31 @@ def read_or_generate_manifest( :since: 0.7.0 """ - if architecture is None: - architecture = CName(cname, architecture, version).arch + if cname is None: + manifest_type = Manifest - response = self._get_manifest_without_response_parsing( - f"{self._container_version}-{cname}-{architecture}" - ) + response = self._get_manifest_without_response_parsing( + self._container_version + ) + else: + manifest_type = ImageManifest + + if architecture is None: + architecture = CName(cname, architecture, version).arch + + response = self._get_manifest_without_response_parsing( + f"{self._container_version}-{cname}-{architecture}" + ) if response.ok: - manifest = Manifest(**response.json()) + manifest = manifest_type(**response.json()) elif response.status_code == 404: - manifest = self.generate_manifest( - cname, architecture, version, commit, feature_set - ) + if cname is None: + manifest = self.generate_manifest(version, commit) + else: + manifest = self.generate_image_manifest( + cname, architecture, version, commit, feature_set + ) else: response.raise_for_status() diff --git a/src/gardenlinux/oci/image_manifest.py b/src/gardenlinux/oci/image_manifest.py new file mode 100644 index 00000000..b8cd0bdf --- /dev/null +++ b/src/gardenlinux/oci/image_manifest.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- + +import json +from copy import deepcopy +from oras.oci import Layer +from os import PathLike +from pathlib import Path + +from ..features import CName + +from .manifest import Manifest +from .platform import NewPlatform +from .schemas import EmptyManifestMetadata + + +class ImageManifest(Manifest): + """ + OCI image manifest + + :author: Garden Linux Maintainers + :copyright: Copyright 2024 SAP SE + :package: gardenlinux + :subpackage: oci + :since: 0.10.0 + :license: https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 + """ + + @property + def arch(self): + """ + Returns the architecture of the OCI image manifest. + + :return: (str) OCI image architecture + :since: 0.7.0 + """ + + if "architecture" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'architecture' found" + ) + + return self["annotations"]["architecture"] + + @arch.setter + def arch(self, value): + """ + Sets the architecture of the OCI image manifest. + + :param value: OCI image architecture + + :since: 0.7.0 + """ + + self._ensure_annotations_dict() + self["annotations"]["architecture"] = value + + @property + def cname(self): + """ + Returns the GardenLinux canonical name of the OCI image manifest. + + :return: (str) OCI image GardenLinux canonical name + :since: 0.7.0 + """ + + if "cname" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'cname' found" + ) + + return self["annotations"]["cname"] + + @cname.setter + def cname(self, value): + """ + Sets the GardenLinux canonical name of the OCI image manifest. + + :param value: OCI image GardenLinux canonical name + + :since: 0.7.0 + """ + + self._ensure_annotations_dict() + self["annotations"]["cname"] = value + + @property + def feature_set(self): + """ + Returns the GardenLinux feature set of the OCI image manifest. + + :return: (str) OCI image GardenLinux feature set + :since: 0.7.0 + """ + + if "feature_set" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'feature_set' found" + ) + + return self["annotations"]["feature_set"] + + @feature_set.setter + def feature_set(self, value): + """ + Sets the GardenLinux feature set of the OCI image manifest. + + :param value: OCI image GardenLinux feature set + + :since: 0.7.0 + """ + + self._ensure_annotations_dict() + self["annotations"]["feature_set"] = value + + @property + def flavor(self): + """ + Returns the GardenLinux flavor of the OCI image manifest. + + :return: (str) OCI image GardenLinux flavor + :since: 0.7.0 + """ + + return CName(self.cname).flavor + + @property + def layers_as_dict(self): + """ + Returns the OCI image manifest layers as a dictionary. + + :return: (dict) OCI image manifest layers with title as key + :since: 0.7.0 + """ + + layers = {} + + for layer in self["layers"]: + if "org.opencontainers.image.title" not in layer.get("annotations", {}): + raise RuntimeError( + "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" + ) + + layers[layer["annotations"]["org.opencontainers.image.title"]] = layer + + return layers + + @property + def version(self): + """ + Returns the GardenLinux version of the OCI image manifest. + + :return: (str) OCI image GardenLinux version + :since: 0.7.0 + """ + + if "version" not in self.get("annotations", {}): + raise RuntimeError( + "Unexpected manifest with missing config annotation 'version' found" + ) + + return self["annotations"]["version"] + + @version.setter + def version(self, value): + """ + Sets the GardenLinux version of the OCI image manifest. + + :param value: OCI image GardenLinux version + + :since: 0.7.0 + """ + + self._ensure_annotations_dict() + self["annotations"]["version"] = value + + def append_layer(self, layer): + """ + Appends the given OCI image manifest layer to the manifest + + :param layer: OCI image manifest layer + + :since: 0.7.0 + """ + + if not isinstance(layer, Layer): + raise RuntimeError("Unexpected layer type given") + + layer_dict = layer.dict + + if "org.opencontainers.image.title" not in layer_dict.get("annotations", {}): + raise RuntimeError( + "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" + ) + + image_title = layer_dict["annotations"]["org.opencontainers.image.title"] + existing_layer_index = 0 + + for existing_layer in self["layers"]: + if "org.opencontainers.image.title" not in existing_layer.get( + "annotations", {} + ): + raise RuntimeError( + "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" + ) + + if ( + image_title + == existing_layer["annotations"]["org.opencontainers.image.title"] + ): + break + + existing_layer_index += 1 + + if len(self["layers"]) > existing_layer_index: + self["layers"].pop(existing_layer_index) + + self["layers"].append(layer_dict) + + def write_metadata_file(self, manifest_file_path_name): + if not isinstance(manifest_file_path_name, PathLike): + manifest_file_path_name = Path(manifest_file_path_name) + + metadata_annotations = { + "cname": self.cname, + "architecture": self.arch, + "feature_set": self.feature_set, + } + + metadata = deepcopy(EmptyManifestMetadata) + metadata["mediaType"] = "application/vnd.oci.image.manifest.v1+json" + metadata["digest"] = self.digest + metadata["size"] = self.size + metadata["annotations"] = metadata_annotations + metadata["platform"] = NewPlatform(self.arch, self.version) + + with open(manifest_file_path_name, "w") as fp: + fp.write(json.dumps(metadata)) diff --git a/src/gardenlinux/oci/manifest.py b/src/gardenlinux/oci/manifest.py index 6ac7f2b5..f56579e8 100644 --- a/src/gardenlinux/oci/manifest.py +++ b/src/gardenlinux/oci/manifest.py @@ -4,14 +4,7 @@ from copy import deepcopy from hashlib import sha256 from oras.defaults import unknown_config_media_type as UNKNOWN_CONFIG_MEDIA_TYPE -from oras.oci import EmptyManifest, Layer -from os import PathLike -from pathlib import Path - -from ..features import CName - -from .platform import NewPlatform -from .schemas import EmptyManifestMetadata +from oras.oci import EmptyManifest class Manifest(dict): @@ -42,70 +35,12 @@ def __init__(self, *args, **kwargs): self.update(*args) self.update(**kwargs) - @property - def arch(self): - """ - Returns the architecture of the OCI image manifest. - - :return: (str) OCI image architecture - :since: 0.7.0 - """ - - if "architecture" not in self.get("annotations", {}): - raise RuntimeError( - "Unexpected manifest with missing config annotation 'architecture' found" - ) - - return self["annotations"]["architecture"] - - @arch.setter - def arch(self, value): - """ - Sets the architecture of the OCI image manifest. - - :param value: OCI image architecture - - :since: 0.7.0 - """ - - self._ensure_annotations_dict() - self["annotations"]["architecture"] = value - - @property - def cname(self): - """ - Returns the GardenLinux canonical name of the OCI image manifest. - - :return: (str) OCI image GardenLinux canonical name - :since: 0.7.0 - """ - - if "cname" not in self.get("annotations", {}): - raise RuntimeError( - "Unexpected manifest with missing config annotation 'cname' found" - ) - - return self["annotations"]["cname"] - - @cname.setter - def cname(self, value): - """ - Sets the GardenLinux canonical name of the OCI image manifest. - - :param value: OCI image GardenLinux canonical name - - :since: 0.7.0 - """ - - self._ensure_annotations_dict() - self["annotations"]["cname"] = value - @property def commit(self): """ - Returns the GardenLinux Git commit ID of the OCI image manifest. + Returns the GardenLinux Git commit ID of the OCI manifest. - :return: (str) OCI image GardenLinux Git commit ID + :return: (str) OCI GardenLinux Git commit ID :since: 0.7.0 """ @@ -119,9 +54,9 @@ def commit(self): @commit.setter def commit(self, value): """ - Sets the GardenLinux Git commit ID of the OCI image manifest. + Sets the GardenLinux Git commit ID of the OCI manifest. - :param value: OCI image GardenLinux Git commit ID + :param value: OCI GardenLinux Git commit ID :since: 0.7.0 """ @@ -152,46 +87,6 @@ def digest(self): digest = sha256(self.json).hexdigest() return f"sha256:{digest}" - @property - def feature_set(self): - """ - Returns the GardenLinux feature set of the OCI image manifest. - - :return: (str) OCI image GardenLinux feature set - :since: 0.7.0 - """ - - if "feature_set" not in self.get("annotations", {}): - raise RuntimeError( - "Unexpected manifest with missing config annotation 'feature_set' found" - ) - - return self["annotations"]["feature_set"] - - @feature_set.setter - def feature_set(self, value): - """ - Sets the GardenLinux feature set of the OCI image manifest. - - :param value: OCI image GardenLinux feature set - - :since: 0.7.0 - """ - - self._ensure_annotations_dict() - self["annotations"]["feature_set"] = value - - @property - def flavor(self): - """ - Returns the GardenLinux flavor of the OCI image manifest. - - :return: (str) OCI image GardenLinux flavor - :since: 0.7.0 - """ - - return CName(self.cname).flavor - @property def json(self): """ @@ -203,27 +98,6 @@ def json(self): return json.dumps(self).encode("utf-8") - @property - def layers_as_dict(self): - """ - Returns the OCI image manifest layers as a dictionary. - - :return: (dict) OCI image manifest layers with title as key - :since: 0.7.0 - """ - - layers = {} - - for layer in self["layers"]: - if "org.opencontainers.image.title" not in layer.get("annotations", {}): - raise RuntimeError( - "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" - ) - - layers[layer["annotations"]["org.opencontainers.image.title"]] = layer - - return layers - @property def size(self): """ @@ -287,69 +161,6 @@ def config_from_dict(self, config: dict, annotations: dict): self["config"] = config - def append_layer(self, layer): - """ - Appends the given OCI image manifest layer to the manifest - - :param layer: OCI image manifest layer - - :since: 0.7.0 - """ - - if not isinstance(layer, Layer): - raise RuntimeError("Unexpected layer type given") - - layer_dict = layer.dict - - if "org.opencontainers.image.title" not in layer_dict.get("annotations", {}): - raise RuntimeError( - "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" - ) - - image_title = layer_dict["annotations"]["org.opencontainers.image.title"] - existing_layer_index = 0 - - for existing_layer in self["layers"]: - if "org.opencontainers.image.title" not in existing_layer.get( - "annotations", {} - ): - raise RuntimeError( - "Unexpected layer with missing annotation 'org.opencontainers.image.title' found" - ) - - if ( - image_title - == existing_layer["annotations"]["org.opencontainers.image.title"] - ): - break - - existing_layer_index += 1 - - if len(self["layers"]) > existing_layer_index: - self["layers"].pop(existing_layer_index) - - self["layers"].append(layer_dict) - def _ensure_annotations_dict(self): if "annotations" not in self: self["annotations"] = {} - - def write_metadata_file(self, manifest_file_path_name): - if not isinstance(manifest_file_path_name, PathLike): - manifest_file_path_name = Path(manifest_file_path_name) - - metadata_annotations = { - "cname": self.cname, - "architecture": self.arch, - "feature_set": self.feature_set, - } - - metadata = deepcopy(EmptyManifestMetadata) - metadata["mediaType"] = "application/vnd.oci.image.manifest.v1+json" - metadata["digest"] = self.digest - metadata["size"] = self.size - metadata["annotations"] = metadata_annotations - metadata["platform"] = NewPlatform(self.arch, self.version) - - with open(manifest_file_path_name, "w") as fp: - fp.write(json.dumps(metadata)) diff --git a/tests/conftest.py b/tests/conftest.py index 5dbd3ceb..20a56d1b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,16 +1,16 @@ +from datetime import datetime, timedelta +from tempfile import mkstemp import json import os import shutil import subprocess import sys -import tempfile import pytest from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa -from datetime import datetime, timedelta from dotenv import load_dotenv from gardenlinux.features import Parser @@ -26,6 +26,7 @@ TEST_FEATURE_STRINGS_SHORT, TEST_ARCHITECTURES, ) + from .helper import call_command, spawn_background_process @@ -80,11 +81,6 @@ def generate_test_certificates(): print(f"Generated test certificates in {CERT_DIR}") -def write_zot_config(config_dict, file_path): - with open(file_path, "w") as config_file: - json.dump(config_dict, config_file, indent=4) - - def create_test_data(): """Generate test data for OCI registry tests (replaces build-test-data.sh)""" print("Creating fake artifacts...") @@ -127,20 +123,38 @@ def create_test_data(): f.write(f"dummy content for {file_path}") +def write_zot_config(config_dict, fd): + with os.fdopen(fd, "w") as fp: + json.dump(config_dict, fp, indent=4) + + @pytest.fixture(autouse=False, scope="function") def zot_session(): load_dotenv() print("start zot session") + + fd, htpasswd_file = mkstemp(dir=TEST_DATA_DIR, suffix=".htpasswd") + os.close(fd) + zot_config = { "distSpecVersion": "1.1.0", "storage": {"rootDirectory": "output/registry/zot"}, - "http": {"address": "127.0.0.1", "port": "18081"}, + "http": { + "address": "127.0.0.1", + "port": "18081", + "auth": {"htpasswd": {"path": f"{htpasswd_file}"}}, + "accessControl": { + "repositories": { + "**": {"anonymousPolicy": ["read", "create", "update", "delete"]}, + "protected/**": {"anonymousPolicy": []}, + } + }, + }, "log": {"level": "warn"}, } - with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as temp_config_file: - write_zot_config(zot_config, temp_config_file.name) - zot_config_file_path = temp_config_file.name + fd, zot_config_file_path = mkstemp(text=True, dir=TEST_DATA_DIR, suffix=".json") + write_zot_config(zot_config, fd) print(f"Spawning zot registry with config {zot_config_file_path}") zot_process = spawn_background_process( @@ -156,6 +170,8 @@ def zot_session(): if os.path.isdir("./output"): shutil.rmtree("./output") + if os.path.isfile(htpasswd_file): + os.remove(htpasswd_file) if os.path.isfile(zot_config_file_path): os.remove(zot_config_file_path) diff --git a/tests/oci/test_container.py b/tests/oci/test_container.py new file mode 100644 index 00000000..ad1dbb48 --- /dev/null +++ b/tests/oci/test_container.py @@ -0,0 +1,96 @@ +from base64 import b64encode +import io +import json +import pytest +import logging + +from gardenlinux.oci import Container +from requests import Response +from requests.exceptions import HTTPError + +from ..constants import ( + CONTAINER_NAME_ZOT_EXAMPLE, + REGISTRY, + TEST_COMMIT, + TEST_VERSION, +) + + +@pytest.fixture(name="Container_login_403") +def patch__Container_login_403(monkeypatch): + """Patch `login()` to return HTTP 403. `docker.errors.APIError` extends from `requests.exceptions.HTTPError` as well.""" + + def login_403(*args, **kwargs): + response = Response() + response.status_code = 403 + raise HTTPError("403 Forbidden", response=response) + + monkeypatch.setattr(Container, "login", login_403) + + +@pytest.fixture(name="Container_read_or_generate_403") +def patch__Container_read_or_generate_403(monkeypatch): + """Patch `read_or_generate_manifest()` to return HTTP 403.""" + + def read_or_generate_403(*args, **kwargs): + response = Response() + response.status_code = 403 + raise HTTPError("403 Forbidden", response=response) + + monkeypatch.setattr(Container, "read_or_generate_manifest", read_or_generate_403) + + +@pytest.mark.usefixtures("zot_session") +def test_manifest(): + """Verify a newly created manifest returns correct commit value.""" + # Arrange + container = Container(f"{CONTAINER_NAME_ZOT_EXAMPLE}:{TEST_VERSION}", insecure=True) + + manifest = container.read_or_generate_manifest( + version=TEST_VERSION, commit=TEST_COMMIT + ) + + # Assert + assert manifest.commit == TEST_COMMIT + + +@pytest.mark.usefixtures("zot_session") +@pytest.mark.usefixtures("Container_read_or_generate_403") +def test_manifest_403(): + """Verify container calls raises exceptions for certain errors.""" + # Arrange + container = Container(f"{CONTAINER_NAME_ZOT_EXAMPLE}:{TEST_VERSION}", insecure=True) + + with pytest.raises(HTTPError): + container.read_or_generate_manifest(version=TEST_VERSION, commit=TEST_COMMIT) + + +@pytest.mark.usefixtures("zot_session") +def test_manifest_auth_token(monkeypatch, caplog): + """Verify container calls use login environment variables if defined.""" + with monkeypatch.context(): + token = "test" + monkeypatch.setenv("GL_CLI_REGISTRY_TOKEN", token) + + # Arrange + container = Container( + f"{CONTAINER_NAME_ZOT_EXAMPLE}:{TEST_VERSION}", insecure=True + ) + + # Assert + assert container.auth.token == b64encode(bytes(token, "utf-8")).decode("utf-8") + + +@pytest.mark.usefixtures("zot_session") +@pytest.mark.usefixtures("Container_login_403") +def test_manifest_login_username_password(monkeypatch, caplog): + """Verify container calls use login environment variables if defined.""" + with monkeypatch.context(): + monkeypatch.setenv("GL_CLI_REGISTRY_USERNAME", "test") + monkeypatch.setenv("GL_CLI_REGISTRY_PASSWORD", "test") + + # Arrange + Container(f"{REGISTRY}/protected/test:{TEST_VERSION}", insecure=True) + + # Assert + assert "Login error: 403 Forbidden" in caplog.text diff --git a/tests/oci/test_image_manifest.py b/tests/oci/test_image_manifest.py new file mode 100644 index 00000000..1c7799c7 --- /dev/null +++ b/tests/oci/test_image_manifest.py @@ -0,0 +1,108 @@ +import pytest + +from gardenlinux.oci import ImageManifest, Layer + + +def test_ImageManifest_arch(): + # Arrange + empty_manifest = ImageManifest() + manifest = ImageManifest(annotations={"architecture": "amd64"}) + + # Assert + with pytest.raises(RuntimeError): + assert empty_manifest.arch == "amd64" + + empty_manifest.arch = "amd64" + assert empty_manifest.arch == "amd64" + + assert manifest.arch == "amd64" + + +def test_ImageManifest_cname(): + # Arrange + cname = "container-amd64-today-local" + + empty_manifest = ImageManifest() + manifest = ImageManifest(annotations={"cname": cname}) + + # Assert + with pytest.raises(RuntimeError): + assert empty_manifest.cname == cname + + empty_manifest.cname = cname + assert empty_manifest.cname == cname + + assert manifest.cname == cname + + +def test_ImageManifest_feature_set(): + # Arrange + feature_set = "container" + + empty_manifest = ImageManifest() + manifest = ImageManifest(annotations={"feature_set": feature_set}) + + # Assert + with pytest.raises(RuntimeError): + assert empty_manifest.feature_set == feature_set + + empty_manifest.feature_set = feature_set + assert empty_manifest.feature_set == feature_set + + assert manifest.feature_set == feature_set + + +def test_ImageManifest_flavor(): + # Arrange + flavor = "container" + cname = f"{flavor}-amd64-today-local" + + empty_manifest = ImageManifest() + manifest = ImageManifest(annotations={"cname": cname}) + + # Assert + with pytest.raises(RuntimeError): + assert empty_manifest.flavor == flavor + + empty_manifest.cname = cname + assert empty_manifest.flavor == flavor + + assert manifest.flavor == flavor + + +def test_ImageManifest_layer(tmp_path): + # Arrange + blob = tmp_path / "blob.txt" + blob.write_text("data") + + # Act + layer = Layer(blob) + + manifest = ImageManifest() + manifest.append_layer(layer) + + # Assert + + assert len(manifest.layers_as_dict) == 1 + assert manifest.layers_as_dict.popitem()[0] == "blob.txt" + + # Assert + with pytest.raises(RuntimeError): + assert manifest.append_layer({"test": "invalid"}) + + +def test_ImageManifest_version(): + # Arrange + version = "today" + + empty_manifest = ImageManifest() + manifest = ImageManifest(annotations={"version": version}) + + # Assert + with pytest.raises(RuntimeError): + assert empty_manifest.version == version + + empty_manifest.version = version + assert empty_manifest.version == version + + assert manifest.version == version diff --git a/tests/oci/test_oci.py b/tests/oci/test_oci.py index d0fcdce3..7b0b53c2 100644 --- a/tests/oci/test_oci.py +++ b/tests/oci/test_oci.py @@ -66,6 +66,41 @@ def push_manifest(runner, version, arch, cname, additional_tags=None): return False +def push_manifest_tags(runner, version, arch, cname, tags=None): + """Push manifest to registry and return success status""" + print(f"Pushing manifest for {cname} {arch}") + + cmd = [ + "push-manifest-tags", + "--container", + CONTAINER_NAME_ZOT_EXAMPLE, + "--version", + version, + "--arch", + arch, + "--cname", + cname, + "--insecure", + "True", + ] + + if tags: + for tag in tags: + cmd.extend(["--tag", tag]) + + try: + result = runner.invoke( + gl_oci, + cmd, + catch_exceptions=False, + ) + print(f"Push manifest tags output: {result.output}") + return result.exit_code == 0 + except Exception as e: + print(f"Error during push manifest tags: {str(e)}") + return False + + def update_index(runner, version, additional_tags=None): """Update index in registry and return success status""" print("Updating index") @@ -278,12 +313,24 @@ def test_push_manifest_and_index( repo_name = "gardenlinux-example" combined_tag = f"{version}-{cname}-{arch}" + post_push_manifest_tags = [] + # Push manifest and update index + if cname.startswith(f"{TEST_PLATFORMS[1]}-"): + post_push_manifest_tags = additional_tags_manifest + additional_tags_manifest = [] + push_successful = push_manifest( runner, version, arch, cname, additional_tags_manifest ) assert push_successful, "Manifest push should succeed" + if len(post_push_manifest_tags) > 0: + push_successful = push_manifest_tags( + runner, version, arch, cname, post_push_manifest_tags + ) + assert push_successful, "Manifest tags push should succeed" + if push_successful: update_index_successful = update_index(runner, version, additional_tags_index) assert update_index_successful, "Index update should succeed"