|
25 | 25 |
|
26 | 26 | import rekor_types
|
27 | 27 | from cryptography.exceptions import InvalidSignature
|
| 28 | +from cryptography.hazmat.primitives import serialization |
28 | 29 | from cryptography.hazmat.primitives.asymmetric import ec
|
29 |
| -from cryptography.x509 import ExtendedKeyUsage, KeyUsage |
| 30 | +from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage |
30 | 31 | from cryptography.x509.oid import ExtendedKeyUsageOID
|
31 | 32 | from OpenSSL.crypto import (
|
32 | 33 | X509,
|
|
38 | 39 | from pydantic import ValidationError
|
39 | 40 | from rfc3161_client import TimeStampResponse, VerifierBuilder
|
40 | 41 | from rfc3161_client import VerificationError as Rfc3161VerificationError
|
| 42 | +from sigstore_protobuf_specs.dev.sigstore.common import v1 |
| 43 | +from sigstore_protobuf_specs.dev.sigstore.rekor import v2 |
41 | 44 |
|
42 | 45 | from sigstore import dsse
|
43 | 46 | from sigstore._internal.rekor import _hashedrekord_from_parts
|
@@ -417,34 +420,18 @@ def verify_dsse(
|
417 | 420 | # Instead, we manually pick apart the entry body below and verify
|
418 | 421 | # the parts we can (namely the payload hash and signature list).
|
419 | 422 | entry = bundle.log_entry
|
420 |
| - try: |
421 |
| - entry_body = rekor_types.Dsse.model_validate_json( |
422 |
| - base64.b64decode(entry.body) |
| 423 | + if entry._kind_version.kind != "dsse": |
| 424 | + raise VerificationError( |
| 425 | + f"Expected entry type dsse, got {entry._kind_version.kind}" |
423 | 426 | )
|
424 |
| - except ValidationError as exc: |
425 |
| - raise VerificationError(f"invalid DSSE log entry: {exc}") |
426 |
| - |
427 |
| - payload_hash = sha256_digest(envelope._inner.payload).digest.hex() |
428 |
| - if ( |
429 |
| - entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr] |
430 |
| - != rekor_types.dsse.Algorithm.SHA256 |
431 |
| - ): |
432 |
| - raise VerificationError("expected SHA256 payload hash in DSSE log entry") |
433 |
| - if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr] |
434 |
| - raise VerificationError("log entry payload hash does not match bundle") |
435 |
| - |
436 |
| - # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, |
437 |
| - # but we handle them just in case the signer has somehow produced multiple |
438 |
| - # signatures for their envelope with the same signing key. |
439 |
| - signatures = [ |
440 |
| - rekor_types.dsse.Signature( |
441 |
| - signature=base64.b64encode(signature.sig).decode(), |
442 |
| - verifier=base64_encode_pem_cert(bundle.signing_certificate), |
| 427 | + if entry._kind_version.version == "0.0.2": |
| 428 | + _validate_dsse_v002_entry_body(bundle) |
| 429 | + elif entry._kind_version.version == "0.0.1": |
| 430 | + _validate_dsse_v001_entry_body(bundle) |
| 431 | + else: |
| 432 | + raise VerificationError( |
| 433 | + f"Unsupported dsse version {entry._kind_version.version}" |
443 | 434 | )
|
444 |
| - for signature in envelope._inner.signatures |
445 |
| - ] |
446 |
| - if signatures != entry_body.spec.root.signatures: |
447 |
| - raise VerificationError("log entry signatures do not match bundle") |
448 | 435 |
|
449 | 436 | return (envelope._inner.payload_type, envelope._inner.payload)
|
450 | 437 |
|
@@ -489,16 +476,175 @@ def verify_artifact(
|
489 | 476 | # (8): verify the consistency of the log entry's body against
|
490 | 477 | # the other bundle materials (and input being verified).
|
491 | 478 | entry = bundle.log_entry
|
| 479 | + if entry._kind_version.kind != "hashedrekord": |
| 480 | + raise VerificationError( |
| 481 | + f"Expected entry type hashedrekord, got {entry._kind_version.kind}" |
| 482 | + ) |
| 483 | + |
| 484 | + if entry._kind_version.version == "0.0.2": |
| 485 | + _validate_hashedrekord_v002_entry_body(bundle) |
| 486 | + elif entry._kind_version.version == "0.0.1": |
| 487 | + _validate_hashedrekord_v001_entry_body(bundle, hashed_input) |
| 488 | + else: |
| 489 | + raise VerificationError( |
| 490 | + f"Unsupported hashedrekord version {entry._kind_version.version}" |
| 491 | + ) |
492 | 492 |
|
493 |
| - expected_body = _hashedrekord_from_parts( |
494 |
| - bundle.signing_certificate, |
495 |
| - bundle._inner.message_signature.signature, # type: ignore[union-attr] |
496 |
| - hashed_input, |
| 493 | + |
| 494 | +def _validate_dsse_v001_entry_body(bundle: Bundle) -> None: |
| 495 | + """ |
| 496 | + Validate the Entry body for dsse v001. |
| 497 | + """ |
| 498 | + entry = bundle.log_entry |
| 499 | + envelope = bundle._dsse_envelope |
| 500 | + if envelope is None: |
| 501 | + raise VerificationError( |
| 502 | + "cannot perform DSSE verification on a bundle without a DSSE envelope" |
497 | 503 | )
|
498 |
| - actual_body = rekor_types.Hashedrekord.model_validate_json( |
499 |
| - base64.b64decode(entry.body) |
| 504 | + try: |
| 505 | + entry_body = rekor_types.Dsse.model_validate_json(base64.b64decode(entry.body)) |
| 506 | + except ValidationError as exc: |
| 507 | + raise VerificationError(f"invalid DSSE log entry: {exc}") |
| 508 | + |
| 509 | + payload_hash = sha256_digest(envelope._inner.payload).digest.hex() |
| 510 | + if ( |
| 511 | + entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr] |
| 512 | + != rekor_types.dsse.Algorithm.SHA256 |
| 513 | + ): |
| 514 | + raise VerificationError("expected SHA256 payload hash in DSSE log entry") |
| 515 | + if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr] |
| 516 | + raise VerificationError("log entry payload hash does not match bundle") |
| 517 | + |
| 518 | + # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, |
| 519 | + # but we handle them just in case the signer has somehow produced multiple |
| 520 | + # signatures for their envelope with the same signing key. |
| 521 | + signatures = [ |
| 522 | + rekor_types.dsse.Signature( |
| 523 | + signature=base64.b64encode(signature.sig).decode(), |
| 524 | + verifier=base64_encode_pem_cert(bundle.signing_certificate), |
500 | 525 | )
|
501 |
| - if expected_body != actual_body: |
502 |
| - raise VerificationError( |
503 |
| - "transparency log entry is inconsistent with other materials" |
| 526 | + for signature in envelope._inner.signatures |
| 527 | + ] |
| 528 | + if signatures != entry_body.spec.root.signatures: |
| 529 | + raise VerificationError("log entry signatures do not match bundle") |
| 530 | + |
| 531 | + |
| 532 | +def _validate_dsse_v002_entry_body(bundle: Bundle) -> None: |
| 533 | + """ |
| 534 | + Validate Entry body for dsse v002. |
| 535 | + """ |
| 536 | + entry = bundle.log_entry |
| 537 | + envelope = bundle._dsse_envelope |
| 538 | + if envelope is None: |
| 539 | + raise VerificationError( |
| 540 | + "cannot perform DSSE verification on a bundle without a DSSE envelope" |
| 541 | + ) |
| 542 | + try: |
| 543 | + v2_body = v2.Entry().from_json(base64.b64decode(entry.body)) |
| 544 | + except ValidationError as exc: |
| 545 | + raise VerificationError(f"invalid DSSE log entry: {exc}") |
| 546 | + |
| 547 | + if v2_body.spec.dsse_v002 is None: |
| 548 | + raise VerificationError("invalid DSSE log entry: missing dsse_v002 field") |
| 549 | + |
| 550 | + if v2_body.spec.dsse_v002.payload_hash.algorithm != v1.HashAlgorithm.SHA2_256: |
| 551 | + raise VerificationError("expected SHA256 hash in DSSE entry") |
| 552 | + |
| 553 | + digest = sha256_digest(envelope._inner.payload).digest |
| 554 | + if v2_body.spec.dsse_v002.payload_hash.digest != digest: |
| 555 | + raise VerificationError("DSSE entry payload hash does not match bundle") |
| 556 | + |
| 557 | + v2_signatures = [ |
| 558 | + v2.Signature( |
| 559 | + content=signature.sig, |
| 560 | + verifier=_v2_verifier_from_certificate(bundle.signing_certificate), |
| 561 | + ) |
| 562 | + for signature in envelope._inner.signatures |
| 563 | + ] |
| 564 | + if v2_signatures != v2_body.spec.dsse_v002.signatures: |
| 565 | + raise VerificationError("log entry signatures do not match bundle") |
| 566 | + |
| 567 | + |
| 568 | +def _validate_hashedrekord_v001_entry_body( |
| 569 | + bundle: Bundle, hashed_input: Hashed |
| 570 | +) -> None: |
| 571 | + """ |
| 572 | + Validate the Entry body for hashedrekord v001. |
| 573 | + """ |
| 574 | + entry = bundle.log_entry |
| 575 | + expected_body = _hashedrekord_from_parts( |
| 576 | + bundle.signing_certificate, |
| 577 | + bundle._inner.message_signature.signature, # type: ignore[union-attr] |
| 578 | + hashed_input, |
| 579 | + ) |
| 580 | + actual_body = rekor_types.Hashedrekord.model_validate_json( |
| 581 | + base64.b64decode(entry.body) |
| 582 | + ) |
| 583 | + if expected_body != actual_body: |
| 584 | + raise VerificationError( |
| 585 | + "transparency log entry is inconsistent with other materials" |
| 586 | + ) |
| 587 | + |
| 588 | + |
| 589 | +def _validate_hashedrekord_v002_entry_body(bundle: Bundle) -> None: |
| 590 | + """ |
| 591 | + Validate Entry body for hashedrekord v002. |
| 592 | + """ |
| 593 | + entry = bundle.log_entry |
| 594 | + if bundle._inner.message_signature is None: |
| 595 | + raise VerificationError( |
| 596 | + "invalid hashedrekord log entry: missing message signature" |
| 597 | + ) |
| 598 | + v2_expected_body = v2.Entry( |
| 599 | + kind=entry._kind_version.kind, |
| 600 | + api_version=entry._kind_version.version, |
| 601 | + spec=v2.Spec( |
| 602 | + hashed_rekord_v002=v2.HashedRekordLogEntryV002( |
| 603 | + data=v1.HashOutput( |
| 604 | + algorithm=bundle._inner.message_signature.message_digest.algorithm, |
| 605 | + digest=bundle._inner.message_signature.message_digest.digest, |
| 606 | + ), |
| 607 | + signature=v2.Signature( |
| 608 | + content=bundle._inner.message_signature.signature, |
| 609 | + verifier=_v2_verifier_from_certificate(bundle.signing_certificate), |
| 610 | + ), |
504 | 611 | )
|
| 612 | + ), |
| 613 | + ) |
| 614 | + v2_actual_body = v2.Entry().from_json(base64.b64decode(entry.body)) |
| 615 | + if v2_expected_body != v2_actual_body: |
| 616 | + raise VerificationError( |
| 617 | + "transparency log entry is inconsistent with other materials" |
| 618 | + ) |
| 619 | + |
| 620 | + |
| 621 | +def _v2_verifier_from_certificate(certificate: Certificate) -> v2.Verifier: |
| 622 | + """ |
| 623 | + Return a Rekor v2 protobuf Verifier for the signing certificate. |
| 624 | +
|
| 625 | + This method decides which signature algorithms are supported for verification |
| 626 | + (in a rekor v2 entry), see |
| 627 | + https://github.com/sigstore/architecture-docs/blob/main/algorithm-registry.md. |
| 628 | + Note that actual signature verification happens in verify_artifact() and |
| 629 | + verify_dsse(): New keytypes need to be added here and in those methods. |
| 630 | + """ |
| 631 | + public_key = certificate.public_key() |
| 632 | + |
| 633 | + if isinstance(public_key, ec.EllipticCurvePublicKey): |
| 634 | + if isinstance(public_key.curve, ec.SECP256R1): |
| 635 | + key_details = v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256 |
| 636 | + elif isinstance(public_key.curve, ec.SECP384R1): |
| 637 | + key_details = v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384 |
| 638 | + elif isinstance(public_key.curve, ec.SECP521R1): |
| 639 | + key_details = v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512 |
| 640 | + else: |
| 641 | + raise ValueError(f"Unsupported EC curve: {public_key.curve.name}") |
| 642 | + else: |
| 643 | + raise ValueError(f"Unsupported public key type: {type(public_key)}") |
| 644 | + |
| 645 | + return v2.Verifier( |
| 646 | + x509_certificate=v1.X509Certificate( |
| 647 | + certificate.public_bytes(encoding=serialization.Encoding.DER) |
| 648 | + ), |
| 649 | + key_details=cast(v1.PublicKeyDetails, key_details), |
| 650 | + ) |
0 commit comments