44from typing import Annotated , Any
55
66from fastapi import Depends , FastAPI , Request
7- from keycloak import KeycloakOpenID
7+ from jwcrypto .common import JWException
8+ from keycloak import KeycloakOpenID , KeycloakOperationError
89
10+ from syncmaster .db .models .user import User
911from syncmaster .exceptions import EntityNotFoundError
10- from syncmaster .exceptions .auth import AuthorizationError
12+ from syncmaster .exceptions .auth import AuthorizationError , LogoutError
1113from syncmaster .exceptions .redirect import RedirectException
1214from syncmaster .server .dependencies import Stub
1315from syncmaster .server .providers .auth .base_provider import AuthProvider
@@ -55,43 +57,39 @@ async def get_token_password_grant(
5557 async def get_token_authorization_code_grant (
5658 self ,
5759 code : str ,
58- redirect_uri : str ,
5960 scopes : list [str ] | None = None ,
6061 client_id : str | None = None ,
6162 client_secret : str | None = None ,
6263 ) -> dict [str , Any ]:
6364 try :
64- redirect_uri = redirect_uri or self .settings .keycloak .redirect_uri
6565 token = self .keycloak_openid .token (
6666 grant_type = "authorization_code" ,
6767 code = code ,
68- redirect_uri = redirect_uri ,
68+ redirect_uri = self . settings . keycloak . redirect_uri ,
6969 )
7070 return token
71- except Exception as e :
71+ except KeycloakOperationError as e :
7272 raise AuthorizationError ("Failed to get token" ) from e
7373
74- async def get_current_user (self , access_token : str , * args , ** kwargs ) -> Any : # noqa: WPS231
75- request : Request = kwargs ["request" ]
76- refresh_token = request .session .get ("refresh_token" )
77-
74+ async def get_current_user (self , access_token : str , request : Request ) -> User : # noqa: WPS231
7875 if not access_token :
7976 log .debug ("No access token found in session." )
80- self .redirect_to_auth (request . url . path )
77+ self .redirect_to_auth ()
8178
79+ refresh_token = request .session .get ("refresh_token" )
8280 try :
8381 # if user is disabled or blocked in Keycloak after the token is issued, he will
8482 # remain authorized until the token expires (not more than 15 minutes in MTS SSO)
8583 token_info = self .keycloak_openid .decode_token (token = access_token )
86- except Exception as e :
84+ except ( KeycloakOperationError , JWException ) as e :
8785 log .info ("Access token is invalid or expired: %s" , e )
8886 token_info = None
8987
9088 if not token_info and refresh_token :
9189 log .debug ("Access token invalid. Attempting to refresh." )
9290
9391 try :
94- new_tokens = await self .refresh_access_token (refresh_token )
92+ new_tokens = self .keycloak_openid . refresh_token (refresh_token )
9593
9694 new_access_token = new_tokens ["access_token" ]
9795 new_refresh_token = new_tokens ["refresh_token" ]
@@ -102,9 +100,9 @@ async def get_current_user(self, access_token: str, *args, **kwargs) -> Any: #
102100 token = new_access_token ,
103101 )
104102 log .debug ("Access token refreshed and decoded successfully." )
105- except Exception as e :
103+ except ( KeycloakOperationError , JWException ) as e :
106104 log .debug ("Failed to refresh access token: %s" , e )
107- self .redirect_to_auth (request . url . path )
105+ self .redirect_to_auth ()
108106
109107 if not token_info :
110108 raise AuthorizationError ("Invalid token payload" )
@@ -131,13 +129,21 @@ async def get_current_user(self, access_token: str, *args, **kwargs) -> Any: #
131129 )
132130 return user
133131
134- async def refresh_access_token (self , refresh_token : str ) -> dict [str , Any ]:
135- new_tokens = self .keycloak_openid .refresh_token (refresh_token )
136- return new_tokens
137-
138- def redirect_to_auth (self , path : str ) -> None :
132+ def redirect_to_auth (self ) -> None :
139133 auth_url = self .keycloak_openid .auth_url (
140134 redirect_uri = self .settings .keycloak .redirect_uri ,
141135 scope = self .settings .keycloak .scope ,
142136 )
143137 raise RedirectException (redirect_url = auth_url )
138+
139+ async def logout (self , user : User , refresh_token : str | None ) -> None :
140+ if not refresh_token :
141+ log .debug ("No refresh token found in session." )
142+ return
143+
144+ try :
145+ self .keycloak_openid .logout (refresh_token )
146+ except KeycloakOperationError as err :
147+ msg = f"Can't logout user: { user .username } "
148+ log .debug ("%s. Error: %s" , msg , err )
149+ raise LogoutError (msg ) from err
0 commit comments