11# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22# SPDX-License-Identifier: Apache-2.0
33import logging
4- from typing import Annotated , Any
4+ from typing import Annotated , Any , NoReturn
55
66from fastapi import Depends , FastAPI , Request
77from jwcrypto .common import JWException
@@ -62,25 +62,24 @@ async def get_token_authorization_code_grant(
6262 client_secret : str | None = None ,
6363 ) -> dict [str , Any ]:
6464 try :
65- token = self .keycloak_openid .token (
65+ return await self .keycloak_openid .a_token (
6666 grant_type = "authorization_code" ,
6767 code = code ,
6868 redirect_uri = self .settings .keycloak .redirect_uri ,
6969 )
70- return token
7170 except KeycloakOperationError as e :
7271 raise AuthorizationError ("Failed to get token" ) from e
7372
74- async def get_current_user (self , access_token : str , request : Request ) -> User : # noqa: WPS231
73+ async def get_current_user (self , access_token : str | None , request : Request ) -> User : # noqa: WPS231, WPS217
7574 if not access_token :
76- log .debug ("No access token found in session. " )
77- self .redirect_to_auth ()
75+ log .debug ("No access token found in session" )
76+ await self .redirect_to_auth ()
7877
7978 refresh_token = request .session .get ("refresh_token" )
8079 try :
8180 # if user is disabled or blocked in Keycloak after the token is issued, he will
8281 # remain authorized until the token expires (not more than 15 minutes in MTS SSO)
83- token_info = self .keycloak_openid .decode_token ( token = access_token )
82+ token_info = await self .keycloak_openid .a_decode_token ( access_token )
8483 except (KeycloakOperationError , JWException ) as e :
8584 log .info ("Access token is invalid or expired: %s" , e )
8685 token_info = None
@@ -89,20 +88,20 @@ async def get_current_user(self, access_token: str, request: Request) -> User:
8988 log .debug ("Access token invalid. Attempting to refresh." )
9089
9190 try :
92- new_tokens = self .keycloak_openid .refresh_token (refresh_token )
91+ new_tokens = await self .keycloak_openid .a_refresh_token (refresh_token )
9392
9493 new_access_token = new_tokens ["access_token" ]
9594 new_refresh_token = new_tokens ["refresh_token" ]
9695 request .session ["access_token" ] = new_access_token
9796 request .session ["refresh_token" ] = new_refresh_token
9897
99- token_info = self .keycloak_openid .decode_token (
98+ token_info = await self .keycloak_openid .a_decode_token (
10099 token = new_access_token ,
101100 )
102101 log .debug ("Access token refreshed and decoded successfully." )
103102 except (KeycloakOperationError , JWException ) as e :
104103 log .debug ("Failed to refresh access token: %s" , e )
105- self .redirect_to_auth ()
104+ await self .redirect_to_auth ()
106105
107106 if not token_info :
108107 raise AuthorizationError ("Invalid token payload" )
@@ -129,8 +128,8 @@ async def get_current_user(self, access_token: str, request: Request) -> User:
129128 )
130129 return user
131130
132- def redirect_to_auth (self ) -> None :
133- auth_url = self .keycloak_openid .auth_url (
131+ async def redirect_to_auth (self ) -> NoReturn :
132+ auth_url = await self .keycloak_openid .a_auth_url (
134133 redirect_uri = self .settings .keycloak .redirect_uri ,
135134 scope = self .settings .keycloak .scope ,
136135 )
@@ -142,7 +141,7 @@ async def logout(self, user: User, refresh_token: str | None) -> None:
142141 return
143142
144143 try :
145- self .keycloak_openid .logout (refresh_token )
144+ await self .keycloak_openid .a_logout (refresh_token )
146145 except KeycloakOperationError as err :
147146 msg = f"Can't logout user: { user .username } "
148147 log .debug ("%s. Error: %s" , msg , err )
0 commit comments