Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 20 additions & 37 deletions .github/workflows/reviewer.yml
Original file line number Diff line number Diff line change
@@ -1,43 +1,26 @@
name: PR Reviewer Agent
name: PR Event Logger

on:
issue_comment:
types: [created]
pull_request:
types: [opened, synchronize, reopened]
push:
types: [opened, reopened, ready_for_review, review_requested]
issue_comment:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test comment

types: [created, edited]

jobs:
process_pr_events:
log-event:
runs-on: ubuntu-latest
steps:
- name: Extract event details
run: echo "EVENT_PAYLOAD=$(jq -c . < $GITHUB_EVENT_PATH)" >> $GITHUB_ENV

- name: Generate Signature and Encrypt Token
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'

- name: Run event logger
env:
WEBHOOK_SECRET: ${{ secrets.WEBHOOK_SECRET }}
API_TOKEN: ${{ secrets.API_TOKEN }}
run: |
# Generate signature for the payload
SIGNATURE=$(echo -n "$EVENT_PAYLOAD" | openssl dgst -sha256 -hmac "$WEBHOOK_SECRET" | cut -d " " -f2)
echo "SIGNATURE=$SIGNATURE" >> $GITHUB_ENV
# Create a consistent key from the webhook secret
KEY=$(echo -n "$WEBHOOK_SECRET" | openssl dgst -sha256 | cut -d ' ' -f2)
# Generate a random IV
IV=$(openssl rand -hex 16)
# Encrypt token with proper padding
ENCRYPTED_TOKEN=$(echo -n "$API_TOKEN" | openssl enc -aes-256-cbc -a -A -K "$KEY" -iv "$IV" -md sha256)
echo "ENCRYPTED_TOKEN=$ENCRYPTED_TOKEN" >> $GITHUB_ENV
echo "TOKEN_IV=$IV" >> $GITHUB_ENV
- name: Call External API (With Encrypted Token)
run: |
curl -X POST https://firstly-worthy-chamois.ngrok-free.app/github-webhook \
-H "Content-Type: application/json" \
-H "X-Hub-Signature-256: sha256=$SIGNATURE" \
-H "X-Encrypted-Token: $ENCRYPTED_TOKEN" \
-H "X-Token-IV: $TOKEN_IV" \
-d "$EVENT_PAYLOAD"
GITHUB_EVENT_NAME: ${{ github.event_name }}
GITHUB_EVENT_PATH: ${{ github.event_path }}
run: python main.py

1 change: 1 addition & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Authentication API package
1 change: 1 addition & 0 deletions app/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Auth module
67 changes: 67 additions & 0 deletions app/auth/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional, Dict, Any
from .firebase_auth import firebase_auth

# Security scheme for Bearer token
security = HTTPBearer()


async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict[str, Any]:
"""
Dependency to get current authenticated user from Firebase token
"""
token = credentials.credentials

if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)

user_data = await firebase_auth.verify_token(token)

if not user_data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)

return user_data


async def get_current_active_user(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
"""
Dependency to get current active user
"""
if not current_user.get("is_active", True):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
Comment on lines +39 to +43

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current_user dictionary, which comes from firebase_auth.verify_token, does not contain an is_active key. Because of the get() default value (True), the condition not current_user.get("is_active", True) will always be false, and the check will never fail for an inactive user. firebase_auth.verify_token must be updated to include the user's active status (e.g., is_active: not user_record.disabled) in the dictionary it returns.

return current_user


async def require_role(required_role: str):
"""
Dependency factory to require specific role
"""
async def role_checker(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
user_role = current_user.get("role", "user")

if user_role != required_role and user_role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Access denied. Required role: {required_role}"
)

return current_user

return role_checker


# Predefined role dependencies
require_admin = require_role("admin")
require_user = require_role("user")
158 changes: 158 additions & 0 deletions app/auth/firebase_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import os
import firebase_admin
from firebase_admin import auth, credentials
from firebase_admin.auth import UserRecord
from typing import Optional, Dict, Any
import json
from datetime import datetime, timedelta
import jwt


class FirebaseAuthService:
def __init__(self):
self._initialize_firebase()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Hardcoded JWT secret key in default parameter creates a security vulnerability. If the environment variable is missing, the application will use a known default value that attackers could exploit. Remove the hardcoded default secret and ensure JWT_SECRET is always provided through environment variables with proper validation at startup.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Hardcoded JWT secret in the code. The line self.jwt_secret = os.getenv("JWT_SECRET", "your-secret-key") provides a default value that could be used in production if the environment variable is not set, leading to predictable JWT signing keys. This creates a significant security vulnerability as anyone who sees the code could forge valid JWTs.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Hardcoded JWT secret key in default parameter creates a security risk. If the environment variable is not set, this predictable default will be used, compromising token security. Remove the default value and fail explicitly if JWT_SECRET is not provided in environment variables.

self.jwt_secret = os.getenv("JWT_SECRET", "your-secret-key")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Using a hardcoded, weak fallback for the JWT_SECRET is a critical security risk. If the JWT_SECRET environment variable is not set in production, the application will use this known, insecure key, allowing attackers to forge valid tokens. The application should fail to start if a secret key is not provided via environment variables.

Suggested change
self.jwt_secret = os.getenv("JWT_SECRET", "your-secret-key")
self.jwt_secret = os.getenv("JWT_SECRET")
if not self.jwt_secret:
raise ValueError("JWT_SECRET environment variable must be set")

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: The JWT secret key is hardcoded as a default value (your-secret-key). This should be properly configured from environment variables without a default fallback that could be used in production.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Hardcoded JWT secret in the initialization. The fallback value "your-secret-key" could be used in production if the environment variable is not set, creating a serious security vulnerability.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Concern: The JWT secret is hardcoded ("your-secret-key") rather than being loaded from environment variables. This creates a significant security risk if deployed to production, as anyone with access to the code could forge valid authentication tokens. Consider using environment variables to store sensitive credentials.

self.jwt_algorithm = "HS256"
self.access_token_expiry = timedelta(hours=1)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Hardcoded JWT secret key in code. This creates a security vulnerability as secrets should never be hardcoded. Move the default "your-secret-key" to a separate development config file and ensure it's never used in production.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: The JWT secret key is hardcoded in the default parameter. This is a serious security risk as it could lead to token forgery if the code is exposed.

def __init__(self, jwt_secret: str = "your-secret-key"):

Recommendation: Move the JWT secret to an environment variable and ensure there is no fallback to a default value that could be used in production.

self.refresh_token_expiry = timedelta(days=7)

def _initialize_firebase(self):
"""Initialize Firebase Admin SDK"""
try:
# Try to get Firebase credentials from environment
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Firebase credentials initialization with potential for insecure fallback. In production environments, using ApplicationDefault() without proper configuration could lead to unintended access levels. Replace this fallback approach with explicit error handling that fails securely when proper credentials aren't available.

firebase_credentials = os.getenv("FIREBASE_CREDENTIALS")
if firebase_credentials:
cred_dict = json.loads(firebase_credentials)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Insecure Firebase credential handling with fallback to default credentials could allow unintended access in production. Implement strict credential validation and fail securely if proper credentials aren't available rather than falling back to potentially insecure defaults.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Firebase credentials handling has a fallback to default credentials which could lead to unintended access in production. Remove the fallback to default credentials in production code or add environment checks to ensure this only happens in development environments.

cred = credentials.Certificate(cred_dict)
else:
# Fallback to service account file
service_account_path = os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH")
if service_account_path and os.path.exists(service_account_path):
cred = credentials.Certificate(service_account_path)
else:
# Use default credentials (for development)
cred = credentials.ApplicationDefault()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: The Firebase initialization uses default credentials as a fallback which could lead to unintended access in production environments. Always require explicit credentials in production.


Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error Handling Issue: Firebase initialization error handling prints potentially sensitive information to stdout. Replace with proper logging using Python's logging module with appropriate log levels to avoid exposing implementation details.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: The Firebase initialization contains an insecure fallback to default credentials which could lead to unintended access in production environments.

try:
    self.firebase_app = firebase_admin.initialize_app(cred)
except Exception as e:
    print(f"Failed to initialize with service account, trying application default: {e}")
    try:
        cred = credentials.ApplicationDefault()
        self.firebase_app = firebase_admin.initialize_app(cred)
    except Exception as e:
        print(f"Failed to initialize with application default: {e}")

Recommendation: Remove the fallback to ApplicationDefault() and implement proper credential validation with appropriate error handling.

firebase_admin.initialize_app(cred)
except Exception as e:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: The Firebase initialization error is being printed to the console which could leak sensitive information about your Firebase configuration in production environments. Consider using a proper logging system with different log levels for different environments.

Similar issues with printing sensitive error information also appear in:

  • app/auth/firebase_auth.py:114 (token verification errors)
  • app/auth/firebase_auth.py:150 (token refresh errors)

print(f"Firebase initialization error: {e}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using print() for error messages is not suitable for a production application. It's better to use Python's standard logging module, which allows for configurable log levels, formats, and destinations. This is crucial for effective monitoring and debugging.

Suggested change
print(f"Firebase initialization error: {e}")
import logging
logging.error(f"Firebase initialization error: {e}")

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: Exception details are printed, potentially exposing sensitive configuration information to users. Log errors internally but return generic error messages to users to prevent information disclosure.

raise

async def create_user(self, email: str, password: str, first_name: str, last_name: str) -> Dict[str, Any]:
"""Create a new user in Firebase"""
try:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality Issue: The create_user method doesn't verify if a user with the same email already exists before creation, which could lead to unexpected errors. Add a check to verify if the email is already registered before attempting to create a user.

user_record = auth.create_user(
email=email,
password=password,
display_name=f"{first_name} {last_name}",
email_verified=False
)

# Set custom claims
auth.set_custom_user_claims(user_record.uid, {
"first_name": first_name,
"last_name": last_name,
"role": "user"
})

return {
"id": user_record.uid,
"email": user_record.email,
"first_name": first_name,
"last_name": last_name,
"is_active": not user_record.disabled,
"created_at": user_record.user_metadata.creation_timestamp
}
except Exception as e:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error Handling Issue: Exception messages from Firebase are directly exposed in the API responses. This could leak sensitive implementation details to potential attackers. Consider using generic error messages for client responses and logging the actual errors server-side.

This issue also appears in:

  • app/auth/firebase_auth.py:83
  • app/auth/firebase_auth.py:114
  • app/auth/firebase_auth.py:151

raise Exception(f"Failed to create user: {str(e)}")
Comment on lines +66 to +67

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Catching a generic Exception and re-raising it with str(e) can leak sensitive implementation details to the client. This can provide attackers with useful information about your application's internals. Catch specific exceptions from the Firebase SDK (e.g., firebase_admin.auth.EmailAlreadyExistsError) and raise more generic errors for unexpected issues.


async def sign_in_user(self, email: str, password: str) -> Dict[str, Any]:
"""Sign in user with email and password"""
try:
# In a real implementation, you would use Firebase Auth REST API
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Issue: The sign_in_user method appears to be simulating authentication rather than using Firebase's actual authentication mechanisms. This bypasses security controls and could lead to unauthorized access. Implement proper Firebase authentication using their Auth REST API for production use.

# For now, we'll simulate the authentication
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing Gap: The sign_in_user method mentions using Firebase Auth REST API in a real implementation but currently only simulates authentication. This could lead to security issues if deployed as-is. Consider implementing proper Firebase authentication.

user_record = auth.get_user_by_email(email)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Authentication Issue: The sign_in_user method doesn't properly authenticate with Firebase. The current implementation only verifies the user exists but doesn't actually validate the password. Implement proper Firebase authentication using the Auth REST API instead of simulating authentication.


if user_record.disabled:
raise Exception("User account is disabled")

# Generate JWT tokens
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Issue: The sign_in_user method doesn't actually authenticate the user with Firebase (it only retrieves the user). Implement proper Firebase authentication using the Auth REST API instead of simulating authentication.

access_token = self._generate_access_token(user_record.uid, user_record.email)
refresh_token = self._generate_refresh_token(user_record.uid)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Authentication Vulnerability: The sign_in_user method doesn't actually authenticate with Firebase using the password. It only retrieves the user by email, which creates a serious authentication bypass risk.

def sign_in_user(self, email: str, password: str):
    try:
        user = auth.get_user_by_email(email)
        # No actual password verification happens here!

Recommendation: Implement proper Firebase authentication using the Firebase Auth REST API instead of simulating authentication.

# Get custom claims
custom_claims = auth.get_custom_user_claims(user_record.uid)

return {
"access_token": access_token,
"refresh_token": refresh_token,
"user": {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Authentication Issue: The sign_in_user() method doesn't actually authenticate with Firebase using password, it only retrieves the user by email. This creates a serious authentication bypass risk. Implement proper Firebase authentication using the Firebase Auth REST API for email/password authentication as mentioned in your comment.

"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims.get("first_name", ""),
"last_name": custom_claims.get("last_name", ""),
"is_active": not user_record.disabled,
"created_at": str(user_record.user_metadata.creation_timestamp)
}
}
except Exception as e:
raise Exception(f"Authentication failed: {str(e)}")
Comment on lines +71 to +99

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The sign_in_user method retrieves a user by email but never verifies the provided password. This allows anyone to log in as any user simply by knowing their email address. A real implementation needs to validate credentials. Since the Firebase Admin SDK doesn't support password verification directly, you would typically handle sign-in on the client and send the resulting ID token to the backend for verification, or use Firebase's Identity Platform REST API to verify the password on the server-side.


async def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
"""Verify Firebase ID token"""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Gap: The token verification process doesn't check for token expiration. Without expiration validation, revoked or expired tokens might still be accepted as valid. Add explicit expiration checking to the token verification logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review comment reply

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reply on older review comment

try:
decoded_token = auth.verify_id_token(token)
user_record = auth.get_user(decoded_token["uid"])
custom_claims = auth.get_custom_user_claims(user_record.uid)

return {
"uid": user_record.uid,
"email": user_record.email,
"first_name": custom_claims.get("first_name", ""),
"last_name": custom_claims.get("last_name", ""),
"role": custom_claims.get("role", "user")
}
except Exception as e:
print(f"Token verification failed: {e}")
return None
Comment on lines +101 to +117

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This function is critically flawed for two main reasons:

  1. Incorrect Verification Method: It uses auth.verify_id_token(token) to validate a custom JWT generated by your _generate_access_token method. This will always fail because verify_id_token is designed for ID tokens issued by Firebase, not custom tokens signed with your own secret. You must use jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm]) instead.

  2. Incomplete User Data: The returned dictionary is missing key fields like is_active and created_at. This causes bugs in other parts of the application, such as the /auth/me endpoint and the get_current_active_user dependency.


def _generate_access_token(self, user_id: str, email: str) -> str:
"""Generate JWT access token"""
payload = {
"user_id": user_id,
"email": email,
"exp": datetime.utcnow() + self.access_token_expiry,
"iat": datetime.utcnow(),
"type": "access"
}
return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

def _generate_refresh_token(self, user_id: str) -> str:
"""Generate JWT refresh token"""
payload = {
"user_id": user_id,
"exp": datetime.utcnow() + self.refresh_token_expiry,
"iat": datetime.utcnow(),
"type": "refresh"
}
return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

async def refresh_access_token(self, refresh_token: str) -> Optional[str]:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This method is defined as async def, but it does not perform any truly asynchronous operations. The firebase-admin library's auth.get_user method is synchronous, as are the jwt operations. Defining a method as async when it doesn't need to be can be misleading and adds unnecessary overhead.

Suggested change
async def refresh_access_token(self, refresh_token: str) -> Optional[str]:
def refresh_access_token(self, refresh_token: str) -> Optional[str]:

"""Refresh access token using refresh token"""
try:
payload = jwt.decode(refresh_token, self.jwt_secret, algorithms=[self.jwt_algorithm])

if payload.get("type") != "refresh":
raise Exception("Invalid token type")

user_id = payload.get("user_id")
user_record = auth.get_user(user_id)

return self._generate_access_token(user_id, user_record.email)
except Exception as e:
print(f"Token refresh failed: {e}")
return None


# Global instance
firebase_auth = FirebaseAuthService()
39 changes: 39 additions & 0 deletions app/auth/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from pydantic import BaseModel, EmailStr
from typing import Optional


class UserSignupRequest(BaseModel):
email: EmailStr
password: str
first_name: str
last_name: str


class UserLoginRequest(BaseModel):
email: EmailStr
password: str


class UserResponse(BaseModel):
id: str
email: str
first_name: str
last_name: str
is_active: bool
created_at: str

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using str for created_at is acceptable, but it's better practice to use datetime from the datetime module. Pydantic and FastAPI can automatically handle the serialization of datetime objects to ISO 8601 strings in JSON responses, and it provides better type safety within your application code.

Suggested change
created_at: str
created_at: datetime



class AuthResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
user: UserResponse


class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"


class RefreshTokenRequest(BaseModel):
refresh_token: str
Loading