diff --git a/app/auth/firebase_auth.py b/app/auth/firebase_auth.py index 7326f38..7c5d003 100644 --- a/app/auth/firebase_auth.py +++ b/app/auth/firebase_auth.py @@ -1,12 +1,16 @@ import os -import firebase_admin -from firebase_admin import auth, credentials -from firebase_admin.auth import UserRecord -from typing import Optional, Dict, Any import json +import logging from datetime import datetime, timedelta +from typing import Optional, Dict, Any + +import firebase_admin +from firebase_admin import auth, credentials import jwt +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + class FirebaseAuthService: def __init__(self): @@ -17,29 +21,26 @@ def __init__(self): self.refresh_token_expiry = timedelta(days=7) def _initialize_firebase(self): - """Initialize Firebase Admin SDK""" + """Initialize Firebase Admin SDK using env credentials, service file, or default.""" try: - # Try to get Firebase credentials from environment firebase_credentials = os.getenv("FIREBASE_CREDENTIALS") if firebase_credentials: - cred_dict = json.loads(firebase_credentials) - cred = credentials.Certificate(cred_dict) + cred = credentials.Certificate(json.loads(firebase_credentials)) else: - # Fallback to service account file service_account_path = os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH") if service_account_path and os.path.exists(service_account_path): cred = credentials.Certificate(service_account_path) else: - # Use default credentials (for development) cred = credentials.ApplicationDefault() - + firebase_admin.initialize_app(cred) + logger.info("Firebase initialized successfully.") except Exception as e: - print(f"Firebase initialization error: {e}") - raise + logger.error(f"Firebase initialization error: {e}") + raise RuntimeError("Failed to initialize Firebase SDK.") from e async def create_user(self, email: str, password: str, first_name: str, last_name: str) -> Dict[str, Any]: - """Create a new user in Firebase""" + """Create a new Firebase user with custom claims.""" try: user_record = auth.create_user( email=email, @@ -47,42 +48,39 @@ async def create_user(self, email: str, password: str, first_name: str, last_nam display_name=f"{first_name} {last_name}", email_verified=False ) - - # Set custom claims auth.set_custom_user_claims(user_record.uid, { "first_name": first_name, "last_name": last_name, "role": "user" }) - + return { "id": user_record.uid, "email": user_record.email, "first_name": first_name, "last_name": last_name, "is_active": not user_record.disabled, - "created_at": user_record.user_metadata.creation_timestamp + "created_at": str(user_record.user_metadata.creation_timestamp) } except Exception as e: - raise Exception(f"Failed to create user: {str(e)}") + logger.error(f"User creation failed: {e}") + raise RuntimeError(f"User creation failed: {e}") from e async def sign_in_user(self, email: str, password: str) -> Dict[str, Any]: - """Sign in user with email and password""" + """ + Simulate sign-in (Firebase doesn't allow password verification via Admin SDK). + Production use should use Firebase Auth REST API. + """ try: - # In a real implementation, you would use Firebase Auth REST API - # For now, we'll simulate the authentication user_record = auth.get_user_by_email(email) - + if user_record.disabled: - raise Exception("User account is disabled") - - # Generate JWT tokens + raise PermissionError("User account is disabled") + access_token = self._generate_access_token(user_record.uid, user_record.email) refresh_token = self._generate_refresh_token(user_record.uid) - - # Get custom claims - custom_claims = auth.get_custom_user_claims(user_record.uid) - + custom_claims = auth.get_custom_user_claims(user_record.uid) or {} + return { "access_token": access_token, "refresh_token": refresh_token, @@ -96,15 +94,16 @@ async def sign_in_user(self, email: str, password: str) -> Dict[str, Any]: } } except Exception as e: - raise Exception(f"Authentication failed: {str(e)}") + logger.error(f"Sign-in failed: {e}") + raise RuntimeError("Authentication failed.") from e async def verify_token(self, token: str) -> Optional[Dict[str, Any]]: - """Verify Firebase ID token""" + """Verify Firebase ID token and return user metadata.""" try: - decoded_token = auth.verify_id_token(token) - user_record = auth.get_user(decoded_token["uid"]) - custom_claims = auth.get_custom_user_claims(user_record.uid) - + decoded = auth.verify_id_token(token) + user_record = auth.get_user(decoded["uid"]) + custom_claims = auth.get_custom_user_claims(user_record.uid) or {} + return { "uid": user_record.uid, "email": user_record.email, @@ -113,46 +112,43 @@ async def verify_token(self, token: str) -> Optional[Dict[str, Any]]: "role": custom_claims.get("role", "user") } except Exception as e: - print(f"Token verification failed: {e}") + logger.warning(f"Token verification failed: {e}") + return None + + async def refresh_access_token(self, refresh_token: str) -> Optional[str]: + """Issue new access token if refresh token is valid.""" + try: + payload = jwt.decode(refresh_token, self.jwt_secret, algorithms=[self.jwt_algorithm]) + if payload.get("type") != "refresh": + raise PermissionError("Invalid token type for refresh") + + user_id = payload.get("user_id") + user_record = auth.get_user(user_id) + + return self._generate_access_token(user_id, user_record.email) + except Exception as e: + logger.warning(f"Token refresh failed: {e}") return None def _generate_access_token(self, user_id: str, email: str) -> str: - """Generate JWT access token""" payload = { "user_id": user_id, "email": email, - "exp": datetime.utcnow() + self.access_token_expiry, "iat": datetime.utcnow(), + "exp": datetime.utcnow() + self.access_token_expiry, "type": "access" } return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm) def _generate_refresh_token(self, user_id: str) -> str: - """Generate JWT refresh token""" payload = { "user_id": user_id, - "exp": datetime.utcnow() + self.refresh_token_expiry, "iat": datetime.utcnow(), + "exp": datetime.utcnow() + self.refresh_token_expiry, "type": "refresh" } return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm) - async def refresh_access_token(self, refresh_token: str) -> Optional[str]: - """Refresh access token using refresh token""" - try: - payload = jwt.decode(refresh_token, self.jwt_secret, algorithms=[self.jwt_algorithm]) - - if payload.get("type") != "refresh": - raise Exception("Invalid token type") - - user_id = payload.get("user_id") - user_record = auth.get_user(user_id) - - return self._generate_access_token(user_id, user_record.email) - except Exception as e: - print(f"Token refresh failed: {e}") - return None - -# Global instance -firebase_auth = FirebaseAuthService() \ No newline at end of file +# Singleton instance +firebase_auth = FirebaseAuthService()