11import logging
22from datetime import datetime , timedelta , timezone
3- from typing import Optional
3+ from typing import Any , Dict , Optional , cast
44
55from jose import JWTError , jwt
66from sqlalchemy .ext .asyncio import AsyncSession
@@ -81,9 +81,10 @@ async def verify_token(
8181 self .SECRET_KEY ,
8282 algorithms = [self .ALGORITHM ],
8383 )
84- username_or_email : str = payload .get ("sub" )
85- if username_or_email is None :
86- logger .warning ("No username/email found in token" )
84+ payload_dict = cast (Dict [str , Any ], payload )
85+ username_or_email = payload_dict .get ("sub" )
86+ if not isinstance (username_or_email , str ):
87+ logger .warning ("No valid username/email found in token" )
8788 return None
8889
8990 logger .info ("Token verified successfully" )
@@ -109,7 +110,12 @@ async def blacklist_token(
109110 self .SECRET_KEY ,
110111 algorithms = [self .ALGORITHM ],
111112 )
112- expires_at = datetime .fromtimestamp (payload .get ("exp" ))
113+ payload_dict = cast (Dict [str , Any ], payload )
114+ exp = payload_dict .get ("exp" )
115+ if not isinstance (exp , (int , float )):
116+ logger .error ("Invalid expiration in token" )
117+ return
118+ expires_at = datetime .fromtimestamp (exp )
113119 await self .crud_token_blacklist .create (
114120 db ,
115121 object = AdminTokenBlacklistCreate (token = token , expires_at = expires_at ),
0 commit comments