|
12 | 12 | from cryptography import x509 |
13 | 13 | from cryptography.hazmat._oid import ExtensionOID |
14 | 14 | from cryptography.hazmat.backends import default_backend |
15 | | -from cryptography.hazmat.primitives import serialization |
| 15 | +from cryptography.hazmat.primitives import hashes, serialization |
16 | 16 | from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa |
17 | 17 | from OpenSSL.SSL import Connection as SSLConnection |
18 | 18 |
|
@@ -335,18 +335,35 @@ def traverse_chain(cert: x509.Certificate) -> CRLValidationResult | None: |
335 | 335 | # found a trusted certificate |
336 | 336 | logger.debug("Found trusted certificate: %s", cert.subject) |
337 | 337 | return CRLValidationResult.UNREVOKED |
| 338 | + |
338 | 339 | if cert.issuer in self._trusted_ca: |
339 | 340 | # issuer is trusted by OS |
340 | 341 | logger.debug("Found certificate with trusted issuer: %s", cert.issuer) |
341 | | - return self._validate_certificate_with_cache( |
| 342 | + if self._verify_certificate_signature( |
342 | 343 | cert, self._trusted_ca[cert.issuer] |
343 | | - ) |
| 344 | + ): |
| 345 | + return self._validate_certificate_with_cache( |
| 346 | + cert, self._trusted_ca[cert.issuer] |
| 347 | + ) |
| 348 | + else: |
| 349 | + logger.debug( |
| 350 | + "Certificate signature verification failed: for %s, looking for other paths", |
| 351 | + cert, |
| 352 | + ) |
| 353 | + |
344 | 354 | if cert.issuer in is_being_visited: |
345 | 355 | # cycle detected - invalid path |
346 | 356 | return None |
347 | 357 |
|
348 | 358 | valid_results: list[tuple[CRLValidationResult, x509.Certificate]] = [] |
349 | 359 | for ca_cert in subject_certificates[cert.issuer]: |
| 360 | + if not self._verify_certificate_signature(cert, ca_cert): |
| 361 | + logger.debug( |
| 362 | + "Certificate signature verification failed for %s, looking for other paths", |
| 363 | + cert, |
| 364 | + ) |
| 365 | + continue |
| 366 | + |
350 | 367 | is_being_visited.add(cert.issuer) |
351 | 368 | ca_result = traverse_chain(ca_cert) |
352 | 369 | is_being_visited.remove(cert.issuer) |
@@ -567,52 +584,94 @@ def _check_certificate_against_crl_url( |
567 | 584 | # Check if certificate is revoked |
568 | 585 | return self._check_certificate_against_crl(cert, crl) |
569 | 586 |
|
| 587 | + def _verify_certificate_signature( |
| 588 | + self, cert: x509.Certificate, ca_cert: x509.Certificate |
| 589 | + ) -> bool: |
| 590 | + """Verify certificate signature with CA's public key""" |
| 591 | + logger.debug( |
| 592 | + "Verifying certificate signature of %s against %s public key", cert, ca_cert |
| 593 | + ) |
| 594 | + return self._verify_signature( |
| 595 | + public_key=ca_cert.public_key(), |
| 596 | + signature=cert.signature, |
| 597 | + data=cert.tbs_certificate_bytes, |
| 598 | + hash_algorithm=cert.signature_hash_algorithm, |
| 599 | + signature_type="certificate signature", |
| 600 | + ) |
| 601 | + |
570 | 602 | def _verify_crl_signature( |
571 | 603 | self, crl: x509.CertificateRevocationList, ca_cert: x509.Certificate |
572 | 604 | ) -> bool: |
573 | 605 | """Verify CRL signature with CA's public key""" |
574 | | - try: |
575 | | - # Get the signature algorithm from the CRL |
576 | | - signature_algorithm = crl.signature_algorithm_oid |
577 | | - hash_algorithm = crl.signature_hash_algorithm |
| 606 | + # Get the signature algorithm from the CRL |
| 607 | + signature_algorithm = crl.signature_algorithm_oid |
| 608 | + hash_algorithm = crl.signature_hash_algorithm |
| 609 | + |
| 610 | + logger.debug( |
| 611 | + "Verifying CRL signature with algorithm: %s, hash: %s", |
| 612 | + signature_algorithm, |
| 613 | + hash_algorithm, |
| 614 | + ) |
578 | 615 |
|
579 | | - logger.debug( |
580 | | - "Verifying CRL signature with algorithm: %s, hash: %s", |
581 | | - signature_algorithm, |
582 | | - hash_algorithm, |
583 | | - ) |
| 616 | + result = self._verify_signature( |
| 617 | + public_key=ca_cert.public_key(), |
| 618 | + signature=crl.signature, |
| 619 | + data=crl.tbs_certlist_bytes, |
| 620 | + hash_algorithm=hash_algorithm, |
| 621 | + signature_type="CRL signature", |
| 622 | + ) |
584 | 623 |
|
585 | | - # Determine the appropriate padding based on the signature algorithm |
586 | | - public_key = ca_cert.public_key() |
| 624 | + if result: |
| 625 | + logger.debug("CRL signature verification successful") |
| 626 | + return result |
587 | 627 |
|
| 628 | + def _verify_signature( |
| 629 | + self, |
| 630 | + public_key: rsa.RSAPublicKey | ec.EllipticCurvePublicKey | Any, |
| 631 | + signature: bytes, |
| 632 | + data: bytes, |
| 633 | + hash_algorithm: hashes.HashAlgorithm, |
| 634 | + signature_type: str = "signature", |
| 635 | + ) -> bool: |
| 636 | + """Verify a signature using a public key |
| 637 | +
|
| 638 | + Args: |
| 639 | + public_key: The public key to use for verification |
| 640 | + signature: The signature to verify |
| 641 | + data: The data that was signed |
| 642 | + hash_algorithm: The hash algorithm used in the signature |
| 643 | + signature_type: Type of signature being verified (for logging) |
| 644 | +
|
| 645 | + Returns: |
| 646 | + bool: True if verification succeeds, False otherwise |
| 647 | + """ |
| 648 | + try: |
588 | 649 | # Handle different key types with appropriate signature verification |
589 | 650 | if isinstance(public_key, rsa.RSAPublicKey): |
590 | 651 | # For RSA signatures, we need to use PKCS1v15 padding |
591 | 652 | public_key.verify( |
592 | | - crl.signature, |
593 | | - crl.tbs_certlist_bytes, |
| 653 | + signature, |
| 654 | + data, |
594 | 655 | padding.PKCS1v15(), |
595 | 656 | hash_algorithm, |
596 | 657 | ) |
597 | 658 | elif isinstance(public_key, ec.EllipticCurvePublicKey): |
598 | 659 | # For EC signatures, use ECDSA algorithm |
599 | 660 | public_key.verify( |
600 | | - crl.signature, |
601 | | - crl.tbs_certlist_bytes, |
| 661 | + signature, |
| 662 | + data, |
602 | 663 | ec.ECDSA(hash_algorithm), |
603 | 664 | ) |
604 | 665 | else: |
605 | 666 | # For other key types (DSA, etc.), try without padding |
606 | 667 | public_key.verify( |
607 | | - crl.signature, |
608 | | - crl.tbs_certlist_bytes, |
| 668 | + signature, |
| 669 | + data, |
609 | 670 | hash_algorithm, |
610 | 671 | ) |
611 | | - |
612 | | - logger.debug("CRL signature verification successful") |
613 | 672 | return True |
614 | 673 | except Exception as e: |
615 | | - logger.warning("CRL signature verification failed: %s", e) |
| 674 | + logger.warning("%s verification failed: %s", signature_type.capitalize(), e) |
616 | 675 | return False |
617 | 676 |
|
618 | 677 | def _check_certificate_against_crl( |
|
0 commit comments