|
25 | 25 |
|
26 | 26 | import rekor_types
|
27 | 27 | from cryptography.exceptions import InvalidSignature
|
| 28 | +from cryptography.hazmat.primitives import serialization |
28 | 29 | from cryptography.hazmat.primitives.asymmetric import ec
|
29 |
| -from cryptography.x509 import ExtendedKeyUsage, KeyUsage |
| 30 | +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey |
| 31 | +from cryptography.x509 import Certificate, ExtendedKeyUsage, KeyUsage |
30 | 32 | from cryptography.x509.oid import ExtendedKeyUsageOID
|
31 | 33 | from OpenSSL.crypto import (
|
32 | 34 | X509,
|
|
38 | 40 | from pydantic import ValidationError
|
39 | 41 | from rfc3161_client import TimeStampResponse, VerifierBuilder
|
40 | 42 | from rfc3161_client import VerificationError as Rfc3161VerificationError
|
| 43 | +from sigstore_protobuf_specs.dev.sigstore.common import v1 |
| 44 | +from sigstore_protobuf_specs.dev.sigstore.rekor import v2 |
41 | 45 |
|
42 | 46 | from sigstore import dsse
|
43 | 47 | from sigstore._internal.rekor import _hashedrekord_from_parts
|
@@ -371,6 +375,20 @@ def _verify_common_signing_cert(
|
371 | 375 | f"invalid signing cert: expired at time of signing, time via {vts}"
|
372 | 376 | )
|
373 | 377 |
|
| 378 | + @staticmethod |
| 379 | + def _get_key_details(certificate: Certificate) -> v1.PublicKeyDetails: |
| 380 | + """Determine PublicKeyDetails from a certificate""" |
| 381 | + public_key = certificate.public_key() |
| 382 | + if isinstance(public_key, EllipticCurvePublicKey): |
| 383 | + if public_key.curve.name == "secp256r1": |
| 384 | + return cast( |
| 385 | + v1.PublicKeyDetails, |
| 386 | + v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256, |
| 387 | + ) |
| 388 | + # TODO support other keys |
| 389 | + raise ValueError(f"Unsupported EC curve: {public_key.curve.name}") |
| 390 | + raise ValueError(f"Unsupported public key type: {type(public_key)}") |
| 391 | + |
374 | 392 | def verify_dsse(
|
375 | 393 | self, bundle: Bundle, policy: VerificationPolicy
|
376 | 394 | ) -> tuple[str, bytes]:
|
@@ -418,34 +436,74 @@ def verify_dsse(
|
418 | 436 | # Instead, we manually pick apart the entry body below and verify
|
419 | 437 | # the parts we can (namely the payload hash and signature list).
|
420 | 438 | entry = bundle.log_entry
|
421 |
| - try: |
422 |
| - entry_body = rekor_types.Dsse.model_validate_json( |
423 |
| - base64.b64decode(entry.body) |
424 |
| - ) |
425 |
| - except ValidationError as exc: |
426 |
| - raise VerificationError(f"invalid DSSE log entry: {exc}") |
427 |
| - |
428 |
| - payload_hash = sha256_digest(envelope._inner.payload).digest.hex() |
429 | 439 | if (
|
430 |
| - entry_body.spec.root.payload_hash.algorithm # type: ignore[union-attr] |
431 |
| - != rekor_types.dsse.Algorithm.SHA256 |
| 440 | + entry._kind_version.kind == "dsse" |
| 441 | + and entry._kind_version.version == "0.0.2" |
432 | 442 | ):
|
433 |
| - raise VerificationError("expected SHA256 payload hash in DSSE log entry") |
434 |
| - if payload_hash != entry_body.spec.root.payload_hash.value: # type: ignore[union-attr] |
435 |
| - raise VerificationError("log entry payload hash does not match bundle") |
436 |
| - |
437 |
| - # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, |
438 |
| - # but we handle them just in case the signer has somehow produced multiple |
439 |
| - # signatures for their envelope with the same signing key. |
440 |
| - signatures = [ |
441 |
| - rekor_types.dsse.Signature( |
442 |
| - signature=base64.b64encode(signature.sig).decode(), |
443 |
| - verifier=base64_encode_pem_cert(bundle.signing_certificate), |
444 |
| - ) |
445 |
| - for signature in envelope._inner.signatures |
446 |
| - ] |
447 |
| - if signatures != entry_body.spec.root.signatures: |
448 |
| - raise VerificationError("log entry signatures do not match bundle") |
| 443 | + try: |
| 444 | + entry_body = v2.Entry().from_json(base64.b64decode(entry.body)) |
| 445 | + except ValidationError as exc: |
| 446 | + raise VerificationError(f"invalid DSSE log entry: {exc}") |
| 447 | + |
| 448 | + if ( |
| 449 | + entry_body.spec.dsse_v002.payload_hash.algorithm |
| 450 | + != v1.HashAlgorithm.SHA2_256 |
| 451 | + ): |
| 452 | + raise VerificationError("expected SHA256 hash in DSSE entry") |
| 453 | + |
| 454 | + payload_hash = sha256_digest(envelope._inner.payload).digest |
| 455 | + if entry_body.spec.dsse_v002.payload_hash.digest != payload_hash: |
| 456 | + raise VerificationError("DSSE entry payload hash does not match bundle") |
| 457 | + |
| 458 | + signatures = [ |
| 459 | + v2.Signature( |
| 460 | + content=signature.sig, |
| 461 | + verifier=v2.Verifier( |
| 462 | + x509_certificate=v1.X509Certificate( |
| 463 | + bundle.signing_certificate.public_bytes( |
| 464 | + encoding=serialization.Encoding.DER |
| 465 | + ) |
| 466 | + ), |
| 467 | + key_details=self._get_key_details(bundle.signing_certificate), |
| 468 | + ), |
| 469 | + ) |
| 470 | + for signature in envelope._inner.signatures |
| 471 | + ] |
| 472 | + if signatures != entry_body.spec.dsse_v002.signatures: |
| 473 | + raise VerificationError("log entry signatures do not match bundle") |
| 474 | + else: |
| 475 | + try: |
| 476 | + entry_body = rekor_types.Dsse.model_validate_json( |
| 477 | + base64.b64decode(entry.body) |
| 478 | + ) |
| 479 | + except ValidationError as exc: |
| 480 | + raise VerificationError(f"invalid DSSE log entry: {exc}") |
| 481 | + |
| 482 | + payload_hash = sha256_digest(envelope._inner.payload).digest.hex() |
| 483 | + if ( |
| 484 | + # type: ignore[union-attr] |
| 485 | + entry_body.spec.root.payload_hash.algorithm |
| 486 | + != rekor_types.dsse.Algorithm.SHA256 |
| 487 | + ): |
| 488 | + raise VerificationError( |
| 489 | + "expected SHA256 payload hash in DSSE log entry" |
| 490 | + ) |
| 491 | + # type: ignore[union-attr] |
| 492 | + if payload_hash != entry_body.spec.root.payload_hash.value: |
| 493 | + raise VerificationError("log entry payload hash does not match bundle") |
| 494 | + |
| 495 | + # NOTE: Like `dsse._verify`: multiple signatures would be frivolous here, |
| 496 | + # but we handle them just in case the signer has somehow produced multiple |
| 497 | + # signatures for their envelope with the same signing key. |
| 498 | + signatures = [ |
| 499 | + rekor_types.dsse.Signature( |
| 500 | + signature=base64.b64encode(signature.sig).decode(), |
| 501 | + verifier=base64_encode_pem_cert(bundle.signing_certificate), |
| 502 | + ) |
| 503 | + for signature in envelope._inner.signatures |
| 504 | + ] |
| 505 | + if signatures != entry_body.spec.root.signatures: |
| 506 | + raise VerificationError("log entry signatures do not match bundle") |
449 | 507 |
|
450 | 508 | return (envelope._inner.payload_type, envelope._inner.payload)
|
451 | 509 |
|
@@ -491,14 +549,47 @@ def verify_artifact(
|
491 | 549 | # the other bundle materials (and input being verified).
|
492 | 550 | entry = bundle.log_entry
|
493 | 551 |
|
494 |
| - expected_body = _hashedrekord_from_parts( |
495 |
| - bundle.signing_certificate, |
496 |
| - bundle._inner.message_signature.signature, # type: ignore[union-attr] |
497 |
| - hashed_input, |
498 |
| - ) |
499 |
| - actual_body = rekor_types.Hashedrekord.model_validate_json( |
500 |
| - base64.b64decode(entry.body) |
501 |
| - ) |
| 552 | + if ( |
| 553 | + entry._kind_version.kind == "hashedrekord" |
| 554 | + and entry._kind_version.version == "0.0.2" |
| 555 | + ): |
| 556 | + expected_body = v2.Entry( |
| 557 | + kind=entry._kind_version.kind, |
| 558 | + api_version=entry._kind_version.version, |
| 559 | + spec=v2.Spec( |
| 560 | + hashed_rekord_v002=v2.HashedRekordLogEntryV002( |
| 561 | + data=v1.HashOutput( |
| 562 | + algorithm=bundle._inner.message_signature.message_digest.algorithm, |
| 563 | + digest=bundle._inner.message_signature.message_digest.digest, |
| 564 | + ), |
| 565 | + signature=v2.Signature( |
| 566 | + content=bundle._inner.message_signature.signature, |
| 567 | + verifier=v2.Verifier( |
| 568 | + x509_certificate=v1.X509Certificate( |
| 569 | + bundle.signing_certificate.public_bytes( |
| 570 | + encoding=serialization.Encoding.DER |
| 571 | + ) |
| 572 | + ), |
| 573 | + key_details=self._get_key_details( |
| 574 | + bundle.signing_certificate |
| 575 | + ), |
| 576 | + ), |
| 577 | + ), |
| 578 | + ) |
| 579 | + ), |
| 580 | + ) |
| 581 | + actual_body = v2.Entry().from_json(base64.b64decode(entry.body)) |
| 582 | + else: |
| 583 | + expected_body = _hashedrekord_from_parts( |
| 584 | + bundle.signing_certificate, |
| 585 | + # type: ignore[union-attr] |
| 586 | + bundle._inner.message_signature.signature, |
| 587 | + hashed_input, |
| 588 | + ) |
| 589 | + actual_body = rekor_types.Hashedrekord.model_validate_json( |
| 590 | + base64.b64decode(entry.body) |
| 591 | + ) |
| 592 | + |
502 | 593 | if expected_body != actual_body:
|
503 | 594 | raise VerificationError(
|
504 | 595 | "transparency log entry is inconsistent with other materials"
|
|
0 commit comments