11#!/usr/bin/env python 
22from  __future__ import  annotations 
33
4+ from  collections  import  defaultdict 
45from  dataclasses  import  dataclass 
56from  datetime  import  datetime , timedelta , timezone 
67from  enum  import  Enum , unique 
1112from  cryptography  import  x509 
1213from  cryptography .hazmat ._oid  import  ExtensionOID 
1314from  cryptography .hazmat .backends  import  default_backend 
15+ from  cryptography .hazmat .primitives  import  serialization 
1416from  cryptography .hazmat .primitives .asymmetric  import  ec , padding , rsa 
1517from  OpenSSL .SSL  import  Connection  as  SSLConnection 
1618
@@ -53,15 +55,15 @@ class CRLConfig:
5355        CertRevocationCheckMode .DISABLED 
5456    )
5557    allow_certificates_without_crl_url : bool  =  False 
56-     connection_timeout_ms : int  =  3000 
57-     read_timeout_ms : int  =  3000 
58+     connection_timeout_ms : int  =  5000 
59+     read_timeout_ms : int  =  5000    # 5s 
5860    cache_validity_time : timedelta  =  timedelta (hours = 24 )
5961    enable_crl_cache : bool  =  True 
6062    enable_crl_file_cache : bool  =  True 
6163    crl_cache_dir : Path  |  str  |  None  =  None 
6264    crl_cache_removal_delay_days : int  =  7 
6365    crl_cache_cleanup_interval_hours : int  =  1 
64-     crl_cache_start_cleanup : bool  =  False 
66+     crl_cache_start_cleanup : bool  =  True 
6567
6668    @classmethod  
6769    def  from_connection (cls , sf_connection ) ->  CRLConfig :
@@ -176,6 +178,7 @@ class CRLValidator:
176178    def  __init__ (
177179        self ,
178180        session_manager : SessionManager  |  Any ,
181+         trusted_certificates : list [x509 .Certificate ],
179182        cert_revocation_check_mode : CertRevocationCheckMode  =  CRLConfig .cert_revocation_check_mode ,
180183        allow_certificates_without_crl_url : bool  =  CRLConfig .allow_certificates_without_crl_url ,
181184        connection_timeout_ms : int  =  CRLConfig .connection_timeout_ms ,
@@ -191,9 +194,22 @@ def __init__(
191194        self ._cache_validity_time  =  cache_validity_time 
192195        self ._cache_manager  =  cache_manager  or  CRLCacheManager .noop ()
193196
197+         # list of trusted CA and their certificates 
198+         self ._trusted_ca : dict [x509 .Name , list [x509 .Certificate ]] =  defaultdict (list )
199+         for  cert  in  trusted_certificates :
200+             self ._trusted_ca [cert .subject ].append (cert )
201+ 
202+         # declaration of validate_certificate_is_not_revoked function cache 
203+         self ._cache_for__validate_certificate_is_not_revoked : dict [
204+             x509 .Certificate , CRLValidationResult 
205+         ] =  {}
206+ 
194207    @classmethod  
195208    def  from_config (
196-         cls , config : CRLConfig , session_manager : SessionManager 
209+         cls ,
210+         config : CRLConfig ,
211+         session_manager : SessionManager ,
212+         trusted_certificates : list [x509 .Certificate ],
197213    ) ->  CRLValidator :
198214        """ 
199215        Create a CRLValidator instance from a CRLConfig. 
@@ -204,6 +220,7 @@ def from_config(
204220        Args: 
205221            config: CRLConfig instance containing CRL-related parameters 
206222            session_manager: SessionManager instance 
223+             trusted_certificates: List of trusted CA certificates 
207224
208225        Returns: 
209226            CRLValidator: Configured CRLValidator instance 
@@ -244,6 +261,7 @@ def from_config(
244261
245262        return  cls (
246263            session_manager = session_manager ,
264+             trusted_certificates = trusted_certificates ,
247265            cert_revocation_check_mode = config .cert_revocation_check_mode ,
248266            allow_certificates_without_crl_url = config .allow_certificates_without_crl_url ,
249267            connection_timeout_ms = config .connection_timeout_ms ,
@@ -272,9 +290,7 @@ def validate_certificate_chains(
272290
273291        if  certificate_chains  is  None  or  len (certificate_chains ) ==  0 :
274292            logger .warning ("Certificate chains are empty" )
275-             if  self ._cert_revocation_check_mode  ==  CertRevocationCheckMode .ADVISORY :
276-                 return  True 
277-             return  False 
293+             return  self ._cert_revocation_check_mode  ==  CertRevocationCheckMode .ADVISORY 
278294
279295        results  =  []
280296        for  chain  in  certificate_chains :
@@ -294,24 +310,133 @@ def validate_certificate_chains(
294310    def  _validate_single_chain (
295311        self , chain : list [x509 .Certificate ]
296312    ) ->  CRLValidationResult :
297-         """Validate a single certificate chain""" 
313+         """ 
314+         Returns: 
315+           UNREVOKED: If there is a path to any trusted certificate where all certificates are unrevoked. 
316+           REVOKED: If all paths to trusted certificates are revoked. 
317+           ERROR: If there is a path to any trusted certificate on which none certificate is revoked, 
318+              but some certificates can't be verified. 
319+         """ 
298320        # An empty chain is considered an error 
299321        if  len (chain ) ==  0 :
300322            return  CRLValidationResult .ERROR 
301-         # the last certificate of the chain is considered the root and isn't validated 
302-         results  =  []
303-         for  i  in  range (len (chain ) -  1 ):
304-             result  =  self ._validate_certificate (chain [i ], chain [i  +  1 ])
305-             if  result  ==  CRLValidationResult .REVOKED :
306-                 return  CRLValidationResult .REVOKED 
307-             results .append (result )
308323
309-         if  CRLValidationResult .ERROR  in  results :
324+         subject_certificates : dict [x509 .Name , list [x509 .Certificate ]] =  defaultdict (
325+             list 
326+         )
327+         for  cert  in  chain :
328+             subject_certificates [cert .subject ].append (cert )
329+         currently_visited_subjects : set [x509 .Name ] =  set ()
330+ 
331+         def  traverse_chain (cert : x509 .Certificate ) ->  CRLValidationResult  |  None :
332+             # UNREVOKED - unrevoked path to a trusted certificate found 
333+             # REVOKED - all paths are revoked 
334+             # ERROR - some certificates on potentially unrevoked paths can't be verified, or no path to a trusted CA is detected 
335+             # None - ignore this path (cycle detected) 
336+             if  self ._is_certificate_trusted_by_os (cert ):
337+                 logger .debug ("Found trusted certificate: %s" , cert .subject )
338+                 return  CRLValidationResult .UNREVOKED 
339+ 
340+             if  trusted_ca_issuer  :=  self ._get_trusted_ca_issuer (cert ):
341+                 logger .debug ("Certificate signed by trusted CA: %s" , cert .subject )
342+                 return  self ._validate_certificate_is_not_revoked_with_cache (
343+                     cert , trusted_ca_issuer 
344+                 )
345+ 
346+             if  cert .issuer  in  currently_visited_subjects :
347+                 # cycle detected - invalid path 
348+                 return  None 
349+ 
350+             valid_results : list [tuple [CRLValidationResult , x509 .Certificate ]] =  []
351+             for  ca_cert  in  subject_certificates [cert .issuer ]:
352+                 if  not  self ._verify_certificate_signature (cert , ca_cert ):
353+                     logger .debug (
354+                         "Certificate signature verification failed for %s, looking for other paths" ,
355+                         cert ,
356+                     )
357+                     continue 
358+ 
359+                 currently_visited_subjects .add (cert .issuer )
360+                 ca_result  =  traverse_chain (ca_cert )
361+                 currently_visited_subjects .remove (cert .issuer )
362+                 if  ca_result  is  None :
363+                     # ignore invalid path result 
364+                     continue 
365+                 if  ca_result  ==  CRLValidationResult .UNREVOKED :
366+                     # good path found 
367+                     return  self ._validate_certificate_is_not_revoked_with_cache (
368+                         cert , ca_cert 
369+                     )
370+                 valid_results .append ((ca_result , ca_cert ))
371+ 
372+             if  len (valid_results ) ==  0 :
373+                 # "root" certificate not cought by "is_trusted_by_os" check 
374+                 logger .debug ("No path towards trusted anchor: %s" , cert .subject )
375+                 return  CRLValidationResult .ERROR 
376+ 
377+             # check if there exists an ERROR path 
378+             for  ca_result , ca_cert  in  valid_results :
379+                 if  ca_result  ==  CRLValidationResult .ERROR :
380+                     cert_result  =  self ._validate_certificate_is_not_revoked_with_cache (
381+                         cert , ca_cert 
382+                     )
383+                     if  cert_result  ==  CRLValidationResult .REVOKED :
384+                         return  CRLValidationResult .REVOKED 
385+                     return  CRLValidationResult .ERROR 
386+ 
387+             # no ERROR result found, all paths are REVOKED 
388+             return  CRLValidationResult .REVOKED 
389+ 
390+         currently_visited_subjects .add (chain [0 ].subject )
391+         error_result  =  False 
392+         revoked_result  =  False 
393+         for  cert  in  subject_certificates [chain [0 ].subject ]:
394+             result  =  traverse_chain (cert )
395+             if  result  ==  CRLValidationResult .UNREVOKED :
396+                 return  result 
397+             error_result  |=  result  ==  CRLValidationResult .ERROR 
398+             revoked_result  |=  result  ==  CRLValidationResult .REVOKED 
399+ 
400+         if  error_result  or  not  revoked_result :
310401            return  CRLValidationResult .ERROR 
402+         return  CRLValidationResult .REVOKED 
311403
312-         return  CRLValidationResult .UNREVOKED 
404+     def  _is_certificate_trusted_by_os (self , cert : x509 .Certificate ) ->  bool :
405+         if  cert .subject  not  in self ._trusted_ca :
406+             return  False 
407+ 
408+         cert_der  =  cert .public_bytes (serialization .Encoding .DER )
409+         return  any (
410+             cert_der  ==  trusted_cert .public_bytes (serialization .Encoding .DER )
411+             for  trusted_cert  in  self ._trusted_ca [cert .subject ]
412+         )
313413
314-     def  _validate_certificate (
414+     def  _get_trusted_ca_issuer (self , cert : x509 .Certificate ) ->  x509 .Certificate  |  None :
415+         for  trusted_cert  in  self ._trusted_ca [cert .issuer ]:
416+             if  self ._verify_certificate_signature (cert , trusted_cert ):
417+                 return  trusted_cert 
418+         return  None 
419+ 
420+     def  _verify_certificate_signature (
421+         self , cert : x509 .Certificate , ca_cert : x509 .Certificate 
422+     ) ->  bool :
423+         try :
424+             cert .verify_directly_issued_by (ca_cert )
425+             return  True 
426+         except  Exception :
427+             return  False 
428+ 
429+     def  _validate_certificate_is_not_revoked_with_cache (
430+         self , cert : x509 .Certificate , ca_cert : x509 .Certificate 
431+     ) ->  CRLValidationResult :
432+         # validate certificate can be called multiple times with the same certificate 
433+         if  cert  not  in self ._cache_for__validate_certificate_is_not_revoked :
434+             self ._cache_for__validate_certificate_is_not_revoked [cert ] =  (
435+                 self ._validate_certificate_is_not_revoked (cert , ca_cert )
436+             )
437+         return  self ._cache_for__validate_certificate_is_not_revoked [cert ]
438+ 
439+     def  _validate_certificate_is_not_revoked (
315440        self , cert : x509 .Certificate , ca_cert : x509 .Certificate 
316441    ) ->  CRLValidationResult :
317442        """Validate a single certificate against CRL""" 
@@ -343,14 +468,29 @@ def _validate_certificate(
343468
344469    @staticmethod  
345470    def  _is_short_lived_certificate (cert : x509 .Certificate ) ->  bool :
346-         """Check if certificate is short-lived (validity <= 5 days)""" 
471+         """Check if certificate is short-lived according to CA/Browser Forum definition: 
472+         - For certificates issued on or after 15 March 2024 and prior to 15 March 2026: 
473+           validity period <= 10 days (864,000 seconds) 
474+         - For certificates issued on or after 15 March 2026: 
475+           validity period <= 7 days (604,800 seconds) 
476+         """ 
347477        try :
348478            # Use timezone.utc versions to avoid deprecation warnings 
479+             issue_date  =  cert .not_valid_before_utc 
349480            validity_period  =  cert .not_valid_after_utc  -  cert .not_valid_before_utc 
350481        except  AttributeError :
351482            # Fallback for older versions 
483+             issue_date  =  cert .not_valid_before 
352484            validity_period  =  cert .not_valid_after  -  cert .not_valid_before 
353-         return  validity_period .days  <=  5 
485+ 
486+         # Convert issue_date to UTC if it's not timezone-aware 
487+         if  issue_date .tzinfo  is  None :
488+             issue_date  =  issue_date .replace (tzinfo = timezone .utc )
489+ 
490+         march_15_2026  =  datetime (2026 , 3 , 15 , tzinfo = timezone .utc )
491+         if  issue_date  >=  march_15_2026 :
492+             return  validity_period .total_seconds () <=  604800   # 7 days in seconds 
493+         return  validity_period .total_seconds () <=  864000   # 10 days in seconds 
354494
355495    @staticmethod  
356496    def  _extract_crl_distribution_points (cert : x509 .Certificate ) ->  list [str ]:
@@ -446,7 +586,7 @@ def _check_certificate_against_crl_url(
446586                ca_cert .subject ,
447587                crl_url ,
448588            )
449-             # In most cases this indicates a configuration issue, but we'll still try verification 
589+             return   CRLValidationResult . ERROR 
450590
451591        if  not  self ._verify_crl_signature (crl , ca_cert ):
452592            logger .warning ("CRL signature verification failed for URL: %s" , crl_url )
@@ -545,25 +685,16 @@ def _extract_certificate_chains_from_connection(
545685        Returns: 
546686            List of certificate chains, where each chain is a list of x509.Certificate objects 
547687        """ 
548-         from  OpenSSL .crypto  import  FILETYPE_ASN1 , dump_certificate 
549- 
550688        try :
551-             cert_chain  =  connection .get_peer_cert_chain ()
689+             # Convert OpenSSL certificates to cryptography x509 certificates 
690+             cert_chain  =  connection .get_peer_cert_chain (as_cryptography = True )
552691            if  not  cert_chain :
553692                logger .debug ("No certificate chain found in connection" )
554693                return  []
555- 
556-             # Convert OpenSSL certificates to cryptography x509 certificates 
557-             x509_chain  =  []
558-             for  cert_openssl  in  cert_chain :
559-                 cert_der  =  dump_certificate (FILETYPE_ASN1 , cert_openssl )
560-                 cert_x509  =  x509 .load_der_x509_certificate (cert_der , default_backend ())
561-                 x509_chain .append (cert_x509 )
562- 
563694            logger .debug (
564-                 "Extracted %d certificates for CRL validation" , len (x509_chain )
695+                 "Extracted %d certificates for CRL validation" , len (cert_chain )
565696            )
566-             return  [x509_chain ]  # Return as a single chain 
697+             return  [cert_chain ]  # Return as a single chain 
567698
568699        except  Exception  as  e :
569700            logger .warning (
0 commit comments