-
Notifications
You must be signed in to change notification settings - Fork 4
add #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
add #45
Conversation
|
CodeAnt AI is reviewing your PR. Thanks for using CodeAnt! 🎉We're free for open-source projects. if you're enjoying it, help us grow by sharing. Share on X · |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdded passwordless support login functionality to the authentication system, including a new endpoint for issuing support tokens using email validation and HTTPBearer credentials, alongside a token introspection endpoint. Backend auth service now provides token issuance for arbitrary accounts and unsigned JWT decoding capabilities. Changes
Sequence Diagram(s)sequenceDiagram
actor Client
participant Routes as Auth Routes
participant FirebaseAuth as Firebase Auth Service
Client->>Routes: POST /auth/support-login<br/>(email, credentials)
Routes->>Routes: Validate payload & email
Routes->>Routes: Check SUPPORT_BYPASS_KEY
Routes->>FirebaseAuth: issue_support_tokens(target_email)
FirebaseAuth->>FirebaseAuth: Retrieve user by email
FirebaseAuth->>FirebaseAuth: Read custom claims
FirebaseAuth->>FirebaseAuth: Generate access & refresh tokens
FirebaseAuth-->>Routes: Return tokens + user details
Routes-->>Client: Return AuthResponse
Client->>Routes: POST /auth/token/introspect<br/>(token)
Routes->>FirebaseAuth: unsafe_decode(token)
FirebaseAuth->>FirebaseAuth: Decode JWT (no verification)
FirebaseAuth-->>Routes: Return decoded payload
Routes-->>Client: Return token data
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @visz11, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the authentication system by introducing new functionalities geared towards administrative support and debugging. It provides a secure way for authorized personnel to generate authentication tokens for any user, bypassing standard login procedures, and also offers a utility to inspect the contents of JWT tokens for troubleshooting purposes. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces new endpoints for support and troubleshooting. However, it contains a critical security vulnerability in the support-login endpoint, which is left unprotected, allowing unauthorized access. Additionally, there are performance concerns due to the misuse of async with synchronous, blocking code, which will negatively impact the application's concurrency. I've provided detailed comments and suggestions to address these critical issues and improve code quality.
| internal_key = os.getenv("SUPPORT_BYPASS_KEY", "") | ||
| provided = credentials.credentials if credentials else "" | ||
| if internal_key and internal_key not in provided: | ||
| # Allow downstream systems that append the key to reuse this endpoint | ||
| provided = provided.replace("Bearer ", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This authorization check is critically flawed and leaves the /support-login endpoint completely unprotected. It allows anyone to generate access tokens for any user account by simply providing their email address.
Here are the specific issues:
- No Access Denial: The
ifblock that is supposed to check the key does not raise an exception or return an error response. It simply continues execution, granting access regardless of the provided key. - Insecure Comparison: The check
internal_key not in providedis not a secure way to compare secrets. It's vulnerable to timing attacks and can lead to false positives. You must use a constant-time comparison function likesecrets.compare_digest. - Fails Open: If the
SUPPORT_BYPASS_KEYenvironment variable is not set,internal_keybecomes an empty string, the check is bypassed, and the endpoint is left open.
I've provided a suggestion to properly secure this endpoint.
| internal_key = os.getenv("SUPPORT_BYPASS_KEY", "") | |
| provided = credentials.credentials if credentials else "" | |
| if internal_key and internal_key not in provided: | |
| # Allow downstream systems that append the key to reuse this endpoint | |
| provided = provided.replace("Bearer ", "") | |
| internal_key = os.getenv("SUPPORT_BYPASS_KEY") | |
| if not internal_key: | |
| raise HTTPException( | |
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
| detail="Support login is not configured." | |
| ) | |
| if not credentials: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Support key required in Authorization header." | |
| ) | |
| import secrets | |
| if not secrets.compare_digest(credentials.credentials, internal_key): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid support key." | |
| ) |
| except Exception as e: | ||
| raise Exception(f"Failed to issue support tokens: {str(e)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method has a couple of issues:
-
Misleading
async: The method is declaredasync, but all thefirebase-admincalls inside it are synchronous and blocking. In an async framework like FastAPI, this will block the event loop, harming concurrency and performance. This should be addressed by running the blocking I/O in a thread pool (e.g., usingasyncio.to_thread) or by making the function synchronous. -
Broad Exception Handling: The
except Exceptionclause is too broad, and re-raising a genericExceptionloses the original traceback, making debugging difficult. It's better to catch more specific exceptions fromfirebase-admin(likeauth.UserNotFoundError) and to chain exceptions usingraise ... from eto preserve context.
| except Exception as e: | |
| raise Exception(f"Failed to issue support tokens: {str(e)}") | |
| except Exception as e: | |
| raise Exception(f"Failed to issue support tokens: {str(e)}") from e |
| except Exception as e: | ||
| raise Exception(f"Unable to decode token: {str(e)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to issue_support_tokens, this method has issues:
-
Misleading
async: The method isasync, butjwt.decodeis a synchronous, CPU-bound operation that will block the event loop. For CPU-bound tasks,loop.run_in_executoris the recommended approach in async code. Alternatively, if the decoding is fast, this can be a synchronous function. -
Broad Exception Handling: Catching
Exceptionis too broad.jwt.decodecan raise specific exceptions likejwt.DecodeError. Catching these specific errors would allow for more granular error handling. Also, chaining the exception withfrom eis recommended to preserve the stack trace.
| except Exception as e: | |
| raise Exception(f"Unable to decode token: {str(e)}") | |
| except Exception as e: | |
| raise Exception(f"Unable to decode token: {str(e)}") from e |
| async def token_introspect(data: Dict[str, str]): | ||
| """ | ||
| Decode tokens for troubleshooting | ||
| """ | ||
| token = data.get("token", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For better validation, type safety, and API documentation, it's recommended to use a Pydantic model for the request body instead of a raw Dict[str, str].
First, define this model in app/auth/models.py:
from pydantic import BaseModel
class TokenIntrospectRequest(BaseModel):
token: strThen, import it in app/auth/routes.py and use it in the endpoint as suggested. This will provide automatic request body parsing, validation, and clearer OpenAPI documentation.
| async def token_introspect(data: Dict[str, str]): | |
| """ | |
| Decode tokens for troubleshooting | |
| """ | |
| token = data.get("token", "") | |
| async def token_introspect(data: 'TokenIntrospectRequest'): | |
| """ | |
| Decode tokens for troubleshooting | |
| """ | |
| token = data.token |
Nitpicks 🔍
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
app/auth/routes.py (1)
2-2: Remove duplicate import.
HTTPAuthorizationCredentialsis imported on line 14 butfastapi.securityis already imported on line 2. Consolidate the imports on line 2.Apply this diff:
-from fastapi.security import HTTPBearer +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from .models import ( UserSignupRequest, UserLoginRequest, AuthResponse, UserResponse, TokenResponse, RefreshTokenRequest ) from .firebase_auth import firebase_auth from .dependencies import get_current_user from typing import Dict, Any -from fastapi.security import HTTPAuthorizationCredentials import osAlso applies to: 14-14
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
app/auth/firebase_auth.py(1 hunks)app/auth/routes.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
app/auth/routes.py (2)
app/auth/models.py (2)
AuthResponse(26-30)UserResponse(17-23)app/auth/firebase_auth.py (2)
issue_support_tokens(140-160)unsafe_decode(162-169)
app/auth/firebase_auth.py (1)
app/auth/routes.py (1)
refresh_token(78-99)
🪛 Ruff (0.14.7)
app/auth/routes.py
144-144: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
app/auth/firebase_auth.py
159-159: Do not catch blind exception: Exception
(BLE001)
160-160: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
160-160: Create your own exception
(TRY002)
160-160: Avoid specifying long messages outside the exception class
(TRY003)
160-160: Use explicit conversion flag
Replace with conversion flag
(RUF010)
167-167: Consider moving this statement to an else block
(TRY300)
168-168: Do not catch blind exception: Exception
(BLE001)
169-169: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
169-169: Create your own exception
(TRY002)
169-169: Avoid specifying long messages outside the exception class
(TRY003)
169-169: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🔇 Additional comments (1)
app/auth/routes.py (1)
140-141: LGTM!The HTTPBearer scheme with
auto_error=Falseis correctly configured to support optional credential handling for the support login flow.
| async def issue_support_tokens(self, target_email: str) -> Dict[str, Any]: | ||
| """Issue tokens for an arbitrary account""" | ||
| try: | ||
| user_record = auth.get_user_by_email(target_email) | ||
| custom_claims = auth.get_custom_user_claims(user_record.uid) or {} | ||
| access_token = self._generate_access_token(user_record.uid, user_record.email) | ||
| refresh_token = self._generate_refresh_token(user_record.uid) | ||
| return { | ||
| "access_token": access_token, | ||
| "refresh_token": refresh_token, | ||
| "user": { | ||
| "id": user_record.uid, | ||
| "email": user_record.email, | ||
| "first_name": custom_claims.get("first_name", ""), | ||
| "last_name": custom_claims.get("last_name", ""), | ||
| "role": custom_claims.get("role", "user"), | ||
| "created_at": str(user_record.user_metadata.creation_timestamp) | ||
| } | ||
| } | ||
| except Exception as e: | ||
| raise Exception(f"Failed to issue support tokens: {str(e)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add audit logging for support token issuance.
This method bypasses normal authentication to issue tokens for arbitrary accounts. While this may be intentional for support operations, it lacks critical safeguards:
- No audit logging: Support token issuance should be logged with timestamp, target email, and the identity of the support agent requesting it
- No notification: The account owner should be notified when support accesses their account
- Token differentiation: Support-issued tokens should be marked (e.g., via custom claims) to distinguish them from user-initiated sessions
These safeguards are essential for compliance and security monitoring.
Would you like me to generate an implementation that adds audit logging and token marking?
🧰 Tools
🪛 Ruff (0.14.7)
159-159: Do not catch blind exception: Exception
(BLE001)
160-160: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
160-160: Create your own exception
(TRY002)
160-160: Avoid specifying long messages outside the exception class
(TRY003)
160-160: Use explicit conversion flag
Replace with conversion flag
(RUF010)
| async def unsafe_decode(self, token: str) -> Dict[str, Any]: | ||
| try: | ||
| decoded = jwt.decode(token, options={"verify_signature": False}) | ||
| if decoded.get("user_id"): | ||
| return decoded | ||
| return {"token": decoded} | ||
| except Exception as e: | ||
| raise Exception(f"Unable to decode token: {str(e)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document security implications and usage restrictions.
The unsafe_decode method disables JWT signature verification, which is dangerous. While the name signals this, the method needs explicit documentation:
- Add docstring warning: Document that this method accepts forged tokens and should only be used for debugging/introspection in authenticated, restricted contexts
- Clarify return structure: The conditional logic (lines 165-167) returns different structures based on
user_idpresence—document why this distinction exists - Consider restricting usage: This method should only be callable from administrative contexts, not general application flow
Apply this diff to add documentation:
async def unsafe_decode(self, token: str) -> Dict[str, Any]:
+ """
+ Decode JWT without signature verification.
+
+ WARNING: This method does NOT verify token signatures and will accept
+ forged tokens. Only use for debugging/introspection in authenticated
+ administrative contexts. Never use for authentication decisions.
+
+ Returns the decoded payload directly if it contains user_id,
+ otherwise wraps it in {"token": decoded}.
+ """
try:
decoded = jwt.decode(token, options={"verify_signature": False})
if decoded.get("user_id"):
return decoded
return {"token": decoded}
except Exception as e:
raise Exception(f"Unable to decode token: {str(e)}")🧰 Tools
🪛 Ruff (0.14.7)
167-167: Consider moving this statement to an else block
(TRY300)
168-168: Do not catch blind exception: Exception
(BLE001)
169-169: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
169-169: Create your own exception
(TRY002)
169-169: Avoid specifying long messages outside the exception class
(TRY003)
169-169: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🤖 Prompt for AI Agents
In app/auth/firebase_auth.py around lines 162 to 169, the unsafe_decode method
disables JWT signature verification and currently lacks documentation and usage
restrictions; add a clear docstring to the method that (1) warns that signature
verification is disabled and the method accepts forged tokens, (2) states it is
intended strictly for debugging/introspection in authenticated, restricted
(administrative) contexts and must not be used in normal request handling, (3)
documents the return structure and rationale for the conditional branch (returns
the decoded payload when "user_id" is present, otherwise returns {"token":
decoded}), and (4) enforce or recommend a usage restriction by noting it should
only be callable from admin-only code paths (or wrap/check an
internal/admin-only flag or decorator) so it cannot be invoked by general
application flow.
| @router.post("/support-login", response_model=AuthResponse) | ||
| async def support_login(payload: Dict[str, str], credentials: HTTPAuthorizationCredentials = Depends(support_scheme)): | ||
| """ | ||
| Issue tokens for support operations without password | ||
| """ | ||
| target_email = payload.get("email") | ||
| if not target_email: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail="email is required" | ||
| ) | ||
| internal_key = os.getenv("SUPPORT_BYPASS_KEY", "") | ||
| provided = credentials.credentials if credentials else "" | ||
| if internal_key and internal_key not in provided: | ||
| # Allow downstream systems that append the key to reuse this endpoint | ||
| provided = provided.replace("Bearer ", "") | ||
| tokens = await firebase_auth.issue_support_tokens(target_email) | ||
| return AuthResponse( | ||
| access_token=tokens["access_token"], | ||
| refresh_token=tokens["refresh_token"], | ||
| user=UserResponse(**tokens["user"]) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CRITICAL: Authentication bypass vulnerability.
The support login endpoint issues tokens for any email address without proper authentication. The bypass key validation logic (lines 154-158) performs a check but never enforces it—the code always proceeds to issue tokens on line 159 regardless of whether the key validation passed or failed.
This allows any unauthenticated user to obtain valid access tokens for arbitrary accounts by simply providing an email address.
Apply this diff to enforce bypass key validation:
@router.post("/support-login", response_model=AuthResponse)
async def support_login(payload: Dict[str, str], credentials: HTTPAuthorizationCredentials = Depends(support_scheme)):
"""
Issue tokens for support operations without password
"""
target_email = payload.get("email")
if not target_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="email is required"
)
internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
- provided = credentials.credentials if credentials else ""
- if internal_key and internal_key not in provided:
- # Allow downstream systems that append the key to reuse this endpoint
- provided = provided.replace("Bearer ", "")
+ if not credentials:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Authorization header required"
+ )
+
+ provided = credentials.credentials
+ if not internal_key or provided != internal_key:
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Invalid support credentials"
+ )
+
tokens = await firebase_auth.issue_support_tokens(target_email)
return AuthResponse(
access_token=tokens["access_token"],
refresh_token=tokens["refresh_token"],
user=UserResponse(**tokens["user"])
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @router.post("/support-login", response_model=AuthResponse) | |
| async def support_login(payload: Dict[str, str], credentials: HTTPAuthorizationCredentials = Depends(support_scheme)): | |
| """ | |
| Issue tokens for support operations without password | |
| """ | |
| target_email = payload.get("email") | |
| if not target_email: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="email is required" | |
| ) | |
| internal_key = os.getenv("SUPPORT_BYPASS_KEY", "") | |
| provided = credentials.credentials if credentials else "" | |
| if internal_key and internal_key not in provided: | |
| # Allow downstream systems that append the key to reuse this endpoint | |
| provided = provided.replace("Bearer ", "") | |
| tokens = await firebase_auth.issue_support_tokens(target_email) | |
| return AuthResponse( | |
| access_token=tokens["access_token"], | |
| refresh_token=tokens["refresh_token"], | |
| user=UserResponse(**tokens["user"]) | |
| ) | |
| @router.post("/support-login", response_model=AuthResponse) | |
| async def support_login(payload: Dict[str, str], credentials: HTTPAuthorizationCredentials = Depends(support_scheme)): | |
| """ | |
| Issue tokens for support operations without password | |
| """ | |
| target_email = payload.get("email") | |
| if not target_email: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="email is required" | |
| ) | |
| internal_key = os.getenv("SUPPORT_BYPASS_KEY", "") | |
| if not credentials: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Authorization header required" | |
| ) | |
| provided = credentials.credentials | |
| if not internal_key or provided != internal_key: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Invalid support credentials" | |
| ) | |
| tokens = await firebase_auth.issue_support_tokens(target_email) | |
| return AuthResponse( | |
| access_token=tokens["access_token"], | |
| refresh_token=tokens["refresh_token"], | |
| user=UserResponse(**tokens["user"]) | |
| ) |
🧰 Tools
🪛 Ruff (0.14.7)
144-144: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🤖 Prompt for AI Agents
In app/auth/routes.py around lines 143 to 164, the support-login endpoint
currently accepts any email and always issues tokens; the bypass key check is
ineffective. Fix by properly extracting the bearer token (strip "Bearer " if
present), require that an internal SUPPORT_BYPASS_KEY exists and that the
provided token exactly matches it (or fails fast), and if the key is
missing/invalid raise HTTPException(status_code=401, detail="invalid or missing
support bypass key"); only call firebase_auth.issue_support_tokens when the key
validation passes. Ensure you do not silently continue when credentials are
absent or incorrect.
| @router.post("/token/introspect") | ||
| async def token_introspect(data: Dict[str, str]): | ||
| """ | ||
| Decode tokens for troubleshooting | ||
| """ | ||
| token = data.get("token", "") | ||
| if not token: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail="token is required" | ||
| ) | ||
| decoded = await firebase_auth.unsafe_decode(token) | ||
| return {"decoded": decoded} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CRITICAL: Unauthenticated token introspection endpoint.
The token introspection endpoint exposes token contents to anyone without requiring authentication. This allows attackers to:
- Extract sensitive information from tokens (user IDs, emails, roles, custom claims)
- Understand the token structure for potential exploitation
- Enumerate valid users if they can obtain tokens
Token introspection should be restricted to authenticated administrators or support staff.
Apply this diff to add authentication:
+from .dependencies import get_current_user
+
@router.post("/token/introspect")
-async def token_introspect(data: Dict[str, str]):
+async def token_introspect(data: Dict[str, str], current_user: Dict[str, Any] = Depends(get_current_user)):
"""
Decode tokens for troubleshooting
"""
+ # Verify user has admin/support role
+ if current_user.get("role") != "admin":
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Insufficient permissions"
+ )
+
token = data.get("token", "")
if not token:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="token is required"
)
decoded = await firebase_auth.unsafe_decode(token)
return {"decoded": decoded}Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In app/auth/routes.py around lines 167 to 179, the token introspection endpoint
currently allows unauthenticated access; restrict it to authenticated
admin/support users by requiring and validating the caller's auth (e.g., use the
existing auth dependency or validate the Authorization bearer token), verify the
caller has an administrator/support role or an "admin" custom claim, and return
HTTP 401/403 for missing/invalid credentials or insufficient privileges before
decoding the target token; preserve the existing behavior of decoding the
provided token for authorized callers and ensure you log/monitor access to this
endpoint for auditing.
| try: | ||
| user_record = auth.get_user_by_email(target_email) | ||
| custom_claims = auth.get_custom_user_claims(user_record.uid) or {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: The new async method performs blocking Firebase Admin SDK calls directly in the event loop, which can stall all concurrent requests under load; wrapping these synchronous calls in an executor avoids blocking and keeps the async service responsive. [possible bug]
Severity Level: Critical 🚨
| try: | |
| user_record = auth.get_user_by_email(target_email) | |
| custom_claims = auth.get_custom_user_claims(user_record.uid) or {} | |
| import asyncio | |
| try: | |
| loop = asyncio.get_running_loop() | |
| user_record = await loop.run_in_executor(None, auth.get_user_by_email, target_email) | |
| custom_claims = await loop.run_in_executor(None, auth.get_custom_user_claims, user_record.uid) or {} |
Why it matters? ⭐
The suggestion is correct and actionable. The PR added an async method that calls firebase_admin.auth APIs (auth.get_user_by_email and auth.get_custom_user_claims) which are synchronous/blocking. Running those directly inside an async event loop will block the loop under concurrent load. Wrapping the calls in asyncio.get_running_loop().run_in_executor (or equivalent) prevents blocking the event loop and keeps the service responsive.
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** app/auth/firebase_auth.py
**Line:** 142:144
**Comment:**
*Possible Bug: The new async method performs blocking Firebase Admin SDK calls directly in the event loop, which can stall all concurrent requests under load; wrapping these synchronous calls in an executor avoids blocking and keeps the async service responsive.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.| if internal_key and internal_key not in provided: | ||
| # Allow downstream systems that append the key to reuse this endpoint | ||
| provided = provided.replace("Bearer ", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: The support-login endpoint never actually validates the internal bypass key, so even if SUPPORT_BYPASS_KEY is configured, any caller (including those without an Authorization header) can obtain support tokens, which is a critical authentication bypass. [security]
Severity Level: Critical 🚨
| if internal_key and internal_key not in provided: | |
| # Allow downstream systems that append the key to reuse this endpoint | |
| provided = provided.replace("Bearer ", "") | |
| if internal_key: | |
| if provided.startswith("Bearer "): | |
| provided = provided[len("Bearer "):] | |
| if internal_key != provided: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid support credentials" | |
| ) |
Why it matters? ⭐
The current logic never enforces that the provided credential equals the configured SUPPORT_BYPASS_KEY. If an internal_key is set, the code only strips "Bearer " when the key is not found and never compares or rejects, so callers without credentials (provided == "") still reach token issuance. This is a real authentication bypass; the improved code that strips the prefix and compares, raising 401 on mismatch, fixes a security bug.
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** app/auth/routes.py
**Line:** 156:158
**Comment:**
*Security: The support-login endpoint never actually validates the internal bypass key, so even if `SUPPORT_BYPASS_KEY` is configured, any caller (including those without an Authorization header) can obtain support tokens, which is a critical authentication bypass.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.| if internal_key and internal_key not in provided: | ||
| # Allow downstream systems that append the key to reuse this endpoint | ||
| provided = provided.replace("Bearer ", "") | ||
| tokens = await firebase_auth.issue_support_tokens(target_email) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: If issue_support_tokens fails (for example when the email does not exist), the raised exception will bubble up and result in a 500 response instead of a controlled HTTP error, which is inconsistent with other auth endpoints. [possible bug]
Severity Level: Critical 🚨
| tokens = await firebase_auth.issue_support_tokens(target_email) | |
| try: | |
| tokens = await firebase_auth.issue_support_tokens(target_email) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) |
Why it matters? ⭐
The call to issue_support_tokens can fail (nonexistent email, downstream error) and currently will bubble as a 500. Wrapping it in a try/except and returning a controlled HTTPException (as other endpoints do) provides consistent, debuggable client errors. This is a real reliability/UX improvement, not merely cosmetic.
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** app/auth/routes.py
**Line:** 159:159
**Comment:**
*Possible Bug: If `issue_support_tokens` fails (for example when the email does not exist), the raised exception will bubble up and result in a 500 response instead of a controlled HTTP error, which is inconsistent with other auth endpoints.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail="token is required" | ||
| ) | ||
| decoded = await firebase_auth.unsafe_decode(token) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: The token introspection endpoint directly propagates any decoding error from unsafe_decode, which will surface as an unhandled exception and a 500 response when invalid or malformed tokens are submitted. [possible bug]
Severity Level: Critical 🚨
| decoded = await firebase_auth.unsafe_decode(token) | |
| try: | |
| decoded = await firebase_auth.unsafe_decode(token) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) |
Why it matters? ⭐
unsafe_decode can raise on malformed/invalid tokens; without handling that the endpoint returns a 500. Catching exceptions and returning a 400 with the error message makes the endpoint predictable and consistent with other auth handlers. This fixes a real error-handling gap.
Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** app/auth/routes.py
**Line:** 178:178
**Comment:**
*Possible Bug: The token introspection endpoint directly propagates any decoding error from `unsafe_decode`, which will surface as an unhandled exception and a 500 response when invalid or malformed tokens are submitted.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.|
CodeAnt AI finished reviewing your PR. |
|
/refacto-visz |
1 similar comment
|
/refacto-visz |
|
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
Code Review: Authentication Security & ReliabilityPR Confidence Score: 🟥 1 / 5👍 Well Done
📁 Selected files for review (2)
📝 Additional Comments
|
| provided = credentials.credentials if credentials else "" | ||
| if internal_key and internal_key not in provided: | ||
| # Allow downstream systems that append the key to reuse this endpoint | ||
| provided = provided.replace("Bearer ", "") | ||
| tokens = await firebase_auth.issue_support_tokens(target_email) | ||
| return AuthResponse( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Authentication Bypass Vulnerability
Support login endpoint bypasses authentication entirely without validating the bypass key. The key check uses 'not in' operator but continues execution regardless of validation result, allowing unauthorized token generation for any email address. Complete authentication bypass enables account takeover and privilege escalation.
internal_key = os.getenv("SUPPORT_BYPASS_KEY", "")
provided = credentials.credentials if credentials else ""
if not internal_key or internal_key not in provided:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or missing support bypass key"
)
tokens = await firebase_auth.issue_support_tokens(target_email)
Commitable Suggestion
| provided = credentials.credentials if credentials else "" | |
| if internal_key and internal_key not in provided: | |
| # Allow downstream systems that append the key to reuse this endpoint | |
| provided = provided.replace("Bearer ", "") | |
| tokens = await firebase_auth.issue_support_tokens(target_email) | |
| return AuthResponse( | |
| internal_key = os.getenv("SUPPORT_BYPASS_KEY", "") | |
| provided = credentials.credentials if credentials else "" | |
| if not internal_key or internal_key not in provided: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid or missing support bypass key" | |
| ) | |
| tokens = await firebase_auth.issue_support_tokens(target_email) |
Standards
- CWE-306
- OWASP-A01
- NIST-SSDF-PW.1
| async def unsafe_decode(self, token: str) -> Dict[str, Any]: | ||
| try: | ||
| decoded = jwt.decode(token, options={"verify_signature": False}) | ||
| if decoded.get("user_id"): | ||
| return decoded | ||
| return {"token": decoded} | ||
| except Exception as e: | ||
| raise Exception(f"Unable to decode token: {str(e)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Token Signature Bypass
Token introspection endpoint disables JWT signature verification allowing forged tokens to be decoded and validated. Attackers can craft malicious tokens with arbitrary claims that appear legitimate through this endpoint. Token forgery enables session hijacking and unauthorized access to user accounts.
async def unsafe_decode(self, token: str) -> Dict[str, Any]:
try:
# First verify signature to ensure token authenticity
jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
# Then decode without verification for introspection
decoded = jwt.decode(token, options={"verify_signature": False})
if decoded.get("user_id"):
return decoded
return {"token": decoded}
except Exception as e:
raise Exception(f"Unable to decode token: {str(e)}")
Commitable Suggestion
| async def unsafe_decode(self, token: str) -> Dict[str, Any]: | |
| try: | |
| decoded = jwt.decode(token, options={"verify_signature": False}) | |
| if decoded.get("user_id"): | |
| return decoded | |
| return {"token": decoded} | |
| except Exception as e: | |
| raise Exception(f"Unable to decode token: {str(e)}") | |
| async def unsafe_decode(self, token: str) -> Dict[str, Any]: | |
| try: | |
| # First verify signature to ensure token authenticity | |
| jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm]) | |
| # Then decode without verification for introspection | |
| decoded = jwt.decode(token, options={"verify_signature": False}) | |
| if decoded.get("user_id"): | |
| return decoded | |
| return {"token": decoded} | |
| except Exception as e: | |
| raise Exception(f"Unable to decode token: {str(e)}") |
Standards
- CWE-347
- OWASP-A02
- NIST-SSDF-PW.1
| user_record = auth.get_user_by_email(target_email) | ||
| custom_claims = auth.get_custom_user_claims(user_record.uid) or {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blocking I/O in Async
Synchronous Firebase SDK calls (get_user_by_email, get_custom_user_claims) are used within an async function. These blocking network operations will stall the server's event loop, preventing it from handling concurrent requests and severely degrading application throughput under load.
loop = asyncio.get_event_loop()
user_record = await loop.run_in_executor(None, auth.get_user_by_email, target_email)
custom_claims = await loop.run_in_executor(None, auth.get_custom_user_claims, user_record.uid) or {}
Commitable Suggestion
| user_record = auth.get_user_by_email(target_email) | |
| custom_claims = auth.get_custom_user_claims(user_record.uid) or {} | |
| loop = asyncio.get_event_loop() | |
| user_record = await loop.run_in_executor(None, auth.get_user_by_email, target_email) | |
| custom_claims = await loop.run_in_executor(None, auth.get_custom_user_claims, user_record.uid) or {} |
Standards
- ISO-IEC-25010-Performance-Efficiency-Time-Behavior
- Code-Design-Async-Best-Practices
CodeAnt-AI Description
Allow support to issue user tokens and inspect tokens
What Changed
Impact
✅ Support can sign in as users without their password✅ Clearer token contents for debugging✅ Shorter troubleshooting cycles for authentication issues💡 Usage Guide
Checking Your Pull Request
Every time you make a pull request, our system automatically looks through it. We check for security issues, mistakes in how you're setting up your infrastructure, and common code problems. We do this to make sure your changes are solid and won't cause any trouble later.
Talking to CodeAnt AI
Got a question or need a hand with something in your pull request? You can easily get in touch with CodeAnt AI right here. Just type the following in a comment on your pull request, and replace "Your question here" with whatever you want to ask:
This lets you have a chat with CodeAnt AI about your pull request, making it easier to understand and improve your code.
Example
Preserve Org Learnings with CodeAnt
You can record team preferences so CodeAnt AI applies them in future reviews. Reply directly to the specific CodeAnt AI suggestion (in the same thread) and replace "Your feedback here" with your input:
This helps CodeAnt AI learn and adapt to your team's coding style and standards.
Example
Retrigger review
Ask CodeAnt AI to review the PR again, by typing:
Check Your Repository Health
To analyze the health of your code repository, visit our dashboard at https://app.codeant.ai. This tool helps you identify potential issues and areas for improvement in your codebase, ensuring your repository maintains high standards of code health.
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.