|
18 | 18 | from cryptography.hazmat.primitives.asymmetric import ed25519, padding, rsa
|
19 | 19 | from cryptography.hazmat.primitives.ciphers import algorithms
|
20 | 20 | from cryptography.hazmat.primitives.serialization import pkcs7
|
| 21 | +from cryptography.x509.oid import ( |
| 22 | + ExtendedKeyUsageOID, |
| 23 | + ExtensionOID, |
| 24 | + ObjectIdentifier, |
| 25 | +) |
| 26 | +from cryptography.x509.verification import ( |
| 27 | + PolicyBuilder, |
| 28 | + Store, |
| 29 | + VerificationError, |
| 30 | +) |
21 | 31 | from tests.x509.test_x509 import _generate_ca_and_leaf
|
22 | 32 |
|
23 | 33 | from ...hazmat.primitives.fixtures_rsa import (
|
@@ -125,20 +135,153 @@ def test_load_pkcs7_empty_certificates(self):
|
125 | 135 |
|
126 | 136 | def _load_cert_key():
|
127 | 137 | key = load_vectors_from_file(
|
128 |
| - os.path.join("x509", "custom", "ca", "ca_key.pem"), |
| 138 | + os.path.join("pkcs7", "ca_key.pem"), |
129 | 139 | lambda pemfile: serialization.load_pem_private_key(
|
130 | 140 | pemfile.read(), None, unsafe_skip_rsa_key_validation=True
|
131 | 141 | ),
|
132 | 142 | mode="rb",
|
133 | 143 | )
|
134 | 144 | cert = load_vectors_from_file(
|
135 |
| - os.path.join("x509", "custom", "ca", "ca.pem"), |
| 145 | + os.path.join("pkcs7", "ca.pem"), |
136 | 146 | loader=lambda pemfile: x509.load_pem_x509_certificate(pemfile.read()),
|
137 | 147 | mode="rb",
|
138 | 148 | )
|
139 | 149 | return cert, key
|
140 | 150 |
|
141 | 151 |
|
| 152 | +class TestPKCS7VerifyCertificate: |
| 153 | + @staticmethod |
| 154 | + def build_pkcs7_certificate( |
| 155 | + ca: bool = False, |
| 156 | + digital_signature: bool = True, |
| 157 | + usages: typing.Optional[typing.List[ObjectIdentifier]] = None, |
| 158 | + ) -> x509.Certificate: |
| 159 | + """ |
| 160 | + This static method is a helper to build certificates allowing us |
| 161 | + to test all cases in PKCS#7 certificate verification. |
| 162 | + """ |
| 163 | + # Load the standard certificate and private key |
| 164 | + certificate, private_key = _load_cert_key() |
| 165 | + |
| 166 | + # Basic certificate builder |
| 167 | + certificate_builder = ( |
| 168 | + x509.CertificateBuilder() |
| 169 | + .serial_number(certificate.serial_number) |
| 170 | + .subject_name(certificate.subject) |
| 171 | + .issuer_name(certificate.issuer) |
| 172 | + .public_key(private_key.public_key()) |
| 173 | + .not_valid_before(certificate.not_valid_before) |
| 174 | + .not_valid_after(certificate.not_valid_after) |
| 175 | + ) |
| 176 | + |
| 177 | + # Add AuthorityKeyIdentifier extension |
| 178 | + aki = certificate.extensions.get_extension_for_oid( |
| 179 | + ExtensionOID.AUTHORITY_KEY_IDENTIFIER |
| 180 | + ) |
| 181 | + certificate_builder = certificate_builder.add_extension( |
| 182 | + aki.value, critical=False |
| 183 | + ) |
| 184 | + |
| 185 | + # Add SubjectAlternativeName extension |
| 186 | + san = certificate.extensions.get_extension_for_oid( |
| 187 | + ExtensionOID.SUBJECT_ALTERNATIVE_NAME |
| 188 | + ) |
| 189 | + certificate_builder = certificate_builder.add_extension( |
| 190 | + san.value, critical=True |
| 191 | + ) |
| 192 | + |
| 193 | + # Add BasicConstraints extension |
| 194 | + bc_extension = x509.BasicConstraints(ca=ca, path_length=None) |
| 195 | + certificate_builder = certificate_builder.add_extension( |
| 196 | + bc_extension, False |
| 197 | + ) |
| 198 | + |
| 199 | + # Add KeyUsage extension |
| 200 | + ku_extension = x509.KeyUsage( |
| 201 | + digital_signature=digital_signature, |
| 202 | + content_commitment=False, |
| 203 | + key_encipherment=True, |
| 204 | + data_encipherment=True, |
| 205 | + key_agreement=True, |
| 206 | + key_cert_sign=True, |
| 207 | + crl_sign=True, |
| 208 | + encipher_only=False, |
| 209 | + decipher_only=False, |
| 210 | + ) |
| 211 | + certificate_builder = certificate_builder.add_extension( |
| 212 | + ku_extension, True |
| 213 | + ) |
| 214 | + |
| 215 | + # Add valid ExtendedKeyUsage extension |
| 216 | + usages = usages or [ExtendedKeyUsageOID.EMAIL_PROTECTION] |
| 217 | + certificate_builder = certificate_builder.add_extension( |
| 218 | + x509.ExtendedKeyUsage(usages), True |
| 219 | + ) |
| 220 | + |
| 221 | + # Build the certificate |
| 222 | + return certificate_builder.sign( |
| 223 | + private_key, certificate.signature_hash_algorithm, None |
| 224 | + ) |
| 225 | + |
| 226 | + def test_verify_pkcs7_certificate(self): |
| 227 | + # Prepare the parameters |
| 228 | + certificate = self.build_pkcs7_certificate() |
| 229 | + ca_policy, ee_policy = pkcs7.pkcs7_x509_extension_policies() |
| 230 | + |
| 231 | + # Verify the certificate |
| 232 | + verifier = ( |
| 233 | + PolicyBuilder() |
| 234 | + .store(Store([certificate])) |
| 235 | + .extension_policies(ca_policy=ca_policy, ee_policy=ee_policy) |
| 236 | + .build_client_verifier() |
| 237 | + ) |
| 238 | + verifier.verify(certificate, []) |
| 239 | + |
| 240 | + @pytest.mark.parametrize( |
| 241 | + "arguments", |
| 242 | + [ |
| 243 | + {"ca": True}, |
| 244 | + {"digital_signature": False}, |
| 245 | + {"usages": [ExtendedKeyUsageOID.CLIENT_AUTH]}, |
| 246 | + ], |
| 247 | + ) |
| 248 | + def test_verify_invalid_pkcs7_certificate(self, arguments: dict): |
| 249 | + # Prepare the parameters |
| 250 | + certificate = self.build_pkcs7_certificate(**arguments) |
| 251 | + |
| 252 | + # Verify the certificate |
| 253 | + self.verify_invalid_pkcs7_certificate(certificate) |
| 254 | + |
| 255 | + @staticmethod |
| 256 | + def verify_invalid_pkcs7_certificate(certificate: x509.Certificate): |
| 257 | + ca_policy, ee_policy = pkcs7.pkcs7_x509_extension_policies() |
| 258 | + verifier = ( |
| 259 | + PolicyBuilder() |
| 260 | + .store(Store([certificate])) |
| 261 | + .extension_policies(ca_policy=ca_policy, ee_policy=ee_policy) |
| 262 | + .build_client_verifier() |
| 263 | + ) |
| 264 | + |
| 265 | + with pytest.raises(VerificationError): |
| 266 | + verifier.verify(certificate, []) |
| 267 | + |
| 268 | + @pytest.mark.parametrize( |
| 269 | + "filename", ["ca_non_ascii_san.pem", "ca_ascii_san.pem"] |
| 270 | + ) |
| 271 | + def test_verify_pkcs7_certificate_wrong_san(self, filename): |
| 272 | + # Read a certificate with an invalid SAN |
| 273 | + pkcs7_certificate = load_vectors_from_file( |
| 274 | + os.path.join("pkcs7", filename), |
| 275 | + loader=lambda pemfile: x509.load_pem_x509_certificate( |
| 276 | + pemfile.read() |
| 277 | + ), |
| 278 | + mode="rb", |
| 279 | + ) |
| 280 | + |
| 281 | + # Verify the certificate |
| 282 | + self.verify_invalid_pkcs7_certificate(pkcs7_certificate) |
| 283 | + |
| 284 | + |
142 | 285 | @pytest.mark.supported(
|
143 | 286 | only_if=lambda backend: backend.pkcs7_supported(),
|
144 | 287 | skip_message="Requires OpenSSL with PKCS7 support",
|
|
0 commit comments