Skip to content

Commit 5a0b55d

Browse files
authored
Subject Name and Issuer Authentication (#71)
1 parent 0672c72 commit 5a0b55d

File tree

2 files changed

+65
-2
lines changed

2 files changed

+65
-2
lines changed

msal/application.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@ def decorate_scope(
5050
return list(decorated)
5151

5252

53+
def extract_certs(public_cert_content):
54+
# Parses raw public certificate file contents and returns a list of strings
55+
# Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())}
56+
public_certificates = re.findall(
57+
r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
58+
public_cert_content, re.I)
59+
if public_certificates:
60+
return [cert.strip() for cert in public_certificates]
61+
# The public cert tags are not found in the input,
62+
# let's make best effort to exclude a private key pem file.
63+
if "PRIVATE KEY" in public_cert_content:
64+
raise ValueError(
65+
"We expect your public key but detect a private key instead")
66+
return [public_cert_content.strip()]
67+
68+
5369
class ClientApplication(object):
5470

5571
def __init__(
@@ -59,7 +75,7 @@ def __init__(
5975
verify=True, proxies=None, timeout=None):
6076
"""Create an instance of application.
6177
62-
:param client_id: Your app has a clinet_id after you register it on AAD.
78+
:param client_id: Your app has a client_id after you register it on AAD.
6379
:param client_credential:
6480
For :class:`PublicClientApplication`, you simply use `None` here.
6581
For :class:`ConfidentialClientApplication`,
@@ -69,8 +85,12 @@ def __init__(
6985
{
7086
"private_key": "...-----BEGIN PRIVATE KEY-----...",
7187
"thumbprint": "A1B2C3D4E5F6...",
88+
"public_certificate": "...-----BEGIN CERTIFICATE-----..." (Optional. See below.)
7289
}
7390
91+
public_certificate (optional) is public key certificate which is
92+
sent through 'x5c' JWT header only for
93+
subject name and issuer authentication to support cert auto rolls
7494
:param str authority:
7595
A URL that identifies a token authority. It should be of the format
7696
https://login.microsoftonline.com/your_tenant
@@ -113,9 +133,12 @@ def _build_client(self, client_credential, authority):
113133
if isinstance(client_credential, dict):
114134
assert ("private_key" in client_credential
115135
and "thumbprint" in client_credential)
136+
headers = {}
137+
if 'public_certificate' in client_credential:
138+
headers["x5c"] = extract_certs(client_credential['public_certificate'])
116139
signer = JwtSigner(
117140
client_credential["private_key"], algorithm="RS256",
118-
sha1_thumbprint=client_credential.get("thumbprint"))
141+
sha1_thumbprint=client_credential.get("thumbprint"), headers=headers)
119142
client_assertion = signer.sign_assertion(
120143
audience=authority.token_endpoint, issuer=self.client_id)
121144
client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT

tests/test_application.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,46 @@ def test_client_certificate(self):
9999
self.assertIn('access_token', result)
100100
self.assertCacheWorks(result, app.acquire_token_silent(scope, account=None))
101101

102+
def test_extract_a_tag_less_public_cert(self):
103+
pem = "my_cert"
104+
self.assertEqual(["my_cert"], extract_certs(pem))
105+
106+
def test_extract_a_tag_enclosed_cert(self):
107+
pem = """
108+
-----BEGIN CERTIFICATE-----
109+
my_cert
110+
-----END CERTIFICATE-----
111+
"""
112+
self.assertEqual(["my_cert"], extract_certs(pem))
113+
114+
def test_extract_multiple_tag_enclosed_certs(self):
115+
pem = """
116+
-----BEGIN CERTIFICATE-----
117+
my_cert1
118+
-----END CERTIFICATE-----
119+
120+
-----BEGIN CERTIFICATE-----
121+
my_cert2
122+
-----END CERTIFICATE-----
123+
"""
124+
self.assertEqual(["my_cert1", "my_cert2"], extract_certs(pem))
125+
126+
@unittest.skipUnless("public_certificate" in CONFIG, "Missing Public cert")
127+
def test_subject_name_issuer_authentication(self):
128+
assert ("private_key_file" in CONFIG
129+
and "thumbprint" in CONFIG and "public_certificate" in CONFIG)
130+
with open(os.path.join(THIS_FOLDER, CONFIG['private_key_file'])) as f:
131+
pem = f.read()
132+
with open(os.path.join(THIS_FOLDER, CONFIG['public_certificate'])) as f:
133+
public_certificate = f.read()
134+
app = ConfidentialClientApplication(
135+
CONFIG['client_id'], authority=CONFIG["authority"],
136+
client_credential={"private_key": pem, "thumbprint": CONFIG["thumbprint"],
137+
"public_certificate": public_certificate})
138+
scope = CONFIG.get("scope", [])
139+
result = app.acquire_token_for_client(scope)
140+
self.assertIn('access_token', result)
141+
self.assertCacheWorks(result, app.acquire_token_silent(scope, account=None))
102142

103143
@unittest.skipUnless("client_id" in CONFIG, "client_id missing")
104144
class TestPublicClientApplication(Oauth2TestCase):

0 commit comments

Comments
 (0)