-
Notifications
You must be signed in to change notification settings - Fork 4
added new changes for testing custom inst. #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
WalkthroughAdds new auth models (roles, profile/password/session/audit DTOs), expands auth routes with profile, password, session, and audit endpoints plus audit logging, introduces a security utilities module (password validator, headers, tokens, IP whitelist, audit logger), implements rate limiting and trusted hosts in main app, updates root/security-info endpoints, and adds dependencies. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Client
participant M as Middleware (rate_limit)
participant A as FastAPI Router
participant R as Auth Routes
participant S as Security Utils
participant L as AuditLogger
C->>M: HTTP Request
M-->>M: Check timestamps per IP (60s window)
alt Over limit
M-->>C: 429 Too Many Requests
else Allowed
M->>A: Forward request
opt Profile/Password endpoints
A->>R: Route to handler
R->>S: Validate password / generate tokens (as needed)
R->>L: log_event(user_id, action, ip, ua, details)
L-->>R: ack
R-->>C: Structured response (UserResponse / message)
end
end
sequenceDiagram
autonumber
participant C as Client (Admin/User)
participant R as update_profile
participant L as AuditLogger
participant M as Models
C->>R: PUT /auth/profile {first_name,last_name,role?}
R-->>R: If role provided, verify admin
R-->>M: Build UserResponse with role, last_login
R->>L: log_event(user_id, "profile_update", ip, ua, details)
L-->>R: ack
R-->>C: 200 UserResponse
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🧪 Early access (Sonnet 4.5): enabledWe are currently testing the Sonnet 4.5 model, which is expected to improve code review quality. However, this model may lead to increased noise levels in the review comments. Please disable the early access features if the noise level causes any inconvenience. Note:
Comment |
Summary of ChangesHello @visz11, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly upgrades the application's security and user management capabilities. It introduces a suite of new features, including robust user profile updates, secure password change mechanisms, and comprehensive session and audit log management. Beyond user-specific functionalities, the changes integrate crucial application-level security measures such as rate limiting and trusted host validation, laying a stronger foundation for a more secure and maintainable system. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
Enhanced Authentication Security FeaturesTL;DR: Comprehensive security upgrade to the authentication system with user roles, session management, audit logging, and enhanced password security. Refacto PR SummaryAdds advanced security features including role-based access control, session management, and comprehensive audit logging. Change HighlightsClick to expand
Sequence DiagramsequenceDiagram
participant User
participant API as Authentication API
participant Firebase as Firebase Auth
participant Audit as Audit Logger
User->>API: Update profile
API->>Firebase: Update user data
API->>Audit: Log profile update event
Audit-->>API: Confirm log entry
Firebase-->>API: Updated user data
API-->>User: Profile updated response
User->>API: Change password
API->>Firebase: Verify current password
Firebase-->>API: Password verified
API->>Firebase: Update password
API->>Audit: Log password change
API-->>User: Password changed confirmation
User->>API: View active sessions
API-->>User: List of active sessions
User->>API: Revoke session
API->>Audit: Log session revocation
API-->>User: Session revoked confirmation
Testing GuideClick to expand
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant number of new features, including user profile management, password changes, session management, and audit logging. It also adds security enhancements like rate limiting and trusted host validation. The code is generally well-structured, but there are several critical and high-severity issues that need to be addressed. These include incorrect error handling that masks bugs, a call to a non-existent function which will cause a crash, and security features that are implemented but not activated. There are also opportunities to improve code quality by removing unused dependencies, consolidating duplicated logic, and using modern Pydantic features.
| ) | ||
|
|
||
| # Return updated user info | ||
| updated_user = await firebase_auth.get_user(current_user["uid"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code calls firebase_auth.get_user(), but this method is not defined in the FirebaseAuthService class in app/auth/firebase_auth.py. This will cause a server error (AttributeError) when this endpoint is hit. You might need to implement this method in FirebaseAuthService or use an existing method from the firebase_admin library directly, such as firebase_admin.auth.get_user().
| except Exception as e: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=str(e) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The try...except Exception block is too broad. It will catch HTTPExceptions raised for specific reasons (like the 403 Forbidden for role changes on line 165) and re-raise them as a generic 400 Bad Request. This hides the original, more specific error from the client. Additionally, returning str(e) in the detail can leak sensitive implementation details. You should catch more specific exceptions from the firebase_auth service or let FastAPI handle HTTPExceptions by not catching them here.
| except Exception as e: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail="Invalid current password or unable to update password" | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the update_profile endpoint, this try...except Exception block is too broad. If firebase_auth.sign_in_user fails due to an invalid password, it should likely be a 401 Unauthorized or 403 Forbidden error, not a generic 400 Bad Request. The current implementation masks the true cause of the failure and returns a misleading status code and message.
| class SecurityHeaders: | ||
| """Security headers utility""" | ||
|
|
||
| @staticmethod | ||
| def get_security_headers() -> dict: | ||
| """ | ||
| Get recommended security headers | ||
| """ | ||
| return { | ||
| "X-Content-Type-Options": "nosniff", | ||
| "X-Frame-Options": "DENY", | ||
| "X-XSS-Protection": "1; mode=block", | ||
| "Strict-Transport-Security": "max-age=31536000; includeSubDomains", | ||
| "Referrer-Policy": "strict-origin-when-cross-origin", | ||
| "Content-Security-Policy": "default-src 'self'", | ||
| "Permissions-Policy": "geolocation=(), microphone=(), camera=()" | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SecurityHeaders class and its get_security_headers method are defined, but they are not used to apply these headers to responses. This means important security headers like Content-Security-Policy and X-XSS-Protection are not being sent to the client, reducing the application's security posture. You should create a middleware in app/main.py to add these headers to all outgoing responses.
| # Rate limiting storage (in production, use Redis or database) | ||
| rate_limit_storage: Dict[str, list] = defaultdict(list) | ||
|
|
||
| # Rate limiting middleware | ||
| @app.middleware("http") | ||
| async def rate_limit_middleware(request: Request, call_next): | ||
| # Get client IP | ||
| client_ip = request.client.host | ||
|
|
||
| # Current time | ||
| current_time = time.time() | ||
|
|
||
| # Clean old entries (older than 1 minute) | ||
| rate_limit_storage[client_ip] = [ | ||
| timestamp for timestamp in rate_limit_storage[client_ip] | ||
| if current_time - timestamp < 60 | ||
| ] | ||
|
|
||
| # Check if rate limit exceeded (100 requests per minute) | ||
| if len(rate_limit_storage[client_ip]) >= 100: | ||
| return JSONResponse( | ||
| status_code=429, | ||
| content={"detail": "Rate limit exceeded. Maximum 100 requests per minute."} | ||
| ) | ||
|
|
||
| # Add current request timestamp | ||
| rate_limit_storage[client_ip].append(current_time) | ||
|
|
||
| # Process request | ||
| response = await call_next(request) | ||
| return response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current rate-limiting implementation uses an in-memory defaultdict. While the comment mentions using Redis in production, it's important to highlight that this approach will not work correctly in a production environment with multiple worker processes (e.g., when using Gunicorn). Each worker will have its own separate memory and its own rate-limiting counter, which defeats the purpose of a global rate limit. This is a significant scalability and correctness issue for the rate limiting feature.
| details=details | ||
| ) | ||
|
|
||
| print(f"Audit Log: {audit_entry.dict()}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Security utilities and helpers for authentication | ||
| """ | ||
| import re | ||
| import hashlib |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| import hashlib | ||
| import secrets | ||
| from typing import Optional | ||
| from datetime import datetime, timedelta |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| class AuditLogger: | ||
| """Audit logging utility""" | ||
|
|
||
| def __init__(self): | ||
| self.logs = [] | ||
|
|
||
| def log_event(self, | ||
| user_id: str, | ||
| action: str, | ||
| ip_address: Optional[str] = None, | ||
| user_agent: Optional[str] = None, | ||
| details: Optional[dict] = None): | ||
| """ | ||
| Log an audit event | ||
| """ | ||
| log_entry = { | ||
| "timestamp": datetime.now().isoformat(), | ||
| "user_id": user_id, | ||
| "action": action, | ||
| "ip_address": ip_address, | ||
| "user_agent": user_agent, | ||
| "details": details or {} | ||
| } | ||
|
|
||
| self.logs.append(log_entry) | ||
|
|
||
| # In production, this would save to a database | ||
| print(f"AUDIT: {log_entry}") | ||
|
|
||
| def get_user_logs(self, user_id: str, limit: int = 100) -> list: | ||
| """ | ||
| Get audit logs for a specific user | ||
| """ | ||
| user_logs = [log for log in self.logs if log["user_id"] == user_id] | ||
| return sorted(user_logs, key=lambda x: x["timestamp"], reverse=True)[:limit] | ||
|
|
||
|
|
||
| # Global instances | ||
| password_validator = PasswordValidator() | ||
| security_headers = SecurityHeaders() | ||
| token_generator = TokenGenerator() | ||
| ip_whitelist = IPWhitelist() | ||
| audit_logger = AuditLogger() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The AuditLogger class and its global instance audit_logger are defined here, but the application uses a separate function _log_audit_event in app/auth/routes.py for audit logging. This creates two different, inconsistent implementations for the same purpose. The _log_audit_event uses the AuditLogEntry Pydantic model, while AuditLogger uses a plain dictionary. You should consolidate this into a single, consistent logging mechanism. For example, you could refactor _log_audit_event to use the audit_logger instance from this file.
| cryptography==43.0.3 | ||
| bcrypt==4.2.1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cryptography and bcrypt packages have been added to the requirements, but they don't appear to be used anywhere in the codebase. Unused dependencies increase the size of the environment, can introduce security vulnerabilities, and make maintenance harder. If they are not needed for upcoming work, they should be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
app/auth/models.py (1)
19-27: Critical: Forward reference to undefinedUserRole.Lines 26-27 reference
UserRolebefore it's defined on line 46. This will cause aNameErrorwhen the module is imported.Move the
UserRoleenum definition beforeUserResponse:+class UserRole(str, Enum): + ADMIN = "admin" + USER = "user" + MODERATOR = "moderator" + + class UserResponse(BaseModel): id: str email: str first_name: str last_name: str is_active: bool created_at: str role: UserRole = UserRole.USER last_login: Optional[datetime] = NoneAlternatively, use forward references with quotes:
- role: UserRole = UserRole.USER + role: "UserRole" = "user"But moving the definition is cleaner.
🧹 Nitpick comments (5)
app/auth/security.py (3)
73-89: HSTS header may be too strict for development.Line 85 sets
Strict-Transport-Securitywithmax-age=31536000(1 year), which will force HTTPS for a full year once set. This can cause issues in local development environments that don't use HTTPS.Consider making security headers configurable based on environment:
@staticmethod def get_security_headers(environment: str = "production") -> dict: """ Get recommended security headers """ headers = { "X-Content-Type-Options": "nosniff", "X-Frame-Options": "DENY", "X-XSS-Protection": "1; mode=block", "Referrer-Policy": "strict-origin-when-cross-origin", "Content-Security-Policy": "default-src 'self'", "Permissions-Policy": "geolocation=(), microphone=(), camera=()" } if environment == "production": headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" return headers
117-138: IP whitelist lacks CIDR/subnet support.The current implementation only supports exact IP address matching. Production systems often need to whitelist entire subnets (e.g., corporate networks, cloud provider IP ranges).
Consider using the
ipaddressmodule from the standard library to support CIDR notation:import ipaddress class IPWhitelist: """IP whitelist management with CIDR support""" def __init__(self): self.allowed_networks = [] self.blocked_ips = set() def add_allowed_ip(self, ip_or_cidr: str): """Add IP or CIDR range to whitelist""" try: network = ipaddress.ip_network(ip_or_cidr, strict=False) self.allowed_networks.append(network) except ValueError: # Invalid IP/CIDR, handle error pass def add_blocked_ip(self, ip: str): """Add IP to blacklist""" self.blocked_ips.add(ip) def is_ip_allowed(self, ip: str) -> bool: """Check if IP is allowed""" try: ip_obj = ipaddress.ip_address(ip) except ValueError: return False if ip in self.blocked_ips: return False if not self.allowed_networks: # If no whitelist, allow all return True return any(ip_obj in network for network in self.allowed_networks)
178-183: Global singletons may cause issues in testing and concurrent scenarios.Using module-level singleton instances can make testing difficult (shared state between tests) and may not be thread-safe for the stateful classes (IPWhitelist, AuditLogger).
Consider using dependency injection or factory patterns instead of global singletons, especially for stateful components like
ip_whitelistandaudit_logger.app/auth/routes.py (1)
240-258: Session management endpoint returns mock data.Lines 247-257 return mock session data with a comment indicating this should fetch from a database. The
current_userparameter is also unused (flagged by static analysis).The mock implementation is fine for testing, but ensure this is replaced with actual session storage before production deployment. The TODO comment on line 247 addresses this.
If you want assistance implementing real session management with Redis or a database, I can help generate that code.
app/auth/models.py (1)
52-55: UserUpdateRequest should validate role changes.Allowing users to submit a role change in the same request as name changes could be confusing. Consider separating role management into a dedicated admin endpoint.
Consider creating separate models:
class UserProfileUpdateRequest(BaseModel): first_name: Optional[str] = Field(None, min_length=1, max_length=50) last_name: Optional[str] = Field(None, min_length=1, max_length=50) class UserRoleUpdateRequest(BaseModel): role: UserRoleThis makes the API clearer and allows different authorization rules for each operation.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
app/auth/models.py(3 hunks)app/auth/routes.py(2 hunks)app/auth/security.py(1 hunks)app/main.py(2 hunks)requirements.txt(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
app/auth/models.py (1)
app/auth/routes.py (1)
refresh_token(83-104)
app/auth/routes.py (3)
app/auth/models.py (6)
UserResponse(19-27)UserUpdateRequest(52-55)ChangePasswordRequest(58-60)UserRole(46-49)SessionInfo(63-68)AuditLogEntry(71-77)app/auth/dependencies.py (1)
get_current_user(10-32)app/auth/firebase_auth.py (1)
sign_in_user(69-99)
🪛 Ruff (0.13.1)
app/auth/models.py
26-26: Undefined name UserRole
(F821)
26-26: Undefined name UserRole
(F821)
app/main.py
16-16: Undefined name app
(F821)
app/auth/routes.py
148-148: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
165-168: Abstract raise to an inner function
(TRY301)
195-195: Do not catch blind exception: Exception
(BLE001)
196-199: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
205-205: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
231-231: Consider moving this statement to an else block
(TRY300)
233-233: Do not catch blind exception: Exception
(BLE001)
233-233: Local variable e is assigned to but never used
Remove assignment to unused variable e
(F841)
234-237: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
242-242: Unused function argument: current_user
(ARG001)
242-242: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
264-264: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
283-283: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🔇 Additional comments (8)
app/auth/security.py (2)
92-114: Token generation methods look secure.The use of
secrets.token_urlsafe()andsecrets.randbelow()provides cryptographically secure random token generation, which is appropriate for session tokens, API keys, and verification codes.
11-70: Password validator has a flawed common password check.The common password check on line 58 is case-insensitive (
password.lower()), but the common passwords list only contains lowercase entries. This means that variations like "Password" or "PASSWORD123" will be caught, but longer passwords containing these as substrings (e.g., "MyPassword123!") will also incorrectly trigger the common password warning since the check usesinrather than equality.Apply this diff to fix the common password check:
- if password.lower() in common_passwords: + if password.lower() in common_passwords or any(common in password.lower() for common in common_passwords): issues.append("Password is too common") score -= 1Actually, the better fix is to use exact matching only:
if password.lower() in common_passwords: issues.append("Password is too common") score -= 1The current logic is correct for exact matches. However, consider expanding the common passwords list significantly or using a library like
passlibor checking against a larger breach database.Likely an incorrect or invalid review comment.
app/main.py (2)
86-106: Root endpoint updates look good.The expanded feature list and version information provide clear API documentation for clients.
108-128: Security info endpoint provides useful guidance.The endpoint offers clear security feature documentation and actionable recommendations, which aligns well with the security enhancements in this PR.
app/auth/routes.py (1)
130-142: Safe role access with fallback.Line 140 uses
.get("role", "user")to safely access the role with a fallback, preventing KeyErrors.app/auth/models.py (3)
46-49: UserRole enum definition looks good.The role enum provides clear admin/user/moderator distinction for RBAC.
63-68: SessionInfo model looks appropriate.The model captures essential session metadata for tracking and management.
71-77: AuditLogEntry model looks appropriate.The model provides comprehensive audit trail information including context (IP, user agent) and flexible details storage.
| class ChangePasswordRequest(BaseModel): | ||
| current_password: str | ||
| new_password: str = Field(..., min_length=8, max_length=100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Password validation in model is insufficient.
Line 60 only enforces length constraints (8-100 characters) but doesn't validate password complexity. While the PasswordValidator class exists in security.py, it's not integrated here.
Consider adding a Pydantic validator to enforce password strength:
from pydantic import field_validator
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str = Field(..., min_length=8, max_length=100)
@field_validator('new_password')
@classmethod
def validate_password_strength(cls, v):
from .security import password_validator
result = password_validator.validate_password_strength(v)
if not result["valid"]:
raise ValueError(f"Password validation failed: {', '.join(result['issues'])}")
return vThis ensures validation happens at the model level, providing immediate feedback to API clients.
🤖 Prompt for AI Agents
In app/auth/models.py around lines 58 to 60, the ChangePasswordRequest model
only enforces length on new_password and doesn't run the existing
PasswordValidator; add a Pydantic field validator on 'new_password' that imports
the password validator from app.auth.security (inside the method to avoid
circular imports), calls its validate_password_strength method, and raises a
ValueError listing returned issues when validation fails so API clients get
immediate, descriptive feedback; keep existing Field length constraints.
| @router.put("/profile", response_model=UserResponse) | ||
| async def update_profile( | ||
| profile_data: UserUpdateRequest, | ||
| current_user: Dict[str, Any] = Depends(get_current_user), | ||
| request: Request = None | ||
| ): | ||
| """ | ||
| Update user profile information | ||
| """ | ||
| try: | ||
| # Update user in Firebase | ||
| update_data = {} | ||
| if profile_data.first_name is not None: | ||
| update_data["first_name"] = profile_data.first_name | ||
| if profile_data.last_name is not None: | ||
| update_data["last_name"] = profile_data.last_name | ||
|
|
||
| # Only admins can change roles | ||
| if profile_data.role is not None: | ||
| if current_user.get("role") != UserRole.ADMIN: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_403_FORBIDDEN, | ||
| detail="Only administrators can change user roles" | ||
| ) | ||
| update_data["role"] = profile_data.role.value | ||
|
|
||
| if update_data: | ||
| await firebase_auth.update_user(current_user["uid"], update_data) | ||
|
|
||
| # Log the profile update | ||
| await _log_audit_event( | ||
| user_id=current_user["uid"], | ||
| action="profile_update", | ||
| request=request, | ||
| details=update_data | ||
| ) | ||
|
|
||
| # Return updated user info | ||
| updated_user = await firebase_auth.get_user(current_user["uid"]) | ||
| return UserResponse( | ||
| id=updated_user["uid"], | ||
| email=updated_user["email"], | ||
| first_name=updated_user["first_name"], | ||
| last_name=updated_user["last_name"], | ||
| is_active=True, | ||
| created_at=updated_user.get("created_at", ""), | ||
| role=UserRole(updated_user.get("role", "user")), | ||
| last_login=updated_user.get("last_login") | ||
| ) | ||
|
|
||
| except Exception as e: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=str(e) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Profile update endpoint has authorization issues.
Lines 163-169: The role change authorization check compares against UserRole.ADMIN (an enum), but current_user.get("role") likely returns a string. This comparison will always fail unless the role is already the enum type.
Fix the role comparison:
# Only admins can change roles
if profile_data.role is not None:
- if current_user.get("role") != UserRole.ADMIN:
+ if current_user.get("role") != UserRole.ADMIN.value:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can change user roles"
)
update_data["role"] = profile_data.role.valueOr ensure current_user["role"] is always a UserRole enum in the authentication layer.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @router.put("/profile", response_model=UserResponse) | |
| async def update_profile( | |
| profile_data: UserUpdateRequest, | |
| current_user: Dict[str, Any] = Depends(get_current_user), | |
| request: Request = None | |
| ): | |
| """ | |
| Update user profile information | |
| """ | |
| try: | |
| # Update user in Firebase | |
| update_data = {} | |
| if profile_data.first_name is not None: | |
| update_data["first_name"] = profile_data.first_name | |
| if profile_data.last_name is not None: | |
| update_data["last_name"] = profile_data.last_name | |
| # Only admins can change roles | |
| if profile_data.role is not None: | |
| if current_user.get("role") != UserRole.ADMIN: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Only administrators can change user roles" | |
| ) | |
| update_data["role"] = profile_data.role.value | |
| if update_data: | |
| await firebase_auth.update_user(current_user["uid"], update_data) | |
| # Log the profile update | |
| await _log_audit_event( | |
| user_id=current_user["uid"], | |
| action="profile_update", | |
| request=request, | |
| details=update_data | |
| ) | |
| # Return updated user info | |
| updated_user = await firebase_auth.get_user(current_user["uid"]) | |
| return UserResponse( | |
| id=updated_user["uid"], | |
| email=updated_user["email"], | |
| first_name=updated_user["first_name"], | |
| last_name=updated_user["last_name"], | |
| is_active=True, | |
| created_at=updated_user.get("created_at", ""), | |
| role=UserRole(updated_user.get("role", "user")), | |
| last_login=updated_user.get("last_login") | |
| ) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) | |
| # Only admins can change roles | |
| if profile_data.role is not None: | |
| if current_user.get("role") != UserRole.ADMIN.value: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Only administrators can change user roles" | |
| ) | |
| update_data["role"] = profile_data.role.value |
🧰 Tools
🪛 Ruff (0.13.1)
148-148: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
165-168: Abstract raise to an inner function
(TRY301)
195-195: Do not catch blind exception: Exception
(BLE001)
196-199: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
🤖 Prompt for AI Agents
In app/auth/routes.py around lines 145 to 199, the role-change authorization
check compares current_user.get("role") to the UserRole enum which will fail if
current_user["role"] is a string; update the check to either compare against the
enum's string value (e.g. current_user.get("role") == UserRole.ADMIN.value) or
convert the stored role to the enum first (e.g.
UserRole(current_user.get("role")) == UserRole.ADMIN), and ensure subsequent
logic uses the same normalized form so role checks work reliably (alternatively,
make the authentication layer return UserRole enums consistently).
| @router.post("/change-password") | ||
| async def change_password( | ||
| password_data: ChangePasswordRequest, | ||
| current_user: Dict[str, Any] = Depends(get_current_user), | ||
| request: Request = None | ||
| ): | ||
| """ | ||
| Change user password | ||
| """ | ||
| try: | ||
| # Verify current password | ||
| await firebase_auth.sign_in_user( | ||
| email=current_user["email"], | ||
| password=password_data.current_password | ||
| ) | ||
|
|
||
| # Update password | ||
| await firebase_auth.update_user_password( | ||
| current_user["uid"], | ||
| password_data.new_password | ||
| ) | ||
|
|
||
| # Log the password change | ||
| await _log_audit_event( | ||
| user_id=current_user["uid"], | ||
| action="password_change", | ||
| request=request | ||
| ) | ||
|
|
||
| return {"message": "Password changed successfully"} | ||
|
|
||
| except Exception as e: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail="Invalid current password or unable to update password" | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Password change flow has a security vulnerability.
Lines 213-216: Re-authenticating with the current password to verify it is good practice. However, line 219-222 calls update_user_password without checking password strength or validation. The ChangePasswordRequest model enforces a minimum length of 8 characters, but doesn't validate complexity.
Integrate the PasswordValidator from app/auth/security.py:
+from .security import password_validator
+
@router.post("/change-password")
async def change_password(
password_data: ChangePasswordRequest,
current_user: Dict[str, Any] = Depends(get_current_user),
request: Request = None
):
"""
Change user password
"""
try:
# Verify current password
await firebase_auth.sign_in_user(
email=current_user["email"],
password=password_data.current_password
)
+ # Validate new password strength
+ validation_result = password_validator.validate_password_strength(
+ password_data.new_password
+ )
+ if not validation_result["valid"]:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"Password validation failed: {', '.join(validation_result['issues'])}"
+ )
+
# Update password
await firebase_auth.update_user_password(
current_user["uid"],
password_data.new_password
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @router.post("/change-password") | |
| async def change_password( | |
| password_data: ChangePasswordRequest, | |
| current_user: Dict[str, Any] = Depends(get_current_user), | |
| request: Request = None | |
| ): | |
| """ | |
| Change user password | |
| """ | |
| try: | |
| # Verify current password | |
| await firebase_auth.sign_in_user( | |
| email=current_user["email"], | |
| password=password_data.current_password | |
| ) | |
| # Update password | |
| await firebase_auth.update_user_password( | |
| current_user["uid"], | |
| password_data.new_password | |
| ) | |
| # Log the password change | |
| await _log_audit_event( | |
| user_id=current_user["uid"], | |
| action="password_change", | |
| request=request | |
| ) | |
| return {"message": "Password changed successfully"} | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid current password or unable to update password" | |
| ) | |
| from .security import password_validator | |
| @router.post("/change-password") | |
| async def change_password( | |
| password_data: ChangePasswordRequest, | |
| current_user: Dict[str, Any] = Depends(get_current_user), | |
| request: Request = None | |
| ): | |
| """ | |
| Change user password | |
| """ | |
| try: | |
| # Verify current password | |
| await firebase_auth.sign_in_user( | |
| email=current_user["email"], | |
| password=password_data.current_password | |
| ) | |
| # Validate new password strength | |
| validation_result = password_validator.validate_password_strength( | |
| password_data.new_password | |
| ) | |
| if not validation_result["valid"]: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=f"Password validation failed: {', '.join(validation_result['issues'])}" | |
| ) | |
| # Update password | |
| await firebase_auth.update_user_password( | |
| current_user["uid"], | |
| password_data.new_password | |
| ) | |
| # Log the password change | |
| await _log_audit_event( | |
| user_id=current_user["uid"], | |
| action="password_change", | |
| request=request | |
| ) | |
| return {"message": "Password changed successfully"} | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid current password or unable to update password" | |
| ) |
🧰 Tools
🪛 Ruff (0.13.1)
205-205: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
231-231: Consider moving this statement to an else block
(TRY300)
233-233: Do not catch blind exception: Exception
(BLE001)
233-233: Local variable e is assigned to but never used
Remove assignment to unused variable e
(F841)
234-237: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
🤖 Prompt for AI Agents
In app/auth/routes.py around lines 202 to 237, the password-change endpoint
currently re-authenticates the user but directly updates the password without
validating complexity; import and use PasswordValidator from
app/auth/security.py to validate password_data.new_password before calling
update_user_password, and if validation fails raise
HTTPException(status_code=400) with a clear non-sensitive message (do not log
the raw password); only call update_user_password and proceed to auditing when
the validator approves the new password.
| @router.get("/audit-logs", response_model=List[AuditLogEntry]) | ||
| async def get_audit_logs( | ||
| current_user: Dict[str, Any] = Depends(get_current_user), | ||
| limit: int = 50 | ||
| ): | ||
| """ | ||
| Get audit logs for the current user (admin only) | ||
| """ | ||
| if current_user.get("role") != UserRole.ADMIN: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_403_FORBIDDEN, | ||
| detail="Only administrators can view audit logs" | ||
| ) | ||
|
|
||
| # This would typically fetch from a database | ||
| # For demo purposes, return mock audit logs | ||
| logs = [ | ||
| AuditLogEntry( | ||
| user_id=current_user["uid"], | ||
| action="login", | ||
| timestamp=datetime.now(), | ||
| ip_address="127.0.0.1", | ||
| user_agent="Mozilla/5.0 (Demo Browser)" | ||
| ), | ||
| AuditLogEntry( | ||
| user_id=current_user["uid"], | ||
| action="profile_update", | ||
| timestamp=datetime.now(), | ||
| ip_address="127.0.0.1", | ||
| details={"fields_updated": ["first_name"]} | ||
| ) | ||
| ] | ||
| return logs[:limit] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Audit logs endpoint has inconsistent authorization.
Line 289 checks if the current user's role equals UserRole.ADMIN, but similar to the profile update endpoint, this comparison may fail if current_user.get("role") returns a string instead of the enum.
Apply the same fix as in the profile update endpoint:
- if current_user.get("role") != UserRole.ADMIN:
+ if current_user.get("role") != UserRole.ADMIN.value:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can view audit logs"
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @router.get("/audit-logs", response_model=List[AuditLogEntry]) | |
| async def get_audit_logs( | |
| current_user: Dict[str, Any] = Depends(get_current_user), | |
| limit: int = 50 | |
| ): | |
| """ | |
| Get audit logs for the current user (admin only) | |
| """ | |
| if current_user.get("role") != UserRole.ADMIN: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Only administrators can view audit logs" | |
| ) | |
| # This would typically fetch from a database | |
| # For demo purposes, return mock audit logs | |
| logs = [ | |
| AuditLogEntry( | |
| user_id=current_user["uid"], | |
| action="login", | |
| timestamp=datetime.now(), | |
| ip_address="127.0.0.1", | |
| user_agent="Mozilla/5.0 (Demo Browser)" | |
| ), | |
| AuditLogEntry( | |
| user_id=current_user["uid"], | |
| action="profile_update", | |
| timestamp=datetime.now(), | |
| ip_address="127.0.0.1", | |
| details={"fields_updated": ["first_name"]} | |
| ) | |
| ] | |
| return logs[:limit] | |
| @router.get("/audit-logs", response_model=List[AuditLogEntry]) | |
| async def get_audit_logs( | |
| current_user: Dict[str, Any] = Depends(get_current_user), | |
| limit: int = 50 | |
| ): | |
| """ | |
| Get audit logs for the current user (admin only) | |
| """ | |
| if current_user.get("role") != UserRole.ADMIN.value: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Only administrators can view audit logs" | |
| ) | |
| # This would typically fetch from a database | |
| # For demo purposes, return mock audit logs | |
| logs = [ | |
| AuditLogEntry( | |
| user_id=current_user["uid"], | |
| action="login", | |
| timestamp=datetime.now(), | |
| ip_address="127.0.0.1", | |
| user_agent="Mozilla/5.0 (Demo Browser)" | |
| ), | |
| AuditLogEntry( | |
| user_id=current_user["uid"], | |
| action="profile_update", | |
| timestamp=datetime.now(), | |
| ip_address="127.0.0.1", | |
| details={"fields_updated": ["first_name"]} | |
| ) | |
| ] | |
| return logs[:limit] |
🧰 Tools
🪛 Ruff (0.13.1)
283-283: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🤖 Prompt for AI Agents
In app/auth/routes.py around lines 281 to 313, the role comparison uses
current_user.get("role") directly which may be a string and thus fail against
the UserRole enum; convert the retrieved role to the UserRole enum before
comparing (e.g., create a variable like role =
UserRole(current_user.get("role")) inside a try/except) and if conversion fails
or role is not UserRole.ADMIN raise the same HTTPException; this mirrors the
profile update endpoint fix and ensures robust enum comparison and proper error
handling for missing/invalid role values.
| async def _log_audit_event( | ||
| user_id: str, | ||
| action: str, | ||
| request: Request = None, | ||
| details: Optional[Dict[str, Any]] = None | ||
| ): | ||
| """ | ||
| Log an audit event | ||
| """ | ||
| # This would typically save to a database | ||
| # For demo purposes, we'll just print it | ||
| ip_address = request.client.host if request else None | ||
| user_agent = request.headers.get("user-agent") if request else None | ||
|
|
||
| audit_entry = AuditLogEntry( | ||
| user_id=user_id, | ||
| action=action, | ||
| timestamp=datetime.now(), | ||
| ip_address=ip_address, | ||
| user_agent=user_agent, | ||
| details=details | ||
| ) | ||
|
|
||
| print(f"Audit Log: {audit_entry.dict()}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Audit event helper should integrate with AuditLogger.
The _log_audit_event helper reimplements logging functionality that already exists in app/auth/security.py's AuditLogger class. This is code duplication.
Use the existing AuditLogger:
+from .security import audit_logger
+
async def _log_audit_event(
user_id: str,
action: str,
request: Request = None,
details: Optional[Dict[str, Any]] = None
):
"""
Log an audit event
"""
- # This would typically save to a database
- # For demo purposes, we'll just print it
ip_address = request.client.host if request else None
user_agent = request.headers.get("user-agent") if request else None
- audit_entry = AuditLogEntry(
- user_id=user_id,
- action=action,
- timestamp=datetime.now(),
- ip_address=ip_address,
- user_agent=user_agent,
- details=details
+ audit_logger.log_event(
+ user_id=user_id,
+ action=action,
+ ip_address=ip_address,
+ user_agent=user_agent,
+ details=details
)
-
- print(f"Audit Log: {audit_entry.dict()}")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def _log_audit_event( | |
| user_id: str, | |
| action: str, | |
| request: Request = None, | |
| details: Optional[Dict[str, Any]] = None | |
| ): | |
| """ | |
| Log an audit event | |
| """ | |
| # This would typically save to a database | |
| # For demo purposes, we'll just print it | |
| ip_address = request.client.host if request else None | |
| user_agent = request.headers.get("user-agent") if request else None | |
| audit_entry = AuditLogEntry( | |
| user_id=user_id, | |
| action=action, | |
| timestamp=datetime.now(), | |
| ip_address=ip_address, | |
| user_agent=user_agent, | |
| details=details | |
| ) | |
| print(f"Audit Log: {audit_entry.dict()}") | |
| ++ b/app/auth/routes.py | |
| @@ | |
| from .security import audit_logger | |
| async def _log_audit_event( | |
| user_id: str, | |
| action: str, | |
| request: Request = None, | |
| details: Optional[Dict[str, Any]] = None | |
| ): | |
| """ | |
| Log an audit event | |
| """ | |
| - # This would typically save to a database | |
| - # For demo purposes, we'll just print it | |
| ip_address = request.client.host if request else None | |
| user_agent = request.headers.get("user-agent") if request else None | |
| - | |
| - audit_entry = AuditLogEntry( | |
| - user_id=user_id, | |
| - action=action, | |
| - timestamp=datetime.now(), | |
| - ip_address=ip_address, | |
| - user_agent=user_agent, | |
| - details=details | |
| - ) | |
| - | |
| audit_logger.log_event( | |
| user_id=user_id, | |
| action=action, | |
| ip_address=ip_address, | |
| user_agent=user_agent, | |
| details=details | |
| ) |
🤖 Prompt for AI Agents
In app/auth/routes.py around lines 316 to 339, the helper `_log_audit_event`
duplicates audit logging instead of using the existing AuditLogger; update the
function to import and use AuditLogger from app.auth.security (or accept an
injected AuditLogger), construct the same audit payload (user_id, action,
timestamp, ip_address, user_agent, details) from the request, and call the
AuditLogger's appropriate log method rather than printing; ensure request can be
None, preserve timestamp handling, and remove the print-based AuditLogEntry
usage so all audit events go through the centralized AuditLogger.
| class AuditLogger: | ||
| """Audit logging utility""" | ||
|
|
||
| def __init__(self): | ||
| self.logs = [] | ||
|
|
||
| def log_event(self, | ||
| user_id: str, | ||
| action: str, | ||
| ip_address: Optional[str] = None, | ||
| user_agent: Optional[str] = None, | ||
| details: Optional[dict] = None): | ||
| """ | ||
| Log an audit event | ||
| """ | ||
| log_entry = { | ||
| "timestamp": datetime.now().isoformat(), | ||
| "user_id": user_id, | ||
| "action": action, | ||
| "ip_address": ip_address, | ||
| "user_agent": user_agent, | ||
| "details": details or {} | ||
| } | ||
|
|
||
| self.logs.append(log_entry) | ||
|
|
||
| # In production, this would save to a database | ||
| print(f"AUDIT: {log_entry}") | ||
|
|
||
| def get_user_logs(self, user_id: str, limit: int = 100) -> list: | ||
| """ | ||
| Get audit logs for a specific user | ||
| """ | ||
| user_logs = [log for log in self.logs if log["user_id"] == user_id] | ||
| return sorted(user_logs, key=lambda x: x["timestamp"], reverse=True)[:limit] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Audit logger has potential memory leak and lacks persistence.
Line 165 appends to an unbounded in-memory list, which will grow indefinitely and eventually cause memory issues. Line 168 only prints to stdout, meaning audit logs are lost on restart and not queryable.
The current implementation has two critical issues for production use:
- Unbounded memory growth in
self.logs - No actual persistence (only prints to stdout)
Consider implementing a proper storage backend:
class AuditLogger:
"""Audit logging utility"""
def __init__(self, max_in_memory: int = 1000):
self.logs = []
self.max_in_memory = max_in_memory
def log_event(self,
user_id: str,
action: str,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
details: Optional[dict] = None):
"""
Log an audit event
"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"action": action,
"ip_address": ip_address,
"user_agent": user_agent,
"details": details or {}
}
# Add to in-memory cache with size limit
self.logs.append(log_entry)
if len(self.logs) > self.max_in_memory:
self.logs.pop(0)
# TODO: Persist to database in production
print(f"AUDIT: {log_entry}")Better yet, integrate with a proper logging framework or database from the start.
🤖 Prompt for AI Agents
In app/auth/security.py around lines 141 to 175, the AuditLogger currently
appends all events to an unbounded in-memory list and only prints to stdout;
change it to use a bounded in-memory cache plus durable persistence: add a
configurable max_in_memory property, append new entries and pop oldest when over
the limit, and replace the print with a call that persists the entry (e.g.,
write to a configured database/table or append to a rotating file/logger) or
enqueue async persistence so logging won’t block; ensure get_user_logs reads
from the persistent store (or merges cache + store) so logs survive restarts.
| # Rate limiting storage (in production, use Redis or database) | ||
| rate_limit_storage: Dict[str, list] = defaultdict(list) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In-memory rate limiting won't work in multi-process deployments.
The rate_limit_storage using defaultdict will only track requests within a single process. In production deployments with multiple workers (e.g., uvicorn --workers 4), each worker maintains its own memory, effectively multiplying the rate limit by the number of workers.
Consider using Redis for distributed rate limiting, especially since it's already in requirements.txt:
import redis
from typing import Dict
# Redis connection for distributed rate limiting
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
decode_responses=True
)
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host
current_time = int(time.time())
# Use Redis with sliding window
key = f"rate_limit:{client_ip}"
pipe = redis_client.pipeline()
pipe.zremrangebyscore(key, 0, current_time - 60) # Remove old entries
pipe.zcard(key) # Count remaining entries
pipe.zadd(key, {str(current_time): current_time}) # Add current request
pipe.expire(key, 60) # Set expiry
results = pipe.execute()
request_count = results[1]
if request_count >= 100:
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded. Maximum 100 requests per minute."}
)
response = await call_next(request)
return response🤖 Prompt for AI Agents
In app/main.py around lines 12-13, the in-memory defaultdict rate_limit_storage
is unsuitable for multi-process deployments; replace it with a Redis-backed
distributed rate limiter: initialize a Redis client using env vars
(REDIS_HOST/REDIS_PORT), implement the middleware to use a Redis sorted set per
client IP (zremrangebyscore to drop old entries, zcard to count, zadd to add
current timestamp, expire to set TTL) executed in a pipeline/transaction, and
return 429 when count >= 100; ensure the Redis client is created once at startup
and handle connection errors gracefully.
| @app.middleware("http") | ||
| async def rate_limit_middleware(request: Request, call_next): | ||
| # Get client IP | ||
| client_ip = request.client.host | ||
|
|
||
| # Current time | ||
| current_time = time.time() | ||
|
|
||
| # Clean old entries (older than 1 minute) | ||
| rate_limit_storage[client_ip] = [ | ||
| timestamp for timestamp in rate_limit_storage[client_ip] | ||
| if current_time - timestamp < 60 | ||
| ] | ||
|
|
||
| # Check if rate limit exceeded (100 requests per minute) | ||
| if len(rate_limit_storage[client_ip]) >= 100: | ||
| return JSONResponse( | ||
| status_code=429, | ||
| content={"detail": "Rate limit exceeded. Maximum 100 requests per minute."} | ||
| ) | ||
|
|
||
| # Add current request timestamp | ||
| rate_limit_storage[client_ip].append(current_time) | ||
|
|
||
| # Process request | ||
| response = await call_next(request) | ||
| return response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Middleware references undefined app.
The middleware decorator on line 16 references app before it's defined on line 45. This will cause a NameError at module import time, preventing the application from starting.
Move the middleware definition after the app is created:
+# Create FastAPI app
+app = FastAPI(
+ title="Authentication API",
+ description="A FastAPI application with Firebase authentication and enhanced security features",
+ version="2.0.0"
+)
+
# Rate limiting middleware
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
# Get client IP
client_ip = request.client.host
# Current time
current_time = time.time()
# Clean old entries (older than 1 minute)
rate_limit_storage[client_ip] = [
timestamp for timestamp in rate_limit_storage[client_ip]
if current_time - timestamp < 60
]
# Check if rate limit exceeded (100 requests per minute)
if len(rate_limit_storage[client_ip]) >= 100:
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded. Maximum 100 requests per minute."}
)
# Add current request timestamp
rate_limit_storage[client_ip].append(current_time)
# Process request
response = await call_next(request)
return response
-
-# Create FastAPI app
-app = FastAPI(
- title="Authentication API",
- description="A FastAPI application with Firebase authentication and enhanced security features",
- version="2.0.0"
-)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @app.middleware("http") | |
| async def rate_limit_middleware(request: Request, call_next): | |
| # Get client IP | |
| client_ip = request.client.host | |
| # Current time | |
| current_time = time.time() | |
| # Clean old entries (older than 1 minute) | |
| rate_limit_storage[client_ip] = [ | |
| timestamp for timestamp in rate_limit_storage[client_ip] | |
| if current_time - timestamp < 60 | |
| ] | |
| # Check if rate limit exceeded (100 requests per minute) | |
| if len(rate_limit_storage[client_ip]) >= 100: | |
| return JSONResponse( | |
| status_code=429, | |
| content={"detail": "Rate limit exceeded. Maximum 100 requests per minute."} | |
| ) | |
| # Add current request timestamp | |
| rate_limit_storage[client_ip].append(current_time) | |
| # Process request | |
| response = await call_next(request) | |
| return response | |
| # Create FastAPI app | |
| app = FastAPI( | |
| title="Authentication API", | |
| description="A FastAPI application with Firebase authentication and enhanced security features", | |
| version="2.0.0" | |
| ) | |
| # Rate limiting middleware | |
| @app.middleware("http") | |
| async def rate_limit_middleware(request: Request, call_next): | |
| # Get client IP | |
| client_ip = request.client.host | |
| # Current time | |
| current_time = time.time() | |
| # Clean old entries (older than 1 minute) | |
| rate_limit_storage[client_ip] = [ | |
| timestamp for timestamp in rate_limit_storage[client_ip] | |
| if current_time - timestamp < 60 | |
| ] | |
| # Check if rate limit exceeded (100 requests per minute) | |
| if len(rate_limit_storage[client_ip]) >= 100: | |
| return JSONResponse( | |
| status_code=429, | |
| content={"detail": "Rate limit exceeded. Maximum 100 requests per minute."} | |
| ) | |
| # Add current request timestamp | |
| rate_limit_storage[client_ip].append(current_time) | |
| # Process request | |
| response = await call_next(request) | |
| return response |
🧰 Tools
🪛 Ruff (0.13.1)
16-16: Undefined name app
(F821)
🤖 Prompt for AI Agents
In app/main.py around lines 16 to 42, the middleware is being registered on
`app` before `app` is defined (defined later at line 45), which raises a
NameError at import time; move the entire rate_limit_middleware definition
(including the @app.middleware decorator and its function) to a position after
the `app` instance is created (i.e., below line 45) so the decorator references
an already-initialized app, and keep the middleware logic unchanged while
ensuring any globals it uses (rate_limit_storage, imports like time, Request,
JSONResponse) are available at that point.
| # Add trusted host middleware for security | ||
| app.add_middleware( | ||
| TrustedHostMiddleware, | ||
| allowed_hosts=["localhost", "127.0.0.1", "*.yourdomain.com"] | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Trusted hosts configuration needs review for production.
Line 54 includes a wildcard domain *.yourdomain.com which is a placeholder. This should be configured via environment variables for different environments.
+ALLOWED_HOSTS = os.getenv("ALLOWED_HOSTS", "localhost,127.0.0.1").split(",")
+
# Add trusted host middleware for security
app.add_middleware(
TrustedHostMiddleware,
- allowed_hosts=["localhost", "127.0.0.1", "*.yourdomain.com"]
+ allowed_hosts=ALLOWED_HOSTS
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Add trusted host middleware for security | |
| app.add_middleware( | |
| TrustedHostMiddleware, | |
| allowed_hosts=["localhost", "127.0.0.1", "*.yourdomain.com"] | |
| ) | |
| ALLOWED_HOSTS = os.getenv("ALLOWED_HOSTS", "localhost,127.0.0.1").split(",") | |
| # Add trusted host middleware for security | |
| app.add_middleware( | |
| TrustedHostMiddleware, | |
| allowed_hosts=ALLOWED_HOSTS | |
| ) |
🤖 Prompt for AI Agents
In app/main.py around lines 51 to 55, the TrustedHostMiddleware currently
contains a hardcoded placeholder wildcard domain (*.yourdomain.com); replace
this with an environment-driven configuration: read an environment variable
(e.g. TRUSTED_HOSTS) at startup, split it into a list of host patterns, trim
whitespace, provide a safe default for development (["localhost","127.0.0.1"])
if the variable is unset, and pass that list into TrustedHostMiddleware; also
validate/deny empty or obviously placeholder values and document how to set the
env var per environment.
| redis==5.2.1 | ||
| cryptography==43.0.3 | ||
| bcrypt==4.2.1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Verify that added dependencies are used and secure.
The newly added dependencies (redis, cryptography, bcrypt) are not imported or used in any of the provided code files. The security module uses only standard library components, and rate limiting uses in-memory defaultdict rather than Redis.
Run the following script to check for usage of these dependencies and verify their security:
🏁 Script executed:
#!/bin/bash
# Check for usage of new dependencies
echo "=== Checking for redis usage ==="
rg -n --type=py '\bredis\b' -g '!requirements.txt'
echo -e "\n=== Checking for cryptography usage ==="
rg -n --type=py '\bcryptography\b' -g '!requirements.txt'
echo -e "\n=== Checking for bcrypt usage ==="
rg -n --type=py '\bbcrypt\b' -g '!requirements.txt'
echo -e "\n=== Checking for security advisories ==="
gh api graphql -f query='
{
redis: securityVulnerabilities(first: 5, ecosystem: PIP, package: "redis") {
nodes {
advisory { summary severity publishedAt }
vulnerableVersionRange
firstPatchedVersion { identifier }
}
}
cryptography: securityVulnerabilities(first: 5, ecosystem: PIP, package: "cryptography") {
nodes {
advisory { summary severity publishedAt }
vulnerableVersionRange
firstPatchedVersion { identifier }
}
}
bcrypt: securityVulnerabilities(first: 5, ecosystem: PIP, package: "bcrypt") {
nodes {
advisory { summary severity publishedAt }
vulnerableVersionRange
firstPatchedVersion { identifier }
}
}
}'Length of output: 2909
🏁 Script executed:
#!/bin/bash
echo "=== Checking for redis references ==="
rg -n --type=py '\bredis\b'
echo -e "\n=== Checking for redis import ==="
rg -n --type=py 'import redis'
echo -e "\n=== Checking for bcrypt references ==="
rg -n --type=py '\bbcrypt\b'
echo -e "\n=== Checking for bcrypt import ==="
rg -n --type=py 'import bcrypt'
echo -e "\n=== Checking for cryptography references ==="
rg -n --type=py '\bcryptography\b'
echo -e "\n=== Checking for cryptography import ==="
rg -n --type=py 'import cryptography'Length of output: 236
Remove or justify new dependencies and upgrade cryptography.
- redis and bcrypt aren’t imported or referenced in any Python module; remove lines 41 and 43 in requirements.txt or implement their intended usage.
- cryptography==43.0.3 is affected by an OpenSSL vulnerability (patched in 44.0.1); upgrade to ≥44.0.1 or drop it if unused.
🤖 Prompt for AI Agents
In requirements.txt around lines 41 to 43 (redis==5.2.1, cryptography==43.0.3,
bcrypt==4.2.1): remove redis and bcrypt lines if they are truly unused (or add
the corresponding imports/implementation where intended), and upgrade
cryptography to 44.0.1 or later (e.g., cryptography>=44.0.1) to address the
OpenSSL vulnerability; after changing requirements.txt, regenerate any
lock/constraints file if used (pip-compile/pip freeze), run the test-suite and a
dependency security scan to ensure no breakage.
Summary by CodeRabbit
New Features
Security
Chores