|
12 | 12 | from cryptography import x509 |
13 | 13 | from cryptography.hazmat._oid import ExtensionOID |
14 | 14 | from cryptography.hazmat.backends import default_backend |
15 | | -from cryptography.hazmat.primitives import hashes, serialization |
| 15 | +from cryptography.hazmat.primitives import serialization |
16 | 16 | from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa |
17 | 17 | from OpenSSL.SSL import Connection as SSLConnection |
18 | 18 |
|
@@ -198,9 +198,9 @@ def __init__( |
198 | 198 | ] = {} |
199 | 199 |
|
200 | 200 | # list of trusted CA and their certificates |
201 | | - self._trusted_ca: dict[x509.Name, x509.Certificate] = {} |
| 201 | + self._trusted_ca: dict[x509.Name, list(x509.Certificate)] = defaultdict(list) |
202 | 202 | for cert in trusted_certificates: |
203 | | - self._trusted_ca[cert.subject] = cert |
| 203 | + self._trusted_ca[cert.subject].append(cert) |
204 | 204 |
|
205 | 205 | @classmethod |
206 | 206 | def from_config( |
@@ -332,24 +332,12 @@ def traverse_chain(cert: x509.Certificate) -> CRLValidationResult | None: |
332 | 332 | # ERROR - some certificates on potentially unrevoked paths can't be verified, or no path to a trusted CA is detected |
333 | 333 | # None - ignore this path (cycle detected) |
334 | 334 | if self._is_certificate_trusted_by_os(cert): |
335 | | - # found a trusted certificate |
336 | 335 | logger.debug("Found trusted certificate: %s", cert.subject) |
337 | 336 | return CRLValidationResult.UNREVOKED |
338 | 337 |
|
339 | | - if cert.issuer in self._trusted_ca: |
340 | | - # issuer is trusted by OS |
341 | | - logger.debug("Found certificate with trusted issuer: %s", cert.issuer) |
342 | | - if self._verify_certificate_signature( |
343 | | - cert, self._trusted_ca[cert.issuer] |
344 | | - ): |
345 | | - return self._validate_certificate_with_cache( |
346 | | - cert, self._trusted_ca[cert.issuer] |
347 | | - ) |
348 | | - else: |
349 | | - logger.debug( |
350 | | - "Certificate signature verification failed: for %s, looking for other paths", |
351 | | - cert, |
352 | | - ) |
| 338 | + if trusted_ca_issuer := self._get_trusted_ca_issuer(cert): |
| 339 | + logger.debug("Certificate signed by trusted CA: %s", cert.subject) |
| 340 | + return self._validate_certificate_with_cache(cert, trusted_ca_issuer) |
353 | 341 |
|
354 | 342 | if cert.issuer in is_being_visited: |
355 | 343 | # cycle detected - invalid path |
@@ -406,12 +394,29 @@ def traverse_chain(cert: x509.Certificate) -> CRLValidationResult | None: |
406 | 394 | return CRLValidationResult.REVOKED |
407 | 395 |
|
408 | 396 | def _is_certificate_trusted_by_os(self, cert: x509.Certificate) -> bool: |
409 | | - if trusted_cert := self._trusted_ca.get(cert.subject): |
410 | | - # Compare DER-encoded forms of certificates |
411 | | - cert_der = cert.public_bytes(serialization.Encoding.DER) |
412 | | - trusted_cert_der = trusted_cert.public_bytes(serialization.Encoding.DER) |
413 | | - return cert_der == trusted_cert_der |
414 | | - return False |
| 397 | + if cert.subject not in self._trusted_ca: |
| 398 | + return False |
| 399 | + |
| 400 | + cert_der = cert.public_bytes(serialization.Encoding.DER) |
| 401 | + return any( |
| 402 | + cert_der == trusted_cert.public_bytes(serialization.Encoding.DER) |
| 403 | + for trusted_cert in self._trusted_ca[cert.subject] |
| 404 | + ) |
| 405 | + |
| 406 | + def _get_trusted_ca_issuer(self, cert: x509.Certificate) -> x509.Certificate | None: |
| 407 | + for trusted_cert in self._trusted_ca[cert.issuer]: |
| 408 | + if self._verify_certificate_signature(cert, trusted_cert): |
| 409 | + return trusted_cert |
| 410 | + return None |
| 411 | + |
| 412 | + def _verify_certificate_signature( |
| 413 | + self, cert: x509.Certificate, ca_cert: x509.Certificate |
| 414 | + ) -> bool: |
| 415 | + try: |
| 416 | + cert.verify_directly_issued_by(ca_cert) |
| 417 | + return True |
| 418 | + except Exception: |
| 419 | + return False |
415 | 420 |
|
416 | 421 | def _validate_certificate_with_cache( |
417 | 422 | self, cert: x509.Certificate, ca_cert: x509.Certificate |
@@ -584,95 +589,52 @@ def _check_certificate_against_crl_url( |
584 | 589 | # Check if certificate is revoked |
585 | 590 | return self._check_certificate_against_crl(cert, crl) |
586 | 591 |
|
587 | | - def _verify_certificate_signature( |
588 | | - self, cert: x509.Certificate, ca_cert: x509.Certificate |
589 | | - ) -> bool: |
590 | | - """Verify certificate signature with CA's public key""" |
591 | | - logger.debug( |
592 | | - "Verifying certificate signature of %s against %s public key", cert, ca_cert |
593 | | - ) |
594 | | - return self._verify_signature( |
595 | | - public_key=ca_cert.public_key(), |
596 | | - signature=cert.signature, |
597 | | - data=cert.tbs_certificate_bytes, |
598 | | - hash_algorithm=cert.signature_hash_algorithm, |
599 | | - signature_type="certificate signature", |
600 | | - ) |
601 | | - |
602 | 592 | def _verify_crl_signature( |
603 | 593 | self, crl: x509.CertificateRevocationList, ca_cert: x509.Certificate |
604 | 594 | ) -> bool: |
605 | 595 | """Verify CRL signature with CA's public key""" |
606 | | - # Get the signature algorithm from the CRL |
607 | | - signature_algorithm = crl.signature_algorithm_oid |
608 | | - hash_algorithm = crl.signature_hash_algorithm |
609 | | - |
610 | | - logger.debug( |
611 | | - "Verifying CRL signature with algorithm: %s, hash: %s", |
612 | | - signature_algorithm, |
613 | | - hash_algorithm, |
614 | | - ) |
615 | | - |
616 | | - result = self._verify_signature( |
617 | | - public_key=ca_cert.public_key(), |
618 | | - signature=crl.signature, |
619 | | - data=crl.tbs_certlist_bytes, |
620 | | - hash_algorithm=hash_algorithm, |
621 | | - signature_type="CRL signature", |
622 | | - ) |
623 | | - |
624 | | - if result: |
625 | | - logger.debug("CRL signature verification successful") |
626 | | - return result |
| 596 | + try: |
| 597 | + # Get the signature algorithm from the CRL |
| 598 | + signature_algorithm = crl.signature_algorithm_oid |
| 599 | + hash_algorithm = crl.signature_hash_algorithm |
627 | 600 |
|
628 | | - def _verify_signature( |
629 | | - self, |
630 | | - public_key: rsa.RSAPublicKey | ec.EllipticCurvePublicKey | Any, |
631 | | - signature: bytes, |
632 | | - data: bytes, |
633 | | - hash_algorithm: hashes.HashAlgorithm, |
634 | | - signature_type: str = "signature", |
635 | | - ) -> bool: |
636 | | - """Verify a signature using a public key |
| 601 | + logger.debug( |
| 602 | + "Verifying CRL signature with algorithm: %s, hash: %s", |
| 603 | + signature_algorithm, |
| 604 | + hash_algorithm, |
| 605 | + ) |
637 | 606 |
|
638 | | - Args: |
639 | | - public_key: The public key to use for verification |
640 | | - signature: The signature to verify |
641 | | - data: The data that was signed |
642 | | - hash_algorithm: The hash algorithm used in the signature |
643 | | - signature_type: Type of signature being verified (for logging) |
| 607 | + # Determine the appropriate padding based on the signature algorithm |
| 608 | + public_key = ca_cert.public_key() |
644 | 609 |
|
645 | | - Returns: |
646 | | - bool: True if verification succeeds, False otherwise |
647 | | - """ |
648 | | - try: |
649 | 610 | # Handle different key types with appropriate signature verification |
650 | 611 | if isinstance(public_key, rsa.RSAPublicKey): |
651 | 612 | # For RSA signatures, we need to use PKCS1v15 padding |
652 | 613 | public_key.verify( |
653 | | - signature, |
654 | | - data, |
| 614 | + crl.signature, |
| 615 | + crl.tbs_certlist_bytes, |
655 | 616 | padding.PKCS1v15(), |
656 | 617 | hash_algorithm, |
657 | 618 | ) |
658 | 619 | elif isinstance(public_key, ec.EllipticCurvePublicKey): |
659 | 620 | # For EC signatures, use ECDSA algorithm |
660 | 621 | public_key.verify( |
661 | | - signature, |
662 | | - data, |
| 622 | + crl.signature, |
| 623 | + crl.tbs_certlist_bytes, |
663 | 624 | ec.ECDSA(hash_algorithm), |
664 | 625 | ) |
665 | 626 | else: |
666 | 627 | # For other key types (DSA, etc.), try without padding |
667 | 628 | public_key.verify( |
668 | | - signature, |
669 | | - data, |
| 629 | + crl.signature, |
| 630 | + crl.tbs_certlist_bytes, |
670 | 631 | hash_algorithm, |
671 | 632 | ) |
672 | | - print("> verification successful") |
| 633 | + |
| 634 | + logger.debug("CRL signature verification successful") |
673 | 635 | return True |
674 | 636 | except Exception as e: |
675 | | - logger.warning("%s verification failed: %s", signature_type.capitalize(), e) |
| 637 | + logger.warning("CRL signature verification failed: %s", e) |
676 | 638 | return False |
677 | 639 |
|
678 | 640 | def _check_certificate_against_crl( |
|
0 commit comments