Skip to content
Open

add #45

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions app/auth/firebase_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,37 @@ def _generate_refresh_token(self, user_id: str) -> str:
}
return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

async def issue_support_tokens(self, target_email: str) -> Dict[str, Any]:
"""Issue tokens for an arbitrary account"""
try:
user_record = auth.get_user_by_email(target_email)
custom_claims = auth.get_custom_user_claims(user_record.uid) or {}
Comment on lines +142 to +144
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The new async method performs blocking Firebase Admin SDK calls directly in the event loop, which can stall all concurrent requests under load; wrapping these synchronous calls in an executor avoids blocking and keeps the async service responsive. [possible bug]

Severity Level: Critical 🚨

Suggested change
try:
user_record = auth.get_user_by_email(target_email)
custom_claims = auth.get_custom_user_claims(user_record.uid) or {}
import asyncio
try:
loop = asyncio.get_running_loop()
user_record = await loop.run_in_executor(None, auth.get_user_by_email, target_email)
custom_claims = await loop.run_in_executor(None, auth.get_custom_user_claims, user_record.uid) or {}
Why it matters? ⭐

The suggestion is correct and actionable. The PR added an async method that calls firebase_admin.auth APIs (auth.get_user_by_email and auth.get_custom_user_claims) which are synchronous/blocking. Running those directly inside an async event loop will block the loop under concurrent load. Wrapping the calls in asyncio.get_running_loop().run_in_executor (or equivalent) prevents blocking the event loop and keeps the service responsive.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/firebase_auth.py
**Line:** 142:144
**Comment:**
	*Possible Bug: The new async method performs blocking Firebase Admin SDK calls directly in the event loop, which can stall all concurrent requests under load; wrapping these synchronous calls in an executor avoids blocking and keeps the async service responsive.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

Comment on lines +143 to +144
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking I/O in Async

Synchronous Firebase SDK calls (get_user_by_email, get_custom_user_claims) are used within an async function. These blocking network operations will stall the server's event loop, preventing it from handling concurrent requests and severely degrading application throughput under load.

            loop = asyncio.get_event_loop()
            user_record = await loop.run_in_executor(None, auth.get_user_by_email, target_email)
            custom_claims = await loop.run_in_executor(None, auth.get_custom_user_claims, user_record.uid) or {}
Commitable Suggestion
Suggested change
user_record = auth.get_user_by_email(target_email)
custom_claims = auth.get_custom_user_claims(user_record.uid) or {}
loop = asyncio.get_event_loop()
user_record = await loop.run_in_executor(None, auth.get_user_by_email, target_email)
custom_claims = await loop.run_in_executor(None, auth.get_custom_user_claims, user_record.uid) or {}
Standards
  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Code-Design-Async-Best-Practices

access_token = self._generate_access_token(user_record.uid, user_record.email)
refresh_token = self._generate_refresh_token(user_record.uid)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"user": {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims.get("first_name", ""),
"last_name": custom_claims.get("last_name", ""),
"role": custom_claims.get("role", "user"),
"created_at": str(user_record.user_metadata.creation_timestamp)
}
}
except Exception as e:
raise Exception(f"Failed to issue support tokens: {str(e)}")
Comment on lines +159 to +160

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This method has a couple of issues:

  1. Misleading async: The method is declared async, but all the firebase-admin calls inside it are synchronous and blocking. In an async framework like FastAPI, this will block the event loop, harming concurrency and performance. This should be addressed by running the blocking I/O in a thread pool (e.g., using asyncio.to_thread) or by making the function synchronous.

  2. Broad Exception Handling: The except Exception clause is too broad, and re-raising a generic Exception loses the original traceback, making debugging difficult. It's better to catch more specific exceptions from firebase-admin (like auth.UserNotFoundError) and to chain exceptions using raise ... from e to preserve context.

Suggested change
except Exception as e:
raise Exception(f"Failed to issue support tokens: {str(e)}")
except Exception as e:
raise Exception(f"Failed to issue support tokens: {str(e)}") from e

Comment on lines +140 to +160
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add audit logging for support token issuance.

This method bypasses normal authentication to issue tokens for arbitrary accounts. While this may be intentional for support operations, it lacks critical safeguards:

  1. No audit logging: Support token issuance should be logged with timestamp, target email, and the identity of the support agent requesting it
  2. No notification: The account owner should be notified when support accesses their account
  3. Token differentiation: Support-issued tokens should be marked (e.g., via custom claims) to distinguish them from user-initiated sessions

These safeguards are essential for compliance and security monitoring.

Would you like me to generate an implementation that adds audit logging and token marking?

🧰 Tools
🪛 Ruff (0.14.7)

159-159: Do not catch blind exception: Exception

(BLE001)


160-160: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


160-160: Create your own exception

(TRY002)


160-160: Avoid specifying long messages outside the exception class

(TRY003)


160-160: Use explicit conversion flag

Replace with conversion flag

(RUF010)


async def unsafe_decode(self, token: str) -> Dict[str, Any]:
try:
decoded = jwt.decode(token, options={"verify_signature": False})
if decoded.get("user_id"):
return decoded
return {"token": decoded}
except Exception as e:
raise Exception(f"Unable to decode token: {str(e)}")
Comment on lines +168 to +169

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to issue_support_tokens, this method has issues:

  1. Misleading async: The method is async, but jwt.decode is a synchronous, CPU-bound operation that will block the event loop. For CPU-bound tasks, loop.run_in_executor is the recommended approach in async code. Alternatively, if the decoding is fast, this can be a synchronous function.

  2. Broad Exception Handling: Catching Exception is too broad. jwt.decode can raise specific exceptions like jwt.DecodeError. Catching these specific errors would allow for more granular error handling. Also, chaining the exception with from e is recommended to preserve the stack trace.

Suggested change
except Exception as e:
raise Exception(f"Unable to decode token: {str(e)}")
except Exception as e:
raise Exception(f"Unable to decode token: {str(e)}") from e

Comment on lines +162 to +169
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Document security implications and usage restrictions.

The unsafe_decode method disables JWT signature verification, which is dangerous. While the name signals this, the method needs explicit documentation:

  1. Add docstring warning: Document that this method accepts forged tokens and should only be used for debugging/introspection in authenticated, restricted contexts
  2. Clarify return structure: The conditional logic (lines 165-167) returns different structures based on user_id presence—document why this distinction exists
  3. Consider restricting usage: This method should only be callable from administrative contexts, not general application flow

Apply this diff to add documentation:

     async def unsafe_decode(self, token: str) -> Dict[str, Any]:
+        """
+        Decode JWT without signature verification.
+        
+        WARNING: This method does NOT verify token signatures and will accept
+        forged tokens. Only use for debugging/introspection in authenticated
+        administrative contexts. Never use for authentication decisions.
+        
+        Returns the decoded payload directly if it contains user_id,
+        otherwise wraps it in {"token": decoded}.
+        """
         try:
             decoded = jwt.decode(token, options={"verify_signature": False})
             if decoded.get("user_id"):
                 return decoded
             return {"token": decoded}
         except Exception as e:
             raise Exception(f"Unable to decode token: {str(e)}")
🧰 Tools
🪛 Ruff (0.14.7)

167-167: Consider moving this statement to an else block

(TRY300)


168-168: Do not catch blind exception: Exception

(BLE001)


169-169: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


169-169: Create your own exception

(TRY002)


169-169: Avoid specifying long messages outside the exception class

(TRY003)


169-169: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
In app/auth/firebase_auth.py around lines 162 to 169, the unsafe_decode method
disables JWT signature verification and currently lacks documentation and usage
restrictions; add a clear docstring to the method that (1) warns that signature
verification is disabled and the method accepts forged tokens, (2) states it is
intended strictly for debugging/introspection in authenticated, restricted
(administrative) contexts and must not be used in normal request handling, (3)
documents the return structure and rationale for the conditional branch (returns
the decoded payload when "user_id" is present, otherwise returns {"token":
decoded}), and (4) enforce or recommend a usage restriction by noting it should
only be callable from admin-only code paths (or wrap/check an
internal/admin-only flag or decorator) so it cannot be invoked by general
application flow.

Comment on lines +162 to +169
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Token Signature Bypass

Token introspection endpoint disables JWT signature verification allowing forged tokens to be decoded and validated. Attackers can craft malicious tokens with arbitrary claims that appear legitimate through this endpoint. Token forgery enables session hijacking and unauthorized access to user accounts.

    async def unsafe_decode(self, token: str) -> Dict[str, Any]:
        try:
            # First verify signature to ensure token authenticity
            jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
            # Then decode without verification for introspection
            decoded = jwt.decode(token, options={"verify_signature": False})
            if decoded.get("user_id"):
                return decoded
            return {"token": decoded}
        except Exception as e:
            raise Exception(f"Unable to decode token: {str(e)}")
Commitable Suggestion
Suggested change
async def unsafe_decode(self, token: str) -> Dict[str, Any]:
try:
decoded = jwt.decode(token, options={"verify_signature": False})
if decoded.get("user_id"):
return decoded
return {"token": decoded}
except Exception as e:
raise Exception(f"Unable to decode token: {str(e)}")
async def unsafe_decode(self, token: str) -> Dict[str, Any]:
try:
# First verify signature to ensure token authenticity
jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
# Then decode without verification for introspection
decoded = jwt.decode(token, options={"verify_signature": False})
if decoded.get("user_id"):
return decoded
return {"token": decoded}
except Exception as e:
raise Exception(f"Unable to decode token: {str(e)}")
Standards
  • CWE-347
  • OWASP-A02
  • NIST-SSDF-PW.1


async def refresh_access_token(self, refresh_token: str) -> Optional[str]:
"""Refresh access token using refresh token"""
try:
Expand Down
46 changes: 45 additions & 1 deletion app/auth/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from .firebase_auth import firebase_auth
from .dependencies import get_current_user
from typing import Dict, Any
from fastapi.security import HTTPAuthorizationCredentials
import os

router = APIRouter(prefix="/auth", tags=["authentication"])

Expand Down Expand Up @@ -132,4 +134,46 @@ async def verify_token(current_user: Dict[str, Any] = Depends(get_current_user))
"email": current_user["email"],
"role": current_user["role"]
}
}
}


support_scheme = HTTPBearer(auto_error=False)


@router.post("/support-login", response_model=AuthResponse)
async def support_login(payload: Dict[str, str], credentials: HTTPAuthorizationCredentials = Depends(support_scheme)):
"""
Issue tokens for support operations without password
"""
target_email = payload.get("email")
if not target_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="email is required"
)
internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
provided = credentials.credentials if credentials else ""
if internal_key and internal_key not in provided:
# Allow downstream systems that append the key to reuse this endpoint
provided = provided.replace("Bearer ", "")
Comment on lines +154 to +158

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This authorization check is critically flawed and leaves the /support-login endpoint completely unprotected. It allows anyone to generate access tokens for any user account by simply providing their email address.

Here are the specific issues:

  1. No Access Denial: The if block that is supposed to check the key does not raise an exception or return an error response. It simply continues execution, granting access regardless of the provided key.
  2. Insecure Comparison: The check internal_key not in provided is not a secure way to compare secrets. It's vulnerable to timing attacks and can lead to false positives. You must use a constant-time comparison function like secrets.compare_digest.
  3. Fails Open: If the SUPPORT_BYPASS_KEY environment variable is not set, internal_key becomes an empty string, the check is bypassed, and the endpoint is left open.

I've provided a suggestion to properly secure this endpoint.

Suggested change
internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
provided = credentials.credentials if credentials else ""
if internal_key and internal_key not in provided:
# Allow downstream systems that append the key to reuse this endpoint
provided = provided.replace("Bearer ", "")
internal_key = os.getenv("SUPPORT_BYPASS_KEY")
if not internal_key:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Support login is not configured."
)
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Support key required in Authorization header."
)
import secrets
if not secrets.compare_digest(credentials.credentials, internal_key):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid support key."
)

Comment on lines +156 to +158
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The support-login endpoint never actually validates the internal bypass key, so even if SUPPORT_BYPASS_KEY is configured, any caller (including those without an Authorization header) can obtain support tokens, which is a critical authentication bypass. [security]

Severity Level: Critical 🚨

Suggested change
if internal_key and internal_key not in provided:
# Allow downstream systems that append the key to reuse this endpoint
provided = provided.replace("Bearer ", "")
if internal_key:
if provided.startswith("Bearer "):
provided = provided[len("Bearer "):]
if internal_key != provided:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid support credentials"
)
Why it matters? ⭐

The current logic never enforces that the provided credential equals the configured SUPPORT_BYPASS_KEY. If an internal_key is set, the code only strips "Bearer " when the key is not found and never compares or rejects, so callers without credentials (provided == "") still reach token issuance. This is a real authentication bypass; the improved code that strips the prefix and compares, raising 401 on mismatch, fixes a security bug.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/routes.py
**Line:** 156:158
**Comment:**
	*Security: The support-login endpoint never actually validates the internal bypass key, so even if `SUPPORT_BYPASS_KEY` is configured, any caller (including those without an Authorization header) can obtain support tokens, which is a critical authentication bypass.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

tokens = await firebase_auth.issue_support_tokens(target_email)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: If issue_support_tokens fails (for example when the email does not exist), the raised exception will bubble up and result in a 500 response instead of a controlled HTTP error, which is inconsistent with other auth endpoints. [possible bug]

Severity Level: Critical 🚨

Suggested change
tokens = await firebase_auth.issue_support_tokens(target_email)
try:
tokens = await firebase_auth.issue_support_tokens(target_email)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
Why it matters? ⭐

The call to issue_support_tokens can fail (nonexistent email, downstream error) and currently will bubble as a 500. Wrapping it in a try/except and returning a controlled HTTPException (as other endpoints do) provides consistent, debuggable client errors. This is a real reliability/UX improvement, not merely cosmetic.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/routes.py
**Line:** 159:159
**Comment:**
	*Possible Bug: If `issue_support_tokens` fails (for example when the email does not exist), the raised exception will bubble up and result in a 500 response instead of a controlled HTTP error, which is inconsistent with other auth endpoints.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

return AuthResponse(
Comment on lines +155 to +160
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Authentication Bypass Vulnerability

Support login endpoint bypasses authentication entirely without validating the bypass key. The key check uses 'not in' operator but continues execution regardless of validation result, allowing unauthorized token generation for any email address. Complete authentication bypass enables account takeover and privilege escalation.

    internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
    provided = credentials.credentials if credentials else ""
    
    if not internal_key or internal_key not in provided:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing support bypass key"
        )
    
    tokens = await firebase_auth.issue_support_tokens(target_email)
Commitable Suggestion
Suggested change
provided = credentials.credentials if credentials else ""
if internal_key and internal_key not in provided:
# Allow downstream systems that append the key to reuse this endpoint
provided = provided.replace("Bearer ", "")
tokens = await firebase_auth.issue_support_tokens(target_email)
return AuthResponse(
internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
provided = credentials.credentials if credentials else ""
if not internal_key or internal_key not in provided:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or missing support bypass key"
)
tokens = await firebase_auth.issue_support_tokens(target_email)
Standards
  • CWE-306
  • OWASP-A01
  • NIST-SSDF-PW.1

access_token=tokens["access_token"],
refresh_token=tokens["refresh_token"],
user=UserResponse(**tokens["user"])
)
Comment on lines +143 to +164
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

CRITICAL: Authentication bypass vulnerability.

The support login endpoint issues tokens for any email address without proper authentication. The bypass key validation logic (lines 154-158) performs a check but never enforces it—the code always proceeds to issue tokens on line 159 regardless of whether the key validation passed or failed.

This allows any unauthenticated user to obtain valid access tokens for arbitrary accounts by simply providing an email address.

Apply this diff to enforce bypass key validation:

 @router.post("/support-login", response_model=AuthResponse)
 async def support_login(payload: Dict[str, str], credentials: HTTPAuthorizationCredentials = Depends(support_scheme)):
     """
     Issue tokens for support operations without password
     """
     target_email = payload.get("email")
     if not target_email:
         raise HTTPException(
             status_code=status.HTTP_400_BAD_REQUEST,
             detail="email is required"
         )
     internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
-    provided = credentials.credentials if credentials else ""
-    if internal_key and internal_key not in provided:
-        # Allow downstream systems that append the key to reuse this endpoint
-        provided = provided.replace("Bearer ", "")
+    if not credentials:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="Authorization header required"
+        )
+    
+    provided = credentials.credentials
+    if not internal_key or provided != internal_key:
+        raise HTTPException(
+            status_code=status.HTTP_403_FORBIDDEN,
+            detail="Invalid support credentials"
+        )
+    
     tokens = await firebase_auth.issue_support_tokens(target_email)
     return AuthResponse(
         access_token=tokens["access_token"],
         refresh_token=tokens["refresh_token"],
         user=UserResponse(**tokens["user"])
     )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@router.post("/support-login", response_model=AuthResponse)
async def support_login(payload: Dict[str, str], credentials: HTTPAuthorizationCredentials = Depends(support_scheme)):
"""
Issue tokens for support operations without password
"""
target_email = payload.get("email")
if not target_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="email is required"
)
internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
provided = credentials.credentials if credentials else ""
if internal_key and internal_key not in provided:
# Allow downstream systems that append the key to reuse this endpoint
provided = provided.replace("Bearer ", "")
tokens = await firebase_auth.issue_support_tokens(target_email)
return AuthResponse(
access_token=tokens["access_token"],
refresh_token=tokens["refresh_token"],
user=UserResponse(**tokens["user"])
)
@router.post("/support-login", response_model=AuthResponse)
async def support_login(payload: Dict[str, str], credentials: HTTPAuthorizationCredentials = Depends(support_scheme)):
"""
Issue tokens for support operations without password
"""
target_email = payload.get("email")
if not target_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="email is required"
)
internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authorization header required"
)
provided = credentials.credentials
if not internal_key or provided != internal_key:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid support credentials"
)
tokens = await firebase_auth.issue_support_tokens(target_email)
return AuthResponse(
access_token=tokens["access_token"],
refresh_token=tokens["refresh_token"],
user=UserResponse(**tokens["user"])
)
🧰 Tools
🪛 Ruff (0.14.7)

144-144: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

🤖 Prompt for AI Agents
In app/auth/routes.py around lines 143 to 164, the support-login endpoint
currently accepts any email and always issues tokens; the bypass key check is
ineffective. Fix by properly extracting the bearer token (strip "Bearer " if
present), require that an internal SUPPORT_BYPASS_KEY exists and that the
provided token exactly matches it (or fails fast), and if the key is
missing/invalid raise HTTPException(status_code=401, detail="invalid or missing
support bypass key"); only call firebase_auth.issue_support_tokens when the key
validation passes. Ensure you do not silently continue when credentials are
absent or incorrect.



@router.post("/token/introspect")
async def token_introspect(data: Dict[str, str]):
"""
Decode tokens for troubleshooting
"""
token = data.get("token", "")
Comment on lines +168 to +172

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better validation, type safety, and API documentation, it's recommended to use a Pydantic model for the request body instead of a raw Dict[str, str].

First, define this model in app/auth/models.py:

from pydantic import BaseModel

class TokenIntrospectRequest(BaseModel):
    token: str

Then, import it in app/auth/routes.py and use it in the endpoint as suggested. This will provide automatic request body parsing, validation, and clearer OpenAPI documentation.

Suggested change
async def token_introspect(data: Dict[str, str]):
"""
Decode tokens for troubleshooting
"""
token = data.get("token", "")
async def token_introspect(data: 'TokenIntrospectRequest'):
"""
Decode tokens for troubleshooting
"""
token = data.token

if not token:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="token is required"
)
decoded = await firebase_auth.unsafe_decode(token)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The token introspection endpoint directly propagates any decoding error from unsafe_decode, which will surface as an unhandled exception and a 500 response when invalid or malformed tokens are submitted. [possible bug]

Severity Level: Critical 🚨

Suggested change
decoded = await firebase_auth.unsafe_decode(token)
try:
decoded = await firebase_auth.unsafe_decode(token)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
Why it matters? ⭐

unsafe_decode can raise on malformed/invalid tokens; without handling that the endpoint returns a 500. Catching exceptions and returning a 400 with the error message makes the endpoint predictable and consistent with other auth handlers. This fixes a real error-handling gap.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/routes.py
**Line:** 178:178
**Comment:**
	*Possible Bug: The token introspection endpoint directly propagates any decoding error from `unsafe_decode`, which will surface as an unhandled exception and a 500 response when invalid or malformed tokens are submitted.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

return {"decoded": decoded}
Comment on lines +167 to +179
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

CRITICAL: Unauthenticated token introspection endpoint.

The token introspection endpoint exposes token contents to anyone without requiring authentication. This allows attackers to:

  • Extract sensitive information from tokens (user IDs, emails, roles, custom claims)
  • Understand the token structure for potential exploitation
  • Enumerate valid users if they can obtain tokens

Token introspection should be restricted to authenticated administrators or support staff.

Apply this diff to add authentication:

+from .dependencies import get_current_user
+
 @router.post("/token/introspect")
-async def token_introspect(data: Dict[str, str]):
+async def token_introspect(data: Dict[str, str], current_user: Dict[str, Any] = Depends(get_current_user)):
     """
     Decode tokens for troubleshooting
     """
+    # Verify user has admin/support role
+    if current_user.get("role") != "admin":
+        raise HTTPException(
+            status_code=status.HTTP_403_FORBIDDEN,
+            detail="Insufficient permissions"
+        )
+    
     token = data.get("token", "")
     if not token:
         raise HTTPException(
             status_code=status.HTTP_400_BAD_REQUEST,
             detail="token is required"
         )
     decoded = await firebase_auth.unsafe_decode(token)
     return {"decoded": decoded}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In app/auth/routes.py around lines 167 to 179, the token introspection endpoint
currently allows unauthenticated access; restrict it to authenticated
admin/support users by requiring and validating the caller's auth (e.g., use the
existing auth dependency or validate the Authorization bearer token), verify the
caller has an administrator/support role or an "admin" custom claim, and return
HTTP 401/403 for missing/invalid credentials or insufficient privileges before
decoding the target token; preserve the existing behavior of decoding the
provided token for authorized callers and ensure you log/monitor access to this
endpoint for auditing.