1111logging .basicConfig (level = logging .DEBUG )
1212
1313
14- class TokenCacheTestCase (unittest .TestCase ):
14+ # NOTE: These helpers were once implemented as static methods in TokenCacheTestCase.
15+ # That would cause other test files' "from ... import TokenCacheTestCase"
16+ # to re-run all test cases in this file.
17+ # Now we avoid that, by defining these helpers in module level.
18+ def build_id_token (
19+ iss = "issuer" , sub = "subject" , aud = "my_client_id" , exp = None , iat = None ,
20+ ** claims ): # AAD issues "preferred_username", ADFS issues "upn"
21+ return "header.%s.signature" % base64 .b64encode (json .dumps (dict ({
22+ "iss" : iss ,
23+ "sub" : sub ,
24+ "aud" : aud ,
25+ "exp" : exp or (time .time () + 100 ),
26+ "iat" : iat or time .time (),
27+ }, ** claims )).encode ()).decode ('utf-8' )
28+
1529
16- @staticmethod
17- def build_id_token (
18- iss = "issuer" , sub = "subject" , aud = "my_client_id" , exp = None , iat = None ,
19- ** claims ): # AAD issues "preferred_username", ADFS issues "upn"
20- return "header.%s.signature" % base64 .b64encode (json .dumps (dict ({
21- "iss" : iss ,
22- "sub" : sub ,
23- "aud" : aud ,
24- "exp" : exp or (time .time () + 100 ),
25- "iat" : iat or time .time (),
26- }, ** claims )).encode ()).decode ('utf-8' )
30+ def build_response ( # simulate a response from AAD
31+ uid = None , utid = None , # If present, they will form client_info
32+ access_token = None , expires_in = 3600 , token_type = "some type" ,
33+ ** kwargs # Pass-through: refresh_token, foci, id_token, error, refresh_in, ...
34+ ):
35+ response = {}
36+ if uid and utid : # Mimic the AAD behavior for "client_info=1" request
37+ response ["client_info" ] = base64 .b64encode (json .dumps ({
38+ "uid" : uid , "utid" : utid ,
39+ }).encode ()).decode ('utf-8' )
40+ if access_token :
41+ response .update ({
42+ "access_token" : access_token ,
43+ "expires_in" : expires_in ,
44+ "token_type" : token_type ,
45+ })
46+ response .update (kwargs ) # Pass-through key-value pairs as top-level fields
47+ return response
2748
28- @staticmethod
29- def build_response ( # simulate a response from AAD
30- uid = None , utid = None , # If present, they will form client_info
31- access_token = None , expires_in = 3600 , token_type = "some type" ,
32- ** kwargs # Pass-through: refresh_token, foci, id_token, error, refresh_in, ...
33- ):
34- response = {}
35- if uid and utid : # Mimic the AAD behavior for "client_info=1" request
36- response ["client_info" ] = base64 .b64encode (json .dumps ({
37- "uid" : uid , "utid" : utid ,
38- }).encode ()).decode ('utf-8' )
39- if access_token :
40- response .update ({
41- "access_token" : access_token ,
42- "expires_in" : expires_in ,
43- "token_type" : token_type ,
44- })
45- response .update (kwargs ) # Pass-through key-value pairs as top-level fields
46- return response
49+
50+ class TokenCacheTestCase (unittest .TestCase ):
4751
4852 def setUp (self ):
4953 self .cache = TokenCache ()
5054
5155 def testAddByAad (self ):
5256 client_id = "my_client_id"
53- id_token = self . build_id_token (
57+ id_token = build_id_token (
5458 oid = "object1234" , preferred_username = "John Doe" , aud = client_id )
5559 self .cache .add ({
5660 "client_id" : client_id ,
5761 "scope" : ["s2" , "s1" , "s3" ], # Not in particular order
5862 "token_endpoint" : "https://login.example.com/contoso/v2/token" ,
59- "response" : self . build_response (
63+ "response" : build_response (
6064 uid = "uid" , utid = "utid" , # client_info
6165 expires_in = 3600 , access_token = "an access token" ,
6266 id_token = id_token , refresh_token = "a refresh token" ),
@@ -125,12 +129,12 @@ def testAddByAad(self):
125129
126130 def testAddByAdfs (self ):
127131 client_id = "my_client_id"
128- id_token = self . build_id_token (
aud = client_id ,
upn = "[email protected] " )
132+ id_token = build_id_token (
aud = client_id ,
upn = "[email protected] " )
129133 self .cache .add ({
130134 "client_id" : client_id ,
131135 "scope" : ["s2" , "s1" , "s3" ], # Not in particular order
132136 "token_endpoint" : "https://fs.msidlab8.com/adfs/oauth2/token" ,
133- "response" : self . build_response (
137+ "response" : build_response (
134138 uid = None , utid = None , # ADFS will provide no client_info
135139 expires_in = 3600 , access_token = "an access token" ,
136140 id_token = id_token , refresh_token = "a refresh token" ),
@@ -204,7 +208,7 @@ def test_key_id_is_also_recorded(self):
204208 "client_id" : "my_client_id" ,
205209 "scope" : ["s2" , "s1" , "s3" ], # Not in particular order
206210 "token_endpoint" : "https://login.example.com/contoso/v2/token" ,
207- "response" : self . build_response (
211+ "response" : build_response (
208212 uid = "uid" , utid = "utid" , # client_info
209213 expires_in = 3600 , access_token = "an access token" ,
210214 refresh_token = "a refresh token" ),
@@ -219,7 +223,7 @@ def test_refresh_in_should_be_recorded_as_refresh_on(self): # Sounds weird. Yep
219223 "client_id" : "my_client_id" ,
220224 "scope" : ["s2" , "s1" , "s3" ], # Not in particular order
221225 "token_endpoint" : "https://login.example.com/contoso/v2/token" ,
222- "response" : self . build_response (
226+ "response" : build_response (
223227 uid = "uid" , utid = "utid" , # client_info
224228 expires_in = 3600 , refresh_in = 1800 , access_token = "an access token" ,
225229 ), #refresh_token="a refresh token"),
0 commit comments