@@ -1025,6 +1025,91 @@ def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
10251025 telemetry_context .update_telemetry (response )
10261026 return response
10271027
1028+ def acquire_token_by_username_password (
1029+ self , username , password , scopes , claims_challenge = None , ** kwargs ):
1030+ """Gets a token for a given resource via user credentials.
1031+
1032+ See this page for constraints of Username Password Flow.
1033+ https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
1034+
1035+ :param str username: Typically a UPN in the form of an email address.
1036+ :param str password: The password.
1037+ :param list[str] scopes:
1038+ Scopes requested to access a protected API (a resource).
1039+ :param claims_challenge:
1040+ The claims_challenge parameter requests specific claims requested by the resource provider
1041+ in the form of a claims_challenge directive in the www-authenticate header to be
1042+ returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1043+ It is a string of a JSON object which contains lists of claims being requested from these locations.
1044+
1045+ :return: A dict representing the json response from AAD:
1046+
1047+ - A successful response would contain "access_token" key,
1048+ - an error response would contain "error" and usually "error_description".
1049+ """
1050+ scopes = decorate_scope (scopes , self .client_id )
1051+ telemetry_context = self ._build_telemetry_context (
1052+ self .ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID )
1053+ headers = telemetry_context .generate_headers ()
1054+ data = dict (
1055+ kwargs .pop ("data" , {}),
1056+ claims = _merge_claims_challenge_and_capabilities (
1057+ self ._client_capabilities , claims_challenge ))
1058+ if not self .authority .is_adfs :
1059+ user_realm_result = self .authority .user_realm_discovery (
1060+ username , correlation_id = headers [msal .telemetry .CLIENT_REQUEST_ID ])
1061+ if user_realm_result .get ("account_type" ) == "Federated" :
1062+ response = _clean_up (self ._acquire_token_by_username_password_federated (
1063+ user_realm_result , username , password , scopes = scopes ,
1064+ data = data ,
1065+ headers = headers , ** kwargs ))
1066+ telemetry_context .update_telemetry (response )
1067+ return response
1068+ response = _clean_up (self .client .obtain_token_by_username_password (
1069+ username , password , scope = scopes ,
1070+ headers = headers ,
1071+ data = data ,
1072+ ** kwargs ))
1073+ telemetry_context .update_telemetry (response )
1074+ return response
1075+
1076+ def _acquire_token_by_username_password_federated (
1077+ self , user_realm_result , username , password , scopes = None , ** kwargs ):
1078+ wstrust_endpoint = {}
1079+ if user_realm_result .get ("federation_metadata_url" ):
1080+ wstrust_endpoint = mex_send_request (
1081+ user_realm_result ["federation_metadata_url" ],
1082+ self .http_client )
1083+ if wstrust_endpoint is None :
1084+ raise ValueError ("Unable to find wstrust endpoint from MEX. "
1085+ "This typically happens when attempting MSA accounts. "
1086+ "More details available here. "
1087+ "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication" )
1088+ logger .debug ("wstrust_endpoint = %s" , wstrust_endpoint )
1089+ wstrust_result = wst_send_request (
1090+ username , password ,
1091+ user_realm_result .get ("cloud_audience_urn" , "urn:federation:MicrosoftOnline" ),
1092+ wstrust_endpoint .get ("address" ,
1093+ # Fallback to an AAD supplied endpoint
1094+ user_realm_result .get ("federation_active_auth_url" )),
1095+ wstrust_endpoint .get ("action" ), self .http_client )
1096+ if not ("token" in wstrust_result and "type" in wstrust_result ):
1097+ raise RuntimeError ("Unsuccessful RSTR. %s" % wstrust_result )
1098+ GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
1099+ grant_type = {
1100+ SAML_TOKEN_TYPE_V1 : GRANT_TYPE_SAML1_1 ,
1101+ SAML_TOKEN_TYPE_V2 : self .client .GRANT_TYPE_SAML2 ,
1102+ WSS_SAML_TOKEN_PROFILE_V1_1 : GRANT_TYPE_SAML1_1 ,
1103+ WSS_SAML_TOKEN_PROFILE_V2 : self .client .GRANT_TYPE_SAML2
1104+ }.get (wstrust_result .get ("type" ))
1105+ if not grant_type :
1106+ raise RuntimeError (
1107+ "RSTR returned unknown token type: %s" , wstrust_result .get ("type" ))
1108+ self .client .grant_assertion_encoders .setdefault ( # Register a non-standard type
1109+ grant_type , self .client .encode_saml_assertion )
1110+ return self .client .obtain_token_by_assertion (
1111+ wstrust_result ["token" ], grant_type , scope = scopes , ** kwargs )
1112+
10281113
10291114class PublicClientApplication (ClientApplication ): # browser app or mobile app
10301115
@@ -1176,91 +1261,6 @@ def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
11761261 telemetry_context .update_telemetry (response )
11771262 return response
11781263
1179- def acquire_token_by_username_password (
1180- self , username , password , scopes , claims_challenge = None , ** kwargs ):
1181- """Gets a token for a given resource via user credentials.
1182-
1183- See this page for constraints of Username Password Flow.
1184- https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
1185-
1186- :param str username: Typically a UPN in the form of an email address.
1187- :param str password: The password.
1188- :param list[str] scopes:
1189- Scopes requested to access a protected API (a resource).
1190- :param claims_challenge:
1191- The claims_challenge parameter requests specific claims requested by the resource provider
1192- in the form of a claims_challenge directive in the www-authenticate header to be
1193- returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1194- It is a string of a JSON object which contains lists of claims being requested from these locations.
1195-
1196- :return: A dict representing the json response from AAD:
1197-
1198- - A successful response would contain "access_token" key,
1199- - an error response would contain "error" and usually "error_description".
1200- """
1201- scopes = decorate_scope (scopes , self .client_id )
1202- telemetry_context = self ._build_telemetry_context (
1203- self .ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID )
1204- headers = telemetry_context .generate_headers ()
1205- data = dict (
1206- kwargs .pop ("data" , {}),
1207- claims = _merge_claims_challenge_and_capabilities (
1208- self ._client_capabilities , claims_challenge ))
1209- if not self .authority .is_adfs :
1210- user_realm_result = self .authority .user_realm_discovery (
1211- username , correlation_id = headers [msal .telemetry .CLIENT_REQUEST_ID ])
1212- if user_realm_result .get ("account_type" ) == "Federated" :
1213- response = _clean_up (self ._acquire_token_by_username_password_federated (
1214- user_realm_result , username , password , scopes = scopes ,
1215- data = data ,
1216- headers = headers , ** kwargs ))
1217- telemetry_context .update_telemetry (response )
1218- return response
1219- response = _clean_up (self .client .obtain_token_by_username_password (
1220- username , password , scope = scopes ,
1221- headers = headers ,
1222- data = data ,
1223- ** kwargs ))
1224- telemetry_context .update_telemetry (response )
1225- return response
1226-
1227- def _acquire_token_by_username_password_federated (
1228- self , user_realm_result , username , password , scopes = None , ** kwargs ):
1229- wstrust_endpoint = {}
1230- if user_realm_result .get ("federation_metadata_url" ):
1231- wstrust_endpoint = mex_send_request (
1232- user_realm_result ["federation_metadata_url" ],
1233- self .http_client )
1234- if wstrust_endpoint is None :
1235- raise ValueError ("Unable to find wstrust endpoint from MEX. "
1236- "This typically happens when attempting MSA accounts. "
1237- "More details available here. "
1238- "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication" )
1239- logger .debug ("wstrust_endpoint = %s" , wstrust_endpoint )
1240- wstrust_result = wst_send_request (
1241- username , password ,
1242- user_realm_result .get ("cloud_audience_urn" , "urn:federation:MicrosoftOnline" ),
1243- wstrust_endpoint .get ("address" ,
1244- # Fallback to an AAD supplied endpoint
1245- user_realm_result .get ("federation_active_auth_url" )),
1246- wstrust_endpoint .get ("action" ), self .http_client )
1247- if not ("token" in wstrust_result and "type" in wstrust_result ):
1248- raise RuntimeError ("Unsuccessful RSTR. %s" % wstrust_result )
1249- GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
1250- grant_type = {
1251- SAML_TOKEN_TYPE_V1 : GRANT_TYPE_SAML1_1 ,
1252- SAML_TOKEN_TYPE_V2 : self .client .GRANT_TYPE_SAML2 ,
1253- WSS_SAML_TOKEN_PROFILE_V1_1 : GRANT_TYPE_SAML1_1 ,
1254- WSS_SAML_TOKEN_PROFILE_V2 : self .client .GRANT_TYPE_SAML2
1255- }.get (wstrust_result .get ("type" ))
1256- if not grant_type :
1257- raise RuntimeError (
1258- "RSTR returned unknown token type: %s" , wstrust_result .get ("type" ))
1259- self .client .grant_assertion_encoders .setdefault ( # Register a non-standard type
1260- grant_type , self .client .encode_saml_assertion )
1261- return self .client .obtain_token_by_assertion (
1262- wstrust_result ["token" ], grant_type , scope = scopes , ** kwargs )
1263-
12641264
12651265class ConfidentialClientApplication (ClientApplication ): # server-side web app
12661266
0 commit comments