Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions app/auth/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pydantic import BaseModel, EmailStr
from typing import Optional
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List, Dict, Any
from enum import Enum
from datetime import datetime


class UserSignupRequest(BaseModel):
Expand All @@ -21,6 +23,8 @@ class UserResponse(BaseModel):
last_name: str
is_active: bool
created_at: str
role: UserRole = UserRole.USER

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The UserRole enum is used here as a type hint and default value, but it is defined later in the file (lines 46-49). This will cause a NameError when Python interprets this file. To fix this, please move the definition of the UserRole enum to be before the UserResponse class.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: UserResponse references UserRole directly (for both the annotation and the default) before UserRole is defined, so importing this module will raise NameError and prevent the app from starting; use a forward-reference annotation plus a delayed default (e.g., Field(default_factory=...)) so the enum is only looked up after its definition. [logic error]

Severity Level: Minor ⚠️

Suggested change
role: UserRole = UserRole.USER
role: "UserRole" = Field(default_factory=lambda: UserRole.USER)
Why it matters? ⭐

UserRole is declared below UserResponse, so evaluating the annotation/default during class creation hits a NameError and the auth module can't load; changing to a forward-reference annotation plus Field(default_factory=…) defers the lookup until after UserRole exists, eliminating the import-time crash.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/models.py
**Line:** 26:26
**Comment:**
	*Logic Error: `UserResponse` references `UserRole` directly (for both the annotation and the default) before `UserRole` is defined, so importing this module will raise `NameError` and prevent the app from starting; use a forward-reference annotation plus a delayed default (e.g., `Field(default_factory=...)`) so the enum is only looked up after its definition.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

last_login: Optional[datetime] = None


class AuthResponse(BaseModel):
Expand All @@ -36,4 +40,38 @@ class TokenResponse(BaseModel):


class RefreshTokenRequest(BaseModel):
refresh_token: str
refresh_token: str


class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"


class UserUpdateRequest(BaseModel):
first_name: Optional[str] = Field(None, min_length=1, max_length=50)
last_name: Optional[str] = Field(None, min_length=1, max_length=50)
role: Optional[UserRole] = None


class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str = Field(..., min_length=8, max_length=100)


class SessionInfo(BaseModel):
session_id: str
created_at: datetime
last_activity: datetime
ip_address: Optional[str] = None
user_agent: Optional[str] = None


class AuditLogEntry(BaseModel):
user_id: str
action: str
timestamp: datetime
ip_address: Optional[str] = None
user_agent: Optional[str] = None
details: Optional[Dict[str, Any]] = None
214 changes: 209 additions & 5 deletions app/auth/routes.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi import APIRouter, HTTPException, status, Depends, Request, Header
from fastapi.security import HTTPBearer
from .models import (
UserSignupRequest,
UserLoginRequest,
AuthResponse,
UserResponse,
TokenResponse,
RefreshTokenRequest
RefreshTokenRequest,
UserUpdateRequest,
ChangePasswordRequest,
UserRole,
SessionInfo,
AuditLogEntry
)
from .firebase_auth import firebase_auth
from .dependencies import get_current_user
from typing import Dict, Any
from typing import Dict, Any, Optional, List
import uuid
from datetime import datetime

router = APIRouter(prefix="/auth", tags=["authentication"])

Expand Down Expand Up @@ -130,6 +137,203 @@ async def verify_token(current_user: Dict[str, Any] = Depends(get_current_user))
"user": {
"id": current_user["uid"],
"email": current_user["email"],
"role": current_user["role"]
"role": current_user.get("role", "user")
}
}
}


@router.put("/profile", response_model=UserResponse)
async def update_profile(
profile_data: UserUpdateRequest,
current_user: Dict[str, Any] = Depends(get_current_user),
request: Request = None
Comment on lines +147 to +149

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In FastAPI, parameters without default values must come before parameters with default values. Also, to inject the Request object, you should declare it as request: Request. The current ordering and default value for request are not idiomatic. Please reorder the parameters to follow Python's syntax rules and FastAPI conventions.

Suggested change
profile_data: UserUpdateRequest,
current_user: Dict[str, Any] = Depends(get_current_user),
request: Request = None
profile_data: UserUpdateRequest,
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user),

):
"""
Update user profile information
"""
try:
# Update user in Firebase
update_data = {}
if profile_data.first_name is not None:
update_data["first_name"] = profile_data.first_name
if profile_data.last_name is not None:
update_data["last_name"] = profile_data.last_name

# Only admins can change roles
if profile_data.role is not None:
if current_user.get("role") != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can change user roles"
)
update_data["role"] = profile_data.role.value
Comment on lines +163 to +169
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Type mismatch: comparing string to enum.

current_user.get("role") likely returns a string (e.g., "admin"), while UserRole.ADMIN is an enum member. This comparison will always be False.

Apply this fix:

-        if profile_data.role is not None:
-            if current_user.get("role") != UserRole.ADMIN:
+        if profile_data.role is not None:
+            if current_user.get("role") != UserRole.ADMIN.value:

Or normalize to enum before comparison:

-            if current_user.get("role") != UserRole.ADMIN:
+            if current_user.get("role") not in (UserRole.ADMIN, UserRole.ADMIN.value):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if profile_data.role is not None:
if current_user.get("role") != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can change user roles"
)
update_data["role"] = profile_data.role.value
if profile_data.role is not None:
if current_user.get("role") != UserRole.ADMIN.value:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can change user roles"
)
update_data["role"] = profile_data.role.value
🧰 Tools
🪛 Ruff (0.14.5)

165-168: Abstract raise to an inner function

(TRY301)

🤖 Prompt for AI Agents
In app/auth/routes.py around lines 163-169, the code compares
current_user.get("role") (a string) to UserRole.ADMIN (an enum), causing the
check to always fail; convert or normalize the value before comparing — either
compare to the enum's value (UserRole.ADMIN.value) or coerce the string into the
enum (e.g., UserRole(current_user.get("role"))) and handle possible
ValueError/None, then perform the comparison; keep the assignment
update_data["role"] = profile_data.role.value as-is.


if update_data:
await firebase_auth.update_user(current_user["uid"], update_data)

# Log the profile update
await _log_audit_event(
user_id=current_user["uid"],
action="profile_update",
request=request,
details=update_data
)

# Return updated user info
updated_user = await firebase_auth.get_user(current_user["uid"])
return UserResponse(
id=updated_user["uid"],
email=updated_user["email"],
first_name=updated_user["first_name"],
last_name=updated_user["last_name"],
is_active=True,
created_at=updated_user.get("created_at", ""),
role=UserRole(updated_user.get("role", "user")),
last_login=updated_user.get("last_login")
)

except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
Comment on lines +195 to +199

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Catching a generic Exception and returning its string representation (str(e)) in the response detail is dangerous as it can leak sensitive internal information about your application. This could expose details about your code, libraries, or infrastructure that could be exploited by an attacker. It's better to catch specific exceptions and return a generic error message for unexpected ones, while logging the full error for debugging purposes.


Comment on lines +196 to +200
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broad Exception Handling Leaks Implementation Details

The endpoint catches a generic Exception and returns its string representation in the error detail. This can leak sensitive internal implementation details to the client and masks the specific type of error, hindering proper error handling and debugging.


@router.post("/change-password")
async def change_password(
password_data: ChangePasswordRequest,
current_user: Dict[str, Any] = Depends(get_current_user),
request: Request = None
Comment on lines +204 to +206

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function parameters should be reordered. In Python, parameters without default values must be defined before parameters with default values. The request: Request parameter should be moved before current_user.

Suggested change
password_data: ChangePasswordRequest,
current_user: Dict[str, Any] = Depends(get_current_user),
request: Request = None
password_data: ChangePasswordRequest,
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user),

):
"""
Change user password
"""
try:
# Verify current password
await firebase_auth.sign_in_user(
email=current_user["email"],
password=password_data.current_password
)
Comment on lines +213 to +216

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current password is not being verified before updating to a new one. The firebase_auth.sign_in_user method is called, but its mock implementation in app/auth/firebase_auth.py does not actually use the password parameter to verify credentials. This allows any authenticated user to change their password without knowing their current one, which is a critical security flaw. You must implement proper password verification.


# Update password
await firebase_auth.update_user_password(
current_user["uid"],
password_data.new_password
)

# Log the password change
await _log_audit_event(
user_id=current_user["uid"],
action="password_change",
request=request
)

return {"message": "Password changed successfully"}

except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid current password or unable to update password"
)


@router.get("/sessions", response_model=List[SessionInfo])
async def get_user_sessions(
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Comment on lines +241 to +244
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Endpoint Pagination on Session Data

The /sessions endpoint returns a list of all active sessions without any pagination mechanism. If a user accumulates many sessions over time, this can lead to large response payloads, causing performance degradation or a potential denial-of-service vector.

Standards
  • Org-Guideline-Very critical to consider apply pagination to queries
  • CWE-770

Get active sessions for the current user
"""
# This would typically fetch from a database
# For demo purposes, return a mock session
sessions = [
SessionInfo(
session_id=str(uuid.uuid4()),
created_at=datetime.now(),
last_activity=datetime.now(),
ip_address="127.0.0.1",
user_agent="Mozilla/5.0 (Demo Browser)"
)
]
return sessions
Comment on lines +248 to +258
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mocked Data Returned in Endpoints

The /sessions and /audit-logs endpoints return hardcoded mock data instead of fetching from a persistent store. While acknowledged in comments, this represents significant technical debt and must be replaced with a real database implementation before the feature is considered complete.

Standards
  • Maintainability-Quality-Testability



@router.delete("/sessions/{session_id}")
async def revoke_session(
session_id: str,
current_user: Dict[str, Any] = Depends(get_current_user),
request: Request = None
Comment on lines +263 to +265

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function parameters should be reordered. In Python, parameters without default values must be defined before parameters with default values. The request: Request parameter should be moved before current_user.

Suggested change
session_id: str,
current_user: Dict[str, Any] = Depends(get_current_user),
request: Request = None
session_id: str,
request: Request,
current_user: Dict[str, Any] = Depends(get_current_user),

):
"""
Revoke a specific user session
"""
# This would typically remove the session from a database
await _log_audit_event(
user_id=current_user["uid"],
action="session_revoked",
request=request,
details={"session_id": session_id}
)

return {"message": f"Session {session_id} has been revoked"}


@router.get("/audit-logs", response_model=List[AuditLogEntry])
async def get_audit_logs(
current_user: Dict[str, Any] = Depends(get_current_user),
limit: int = 50
):
"""
Comment on lines +283 to +286
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete Audit Log Pagination

The /audit-logs endpoint implements a limit parameter but lacks an offset or cursor-based mechanism. This prevents clients from navigating beyond the first page of results, rendering the endpoint ineffective for browsing large audit histories. Full pagination should be implemented.

Standards
  • Org-Guideline-Very critical to consider apply pagination to queries

Get audit logs for the current user (admin only)
"""
if current_user.get("role") != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can view audit logs"
)
Comment on lines +289 to +293
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Same type mismatch issue for admin role check.

Similar to the update_profile endpoint, this comparison will always fail because it compares a string to an enum.

-    if current_user.get("role") != UserRole.ADMIN:
+    if current_user.get("role") != UserRole.ADMIN.value:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if current_user.get("role") != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can view audit logs"
)
if current_user.get("role") != UserRole.ADMIN.value:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can view audit logs"
)
🤖 Prompt for AI Agents
In app/auth/routes.py around lines 289 to 293, the admin role check compares
current_user.get("role") (a string) to UserRole.ADMIN (an enum) which always
fails; change the comparison to compare like types — e.g., compare the string to
UserRole.ADMIN.value (current_user.get("role") != UserRole.ADMIN.value) or
coerce the string into the enum (UserRole(current_user.get("role")) !=
UserRole.ADMIN); ensure you handle a missing/invalid role value
(KeyError/ValueError) before raising the 403.


# This would typically fetch from a database
# For demo purposes, return mock audit logs
logs = [
AuditLogEntry(
user_id=current_user["uid"],
action="login",
timestamp=datetime.now(),
ip_address="127.0.0.1",
user_agent="Mozilla/5.0 (Demo Browser)"
),
AuditLogEntry(
user_id=current_user["uid"],
action="profile_update",
timestamp=datetime.now(),
ip_address="127.0.0.1",
details={"fields_updated": ["first_name"]}
)
]
return logs[:limit]


async def _log_audit_event(
user_id: str,
action: str,
request: Request = None,
details: Optional[Dict[str, Any]] = None
):
"""
Log an audit event
"""
# This would typically save to a database
# For demo purposes, we'll just print it
ip_address = request.client.host if request else None
user_agent = request.headers.get("user-agent") if request else None

audit_entry = AuditLogEntry(
user_id=user_id,
action=action,
timestamp=datetime.now(),
ip_address=ip_address,
user_agent=user_agent,
details=details
)

print(f"Audit Log: {audit_entry.dict()}")
Loading