|
24 | 24 | CRLValidationResult, |
25 | 25 | CRLValidator, |
26 | 26 | ) |
27 | | -from snowflake.connector.crl_cache import CRLCacheEntry, CRLCacheManager |
| 27 | +from snowflake.connector.crl_cache import ( |
| 28 | + CRLCacheEntry, |
| 29 | + CRLCacheManager, |
| 30 | + CRLInMemoryCache, |
| 31 | + NoopCRLCache, |
| 32 | +) |
28 | 33 | from snowflake.connector.session_manager import SessionManager |
29 | 34 |
|
30 | 35 |
|
@@ -166,6 +171,39 @@ def generate_valid_crl(self) -> bytes: |
166 | 171 | ) |
167 | 172 | return crl.public_bytes(serialization.Encoding.DER) |
168 | 173 |
|
| 174 | + def create_crl_with_timestamp( |
| 175 | + self, last_update: datetime |
| 176 | + ) -> x509.CertificateRevocationList: |
| 177 | + """ |
| 178 | + Create a CRL with a specific last_update timestamp. |
| 179 | +
|
| 180 | + Args: |
| 181 | + last_update: The last_update timestamp for the CRL |
| 182 | +
|
| 183 | + Returns: |
| 184 | + A CRL object with the specified timestamp |
| 185 | + """ |
| 186 | + builder = x509.CertificateRevocationListBuilder() |
| 187 | + builder = builder.issuer_name(self.ca_certificate.subject) |
| 188 | + builder = builder.last_update(last_update) |
| 189 | + builder = builder.next_update( |
| 190 | + datetime.now(timezone.utc) + timedelta(days=1) |
| 191 | + ) |
| 192 | + |
| 193 | + # Add any revoked certificates |
| 194 | + for serial_number in self.revoked_serial_numbers: |
| 195 | + revoked_cert = ( |
| 196 | + x509.RevokedCertificateBuilder() |
| 197 | + .serial_number(serial_number) |
| 198 | + .revocation_date(datetime.now(timezone.utc)) |
| 199 | + .build() |
| 200 | + ) |
| 201 | + builder = builder.add_revoked_certificate(revoked_cert) |
| 202 | + |
| 203 | + return builder.sign( |
| 204 | + self.ca_private_key, hashes.SHA256(), backend=default_backend() |
| 205 | + ) |
| 206 | + |
169 | 207 | def generate_expired_crl(self) -> bytes: |
170 | 208 | """Generate an expired CRL""" |
171 | 209 | builder = x509.CertificateRevocationListBuilder() |
@@ -1585,6 +1623,7 @@ def test_crl_validator_check_certificate_against_crl_expired( |
1585 | 1623 | # Mock expired CRL |
1586 | 1624 | mock_crl = Mock(spec=x509.CertificateRevocationList) |
1587 | 1625 | mock_crl.next_update_utc = datetime.now(timezone.utc) - timedelta(days=1) # Expired |
| 1626 | + mock_crl.last_update_utc = datetime.now(timezone.utc) - timedelta(days=2) |
1588 | 1627 | mock_crl.get_revoked_certificate_by_serial_number.return_value = None |
1589 | 1628 | mock_crl.issuer = parent.subject |
1590 | 1629 | # Mock extensions to raise ExtensionNotFound for IDP extension |
@@ -2476,3 +2515,122 @@ def test_check_certificate_against_crl_url_with_idp_mismatch( |
2476 | 2515 | ) |
2477 | 2516 |
|
2478 | 2517 | assert result == CRLValidationResult.ERROR |
| 2518 | + |
| 2519 | + |
| 2520 | +@pytest.mark.parametrize("downloaded_crl_is_newer", [True, False]) |
| 2521 | +def test_crl_validator_freshness_validation( |
| 2522 | + cert_gen, session_manager, downloaded_crl_is_newer |
| 2523 | +): |
| 2524 | + """Test that validator uses the most recent CRL based on last_update timestamp""" |
| 2525 | + chain = cert_gen.create_simple_chain() |
| 2526 | + |
| 2527 | + # Create CRLs with different timestamps |
| 2528 | + older_last_update = datetime.now(timezone.utc) - timedelta(days=2) |
| 2529 | + newer_last_update = datetime.now(timezone.utc) |
| 2530 | + older_crl = cert_gen.create_crl_with_timestamp(older_last_update) |
| 2531 | + newer_crl = cert_gen.create_crl_with_timestamp(newer_last_update) |
| 2532 | + |
| 2533 | + # Determine which CRL to cache and which to download |
| 2534 | + cached_crl = older_crl if downloaded_crl_is_newer else newer_crl |
| 2535 | + downloaded_crl = newer_crl if downloaded_crl_is_newer else older_crl |
| 2536 | + |
| 2537 | + # Create cache manager and pre-populate with the cached CRL |
| 2538 | + memory_cache = CRLInMemoryCache(cache_validity_time=timedelta(hours=24)) |
| 2539 | + memory_cache.put( |
| 2540 | + "http://test.com/crl", |
| 2541 | + CRLCacheEntry( |
| 2542 | + crl=cached_crl, download_time=older_last_update |
| 2543 | + ), # use old date to enforce "download" logic |
| 2544 | + ) |
| 2545 | + cache_manager = CRLCacheManager(memory_cache, NoopCRLCache()) |
| 2546 | + |
| 2547 | + validator = CRLValidator( |
| 2548 | + session_manager=session_manager, |
| 2549 | + trusted_certificates=[cert_gen.ca_certificate], |
| 2550 | + cache_manager=cache_manager, |
| 2551 | + ) |
| 2552 | + |
| 2553 | + # Mock _download_crl to return the downloaded CRL |
| 2554 | + download_timestamp = datetime.now(timezone.utc) |
| 2555 | + with mock_patch.object( |
| 2556 | + validator, "_download_crl", return_value=(downloaded_crl, download_timestamp) |
| 2557 | + ) as mock_download, mock_patch.object( |
| 2558 | + validator, "_verify_crl_signature", return_value=True |
| 2559 | + ): |
| 2560 | + validator._check_certificate_against_crl_url( |
| 2561 | + chain.leaf_cert, cert_gen.ca_certificate, "http://test.com/crl" |
| 2562 | + ) |
| 2563 | + # self-check for debug - ensure download was called |
| 2564 | + mock_download.assert_called_once_with("http://test.com/crl") |
| 2565 | + |
| 2566 | + # Verify the cached CRL is the newer one |
| 2567 | + cached_entry = memory_cache.get("http://test.com/crl") |
| 2568 | + assert cached_entry is not None |
| 2569 | + final_cached_last_update = validator._get_crl_last_update(cached_entry.crl) |
| 2570 | + # Compare with tolerance for microseconds (CRL might lose precision) |
| 2571 | + assert abs((final_cached_last_update - newer_last_update).total_seconds()) < 1 |
| 2572 | + |
| 2573 | + |
| 2574 | +def test_get_crl_last_update(cert_gen): |
| 2575 | + """Test helper method to extract last_update from CRL""" |
| 2576 | + # Create a CRL with a known last_update |
| 2577 | + builder = x509.CertificateRevocationListBuilder() |
| 2578 | + builder = builder.issuer_name(cert_gen.ca_certificate.subject) |
| 2579 | + expected_last_update = datetime.now(timezone.utc) |
| 2580 | + builder = builder.last_update(expected_last_update) |
| 2581 | + builder = builder.next_update(datetime.now(timezone.utc) + timedelta(days=1)) |
| 2582 | + crl = builder.sign( |
| 2583 | + cert_gen.ca_private_key, hashes.SHA256(), backend=default_backend() |
| 2584 | + ) |
| 2585 | + |
| 2586 | + validator = CRLValidator( |
| 2587 | + session_manager=Mock(), |
| 2588 | + trusted_certificates=[cert_gen.ca_certificate], |
| 2589 | + ) |
| 2590 | + |
| 2591 | + # Extract last_update |
| 2592 | + last_update = validator._get_crl_last_update(crl) |
| 2593 | + assert last_update is not None |
| 2594 | + # Compare with some tolerance for microseconds |
| 2595 | + assert abs((last_update - expected_last_update).total_seconds()) < 1 |
| 2596 | + |
| 2597 | + |
| 2598 | +def test_is_crl_more_recent(cert_gen): |
| 2599 | + """Test comparison of CRL freshness by last_update timestamp""" |
| 2600 | + # Create two CRLs with different last_update times |
| 2601 | + older_last_update = datetime.now(timezone.utc) - timedelta(hours=2) |
| 2602 | + newer_last_update = datetime.now(timezone.utc) |
| 2603 | + |
| 2604 | + # Build older CRL |
| 2605 | + older_builder = x509.CertificateRevocationListBuilder() |
| 2606 | + older_builder = older_builder.issuer_name(cert_gen.ca_certificate.subject) |
| 2607 | + older_builder = older_builder.last_update(older_last_update) |
| 2608 | + older_builder = older_builder.next_update( |
| 2609 | + datetime.now(timezone.utc) + timedelta(days=1) |
| 2610 | + ) |
| 2611 | + older_crl = older_builder.sign( |
| 2612 | + cert_gen.ca_private_key, hashes.SHA256(), backend=default_backend() |
| 2613 | + ) |
| 2614 | + |
| 2615 | + # Build newer CRL |
| 2616 | + newer_builder = x509.CertificateRevocationListBuilder() |
| 2617 | + newer_builder = newer_builder.issuer_name(cert_gen.ca_certificate.subject) |
| 2618 | + newer_builder = newer_builder.last_update(newer_last_update) |
| 2619 | + newer_builder = newer_builder.next_update( |
| 2620 | + datetime.now(timezone.utc) + timedelta(days=1) |
| 2621 | + ) |
| 2622 | + newer_crl = newer_builder.sign( |
| 2623 | + cert_gen.ca_private_key, hashes.SHA256(), backend=default_backend() |
| 2624 | + ) |
| 2625 | + |
| 2626 | + validator = CRLValidator( |
| 2627 | + session_manager=Mock(), |
| 2628 | + trusted_certificates=[cert_gen.ca_certificate], |
| 2629 | + ) |
| 2630 | + |
| 2631 | + # Test that newer is more recent than older |
| 2632 | + assert validator._is_crl_more_recent(newer_crl, older_crl) is True |
| 2633 | + # Test that older is not more recent than newer |
| 2634 | + assert validator._is_crl_more_recent(older_crl, newer_crl) is False |
| 2635 | + # Test that a CRL is not more recent than itself |
| 2636 | + assert validator._is_crl_more_recent(newer_crl, newer_crl) is False |
0 commit comments