Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions app/commands/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def _create_user(user_data: dict[str, str | RoleType]) -> None:
user_level = "Admin" if admin else ""
rprint(
f"\n[green]-> {user_level} User [bold]{user_data['email']}"
"[/bold] added succesfully.\n"
"[/bold] added successfully.\n"
)
except HTTPException as exc:
rprint(f"\n[red]-> ERROR adding User : [bold]{exc.detail}\n")
Expand Down Expand Up @@ -246,7 +246,7 @@ async def _verify_user(user_id: int) -> User | None:
user = aiorun(_verify_user(user_id))
if user:
rprint(
f"\n[green]-> User [bold]{user_id}[/bold] verified succesfully.\n"
f"\n[green]-> User [bold]{user_id}[/bold] verified successfully.\n"
)
else:
rprint("\n[red]-> ERROR verifying User : [bold]User not found\n")
Expand Down Expand Up @@ -289,7 +289,7 @@ async def _ban_user(user_id: int, *, unban: bool | None) -> User | None:
if user:
rprint(
f"\n[green]-> User [bold]{user_id}[/bold] "
f"[red]{'UN' if unban else ''}BANNED[/red] succesfully."
f"[red]{'UN' if unban else ''}BANNED[/red] successfully."
)
show_table("", [user])
else:
Expand Down Expand Up @@ -338,7 +338,7 @@ async def _toggle_admin(
status = "removed from" if remove else "granted to"
rprint(
f"\n[green]-> Admin status [bold]{status}[/bold] "
f"User [bold]{user_id}[/bold] succesfully."
f"User [bold]{user_id}[/bold] successfully."
)
show_table("", [user])
else:
Expand All @@ -356,31 +356,25 @@ def delete(
) -> None:
"""Delete the user with the given id."""

async def _delete_user(user_id: int) -> User | None:
async def _delete_user(user_id: int) -> None:
"""Async function to delete a user."""
try:
async with async_session() as session:
await check_db_initialized(session)
user = await session.get(User, user_id)
if user:
await session.delete(user)
await session.commit()
await UserManager.delete_user(user_id, user_id, session)
await session.commit()
except HTTPException as exc:
rprint(f"\n[RED]-> ERROR deleting that User : [bold]{exc.detail}\n")
raise typer.Exit(1) from exc
except SQLAlchemyError as exc:
rprint(f"\n[RED]-> ERROR deleting that User : [bold]{exc}\n")
raise typer.Exit(1) from exc
else:
return user

user = aiorun(_delete_user(user_id))

if user:
rprint(
f"\n[green]-> User [bold]{user_id}[/bold] "
f"[red]DELETED[/red] succesfully."
)
else:
rprint("\n[red]-> ERROR deleting that User : [bold]User not found\n")
raise typer.Exit(1)
aiorun(_delete_user(user_id))
rprint(
f"\n[green]-> User [bold]{user_id}[/bold] "
f"[red]DELETED[/red] successfully."
)


@app.command()
Expand Down
2 changes: 1 addition & 1 deletion app/managers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ResponseMessages:
CANT_GENERATE_RESET = "Unable to generate the Password Reset Token"
INVALID_TOKEN = "That token is Invalid" # noqa: S105
EXPIRED_TOKEN = "That token has Expired" # noqa: S105
VERIFICATION_SUCCESS = "User succesfully Verified"
VERIFICATION_SUCCESS = "User successfully Verified"
USER_NOT_FOUND = "User not Found"
ALREADY_VALIDATED = "You are already validated"
VALIDATION_RESENT = "Validation email re-sent"
Expand Down
30 changes: 27 additions & 3 deletions app/managers/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from email_validator import EmailNotValidError, validate_email
from fastapi import BackgroundTasks, HTTPException, status
from pydantic import NameEmail
from sqlalchemy import Select, delete, or_, select, update
from sqlalchemy import Select, delete, func, or_, select, update
from sqlalchemy.exc import IntegrityError

from app.config.settings import get_settings
Expand All @@ -21,6 +21,7 @@
)
from app.managers.auth import AuthManager
from app.managers.email import EmailManager
from app.models.enums import RoleType
from app.models.user import User
from app.schemas.email import EmailTemplateSchema
from app.schemas.request.user import SearchField
Expand All @@ -31,7 +32,6 @@

from sqlalchemy.ext.asyncio import AsyncSession

from app.models.enums import RoleType
from app.schemas.request.user import (
UserChangePasswordRequest,
UserEditRequest,
Expand All @@ -46,6 +46,7 @@ class ErrorMessages:
AUTH_INVALID = "Wrong email or password"
USER_INVALID = "This User does not exist"
CANT_SELF_BAN = "You cannot ban/unban yourself!"
CANT_DELETE_LAST_ADMIN = "Cannot delete the last admin user"
NOT_VERIFIED = "You need to verify your Email before logging in"
EMPTY_FIELDS = "You must supply all fields and they cannot be empty"
ALREADY_BANNED_OR_UNBANNED = "This User is already banned/unbanned"
Expand Down Expand Up @@ -186,13 +187,36 @@ async def login(
return token, refresh

@staticmethod
async def delete_user(user_id: int, session: AsyncSession) -> None:
async def delete_user(
user_id: int, current_user_id: int, session: AsyncSession
) -> None:
"""Delete the User with specified ID."""
check_user = await get_user_by_id_(user_id, session)
if not check_user:
raise HTTPException(
status.HTTP_404_NOT_FOUND, ErrorMessages.USER_INVALID
)

# Prevent deletion if user is the last admin
if check_user.role == RoleType.admin:
count_query = (
select(func.count())
.select_from(User)
.where(User.role == RoleType.admin)
)
result = await session.execute(count_query)
admin_count = result.scalar()

if (
admin_count is not None
and admin_count <= 1
and current_user_id == user_id
):
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
ErrorMessages.CANT_DELETE_LAST_ADMIN,
)

await session.execute(delete(User).where(User.id == user_id))

@staticmethod
Expand Down
6 changes: 4 additions & 2 deletions app/resources/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ async def edit_user(
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_user(
user_id: int, db: Annotated[AsyncSession, Depends(get_database)]
request: Request,
user_id: int,
db: Annotated[AsyncSession, Depends(get_database)],
) -> None:
"""Delete the specified User by user_id.

Admin only.
"""
await UserManager.delete_user(user_id, db)
await UserManager.delete_user(user_id, request.state.user.id, db)


@router.get(
Expand Down
16 changes: 15 additions & 1 deletion docs/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Verifies a user's email address using the token sent via email.

```json
{
"detail": "User succesfully Verified"
"detail": "User successfully Verified"
}
```

Expand Down Expand Up @@ -574,10 +574,24 @@ Permanently delete the specified user.

**Response:** `204 No Content`

**Error Responses:**

- `400 Bad Request`: Returned when attempting to delete the last admin user

```json
{
"detail": "Cannot delete the last admin user"
}
```

**Notes:**

- This action is permanent and cannot be undone
- All associated API keys will also be deleted
- **Admin Protection:** The system prevents the last admin from deleting
themselves to avoid system lockout. Admins can delete themselves only when
multiple admin users exist
- Regular users can always be deleted by admins regardless of admin count

---

Expand Down
13 changes: 13 additions & 0 deletions docs/usage/user-control.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ $ api-admin user delete 23
They will no longer be able to access the API, this CANNOT BE UNDONE. It's
probably better to BAN a user unless you are very sure.

!!! warning "Admin Protection"

The system includes safeguards to prevent you from locking yourself out
of the API by removing all admin users.

When using the API directly, the system prevents the last remaining admin
user from deleting **themselves**.

When using the CLI (`api-admin user delete`), the CLI prevents deletion
of the last remaining admin user altogether to avoid system lockout.
To delete an admin user, you must first ensure there is at least one
other admin user in the system.

## Seed Database with Users from a File

You can import users from a CSV file using the `db seed` command:
Expand Down
8 changes: 4 additions & 4 deletions tests/cli/test_cli_create_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_create_user_success(
],
)
assert result.exit_code == 0
assert "added succesfully" in result.output
assert "added successfully" in result.output
assert mock_register.called

def test_create_admin_success(
Expand Down Expand Up @@ -85,7 +85,7 @@ def test_create_admin_success(
],
)
assert result.exit_code == 0
assert "added succesfully" in result.output
assert "added successfully" in result.output
assert "Admin" in result.output
assert mock_register.called

Expand Down Expand Up @@ -159,7 +159,7 @@ def test_create_user_interactive(

result = runner.invoke(app, ["user", "create"], input=user_input)
assert result.exit_code == 0
assert "added succesfully" in result.output
assert "added successfully" in result.output
assert mock_register.called

def test_create_admin_interactive(
Expand All @@ -178,6 +178,6 @@ def test_create_admin_interactive(

result = runner.invoke(app, ["user", "create"], input=user_input)
assert result.exit_code == 0
assert "added succesfully" in result.output
assert "added successfully" in result.output
assert "Admin" in result.output
assert mock_register.called
65 changes: 35 additions & 30 deletions tests/cli/test_cli_user_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,40 +561,29 @@ def test_admin_missing_user(
# ------------------------------------------------------------------------ #
def test_delete_user(self, runner: CliRunner, mocker, test_user) -> None:
"""Test that the 'delete' command works."""
mock_session = mocker.patch(
self.patch_async_session,
)

mock_session.return_value.__aenter__.return_value.get.return_value = (
test_user
mock_manager = mocker.patch(
"app.commands.user.UserManager.delete_user",
return_value=None,
)
mocker.patch(self.patch_async_session)

result = runner.invoke(app, ["user", "delete", str(test_user.id)])
assert result.exit_code == 0

assert mock_session.called
assert mock_session.return_value.__aenter__.return_value.commit.called

assert mock_manager.called
assert f"User {test_user.id} DELETED" in result.output

def test_delete_sqlalchemy_error(
self, runner: CliRunner, mocker, test_user
) -> None:
"""Test that the 'delete' command exits when there is an error."""
mock_session = mocker.patch(
self.patch_async_session,
)
mock_session.return_value.__aenter__.return_value.get.side_effect = (
SQLAlchemyError("Ooooops!!")
)
result = runner.invoke(app, ["user", "delete", str(test_user.id)])
assert result.exit_code == 1

assert mock_session.called
assert (
not mock_session.return_value.__aenter__.return_value.commit.called
mocker.patch(
"app.commands.user.UserManager.delete_user",
side_effect=SQLAlchemyError("Ooooops!!"),
)
mocker.patch(self.patch_async_session)

result = runner.invoke(app, ["user", "delete", str(test_user.id)])
assert result.exit_code == 1

assert "ERROR deleting that User" in result.output
Expand All @@ -604,23 +593,39 @@ def test_delete_missing_user(
self, runner: CliRunner, mocker, test_user
) -> None:
"""Test that the 'delete' command exits when the user is missing."""
mock_session = mocker.patch(
self.patch_async_session,
)
mock_session.return_value.__aenter__.return_value.get.return_value = (
None
mocker.patch(
"app.commands.user.UserManager.delete_user",
side_effect=HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ErrorMessages.USER_INVALID,
),
)
mocker.patch(self.patch_async_session)

result = runner.invoke(app, ["user", "delete", str(test_user.id)])
assert result.exit_code == 1

assert mock_session.called
assert (
not mock_session.return_value.__aenter__.return_value.commit.called
assert "ERROR deleting that User" in result.output
assert ErrorMessages.USER_INVALID in result.output

def test_delete_last_admin_blocked(
self, runner: CliRunner, mocker, test_admin
) -> None:
"""Test that deleting the last admin is blocked."""
mocker.patch(
"app.commands.user.UserManager.delete_user",
side_effect=HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorMessages.CANT_DELETE_LAST_ADMIN,
),
)
mocker.patch(self.patch_async_session)

result = runner.invoke(app, ["user", "delete", str(test_admin.id)])
assert result.exit_code == 1

assert "ERROR deleting that User" in result.output
assert "User not found" in result.output
assert ErrorMessages.CANT_DELETE_LAST_ADMIN in result.output

# ------------------------------------------------------------------------ #
# test 'search' subcommand #
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_auth_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ async def test_verify_user(
)

assert response.status_code == status.HTTP_200_OK
assert response.json()["detail"] == "User succesfully Verified"
assert response.json()["detail"] == "User successfully Verified"

@pytest.mark.parametrize(
"verification_token",
Expand Down
Loading