|
1 | 1 | from cotlette.shortcuts import render |
2 | 2 |
|
3 | 3 | # Create your views here. |
| 4 | + |
| 5 | +# --- API ROUTES (moved from api.py) --- |
| 6 | +from typing import Union |
| 7 | + |
| 8 | +from fastapi import APIRouter, Depends, HTTPException, status |
| 9 | +from pydantic import BaseModel |
| 10 | +from jose import jwt, JWTError |
| 11 | +from datetime import timedelta |
| 12 | +from .models import UserModel, UserCreate, User |
| 13 | +from .utils import hash_password, generate_jwt, check_password |
| 14 | + |
| 15 | +from starlette.responses import JSONResponse, \ |
| 16 | + PlainTextResponse, \ |
| 17 | + RedirectResponse, \ |
| 18 | + StreamingResponse, \ |
| 19 | + FileResponse, \ |
| 20 | + HTMLResponse |
| 21 | + |
| 22 | +from cotlette.core.database.sqlalchemy import db |
| 23 | +from cotlette.contrib.auth.groups.models import GroupModel |
| 24 | + |
| 25 | +ACCESS_TOKEN_EXPIRE_MINUTES = 30 |
| 26 | + |
| 27 | +router = APIRouter() |
| 28 | + |
| 29 | +# Pydantic model for user login |
| 30 | +class UserLogin(BaseModel): |
| 31 | + email: str |
| 32 | + password: str |
| 33 | + |
| 34 | +# Pydantic model for token |
| 35 | +class Token(BaseModel): |
| 36 | + access_token: str |
| 37 | + token_type: str |
| 38 | + |
| 39 | +# Pydantic model for token data |
| 40 | +class TokenData(BaseModel): |
| 41 | + email: Union[str] = None |
| 42 | + |
| 43 | + |
| 44 | +# JWT settings |
| 45 | +ACCESS_TOKEN_EXPIRE_MINUTES = 30 |
| 46 | + |
| 47 | +@router.route("/login", methods=["POST"]) |
| 48 | +async def login_user(request): |
| 49 | + # Redirect to previous path after login |
| 50 | + if 'history' in request.session and len(request.session['history']): |
| 51 | + previous = request.session['history'].pop() |
| 52 | + else: |
| 53 | + previous = '/admin' |
| 54 | + |
| 55 | + # Get form data |
| 56 | + form = await request.form() |
| 57 | + username = form["email"] |
| 58 | + password = form["password"] |
| 59 | + |
| 60 | + # Search for user in database |
| 61 | + user = UserModel.objects.filter(email=username).first() # type: ignore |
| 62 | + if not user: |
| 63 | + return RedirectResponse(previous, status_code=303) |
| 64 | + |
| 65 | + hashed_pass = user.password_hash |
| 66 | + |
| 67 | + # Check password |
| 68 | + valid_pass = await check_password(password, hashed_pass) |
| 69 | + if not valid_pass: |
| 70 | + return RedirectResponse(previous, status_code=303) |
| 71 | + |
| 72 | + if previous in ('/users/login', '/users/login/', "/"): |
| 73 | + previous = '/admin' |
| 74 | + |
| 75 | + response = RedirectResponse(previous, status_code=303) |
| 76 | + if valid_pass: |
| 77 | + response.set_cookie('jwt', generate_jwt(user.id), httponly=True) |
| 78 | + return response |
| 79 | + |
| 80 | + |
| 81 | +@router.post("/logout", response_model=None) |
| 82 | +def logout(): |
| 83 | + response = JSONResponse(content={"message": "Logout successful"}) |
| 84 | + response.delete_cookie("jwt") |
| 85 | + return response |
| 86 | + |
| 87 | + |
| 88 | +# Create new user (POST) |
| 89 | +@router.post("/", response_model=None) |
| 90 | +async def create_user(user: UserCreate): |
| 91 | + hashed_password = await hash_password(user.password) |
| 92 | + group = await GroupModel.objects.filter(id=user.group_id).first() # type: ignore |
| 93 | + |
| 94 | + # Check if user doesn't exist |
| 95 | + existing_user = await UserModel.objects.filter(email=user.email).first() # type: ignore |
| 96 | + if existing_user: |
| 97 | + return JSONResponse( |
| 98 | + status_code=400, |
| 99 | + content={"message": "User with this email already exists"} |
| 100 | + ) |
| 101 | + |
| 102 | + new_user = await UserModel.objects.create( |
| 103 | + name=user.name, |
| 104 | + age=user.age, |
| 105 | + email=user.email, |
| 106 | + password_hash=hashed_password, |
| 107 | + group=group.id |
| 108 | + ) |
| 109 | + return User( |
| 110 | + id=new_user.id, |
| 111 | + name=new_user.name, |
| 112 | + age=new_user.age, |
| 113 | + email=new_user.email, |
| 114 | + group=new_user.group.id |
| 115 | + ) |
| 116 | + |
| 117 | + |
| 118 | +# Get all users (GET) |
| 119 | +@router.get("/", response_model=list[User]) |
| 120 | +async def get_users(): |
| 121 | + users = await UserModel.objects.all().execute() # type: ignore |
| 122 | + return [User(name=user.name, age=user.age, email=user.email) for user in users] |
0 commit comments