@@ -64,6 +64,21 @@ def _raise_on_invalid_jwt_signature(encoded_jwt):
6464 if len (segments ) < 3 or not segments [2 ]:
6565 raise AdalError ('Failed to sign JWT. This is most likely due to an invalid certificate.' )
6666
67+ def _extract_certs (public_cert_content ):
68+ # Parses raw public certificate file contents and returns a list of strings
69+ # Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())}
70+ public_certificates = re .findall (
71+ r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----' ,
72+ public_cert_content , re .I )
73+ if public_certificates :
74+ return [cert .strip () for cert in public_certificates ]
75+ # The public cert tags are not found in the input,
76+ # let's make best effort to exclude a private key pem file.
77+ if "PRIVATE KEY" in public_cert_content :
78+ raise ValueError (
79+ "We expect your public key but detect a private key instead" )
80+ return [public_cert_content .strip ()]
81+
6782class SelfSignedJwt (object ):
6883
6984 NumCharIn128BitHexString = 128 / 8 * 2
@@ -82,7 +97,7 @@ def _create_header(self, thumbprint, public_certificate):
8297 x5t = _create_x5t_value (thumbprint )
8398 header = {'typ' :'JWT' , 'alg' :'RS256' , 'x5t' :x5t }
8499 if public_certificate :
85- header ['x5c' ] = public_certificate
100+ header ['x5c' ] = _extract_certs ( public_certificate )
86101 self ._log .debug ("Creating self signed JWT header. x5t: %(x5t)s, x5c: %(x5c)s" ,
87102 {"x5t" : x5t , "x5c" : public_certificate })
88103
0 commit comments