Skip to content

Commit 81db8d2

Browse files
committed
Username password grant also support cache now
1 parent 1ddddf4 commit 81db8d2

File tree

2 files changed

+27
-35
lines changed

2 files changed

+27
-35
lines changed

msal/application.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,7 @@ class PublicClientApplication(ClientApplication): # browser app or mobile app
268268
def acquire_token_with_username_password(
269269
self, username, password, scope=None, **kwargs):
270270
"""Gets a token for a given resource via user credentails."""
271-
cli = Client(self.client_id, configuration={
272-
"token_endpoint": self.authority.token_endpoint})
273-
return cli.obtain_token_with_username_password(
271+
return self.client.obtain_token_with_username_password(
274272
username, password,
275273
scope=decorate_scope(scope, self.client_id), **kwargs)
276274

tests/test_application.py

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,27 @@ def assertLoosely(self, response, assertion=None,
3535
error_description=response.get("error_description")))
3636
assertion()
3737

38+
def assertCacheWorks(self, result_from_wire):
39+
result = result_from_wire
40+
# Going to test acquire_token_silent(...) to locate an AT from cache
41+
# In practice, you may want to filter based on its "username" field
42+
accounts = self.app.get_accounts()
43+
self.assertNotEqual(0, len(accounts))
44+
result_from_cache = self.app.acquire_token_silent(
45+
CONFIG["scope"], account=accounts[0])
46+
self.assertIsNotNone(result_from_cache)
47+
self.assertEqual(result['access_token'], result_from_cache['access_token'],
48+
"We should get a cached AT")
49+
50+
# Going to test acquire_token_silent(...) to obtain an AT by a RT from cache
51+
self.app.token_cache._cache["AccessToken"] = {} # A hacky way to clear ATs
52+
result_from_cache = self.app.acquire_token_silent(
53+
CONFIG["scope"], account=accounts[0])
54+
self.assertIsNotNone(result_from_cache,
55+
"We should get a result from acquire_token_silent(...) call")
56+
self.assertNotEqual(result['access_token'], result_from_cache['access_token'],
57+
"We should get a fresh AT (via RT)")
58+
3859

3960
@unittest.skipUnless("client_id" in CONFIG, "client_id missing")
4061
class TestConfidentialClientApplication(unittest.TestCase):
@@ -63,20 +84,16 @@ def test_confidential_client_using_certificate(self):
6384

6485

6586
@unittest.skipUnless("client_id" in CONFIG, "client_id missing")
66-
class TestPublicClientApplication(unittest.TestCase):
87+
class TestPublicClientApplication(Oauth2TestCase):
6788

6889
@unittest.skipUnless("username" in CONFIG and "password" in CONFIG, "Missing U/P")
6990
def test_username_password(self):
70-
app = PublicClientApplication(
91+
self.app = PublicClientApplication(
7192
CONFIG["client_id"], authority=CONFIG["authority"])
72-
result = app.acquire_token_with_username_password(
93+
result = self.app.acquire_token_with_username_password(
7394
CONFIG["username"], CONFIG["password"], scope=CONFIG.get("scope"))
74-
if "error" in result:
75-
if result["error"] == "invalid_grant":
76-
raise unittest.SkipTest(result.get("error_description"))
77-
self.assertEqual(result["error"], "interaction_required")
78-
else:
79-
self.assertIn('access_token', result)
95+
self.assertLoosely(result)
96+
self.assertCacheWorks(result)
8097

8198

8299
@unittest.skipUnless("client_id" in CONFIG, "client_id missing")
@@ -107,31 +124,8 @@ def test_auth_code(self):
107124
# Note: No interpolation here, cause error won't always present
108125
error=result.get("error"),
109126
error_description=result.get("error_description")))
110-
111127
self.assertCacheWorks(result)
112128

113-
114-
def assertCacheWorks(self, result_from_wire):
115-
result = result_from_wire
116-
# Going to test acquire_token_silent(...) to locate an AT from cache
117-
# In practice, you may want to filter based on its "username" field
118-
accounts = self.app.get_accounts()
119-
self.assertNotEqual(0, len(accounts))
120-
result_from_cache = self.app.acquire_token_silent(
121-
CONFIG["scope"], account=accounts[0])
122-
self.assertIsNotNone(result_from_cache)
123-
self.assertEqual(result['access_token'], result_from_cache['access_token'],
124-
"We should get a cached AT")
125-
126-
# Going to test acquire_token_silent(...) to obtain an AT by a RT from cache
127-
self.app.token_cache._cache["AccessToken"] = {} # A hacky way to clear ATs
128-
result_from_cache = self.app.acquire_token_silent(
129-
CONFIG["scope"], account=accounts[0])
130-
self.assertIsNotNone(result_from_cache,
131-
"We should get a result from acquire_token_silent(...) call")
132-
self.assertNotEqual(result['access_token'], result_from_cache['access_token'],
133-
"We should get a fresh AT (via RT)")
134-
135129
def test_device_flow(self):
136130
flow = self.app.initiate_device_flow(scope=CONFIG.get("scope"))
137131
logging.warn(flow["message"])

0 commit comments

Comments
 (0)