Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 55 additions & 59 deletions app/auth/firebase_auth.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import os
import firebase_admin
from firebase_admin import auth, credentials
from firebase_admin.auth import UserRecord
from typing import Optional, Dict, Any
import json
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any

import firebase_admin
from firebase_admin import auth, credentials
import jwt

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


class FirebaseAuthService:
def __init__(self):
Expand All @@ -17,72 +21,66 @@ def __init__(self):
self.refresh_token_expiry = timedelta(days=7)

def _initialize_firebase(self):
"""Initialize Firebase Admin SDK"""
"""Initialize Firebase Admin SDK using env credentials, service file, or default."""
try:
# Try to get Firebase credentials from environment
firebase_credentials = os.getenv("FIREBASE_CREDENTIALS")
if firebase_credentials:
cred_dict = json.loads(firebase_credentials)
cred = credentials.Certificate(cred_dict)
cred = credentials.Certificate(json.loads(firebase_credentials))
else:
# Fallback to service account file
service_account_path = os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH")
if service_account_path and os.path.exists(service_account_path):
cred = credentials.Certificate(service_account_path)
else:
# Use default credentials (for development)
cred = credentials.ApplicationDefault()

firebase_admin.initialize_app(cred)
logger.info("Firebase initialized successfully.")
except Exception as e:
print(f"Firebase initialization error: {e}")
raise
logger.error(f"Firebase initialization error: {e}")
raise RuntimeError("Failed to initialize Firebase SDK.") from e

async def create_user(self, email: str, password: str, first_name: str, last_name: str) -> Dict[str, Any]:
"""Create a new user in Firebase"""
"""Create a new Firebase user with custom claims."""
try:
user_record = auth.create_user(
email=email,
password=password,
display_name=f"{first_name} {last_name}",
email_verified=False
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: The sign_in_user method simulates authentication without actual password verification, which creates a significant security vulnerability. This could lead to authentication bypass and unauthorized access.

Recommendation: Implement proper authentication using Firebase Auth REST API with secure credential validation instead of simulation.


# Set custom claims
auth.set_custom_user_claims(user_record.uid, {
"first_name": first_name,
"last_name": last_name,
"role": "user"
})

return {
"id": user_record.uid,
"email": user_record.email,
"first_name": first_name,
"last_name": last_name,
"is_active": not user_record.disabled,
"created_at": user_record.user_metadata.creation_timestamp
"created_at": str(user_record.user_metadata.creation_timestamp)
}
except Exception as e:
raise Exception(f"Failed to create user: {str(e)}")
logger.error(f"User creation failed: {e}")
raise RuntimeError("User creation failed.") from e

async def sign_in_user(self, email: str, password: str) -> Dict[str, Any]:
"""Sign in user with email and password"""
"""
Simulate sign-in (Firebase doesn't allow password verification via Admin SDK).
Production use should use Firebase Auth REST API.
"""
try:
# In a real implementation, you would use Firebase Auth REST API
# For now, we'll simulate the authentication
user_record = auth.get_user_by_email(email)

if user_record.disabled:
raise Exception("User account is disabled")

# Generate JWT tokens
raise PermissionError("User account is disabled")

access_token = self._generate_access_token(user_record.uid, user_record.email)
refresh_token = self._generate_refresh_token(user_record.uid)

# Get custom claims
custom_claims = auth.get_custom_user_claims(user_record.uid)

custom_claims = auth.get_custom_user_claims(user_record.uid) or {}

return {
"access_token": access_token,
"refresh_token": refresh_token,
Expand All @@ -96,15 +94,16 @@ async def sign_in_user(self, email: str, password: str) -> Dict[str, Any]:
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error Handling Issue: Using generic RuntimeError for authentication failures reduces API clarity and may expose unnecessary system details to clients.

Recommendation: Create and use specific authentication-related exceptions that provide clear, user-friendly error messages without exposing implementation details.

}
except Exception as e:
raise Exception(f"Authentication failed: {str(e)}")
logger.error(f"Sign-in failed: {e}")
raise RuntimeError("Authentication failed.") from e

async def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
"""Verify Firebase ID token"""
"""Verify Firebase ID token and return user metadata."""
try:
decoded_token = auth.verify_id_token(token)
user_record = auth.get_user(decoded_token["uid"])
custom_claims = auth.get_custom_user_claims(user_record.uid)
decoded = auth.verify_id_token(token)
user_record = auth.get_user(decoded["uid"])
custom_claims = auth.get_custom_user_claims(user_record.uid) or {}

return {
"uid": user_record.uid,
"email": user_record.email,
Expand All @@ -113,46 +112,43 @@ async def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
"role": custom_claims.get("role", "user")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Token verification returns None on failure instead of raising an appropriate exception. This silent failure pattern could lead to authentication bypass if callers don't explicitly check for None returns.

Recommendation: Raise specific authentication exceptions with clear error messages rather than returning None.

}
except Exception as e:
print(f"Token verification failed: {e}")
logger.warning(f"Token verification failed: {e}")
return None

async def refresh_access_token(self, refresh_token: str) -> Optional[str]:
"""Issue new access token if refresh token is valid."""
try:
payload = jwt.decode(refresh_token, self.jwt_secret, algorithms=[self.jwt_algorithm])
if payload.get("type") != "refresh":
raise PermissionError("Invalid token type for refresh")

user_id = payload.get("user_id")
user_record = auth.get_user(user_id)

return self._generate_access_token(user_id, user_record.email)
except Exception as e:
logger.warning(f"Token refresh failed: {e}")
return None

def _generate_access_token(self, user_id: str, email: str) -> str:
"""Generate JWT access token"""
payload = {
"user_id": user_id,
"email": email,
"exp": datetime.utcnow() + self.access_token_expiry,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + self.access_token_expiry,
"type": "access"
}
return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

def _generate_refresh_token(self, user_id: str) -> str:
"""Generate JWT refresh token"""
payload = {
"user_id": user_id,
"exp": datetime.utcnow() + self.refresh_token_expiry,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + self.refresh_token_expiry,
"type": "refresh"
}
return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

async def refresh_access_token(self, refresh_token: str) -> Optional[str]:
"""Refresh access token using refresh token"""
try:
payload = jwt.decode(refresh_token, self.jwt_secret, algorithms=[self.jwt_algorithm])

if payload.get("type") != "refresh":
raise Exception("Invalid token type")

user_id = payload.get("user_id")
user_record = auth.get_user(user_id)

return self._generate_access_token(user_id, user_record.email)
except Exception as e:
print(f"Token refresh failed: {e}")
return None


# Global instance
firebase_auth = FirebaseAuthService()
# Singleton instance
firebase_auth = FirebaseAuthService()