Skip to content

Commit 035224a

Browse files
authored
Merge pull request #436 from italia/fix/invalid_x5c_store
Invalid x5c store and status list fix
2 parents 3c34d96 + c76d05a commit 035224a

File tree

6 files changed

+124
-76
lines changed

6 files changed

+124
-76
lines changed

pyeudiw/jwk/parse.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,13 @@ def parse_certificate(cert: str | bytes) -> JWK:
5959
:rtype: JWK
6060
"""
6161

62-
if type(cert) == bytes or type(cert) == str and not cert.startswith("-----BEGIN CERTIFICATE-----"):
63-
cert = DER_cert_to_PEM_cert(cert)
64-
65-
return parse_pem(cert)
66-
62+
try:
63+
return parse_pem(cert)
64+
except Exception:
65+
try:
66+
return parse_pem(DER_cert_to_PEM_cert(cert))
67+
except Exception:
68+
raise InvalidJwk(f"unable to parse key from pem: {cert}")
6769

6870
def parse_b64der(b64der: str) -> JWK:
6971
"""
@@ -88,4 +90,10 @@ def parse_x5c_keys(x5c: list[str]) -> list[JWK]:
8890
:rtype: JWK
8991
"""
9092

91-
return [parse_pem(pem) for pem in x5c]
93+
try:
94+
return [parse_certificate(cert) for cert in x5c]
95+
except Exception:
96+
try:
97+
return [parse_certificate(DER_cert_to_PEM_cert(cert)) for cert in x5c]
98+
except Exception:
99+
raise InvalidJwk(f"unable to parse key from pem chain: {x5c}")

pyeudiw/openid4vp/vp_sd_jwt_vc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def validate(
8989

9090
payload = decode_jwt_payload(token)
9191

92-
if "status" in payload:
92+
if "status" in payload and "status_list" in payload["status"]:
9393
status_list = StatusListTokenHelper.from_status(payload["status"])
9494
if status_list.is_expired() or \
9595
status_list.get_status(payload["status"]["status_list"]["idx"]) > 0:

pyeudiw/tests/jwk/test_parse.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,4 @@ def test_parse_x5c_keys():
5454
assert False
5555
except Exception as e:
5656
assert isinstance(e, InvalidJwk)
57-
assert str(e) == "unable to parse key from pem: invalid_x5c"
57+
assert str(e) == "unable to parse key from pem chain: ['invalid_x5c']"

pyeudiw/tests/satosa/test_backend.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
from pyeudiw.jwt.jwe_helper import JWEHelper
5555
from pyeudiw.satosa.utils.response import JsonResponse
5656
from pyeudiw.tests.x509.test_x509 import gen_chain
57-
from pyeudiw.x509.verify import der_list_to_pem_list
57+
from pyeudiw.x509.verify import to_pem_list
5858
from pyeudiw.jwk.parse import parse_pem
5959

6060
PKEY = {
@@ -113,7 +113,7 @@ class TestOpenID4VPBackend:
113113
def create_backend(self):
114114
db_engine_inst = DBEngine(CONFIG["storage"])
115115

116-
self.chain = der_list_to_pem_list(DEFAULT_X509_CHAIN)
116+
self.chain = to_pem_list(DEFAULT_X509_CHAIN)
117117
issuer_pem = self.chain[-1]
118118
self.x509_leaf_private_key = DEFAULT_X509_LEAF_JWK
119119

pyeudiw/trust/handler/x509.py

Lines changed: 60 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44
from pyeudiw.trust.handler.interface import TrustHandlerInterface
55
from pyeudiw.trust.model.trust_source import TrustSourceData, TrustEvaluationType
66
from pyeudiw.trust.handler.exceptions import InvalidTrustHandlerConfiguration
7-
from pyeudiw.jwk.parse import parse_pem, parse_x5c_keys, parse_certificate
7+
from pyeudiw.jwk.parse import parse_x5c_keys, parse_certificate
88
from cryptojwt.jwk.jwk import key_from_jwk_dict
99
from pyeudiw.x509.verify import (
1010
PEM_cert_to_B64DER_cert,
1111
to_DER_cert,
1212
verify_x509_attestation_chain,
1313
get_expiry_date_from_x5c,
14-
der_list_to_pem_list,
15-
pem_list_to_der_list,
14+
to_pem_list,
15+
to_der_list,
1616
get_x509_info,
1717
get_trust_anchor_from_x5c,
1818
get_certificate_type
@@ -94,7 +94,7 @@ def __init__(
9494
logger.error(f"Invalid x509 leaf certificate using CA {k}. Unmatching private key, the chain will be removed")
9595
continue
9696

97-
chain = pem_list_to_der_list(v) if type(v[0]) == str and v[0].startswith("-----BEGIN CERTIFICATE-----") else v
97+
chain = to_der_list(v)
9898

9999
if verify_x509_attestation_chain(chain):
100100
self.relying_party_certificate_chains_by_ca[k] = chain
@@ -104,79 +104,87 @@ def __init__(
104104

105105
self.private_keys = private_keys
106106

107-
def extract_and_update_trust_materials(
108-
self, issuer: str, trust_source: TrustSourceData
109-
) -> TrustSourceData:
110-
# Return the first valid chain
111-
for ca, chain in self.relying_party_certificate_chains_by_ca.items():
112-
if not verify_x509_attestation_chain(chain):
113-
logger.error(f"Invalid x509 certificate chain using CA {ca}. Chain validation failed, the chain will be removed")
114-
del self.relying_party_certificate_chains_by_ca[ca]
115-
continue
116-
117-
exp = get_expiry_date_from_x5c(chain)
118-
119-
trust_source.add_trust_param(
120-
X509Handler._TRUST_TYPE,
121-
TrustEvaluationType(
122-
attribute_name="x5c",
123-
x5c=der_list_to_pem_list(chain),
124-
expiration_date=exp,
125-
jwks=self.private_keys,
126-
trust_handler_name=self.name,
127-
)
128-
)
129-
130-
return trust_source
131-
132-
return trust_source
133-
134-
def validate_trust_material(
135-
self,
136-
x5c: list[str],
137-
trust_source: TrustSourceData,
138-
) -> tuple[bool, TrustSourceData]:
139-
# TODO: qui c'è del lavoro veramente sporco da fare.
140-
# Bisogna
141-
# (1) normalizzare la rappresentazione della chain a DER; per fare questo bisogna fare inferenza se PEM o Base64+DER
142-
# (2) normalizzare il salvatagggio della chain a PEM
143-
# (3) incrociare le dita che MDOC non si sfasci...
107+
def _verify_chain(self, x5c: list[str]) -> bool:
108+
"""
109+
Verify the x5c chain.
110+
:param x5c: The x5c chain to verify.
111+
:return: True if the chain is valid, False otherwise.
112+
"""
144113
der_chain = [to_DER_cert(cert) for cert in x5c]
145-
pem_chain = der_list_to_pem_list(der_chain)
146114

147115
if len(der_chain) > 1 and not verify_x509_attestation_chain(der_chain):
148116
logger.error(f"Invalid x509 certificate chain. Chain validation failed")
149-
return False, trust_source
117+
return False
150118

151119
issuer = get_trust_anchor_from_x5c(der_chain)
152120

153121
if not issuer:
154122
logger.error("Invalid x509 certificate chain. Issuer not found")
155-
return False, trust_source
123+
return False
156124

157125
if not issuer in self.certificate_authorities:
158126
logger.error("Invalid x509 certificate chain. Issuer not found in the list of trusted CAs")
159-
return False, trust_source
127+
return False
160128

161-
issuer_pem = self.certificate_authorities[issuer]
129+
issuer_cert = self.certificate_authorities[issuer]
162130

163131
try:
164-
issuer_jwk = parse_pem(issuer_pem)
165-
chain_jwks = parse_x5c_keys(x5c)
132+
issuer_jwk = parse_certificate(issuer_cert)
133+
chain_jwks = parse_x5c_keys(der_chain)
166134
except Exception as e:
167-
logger.error("Invalid x509 certificate chain. Parsing failed: {e}")
168-
return False, trust_source
135+
logger.error(f"Invalid x509 certificate chain. Parsing failed: {e}")
136+
return False
169137

170138
if not issuer_jwk.thumbprint == chain_jwks[-1].thumbprint:
171139
logger.error("Invalid x509 certificate chain. Issuer thumbprint does not match")
140+
return False
141+
142+
return True
143+
144+
def extract_and_update_trust_materials(
145+
self, issuer: str, trust_source: TrustSourceData
146+
) -> TrustSourceData:
147+
# Return the first valid chain
148+
if issuer == self.client_id:
149+
for ca, chain in self.relying_party_certificate_chains_by_ca.items():
150+
if not self._verify_chain(chain):
151+
logger.error(f"Invalid x509 certificate chain using CA {ca}. Chain will be ignored")
152+
continue
153+
154+
exp = get_expiry_date_from_x5c(chain)
155+
156+
trust_source.add_trust_param(
157+
X509Handler._TRUST_TYPE,
158+
TrustEvaluationType(
159+
attribute_name="x5c",
160+
x5c=to_pem_list(chain),
161+
expiration_date=exp,
162+
jwks=self.private_keys,
163+
trust_handler_name=self.name,
164+
)
165+
)
166+
167+
return trust_source
168+
169+
return trust_source
170+
171+
def validate_trust_material(
172+
self,
173+
x5c: list[str],
174+
trust_source: TrustSourceData,
175+
) -> tuple[bool, TrustSourceData]:
176+
chain_jwks = parse_x5c_keys(x5c)
177+
valid = self._verify_chain(x5c)
178+
179+
if not valid:
172180
return False, trust_source
173181

174182
trust_source.add_trust_param(
175183
"x509",
176184
TrustEvaluationType(
177185
attribute_name=self.get_handled_trust_material_name(),
178-
x5c=pem_chain,
179-
expiration_date=get_expiry_date_from_x5c(der_chain),
186+
x5c=to_pem_list(x5c),
187+
expiration_date=get_expiry_date_from_x5c(x5c),
180188
jwks=chain_jwks,
181189
trust_handler_name=self.name,
182190
)

pyeudiw/x509/verify.py

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def verify_x509_attestation_chain(x5c: list[bytes]) -> bool:
9090
if not _check_datetime(exp):
9191
return False
9292

93-
pems = [DER_cert_to_PEM_cert(cert) for cert in x5c]
93+
pems = [to_PEM_cert(cert) for cert in x5c]
9494

9595
return _verify_x509_certificate_chain(pems)
9696

@@ -149,8 +149,22 @@ def to_PEM_cert(cer: str | bytes) -> str:
149149
use it unless you do NOT hany prior way to know the actual representation
150150
format of a certificate
151151
"""
152-
raise NotImplementedError("TODO")
152+
cert_s = b""
153153

154+
if isinstance(cer, str):
155+
if is_pem_format(cer):
156+
return cer
157+
cert_s = cer.encode()
158+
else:
159+
cert_s = cer
160+
161+
if cert_s.startswith(b"-----BEGIN CERTIFICATE-----"):
162+
return str(cert_s)
163+
164+
if _BASE64_RE.fullmatch(str(cert_s)):
165+
return B64DER_cert_to_PEM_cert(cert_s)
166+
else:
167+
return DER_cert_to_PEM_cert(cert_s)
154168

155169
def pem_to_pems_list(cert: str) -> list[str]:
156170
"""
@@ -164,29 +178,31 @@ def pem_to_pems_list(cert: str) -> list[str]:
164178
"""
165179
return [str(cert) for cert in pem.parse(cert)]
166180

167-
def der_list_to_pem_list(der_list: list[bytes]) -> list[str]:
181+
def to_pem_list(der_list: list[bytes] | list[str]) -> list[str]:
168182
"""
169-
Convert the x509 certificate chain from DER to PEM.
183+
If the input is a list of DER certificates, it will be converted to a list of PEM certificates.
184+
If the input is a list of PEM certificates, it will be returned as is.
170185
171186
:param der: The x509 certificate chain in DER format
172187
:type der: list[bytes]
173188
174189
:returns: The x509 certificate chain in PEM format
175190
:rtype: list[str]
176191
"""
177-
return [DER_cert_to_PEM_cert(cert) for cert in der_list]
192+
return [to_PEM_cert(cert) for cert in der_list]
178193

179-
def pem_list_to_der_list(pem_list: list[str]) -> list[bytes]:
194+
def to_der_list(pem_list: list[str] | list[bytes]) -> list[bytes]:
180195
"""
181-
Convert the x509 certificate chain from PEM to DER.
196+
If the input is a list of PEM certificates, it will be converted to a list of DER certificates.
197+
If the input is a list of DER certificates, it will be returned as is.
182198
183199
:param pem_list: The x509 certificate chain in PEM format
184200
:type pem_list: list[str]
185201
186202
:returns: The x509 certificate chain in DER format
187203
:rtype: list[bytes]
188204
"""
189-
return [PEM_cert_to_DER_cert(cert) for cert in pem_list]
205+
return [to_DER_cert(cert) for cert in pem_list]
190206

191207
def get_expiry_date_from_x5c(x5c: list[bytes]) -> datetime:
192208
"""
@@ -211,7 +227,7 @@ def verify_x509_anchor(pem_str: str) -> bool:
211227
:returns: True if the x509 anchor certificate is valid else False
212228
:rtype: bool
213229
"""
214-
cert_data = load_der_x509_certificate(PEM_cert_to_DER_cert(pem_str))
230+
cert_data = load_der_x509_certificate(to_DER_cert(pem_str))
215231

216232
if not _check_datetime(cert_data.not_valid_after):
217233
logging.error(LOG_ERROR.format("check datetime failed"))
@@ -258,7 +274,7 @@ def get_issuer_from_x5c(x5c: list[bytes] | list[str]) -> Optional[str]:
258274
:returns: The issuer
259275
:rtype: str
260276
"""
261-
der = x5c[0] if isinstance(x5c[0], bytes) else PEM_cert_to_DER_cert(x5c[0])
277+
der = to_DER_cert(x5c[0])
262278
return get_get_subject_name(der)
263279

264280

@@ -272,7 +288,7 @@ def get_trust_anchor_from_x5c(x5c: list[bytes] | list[str]) -> Optional[str]:
272288
:returns: The issuer
273289
:rtype: str
274290
"""
275-
der = x5c[-1] if isinstance(x5c[-1], bytes) else PEM_cert_to_DER_cert(x5c[-1])
291+
der = to_DER_cert(x5c[-1])
276292
return get_get_subject_name(der)
277293

278294
def get_expiry_date_from_x5c(x5c: list[bytes] | list[str]) -> datetime:
@@ -285,7 +301,7 @@ def get_expiry_date_from_x5c(x5c: list[bytes] | list[str]) -> datetime:
285301
:returns: The expiry date
286302
:rtype: datetime
287303
"""
288-
der = x5c[0] if isinstance(x5c[0], bytes) else PEM_cert_to_DER_cert(x5c[0])
304+
der = to_DER_cert(x5c[0])
289305
cert = load_der_x509_certificate(der)
290306
return cert.not_valid_after
291307

@@ -303,7 +319,7 @@ def get_x509_info(cert: bytes | str, info_type: str = "x509_san_dns") -> str:
303319
"""
304320
get_common_name = lambda cert: cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value
305321

306-
der = cert if isinstance(cert, bytes) else PEM_cert_to_DER_cert(cert)
322+
der = to_DER_cert(cert)
307323
cert: x509.Certificate = load_der_x509_certificate(der, default_backend())
308324

309325
try:
@@ -334,13 +350,29 @@ def is_der_format(cert: bytes) -> str:
334350
except crypto.Error as e:
335351
logging.error(LOG_ERROR.format(e))
336352
return False
353+
354+
def is_pem_format(cert: str) -> str:
355+
"""
356+
Check if the certificate is in PEM format.
337357
358+
:param cert: The certificate
359+
:type cert: bytes
360+
361+
:returns: True if the certificate is in PEM format else False
362+
:rtype: bool
363+
"""
364+
try:
365+
crypto.load_certificate(crypto.FILETYPE_PEM, cert)
366+
return True
367+
except crypto.Error as e:
368+
logging.error(LOG_ERROR.format(e))
369+
return False
338370

339371
def get_public_key_from_x509_chain(x5c: list[bytes]) -> ECKey | RSAKey | dict:
340372
raise NotImplementedError("TODO")
341373

342374
def get_certificate_type(cert: str | bytes) -> str:
343-
pem = cert if isinstance(cert, str) and cert.startswith("-----BEGIN CERTIFICATE-----") else DER_cert_to_PEM_cert(cert)
375+
pem = to_PEM_cert(cert)
344376

345377
cert = x509.load_pem_x509_certificate(pem.encode(), default_backend())
346378
public_key = cert.public_key()

0 commit comments

Comments
 (0)