|
4 | 4 | except: # Python 3 |
5 | 5 | from urllib.parse import urljoin |
6 | 6 | import logging |
| 7 | +from base64 import b64encode |
7 | 8 |
|
8 | 9 | from oauth2cli import Client |
9 | 10 | from .authority import Authority |
10 | 11 | from .assertion import create_jwt_assertion |
| 12 | +import mex |
| 13 | +import wstrust_request |
| 14 | +from .wstrust_response import SAML_TOKEN_TYPE_V1, SAML_TOKEN_TYPE_V2 |
11 | 15 | from .token_cache import TokenCache |
12 | 16 |
|
13 | 17 |
|
| 18 | +logger = logging.getLogger(__name__) |
| 19 | + |
14 | 20 | def decorate_scope( |
15 | 21 | scope, client_id, |
16 | 22 | policy=None, # obsolete |
@@ -268,9 +274,40 @@ class PublicClientApplication(ClientApplication): # browser app or mobile app |
268 | 274 | def acquire_token_with_username_password( |
269 | 275 | self, username, password, scope=None, **kwargs): |
270 | 276 | """Gets a token for a given resource via user credentails.""" |
| 277 | + scope = decorate_scope(scope, self.client_id) |
| 278 | + if not self.authority.is_adfs: |
| 279 | + user_realm_result = self.authority.user_realm_discovery(username) |
| 280 | + if user_realm_result.get("account_type") == "Federated": |
| 281 | + return self._acquire_token_with_username_password_federated( |
| 282 | + user_realm_result, username, password, scope=scope, **kwargs) |
271 | 283 | return self.client.obtain_token_with_username_password( |
272 | | - username, password, |
273 | | - scope=decorate_scope(scope, self.client_id), **kwargs) |
| 284 | + username, password, scope=scope, **kwargs) |
| 285 | + |
| 286 | + def _acquire_token_with_username_password_federated( |
| 287 | + self, user_realm_result, username, password, scope=None, **kwargs): |
| 288 | + wstrust_endpoint = {} |
| 289 | + if user_realm_result.get("federation_metadata_url"): |
| 290 | + wstrust_endpoint = mex.send_request( |
| 291 | + user_realm_result["federation_metadata_url"]) |
| 292 | + logger.debug("wstrust_endpoint = %s", wstrust_endpoint) |
| 293 | + wstrust_result = wstrust_request.send_request( |
| 294 | + username, password, user_realm_result.get("cloud_audience_urn"), |
| 295 | + wstrust_endpoint.get("address", |
| 296 | + # Fallback to an AAD supplied endpoint |
| 297 | + user_realm_result.get("federation_active_auth_url")), |
| 298 | + wstrust_endpoint.get("action"), **kwargs) |
| 299 | + if not ("token" in wstrust_result and "type" in wstrust_result): |
| 300 | + raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result) |
| 301 | + grant_type = { |
| 302 | + SAML_TOKEN_TYPE_V1: 'urn:ietf:params:oauth:grant-type:saml1_1-bearer', |
| 303 | + SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2, |
| 304 | + }.get(wstrust_result.get("type")) |
| 305 | + if not grant_type: |
| 306 | + raise RuntimeError( |
| 307 | + "RSTR returned unknown token type: %s", wstrust_result.get("type")) |
| 308 | + return self.client.obtain_token_with_assertion( |
| 309 | + b64encode(wstrust_result["token"]), |
| 310 | + grant_type=grant_type, scope=scope, **kwargs) |
274 | 311 |
|
275 | 312 | def acquire_token( |
276 | 313 | self, |
|
0 commit comments