Skip to content

Commit 4c48203

Browse files
committed
fixed tests
Signed-off-by: NAYANAR <[email protected]>
1 parent 5ff75bf commit 4c48203

File tree

6 files changed

+87
-11
lines changed

6 files changed

+87
-11
lines changed

.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ JWT_ALGORITHM=HS256
9797
# Expiry time for generated JWT tokens (in minutes; e.g. 7 days)
9898
TOKEN_EXPIRY=10080
9999

100+
# Require all JWT tokens to have expiration claims (true or false)
101+
REQUIRE_TOKEN_EXPIRATION=false
102+
100103
# Used to derive an AES encryption key for secure auth storage
101104
# Must be a non-empty string (e.g. passphrase or random secret)
102105
AUTH_ENCRYPTION_SECRET=my-test-salt

mcpgateway/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@
6161
from jsonpath_ng.ext import parse
6262
from jsonpath_ng.jsonpath import JSONPath
6363
from pydantic import field_validator
64-
from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict
64+
from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict
65+
from pydantic import Field
6566

6667
logging.basicConfig(
6768
level=logging.INFO,
@@ -116,6 +117,11 @@ class Settings(BaseSettings):
116117
auth_required: bool = True
117118
token_expiry: int = 10080 # minutes
118119

120+
require_token_expiration: bool = Field(
121+
default=False, # Default to flexible mode for backward compatibility
122+
description="Require all JWT tokens to have expiration claims"
123+
)
124+
119125
# Encryption key phrase for auth storage
120126
auth_encryption_secret: str = "my-test-salt"
121127

mcpgateway/utils/create_jwt_token.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ def _create_jwt_token(
105105
if expires_in_minutes > 0:
106106
expire = _dt.datetime.now(_dt.timezone.utc) + _dt.timedelta(minutes=expires_in_minutes)
107107
payload["exp"] = int(expire.timestamp())
108+
else:
109+
# Warn about non-expiring token
110+
print(
111+
"⚠️ WARNING: Creating token without expiration. This is a security risk!\n"
112+
" Consider using --exp with a value > 0 for production use.\n"
113+
" Once JWT API (#425) is available, use it for automatic token renewal.",
114+
file=sys.stderr
115+
)
108116
return jwt.encode(payload, secret, algorithm=algorithm)
109117

110118

mcpgateway/utils/verify_credentials.py

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,18 @@
5353
import jwt
5454
from jwt import PyJWTError
5555

56+
5657
# First-Party
5758
from mcpgateway.config import settings
5859

5960
basic_security = HTTPBasic(auto_error=False)
6061
security = HTTPBearer(auto_error=False)
6162

63+
import logging
64+
logger = logging.getLogger(__name__)
6265

6366
async def verify_jwt_token(token: str) -> dict:
67+
6468
"""Verify and decode a JWT token.
6569
6670
Decodes and validates a JWT token using the configured secret key
@@ -108,29 +112,75 @@ async def verify_jwt_token(token: str) -> dict:
108112
... print(e.status_code, e.detail)
109113
401 Invalid token
110114
"""
115+
# try:
116+
# Decode and validate token
117+
# payload = jwt.decode(
118+
# token,
119+
# settings.jwt_secret_key,
120+
# algorithms=[settings.jwt_algorithm],
121+
# # options={"require": ["exp"]}, # Require expiration
122+
# )
123+
# return payload # Contains the claims (e.g., user info)
124+
# except jwt.ExpiredSignatureError:
125+
# raise HTTPException(
126+
# status_code=status.HTTP_401_UNAUTHORIZED,
127+
# detail="Token has expired",
128+
# headers={"WWW-Authenticate": "Bearer"},
129+
# )
130+
# except PyJWTError:
131+
# raise HTTPException(
132+
# status_code=status.HTTP_401_UNAUTHORIZED,
133+
# detail="Invalid token",
134+
# headers={"WWW-Authenticate": "Bearer"},
135+
# )
111136
try:
112-
# Decode and validate token
137+
# First decode to check claims
138+
unverified = jwt.decode(token, options={"verify_signature": False})
139+
140+
# Check for expiration claim
141+
if "exp" not in unverified and settings.require_token_expiration:
142+
raise jwt.MissingRequiredClaimError("exp")
143+
144+
# Log warning for non-expiring tokens
145+
if "exp" not in unverified:
146+
logger.warning(
147+
"JWT token without expiration accepted. "
148+
"Consider enabling REQUIRE_TOKEN_EXPIRATION for better security. "
149+
f"Token sub: {unverified.get('sub', 'unknown')}"
150+
)
151+
152+
# Full validation
153+
options = {}
154+
if settings.require_token_expiration:
155+
options["require"] = ["exp"]
156+
113157
payload = jwt.decode(
114158
token,
115159
settings.jwt_secret_key,
116160
algorithms=[settings.jwt_algorithm],
117-
# options={"require": ["exp"]}, # Require expiration
161+
options=options
162+
)
163+
return payload
164+
165+
except jwt.MissingRequiredClaimError:
166+
raise HTTPException(
167+
status_code=status.HTTP_401_UNAUTHORIZED,
168+
detail="Token is missing required expiration claim. Set REQUIRE_TOKEN_EXPIRATION=false to allow.",
169+
headers={"WWW-Authenticate": "Bearer"},
118170
)
119-
return payload # Contains the claims (e.g., user info)
120171
except jwt.ExpiredSignatureError:
121172
raise HTTPException(
122173
status_code=status.HTTP_401_UNAUTHORIZED,
123174
detail="Token has expired",
124175
headers={"WWW-Authenticate": "Bearer"},
125176
)
126-
except PyJWTError:
177+
except jwt.PyJWTError:
127178
raise HTTPException(
128179
status_code=status.HTTP_401_UNAUTHORIZED,
129180
detail="Invalid token",
130181
headers={"WWW-Authenticate": "Bearer"},
131182
)
132-
133-
183+
134184
async def verify_credentials(token: str) -> dict:
135185
"""Verify credentials using a JWT token.
136186

tests/security/test_rpc_endpoint_validation.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ class TestRPCEndpointValidation:
3939
@pytest.fixture
4040
def client(self):
4141
"""Create a test client for the FastAPI app."""
42-
return TestClient(app)
43-
42+
return TestClient(app)
4443
@pytest.fixture
4544
def auth_headers(self):
4645
"""Create authorization headers for testing."""
4746
# You might need to adjust this based on your auth setup
4847
return {"Authorization": "Bearer test-token", "Content-Type": "application/json"}
48+
49+
4950

5051
def test_rpc_endpoint_with_malicious_methods(self, client, auth_headers):
5152
"""Test that malicious method names are rejected before processing.

tests/unit/mcpgateway/utils/test_verify_credentials.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,14 @@
4141
ALGO = "HS256"
4242

4343

44-
def _token(payload: dict, *, exp_delta: int | None = None, secret: str = SECRET) -> str:
44+
# def _token(payload: dict, *, exp_delta: int | None = None, secret: str = SECRET) -> str:
45+
# """Return a signed JWT with optional expiry offset (minutes)."""
46+
# if exp_delta is not None:
47+
# expire = datetime.now(timezone.utc) + timedelta(minutes=exp_delta)
48+
# payload = payload | {"exp": int(expire.timestamp())}
49+
# return jwt.encode(payload, secret, algorithm=ALGO)
50+
51+
def _token(payload: dict, *, exp_delta: int | None = 60, secret: str = SECRET) -> str:
4552
"""Return a signed JWT with optional expiry offset (minutes)."""
4653
if exp_delta is not None:
4754
expire = datetime.now(timezone.utc) + timedelta(minutes=exp_delta)
@@ -56,7 +63,8 @@ def _token(payload: dict, *, exp_delta: int | None = None, secret: str = SECRET)
5663
async def test_verify_jwt_token_success(monkeypatch):
5764
monkeypatch.setattr(vc.settings, "jwt_secret_key", SECRET, raising=False)
5865
monkeypatch.setattr(vc.settings, "jwt_algorithm", ALGO, raising=False)
59-
66+
monkeypatch.setattr(vc.settings, "require_token_expiration", False, raising=False)
67+
6068
token = _token({"sub": "abc"})
6169
data = await vc.verify_jwt_token(token)
6270

0 commit comments

Comments
 (0)