17
17
... basic_auth_user = 'user'
18
18
... basic_auth_password = 'pass'
19
19
... auth_required = True
20
+ ... require_token_expiration = False
20
21
>>> vc.settings = DummySettings()
21
22
>>> import jwt
22
23
>>> token = jwt.encode({'sub': 'alice'}, 'secret', algorithm='HS256')
39
40
"""
40
41
41
42
# Standard
43
+ import logging
42
44
from typing import Optional
43
45
44
46
# Third-Party
51
53
)
52
54
from fastapi .security .utils import get_authorization_scheme_param
53
55
import jwt
54
- from jwt import PyJWTError
55
56
56
57
# First-Party
57
58
from mcpgateway .config import settings
58
59
59
60
basic_security = HTTPBasic (auto_error = False )
60
61
security = HTTPBearer (auto_error = False )
61
62
63
+ # Standard
64
+ logger = logging .getLogger (__name__ )
65
+
62
66
63
67
async def verify_jwt_token (token : str ) -> dict :
64
68
"""Verify and decode a JWT token.
@@ -74,6 +78,7 @@ async def verify_jwt_token(token: str) -> dict:
74
78
75
79
Raises:
76
80
HTTPException: 401 status if the token has expired or is invalid.
81
+ MissingRequiredClaimError: If the 'exp' claim is required but missing.
77
82
78
83
Examples:
79
84
>>> from mcpgateway.utils import verify_credentials as vc
@@ -83,6 +88,7 @@ async def verify_jwt_token(token: str) -> dict:
83
88
... basic_auth_user = 'user'
84
89
... basic_auth_password = 'pass'
85
90
... auth_required = True
91
+ ... require_token_expiration = False
86
92
>>> vc.settings = DummySettings()
87
93
>>> import jwt
88
94
>>> token = jwt.encode({'sub': 'alice'}, 'secret', algorithm='HS256')
@@ -108,22 +114,60 @@ async def verify_jwt_token(token: str) -> dict:
108
114
... print(e.status_code, e.detail)
109
115
401 Invalid token
110
116
"""
117
+ # try:
118
+ # Decode and validate token
119
+ # payload = jwt.decode(
120
+ # token,
121
+ # settings.jwt_secret_key,
122
+ # algorithms=[settings.jwt_algorithm],
123
+ # # options={"require": ["exp"]}, # Require expiration
124
+ # )
125
+ # return payload # Contains the claims (e.g., user info)
126
+ # except jwt.ExpiredSignatureError:
127
+ # raise HTTPException(
128
+ # status_code=status.HTTP_401_UNAUTHORIZED,
129
+ # detail="Token has expired",
130
+ # headers={"WWW-Authenticate": "Bearer"},
131
+ # )
132
+ # except PyJWTError:
133
+ # raise HTTPException(
134
+ # status_code=status.HTTP_401_UNAUTHORIZED,
135
+ # detail="Invalid token",
136
+ # headers={"WWW-Authenticate": "Bearer"},
137
+ # )
111
138
try :
112
- # Decode and validate token
113
- payload = jwt .decode (
114
- token ,
115
- settings .jwt_secret_key ,
116
- algorithms = [settings .jwt_algorithm ],
117
- # options={"require": ["exp"]}, # Require expiration
139
+ # First decode to check claims
140
+ unverified = jwt .decode (token , options = {"verify_signature" : False })
141
+
142
+ # Check for expiration claim
143
+ if "exp" not in unverified and settings .require_token_expiration :
144
+ raise jwt .MissingRequiredClaimError ("exp" )
145
+
146
+ # Log warning for non-expiring tokens
147
+ if "exp" not in unverified :
148
+ logger .warning ("JWT token without expiration accepted. " "Consider enabling REQUIRE_TOKEN_EXPIRATION for better security. " f"Token sub: { unverified .get ('sub' , 'unknown' )} " )
149
+
150
+ # Full validation
151
+ options = {}
152
+ if settings .require_token_expiration :
153
+ options ["require" ] = ["exp" ]
154
+
155
+ payload = jwt .decode (token , settings .jwt_secret_key , algorithms = [settings .jwt_algorithm ], options = options )
156
+ return payload
157
+
158
+ except jwt .MissingRequiredClaimError :
159
+ raise HTTPException (
160
+ status_code = status .HTTP_401_UNAUTHORIZED ,
161
+ detail = "Token is missing required expiration claim. Set REQUIRE_TOKEN_EXPIRATION=false to allow." ,
162
+ headers = {"WWW-Authenticate" : "Bearer" },
118
163
)
119
- return payload # Contains the claims (e.g., user info)
120
164
except jwt .ExpiredSignatureError :
121
165
raise HTTPException (
122
166
status_code = status .HTTP_401_UNAUTHORIZED ,
123
167
detail = "Token has expired" ,
124
168
headers = {"WWW-Authenticate" : "Bearer" },
125
169
)
126
- except PyJWTError :
170
+ except jwt . PyJWTError :
127
171
raise HTTPException (
128
172
status_code = status .HTTP_401_UNAUTHORIZED ,
129
173
detail = "Invalid token" ,
@@ -154,6 +198,7 @@ async def verify_credentials(token: str) -> dict:
154
198
... basic_auth_user = 'user'
155
199
... basic_auth_password = 'pass'
156
200
... auth_required = True
201
+ ... require_token_expiration = False
157
202
>>> vc.settings = DummySettings()
158
203
>>> import jwt
159
204
>>> token = jwt.encode({'sub': 'alice'}, 'secret', algorithm='HS256')
@@ -194,6 +239,7 @@ async def require_auth(credentials: Optional[HTTPAuthorizationCredentials] = Dep
194
239
... basic_auth_user = 'user'
195
240
... basic_auth_password = 'pass'
196
241
... auth_required = True
242
+ ... require_token_expiration = False
197
243
>>> vc.settings = DummySettings()
198
244
>>> import jwt
199
245
>>> from fastapi.security import HTTPAuthorizationCredentials
@@ -373,6 +419,7 @@ async def require_auth_override(
373
419
... basic_auth_user = 'user'
374
420
... basic_auth_password = 'pass'
375
421
... auth_required = True
422
+ ... require_token_expiration = False
376
423
>>> vc.settings = DummySettings()
377
424
>>> import jwt
378
425
>>> import asyncio
0 commit comments