Skip to content

Commit c7f18e9

Browse files
committed
fix: x509 chain representation
1 parent 4c6a230 commit c7f18e9

File tree

3 files changed

+95
-16
lines changed

3 files changed

+95
-16
lines changed

pyeudiw/trust/handler/interface.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ def validate_trust_material(
134134
"""
135135
raise NotImplementedError
136136

137+
def is_it_me(self, client_id: str) -> bool:
138+
"""
139+
TODO
140+
"""
141+
return client_id == self.default_client_id
142+
137143
@property
138144
def name(self) -> str:
139145
"""
@@ -152,4 +158,7 @@ def default_client_id(self) -> str:
152158
:returns: The default client id of the trust handler
153159
:rtype: str
154160
"""
161+
# TODO: investiga dove questo viene veramente chiamat
162+
# TODO: proposal to rename configured_client_id
163+
# breakpoint()
155164
return self.client_id

pyeudiw/trust/handler/x509.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import base64
21
import logging
3-
from ssl import PEM_cert_to_DER_cert
42
from typing import Union
53

64
from pyeudiw.trust.handler.interface import TrustHandlerInterface
@@ -9,6 +7,8 @@
97
from pyeudiw.jwk.parse import parse_pem, parse_x5c_keys, parse_certificate
108
from cryptojwt.jwk.jwk import key_from_jwk_dict
119
from pyeudiw.x509.verify import (
10+
PEM_cert_to_B64DER_cert,
11+
to_DER_cert,
1212
verify_x509_attestation_chain,
1313
get_expiry_date_from_x5c,
1414
der_list_to_pem_list,
@@ -36,11 +36,13 @@ def __init__(
3636
private_keys: list[dict[str, str]],
3737
client_id_scheme: str = "x509_san_uri",
3838
certificate_authorities: dict[str, str] = [],
39+
include_issued_jwt_header_param: bool = False,
3940
**kwargs
4041
) -> None:
4142
self.client_id = client_id
4243
self.client_id_scheme = client_id_scheme
4344
self.certificate_authorities = certificate_authorities
45+
self.include_issued_jwt_header_param = include_issued_jwt_header_param
4446

4547
if not relying_party_certificate_chains_by_ca:
4648
raise InvalidTrustHandlerConfiguration("No x509 certificate chains provided in the configuration")
@@ -70,7 +72,7 @@ def __init__(
7072
break
7173

7274
if not found_client_id:
73-
logger.error(f"Invalid x509 leaf certificate using CA {k}. Unmatching client id ({client_id}); searched among {search_set}, the chain will be removed")
75+
logger.error(f"Invalid x509 leaf certificate using CA {k}. Unmatching client id ({client_id}); the chain will be removed")
7476
continue
7577

7678
pem_type = get_certificate_type(v[0])
@@ -130,24 +132,30 @@ def extract_and_update_trust_materials(
130132
return trust_source
131133

132134
def validate_trust_material(
133-
self,
134-
x5c: list[str],
135+
self,
136+
x5c: list[str],
135137
trust_source: TrustSourceData,
136138
) -> tuple[bool, TrustSourceData]:
137-
chain = [base64.b64decode(b64der) for b64der in x5c]
138-
139-
if len(chain) > 1 and not verify_x509_attestation_chain(chain):
139+
# TODO: qui c'è del lavoro veramente sporco da fare.
140+
# Bisogna
141+
# (1) normalizzare la rappresentazione della chain a DER; per fare questo bisogna fare inferenza se PEM o Base64+DER
142+
# (2) normalizzare il salvatagggio della chain a PEM
143+
# (3) incrociare le dita che MDOC non si sfasci...
144+
der_chain = [to_DER_cert(cert) for cert in x5c]
145+
pem_chain = der_list_to_pem_list(der_chain)
146+
147+
if len(der_chain) > 1 and not verify_x509_attestation_chain(der_chain):
140148
logger.error(f"Invalid x509 certificate chain. Chain validation failed")
141149
return False, trust_source
142150

143-
issuer = get_trust_anchor_from_x5c(chain)
151+
issuer = get_trust_anchor_from_x5c(der_chain)
144152

145153
if not issuer:
146-
logger.error(f"Invalid x509 certificate chain. Issuer not found")
154+
logger.error("Invalid x509 certificate chain. Issuer not found")
147155
return False, trust_source
148156

149157
if not issuer in self.certificate_authorities:
150-
logger.error(f"Invalid x509 certificate chain. Issuer not found in the list of trusted CAs")
158+
logger.error("Invalid x509 certificate chain. Issuer not found in the list of trusted CAs")
151159
return False, trust_source
152160

153161
issuer_pem = self.certificate_authorities[issuer]
@@ -156,19 +164,19 @@ def validate_trust_material(
156164
issuer_jwk = parse_pem(issuer_pem)
157165
chain_jwks = parse_x5c_keys(x5c)
158166
except Exception as e:
159-
logger.error(f"Invalid x509 certificate chain. Parsing failed: {e}")
167+
logger.error("Invalid x509 certificate chain. Parsing failed: {e}")
160168
return False, trust_source
161169

162170
if not issuer_jwk.thumbprint == chain_jwks[-1].thumbprint:
163-
logger.error(f"Invalid x509 certificate chain. Issuer thumbprint does not match")
171+
logger.error("Invalid x509 certificate chain. Issuer thumbprint does not match")
164172
return False, trust_source
165173

166174
trust_source.add_trust_param(
167175
"x509",
168176
TrustEvaluationType(
169177
attribute_name=self.get_handled_trust_material_name(),
170-
x5c=x5c,
171-
expiration_date=get_expiry_date_from_x5c(chain),
178+
x5c=pem_chain,
179+
expiration_date=get_expiry_date_from_x5c(der_chain),
172180
jwks=chain_jwks,
173181
trust_handler_name=self.name,
174182
)
@@ -179,7 +187,7 @@ def validate_trust_material(
179187
def extract_jwt_header_trust_parameters(self, trust_source: TrustSourceData) -> dict:
180188
tp: dict = trust_source.serialize().get(X509Handler._TRUST_TYPE, {})
181189
if (x5c_pem := tp.get(X509Handler._TRUST_PARAMETER_NAME, None)):
182-
x5c = [base64.b64encode(PEM_cert_to_DER_cert(pem)).decode() for pem in x5c_pem]
190+
x5c = [PEM_cert_to_B64DER_cert(pem) for pem in x5c_pem]
183191
return {"x5c": x5c}
184192
return {}
185193

pyeudiw/x509/verify.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import base64
12
import logging
23
from datetime import datetime
4+
import re
35
from ssl import DER_cert_to_PEM_cert, PEM_cert_to_DER_cert
46

57
import pem
@@ -16,6 +18,8 @@
1618

1719
logger = logging.getLogger(__name__)
1820

21+
_BASE64_RE = re.compile("^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$")
22+
1923

2024
def _verify_x509_certificate_chain(pems: list[str]):
2125
"""
@@ -90,6 +94,64 @@ def verify_x509_attestation_chain(x5c: list[bytes]) -> bool:
9094

9195
return _verify_x509_certificate_chain(pems)
9296

97+
98+
def PEM_cert_to_B64DER_cert(cert: str) -> str:
99+
"""
100+
Takes a certificate in ANSII PEM format and returns the base64
101+
encoding of the corresponding DER certificate.
102+
"""
103+
return base64.b64encode(PEM_cert_to_DER_cert(cert)).decode()
104+
105+
106+
def B64DER_cert_to_PEM_cert(cert: str) -> str:
107+
"""
108+
Takes a certificate Base64 encoded DER and returns the
109+
certificate in ANSII PEM format.
110+
"""
111+
return DER_cert_to_PEM_cert(base64.b64decode(cert))
112+
113+
114+
def to_DER_cert(cert: str | bytes) -> bytes:
115+
"""
116+
This function takes in a certificate with unknown representation
117+
(allegedly, PEM, DER or Base64 encoded DER) and applies some
118+
heuristics to convert it to a DER certificate.
119+
120+
This function should be treated as UNSAFE and inefficient. Do NOT
121+
use it unless you do NOT hany prior way to know the actual representation
122+
format of a certificate
123+
"""
124+
cert_s = ""
125+
if isinstance(cert, bytes):
126+
if is_der_format(cert):
127+
return cert
128+
cert_s = cert.decode()
129+
else:
130+
cert_s = cert
131+
132+
if cert_s.startswith("-----BEGIN CERTIFICATE-----"):
133+
return PEM_cert_to_DER_cert(cert_s)
134+
135+
cert_s = cert_s.replace('\n\r', '')
136+
if _BASE64_RE.fullmatch(cert_s):
137+
return B64DER_cert_to_PEM_cert(cert_s)
138+
139+
raise ValueError("unable to recognize input [cert] as a ccertifficate")
140+
141+
142+
def to_PEM_cert(cer: str | bytes) -> str:
143+
"""
144+
This function takes in a certificate with unknown representation
145+
(allegedly, PEM, DER or Base64 encoded DER) and applies some
146+
heuristics to convert it to a PEM certificate.
147+
148+
This function should be treated as UNSAFE and inefficient. Do NOT
149+
use it unless you do NOT hany prior way to know the actual representation
150+
format of a certificate
151+
"""
152+
raise NotImplementedError("TODO")
153+
154+
93155
def pem_to_pems_list(cert: str) -> list[str]:
94156
"""
95157
Convert the x509 certificate chain from PEM to multiple PEMs.

0 commit comments

Comments
 (0)