@@ -94,19 +94,23 @@ def test_username_password(self):
9494 self .skipUnlessWithConfig (["client_id" , "username" , "password" , "scope" ])
9595 self ._test_username_password (** self .config )
9696
97- def test_auth_code (self ):
98- self .skipUnlessWithConfig (["client_id" , "scope" ])
97+ def _get_app_and_auth_code (self ):
9998 from msal .oauth2cli .authcode import obtain_auth_code
100- self . app = msal .ClientApplication (
99+ app = msal .ClientApplication (
101100 self .config ["client_id" ],
102101 client_credential = self .config .get ("client_secret" ),
103102 authority = self .config .get ("authority" ))
104103 port = self .config .get ("listen_port" , 44331 )
105104 redirect_uri = "http://localhost:%s" % port
106- auth_request_uri = self . app .get_authorization_request_url (
105+ auth_request_uri = app .get_authorization_request_url (
107106 self .config ["scope" ], redirect_uri = redirect_uri )
108107 ac = obtain_auth_code (port , auth_uri = auth_request_uri )
109108 self .assertNotEqual (ac , None )
109+ return (app , ac , redirect_uri )
110+
111+ def test_auth_code (self ):
112+ self .skipUnlessWithConfig (["client_id" , "scope" ])
113+ (self .app , ac , redirect_uri ) = self ._get_app_and_auth_code ()
110114
111115 result = self .app .acquire_token_by_authorization_code (
112116 ac , self .config ["scope" ], redirect_uri = redirect_uri )
@@ -120,6 +124,46 @@ def test_auth_code(self):
120124 error_description = result .get ("error_description" )))
121125 self .assertCacheWorksForUser (result , self .config ["scope" ], username = None )
122126
127+
128+ def test_ssh_cert (self ):
129+ self .skipUnlessWithConfig (["client_id" , "scope" ])
130+
131+ JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
132+ JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
133+ data1 = {"token_type" : "ssh-cert" , "key_id" : "key1" , "req_cnf" : JWK1 }
134+ ssh_test_slice = {
135+ "dc" : "prod-wst-test1" ,
136+ "slice" : "test" ,
137+ "sshcrt" : "true" ,
138+ }
139+
140+ (self .app , ac , redirect_uri ) = self ._get_app_and_auth_code ()
141+
142+ result = self .app .acquire_token_by_authorization_code (
143+ ac , self .config ["scope" ], redirect_uri = redirect_uri , data = data1 ,
144+ params = ssh_test_slice )
145+ self .assertEqual ("ssh-cert" , result ["token_type" ])
146+ logger .debug ("%s.cache = %s" ,
147+ self .id (), json .dumps (self .app .token_cache ._cache , indent = 4 ))
148+
149+ # acquire_token_silent() needs to be passed the same key to work
150+ account = self .app .get_accounts ()[0 ]
151+ result_from_cache = self .app .acquire_token_silent (
152+ self .config ["scope" ], account = account , data = data1 )
153+ self .assertIsNotNone (result_from_cache )
154+ self .assertEqual (
155+ result ['access_token' ], result_from_cache ['access_token' ],
156+ "We should get the cached SSH-cert" )
157+
158+ # refresh_token grant can fetch an ssh-cert bound to a different key
159+ refreshed_ssh_cert = self .app .acquire_token_silent (
160+ self .config ["scope" ], account = account , params = ssh_test_slice ,
161+ data = {"token_type" : "ssh-cert" , "key_id" : "key2" , "req_cnf" : JWK2 })
162+ self .assertIsNotNone (refreshed_ssh_cert )
163+ self .assertEqual (refreshed_ssh_cert ["token_type" ], "ssh-cert" )
164+ self .assertNotEqual (result ["access_token" ], refreshed_ssh_cert ['access_token' ])
165+
166+
123167 def test_client_secret (self ):
124168 self .skipUnlessWithConfig (["client_id" , "client_secret" ])
125169 self .app = msal .ConfidentialClientApplication (
0 commit comments