@@ -270,61 +270,67 @@ def from_config(
270270 cache_manager = cache_manager ,
271271 )
272272
273- def validate_certificate_chains (
274- self , certificate_chains : list [list [ x509 .Certificate ]]
273+ def validate_certificate_chain (
274+ self , peer_cert : x509 . Certificate , chain : list [x509 .Certificate ] | None
275275 ) -> bool :
276276 """
277- Validate certificate chains against CRLs with actual HTTP requests
277+ Validate a certificate chain against CRLs with actual HTTP requests
278278
279279 Args:
280- certificate_chains: List of certificate chains to validate
280+ peer_cert: The peer certificate to validate (e.g., server certificate)
281+ chain: Certificate chain to use for validation (can be None or empty)
281282
282283 Returns:
283284 True if validation passes, False otherwise
284-
285- Raises:
286- ValueError: If certificate_chains is None or empty
287285 """
288286 if self ._cert_revocation_check_mode == CertRevocationCheckMode .DISABLED :
289287 return True
290288
291- if certificate_chains is None or len (certificate_chains ) == 0 :
292- logger .warning ("Certificate chains are empty" )
293- return self ._cert_revocation_check_mode == CertRevocationCheckMode .ADVISORY
294-
295- results = []
296- for chain in certificate_chains :
297- result = self ._validate_single_chain (chain )
298- # If any of the chains is valid, the whole check is considered positive
299- if result == CRLValidationResult .UNREVOKED :
300- return True
301- results .append (result )
289+ chain = chain if chain is not None else []
290+ result = self ._validate_chain (peer_cert , chain )
302291
303- # In non-advisory mode we require at least one chain get a clear UNREVOKED status
304- if self ._cert_revocation_check_mode != CertRevocationCheckMode .ADVISORY :
292+ if result == CRLValidationResult .UNREVOKED :
293+ return True
294+ if result == CRLValidationResult .REVOKED :
305295 return False
296+ # In advisory mode, errors are treated positively
297+ return self ._cert_revocation_check_mode == CertRevocationCheckMode .ADVISORY
306298
307- # We're in advisory mode, so any error is treated positively
308- return any (result == CRLValidationResult .ERROR for result in results )
309-
310- def _validate_single_chain (
311- self , chain : list [x509 .Certificate ]
299+ def _validate_chain (
300+ self , start_cert : x509 .Certificate , chain : list [x509 .Certificate ]
312301 ) -> CRLValidationResult :
313302 """
303+ Validate a certificate chain starting from start_cert.
304+
305+ Args:
306+ start_cert: The certificate to start validation from
307+ chain: List of certificates to use for building the trust path
308+
314309 Returns:
315310 UNREVOKED: If there is a path to any trusted certificate where all certificates are unrevoked.
316311 REVOKED: If all paths to trusted certificates are revoked.
317312 ERROR: If there is a path to any trusted certificate on which none certificate is revoked,
318313 but some certificates can't be verified.
319314 """
320- # An empty chain is considered an error
321- if len (chain ) == 0 :
315+ # Check if start certificate is expired
316+ if not self ._is_within_validity_dates (start_cert ):
317+ logger .warning (
318+ "Start certificate is expired or not yet valid: %s" , start_cert .subject
319+ )
322320 return CRLValidationResult .ERROR
323321
324322 subject_certificates : dict [x509 .Name , list [x509 .Certificate ]] = defaultdict (
325323 list
326324 )
327325 for cert in chain :
326+ if not self ._is_ca_certificate (cert ):
327+ logger .warning ("Ignoring non-CA certificate: %s" , cert )
328+ continue
329+ if not self ._is_within_validity_dates (cert ):
330+ logger .warning (
331+ "Ignoring certificate not within validity dates: %s" , cert
332+ )
333+ continue
328334 subject_certificates [cert .subject ].append (cert )
329335 currently_visited_subjects : set [x509 .Name ] = set ()
330336
@@ -387,19 +393,7 @@ def traverse_chain(cert: x509.Certificate) -> CRLValidationResult | None:
387393 # no ERROR result found, all paths are REVOKED
388394 return CRLValidationResult .REVOKED
389395
390- currently_visited_subjects .add (chain [0 ].subject )
391- error_result = False
392- revoked_result = False
393- for cert in subject_certificates [chain [0 ].subject ]:
394- result = traverse_chain (cert )
395- if result == CRLValidationResult .UNREVOKED :
396- return result
397- error_result |= result == CRLValidationResult .ERROR
398- revoked_result |= result == CRLValidationResult .REVOKED
399-
400- if error_result or not revoked_result :
401- return CRLValidationResult .ERROR
402- return CRLValidationResult .REVOKED
396+ return traverse_chain (start_cert )
403397
404398 def _is_certificate_trusted_by_os (self , cert : x509 .Certificate ) -> bool :
405399 if cert .subject not in self ._trusted_ca :
@@ -426,6 +420,50 @@ def _verify_certificate_signature(
426420 except Exception :
427421 return False
428422
423+ @staticmethod
424+ def _is_ca_certificate (ca_cert : x509 .Certificate ) -> bool :
425+ # Check if a certificate has basicConstraints extension with CA flag set to True.
426+ try :
427+ basic_constraints = ca_cert .extensions .get_extension_for_oid (
428+ ExtensionOID .BASIC_CONSTRAINTS
429+ ).value
430+ return basic_constraints .ca
431+ except x509 .ExtensionNotFound :
432+ # If the extension is not present, the certificate is not a CA
433+ return False
434+
435+ @staticmethod
436+ def _get_certificate_validity_dates (
437+ cert : x509 .Certificate ,
438+ ) -> tuple [datetime , datetime ]:
439+ # Extract UTC-aware validity dates from a certificate.
440+
441+ try :
442+ # Use timezone-aware versions to avoid deprecation warnings
443+ not_valid_before = cert .not_valid_before_utc
444+ not_valid_after = cert .not_valid_after_utc
445+ except AttributeError :
446+ # Fallback for older versions without _utc methods
447+ not_valid_before = cert .not_valid_before
448+ not_valid_after = cert .not_valid_after
449+
450+ # Convert to UTC if not timezone-aware
451+ if not_valid_before .tzinfo is None :
452+ not_valid_before = not_valid_before .replace (tzinfo = timezone .utc )
453+ if not_valid_after .tzinfo is None :
454+ not_valid_after = not_valid_after .replace (tzinfo = timezone .utc )
455+
456+ return not_valid_before , not_valid_after
457+
458+ @staticmethod
459+ def _is_within_validity_dates (cert : x509 .Certificate ) -> bool :
460+ # Check if a certificate is currently valid (not expired and not before validity period).
461+ not_valid_before , not_valid_after = (
462+ CRLValidator ._get_certificate_validity_dates (cert )
463+ )
464+ now = datetime .now (timezone .utc )
465+ return not_valid_before <= now <= not_valid_after
466+
429467 def _validate_certificate_is_not_revoked_with_cache (
430468 self , cert : x509 .Certificate , ca_cert : x509 .Certificate
431469 ) -> CRLValidationResult :
@@ -474,23 +512,13 @@ def _is_short_lived_certificate(cert: x509.Certificate) -> bool:
474512 - For certificates issued on or after 15 March 2026:
475513 validity period <= 7 days (604,800 seconds)
476514 """
477- try :
478- # Use timezone.utc versions to avoid deprecation warnings
479- issue_date = cert .not_valid_before_utc
480- validity_period = cert .not_valid_after_utc - cert .not_valid_before_utc
481- except AttributeError :
482- # Fallback for older versions
483- issue_date = cert .not_valid_before
484- validity_period = cert .not_valid_after - cert .not_valid_before
485-
486- # Convert issue_date to UTC if it's not timezone-aware
487- if issue_date .tzinfo is None :
488- issue_date = issue_date .replace (tzinfo = timezone .utc )
515+ issue_date , expiry_date = CRLValidator ._get_certificate_validity_dates (cert )
516+ validity_period = expiry_date - issue_date + timedelta (days = 1 )
489517
490518 march_15_2026 = datetime (2026 , 3 , 15 , tzinfo = timezone .utc )
491519 if issue_date >= march_15_2026 :
492- return validity_period .total_seconds () <= 604800 # 7 days in seconds
493- return validity_period .total_seconds () <= 864000 # 10 days in seconds
520+ return validity_period .days <= 7
521+ return validity_period .days <= 10
494522
495523 @staticmethod
496524 def _extract_crl_distribution_points (cert : x509 .Certificate ) -> list [str ]:
@@ -594,6 +622,11 @@ def _check_certificate_against_crl_url(
594622 # We cannot trust a CRL whose signature cannot be verified
595623 return CRLValidationResult .ERROR
596624
625+ # Verify that the CRL URL matches the IDP extension
626+ if not self ._verify_against_idp_extension (crl , crl_url ):
627+ logger .warning ("CRL URL does not match IDP extension for URL: %s" , crl_url )
628+ return CRLValidationResult .ERROR
629+
597630 # Check if certificate is revoked
598631 return self ._check_certificate_against_crl (cert , crl )
599632
@@ -645,6 +678,52 @@ def _verify_crl_signature(
645678 logger .warning ("CRL signature verification failed: %s" , e )
646679 return False
647680
681+ def _verify_against_idp_extension (
682+ self , crl : x509 .CertificateRevocationList , crl_url : str
683+ ) -> bool :
684+ # Verify that the CRL distribution point URL matches the IDP extension.
685+ logger .debug (
686+ "Trying to verify CRL URL against IDP extension for URL: %s" , crl_url
687+ )
688+
689+ try :
690+ idp_extension = crl .extensions .get_extension_for_oid (
691+ ExtensionOID .ISSUING_DISTRIBUTION_POINT
692+ )
693+ idp = idp_extension .value
694+
695+ # If the IDP has a distribution point, verify it matches the CRL URL
696+ if not idp .full_name :
697+ # according to baseline requirements this should not happen
698+ # https://github.com/cabforum/servercert/blob/main/docs/BR.md
699+ logger .debug (
700+ "IDP extension has no full_name - treating as invalid" ,
701+ crl_url ,
702+ )
703+ return False
704+
705+ for name in idp .full_name :
706+ if isinstance (name , x509 .UniformResourceIdentifier ):
707+ if name .value == crl_url :
708+ logger .debug ("CRL URL matches IDP extension: %s" , crl_url )
709+ return True
710+ # If we found distribution points but none matched
711+ logger .warning (
712+ "CRL URL %s does not match any IDP distribution point" , crl_url
713+ )
714+ return False
715+
716+ except x509 .ExtensionNotFound :
717+ # If the IDP extension is not present, consider it valid
718+ logger .debug (
719+ "No IDP extension found in CRL, treating as valid for URL: %s" , crl_url
720+ )
721+ return True
722+ except Exception as e :
723+ # If we can't parse the IDP extension, log and treat as error
724+ logger .warning ("Failed to verify IDP extension: %s" , e )
725+ return False
726+
648727 def _check_certificate_against_crl (
649728 self , cert : x509 .Certificate , crl : x509 .CertificateRevocationList
650729 ) -> CRLValidationResult :
@@ -660,44 +739,56 @@ def validate_connection(self, connection: SSLConnection) -> bool:
660739 """
661740 Validate an OpenSSL connection against CRLs.
662741
663- This method extracts certificate chains from the connection and validates them
664- against Certificate Revocation Lists (CRLs).
742+ This method extracts the peer certificate and certificate chain from the
743+ connection and validates them against Certificate Revocation Lists (CRLs).
665744
666745 Args:
667746 connection: OpenSSL connection object
668747
669748 Returns:
670749 True if validation passes, False otherwise
671750 """
672- certificate_chains = self ._extract_certificate_chains_from_connection (
673- connection
674- )
675- return self .validate_certificate_chains (certificate_chains )
751+ try :
752+ # Get the peer certificate (the start certificate)
753+ peer_cert = connection .get_peer_certificate (as_cryptography = True )
754+ if peer_cert is None :
755+ logger .warning ("No peer certificate found in connection" )
756+ return (
757+ self ._cert_revocation_check_mode == CertRevocationCheckMode .ADVISORY
758+ )
759+
760+ # Extract the certificate chain
761+ cert_chain = self ._extract_certificate_chain_from_connection (connection )
762+
763+ return self .validate_certificate_chain (peer_cert , cert_chain )
764+ except Exception as e :
765+ logger .warning ("Failed to validate connection: %s" , e )
766+ return self ._cert_revocation_check_mode == CertRevocationCheckMode .ADVISORY
676767
677- def _extract_certificate_chains_from_connection (
768+ def _extract_certificate_chain_from_connection (
678769 self , connection
679- ) -> list [list [ x509 .Certificate ]] :
680- """Extract certificate chains from OpenSSL connection for CRL validation.
770+ ) -> list [x509 .Certificate ] | None :
771+ """Extract certificate chain from OpenSSL connection for CRL validation.
681772
682773 Args:
683774 connection: OpenSSL connection object
684775
685776 Returns:
686- List of certificate chains, where each chain is a list of x509.Certificate objects
777+ Certificate chain as a list of x509.Certificate objects, or None on error
687778 """
688779 try :
689780 # Convert OpenSSL certificates to cryptography x509 certificates
690781 cert_chain = connection .get_peer_cert_chain (as_cryptography = True )
691782 if not cert_chain :
692783 logger .debug ("No certificate chain found in connection" )
693- return []
784+ return None
694785 logger .debug (
695786 "Extracted %d certificates for CRL validation" , len (cert_chain )
696787 )
697- return [ cert_chain ] # Return as a single chain
788+ return cert_chain
698789
699790 except Exception as e :
700791 logger .warning (
701792 "Failed to extract certificate chain for CRL validation: %s" , e
702793 )
703- return []
794+ return None
0 commit comments