Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions app/auth/user_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from fastapi import APIRouter, Depends, HTTPException, status
from .firebase_auth import firebase_auth
from .dependencies import get_current_user, require_admin
from typing import Dict, Any, List
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

wc -l app/auth/user_management.py

Repository: coderabbit-test/pr-reviewer

Length of output: 101


🏁 Script executed:

cat -n app/auth/user_management.py | head -100

Repository: coderabbit-test/pr-reviewer

Length of output: 3575


Add Literal type constraint and explicit fallback for bulk operation actions

BulkUserRequest.action is currently a plain str, but bulk_user_action only handles "disable" and "delete". Any other value silently produces an empty or partial results array, which masks client bugs.

Strengthen the type safety and add explicit handling by:

  1. Import Literal from typing (line 4)
  2. Constrain BulkUserRequest.action to Literal["disable", "delete"] (line 19)
  3. Add an explicit else clause in the action handling loop to return a "skipped" status with an explanatory message for any unexpected action values

Also, switch the admin check from manual role verification (if current_user.get("role") != "admin") to use require_admin dependency for consistency with the rest of the auth layer (line 63).

🤖 Prompt for AI Agents
In app/auth/user_management.py around lines 4, 19 and 63, strengthen action
typing and handling: import Literal on line 4, change the BulkUserRequest.action
type (around line 19) to Literal["disable","delete"], and in the bulk action
loop add an explicit else branch that appends a "skipped" result with an
explanatory message for any unexpected action value; also replace the manual
admin role check at line 63 (if current_user.get("role") != "admin") with the
existing require_admin dependency for consistency and update imports
accordingly.

from pydantic import BaseModel, EmailStr
from firebase_admin import auth

router = APIRouter(prefix="/users", tags=["user-management"])


class UserUpdateRequest(BaseModel):
email: EmailStr
first_name: str
last_name: str


class BulkUserRequest(BaseModel):
user_ids: List[str]
action: str

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The action field is a plain string, which is prone to typos and doesn't clearly define the allowed operations. It's better to use typing.Literal to restrict the possible values to "disable" and "delete". This improves type safety and makes the API contract clearer for clients. You'll need to add Literal to your typing import at the top of the file (e.g., from typing import Dict, Any, List, Literal).

Suggested change
action: str
action: Literal["disable", "delete"]

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Action Validation

The action field in BulkUserRequest is a generic string, but the endpoint logic only handles 'disable' and 'delete'. Providing an invalid action results in a misleading 200 OK response with no work done.

Standards
  • Business-Rule-Input-Validation
  • Logic-Verification-Data-Integrity



@router.get("/profile")
async def get_user_profile(current_user: Dict[str, Any] = Depends(get_current_user)):
"""Get detailed user profile information"""
try:
user_record = auth.get_user(current_user["id"])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current_user dictionary from the get_current_user dependency contains the user's ID under the key "uid", not "id". Using "id" will result in a KeyError at runtime. This seems to be a recurring issue in this file.

Suggested change
user_record = auth.get_user(current_user["id"])
user_record = auth.get_user(current_user["uid"])

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking I/O in Async Endpoints

Synchronous functions from the firebase-admin SDK, like auth.get_user, perform blocking network I/O. Calling them directly within an async function blocks the server's event loop, preventing it from handling other requests concurrently.

        user_record = await asyncio.get_event_loop().run_in_executor(None, auth.get_user, current_user["id"])
        custom_claims = await asyncio.get_event_loop().run_in_executor(None, auth.get_custom_user_claims, user_record.uid)
Commitable Suggestion
Suggested change
user_record = auth.get_user(current_user["id"])
user_record = await asyncio.get_event_loop().run_in_executor(None, auth.get_user, current_user["id"])
custom_claims = await asyncio.get_event_loop().run_in_executor(None, auth.get_custom_user_claims, user_record.uid)
Standards
  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior

custom_claims = auth.get_custom_user_claims(user_record.uid)
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
Comment on lines +31 to +32

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Accessing custom_claims with bracket notation ([]) can raise a KeyError if the first_name or last_name claims are not set for a user. It's safer to use the .get() method with a default value to avoid unexpected crashes. This is also consistent with how claims are accessed in other parts of the codebase (e.g., firebase_auth.py).

Suggested change
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
"first_name": custom_claims.get("first_name", ""),
"last_name": custom_claims.get("last_name", ""),

Comment on lines +27 to +32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Directly indexing custom_claims["first_name"] and custom_claims["last_name"] without handling the case where get_custom_user_claims returns None or missing keys can lead to TypeError or KeyError when a user has no custom claims set. [null pointer]

Severity Level: Minor ⚠️

Suggested change
custom_claims = auth.get_custom_user_claims(user_record.uid)
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
custom_claims = auth.get_custom_user_claims(user_record.uid) or {}
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims.get("first_name", ""),
"last_name": custom_claims.get("last_name", ""),
Why it matters? ⭐

This is a legitimate runtime risk: auth.get_custom_user_claims(...) can return None or a dict missing keys, and indexing directly will raise TypeError/KeyError. The improved code (coalesce to {} and use .get with defaults) prevents these exceptions and returns sensible empty strings instead of failing the whole request. This addresses a real potential bug with minimal, safe change.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/user_management.py
**Line:** 27:32
**Comment:**
	*Null Pointer: Directly indexing `custom_claims["first_name"]` and `custom_claims["last_name"]` without handling the case where `get_custom_user_claims` returns `None` or missing keys can lead to TypeError or KeyError when a user has no custom claims set.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

Comment on lines +31 to +32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsafe Custom Claim Access

Direct dictionary access on custom_claims will raise a KeyError if a user profile lacks these specific claims. This causes an unhandled exception and a 500 error, making the profile endpoint unreliable for certain users.

            "first_name": custom_claims.get("first_name", ""),
            "last_name": custom_claims.get("last_name", ""),
Commitable Suggestion
Suggested change
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
"first_name": custom_claims.get("first_name", ""),
"last_name": custom_claims.get("last_name", ""),
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Correctness

"is_active": not user_record.disabled,
"created_at": str(user_record.user_metadata.creation_timestamp)
}
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
Comment on lines +36 to +37

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Catching a generic Exception is too broad and can obscure the actual error. For instance, if a user is not found, firebase_admin.auth.UserNotFoundError is raised, and a 404 Not Found response would be more appropriate than a 500 Internal Server Error. Additionally, exposing raw exception messages via str(e) can be a security risk by leaking internal implementation details. It's better to catch specific exceptions and return generic error messages to the user, while logging the detailed error for debugging purposes.

Comment on lines +22 to +37
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's locate and read the file to verify the code at lines 22-37
find . -name "user_management.py" -type f | head -20

Repository: coderabbit-test/pr-reviewer

Length of output: 100


🏁 Script executed:

# Read the entire file to understand context and verify lines 22-37
cat -n ./app/auth/user_management.py

Repository: coderabbit-test/pr-reviewer

Length of output: 3575


🏁 Script executed:

# Find and read the dependencies file
find . -name "dependencies.py" -type f | grep -E "auth|dependencies"

Repository: coderabbit-test/pr-reviewer

Length of output: 97


🏁 Script executed:

cat -n ./app/auth/dependencies.py

Repository: coderabbit-test/pr-reviewer

Length of output: 2627


🏁 Script executed:

# Find and read firebase_auth.py
find . -name "firebase_auth.py" -type f

Repository: coderabbit-test/pr-reviewer

Length of output: 98


🏁 Script executed:

cat -n ./app/auth/firebase_auth.py

Repository: coderabbit-test/pr-reviewer

Length of output: 7451


🏁 Script executed:

# Search for where "id" key might be set in current_user or user_data
rg -n "current_user\[.id.\]|\"id\":" ./app/auth/ -A 2 -B 2

Repository: coderabbit-test/pr-reviewer

Length of output: 2074


Fix critical bug: use current_user["uid"] instead of current_user["id"], and harden against missing custom claims

The endpoint has three issues:

  • Critical bug: current_user["id"] does not exist. The verify_token() function returns current_user["uid"], so line 26 will raise KeyError at runtime. Use current_user["uid"] instead.
  • auth.get_custom_user_claims() can return None, and indexing with custom_claims["first_name"] / ["last_name"] will raise KeyError for users who haven't set profile data. Use .get() with defaults.
  • The except Exception block leaks internal error details to clients via str(e), which can expose Firebase/Admin SDK internals.

Use this fix:

-        user_record = auth.get_user(current_user["id"])
+        user_record = auth.get_user(current_user["uid"])
         custom_claims = auth.get_custom_user_claims(user_record.uid) or {}
         return {
             "id": user_record.uid,
             "email": user_record.email,
-            "first_name": custom_claims["first_name"],
-            "last_name": custom_claims["last_name"],
+            "first_name": custom_claims.get("first_name"),
+            "last_name": custom_claims.get("last_name"),
             "is_active": not user_record.disabled,
             "created_at": str(user_record.user_metadata.creation_timestamp)
         }
     except Exception as e:
-        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail="Failed to fetch user profile",
+        ) from e

Also apply the same current_user["uid"] fix to line 47 in the update_user_profile function.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@router.get("/profile")
async def get_user_profile(current_user: Dict[str, Any] = Depends(get_current_user)):
"""Get detailed user profile information"""
try:
user_record = auth.get_user(current_user["id"])
custom_claims = auth.get_custom_user_claims(user_record.uid)
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
"is_active": not user_record.disabled,
"created_at": str(user_record.user_metadata.creation_timestamp)
}
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.get("/profile")
async def get_user_profile(current_user: Dict[str, Any] = Depends(get_current_user)):
"""Get detailed user profile information"""
try:
user_record = auth.get_user(current_user["uid"])
custom_claims = auth.get_custom_user_claims(user_record.uid) or {}
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims.get("first_name"),
"last_name": custom_claims.get("last_name"),
"is_active": not user_record.disabled,
"created_at": str(user_record.user_metadata.creation_timestamp)
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to fetch user profile",
) from e
🧰 Tools
🪛 Ruff (0.14.8)

23-23: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


36-36: Do not catch blind exception: Exception

(BLE001)


37-37: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In app/auth/user_management.py around lines 22 to 37, the code incorrectly uses
current_user["id"] (should be current_user["uid"]), assumes
auth.get_custom_user_claims() always returns a dict, and returns internal
exception text to clients; change the lookup to current_user["uid"], read
custom_claims using .get("first_name", "") and .get("last_name", "") (or
suitable defaults) to avoid KeyError when claims are missing, and replace
raising HTTPException(status=500, detail=str(e)) with a generic error message
(e.g., "Internal server error") while logging the original exception internally;
also apply the same current_user["uid"] fix at line 47 in update_user_profile.



@router.put("/profile")
async def update_user_profile(
update_data: UserUpdateRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""Update current user's profile information"""
try:
user_record = auth.get_user(current_user["id"])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current_user dictionary from the get_current_user dependency contains the user's ID under the key "uid", not "id". Using "id" will result in a KeyError at runtime.

Suggested change
user_record = auth.get_user(current_user["id"])
user_record = auth.get_user(current_user["uid"])

if update_data.email != user_record.email:
auth.update_user(user_record.uid, email=update_data.email)
auth.set_custom_user_claims(user_record.uid, {
"first_name": update_data.first_name,
"last_name": update_data.last_name,
"role": current_user.get("role", "user")
})
return {"message": "Profile updated successfully"}
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
Comment on lines +56 to +57

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Catching a generic Exception and always returning a 400 Bad Request may not be accurate for all failure scenarios. For example, a UserNotFoundError should result in a 404, and other Firebase errors might indicate a server-side issue (5xx). It's better to handle specific exceptions. Also, returning str(e) can leak sensitive information.



@router.post("/bulk-action")
async def bulk_user_action(
request: BulkUserRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""Perform bulk actions on multiple users (admin only)"""
if current_user.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
Comment on lines +66 to +67

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Instead of manually checking for the 'admin' role, you should use the require_admin dependency from app.auth.dependencies. This centralizes authorization logic, making the code cleaner and more consistent with other protected endpoints in the application.

To fix this, you would:

  1. Import require_admin in app/auth/user_management.py.
  2. Change the current_user dependency in the function signature to Depends(require_admin).
  3. Remove this manual if check.

Comment on lines +66 to +67
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Admin Authorization

Manual role checking bypasses centralized authorization controls and creates inconsistent security patterns. The require_admin dependency is imported but not used, leading to potential authorization bypass if role validation logic differs.

async def bulk_user_action(
    request: BulkUserRequest,
    current_user: Dict[str, Any] = Depends(require_admin)
): 
Commitable Suggestion
Suggested change
if current_user.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
async def bulk_user_action(
request: BulkUserRequest,
current_user: Dict[str, Any] = Depends(require_admin)
):
Standards
  • CWE-862
  • OWASP-A01
  • NIST-SSDF-PW.1

results = []
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The bulk user action endpoint silently accepts any action value and returns a 200 response with an empty results list when the action is not "disable" or "delete", which is misleading and makes invalid input appear successful. [logic error]

Severity Level: Minor ⚠️

Suggested change
results = []
if request.action not in ("disable", "delete"):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid action")
Why it matters? ⭐

Currently if request.action is not "disable" or "delete" the endpoint simply returns an empty results list and 200 OK, which is misleading and lets invalid input appear to succeed. Validating the action and returning a 400 for unsupported actions is correct behavior and prevents silent no-ops. This change fixes an input validation bug and improves API correctness.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/user_management.py
**Line:** 68:68
**Comment:**
	*Logic Error: The bulk user action endpoint silently accepts any `action` value and returns a 200 response with an empty `results` list when the action is not `"disable"` or `"delete"`, which is misleading and makes invalid input appear successful.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

for user_id in request.user_ids:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For the 'delete' action, the current implementation iterates through user_ids and deletes users one by one. The Firebase Admin SDK provides a delete_users(uids) method that can delete up to 1000 users in a single API call, which is much more efficient. Consider refactoring to group user IDs for deletion and use this bulk method. Note that there is no bulk update for disabling users, so that part would correctly remain in a loop.

try:
if request.action == "disable":
auth.update_user(user_id, disabled=True)
results.append({"user_id": user_id, "status": "disabled"})
elif request.action == "delete":
auth.delete_user(user_id)
results.append({"user_id": user_id, "status": "deleted"})
Comment on lines +71 to +76

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current logic doesn't handle cases where request.action is something other than "disable" or "delete". In such cases, the loop continues without performing any action or providing feedback for that user. It would be better to handle this case explicitly by adding an else block to append an error status to the results, informing the client about the invalid action.

Suggested change
if request.action == "disable":
auth.update_user(user_id, disabled=True)
results.append({"user_id": user_id, "status": "disabled"})
elif request.action == "delete":
auth.delete_user(user_id)
results.append({"user_id": user_id, "status": "deleted"})
if request.action == "disable":
auth.update_user(user_id, disabled=True)
results.append({"user_id": user_id, "status": "disabled"})
elif request.action == "delete":
auth.delete_user(user_id)
results.append({"user_id": user_id, "status": "deleted"})
else:
results.append({"user_id": user_id, "status": "error", "message": f"Invalid action: {request.action}"})

Comment on lines +74 to +76
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient Bulk Deletion

The code iterates through user IDs to delete them one by one, causing an N+1 query problem where N is the number of users. This results in N separate network requests, which is inefficient and scales poorly.

            elif request.action == "delete":
                delete_result = auth.delete_users(request.user_ids)
                for uid in delete_result.success_count:
                    results.append({"user_id": uid, "status": "deleted"})
                for error in delete_result.errors:
                    results.append({"user_id": error.uid, "status": "error", "message": error.reason})
                break
Commitable Suggestion
Suggested change
elif request.action == "delete":
auth.delete_user(user_id)
results.append({"user_id": user_id, "status": "deleted"})
elif request.action == "delete":
delete_result = auth.delete_users(request.user_ids)
for uid in delete_result.success_count:
results.append({"user_id": uid, "status": "deleted"})
for error in delete_result.errors:
results.append({"user_id": error.uid, "status": "error", "message": error.reason})
break
Standards
  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Optimization-Pattern-Batch-Processing

except Exception as e:
results.append({"user_id": user_id, "status": "error", "message": str(e)})
Comment on lines +77 to +78

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While it's good that the exception is caught inside the loop to allow other operations to continue, catching a generic Exception is too broad. It's better to catch specific exceptions from the Firebase Admin SDK (e.g., auth.UserNotFoundError). Also, exposing raw exception details with str(e) can be a security risk. Consider logging the full error and returning a more generic error message in the response.

return {"results": results}
4 changes: 4 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from fastapi.responses import JSONResponse
from .auth.routes import router as auth_router
from .example_protected_routes import router as protected_router
from .auth.user_management import router as user_management_router
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The new import for the user management router will raise an ImportError at application startup because the .auth.user_management module does not exist in the codebase, causing the whole FastAPI app to fail to load; wrapping this import in a try/except with a safe fallback router prevents the crash while still using the router if/when the module is added. [possible bug]

Severity Level: Critical 🚨

Suggested change
from .auth.user_management import router as user_management_router
try:
from .auth.user_management import router as user_management_router
except ImportError:
from fastapi import APIRouter
user_management_router = APIRouter()
Why it matters? ⭐

The suggestion correctly identifies a real startup failure: the PR added "from .auth.user_management import router as user_management_router"
but there is no app/auth/user_management.py in the current tree. I verified that the only reference to "user_management" is in app/main.py and no module file exists, so the direct import will raise ImportError when the app is started. Wrapping the import in a try/except and providing a safe APIRouter fallback prevents the whole FastAPI app from crashing while the optional module is absent. This is a pragmatic, low-risk mitigation that fixes an actual runtime problem introduced by the PR.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/main.py
**Line:** 6:6
**Comment:**
	*Possible Bug: The new import for the user management router will raise an ImportError at application startup because the `.auth.user_management` module does not exist in the codebase, causing the whole FastAPI app to fail to load; wrapping this import in a try/except with a safe fallback router prevents the crash while still using the router if/when the module is added.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

import os

# Create FastAPI app
Expand All @@ -27,6 +28,9 @@
# Include protected routes (examples)
app.include_router(protected_router)

# Include user management routes
app.include_router(user_management_router)

# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
Expand Down