From 1e8849f5f0e062e3c444dba484f7602ca190d545 Mon Sep 17 00:00:00 2001 From: vishwam talnikar Date: Tue, 30 Sep 2025 14:30:37 +0530 Subject: [PATCH] added new changes --- app/auth/models.py | 44 ++++++++- app/auth/routes.py | 214 ++++++++++++++++++++++++++++++++++++++++++- app/auth/security.py | 183 ++++++++++++++++++++++++++++++++++++ app/main.py | 82 ++++++++++++++++- requirements.txt | 3 + 5 files changed, 514 insertions(+), 12 deletions(-) create mode 100644 app/auth/security.py diff --git a/app/auth/models.py b/app/auth/models.py index c467e1a..96125be 100644 --- a/app/auth/models.py +++ b/app/auth/models.py @@ -1,5 +1,7 @@ -from pydantic import BaseModel, EmailStr -from typing import Optional +from pydantic import BaseModel, EmailStr, Field +from typing import Optional, List, Dict, Any +from enum import Enum +from datetime import datetime class UserSignupRequest(BaseModel): @@ -21,6 +23,8 @@ class UserResponse(BaseModel): last_name: str is_active: bool created_at: str + role: UserRole = UserRole.USER + last_login: Optional[datetime] = None class AuthResponse(BaseModel): @@ -36,4 +40,38 @@ class TokenResponse(BaseModel): class RefreshTokenRequest(BaseModel): - refresh_token: str \ No newline at end of file + refresh_token: str + + +class UserRole(str, Enum): + ADMIN = "admin" + USER = "user" + MODERATOR = "moderator" + + +class UserUpdateRequest(BaseModel): + first_name: Optional[str] = Field(None, min_length=1, max_length=50) + last_name: Optional[str] = Field(None, min_length=1, max_length=50) + role: Optional[UserRole] = None + + +class ChangePasswordRequest(BaseModel): + current_password: str + new_password: str = Field(..., min_length=8, max_length=100) + + +class SessionInfo(BaseModel): + session_id: str + created_at: datetime + last_activity: datetime + ip_address: Optional[str] = None + user_agent: Optional[str] = None + + +class AuditLogEntry(BaseModel): + user_id: str + action: str + timestamp: datetime + ip_address: Optional[str] = None + user_agent: Optional[str] = None + details: Optional[Dict[str, Any]] = None \ No newline at end of file diff --git a/app/auth/routes.py b/app/auth/routes.py index 74e5e2a..f5bbcb6 100644 --- a/app/auth/routes.py +++ b/app/auth/routes.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, HTTPException, status, Depends +from fastapi import APIRouter, HTTPException, status, Depends, Request, Header from fastapi.security import HTTPBearer from .models import ( UserSignupRequest, @@ -6,11 +6,18 @@ AuthResponse, UserResponse, TokenResponse, - RefreshTokenRequest + RefreshTokenRequest, + UserUpdateRequest, + ChangePasswordRequest, + UserRole, + SessionInfo, + AuditLogEntry ) from .firebase_auth import firebase_auth from .dependencies import get_current_user -from typing import Dict, Any +from typing import Dict, Any, Optional, List +import uuid +from datetime import datetime router = APIRouter(prefix="/auth", tags=["authentication"]) @@ -130,6 +137,203 @@ async def verify_token(current_user: Dict[str, Any] = Depends(get_current_user)) "user": { "id": current_user["uid"], "email": current_user["email"], - "role": current_user["role"] + "role": current_user.get("role", "user") } - } \ No newline at end of file + } + + +@router.put("/profile", response_model=UserResponse) +async def update_profile( + profile_data: UserUpdateRequest, + current_user: Dict[str, Any] = Depends(get_current_user), + request: Request = None +): + """ + Update user profile information + """ + try: + # Update user in Firebase + update_data = {} + if profile_data.first_name is not None: + update_data["first_name"] = profile_data.first_name + if profile_data.last_name is not None: + update_data["last_name"] = profile_data.last_name + + # Only admins can change roles + if profile_data.role is not None: + if current_user.get("role") != UserRole.ADMIN: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only administrators can change user roles" + ) + update_data["role"] = profile_data.role.value + + if update_data: + await firebase_auth.update_user(current_user["uid"], update_data) + + # Log the profile update + await _log_audit_event( + user_id=current_user["uid"], + action="profile_update", + request=request, + details=update_data + ) + + # Return updated user info + updated_user = await firebase_auth.get_user(current_user["uid"]) + return UserResponse( + id=updated_user["uid"], + email=updated_user["email"], + first_name=updated_user["first_name"], + last_name=updated_user["last_name"], + is_active=True, + created_at=updated_user.get("created_at", ""), + role=UserRole(updated_user.get("role", "user")), + last_login=updated_user.get("last_login") + ) + + except Exception as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e) + ) + + +@router.post("/change-password") +async def change_password( + password_data: ChangePasswordRequest, + current_user: Dict[str, Any] = Depends(get_current_user), + request: Request = None +): + """ + Change user password + """ + try: + # Verify current password + await firebase_auth.sign_in_user( + email=current_user["email"], + password=password_data.current_password + ) + + # Update password + await firebase_auth.update_user_password( + current_user["uid"], + password_data.new_password + ) + + # Log the password change + await _log_audit_event( + user_id=current_user["uid"], + action="password_change", + request=request + ) + + return {"message": "Password changed successfully"} + + except Exception as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid current password or unable to update password" + ) + + +@router.get("/sessions", response_model=List[SessionInfo]) +async def get_user_sessions( + current_user: Dict[str, Any] = Depends(get_current_user) +): + """ + Get active sessions for the current user + """ + # This would typically fetch from a database + # For demo purposes, return a mock session + sessions = [ + SessionInfo( + session_id=str(uuid.uuid4()), + created_at=datetime.now(), + last_activity=datetime.now(), + ip_address="127.0.0.1", + user_agent="Mozilla/5.0 (Demo Browser)" + ) + ] + return sessions + + +@router.delete("/sessions/{session_id}") +async def revoke_session( + session_id: str, + current_user: Dict[str, Any] = Depends(get_current_user), + request: Request = None +): + """ + Revoke a specific user session + """ + # This would typically remove the session from a database + await _log_audit_event( + user_id=current_user["uid"], + action="session_revoked", + request=request, + details={"session_id": session_id} + ) + + return {"message": f"Session {session_id} has been revoked"} + + +@router.get("/audit-logs", response_model=List[AuditLogEntry]) +async def get_audit_logs( + current_user: Dict[str, Any] = Depends(get_current_user), + limit: int = 50 +): + """ + Get audit logs for the current user (admin only) + """ + if current_user.get("role") != UserRole.ADMIN: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only administrators can view audit logs" + ) + + # This would typically fetch from a database + # For demo purposes, return mock audit logs + logs = [ + AuditLogEntry( + user_id=current_user["uid"], + action="login", + timestamp=datetime.now(), + ip_address="127.0.0.1", + user_agent="Mozilla/5.0 (Demo Browser)" + ), + AuditLogEntry( + user_id=current_user["uid"], + action="profile_update", + timestamp=datetime.now(), + ip_address="127.0.0.1", + details={"fields_updated": ["first_name"]} + ) + ] + return logs[:limit] + + +async def _log_audit_event( + user_id: str, + action: str, + request: Request = None, + details: Optional[Dict[str, Any]] = None +): + """ + Log an audit event + """ + # This would typically save to a database + # For demo purposes, we'll just print it + ip_address = request.client.host if request else None + user_agent = request.headers.get("user-agent") if request else None + + audit_entry = AuditLogEntry( + user_id=user_id, + action=action, + timestamp=datetime.now(), + ip_address=ip_address, + user_agent=user_agent, + details=details + ) + + print(f"Audit Log: {audit_entry.dict()}") \ No newline at end of file diff --git a/app/auth/security.py b/app/auth/security.py new file mode 100644 index 0000000..3b90cbc --- /dev/null +++ b/app/auth/security.py @@ -0,0 +1,183 @@ +""" +Security utilities and helpers for authentication +""" +import re +import hashlib +import secrets +from typing import Optional +from datetime import datetime, timedelta + + +class PasswordValidator: + """Password validation utility""" + + @staticmethod + def validate_password_strength(password: str) -> dict: + """ + Validate password strength and return detailed feedback + """ + issues = [] + score = 0 + + # Length check + if len(password) < 8: + issues.append("Password must be at least 8 characters long") + else: + score += 1 + + # Uppercase check + if not re.search(r'[A-Z]', password): + issues.append("Password must contain at least one uppercase letter") + else: + score += 1 + + # Lowercase check + if not re.search(r'[a-z]', password): + issues.append("Password must contain at least one lowercase letter") + else: + score += 1 + + # Number check + if not re.search(r'\d', password): + issues.append("Password must contain at least one number") + else: + score += 1 + + # Special character check + if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): + issues.append("Password must contain at least one special character") + else: + score += 1 + + # Common password check + common_passwords = [ + "password", "123456", "password123", "admin", "qwerty", + "letmein", "welcome", "monkey", "dragon", "master" + ] + + if password.lower() in common_passwords: + issues.append("Password is too common") + score -= 1 + + strength_levels = ["Very Weak", "Weak", "Fair", "Good", "Strong", "Very Strong"] + strength = strength_levels[min(score, len(strength_levels) - 1)] + + return { + "valid": len(issues) == 0, + "score": score, + "strength": strength, + "issues": issues + } + + +class SecurityHeaders: + """Security headers utility""" + + @staticmethod + def get_security_headers() -> dict: + """ + Get recommended security headers + """ + return { + "X-Content-Type-Options": "nosniff", + "X-Frame-Options": "DENY", + "X-XSS-Protection": "1; mode=block", + "Strict-Transport-Security": "max-age=31536000; includeSubDomains", + "Referrer-Policy": "strict-origin-when-cross-origin", + "Content-Security-Policy": "default-src 'self'", + "Permissions-Policy": "geolocation=(), microphone=(), camera=()" + } + + +class TokenGenerator: + """Token generation utilities""" + + @staticmethod + def generate_session_token() -> str: + """ + Generate a secure session token + """ + return secrets.token_urlsafe(32) + + @staticmethod + def generate_api_key() -> str: + """ + Generate a secure API key + """ + return secrets.token_urlsafe(48) + + @staticmethod + def generate_verification_code() -> str: + """ + Generate a 6-digit verification code + """ + return f"{secrets.randbelow(1000000):06d}" + + +class IPWhitelist: + """IP whitelist management""" + + def __init__(self): + self.allowed_ips = set() + self.blocked_ips = set() + + def add_allowed_ip(self, ip: str): + """Add IP to whitelist""" + self.allowed_ips.add(ip) + + def add_blocked_ip(self, ip: str): + """Add IP to blacklist""" + self.blocked_ips.add(ip) + + def is_ip_allowed(self, ip: str) -> bool: + """Check if IP is allowed""" + if ip in self.blocked_ips: + return False + if not self.allowed_ips: # If no whitelist, allow all + return True + return ip in self.allowed_ips + + +class AuditLogger: + """Audit logging utility""" + + def __init__(self): + self.logs = [] + + def log_event(self, + user_id: str, + action: str, + ip_address: Optional[str] = None, + user_agent: Optional[str] = None, + details: Optional[dict] = None): + """ + Log an audit event + """ + log_entry = { + "timestamp": datetime.now().isoformat(), + "user_id": user_id, + "action": action, + "ip_address": ip_address, + "user_agent": user_agent, + "details": details or {} + } + + self.logs.append(log_entry) + + # In production, this would save to a database + print(f"AUDIT: {log_entry}") + + def get_user_logs(self, user_id: str, limit: int = 100) -> list: + """ + Get audit logs for a specific user + """ + user_logs = [log for log in self.logs if log["user_id"] == user_id] + return sorted(user_logs, key=lambda x: x["timestamp"], reverse=True)[:limit] + + +# Global instances +password_validator = PasswordValidator() +security_headers = SecurityHeaders() +token_generator = TokenGenerator() +ip_whitelist = IPWhitelist() +audit_logger = AuditLogger() diff --git a/app/main.py b/app/main.py index 0342502..538f8d3 100644 --- a/app/main.py +++ b/app/main.py @@ -1,15 +1,57 @@ -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware +from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.responses import JSONResponse from .auth.routes import router as auth_router from .example_protected_routes import router as protected_router import os +import time +from collections import defaultdict +from typing import Dict + +# Rate limiting storage (in production, use Redis or database) +rate_limit_storage: Dict[str, list] = defaultdict(list) + +# Rate limiting middleware +@app.middleware("http") +async def rate_limit_middleware(request: Request, call_next): + # Get client IP + client_ip = request.client.host + + # Current time + current_time = time.time() + + # Clean old entries (older than 1 minute) + rate_limit_storage[client_ip] = [ + timestamp for timestamp in rate_limit_storage[client_ip] + if current_time - timestamp < 60 + ] + + # Check if rate limit exceeded (100 requests per minute) + if len(rate_limit_storage[client_ip]) >= 100: + return JSONResponse( + status_code=429, + content={"detail": "Rate limit exceeded. Maximum 100 requests per minute."} + ) + + # Add current request timestamp + rate_limit_storage[client_ip].append(current_time) + + # Process request + response = await call_next(request) + return response # Create FastAPI app app = FastAPI( title="Authentication API", - description="A FastAPI application with Firebase authentication", - version="1.0.0" + description="A FastAPI application with Firebase authentication and enhanced security features", + version="2.0.0" +) + +# Add trusted host middleware for security +app.add_middleware( + TrustedHostMiddleware, + allowed_hosts=["localhost", "127.0.0.1", "*.yourdomain.com"] ) # Add CORS middleware @@ -44,11 +86,43 @@ async def health_check(): @app.get("/") async def root(): return { - "message": "Welcome to Authentication API", + "message": "Welcome to Authentication API v2.0", + "version": "2.0.0", + "features": [ + "User authentication and authorization", + "Role-based access control", + "Profile management", + "Session management", + "Audit logging", + "Rate limiting", + "Security enhancements" + ], "docs": "/docs", "redoc": "/redoc", "endpoints": { "auth": "/auth", "protected": "/protected" } + } + +# Security information endpoint +@app.get("/security-info") +async def security_info(): + return { + "security_features": { + "rate_limiting": "100 requests per minute per IP", + "trusted_hosts": "Configured for localhost and trusted domains", + "cors": "Configured for cross-origin requests", + "audit_logging": "All authentication events are logged", + "session_management": "Users can manage active sessions", + "role_based_access": "Admin, Moderator, and User roles supported" + }, + "recommendations": [ + "Configure CORS origins for production", + "Use HTTPS in production", + "Implement proper database for audit logs", + "Consider using Redis for rate limiting", + "Add two-factor authentication", + "Implement password complexity requirements" + ] } \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f6086a0..487bb90 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,3 +38,6 @@ uvicorn==0.34.0 uvloop==0.21.0 watchfiles==1.0.4 websockets==15.0 +redis==5.2.1 +cryptography==43.0.3 +bcrypt==4.2.1