Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Git
.git
.gitignore
.github

# Python
__pycache__
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
venv/
ENV/
env/

# Testing
.pytest_cache/
.coverage
.coverage.*
htmlcov/
*.db
test.db
test_auth.db
neurobank.db

# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store

# Documentation - exclude most but keep important ones
docs/
*.md
!README.md
!docker/README.md

# Dependencies
*.txt
!requirements.txt
!requirements-dev.txt

# CI/CD
.travis.yml
.circleci/
azure-pipelines.yml

# Docker
docker-compose.yml

# Misc
node_modules/
*.log
*.pid
*.seed
*.pid.lock
tmp/
temp/
11 changes: 8 additions & 3 deletions .github/workflows/production-pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ jobs:
uses: docker/build-push-action@v5
with:
context: .
file: ./docker/Dockerfile.api
push: false
load: true
tags: neurobank-fastapi:test
Expand Down Expand Up @@ -203,6 +204,7 @@ jobs:

- name: 🔐 Log in to Docker Hub
uses: docker/login-action@v3
if: github.event_name != 'pull_request'
with:
username: neiland
password: ${{ secrets.DOCKER_PAT }}
Expand All @@ -218,12 +220,15 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
file: ./docker/Dockerfile.api
push: ${{ github.event_name != 'pull_request' }}
tags: "neiland/neurobank-fastapi:latest,neiland/neurobank-fastapi:${{ github.sha }}"
# For pull requests, export results to the build cache.
# Otherwise, push to a registry.
outputs: ${{ github.event_name == 'pull_request' && 'type=cacheonly' || 'type=registry' }}
cache-from: type=registry,ref=neiland/neurobank-fastapi:buildcache
cache-to: type=registry,ref=neiland/neurobank-fastapi:buildcache,mode=max
build-args: |
BUILD_DATE=${{ github.event.head_commit.timestamp }}
VCS_REF=${{ github.sha }}


# ============================================================================
# 4. FRONTEND ASSET OPTIMIZATION
Expand Down
143 changes: 142 additions & 1 deletion app/auth/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import os
from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, HTTPException, Request
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from ..config import get_settings
from ..database import get_db
from ..models.user import User

# Configuración del esquema de seguridad
security = HTTPBearer(auto_error=False)

# JWT Configuration
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


def get_api_key() -> str:
"""Obtiene la API key desde la configuración centralizada"""
Expand Down Expand Up @@ -67,3 +77,134 @@ def verify_api_key(
)

return provided_api_key


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a JWT access token

Args:
data: Data to encode in the token (should include user_id and role)
expires_delta: Optional expiration time delta

Returns:
Encoded JWT token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt


def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
db: Session = Depends(get_db),
) -> User:
"""
Get the current authenticated user from JWT token

Extracts and validates JWT token from Authorization header,
then retrieves the user from the database with their role information.

Args:
credentials: HTTP Bearer credentials containing JWT token
db: Database session

Returns:
User object with role information

Raises:
HTTPException: If token is invalid or user not found
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)

if not credentials:
raise credentials_exception

try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception

user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise credentials_exception

if user.is_active != "true":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)

return user


def require_role(*allowed_roles: str):
"""
Dependency factory for role-based access control

Creates a dependency that checks if the current user has one of the allowed roles.

Args:
allowed_roles: Variable number of role names that are allowed

Returns:
Dependency function that validates user role

Example:
@router.get("/admin", dependencies=[Depends(require_role("admin"))])
async def admin_endpoint():
return {"message": "Admin access granted"}
"""
def role_checker(current_user: User = Depends(get_current_user)) -> User:
if current_user.role.name not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Access denied. Required role: {', '.join(allowed_roles)}"
)
return current_user

return role_checker


# Convenient role-specific dependencies
def admin_only(current_user: User = Depends(get_current_user)) -> User:
"""Dependency that requires admin role"""
if current_user.role.name != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
return current_user


def customer_only(current_user: User = Depends(get_current_user)) -> User:
"""Dependency that requires customer role"""
if current_user.role.name != "customer":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Customer access required"
)
return current_user


def auditor_only(current_user: User = Depends(get_current_user)) -> User:
"""Dependency that requires auditor role"""
if current_user.role.name != "auditor":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Auditor access required"
)
return current_user
7 changes: 7 additions & 0 deletions app/crud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
CRUD package for NeuroBank FastAPI Toolkit
"""
from app.crud.role import role_crud
from app.crud.user import user_crud

__all__ = ["role_crud", "user_crud"]
67 changes: 67 additions & 0 deletions app/crud/role.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
CRUD operations for UserRole
"""
from typing import List, Optional
from uuid import UUID

from sqlalchemy.orm import Session

from app.models.role import UserRole
from app.schemas.role import UserRoleCreate, UserRoleUpdate


class RoleCRUD:
"""CRUD operations for UserRole"""

def get(self, db: Session, role_id: UUID) -> Optional[UserRole]:
"""Get a role by ID"""
return db.query(UserRole).filter(UserRole.id == role_id).first()

def get_by_name(self, db: Session, name: str) -> Optional[UserRole]:
"""Get a role by name"""
return db.query(UserRole).filter(UserRole.name == name).first()

def get_all(self, db: Session, skip: int = 0, limit: int = 100) -> List[UserRole]:
"""Get all roles with pagination"""
return db.query(UserRole).offset(skip).limit(limit).all()

def create(self, db: Session, role_in: UserRoleCreate) -> UserRole:
"""Create a new role"""
db_role = UserRole(
name=role_in.name,
description=role_in.description,
)
db.add(db_role)
db.commit()
db.refresh(db_role)
return db_role

def update(
self, db: Session, role_id: UUID, role_in: UserRoleUpdate
) -> Optional[UserRole]:
"""Update a role"""
db_role = self.get(db, role_id)
if not db_role:
return None

update_data = role_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_role, field, value)

db.commit()
db.refresh(db_role)
return db_role

def delete(self, db: Session, role_id: UUID) -> Optional[UserRole]:
"""Delete a role"""
db_role = self.get(db, role_id)
if not db_role:
return None

db.delete(db_role)
db.commit()
return db_role


# Create a singleton instance
role_crud = RoleCRUD()
Loading
Loading