diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fb2a5f771..47cbadb5e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,9 @@ on: - series/* pull_request: schedule: - - cron: '0 12 * * *' + - cron: "0 12 * * *" + +permissions: {} jobs: test: @@ -98,7 +100,7 @@ jobs: if: always() needs: - - test + - test runs-on: ubuntu-latest @@ -121,7 +123,7 @@ jobs: - uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0 with: - python-version: '3.x' + python-version: "3.x" - run: pip install coverage[toml] diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index 22da12694..024b28ce7 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -7,6 +7,8 @@ on: workflow_dispatch: pull_request: +permissions: {} + jobs: conformance: runs-on: ubuntu-latest diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index caeb30238..9c4e1e866 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -5,6 +5,8 @@ on: branches: - main +permissions: {} + jobs: build: runs-on: ubuntu-latest diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index dff9b066a..f50367f09 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -6,6 +6,8 @@ on: - main pull_request: +permissions: {} + jobs: lint: runs-on: ubuntu-latest @@ -87,10 +89,10 @@ jobs: if: always() needs: - - lint - - check-readme - - licenses - - x509-testcases + - lint + - check-readme + - licenses + - x509-testcases runs-on: ubuntu-latest diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 55b4b3b9a..29f65b4ab 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -5,8 +5,7 @@ on: types: - published -permissions: # added using https://github.com/step-security/secure-workflows - contents: read +permissions: {} jobs: build: @@ -99,7 +98,7 @@ jobs: - name: Generate build provenance uses: actions/attest-build-provenance@v2 with: - subject-path: 'built-packages/*' + subject-path: "built-packages/*" release-pypi: needs: [build, generate-provenance] diff --git a/.github/workflows/requirements.yml b/.github/workflows/requirements.yml index 499e39233..fdaa4ac2e 100644 --- a/.github/workflows/requirements.yml +++ b/.github/workflows/requirements.yml @@ -12,7 +12,9 @@ on: required: true pull_request: schedule: - - cron: '0 12 * * *' + - cron: "0 12 * * *" + +permissions: {} jobs: test_requirements: diff --git a/.github/workflows/staging-tests.yml b/.github/workflows/staging-tests.yml index ef69b95a8..dad5ba26b 100644 --- a/.github/workflows/staging-tests.yml +++ b/.github/workflows/staging-tests.yml @@ -5,7 +5,9 @@ on: branches: - main schedule: - - cron: '0 */8 * * *' + - cron: "0 */8 * * *" + +permissions: {} jobs: staging-tests: @@ -27,7 +29,6 @@ jobs: cache: "pip" cache-dependency-path: pyproject.toml - - name: staging tests env: SIGSTORE_LOGLEVEL: DEBUG diff --git a/Makefile b/Makefile index c7945f307..b95a679fc 100644 --- a/Makefile +++ b/Makefile @@ -68,7 +68,6 @@ lint: $(VENV)/pyvenv.cfg ruff check $(ALL_PY_SRCS) && \ mypy $(PY_MODULE) && \ bandit -c pyproject.toml -r $(PY_MODULE) && \ - interrogate --fail-under 100 -c pyproject.toml $(PY_MODULE) && \ python docs/scripts/gen_ref_pages.py --check .PHONY: reformat diff --git a/pyproject.toml b/pyproject.toml index 8d89ec027..7441be85d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,8 +37,9 @@ dependencies = [ "rich >= 13,< 15", "rfc8785 ~= 0.1.2", "rfc3161-client >= 1.0.3,< 1.1.0", - # NOTE(ww): Both under active development, so strictly pinned. - "sigstore-protobuf-specs == 0.5.0", + # Both sigstore-models and sigstore-rekor types are unstable + # so we pin them conservatively. + "sigstore-models == 0.0.5", "sigstore-rekor-types == 0.0.18", "tuf ~= 6.0", "platformdirs ~= 4.2", @@ -58,7 +59,7 @@ Documentation = "https://sigstore.github.io/sigstore-python/" test = ["pytest", "pytest-cov", "pretend", "coverage[toml]"] lint = [ "bandit", - "interrogate >= 1.7.0", + # "interrogate >= 1.7.0", "mypy ~= 1.1", # NOTE(ww): ruff is under active development, so we pin conservatively here # and let Dependabot periodically perform this update. diff --git a/sigstore/_cli.py b/sigstore/_cli.py index 4ed1219b7..d63cbd1d0 100644 --- a/sigstore/_cli.py +++ b/sigstore/_cli.py @@ -30,10 +30,8 @@ from pydantic import ValidationError from rich.console import Console from rich.logging import RichHandler -from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( - Bundle as RawBundle, -) -from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm +from sigstore_models.bundle.v1 import Bundle as RawBundle +from sigstore_models.common.v1 import HashAlgorithm from typing_extensions import TypeAlias from sigstore import __version__, dsse @@ -670,7 +668,7 @@ def _sign_file_threaded( raise exp_certificate _logger.info( - f"Transparency log entry created at index: {result.log_entry.log_index}" + f"Transparency log entry created at index: {result.log_entry._inner.log_index}" ) if outputs.signature is not None: @@ -1236,7 +1234,7 @@ def _fix_bundle(args: argparse.Namespace) -> None: rekor = RekorClient.staging() if args.staging else RekorClient.production() - raw_bundle = RawBundle.from_dict(json.loads(args.bundle.read_bytes())) + raw_bundle = RawBundle.from_json(args.bundle.read_bytes()) if len(raw_bundle.verification_material.tlog_entries) != 1: _fatal("unfixable bundle: must have exactly one log entry") @@ -1249,8 +1247,8 @@ def _fix_bundle(args: argparse.Namespace) -> None: inclusion_proof = tlog_entry.inclusion_proof if not inclusion_proof.checkpoint: _logger.info("fixable: bundle's log entry is missing a checkpoint") - new_entry = rekor.log.entries.get(log_index=tlog_entry.log_index)._to_rekor() - raw_bundle.verification_material.tlog_entries = [new_entry] + new_entry = rekor.log.entries.get(log_index=tlog_entry.log_index) + raw_bundle.verification_material.tlog_entries = [new_entry._inner] # Try to create our invariant-preserving Bundle from the any changes above. try: diff --git a/sigstore/_internal/key_details.py b/sigstore/_internal/key_details.py index 7c65ec8ba..f9a53b975 100644 --- a/sigstore/_internal/key_details.py +++ b/sigstore/_internal/key_details.py @@ -13,17 +13,15 @@ # limitations under the License. """ -Utilities for getting the sigstore_protobuf_specs.dev.sigstore.common.v1.PublicKeyDetails. +Utilities for getting PublicKeyDetails. """ -from typing import cast - from cryptography.hazmat.primitives.asymmetric import ec, ed25519, padding, rsa from cryptography.x509 import Certificate -from sigstore_protobuf_specs.dev.sigstore.common import v1 +from sigstore_models.common.v1 import PublicKeyDetails -def _get_key_details(certificate: Certificate) -> v1.PublicKeyDetails: +def _get_key_details(certificate: Certificate) -> PublicKeyDetails: """ Determine PublicKeyDetails from the Certificate. We disclude the unrecommended types. @@ -35,28 +33,28 @@ def _get_key_details(certificate: Certificate) -> v1.PublicKeyDetails: params = certificate.signature_algorithm_parameters if isinstance(public_key, ec.EllipticCurvePublicKey): if isinstance(public_key.curve, ec.SECP256R1): - key_details = v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256 + key_details = PublicKeyDetails.PKIX_ECDSA_P256_SHA_256 elif isinstance(public_key.curve, ec.SECP384R1): - key_details = v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384 + key_details = PublicKeyDetails.PKIX_ECDSA_P384_SHA_384 elif isinstance(public_key.curve, ec.SECP521R1): - key_details = v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512 + key_details = PublicKeyDetails.PKIX_ECDSA_P521_SHA_512 else: raise ValueError(f"Unsupported EC curve: {public_key.curve.name}") elif isinstance(public_key, rsa.RSAPublicKey): if public_key.key_size == 3072: if isinstance(params, padding.PKCS1v15): - key_details = v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256 + key_details = PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256 elif isinstance(params, padding.PSS): - key_details = v1.PublicKeyDetails.PKIX_RSA_PSS_3072_SHA256 + key_details = PublicKeyDetails.PKIX_RSA_PSS_3072_SHA256 else: raise ValueError( f"Unsupported public key type, size, and padding: {type(public_key)}, {public_key.key_size}, {params}" ) elif public_key.key_size == 4096: if isinstance(params, padding.PKCS1v15): - key_details = v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256 + key_details = PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256 elif isinstance(params, padding.PSS): - key_details = v1.PublicKeyDetails.PKIX_RSA_PSS_3072_SHA256 + key_details = PublicKeyDetails.PKIX_RSA_PSS_3072_SHA256 else: raise ValueError( f"Unsupported public key type, size, and padding: {type(public_key)}, {public_key.key_size}, {params}" @@ -64,9 +62,9 @@ def _get_key_details(certificate: Certificate) -> v1.PublicKeyDetails: else: raise ValueError(f"Unsupported RSA key size: {public_key.key_size}") elif isinstance(public_key, ed25519.Ed25519PublicKey): - key_details = v1.PublicKeyDetails.PKIX_ED25519 + key_details = PublicKeyDetails.PKIX_ED25519 # There is likely no need to explicitly detect PKIX_ED25519_PH, especially since the cryptography # library does not yet support Ed25519ph. else: raise ValueError(f"Unsupported public key type: {type(public_key)}") - return cast(v1.PublicKeyDetails, key_details) + return key_details diff --git a/sigstore/_internal/merkle.py b/sigstore/_internal/merkle.py index a39bdb919..1eab29807 100644 --- a/sigstore/_internal/merkle.py +++ b/sigstore/_internal/merkle.py @@ -23,16 +23,14 @@ from __future__ import annotations -import base64 import hashlib import struct import typing -from sigstore._utils import HexStr from sigstore.errors import VerificationError if typing.TYPE_CHECKING: - from sigstore.models import LogEntry + from sigstore.models import TransparencyLogEntry _LEAF_HASH_PREFIX = 0 @@ -54,7 +52,7 @@ def _decomp_inclusion_proof(index: int, size: int) -> tuple[int, int]: return inner, border -def _chain_inner(seed: bytes, hashes: list[str], log_index: int) -> bytes: +def _chain_inner(seed: bytes, hashes: list[bytes], log_index: int) -> bytes: """ Computes a subtree hash for a node on or below the tree's right border. Assumes |proof| hashes are ordered from lower levels to upper, and |seed| is the initial subtree/leaf hash on the path @@ -62,7 +60,7 @@ def _chain_inner(seed: bytes, hashes: list[str], log_index: int) -> bytes: """ for i in range(len(hashes)): - h = bytes.fromhex(hashes[i]) + h = hashes[i] if (log_index >> i) & 1 == 0: seed = _hash_children(seed, h) else: @@ -70,14 +68,14 @@ def _chain_inner(seed: bytes, hashes: list[str], log_index: int) -> bytes: return seed -def _chain_border_right(seed: bytes, hashes: list[str]) -> bytes: +def _chain_border_right(seed: bytes, hashes: list[bytes]) -> bytes: """ Chains proof hashes along tree borders. This differs from inner chaining because |proof| contains only left-side subtree hashes. """ for h in hashes: - seed = _hash_children(bytes.fromhex(h), seed) + seed = _hash_children(h, seed) return seed @@ -93,9 +91,9 @@ def _hash_leaf(leaf: bytes) -> bytes: return hashlib.sha256(data).digest() -def verify_merkle_inclusion(entry: LogEntry) -> None: +def verify_merkle_inclusion(entry: TransparencyLogEntry) -> None: """Verify the Merkle Inclusion Proof for a given Rekor entry.""" - inclusion_proof = entry.inclusion_proof + inclusion_proof = entry._inner.inclusion_proof # Figure out which subset of hashes corresponds to the inner and border nodes. inner, border = _decomp_inclusion_proof( @@ -111,7 +109,7 @@ def verify_merkle_inclusion(entry: LogEntry) -> None: # The new entry's hash isn't included in the inclusion proof so we should calculate this # ourselves. - leaf_hash: bytes = _hash_leaf(base64.b64decode(entry.body)) + leaf_hash: bytes = _hash_leaf(entry._inner.canonicalized_body) # Now chain the hashes belonging to the inner and border portions. We should expect the # calculated hash to match the root hash. @@ -119,12 +117,10 @@ def verify_merkle_inclusion(entry: LogEntry) -> None: leaf_hash, inclusion_proof.hashes[:inner], inclusion_proof.log_index ) - calc_hash: HexStr = HexStr( - _chain_border_right(intermediate_result, inclusion_proof.hashes[inner:]).hex() - ) + calc_hash = _chain_border_right(intermediate_result, inclusion_proof.hashes[inner:]) if calc_hash != inclusion_proof.root_hash: raise VerificationError( f"inclusion proof contains invalid root hash: expected {inclusion_proof}, calculated " - f"{calc_hash}" + f"{calc_hash.hex()}" ) diff --git a/sigstore/_internal/rekor/__init__.py b/sigstore/_internal/rekor/__init__.py index 0af66edc1..50bdad768 100644 --- a/sigstore/_internal/rekor/__init__.py +++ b/sigstore/_internal/rekor/__init__.py @@ -31,7 +31,7 @@ from sigstore.hashes import Hashed if typing.TYPE_CHECKING: - from sigstore.models import LogEntry + from sigstore.models import TransparencyLogEntry __all__ = [ "_hashedrekord_from_parts", @@ -72,7 +72,7 @@ class RekorLogSubmitter(ABC): def create_entry( self, request: EntryRequestBody, - ) -> LogEntry: + ) -> TransparencyLogEntry: """ Submit the request to Rekor. """ diff --git a/sigstore/_internal/rekor/checkpoint.py b/sigstore/_internal/rekor/checkpoint.py index c630d24fa..a0ec513aa 100644 --- a/sigstore/_internal/rekor/checkpoint.py +++ b/sigstore/_internal/rekor/checkpoint.py @@ -31,7 +31,7 @@ if typing.TYPE_CHECKING: from sigstore._internal.trust import RekorKeyring - from sigstore.models import LogEntry + from sigstore.models import TransparencyLogEntry @dataclass(frozen=True) @@ -205,25 +205,26 @@ def from_text(cls, text: str) -> SignedCheckpoint: return cls(signed_note=signed_note, checkpoint=checkpoint) -def verify_checkpoint(rekor_keyring: RekorKeyring, entry: LogEntry) -> None: +def verify_checkpoint(rekor_keyring: RekorKeyring, entry: TransparencyLogEntry) -> None: """ Verify the inclusion proof's checkpoint. """ - inclusion_proof = entry.inclusion_proof - if inclusion_proof is None: - raise VerificationError("Rekor entry has no inclusion proof") + inclusion_proof = entry._inner.inclusion_proof + if inclusion_proof.checkpoint is None: + raise VerificationError("Inclusion proof does not contain a checkpoint") # verification occurs in two stages: # 1) verify the signature on the checkpoint # 2) verify the root hash in the checkpoint matches the root hash from the inclusion proof. - signed_checkpoint = SignedCheckpoint.from_text(inclusion_proof.checkpoint) + signed_checkpoint = SignedCheckpoint.from_text(inclusion_proof.checkpoint.envelope) signed_checkpoint.signed_note.verify( - rekor_keyring, KeyID(bytes.fromhex(entry.log_id)) + rekor_keyring, + KeyID(entry._inner.log_id.key_id), ) checkpoint_hash = signed_checkpoint.checkpoint.log_hash - root_hash = inclusion_proof.root_hash + root_hash = inclusion_proof.root_hash.hex() if checkpoint_hash != root_hash: raise VerificationError( diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py index 4dc8e09c6..57a321885 100644 --- a/sigstore/_internal/rekor/client.py +++ b/sigstore/_internal/rekor/client.py @@ -13,7 +13,7 @@ # limitations under the License. """ -Client implementation for interacting with Rekor. +Client implementation for interacting with Rekor (v1). """ from __future__ import annotations @@ -38,7 +38,7 @@ ) from sigstore.dsse import Envelope from sigstore.hashes import Hashed -from sigstore.models import LogEntry +from sigstore.models import TransparencyLogEntry _logger = logging.getLogger(__name__) @@ -120,7 +120,9 @@ class RekorEntries(_Endpoint): Represents the individual log entry endpoints on a Rekor instance. """ - def get(self, *, uuid: str | None = None, log_index: int | None = None) -> LogEntry: + def get( + self, *, uuid: str | None = None, log_index: int | None = None + ) -> TransparencyLogEntry: """ Retrieve a specific log entry, either by UUID or by log index. @@ -140,12 +142,12 @@ def get(self, *, uuid: str | None = None, log_index: int | None = None) -> LogEn resp.raise_for_status() except requests.HTTPError as http_error: raise RekorClientError(http_error) - return LogEntry._from_response(resp.json()) + return TransparencyLogEntry._from_v1_response(resp.json()) def post( self, payload: EntryRequestBody, - ) -> LogEntry: + ) -> TransparencyLogEntry: """ Submit a new entry for inclusion in the Rekor log. """ @@ -160,7 +162,7 @@ def post( integrated_entry = resp.json() _logger.debug(f"integrated: {integrated_entry}") - return LogEntry._from_response(integrated_entry) + return TransparencyLogEntry._from_v1_response(integrated_entry) @property def retrieve(self) -> RekorEntriesRetrieve: @@ -178,7 +180,7 @@ class RekorEntriesRetrieve(_Endpoint): def post( self, expected_entry: rekor_types.Hashedrekord | rekor_types.Dsse, - ) -> LogEntry | None: + ) -> TransparencyLogEntry | None: """ Retrieves an extant Rekor entry, identified by its artifact signature, artifact hash, and signing certificate. @@ -202,12 +204,19 @@ def post( # We select the oldest entry for our actual return value, # since a malicious actor could conceivably spam the log with # newer duplicate entries. - oldest_entry: LogEntry | None = None + oldest_entry: TransparencyLogEntry | None = None for result in results: - entry = LogEntry._from_response(result) + entry = TransparencyLogEntry._from_v1_response(result) + + # We expect every entry in Rekor v1 to have an integrated time. + if entry._inner.integrated_time is None: + raise ValueError( + f"Rekor v1 gave us an entry without an integrated time: {entry._inner.log_index}" + ) + if ( oldest_entry is None - or entry.integrated_time < oldest_entry.integrated_time + or entry._inner.integrated_time < oldest_entry._inner.integrated_time # type: ignore[operator] ): oldest_entry = entry @@ -247,7 +256,7 @@ def log(self) -> RekorLog: return RekorLog(f"{self.url}/log") - def create_entry(self, request: EntryRequestBody) -> LogEntry: + def create_entry(self, request: EntryRequestBody) -> TransparencyLogEntry: """ Submit the request to Rekor. """ diff --git a/sigstore/_internal/rekor/client_v2.py b/sigstore/_internal/rekor/client_v2.py index b21580a51..d4a4d0e10 100644 --- a/sigstore/_internal/rekor/client_v2.py +++ b/sigstore/_internal/rekor/client_v2.py @@ -13,20 +13,21 @@ # limitations under the License. """ -Client implementation for interacting with RekorV2. +Client implementation for interacting with Rekor v2. """ from __future__ import annotations +import base64 import json import logging import requests from cryptography.hazmat.primitives import serialization from cryptography.x509 import Certificate -from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1 -from sigstore_protobuf_specs.dev.sigstore.rekor import v2 -from sigstore_protobuf_specs.io import intoto +from sigstore_models.common import v1 as common_v1 +from sigstore_models.rekor import v2 as rekor_v2 +from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry from sigstore._internal import USER_AGENT from sigstore._internal.key_details import _get_key_details @@ -37,7 +38,7 @@ ) from sigstore.dsse import Envelope from sigstore.hashes import Hashed -from sigstore.models import LogEntry +from sigstore.models import TransparencyLogEntry _logger = logging.getLogger(__name__) @@ -55,7 +56,7 @@ def __init__(self, base_url: str) -> None: """ self.url = f"{base_url}/api/v2" - def create_entry(self, payload: EntryRequestBody) -> LogEntry: + def create_entry(self, payload: EntryRequestBody) -> TransparencyLogEntry: """ Submit a new entry for inclusion in the Rekor log. @@ -88,7 +89,8 @@ def create_entry(self, payload: EntryRequestBody) -> LogEntry: integrated_entry = resp.json() _logger.debug(f"integrated: {integrated_entry}") - return LogEntry._from_dict_rekor(integrated_entry) + inner = _TransparencyLogEntry.from_dict(integrated_entry) + return TransparencyLogEntry(inner) @classmethod def _build_hashed_rekord_request( @@ -100,15 +102,17 @@ def _build_hashed_rekord_request( """ Construct a hashed rekord request to submit to Rekor. """ - req = v2.CreateEntryRequest( - hashed_rekord_request_v002=v2.HashedRekordRequestV002( - digest=hashed_input.digest, - signature=v2.Signature( - content=signature, - verifier=v2.Verifier( + req = rekor_v2.entry.CreateEntryRequest( + hashed_rekord_request_v002=rekor_v2.hashedrekord.HashedRekordRequestV002( + digest=base64.b64encode(hashed_input.digest), + signature=rekor_v2.verifier.Signature( + content=base64.b64encode(signature), + verifier=rekor_v2.verifier.Verifier( x509_certificate=common_v1.X509Certificate( - raw_bytes=certificate.public_bytes( - encoding=serialization.Encoding.DER + raw_bytes=base64.b64encode( + certificate.public_bytes( + encoding=serialization.Encoding.DER + ) ) ), key_details=_get_key_details(certificate), @@ -125,24 +129,16 @@ def _build_dsse_request( """ Construct a dsse request to submit to Rekor. """ - req = v2.CreateEntryRequest( - dsse_request_v002=v2.DsseRequestV002( - envelope=intoto.Envelope( - payload=envelope._inner.payload, - payload_type=envelope._inner.payload_type, - signatures=[ - intoto.Signature( - keyid=signature.keyid, - sig=signature.sig, - ) - for signature in envelope._inner.signatures - ], - ), + req = rekor_v2.entry.CreateEntryRequest( + dsse_request_v002=rekor_v2.dsse.DSSERequestV002( + envelope=envelope._inner, verifiers=[ - v2.Verifier( + rekor_v2.verifier.Verifier( x509_certificate=common_v1.X509Certificate( - raw_bytes=certificate.public_bytes( - encoding=serialization.Encoding.DER + raw_bytes=base64.b64encode( + certificate.public_bytes( + encoding=serialization.Encoding.DER + ) ) ), key_details=_get_key_details(certificate), diff --git a/sigstore/_internal/trust.py b/sigstore/_internal/trust.py index d335d87ed..e4da68def 100644 --- a/sigstore/_internal/trust.py +++ b/sigstore/_internal/trust.py @@ -35,29 +35,8 @@ Certificate, load_der_x509_certificate, ) -from sigstore_protobuf_specs.dev.sigstore.common.v1 import PublicKey as _PublicKey -from sigstore_protobuf_specs.dev.sigstore.common.v1 import ( - PublicKeyDetails as _PublicKeyDetails, -) -from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange -from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( - CertificateAuthority as _CertificateAuthority, -) -from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( - ClientTrustConfig as _ClientTrustConfig, -) -from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( - Service, - ServiceConfiguration, - ServiceSelector, - TransparencyLogInstance, -) -from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( - SigningConfig as _SigningConfig, -) -from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( - TrustedRoot as _TrustedRoot, -) +from sigstore_models.common import v1 as common_v1 +from sigstore_models.trustroot import v1 as trustroot_v1 from sigstore._internal.fulcio.client import FulcioClient from sigstore._internal.rekor import RekorLogSubmitter @@ -70,7 +49,6 @@ PublicKey, key_id, load_der_public_key, - read_embedded, ) from sigstore.errors import Error, MetadataError, TUFError, VerificationError @@ -83,7 +61,9 @@ _logger = logging.getLogger(__name__) -def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool: +def _is_timerange_valid( + period: common_v1.TimeRange | None, *, allow_expired: bool +) -> bool: """ Given a `period`, checks that the the current time is not before `start`. If `allow_expired` is `False`, also checks that the current time is not after @@ -116,19 +96,19 @@ class Key: key_id: KeyID _RSA_SHA_256_DETAILS: ClassVar = { - _PublicKeyDetails.PKCS1_RSA_PKCS1V5, - _PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256, - _PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256, - _PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256, + common_v1.PublicKeyDetails.PKCS1_RSA_PKCS1V5, + common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256, + common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256, + common_v1.PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256, } _EC_DETAILS_TO_HASH: ClassVar = { - _PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(), - _PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(), - _PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(), + common_v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(), + common_v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(), + common_v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(), } - def __init__(self, public_key: _PublicKey) -> None: + def __init__(self, public_key: common_v1.PublicKey) -> None: """ Construct a key from the given Sigstore PublicKey message. """ @@ -147,7 +127,7 @@ def __init__(self, public_key: _PublicKey) -> None: key = load_der_public_key( public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,) ) - elif public_key.key_details == _PublicKeyDetails.PKIX_ED25519: + elif public_key.key_details == common_v1.PublicKeyDetails.PKIX_ED25519: hash_algorithm = None key = load_der_public_key( public_key.raw_bytes, types=(ed25519.Ed25519PublicKey,) @@ -198,7 +178,7 @@ class Keyring: Represents a set of keys, each of which is a potentially valid verifier. """ - def __init__(self, public_keys: list[_PublicKey] = []): + def __init__(self, public_keys: list[common_v1.PublicKey] = []): """ Create a new `Keyring`, with `keys` as the initial set of verifying keys. """ @@ -263,7 +243,7 @@ class CertificateAuthority: Certificate Authority used in a Trusted Root configuration. """ - def __init__(self, inner: _CertificateAuthority): + def __init__(self, inner: trustroot_v1.CertificateAuthority): """ Construct a new `CertificateAuthority`. @@ -278,7 +258,7 @@ def from_json(cls, path: str) -> CertificateAuthority: """ Create a CertificateAuthority directly from JSON. """ - inner = _CertificateAuthority().from_json(Path(path).read_bytes()) + inner = trustroot_v1.CertificateAuthority.from_json(Path(path).read_bytes()) return cls(inner) def _verify(self) -> None: @@ -335,7 +315,7 @@ def __str__(self) -> str: """Returns the variant's string value.""" return self.value - def __init__(self, inner: _SigningConfig): + def __init__(self, inner: trustroot_v1.SigningConfig): """ Construct a new `SigningConfig`. @@ -377,19 +357,19 @@ def from_file( path: str, ) -> SigningConfig: """Create a new signing config from file""" - inner = _SigningConfig().from_json(Path(path).read_bytes()) + inner = trustroot_v1.SigningConfig.from_json(Path(path).read_bytes()) return cls(inner) @staticmethod def _get_valid_services( - services: list[Service], + services: list[trustroot_v1.Service], supported_versions: list[int], - config: ServiceConfiguration | None, - ) -> list[Service]: + config: trustroot_v1.ServiceConfiguration | None, + ) -> list[trustroot_v1.Service]: """Return supported services, taking SigningConfig restrictions into account""" # split services by operator, only include valid services - services_by_operator: dict[str, list[Service]] = defaultdict(list) + services_by_operator: dict[str, list[trustroot_v1.Service]] = defaultdict(list) for service in services: if service.major_api_version not in supported_versions: continue @@ -401,20 +381,21 @@ def _get_valid_services( # build a list of services but make sure we only include one service per operator # and use the highest version available for that operator - result: list[Service] = [] + result: list[trustroot_v1.Service] = [] for op_services in services_by_operator.values(): op_services.sort(key=lambda s: s.major_api_version) result.append(op_services[-1]) # Depending on ServiceSelector, prune the result list - if not config or config.selector == ServiceSelector.ALL: + if not config or config.selector == trustroot_v1.ServiceSelector.ALL: return result - if config.selector == ServiceSelector.UNDEFINED: - raise ValueError("Undefined is not a valid signing config ServiceSelector") - # handle EXACT and ANY selectors - count = config.count if config.selector == ServiceSelector.EXACT else 1 + count = ( + config.count + if config.selector == trustroot_v1.ServiceSelector.EXACT and config.count + else 1 + ) if len(result) < count: raise ValueError( f"Expected {count} services in signing config, found {len(result)}" @@ -474,7 +455,7 @@ def __str__(self) -> str: """Returns the variant's string value.""" return self.value - def __init__(self, inner: _TrustedRoot): + def __init__(self, inner: trustroot_v1.TrustedRoot): """ Construct a new `TrustedRoot`. @@ -501,12 +482,12 @@ def from_file( path: str, ) -> TrustedRoot: """Create a new trust root from file""" - inner = _TrustedRoot().from_json(Path(path).read_bytes()) + inner = trustroot_v1.TrustedRoot.from_json(Path(path).read_bytes()) return cls(inner) def _get_tlog_keys( - self, tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose - ) -> Iterable[_PublicKey]: + self, tlogs: list[trustroot_v1.TransparencyLogInstance], purpose: KeyringPurpose + ) -> Iterable[common_v1.PublicKey]: """ Yields an iterator of public keys for transparency log instances that are suitable for `purpose`. @@ -523,14 +504,18 @@ def _get_tlog_keys( def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring: """Return keyring with keys for Rekor.""" - keys: list[_PublicKey] = list(self._get_tlog_keys(self._inner.tlogs, purpose)) + keys: list[common_v1.PublicKey] = list( + self._get_tlog_keys(self._inner.tlogs, purpose) + ) if len(keys) == 0: raise MetadataError("Did not find any Rekor keys in trusted root") return RekorKeyring(Keyring(keys)) def ct_keyring(self, purpose: KeyringPurpose) -> CTKeyring: """Return keyring with key for CTFE.""" - ctfes: list[_PublicKey] = list(self._get_tlog_keys(self._inner.ctlogs, purpose)) + ctfes: list[common_v1.PublicKey] = list( + self._get_tlog_keys(self._inner.ctlogs, purpose) + ) if not ctfes: raise MetadataError("CTFE keys not found in trusted root") return CTKeyring(Keyring(ctfes)) @@ -585,7 +570,7 @@ def from_json(cls, raw: str) -> ClientTrustConfig: """ Deserialize the given client trust config. """ - inner = _ClientTrustConfig().from_json(raw) + inner = trustroot_v1.ClientTrustConfig.from_json(raw) return cls(inner) @classmethod @@ -626,48 +611,27 @@ def from_tuf( updater = TrustUpdater(url, offline) tr_path = updater.get_trusted_root_path() - inner_tr = _TrustedRoot().from_json(Path(tr_path).read_bytes()) + inner_tr = trustroot_v1.TrustedRoot.from_json(Path(tr_path).read_bytes()) try: sc_path = updater.get_signing_config_path() - inner_sc = _SigningConfig().from_json(Path(sc_path).read_bytes()) + inner_sc = trustroot_v1.SigningConfig.from_json(Path(sc_path).read_bytes()) except TUFError as e: - # TUF repo may not have signing config yet: hard code values for prod: - # https://github.com/sigstore/sigstore-python/issues/1388 - if url == DEFAULT_TUF_URL: - embedded = read_embedded("signing_config.v0.2.json", url) - inner_sc = _SigningConfig().from_json(embedded) - else: - raise e + raise e return cls( - _ClientTrustConfig( - ClientTrustConfig.ClientTrustConfigType.CONFIG_0_1, - inner_tr, - inner_sc, + trustroot_v1.ClientTrustConfig( + media_type=ClientTrustConfig.ClientTrustConfigType.CONFIG_0_1.value, + trusted_root=inner_tr, + signing_config=inner_sc, ) ) - def __init__(self, inner: _ClientTrustConfig) -> None: + def __init__(self, inner: trustroot_v1.ClientTrustConfig) -> None: """ @api private """ self._inner = inner - self._verify() - - def _verify(self) -> None: - """ - Performs various feats of heroism to ensure that the client trust config - is well-formed. - """ - - # The client trust config must have a recognized media type. - try: - ClientTrustConfig.ClientTrustConfigType(self._inner.media_type) - except ValueError: - raise Error( - f"unsupported client trust config format: {self._inner.media_type}" - ) @property def trusted_root(self) -> TrustedRoot: diff --git a/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstage.dev/signing_config.v0.2.json b/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstage.dev/signing_config.v0.2.json index fe66ad97b..62629a3c1 100644 --- a/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstage.dev/signing_config.v0.2.json +++ b/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstage.dev/signing_config.v0.2.json @@ -6,7 +6,8 @@ "majorApiVersion": 1, "validFor": { "start": "2022-04-14T21:38:40Z" - } + }, + "operator": "sigstore.dev" } ], "oidcUrls": [ @@ -15,7 +16,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-16T00:00:00Z" - } + }, + "operator": "sigstore.dev" } ], "rekorTlogUrls": [ @@ -24,7 +26,8 @@ "majorApiVersion": 1, "validFor": { "start": "2021-01-12T11:53:27Z" - } + }, + "operator": "sigstore.dev" } ], "tsaUrls": [ @@ -33,7 +36,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-09T00:00:00Z" - } + }, + "operator": "sigstore.dev" } ], "rekorTlogConfig": { @@ -42,4 +46,4 @@ "tsaConfig": { "selector": "ANY" } -} \ No newline at end of file +} diff --git a/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstore.dev/signing_config.v0.2.json b/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstore.dev/signing_config.v0.2.json index 8e72b7628..beaadec8d 100644 --- a/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstore.dev/signing_config.v0.2.json +++ b/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstore.dev/signing_config.v0.2.json @@ -1,5 +1,4 @@ { - "comment": "Place holder for use until prod actually has a signing config: see ClientTrustConfig.from_tuf()", "mediaType": "application/vnd.dev.sigstore.signingconfig.v0.2+json", "caUrls": [ { @@ -7,7 +6,8 @@ "majorApiVersion": 1, "validFor": { "start": "2022-04-13T20:06:15.000Z" - } + }, + "operator": "sigstore.dev" } ], "oidcUrls": [ @@ -15,8 +15,9 @@ "url": "https://oauth2.sigstore.dev/auth", "majorApiVersion": 1, "validFor": { - "start": "2025-04-30T00:00:00Z" - } + "start": "2022-04-13T20:06:15.000Z" + }, + "operator": "sigstore.dev" } ], "rekorTlogUrls": [ @@ -25,15 +26,24 @@ "majorApiVersion": 1, "validFor": { "start": "2021-01-12T11:53:27.000Z" - } + }, + "operator": "sigstore.dev" } ], "tsaUrls": [ + { + "url": "https://timestamp.sigstore.dev/api/v1/timestamp", + "majorApiVersion": 1, + "validFor": { + "start": "2025-07-04T00:00:00Z" + }, + "operator": "sigstore.dev" + } ], "rekorTlogConfig": { "selector": "ANY" }, "tsaConfig": { - "selector": "ALL" + "selector": "ANY" } -} \ No newline at end of file +} diff --git a/sigstore/_utils.py b/sigstore/_utils.py index 6c42cc67c..6d8433826 100644 --- a/sigstore/_utils.py +++ b/sigstore/_utils.py @@ -33,7 +33,7 @@ load_der_x509_certificate, ) from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID -from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm +from sigstore_models.common.v1 import HashAlgorithm from sigstore import hashes as sigstore_hashes from sigstore.errors import VerificationError diff --git a/sigstore/dsse/__init__.py b/sigstore/dsse/__init__.py index 863c65356..38caf5843 100644 --- a/sigstore/dsse/__init__.py +++ b/sigstore/dsse/__init__.py @@ -18,6 +18,7 @@ from __future__ import annotations +import base64 import logging from typing import Any, Literal, Optional @@ -25,9 +26,9 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError -from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm -from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope -from sigstore_protobuf_specs.io.intoto import Signature +from sigstore_models.common.v1 import HashAlgorithm +from sigstore_models.intoto import Envelope as _Envelope +from sigstore_models.intoto import Signature as _Signature from sigstore.errors import Error, VerificationError from sigstore.hashes import Hashed @@ -223,7 +224,7 @@ def _verify(self) -> None: @classmethod def _from_json(cls, contents: bytes | str) -> Envelope: """Return a DSSE envelope from the given JSON representation.""" - inner = _Envelope().from_json(contents) + inner = _Envelope.from_json(contents) return cls(inner) def to_json(self) -> str: @@ -270,9 +271,9 @@ def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope: signature = key.sign(pae, ec.ECDSA(hashes.SHA256())) return Envelope( _Envelope( - payload=stmt._contents, + payload=base64.b64encode(stmt._contents), payload_type=Envelope._TYPE, - signatures=[Signature(sig=signature)], + signatures=[_Signature(sig=base64.b64encode(signature))], ) ) diff --git a/sigstore/errors.py b/sigstore/errors.py index 9cdbcc188..11cda707c 100644 --- a/sigstore/errors.py +++ b/sigstore/errors.py @@ -19,7 +19,7 @@ import sys from collections.abc import Mapping from logging import Logger -from typing import Any +from typing import Any, NoReturn class Error(Exception): @@ -30,7 +30,7 @@ def diagnostics(self) -> str: return str(self) - def log_and_exit(self, logger: Logger, raise_error: bool = False) -> None: + def log_and_exit(self, logger: Logger, raise_error: bool = False) -> NoReturn: """Prints all relevant error information to stderr and exits.""" remind_verbose = ( diff --git a/sigstore/hashes.py b/sigstore/hashes.py index 876f8ea87..d629f753c 100644 --- a/sigstore/hashes.py +++ b/sigstore/hashes.py @@ -20,7 +20,7 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric.utils import Prehashed from pydantic import BaseModel -from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm +from sigstore_models.common.v1 import HashAlgorithm from sigstore.errors import Error @@ -60,4 +60,4 @@ def __str__(self) -> str: """ Returns a str representation of this `Hashed`. """ - return f"{HashAlgorithm(self.algorithm)}:{self.digest.hex()}" + return f"{self.algorithm.value}:{self.digest.hex()}" diff --git a/sigstore/models.py b/sigstore/models.py index 231b68b2d..71b1c8bfe 100644 --- a/sigstore/models.py +++ b/sigstore/models.py @@ -19,12 +19,11 @@ from __future__ import annotations import base64 -import json import logging import typing from enum import Enum from textwrap import dedent -from typing import Any, Optional +from typing import Any import rfc8785 from cryptography.hazmat.primitives.serialization import Encoding @@ -32,39 +31,24 @@ Certificate, load_der_x509_certificate, ) -from pydantic import ( - BaseModel, - ConfigDict, - Field, - StrictInt, - StrictStr, - TypeAdapter, - ValidationInfo, - field_validator, -) -from pydantic.dataclasses import dataclass +from pydantic import TypeAdapter from rekor_types import Dsse, Hashedrekord, ProposedEntry from rfc3161_client import TimeStampResponse, decode_timestamp_response -from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1 -from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( - Bundle as _Bundle, -) -from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( +from sigstore_models.bundle import v1 as bundle_v1 +from sigstore_models.bundle.v1 import Bundle as _Bundle +from sigstore_models.bundle.v1 import ( TimestampVerificationData as _TimestampVerificationData, ) -from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( - VerificationMaterial as _VerificationMaterial, -) -from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1 -from sigstore_protobuf_specs.dev.sigstore.common.v1 import Rfc3161SignedTimestamp -from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1 -from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import InclusionProof, KindVersion +from sigstore_models.bundle.v1 import VerificationMaterial as _VerificationMaterial +from sigstore_models.common import v1 as common_v1 +from sigstore_models.common.v1 import MessageSignature, RFC3161SignedTimestamp +from sigstore_models.rekor import v1 as rekor_v1 +from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry from sigstore import dsse from sigstore._internal.merkle import verify_merkle_inclusion from sigstore._internal.rekor.checkpoint import verify_checkpoint from sigstore._utils import ( - B64Str, KeyID, cert_is_leaf, cert_is_root_ca, @@ -78,115 +62,54 @@ _logger = logging.getLogger(__name__) -class LogInclusionProof(BaseModel): - """ - Represents an inclusion proof for a transparency log entry. - """ - - model_config = ConfigDict(populate_by_name=True) - - checkpoint: StrictStr = Field(..., alias="checkpoint") - hashes: list[StrictStr] = Field(..., alias="hashes") - log_index: StrictInt = Field(..., alias="logIndex") - root_hash: StrictStr = Field(..., alias="rootHash") - tree_size: StrictInt = Field(..., alias="treeSize") - - @field_validator("log_index") - def _log_index_positive(cls, v: int) -> int: - if v < 0: - raise ValueError(f"Inclusion proof has invalid log index: {v} < 0") - return v - - @field_validator("tree_size") - def _tree_size_positive(cls, v: int) -> int: - if v < 0: - raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0") - return v - - @field_validator("tree_size") - def _log_index_within_tree_size( - cls, v: int, info: ValidationInfo, **kwargs: Any - ) -> int: - if "log_index" in info.data and v <= info.data["log_index"]: - raise ValueError( - "Inclusion proof has log index greater than or equal to tree size: " - f"{v} <= {info.data['log_index']}" - ) - return v - - -@dataclass(frozen=True) -class LogEntry: +class TransparencyLogEntry: """ Represents a transparency log entry. - - Log entries are retrieved from the transparency log after signing or verification events, - or loaded from "Sigstore" bundles provided by the user. - - This representation allows for either a missing inclusion promise or a missing - inclusion proof, but not both: attempting to construct a `LogEntry` without - at least one will fail. - """ - - uuid: Optional[str] # noqa: UP045 - """ - This entry's unique ID in the log instance it was retrieved from. - - For sharded log deployments, IDs are unique per-shard. - - Not present for `LogEntry` instances loaded from Sigstore bundles. """ - body: B64Str - """ - The base64-encoded body of the transparency log entry. - """ - - integrated_time: int - """ - The UNIX time at which this entry was integrated into the transparency log. - """ - - log_id: str - """ - The log's ID (as the SHA256 hash of the DER-encoded public key for the log - at the time of entry inclusion). - """ + def __init__(self, inner: _TransparencyLogEntry) -> None: + """ + Creates a new `TransparencyLogEntry` from the given inner object. - log_index: int - """ - The index of this entry within the log. - """ + @private + """ + self._inner = inner + self._validate() - inclusion_proof: LogInclusionProof - """ - An inclusion proof for this log entry. - """ + def _validate(self) -> None: + """ + Ensure this transparency log entry is well-formed and upholds our + client invariants. + """ - inclusion_promise: Optional[B64Str] # noqa: UP045 - """ - An inclusion promise for this log entry, if present. + inclusion_proof: rekor_v1.InclusionProof | None = self._inner.inclusion_proof + # This check is required by us as the client, not the + # protobuf-specs themselves. + if not inclusion_proof or not inclusion_proof.checkpoint: + raise InvalidBundle("entry must contain inclusion proof, with checkpoint") - Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this - log entry. - """ + def __eq__(self, value: object) -> bool: + """ + Compares this `TransparencyLogEntry` with another object for equality. - _kind_version: KindVersion - """ - The kind and version of the log entry. - """ + Two `TransparencyLogEntry` instances are considered equal if their + inner contents are equal. + """ + if not isinstance(value, TransparencyLogEntry): + return NotImplemented + return self._inner == value._inner @classmethod - def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: + def _from_v1_response(cls, dict_: dict[str, Any]) -> TransparencyLogEntry: """ - Create a new `LogEntry` from the given API response. + Create a new `TransparencyLogEntry` from the given API response. """ # Assumes we only get one entry back entries = list(dict_.items()) if len(entries) != 1: raise ValueError("Received multiple entries in response") - uuid, entry = entries[0] + _, entry = entries[0] # Fill in the appropriate kind body_entry: ProposedEntry = TypeAdapter(ProposedEntry).validate_json( @@ -195,104 +118,62 @@ def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: if not isinstance(body_entry, (Hashedrekord, Dsse)): raise InvalidBundle("log entry is not of expected type") - return LogEntry( - uuid=uuid, - body=entry["body"], - integrated_time=entry["integratedTime"], - log_id=entry["logID"], - log_index=entry["logIndex"], - inclusion_proof=LogInclusionProof.model_validate( - entry["verification"]["inclusionProof"] + raw_inclusion_proof = entry["verification"]["inclusionProof"] + + # NOTE: The type ignores below are a consequence of our Pydantic + # modeling: mypy and other typecheckers see `ProtoU64` as `int`, + # but it gets coerced from a string due to Protobuf's JSON serialization. + inner = _TransparencyLogEntry( + log_index=str(entry["logIndex"]), # type: ignore[arg-type] + log_id=common_v1.LogId( + key_id=base64.b64encode(bytes.fromhex(entry["logID"])) ), - inclusion_promise=entry["verification"]["signedEntryTimestamp"], - _kind_version=KindVersion( + kind_version=rekor_v1.KindVersion( kind=body_entry.kind, version=body_entry.api_version ), + integrated_time=str(entry["integratedTime"]), # type: ignore[arg-type] + inclusion_promise=rekor_v1.InclusionPromise( + signed_entry_timestamp=entry["verification"]["signedEntryTimestamp"] + ), + inclusion_proof=rekor_v1.InclusionProof( + log_index=str(raw_inclusion_proof["logIndex"]), # type: ignore[arg-type] + root_hash=base64.b64encode( + bytes.fromhex(raw_inclusion_proof["rootHash"]) + ), + tree_size=str(raw_inclusion_proof["treeSize"]), # type: ignore[arg-type] + hashes=[ + base64.b64encode(bytes.fromhex(h)) + for h in raw_inclusion_proof["hashes"] + ], + checkpoint=rekor_v1.Checkpoint( + envelope=raw_inclusion_proof["checkpoint"] + ), + ), + canonicalized_body=entry["body"], ) - @classmethod - def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry: - """ - Create a new `LogEntry` from the given Rekor TransparencyLogEntry. - """ - tlog_entry = rekor_v1.TransparencyLogEntry() - tlog_entry.from_dict(dict_) - - inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof - # This check is required by us as the client, not the - # protobuf-specs themselves. - if not inclusion_proof or not inclusion_proof.checkpoint.envelope: - raise InvalidBundle("entry must contain inclusion proof, with checkpoint") - - parsed_inclusion_proof = LogInclusionProof( - checkpoint=inclusion_proof.checkpoint.envelope, - hashes=[h.hex() for h in inclusion_proof.hashes], - log_index=inclusion_proof.log_index, - root_hash=inclusion_proof.root_hash.hex(), - tree_size=inclusion_proof.tree_size, - ) - - inclusion_promise: B64Str | None = None - if tlog_entry.inclusion_promise: - inclusion_promise = B64Str( - base64.b64encode( - tlog_entry.inclusion_promise.signed_entry_timestamp - ).decode() - ) - - return LogEntry( - uuid=None, - body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()), - integrated_time=tlog_entry.integrated_time, - log_id=tlog_entry.log_id.key_id.hex(), - log_index=tlog_entry.log_index, - inclusion_proof=parsed_inclusion_proof, - _kind_version=tlog_entry.kind_version, - inclusion_promise=inclusion_promise, - ) - - def _to_rekor(self) -> rekor_v1.TransparencyLogEntry: - """ - Create a new protobuf-level `TransparencyLogEntry` from this `LogEntry`. - - @private - """ - inclusion_proof = rekor_v1.InclusionProof( - log_index=self.inclusion_proof.log_index, - root_hash=bytes.fromhex(self.inclusion_proof.root_hash), - tree_size=self.inclusion_proof.tree_size, - hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes], - checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint), - ) - - tlog_entry = rekor_v1.TransparencyLogEntry( - log_index=self.log_index, - log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)), - integrated_time=self.integrated_time, - inclusion_proof=inclusion_proof, - kind_version=self._kind_version, - canonicalized_body=base64.b64decode(self.body), - ) - if self.inclusion_promise: - inclusion_promise = rekor_v1.InclusionPromise( - signed_entry_timestamp=base64.b64decode(self.inclusion_promise) - ) - tlog_entry.inclusion_promise = inclusion_promise - - return tlog_entry + return cls(inner) - def encode_canonical(self) -> bytes: + def _encode_canonical(self) -> bytes: """ Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry. This encoded representation is suitable for verification against the Signed Entry Timestamp. """ + # We might not have an integrated time if our log entry is from rekor + # v2, i.e. was integrated synchronously instead of via an + # inclusion promise. + if self._inner.integrated_time is None: + raise ValueError( + "can't encode canonical form for SET without integrated time" + ) + payload: dict[str, int | str] = { - "body": self.body, - "integratedTime": self.integrated_time, - "logID": self.log_id, - "logIndex": self.log_index, + "body": base64.b64encode(self._inner.canonicalized_body).decode(), + "integratedTime": self._inner.integrated_time, + "logID": self._inner.log_id.key_id.hex(), + "logIndex": self._inner.log_index, } return rfc8785.dumps(payload) @@ -305,16 +186,16 @@ def _verify_set(self, keyring: RekorKeyring) -> None: Fails if the given log entry does not contain an inclusion promise. """ - if self.inclusion_promise is None: + if self._inner.inclusion_promise is None: raise VerificationError("SET: invalid inclusion promise: missing") - signed_entry_ts = base64.b64decode(self.inclusion_promise) + signed_entry_ts = self._inner.inclusion_promise.signed_entry_timestamp try: keyring.verify( - key_id=KeyID(bytes.fromhex(self.log_id)), + key_id=KeyID(self._inner.log_id.key_id), signature=signed_entry_ts, - data=self.encode_canonical(), + data=self._encode_canonical(), ) except VerificationError as exc: raise VerificationError(f"SET: invalid inclusion promise: {exc}") @@ -334,12 +215,14 @@ def _verify(self, keyring: RekorKeyring) -> None: verify_merkle_inclusion(self) verify_checkpoint(keyring, self) - _logger.debug(f"successfully verified inclusion proof: index={self.log_index}") + _logger.debug( + f"successfully verified inclusion proof: index={self._inner.log_index}" + ) - if self.inclusion_promise: + if self._inner.inclusion_promise and self._inner.integrated_time: self._verify_set(keyring) _logger.debug( - f"successfully verified inclusion promise: index={self.log_index}" + f"successfully verified inclusion promise: index={self._inner.log_index}" ) @@ -362,10 +245,12 @@ def _verify(self) -> None: It verifies that TimeStamp Responses embedded in the bundle are correctly formed. """ + if not (timestamps := self._inner.rfc3161_timestamps): + timestamps = [] + try: self._signed_ts = [ - decode_timestamp_response(ts.signed_timestamp) - for ts in self._inner.rfc3161_timestamps + decode_timestamp_response(ts.signed_timestamp) for ts in timestamps ] except ValueError: raise VerificationError("Invalid Timestamp Response") @@ -380,7 +265,7 @@ def from_json(cls, raw: str | bytes) -> TimestampVerificationData: """ Deserialize the given timestamp verification data. """ - inner = _TimestampVerificationData().from_json(raw) + inner = _TimestampVerificationData.from_json(raw) return cls(inner) @@ -394,11 +279,16 @@ def __init__(self, inner: _VerificationMaterial) -> None: self._inner = inner @property - def timestamp_verification_data(self) -> TimestampVerificationData: + def timestamp_verification_data(self) -> TimestampVerificationData | None: """ - Returns the Timestamp Verification Data. + Returns the Timestamp Verification Data, if present. """ - return TimestampVerificationData(self._inner.timestamp_verification_data) + if ( + self._inner.timestamp_verification_data + and self._inner.timestamp_verification_data.rfc3161_timestamps + ): + return TimestampVerificationData(self._inner.timestamp_verification_data) + return None class InvalidBundle(Error): @@ -483,8 +373,11 @@ def _verify(self) -> None: # In older bundles, there is an entire pool (misleadingly called # a chain) of certificates, the first of which is the signing # certificate. + if not self._inner.verification_material.x509_certificate_chain: + raise InvalidBundle("expected certificate chain in bundle") + chain = self._inner.verification_material.x509_certificate_chain - if not chain or not chain.certificates: + if not chain.certificates: raise InvalidBundle("expected non-empty certificate chain in bundle") # Per client policy in protobuf-specs: the first entry in the chain @@ -540,22 +433,22 @@ def _verify(self) -> None: # # Before all of this, we require that the inclusion proof be present # (when constructing the LogEntry). - log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict()) + log_entry = TransparencyLogEntry(tlog_entry) if media_type == Bundle.BundleType.BUNDLE_0_1: - if not log_entry.inclusion_promise: + if not log_entry._inner.inclusion_promise: raise InvalidBundle("bundle must contain an inclusion promise") - if not log_entry.inclusion_proof.checkpoint: + if not log_entry._inner.inclusion_proof.checkpoint: _logger.debug( "0.1 bundle contains inclusion proof without checkpoint; ignoring" ) else: - if not log_entry.inclusion_proof.checkpoint: + if not log_entry._inner.inclusion_proof.checkpoint: raise InvalidBundle("expected checkpoint in inclusion proof") if ( - not log_entry.inclusion_promise - and not self._inner.verification_material.timestamp_verification_data.rfc3161_timestamps + not log_entry._inner.inclusion_promise + and not self.verification_material.timestamp_verification_data ): raise InvalidBundle( "bundle must contain an inclusion promise or signed timestamp(s)" @@ -569,7 +462,7 @@ def signing_certificate(self) -> Certificate: return self._signing_certificate @property - def log_entry(self) -> LogEntry: + def log_entry(self) -> TransparencyLogEntry: """ Returns the bundle's log entry, containing an inclusion proof (with checkpoint) and an inclusion promise (if the latter is present). @@ -583,8 +476,8 @@ def _dsse_envelope(self) -> dsse.Envelope | None: @private """ - if self._inner.is_set("dsse_envelope"): - return dsse.Envelope(self._inner.dsse_envelope) # type: ignore[arg-type] + if self._inner.dsse_envelope is not None: + return dsse.Envelope(self._inner.dsse_envelope) return None @property @@ -611,7 +504,10 @@ def from_json(cls, raw: bytes | str) -> Bundle: """ Deserialize the given Sigstore bundle. """ - inner = _Bundle.from_dict(json.loads(raw)) + try: + inner = _Bundle.from_json(raw) + except ValueError as exc: + raise InvalidBundle(f"failed to load bundle: {exc}") return cls(inner) def to_json(self) -> str: @@ -622,14 +518,14 @@ def to_json(self) -> str: def _to_parts( self, - ) -> tuple[Certificate, common_v1.MessageSignature | dsse.Envelope, LogEntry]: + ) -> tuple[Certificate, MessageSignature | dsse.Envelope, TransparencyLogEntry]: """ Decompose the `Bundle` into its core constituent parts. @private """ - content: common_v1.MessageSignature | dsse.Envelope + content: MessageSignature | dsse.Envelope if self._dsse_envelope: content = self._dsse_envelope else: @@ -638,22 +534,24 @@ def _to_parts( return (self.signing_certificate, content, self.log_entry) @classmethod - def from_parts(cls, cert: Certificate, sig: bytes, log_entry: LogEntry) -> Bundle: + def from_parts( + cls, cert: Certificate, sig: bytes, log_entry: TransparencyLogEntry + ) -> Bundle: """ Construct a Sigstore bundle (of `hashedrekord` type) from its constituent parts. """ return cls._from_parts( - cert, common_v1.MessageSignature(signature=sig), log_entry + cert, MessageSignature(signature=base64.b64encode(sig)), log_entry ) @classmethod def _from_parts( cls, cert: Certificate, - content: common_v1.MessageSignature | dsse.Envelope, - log_entry: LogEntry, + content: MessageSignature | dsse.Envelope, + log_entry: TransparencyLogEntry, signed_timestamp: list[TimeStampResponse] | None = None, ) -> Bundle: """ @@ -666,26 +564,32 @@ def _from_parts( if signed_timestamp is not None: timestamp_verifcation_data.rfc3161_timestamps.extend( [ - Rfc3161SignedTimestamp(signed_timestamp=response.as_bytes()) + RFC3161SignedTimestamp( + signed_timestamp=base64.b64encode(response.as_bytes()) + ) for response in signed_timestamp ] ) - # Fill in the appropriate variants. - if isinstance(content, common_v1.MessageSignature): - # mypy will be mystified if types are specified here - content_dict: dict[str, Any] = {"message_signature": content} + # Fill in the appropriate variant. + message_signature = None + dsse_envelope = None + if isinstance(content, MessageSignature): + message_signature = content else: - content_dict = {"dsse_envelope": content._inner} + dsse_envelope = content._inner inner = _Bundle( media_type=Bundle.BundleType.BUNDLE_0_3.value, verification_material=bundle_v1.VerificationMaterial( - certificate=common_v1.X509Certificate(cert.public_bytes(Encoding.DER)), - tlog_entries=[log_entry._to_rekor()], + certificate=common_v1.X509Certificate( + raw_bytes=base64.b64encode(cert.public_bytes(Encoding.DER)) + ), + tlog_entries=[log_entry._inner], timestamp_verification_data=timestamp_verifcation_data, ), - **content_dict, + message_signature=message_signature, + dsse_envelope=dsse_envelope, ) return cls(inner) diff --git a/sigstore/sign.py b/sigstore/sign.py index efff22f8c..0aa62333a 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -38,6 +38,7 @@ from __future__ import annotations +import base64 import logging from collections.abc import Iterator from contextlib import contextmanager @@ -47,10 +48,7 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec from cryptography.x509.oid import NameOID -from sigstore_protobuf_specs.dev.sigstore.common.v1 import ( - HashOutput, - MessageSignature, -) +from sigstore_models.common.v1 import HashOutput, MessageSignature from sigstore import dsse from sigstore import hashes as sigstore_hashes @@ -190,7 +188,9 @@ def _finalize_sign( # Submit the proposed entry to the transparency log entry = self._signing_ctx._rekor.create_entry(proposed_entry) - _logger.debug(f"Transparency log entry created with index: {entry.log_index}") + _logger.debug( + f"Transparency log entry created with index: {entry._inner.log_index}" + ) return Bundle._from_parts(cert, content, entry, signed_timestamp) @@ -247,9 +247,9 @@ def sign_artifact( content = MessageSignature( message_digest=HashOutput( algorithm=hashed_input.algorithm, - digest=hashed_input.digest, + digest=base64.b64encode(hashed_input.digest), ), - signature=artifact_signature, + signature=base64.b64encode(artifact_signature), ) # Create the proposed hashedrekord entry diff --git a/sigstore/verify/verifier.py b/sigstore/verify/verifier.py index 9347e77bb..525335147 100644 --- a/sigstore/verify/verifier.py +++ b/sigstore/verify/verifier.py @@ -39,8 +39,8 @@ from pydantic import ValidationError from rfc3161_client import TimeStampResponse, VerifierBuilder from rfc3161_client import VerificationError as Rfc3161VerificationError -from sigstore_protobuf_specs.dev.sigstore.common import v1 -from sigstore_protobuf_specs.dev.sigstore.rekor import v2 +from sigstore_models.common import v1 +from sigstore_models.rekor import v2 from sigstore import dsse from sigstore._internal.rekor import _hashedrekord_from_parts @@ -175,9 +175,13 @@ def _verify_timestamp_authority( Returns the number of valid signed timestamp in the bundle. """ - timestamp_responses = ( - bundle.verification_material.timestamp_verification_data.rfc3161_timestamps - ) + timestamp_responses = [] + if ( + timestamp_verification_data + := bundle.verification_material.timestamp_verification_data + ): + timestamp_responses = timestamp_verification_data.rfc3161_timestamps + if len(timestamp_responses) > MAX_ALLOWED_TIMESTAMP: msg = f"too many signed timestamp: {len(timestamp_responses)} > {MAX_ALLOWED_TIMESTAMP}" raise VerificationError(msg) @@ -206,7 +210,7 @@ def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]: # If a timestamp from the timestamping service is available, the Verifier MUST # perform path validation using the timestamp from the Timestamping Service. - if bundle.verification_material.timestamp_verification_data.rfc3161_timestamps: + if bundle.verification_material.timestamp_verification_data: if not self._trusted_root.get_timestamp_authorities(): msg = ( "no Timestamp Authorities have been provided to validate this " @@ -223,9 +227,9 @@ def _establish_time(self, bundle: Bundle) -> list[TimestampVerificationResult]: # promise that cryptographically binds it. We verify the inclusion promise # itself later, as part of log entry verification. if ( - timestamp := bundle.log_entry.integrated_time - ) and bundle.log_entry.inclusion_promise: - kv = bundle.log_entry._kind_version + timestamp := bundle.log_entry._inner.integrated_time + ) and bundle.log_entry._inner.inclusion_promise: + kv = bundle.log_entry._inner.kind_version if not (kv.kind in ["dsse", "hashedrekord"] and kv.version == "0.0.1"): raise VerificationError( "Integrated time only supported for dsse/hashedrekord 0.0.1 types" @@ -427,17 +431,17 @@ def verify_dsse( # Instead, we manually pick apart the entry body below and verify # the parts we can (namely the payload hash and signature list). entry = bundle.log_entry - if entry._kind_version.kind != "dsse": + if entry._inner.kind_version.kind != "dsse": raise VerificationError( - f"Expected entry type dsse, got {entry._kind_version.kind}" + f"Expected entry type dsse, got {entry._inner.kind_version.kind}" ) - if entry._kind_version.version == "0.0.2": + if entry._inner.kind_version.version == "0.0.2": _validate_dsse_v002_entry_body(bundle) - elif entry._kind_version.version == "0.0.1": + elif entry._inner.kind_version.version == "0.0.1": _validate_dsse_v001_entry_body(bundle) else: raise VerificationError( - f"Unsupported dsse version {entry._kind_version.version}" + f"Unsupported dsse version {entry._inner.kind_version.version}" ) return (envelope._inner.payload_type, envelope._inner.payload) @@ -483,18 +487,18 @@ def verify_artifact( # (8): verify the consistency of the log entry's body against # the other bundle materials (and input being verified). entry = bundle.log_entry - if entry._kind_version.kind != "hashedrekord": + if entry._inner.kind_version.kind != "hashedrekord": raise VerificationError( - f"Expected entry type hashedrekord, got {entry._kind_version.kind}" + f"Expected entry type hashedrekord, got {entry._inner.kind_version.kind}" ) - if entry._kind_version.version == "0.0.2": - _validate_hashedrekord_v002_entry_body(bundle) - elif entry._kind_version.version == "0.0.1": + if entry._inner.kind_version.version == "0.0.2": + _validate_hashedrekord_v002_entry_body(bundle, hashed_input) + elif entry._inner.kind_version.version == "0.0.1": _validate_hashedrekord_v001_entry_body(bundle, hashed_input) else: raise VerificationError( - f"Unsupported hashedrekord version {entry._kind_version.version}" + f"Unsupported hashedrekord version {entry._inner.kind_version.version}" ) @@ -509,7 +513,9 @@ def _validate_dsse_v001_entry_body(bundle: Bundle) -> None: "cannot perform DSSE verification on a bundle without a DSSE envelope" ) try: - entry_body = rekor_types.Dsse.model_validate_json(base64.b64decode(entry.body)) + entry_body = rekor_types.Dsse.model_validate_json( + entry._inner.canonicalized_body + ) except ValidationError as exc: raise VerificationError(f"invalid DSSE log entry: {exc}") @@ -547,7 +553,7 @@ def _validate_dsse_v002_entry_body(bundle: Bundle) -> None: "cannot perform DSSE verification on a bundle without a DSSE envelope" ) try: - v2_body = v2.Entry().from_json(base64.b64decode(entry.body)) + v2_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body) except ValidationError as exc: raise VerificationError(f"invalid DSSE log entry: {exc}") @@ -562,8 +568,8 @@ def _validate_dsse_v002_entry_body(bundle: Bundle) -> None: raise VerificationError("DSSE entry payload hash does not match bundle") v2_signatures = [ - v2.Signature( - content=signature.sig, + v2.verifier.Signature( + content=base64.b64encode(signature.sig), verifier=_v2_verifier_from_certificate(bundle.signing_certificate), ) for signature in envelope._inner.signatures @@ -585,7 +591,7 @@ def _validate_hashedrekord_v001_entry_body( hashed_input, ) actual_body = rekor_types.Hashedrekord.model_validate_json( - base64.b64decode(entry.body) + entry._inner.canonicalized_body ) if expected_body != actual_body: raise VerificationError( @@ -593,7 +599,9 @@ def _validate_hashedrekord_v001_entry_body( ) -def _validate_hashedrekord_v002_entry_body(bundle: Bundle) -> None: +def _validate_hashedrekord_v002_entry_body( + bundle: Bundle, hashed_input: Hashed +) -> None: """ Validate Entry body for hashedrekord v002. """ @@ -602,32 +610,32 @@ def _validate_hashedrekord_v002_entry_body(bundle: Bundle) -> None: raise VerificationError( "invalid hashedrekord log entry: missing message signature" ) - v2_expected_body = v2.Entry( - kind=entry._kind_version.kind, - api_version=entry._kind_version.version, - spec=v2.Spec( - hashed_rekord_v002=v2.HashedRekordLogEntryV002( + v2_expected_body = v2.entry.Entry( + kind=entry._inner.kind_version.kind, + api_version=entry._inner.kind_version.version, + spec=v2.entry.Spec( + hashed_rekord_v002=v2.hashedrekord.HashedRekordLogEntryV002( data=v1.HashOutput( - algorithm=bundle._inner.message_signature.message_digest.algorithm, - digest=bundle._inner.message_signature.message_digest.digest, + algorithm=hashed_input.algorithm, + digest=base64.b64encode(hashed_input.digest), ), - signature=v2.Signature( - content=bundle._inner.message_signature.signature, + signature=v2.verifier.Signature( + content=base64.b64encode(bundle._inner.message_signature.signature), verifier=_v2_verifier_from_certificate(bundle.signing_certificate), ), ) ), ) - v2_actual_body = v2.Entry().from_json(base64.b64decode(entry.body)) + v2_actual_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body) if v2_expected_body != v2_actual_body: raise VerificationError( "transparency log entry is inconsistent with other materials" ) -def _v2_verifier_from_certificate(certificate: Certificate) -> v2.Verifier: +def _v2_verifier_from_certificate(certificate: Certificate) -> v2.verifier.Verifier: """ - Return a Rekor v2 protobuf Verifier for the signing certificate. + Return a Rekor v2 Verifier for the signing certificate. This method decides which signature algorithms are supported for verification (in a rekor v2 entry), see @@ -649,9 +657,11 @@ def _v2_verifier_from_certificate(certificate: Certificate) -> v2.Verifier: else: raise ValueError(f"Unsupported public key type: {type(public_key)}") - return v2.Verifier( + return v2.verifier.Verifier( x509_certificate=v1.X509Certificate( - certificate.public_bytes(encoding=serialization.Encoding.DER) + raw_bytes=base64.b64encode( + certificate.public_bytes(encoding=serialization.Encoding.DER) + ) ), - key_details=cast(v1.PublicKeyDetails, key_details), + key_details=key_details, ) diff --git a/test/assets/signing_config/signingconfig-only-v1-rekor.v2.json b/test/assets/signing_config/signingconfig-only-v1-rekor.v2.json index 1a4305259..abb1234f6 100644 --- a/test/assets/signing_config/signingconfig-only-v1-rekor.v2.json +++ b/test/assets/signing_config/signingconfig-only-v1-rekor.v2.json @@ -6,7 +6,8 @@ "majorApiVersion": 1, "validFor": { "start": "2023-04-14T21:38:40Z" - } + }, + "operator": "example.com" }, { "url": "https://fulcio-old.example.com", @@ -14,7 +15,8 @@ "validFor": { "start": "2022-04-14T21:38:40Z", "end": "2023-04-14T21:38:40Z" - } + }, + "operator": "example.com" } ], "oidcUrls": [ @@ -23,7 +25,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-16T00:00:00Z" - } + }, + "operator": "example.com" } ], "rekorTlogUrls": [ @@ -32,7 +35,8 @@ "majorApiVersion": 1, "validFor": { "start": "2021-01-12T11:53:27Z" - } + }, + "operator": "example.com" } ], "tsaUrls": [ @@ -41,7 +45,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-09T00:00:00Z" - } + }, + "operator": "example.com" } ], "rekorTlogConfig": { diff --git a/test/assets/signing_config/signingconfig.v2.json b/test/assets/signing_config/signingconfig.v2.json index 901d10b40..38a74a9af 100644 --- a/test/assets/signing_config/signingconfig.v2.json +++ b/test/assets/signing_config/signingconfig.v2.json @@ -6,7 +6,8 @@ "majorApiVersion": 1, "validFor": { "start": "2023-04-14T21:38:40Z" - } + }, + "operator": "example.com" }, { "url": "https://fulcio-old.example.com", @@ -14,7 +15,8 @@ "validFor": { "start": "2022-04-14T21:38:40Z", "end": "2023-04-14T21:38:40Z" - } + }, + "operator": "example.com" } ], "oidcUrls": [ @@ -23,7 +25,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-16T00:00:00Z" - } + }, + "operator": "example.com" } ], "rekorTlogUrls": [ @@ -32,14 +35,16 @@ "majorApiVersion": 1, "validFor": { "start": "2021-01-12T11:53:27Z" - } + }, + "operator": "example.com" }, { "url": "https://rekor-v2.example.com", "majorApiVersion": 2, "validFor": { "start": "2021-01-12T11:53:27Z" - } + }, + "operator": "example.com" } ], "tsaUrls": [ @@ -48,7 +53,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-09T00:00:00Z" - } + }, + "operator": "example.com" } ], "rekorTlogConfig": { diff --git a/test/assets/staging-tuf/13.snapshot.json b/test/assets/staging-tuf/16.snapshot.json similarity index 54% rename from test/assets/staging-tuf/13.snapshot.json rename to test/assets/staging-tuf/16.snapshot.json index 1eb631496..c9d54afce 100644 --- a/test/assets/staging-tuf/13.snapshot.json +++ b/test/assets/staging-tuf/16.snapshot.json @@ -2,21 +2,21 @@ "signatures": [ { "keyid": "c3479007e861445ce5dc109d9661ed77b35bbc0e3f161852c46114266fc2daa4", - "sig": "3046022100c36bf62c4b5f72f8e3defc1af05148518a282394b304f0e0a154c10feeaee9a1022100ed8bb83508e1fcd3906bdf71af0da30f066a048db0f8da589db7dfe5f1458537" + "sig": "304402202733036a5044a3257392cb6737c80d1972aa2bce8e7194fac23e3d0b939e83ce0220797111c4aa47094278a2997d727c728fcda795b02b8ec803e2265fdac9614a21" } ], "signed": { "_type": "snapshot", - "expires": "2035-04-30T07:17:48Z", + "expires": "2035-06-11T11:54:57Z", "meta": { "registry.npmjs.org.json": { "version": 5 }, "targets.json": { - "version": 13 + "version": 17 } }, "spec_version": "1.0", - "version": 13 + "version": 16 } } \ No newline at end of file diff --git a/test/assets/staging-tuf/13.targets.json b/test/assets/staging-tuf/17.targets.json similarity index 85% rename from test/assets/staging-tuf/13.targets.json rename to test/assets/staging-tuf/17.targets.json index e95d33949..ad1ddbf04 100644 --- a/test/assets/staging-tuf/13.targets.json +++ b/test/assets/staging-tuf/17.targets.json @@ -2,11 +2,11 @@ "signatures": [ { "keyid": "aa61e09f6af7662ac686cf0c6364079f63d3e7a86836684eeced93eace3acd81", - "sig": "3046022100c1968b55a40906590168f9b9ecd2251ef4056f79e9067fb80374ad4bc1a770a102210085d17acfcd779f8d004b54e0c5170e9e4629487603859bf85f4519d46ef3a994" + "sig": "3045022031cbae59944160c1b9b1df859c43cf74d8c5257c32924f1c78146ccd621aae53022100cc8097664966a0f187e41643a61524613434517ec97c9a21f319752fd842e122" }, { "keyid": "61f9609d2655b346fcebccd66b509d5828168d5e447110e261f0bcc8553624bc", - "sig": "3046022100fc18a5d048d94be077f240866f344bc679098dde898f4d61ed44ba1cd37f86ec022100cc3b9d06b15ea56f953afbd3917a53c674b86e94ee5d3ffb160f3f465c2fee70" + "sig": "30440220149fb96582721bcaf506b06465cf8df9b4b4c7847f19165eec8f7faeccc61ed8022020090a30e448e7cd71824bf0042ce9982b8882e557be343a919ffc4d825927f6" }, { "keyid": "9471fbda95411d10109e467ad526082d15f14a38de54ea2ada9687ab39d8e237", @@ -44,7 +44,7 @@ } ] }, - "expires": "2035-04-27T13:57:15Z", + "expires": "2035-06-10T18:17:38Z", "spec_version": "1.0", "targets": { "ctfe.pub": { @@ -133,18 +133,18 @@ }, "signing_config.v0.2.json": { "hashes": { - "sha256": "cb9a48c332a0d515db7760ad6972a09a0f4ed721fe5e839b70371e0d0802abe2" + "sha256": "0f395087486ba318321eda478d847962b1dd89846c7dc6e95752a6b110669393" }, - "length": 885 + "length": 1022 }, "trusted_root.json": { "hashes": { - "sha256": "3f8ab41b9311910106caf66cb5e4117b1bee0d1871def4e816c6c60cee69d421" + "sha256": "ed6a9cf4e7c2e3297a4b5974fce0d17132f03c63512029d7aa3a402b43acab49" }, - "length": 6399 + "length": 6824 } }, - "version": 13, + "version": 17, "x-tuf-on-ci-expiry-period": 3650, "x-tuf-on-ci-signing-period": 365 } diff --git a/test/assets/staging-tuf/targets/cb9a48c332a0d515db7760ad6972a09a0f4ed721fe5e839b70371e0d0802abe2.signing_config.v0.2.json b/test/assets/staging-tuf/targets/0f395087486ba318321eda478d847962b1dd89846c7dc6e95752a6b110669393.signing_config.v0.2.json similarity index 83% rename from test/assets/staging-tuf/targets/cb9a48c332a0d515db7760ad6972a09a0f4ed721fe5e839b70371e0d0802abe2.signing_config.v0.2.json rename to test/assets/staging-tuf/targets/0f395087486ba318321eda478d847962b1dd89846c7dc6e95752a6b110669393.signing_config.v0.2.json index fe66ad97b..b5680adc2 100644 --- a/test/assets/staging-tuf/targets/cb9a48c332a0d515db7760ad6972a09a0f4ed721fe5e839b70371e0d0802abe2.signing_config.v0.2.json +++ b/test/assets/staging-tuf/targets/0f395087486ba318321eda478d847962b1dd89846c7dc6e95752a6b110669393.signing_config.v0.2.json @@ -6,7 +6,8 @@ "majorApiVersion": 1, "validFor": { "start": "2022-04-14T21:38:40Z" - } + }, + "operator": "sigstore.dev" } ], "oidcUrls": [ @@ -15,7 +16,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-16T00:00:00Z" - } + }, + "operator": "sigstore.dev" } ], "rekorTlogUrls": [ @@ -24,7 +26,8 @@ "majorApiVersion": 1, "validFor": { "start": "2021-01-12T11:53:27Z" - } + }, + "operator": "sigstore.dev" } ], "tsaUrls": [ @@ -33,7 +36,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-09T00:00:00Z" - } + }, + "operator": "sigstore.dev" } ], "rekorTlogConfig": { diff --git a/test/assets/staging-tuf/targets/3f8ab41b9311910106caf66cb5e4117b1bee0d1871def4e816c6c60cee69d421.trusted_root.json b/test/assets/staging-tuf/targets/ed6a9cf4e7c2e3297a4b5974fce0d17132f03c63512029d7aa3a402b43acab49.trusted_root.json similarity index 93% rename from test/assets/staging-tuf/targets/3f8ab41b9311910106caf66cb5e4117b1bee0d1871def4e816c6c60cee69d421.trusted_root.json rename to test/assets/staging-tuf/targets/ed6a9cf4e7c2e3297a4b5974fce0d17132f03c63512029d7aa3a402b43acab49.trusted_root.json index 8691ef5d3..d565b63e9 100644 --- a/test/assets/staging-tuf/targets/3f8ab41b9311910106caf66cb5e4117b1bee0d1871def4e816c6c60cee69d421.trusted_root.json +++ b/test/assets/staging-tuf/targets/ed6a9cf4e7c2e3297a4b5974fce0d17132f03c63512029d7aa3a402b43acab49.trusted_root.json @@ -14,6 +14,20 @@ "logId": { "keyId": "0y8wo8MtY5wrdiIFohx7sHeI5oKDpK5vQhGHI6G+pJY=" } + }, + { + "baseUrl": "https://log2025-alpha1.rekor.sigstage.dev", + "hashAlgorithm": "SHA2_256", + "publicKey": { + "rawBytes": "MCowBQYDK2VwAyEAPn+AREHoBaZ7wgS1zBqpxmLSGnyhxXj4lFxSdWVB8o8=", + "keyDetails": "PKIX_ED25519", + "validFor": { + "start": "2025-04-16T00:00:00Z" + } + }, + "logId": { + "keyId": "8w1amZ2S5mJIQkQmPxdMuOrL/oJkvFg9MnQXmeOCXck=" + } } ], "certificateAuthorities": [ diff --git a/test/assets/staging-tuf/timestamp.json b/test/assets/staging-tuf/timestamp.json index 8feb575b0..c2e2cc89f 100644 --- a/test/assets/staging-tuf/timestamp.json +++ b/test/assets/staging-tuf/timestamp.json @@ -2,18 +2,18 @@ "signatures": [ { "keyid": "c3479007e861445ce5dc109d9661ed77b35bbc0e3f161852c46114266fc2daa4", - "sig": "30450220665b03b09118979b8c8d93b55077279e0424ae5802a0f59e14fdccef49b0c420022100f2fd10223ca19ee7e0671839e69508e8fd4a5ea875cf7e19fe6d0d77acd604a3" + "sig": "3046022100fedb5a3d1a3c461c1337d7535edca8012fb0ab8da31315dbdf22b7f38f76973e022100a87967789d2d2942919dcc4f33def8ee74745f577ff0ef5479cc9f573842e8de" } ], "signed": { "_type": "timestamp", - "expires": "2025-05-09T07:17:49Z", + "expires": "2025-07-29T13:28:44Z", "meta": { "snapshot.json": { - "version": 13 + "version": 16 } }, "spec_version": "1.0", - "version": 280 + "version": 353 } } \ No newline at end of file diff --git a/test/assets/trust_config/config.v1.json b/test/assets/trust_config/config.v1.json index 376d73319..d70a32f28 100644 --- a/test/assets/trust_config/config.v1.json +++ b/test/assets/trust_config/config.v1.json @@ -121,16 +121,18 @@ "url": "https://fulcio.example.com", "majorApiVersion": 1, "validFor": { - "start": "2023-04-14T21:38:40Z" - } + "start": "2023-04-14T21:38:40Z" + }, + "operator": "example.com" }, { "url": "https://fulcio-old.example.com", "majorApiVersion": 1, "validFor": { - "start": "2022-04-14T21:38:40Z", - "end": "2023-04-14T21:38:40Z" - } + "start": "2022-04-14T21:38:40Z", + "end": "2023-04-14T21:38:40Z" + }, + "operator": "example.com" } ], "oidcUrls": [ @@ -139,23 +141,26 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-16T00:00:00Z" - } + }, + "operator": "example.com" } - ], - "rekorTlogUrls": [ + ], + "rekorTlogUrls": [ { "url": "https://rekor.example.com", "majorApiVersion": 1, "validFor": { "start": "2021-01-12T11:53:27Z" - } + }, + "operator": "example.com" }, { "url": "https://rekor-v2.example.com", "majorApiVersion": 2, "validFor": { "start": "2021-01-12T11:53:27Z" - } + }, + "operator": "example.com" } ], "tsaUrls": [ @@ -164,7 +169,8 @@ "majorApiVersion": 1, "validFor": { "start": "2025-04-09T00:00:00Z" - } + }, + "operator": "example.com" } ], "rekorTlogConfig": { diff --git a/test/assets/trusted_root/trustedroot.v1.json b/test/assets/trusted_root/trustedroot.v1.json index 4f5a9726f..190c76a65 100644 --- a/test/assets/trusted_root/trustedroot.v1.json +++ b/test/assets/trusted_root/trustedroot.v1.json @@ -14,20 +14,6 @@ "logId": { "keyId": "wNI9atQGlz+VWfO6LRygH4QUfY/8W4RFwiT5i5WRgB0=" } - }, - { - "baseUrl": "https://example.com/unsupported_key", - "hashAlgorithm": "SHA2_256", - "publicKey": { - "rawBytes": "", - "keyDetails": "UNSPECIFIED", - "validFor": { - "start": "2021-01-12T11:53:27.000Z" - } - }, - "logId": { - "keyId": "xNI9atQGlz+VWfO6LRygH4QUfY/8W4RFwiT5i5WRgB0=" - } } ], "certificateAuthorities": [ diff --git a/test/assets/trusted_root/trustedroot.v1.local_tlog_ed25519_rekor-tiles.json b/test/assets/trusted_root/trustedroot.v1.local_tlog_ed25519_rekor-tiles.json index a4ba11bdf..4e79be8f2 100644 --- a/test/assets/trusted_root/trustedroot.v1.local_tlog_ed25519_rekor-tiles.json +++ b/test/assets/trusted_root/trustedroot.v1.local_tlog_ed25519_rekor-tiles.json @@ -14,20 +14,6 @@ "logId": { "keyId": "tAlACZWkUrif9Z9sOIrpk1ak1I8loRNufk79N6l1SNg=" } - }, - { - "baseUrl": "https://example.com/unsupported_key", - "hashAlgorithm": "SHA2_256", - "publicKey": { - "rawBytes": "", - "keyDetails": "UNSPECIFIED", - "validFor": { - "start": "2021-01-12T11:53:27.000Z" - } - }, - "logId": { - "keyId": "xNI9atQGlz+VWfO6LRygH4QUfY/8W4RFwiT5i5WRgB0=" - } } ], "certificateAuthorities": [ diff --git a/test/assets/tsa/trust_config.json b/test/assets/tsa/trust_config.json index 273f27395..6510f1bcd 100644 --- a/test/assets/tsa/trust_config.json +++ b/test/assets/tsa/trust_config.json @@ -120,7 +120,8 @@ "majorApiVersion": 1, "validFor": { "start": "2022-04-14T21:38:40.000Z" - } + }, + "operator": "sigstage.dev" } ], "rekorTlogUrls": [ @@ -129,7 +130,8 @@ "majorApiVersion": 1, "validFor": { "start": "2021-01-12T11:53:27.000Z" - } + }, + "operator": "sigstage.dev" } ], "tsaUrls": [ @@ -138,7 +140,8 @@ "majorApiVersion": 1, "validFor": { "start": "2024-11-07T14:59:40.000Z" - } + }, + "operator": "sigstage.dev" } ], "rekorTlogConfig": { diff --git a/test/integration/cli/test_plumbing.py b/test/integration/cli/test_plumbing.py index 62c014ded..487b05b49 100644 --- a/test/integration/cli/test_plumbing.py +++ b/test/integration/cli/test_plumbing.py @@ -14,7 +14,7 @@ import pytest -from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm +from sigstore_models.common.v1 import HashAlgorithm from sigstore.hashes import Hashed from sigstore.models import Bundle, InvalidBundle diff --git a/test/unit/conftest.py b/test/unit/conftest.py index da3ca1573..43dcd0028 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -31,7 +31,7 @@ from id import ( detect_credential, ) -from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import Service +from sigstore_models.trustroot.v1 import Service from tuf.api.exceptions import DownloadHTTPError from tuf.ngclient import FetcherInterface, updater @@ -251,7 +251,11 @@ def staging_with_rekorv2() -> tuple[ def signer(): trust_config = ClientTrustConfig.staging() trust_config.signing_config._tlogs.append( - Service("https://log2025-alpha1.rekor.sigstage.dev", 2) + Service( + url="https://log2025-alpha1.rekor.sigstage.dev", + major_api_version=2, + operator="sigstage.dev", + ) ) return SigningContext.from_trust_config(trust_config) diff --git a/test/unit/internal/rekor/test_client_v2.py b/test/unit/internal/rekor/test_client_v2.py index 41c0e52a2..9c650a2cc 100644 --- a/test/unit/internal/rekor/test_client_v2.py +++ b/test/unit/internal/rekor/test_client_v2.py @@ -17,10 +17,7 @@ import pytest from sigstore import dsse -from sigstore._internal.rekor.client_v2 import ( - LogEntry, -) -from sigstore.models import rekor_v1 +from sigstore.models import TransparencyLogEntry @pytest.mark.staging @@ -54,8 +51,7 @@ def test_rekor_v2_create_entry_dsse(staging_with_rekorv2): with sign_ctx.signer(identity) as signer: bundle = signer.sign_dsse(stmt) - assert isinstance(bundle.log_entry, LogEntry) - assert isinstance(bundle.log_entry._to_rekor(), rekor_v1.TransparencyLogEntry) + assert isinstance(bundle.log_entry, TransparencyLogEntry) @pytest.mark.staging @@ -71,5 +67,4 @@ def test_rekor_v2_create_entry_hashed_rekord(staging_with_rekorv2): with sign_ctx.signer(identity) as signer: bundle = signer.sign_artifact(b"") - assert isinstance(bundle.log_entry, LogEntry) - assert isinstance(bundle.log_entry._to_rekor(), rekor_v1.TransparencyLogEntry) + assert isinstance(bundle.log_entry, TransparencyLogEntry) diff --git a/test/unit/internal/test_key_details.py b/test/unit/internal/test_key_details.py index 43302fcba..b5bdac802 100644 --- a/test/unit/internal/test_key_details.py +++ b/test/unit/internal/test_key_details.py @@ -16,7 +16,7 @@ import pytest from cryptography.hazmat.primitives.asymmetric import dsa, ec, ed25519, padding, rsa -from sigstore_protobuf_specs.dev.sigstore.common import v1 +from sigstore_models.common.v1 import PublicKeyDetails from sigstore._internal.key_details import _get_key_details @@ -128,4 +128,4 @@ def test_get_key_details(mock_certificate): Ensures that we return a PublicKeyDetails for supported key types and schemes. """ key_details = _get_key_details(mock_certificate) - assert isinstance(key_details, v1.PublicKeyDetails) + assert isinstance(key_details, PublicKeyDetails) diff --git a/test/unit/internal/test_trust.py b/test/unit/internal/test_trust.py index 4eef7f68c..26b7278e7 100644 --- a/test/unit/internal/test_trust.py +++ b/test/unit/internal/test_trust.py @@ -19,8 +19,8 @@ import pytest from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.x509 import load_pem_x509_certificate -from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange -from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( +from sigstore_models.common.v1 import TimeRange +from sigstore_models.trustroot.v1 import ( Service, ServiceConfiguration, ServiceSelector, @@ -42,16 +42,16 @@ from sigstore.errors import Error # Test data for TestSigningcconfig -_service_v1_op1 = Service("url1", major_api_version=1, operator="op1") -_service2_v1_op1 = Service("url2", major_api_version=1, operator="op1") -_service_v2_op1 = Service("url3", major_api_version=2, operator="op1") -_service_v1_op2 = Service("url4", major_api_version=1, operator="op2") -_service_v1_op3 = Service("url5", major_api_version=1, operator="op3") +_service_v1_op1 = Service(url="url1", major_api_version=1, operator="op1") +_service2_v1_op1 = Service(url="url2", major_api_version=1, operator="op1") +_service_v2_op1 = Service(url="url3", major_api_version=2, operator="op1") +_service_v1_op2 = Service(url="url4", major_api_version=1, operator="op2") +_service_v1_op3 = Service(url="url5", major_api_version=1, operator="op3") _service_v1_op4 = Service( - "url6", + url="url6", major_api_version=1, operator="op4", - valid_for=TimeRange(datetime(3000, 1, 1, tzinfo=timezone.utc)), + valid_for=TimeRange(start=datetime(3000, 1, 1, tzinfo=timezone.utc)), ) @@ -61,6 +61,7 @@ def test_good(self, asset): authority = CertificateAuthority.from_json(path) assert len(authority.certificates(allow_expired=True)) == 3 + assert authority.validity_period_end is not None assert authority.validity_period_start < authority.validity_period_end def test_missing_root(self, asset): @@ -69,7 +70,7 @@ def test_missing_root(self, asset): CertificateAuthority.from_json(path) -class TestSigningcconfig: +class TestSigningConfig: def test_good(self, asset): path = asset("signing_config/signingconfig.v2.json") signing_config = SigningConfig.from_file(path) @@ -111,63 +112,63 @@ def test_good_only_v1_rekor(self, asset): pytest.param( [_service_v1_op1], [1], - ServiceConfiguration(ServiceSelector.ALL), + ServiceConfiguration(selector=ServiceSelector.ALL), [_service_v1_op1], id="base case", ), pytest.param( [_service_v1_op1, _service2_v1_op1], [1], - ServiceConfiguration(ServiceSelector.ALL), + ServiceConfiguration(selector=ServiceSelector.ALL), [_service2_v1_op1], id="multiple services, same operator: expect 1 service in result", ), pytest.param( [_service_v1_op1, _service_v1_op2], [1], - ServiceConfiguration(ServiceSelector.ALL), + ServiceConfiguration(selector=ServiceSelector.ALL), [_service_v1_op1, _service_v1_op2], id="2 services, different operator: expect 2 services in result", ), pytest.param( [_service_v1_op1, _service_v1_op2, _service_v1_op4], [1], - ServiceConfiguration(ServiceSelector.ALL), + ServiceConfiguration(selector=ServiceSelector.ALL), [_service_v1_op1, _service_v1_op2], id="3 services, one is not yet valid: expect 2 services in result", ), pytest.param( [_service_v1_op1, _service_v1_op2], [1], - ServiceConfiguration(ServiceSelector.ANY), + ServiceConfiguration(selector=ServiceSelector.ANY), [_service_v1_op1], id="ANY selector: expect 1 service only in result", ), pytest.param( [_service_v1_op1, _service_v1_op2, _service_v1_op3], [1], - ServiceConfiguration(ServiceSelector.EXACT, 2), + ServiceConfiguration(selector=ServiceSelector.EXACT, count=2), [_service_v1_op1, _service_v1_op2], id="EXACT selector: expect configured number of services in result", ), pytest.param( [_service_v1_op1, _service_v2_op1], [1, 2], - ServiceConfiguration(ServiceSelector.ALL), + ServiceConfiguration(selector=ServiceSelector.ALL), [_service_v2_op1], id="services with different version: expect highest version", ), pytest.param( [_service_v1_op1, _service_v2_op1], [1], - ServiceConfiguration(ServiceSelector.ALL), + ServiceConfiguration(selector=ServiceSelector.ALL), [_service_v1_op1], id="services with different version: expect the supported version", ), pytest.param( [_service_v1_op1, _service_v1_op2], [2], - ServiceConfiguration(ServiceSelector.ALL), + ServiceConfiguration(selector=ServiceSelector.ALL), [], id="No supported versions: expect no results", ), @@ -191,17 +192,12 @@ def test_get_valid_services(self, services, versions, config, expected_result): ( # ANY selector without services [], [1], - ServiceConfiguration(ServiceSelector.ANY), + ServiceConfiguration(selector=ServiceSelector.ANY), ), ( # EXACT selector without enough services [_service_v1_op1], [1], - ServiceConfiguration(ServiceSelector.EXACT, 2), - ), - ( # UNDEFINED selector - [_service_v1_op1], - [1], - ServiceConfiguration(ServiceSelector.UNDEFINED, 1), + ServiceConfiguration(selector=ServiceSelector.EXACT, count=2), ), ], ) @@ -228,7 +224,7 @@ def test_good(self, asset, file): assert ( root._inner.media_type == TrustedRoot.TrustedRootType.TRUSTED_ROOT_0_1.value ) - assert len(root._inner.tlogs) == 2 + assert len(root._inner.tlogs) == 1 assert len(root._inner.certificate_authorities) == 2 assert len(root._inner.ctlogs) == 2 assert len(root._inner.timestamp_authorities) == 1 @@ -243,7 +239,8 @@ def test_bad_media_type(self, asset): path = asset("trusted_root/trustedroot.badtype.json") with pytest.raises( - Error, match="unsupported trusted root format: bad-media-type" + ValueError, + match=r"Input should be 'application/vnd\.dev\.sigstore\.trustedroot\+json;version=0\.1' or 'application/vnd\.dev\.sigstore\.trustedroot\.v0\.2\+json'", ): TrustedRoot.from_file(path) @@ -273,8 +270,9 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs): # Don't expect trusted_root.json request as it's cached already expected_requests = { "timestamp.json": 1, - "13.snapshot.json": 1, - "13.targets.json": 1, + "16.snapshot.json": 1, + "17.targets.json": 1, + "ed6a9cf4e7c2e3297a4b5974fce0d17132f03c63512029d7aa3a402b43acab49.trusted_root.json": 1, } expected_fail_reqs = {"12.root.json": 1} assert reqs == expected_requests @@ -329,8 +327,8 @@ def test_is_timerange_valid(): def range_from(offset_lower=0, offset_upper=0): base = datetime.now(timezone.utc) return TimeRange( - base + timedelta(minutes=offset_lower), - base + timedelta(minutes=offset_upper), + start=base + timedelta(minutes=offset_lower), + end=base + timedelta(minutes=offset_upper), ) # Test None should always be valid @@ -356,10 +354,11 @@ def range_from(offset_lower=0, offset_upper=0): def test_trust_root_bundled_get(monkeypatch, mock_staging_tuf, tuf_asset): def get_public_bytes(keys): - return [ + assert len(keys) != 0 + return { k.public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) for k in keys - ] + } def _pem_keys(keys): return get_public_bytes([load_pem_public_key(k) for k in keys]) @@ -380,15 +379,17 @@ def _pem_keys(keys): # Assert that trust root from TUF contains the expected keys/certs trust_root = ClientTrustConfig.staging().trusted_root - assert ctfe_keys[0] in get_public_bytes( - [ - k.key - for k in trust_root.ct_keyring( - purpose=KeyringPurpose.VERIFY - )._keyring.values() - ] + assert ctfe_keys.issubset( + get_public_bytes( + [ + k.key + for k in trust_root.ct_keyring( + purpose=KeyringPurpose.VERIFY + )._keyring.values() + ] + ) ) - assert ( + assert rekor_keys.issubset( get_public_bytes( [ k.key @@ -397,21 +398,22 @@ def _pem_keys(keys): )._keyring.values() ] ) - == rekor_keys ) assert trust_root.get_fulcio_certs() == fulcio_certs # Assert that trust root from offline TUF contains the expected keys/certs trust_root = ClientTrustConfig.staging(offline=True).trusted_root - assert ctfe_keys[0] in get_public_bytes( - [ - k.key - for k in trust_root.ct_keyring( - purpose=KeyringPurpose.VERIFY - )._keyring.values() - ] + assert ctfe_keys.issubset( + get_public_bytes( + [ + k.key + for k in trust_root.ct_keyring( + purpose=KeyringPurpose.VERIFY + )._keyring.values() + ] + ) ) - assert ( + assert rekor_keys.issubset( get_public_bytes( [ k.key @@ -420,22 +422,23 @@ def _pem_keys(keys): )._keyring.values() ] ) - == rekor_keys ) assert trust_root.get_fulcio_certs() == fulcio_certs # Assert that trust root from file contains the expected keys/certs path = tuf_asset.target_path("trusted_root.json") trust_root = TrustedRoot.from_file(path) - assert ctfe_keys[0] in get_public_bytes( - [ - k.key - for k in trust_root.ct_keyring( - purpose=KeyringPurpose.VERIFY - )._keyring.values() - ] + assert ctfe_keys.issubset( + get_public_bytes( + [ + k.key + for k in trust_root.ct_keyring( + purpose=KeyringPurpose.VERIFY + )._keyring.values() + ] + ) ) - assert ( + assert rekor_keys.issubset( get_public_bytes( [ k.key @@ -444,7 +447,6 @@ def _pem_keys(keys): )._keyring.values() ] ) - == rekor_keys ) assert trust_root.get_fulcio_certs() == fulcio_certs @@ -484,6 +486,7 @@ def test_bad_media_type(self, asset): path = asset("trust_config/config.badtype.json") with pytest.raises( - Error, match="unsupported client trust config format: bad-media-type" + ValueError, + match=r"Input should be 'application/vnd\.dev\.sigstore\.clienttrustconfig.v0.1\+json'", ): ClientTrustConfig.from_json(path.read_text()) diff --git a/test/unit/test_dsse.py b/test/unit/test_dsse.py index 0a2ee879e..9eddd943d 100644 --- a/test/unit/test_dsse.py +++ b/test/unit/test_dsse.py @@ -39,8 +39,10 @@ def test_roundtrip(self): assert evp.signature == b"lol" serialized = evp.to_json() - assert serialized == raw + # envelope matches assert dsse.Envelope._from_json(serialized) == evp + # parsed JSON marches + assert json.loads(raw) == evp._inner.to_dict() def test_missing_signature(self): raw = json.dumps( diff --git a/test/unit/test_hashes.py b/test/unit/test_hashes.py index 3c92824c8..39275ba82 100644 --- a/test/unit/test_hashes.py +++ b/test/unit/test_hashes.py @@ -14,7 +14,7 @@ import hashlib import pytest -from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm +from sigstore_models.common.v1 import HashAlgorithm from sigstore.hashes import Hashed diff --git a/test/unit/test_models.py b/test/unit/test_models.py index c60b81f7f..0285be8f8 100644 --- a/test/unit/test_models.py +++ b/test/unit/test_models.py @@ -16,104 +16,63 @@ from base64 import b64encode import pytest -from pydantic import ValidationError +from sigstore_models.rekor.v1 import KindVersion +from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry from sigstore.errors import VerificationError from sigstore.models import ( Bundle, InvalidBundle, - LogEntry, - LogInclusionProof, TimestampVerificationData, + TransparencyLogEntry, VerificationMaterial, ) -class TestLogEntry: +class TestTransparencyLogEntry: @pytest.mark.parametrize("integrated_time", [0, 1746819403]) def test_missing_inclusion_proof(self, integrated_time: int): with pytest.raises(ValueError, match=r"inclusion_proof"): - LogEntry( - uuid="fake", - body=b64encode(b"fake"), - integrated_time=integrated_time, - log_id="1234", - log_index=1, - inclusion_proof=None, - inclusion_promise=None, + TransparencyLogEntry( + _TransparencyLogEntry( + kind_version=KindVersion(kind="hashedrekord", version="fake"), + canonicalized_body=b64encode(b"fake"), + integrated_time=integrated_time, + log_id="1234", + log_index=1, + inclusion_proof=None, + inclusion_promise=None, + ) ) - def test_missing_inclusion_promise_and_integrated_time_round_trip( - self, signing_bundle - ): - """ - Ensures that LogEntry._to_rekor() succeeds even without an inclusion_promise and integrated_time. - """ - bundle: Bundle - _, bundle = signing_bundle("bundle.txt") - _dict = bundle.log_entry._to_rekor().to_dict() - print(_dict) - del _dict["inclusionPromise"] - del _dict["integratedTime"] - entry = LogEntry._from_dict_rekor(_dict) - assert entry.inclusion_promise is None - assert entry._to_rekor() is not None - assert LogEntry._from_dict_rekor(entry._to_rekor().to_dict()) == entry + # def test_missing_inclusion_promise_and_integrated_time_round_trip( + # self, signing_bundle + # ): + # """ + # Ensures that LogEntry._to_rekor() succeeds even without an inclusion_promise and integrated_time. + # """ + # bundle: Bundle + # _, bundle = signing_bundle("bundle.txt") + # _dict = bundle.log_entry._to_rekor().to_dict() + # print(_dict) + # del _dict["inclusionPromise"] + # del _dict["integratedTime"] + # entry = LogEntry._from_dict_rekor(_dict) + # assert entry.inclusion_promise is None + # assert entry._to_rekor() is not None + # assert LogEntry._from_dict_rekor(entry._to_rekor().to_dict()) == entry def test_logentry_roundtrip(self, signing_bundle): _, bundle = signing_bundle("bundle.txt") assert ( - LogEntry._from_dict_rekor(bundle.log_entry._to_rekor().to_dict()) + TransparencyLogEntry( + _TransparencyLogEntry.from_dict(bundle.log_entry._inner.to_dict()) + ) == bundle.log_entry ) -class TestLogInclusionProof: - def test_valid(self): - proof = LogInclusionProof( - log_index=1, root_hash="abcd", tree_size=2, hashes=[], checkpoint="" - ) - assert proof is not None - - def test_negative_log_index(self): - with pytest.raises( - ValidationError, match="Inclusion proof has invalid log index" - ): - LogInclusionProof( - log_index=-1, root_hash="abcd", tree_size=2, hashes=[], checkpoint="" - ) - - def test_negative_tree_size(self): - with pytest.raises( - ValidationError, match="Inclusion proof has invalid tree size" - ): - LogInclusionProof( - log_index=1, root_hash="abcd", tree_size=-1, hashes=[], checkpoint="" - ) - - def test_log_index_outside_tree_size(self): - with pytest.raises( - ValidationError, - match="Inclusion proof has log index greater than or equal to tree size", - ): - LogInclusionProof( - log_index=2, root_hash="abcd", tree_size=1, hashes=[], checkpoint="" - ) - - def test_checkpoint_missing(self): - with pytest.raises(ValidationError, match=r"should be a valid string"): - ( - LogInclusionProof( - checkpoint=None, - hashes=["fake"], - log_index=0, - root_hash="fake", - tree_size=100, - ), - ) - - class TestTimestampVerificationData: """ Tests for the `TimestampVerificationData` wrapper model. @@ -168,7 +127,7 @@ class TestBundle: """ def test_invalid_bundle_version(self, signing_bundle): - with pytest.raises(InvalidBundle, match="unsupported bundle format"): + with pytest.raises(InvalidBundle, match="failed to load bundle"): signing_bundle("bundle_invalid_version.txt") def test_invalid_empty_cert_chain(self, signing_bundle): diff --git a/test/unit/test_sign.py b/test/unit/test_sign.py index 6356c5012..006c571bc 100644 --- a/test/unit/test_sign.py +++ b/test/unit/test_sign.py @@ -17,7 +17,7 @@ import pretend import pytest -from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm +from sigstore_models.common.v1 import HashAlgorithm import sigstore.oidc from sigstore._internal.timestamp import TimestampAuthorityClient @@ -46,12 +46,15 @@ def test_sign_rekor_entry_consistent(request, sign_ctx_and_ident_for_env): with ctx.signer(identity) as signer: expected_entry = signer.sign_artifact(payload).log_entry - actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index) + actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry._inner.log_index) - assert expected_entry.body == actual_entry.body - assert expected_entry.integrated_time == actual_entry.integrated_time - assert expected_entry.log_id == actual_entry.log_id - assert expected_entry.log_index == actual_entry.log_index + assert ( + expected_entry._inner.canonicalized_body + == actual_entry._inner.canonicalized_body + ) + assert expected_entry._inner.integrated_time == actual_entry._inner.integrated_time + assert expected_entry._inner.log_id == actual_entry._inner.log_id + assert expected_entry._inner.log_index == actual_entry._inner.log_index @pytest.mark.staging diff --git a/test/unit/verify/test_verifier.py b/test/unit/verify/test_verifier.py index 16df1d88c..0fa6db042 100644 --- a/test/unit/verify/test_verifier.py +++ b/test/unit/verify/test_verifier.py @@ -250,11 +250,12 @@ def test_verifier_no_validity_end(self, verifier, asset, null_policy): ["inclusionPromise", "integratedTime"], ), ) - def test_vierifier_verify_no_inclusion_promise_and_integrated_time( + def test_verifier_verify_no_inclusion_promise_and_integrated_time( self, verifier, asset, null_policy, fields_to_delete ): """ - Ensure that we can still verify a Bundle with a rfc3161 timestamp if the SET can't be verified or isn't present. + Ensure that we can still verify a Bundle with an RFC 3161 timestamp if the SET isn't present. + There is one exception: When inclusionPromise is present, but integratedTime is not, then we expect a failure because the integratedTime is required to verify the inclusionPromise. """