@@ -319,3 +319,83 @@ def test_only_client_capabilities_no_claims_merge(self):
319319
320320 def test_both_claims_and_capabilities_none (self ):
321321 self .assertEqual (_merge_claims_challenge_and_capabilities (None , None ), None )
322+
323+
324+ class TestApplicationForRefreshInBehaviors (unittest .TestCase ):
325+ """The following test cases were based on design doc here
326+ https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=%2FRefreshAtExpirationPercentage%2Foverview.md&version=GBdev&_a=preview&anchor=scenarios
327+ """
328+ def setUp (self ):
329+ self .authority_url = "https://login.microsoftonline.com/common"
330+ self .authority = msal .authority .Authority (
331+ self .authority_url , MinimalHttpClient ())
332+ self .scopes = ["s1" , "s2" ]
333+ self .uid = "my_uid"
334+ self .utid = "my_utid"
335+ self .account = {"home_account_id" : "{}.{}" .format (self .uid , self .utid )}
336+ self .rt = "this is a rt"
337+ self .cache = msal .SerializableTokenCache ()
338+ self .client_id = "my_app"
339+ self .app = ClientApplication (
340+ self .client_id , authority = self .authority_url , token_cache = self .cache )
341+
342+ def populate_cache (self , access_token = "at" , expires_in = 86400 , refresh_in = 43200 ):
343+ self .cache .add ({
344+ "client_id" : self .client_id ,
345+ "scope" : self .scopes ,
346+ "token_endpoint" : "{}/oauth2/v2.0/token" .format (self .authority_url ),
347+ "response" : TokenCacheTestCase .build_response (
348+ access_token = access_token ,
349+ expires_in = expires_in , refresh_in = refresh_in ,
350+ uid = self .uid , utid = self .utid , refresh_token = self .rt ),
351+ })
352+
353+ def test_fresh_token_should_be_returned_from_cache (self ):
354+ # a.k.a. Return unexpired token that is not above token refresh expiration threshold
355+ access_token = "An access token prepopulated into cache"
356+ self .populate_cache (access_token = access_token , expires_in = 900 , refresh_in = 450 )
357+ self .assertEqual (
358+ access_token ,
359+ self .app .acquire_token_silent (['s1' ], self .account ).get ("access_token" ))
360+
361+ def test_aging_token_and_available_aad_should_return_new_token (self ):
362+ # a.k.a. Attempt to refresh unexpired token when AAD available
363+ self .populate_cache (access_token = "old AT" , expires_in = 3599 , refresh_in = - 1 )
364+ new_access_token = "new AT"
365+ self .app ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family = (
366+ lambda * args , ** kwargs : {"access_token" : new_access_token })
367+ self .assertEqual (
368+ new_access_token ,
369+ self .app .acquire_token_silent (['s1' ], self .account ).get ("access_token" ))
370+
371+ def test_aging_token_and_unavailable_aad_should_return_old_token (self ):
372+ # a.k.a. Attempt refresh unexpired token when AAD unavailable
373+ old_at = "old AT"
374+ self .populate_cache (access_token = old_at , expires_in = 3599 , refresh_in = - 1 )
375+ self .app ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family = (
376+ lambda * args , ** kwargs : {"error" : "sth went wrong" })
377+ self .assertEqual (
378+ old_at ,
379+ self .app .acquire_token_silent (['s1' ], self .account ).get ("access_token" ))
380+
381+ def test_expired_token_and_unavailable_aad_should_return_error (self ):
382+ # a.k.a. Attempt refresh expired token when AAD unavailable
383+ self .populate_cache (access_token = "expired at" , expires_in = - 1 , refresh_in = - 900 )
384+ error = "something went wrong"
385+ self .app ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family = (
386+ lambda * args , ** kwargs : {"error" : error })
387+ self .assertEqual (
388+ error ,
389+ self .app .acquire_token_silent_with_error ( # This variant preserves error
390+ ['s1' ], self .account ).get ("error" ))
391+
392+ def test_expired_token_and_available_aad_should_return_new_token (self ):
393+ # a.k.a. Attempt refresh expired token when AAD available
394+ self .populate_cache (access_token = "expired at" , expires_in = - 1 , refresh_in = - 900 )
395+ new_access_token = "new AT"
396+ self .app ._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family = (
397+ lambda * args , ** kwargs : {"access_token" : new_access_token })
398+ self .assertEqual (
399+ new_access_token ,
400+ self .app .acquire_token_silent (['s1' ], self .account ).get ("access_token" ))
401+
0 commit comments