Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions src/cryptography/hazmat/primitives/serialization/pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
algorithms,
)
from cryptography.utils import _check_byteslike
from cryptography.x509 import Certificate
from cryptography.x509.oid import ExtendedKeyUsageOID
from cryptography.x509.verification import (
Criticality,
ExtensionPolicy,
Policy,
)

load_pem_pkcs7_certificates = rust_pkcs7.load_pem_pkcs7_certificates

Expand Down Expand Up @@ -53,6 +60,120 @@ class PKCS7Options(utils.Enum):
NoCerts = "Don't embed signer certificate"


def pkcs7_x509_extension_policies() -> tuple[ExtensionPolicy, ExtensionPolicy]:
"""
Gets the default X.509 extension policy for S/MIME, based on RFC 8550.
Visit https://www.rfc-editor.org/rfc/rfc8550#section-4.4 for more info.
"""
# CA policy
ca_policy = ExtensionPolicy.webpki_defaults_ca()

# EE policy
def _validate_basic_constraints(
policy: Policy, cert: Certificate, bc: x509.BasicConstraints | None
) -> None:
"""
We check that Certificates used as EE (i.e., the cert used to sign
a PKCS#7/SMIME message) must not have ca=true in their basic
constraints extension. RFC 5280 doesn't impose this requirement, but we
firmly agree about it being best practice.
"""
if bc is not None and bc.ca:
raise ValueError("Basic Constraints CA must be False.")

def _validate_key_usage(
policy: Policy, cert: Certificate, ku: x509.KeyUsage | None
) -> None:
"""
Checks that the Key Usage extension, if present, has at least one of
the digital signature or content commitment (formerly non-repudiation)
bits set.
"""
if (
ku is not None
and not ku.digital_signature
and not ku.content_commitment
):
raise ValueError(
"Key Usage, if specified, must have at least one of the "
"digital signature or content commitment (formerly non "
"repudiation) bits set."
)

def _validate_subject_alternative_name(
policy: Policy,
cert: Certificate,
san: x509.SubjectAlternativeName,
) -> None:
"""
For each general name in the SAN, for those which are email addresses:
- If it is an RFC822Name, general part must be ascii.
- If it is an OtherName, general part must be non-ascii.
"""
for general_name in san:
if (
isinstance(general_name, x509.RFC822Name)
and "@" in general_name.value
and not general_name.value.split("@")[0].isascii()
):
raise ValueError(
f"RFC822Name {general_name.value} contains non-ASCII "
"characters."
)
if (
isinstance(general_name, x509.OtherName)
and "@" in general_name.value.decode()
and general_name.value.decode().split("@")[0].isascii()
):
raise ValueError(
f"OtherName {general_name.value.decode()} is ASCII, "
"so must be stored in RFC822Name."
)

def _validate_extended_key_usage(
policy: Policy, cert: Certificate, eku: x509.ExtendedKeyUsage | None
) -> None:
"""
Checks that the Extended Key Usage extension, if present,
includes either emailProtection or anyExtendedKeyUsage bits.
"""
if (
eku is not None
and ExtendedKeyUsageOID.EMAIL_PROTECTION not in eku
and ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE not in eku
):
raise ValueError(
"Extended Key Usage, if specified, must include "
"emailProtection or anyExtendedKeyUsage."
)

ee_policy = (
ExtensionPolicy.webpki_defaults_ee()
.may_be_present(
x509.BasicConstraints,
Criticality.AGNOSTIC,
_validate_basic_constraints,
)
.may_be_present(
x509.KeyUsage,
Criticality.CRITICAL,
_validate_key_usage,
)
.require_present(
x509.SubjectAlternativeName,
Criticality.AGNOSTIC,
_validate_subject_alternative_name,
)
.may_be_present(
x509.ExtendedKeyUsage,
Criticality.AGNOSTIC,
_validate_extended_key_usage,
)
)

return ca_policy, ee_policy


class PKCS7SignatureBuilder:
def __init__(
self,
Expand Down
148 changes: 148 additions & 0 deletions tests/hazmat/primitives/test_pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
from cryptography.hazmat.primitives.asymmetric import ed25519, padding, rsa
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.serialization import pkcs7
from cryptography.x509.oid import (
ExtendedKeyUsageOID,
ObjectIdentifier,
)
from cryptography.x509.verification import (
PolicyBuilder,
Store,
VerificationError,
)
from tests.x509.test_x509 import _generate_ca_and_leaf

from ...hazmat.primitives.fixtures_rsa import (
Expand Down Expand Up @@ -139,6 +148,145 @@ def _load_cert_key():
return cert, key


class TestPKCS7VerifyCertificate:
@staticmethod
def build_pkcs7_certificate(
ca: bool = False,
digital_signature: bool = True,
usages: typing.Optional[typing.List[ObjectIdentifier]] = None,
) -> x509.Certificate:
"""
This static method is a helper to build certificates allowing us
to test all cases in PKCS#7 certificate verification.
"""
# Load the standard certificate and private key
certificate, private_key = _load_cert_key()

# Basic certificate builder
certificate_builder = (
x509.CertificateBuilder()
.serial_number(certificate.serial_number)
.subject_name(certificate.subject)
.issuer_name(certificate.issuer)
.public_key(private_key.public_key())
.not_valid_before(certificate.not_valid_before)
.not_valid_after(certificate.not_valid_after)
)

# Add AuthorityKeyIdentifier extension
aki = x509.AuthorityKeyIdentifier(
b"\xfc\xeb\xb4\xd8\x12\xf2\xc9=\x99\xc3<g\xf4}7}\xe6\x13\xed\xfa",
None,
None,
)
certificate_builder = certificate_builder.add_extension(
aki,
critical=False,
)

# Add SubjectAlternativeName extension
san = x509.SubjectAlternativeName(
[
x509.RFC822Name("[email protected]"),
]
)
certificate_builder = certificate_builder.add_extension(
san,
critical=True,
)

# Add BasicConstraints extension
bc_extension = x509.BasicConstraints(ca=ca, path_length=None)
certificate_builder = certificate_builder.add_extension(
bc_extension, False
)

# Add KeyUsage extension
ku_extension = x509.KeyUsage(
digital_signature=digital_signature,
content_commitment=False,
key_encipherment=True,
data_encipherment=True,
key_agreement=True,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
)
certificate_builder = certificate_builder.add_extension(
ku_extension, True
)

# Add valid ExtendedKeyUsage extension
usages = usages or [ExtendedKeyUsageOID.EMAIL_PROTECTION]
certificate_builder = certificate_builder.add_extension(
x509.ExtendedKeyUsage(usages), True
)

# Build the certificate
return certificate_builder.sign(
private_key, certificate.signature_hash_algorithm, None
)

def test_verify_pkcs7_certificate(self):
# Prepare the parameters
certificate = self.build_pkcs7_certificate()
ca_policy, ee_policy = pkcs7.pkcs7_x509_extension_policies()

# Verify the certificate
verifier = (
PolicyBuilder()
.store(Store([certificate]))
.extension_policies(ca_policy=ca_policy, ee_policy=ee_policy)
.build_client_verifier()
)
verifier.verify(certificate, [])

@pytest.mark.parametrize(
"arguments",
[
{"ca": True},
{"digital_signature": False},
{"usages": [ExtendedKeyUsageOID.CLIENT_AUTH]},
],
)
def test_verify_invalid_pkcs7_certificate(self, arguments: dict):
# Prepare the parameters
certificate = self.build_pkcs7_certificate(**arguments)

# Verify the certificate
self.verify_invalid_pkcs7_certificate(certificate)

@staticmethod
def verify_invalid_pkcs7_certificate(certificate: x509.Certificate):
ca_policy, ee_policy = pkcs7.pkcs7_x509_extension_policies()
verifier = (
PolicyBuilder()
.store(Store([certificate]))
.extension_policies(ca_policy=ca_policy, ee_policy=ee_policy)
.build_client_verifier()
)

with pytest.raises(VerificationError):
verifier.verify(certificate, [])

@pytest.mark.parametrize(
"filename", ["non-ascii-san.pem", "ascii-san.pem"]
)
def test_verify_pkcs7_certificate_wrong_san(self, filename):
# Read a certificate with an invalid SAN
pkcs7_certificate = load_vectors_from_file(
os.path.join("pkcs7", filename),
loader=lambda pemfile: x509.load_pem_x509_certificate(
pemfile.read()
),
mode="rb",
)

# Verify the certificate
self.verify_invalid_pkcs7_certificate(pkcs7_certificate)


@pytest.mark.supported(
only_if=lambda backend: backend.pkcs7_supported(),
skip_message="Requires OpenSSL with PKCS7 support",
Expand Down
Loading