@@ -578,6 +578,61 @@ def test_dag_expiration_exists(self):
578578 expiration_date_string = strftime ('%Y-%m-%d %H:%M:%SZ' , expiration_date .timetuple ())
579579 self .assertEqual (tkn ["access_grant_expiration" ][:- 4 ], expiration_date_string [:- 4 ])
580580
581+ def test_revoke_endpoint (self ):
582+ redirect_uri = 'http://localhost'
583+ # create a user
584+ self ._create_user ('anna' , '123456' )
585+ capability_a = self ._create_capability ('Capability A' , [])
586+ capability_b = self ._create_capability ('Capability B' , [])
587+ # create an application and add capabilities
588+ application = self ._create_application (
589+ 'an app' ,
590+ grant_type = Application .GRANT_AUTHORIZATION_CODE ,
591+ client_type = Application .CLIENT_CONFIDENTIAL ,
592+ redirect_uris = redirect_uri )
593+ application .scope .add (capability_a , capability_b )
594+ # user logs in
595+ request = HttpRequest ()
596+ self .client .login (request = request , username = 'anna' , password = '123456' )
597+ # post the authorization form with only one scope selected
598+ payload = {
599+ 'client_id' : application .client_id ,
600+ 'response_type' : 'code' ,
601+ 'redirect_uri' : redirect_uri ,
602+ 'scope' : ['capability-a' ],
603+ 'expires_in' : 86400 ,
604+ 'allow' : True ,
605+ }
606+ response = self .client .post (reverse ('oauth2_provider:authorize' ), data = payload )
607+ self .client .logout ()
608+ self .assertEqual (response .status_code , 302 )
609+ # now extract the authorization code and use it to request an access_token
610+ query_dict = parse_qs (urlparse (response ['Location' ]).query )
611+ authorization_code = query_dict .pop ('code' )
612+ token_request_data = {
613+ 'grant_type' : 'authorization_code' ,
614+ 'code' : authorization_code ,
615+ 'redirect_uri' : redirect_uri ,
616+ 'client_id' : application .client_id ,
617+ 'client_secret' : application .client_secret_plain ,
618+ }
619+ c = Client ()
620+ response = c .post ('/v1/o/token/' , data = token_request_data )
621+ self .assertEqual (response .status_code , 200 )
622+ # extract token and use it to make a revoke request
623+ tkn = response .json ()['access_token' ]
624+ revoke_request_data = f"token={ tkn } &client_id={ application .client_id } &client_secret={ application .client_secret_plain } "
625+ content_type = "application/x-www-form-urlencoded"
626+ c = Client ()
627+ rev_response = c .post ('/v1/o/revoke/' , data = revoke_request_data , content_type = content_type )
628+ self .assertEqual (rev_response .status_code , 200 )
629+ # check DAG deletion
630+ dags_count = DataAccessGrant .objects .count ()
631+ self .assertEqual (dags_count , 0 )
632+ # check token deletion
633+ tkn_count = AccessToken .objects .filter (token = tkn ).count ()
634+ self .assertEqual (tkn_count , 0 )
635+
581636 def test_refresh_with_revoked_token (self ):
582637 redirect_uri = 'http://localhost'
583638 # create a user
0 commit comments