Skip to content

Commit 266042f

Browse files
committed
refactor: move owasp route into small file
1 parent 545fb71 commit 266042f

File tree

18 files changed

+1524
-1327
lines changed

18 files changed

+1524
-1327
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""add_owasp_demo_permissions
2+
3+
Revision ID: 405b1d47dec0
4+
Revises: ffdac40437ff
5+
Create Date: 2025-12-14 20:05:20.434206
6+
7+
"""
8+
import uuid
9+
from datetime import datetime, timezone
10+
11+
from alembic import op
12+
import sqlalchemy as sa
13+
14+
15+
# revision identifiers, used by Alembic.
16+
revision = '405b1d47dec0'
17+
down_revision = 'ffdac40437ff'
18+
branch_labels = None
19+
depends_on = None
20+
21+
22+
def upgrade():
23+
"""Add OWASP demo permissions and assign them to roles."""
24+
permission_table = sa.table(
25+
'permission',
26+
sa.column('id', sa.Uuid()),
27+
sa.column('name', sa.String()),
28+
sa.column('description', sa.String()),
29+
sa.column('resource', sa.String()),
30+
sa.column('action', sa.String()),
31+
sa.column('created_at', sa.DateTime()),
32+
)
33+
34+
role_permission_table = sa.table(
35+
'role_permission',
36+
sa.column('role_id', sa.Uuid()),
37+
sa.column('permission_id', sa.Uuid()),
38+
)
39+
40+
now = datetime.now(timezone.utc)
41+
42+
# OWASP Demo permissions
43+
owasp_permissions = [
44+
{'id': uuid.uuid4(), 'name': 'documents:read', 'description': 'View all documents (OWASP demo)', 'resource': 'documents', 'action': 'read', 'created_at': now},
45+
{'id': uuid.uuid4(), 'name': 'notes:read', 'description': 'View all notes (OWASP demo)', 'resource': 'notes', 'action': 'read', 'created_at': now},
46+
{'id': uuid.uuid4(), 'name': 'config:read', 'description': 'View system configuration', 'resource': 'config', 'action': 'read', 'created_at': now},
47+
{'id': uuid.uuid4(), 'name': 'audit:write', 'description': 'Write audit log entries', 'resource': 'audit', 'action': 'write', 'created_at': now},
48+
]
49+
50+
op.bulk_insert(permission_table, owasp_permissions)
51+
52+
# Get role IDs from database
53+
connection = op.get_bind()
54+
55+
admin_role = connection.execute(
56+
sa.text("SELECT id FROM role WHERE name = 'Admin'")
57+
).fetchone()
58+
editor_role = connection.execute(
59+
sa.text("SELECT id FROM role WHERE name = 'Editor'")
60+
).fetchone()
61+
viewer_role = connection.execute(
62+
sa.text("SELECT id FROM role WHERE name = 'Viewer'")
63+
).fetchone()
64+
65+
if not admin_role or not editor_role or not viewer_role:
66+
return # Roles not found, skip permission assignment
67+
68+
perm_ids = {p['name']: p['id'] for p in owasp_permissions}
69+
70+
role_permissions = []
71+
72+
# Admin gets all OWASP permissions
73+
for perm in owasp_permissions:
74+
role_permissions.append({'role_id': admin_role[0], 'permission_id': perm['id']})
75+
76+
# Editor gets all OWASP permissions (read + write)
77+
for perm_name in ['documents:read', 'notes:read', 'config:read', 'audit:write']:
78+
role_permissions.append({'role_id': editor_role[0], 'permission_id': perm_ids[perm_name]})
79+
80+
# Viewer gets read-only OWASP permissions
81+
for perm_name in ['documents:read', 'notes:read']:
82+
role_permissions.append({'role_id': viewer_role[0], 'permission_id': perm_ids[perm_name]})
83+
84+
op.bulk_insert(role_permission_table, role_permissions)
85+
86+
87+
def downgrade():
88+
"""Remove OWASP demo permissions."""
89+
connection = op.get_bind()
90+
91+
# Get permission IDs
92+
perm_names = ['documents:read', 'notes:read', 'config:read', 'audit:write']
93+
94+
for perm_name in perm_names:
95+
# Delete role_permission associations first
96+
connection.execute(
97+
sa.text("""
98+
DELETE FROM role_permission
99+
WHERE permission_id IN (SELECT id FROM permission WHERE name = :name)
100+
"""),
101+
{'name': perm_name}
102+
)
103+
# Delete the permission
104+
connection.execute(
105+
sa.text("DELETE FROM permission WHERE name = :name"),
106+
{'name': perm_name}
107+
)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""
2+
OWASP Top 10 2025 Vulnerability Demonstration Routes
3+
4+
Each submodule contains vulnerable and secure endpoint pairs for educational purposes.
5+
"""
6+
7+
from app.modules.owasp_demo.routers.a01_access_control import router as router_a01
8+
from app.modules.owasp_demo.routers.a02_security_misconfig import router as router_a02
9+
from app.modules.owasp_demo.routers.a03_supply_chain import router as router_a03
10+
from app.modules.owasp_demo.routers.a04_cryptographic import router as router_a04
11+
from app.modules.owasp_demo.routers.a05_injection import router as router_a05
12+
from app.modules.owasp_demo.routers.a06_insecure_design import router as router_a06
13+
from app.modules.owasp_demo.routers.a07_authentication import router as router_a07
14+
from app.modules.owasp_demo.routers.a08_integrity import router as router_a08
15+
from app.modules.owasp_demo.routers.a09_logging import router as router_a09
16+
from app.modules.owasp_demo.routers.a10_exception_handling import router as router_a10
17+
from app.modules.owasp_demo.routers.summary import router as router_summary
18+
19+
__all__ = [
20+
"router_a01",
21+
"router_a02",
22+
"router_a03",
23+
"router_a04",
24+
"router_a05",
25+
"router_a06",
26+
"router_a07",
27+
"router_a08",
28+
"router_a09",
29+
"router_a10",
30+
"router_summary",
31+
]
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
"""
2+
A01:2025 - Broken Access Control
3+
4+
Demonstrates IDOR, mass assignment, and missing function-level access control vulnerabilities.
5+
"""
6+
7+
import uuid
8+
from typing import Any
9+
10+
from fastapi import APIRouter, HTTPException
11+
from sqlmodel import select
12+
13+
from app.modules.owasp_demo.models import SecretDocument, UserNote
14+
from app.modules.owasp_demo.schemas import (
15+
DocumentPublic,
16+
NotePublic,
17+
UserProfileUpdate,
18+
)
19+
from app.modules.rbac.service import UserRoleService
20+
from app.modules.shared import CurrentUser, SessionDep
21+
from app.modules.users.models import User
22+
23+
router = APIRouter(
24+
prefix="/a01",
25+
tags=["A01 - Broken Access Control"],
26+
)
27+
28+
29+
@router.get("/vulnerable/documents", response_model=DocumentPublic)
30+
def get_document_vulnerable(
31+
session: SessionDep,
32+
doc_id: uuid.UUID | None = None,
33+
) -> Any:
34+
"""
35+
VULNERABLE: Insecure Direct Object Reference (IDOR)
36+
37+
Problem: No authentication or authorization check.
38+
Anyone can access any document by guessing/enumerating the ID.
39+
40+
Attack: Simply change the doc_id in the URL to access other users' documents.
41+
"""
42+
if doc_id:
43+
document = session.get(SecretDocument, doc_id)
44+
else:
45+
# Auto-fetch first available document for easy testing
46+
document = session.exec(select(SecretDocument).limit(1)).first()
47+
48+
if not document:
49+
raise HTTPException(status_code=404, detail="Document not found")
50+
return document
51+
52+
53+
@router.get("/secure/documents", response_model=DocumentPublic)
54+
def get_document_secure(
55+
session: SessionDep,
56+
current_user: CurrentUser,
57+
doc_id: uuid.UUID | None = None,
58+
) -> Any:
59+
"""
60+
SECURE: Proper access control implementation with RBAC
61+
62+
Fix:
63+
1. Requires authentication (CurrentUser dependency)
64+
2. Verifies the user owns the document OR has documents:read permission OR is superuser
65+
"""
66+
if doc_id:
67+
document = session.get(SecretDocument, doc_id)
68+
else:
69+
# Auto-fetch first available document for easy testing
70+
document = session.exec(select(SecretDocument).limit(1)).first()
71+
72+
if not document:
73+
raise HTTPException(status_code=404, detail="Document not found")
74+
75+
if document.owner_id != current_user.id and not current_user.is_superuser:
76+
user_role_service = UserRoleService(session)
77+
if not user_role_service.user_has_permission(current_user.id, "documents:read"):
78+
raise HTTPException(status_code=403, detail="Access denied")
79+
80+
return document
81+
82+
83+
@router.get("/vulnerable/notes", response_model=NotePublic)
84+
def get_note_idor_vulnerable(
85+
session: SessionDep,
86+
current_user: CurrentUser, # noqa: ARG001
87+
note_id: int | None = None,
88+
) -> Any:
89+
"""
90+
VULNERABLE: IDOR with sequential IDs
91+
92+
Problem: Uses sequential integer IDs that are easy to enumerate.
93+
Even with authentication, there's no ownership check.
94+
95+
Attack: Try note_id=1, note_id=2, etc. to access other users' notes.
96+
"""
97+
if note_id:
98+
note = session.get(UserNote, note_id)
99+
else:
100+
# Auto-fetch first available note for easy testing
101+
note = session.exec(select(UserNote).limit(1)).first()
102+
103+
if not note:
104+
raise HTTPException(status_code=404, detail="Note not found")
105+
return note
106+
107+
108+
@router.get("/secure/notes", response_model=NotePublic)
109+
def get_note_secure(
110+
session: SessionDep,
111+
current_user: CurrentUser,
112+
note_id: int | None = None,
113+
) -> Any:
114+
"""
115+
SECURE: Proper IDOR prevention with RBAC
116+
117+
Fix: Always verify ownership OR check RBAC permissions before returning data.
118+
"""
119+
if note_id:
120+
note = session.get(UserNote, note_id)
121+
else:
122+
# Auto-fetch first available note for easy testing
123+
note = session.exec(select(UserNote).limit(1)).first()
124+
125+
if not note:
126+
raise HTTPException(status_code=404, detail="Note not found")
127+
128+
if note.owner_id != current_user.id and not current_user.is_superuser:
129+
user_role_service = UserRoleService(session)
130+
if not user_role_service.user_has_permission(current_user.id, "notes:read"):
131+
raise HTTPException(status_code=403, detail="Access denied")
132+
133+
return note
134+
135+
136+
@router.patch("/vulnerable/users/profile")
137+
def update_profile_mass_assignment(
138+
session: SessionDep,
139+
current_user: CurrentUser,
140+
profile: UserProfileUpdate,
141+
) -> dict[str, Any]:
142+
"""
143+
VULNERABLE: Mass Assignment / Privilege Escalation
144+
145+
Problem: Accepts is_superuser and is_active fields from user input,
146+
allowing users to grant themselves admin privileges.
147+
148+
Attack: Send {"is_superuser": true} in the request body.
149+
"""
150+
user = session.get(User, current_user.id)
151+
if not user:
152+
raise HTTPException(status_code=404, detail="User not found")
153+
154+
update_data = profile.model_dump(exclude_unset=True)
155+
for field, value in update_data.items():
156+
setattr(user, field, value)
157+
158+
session.add(user)
159+
session.commit()
160+
session.refresh(user)
161+
162+
return {
163+
"message": "Profile updated",
164+
"is_superuser": user.is_superuser,
165+
"is_active": user.is_active,
166+
}
167+
168+
169+
@router.patch("/secure/users/profile")
170+
def update_profile_secure(
171+
session: SessionDep,
172+
current_user: CurrentUser,
173+
profile: UserProfileUpdate,
174+
) -> dict[str, Any]:
175+
"""
176+
SECURE: Protected against mass assignment
177+
178+
Fix: Explicitly whitelist which fields can be updated.
179+
Never trust user input for sensitive fields.
180+
"""
181+
user = session.get(User, current_user.id)
182+
if not user:
183+
raise HTTPException(status_code=404, detail="User not found")
184+
185+
ALLOWED_FIELDS = {"full_name", "email"}
186+
187+
update_data = profile.model_dump(exclude_unset=True)
188+
for field, value in update_data.items():
189+
if field in ALLOWED_FIELDS:
190+
setattr(user, field, value)
191+
192+
session.add(user)
193+
session.commit()
194+
session.refresh(user)
195+
196+
return {"message": "Profile updated safely", "full_name": user.full_name}
197+
198+
199+
@router.get("/vulnerable/admin/users")
200+
def list_users_no_auth(session: SessionDep) -> dict[str, Any]:
201+
"""
202+
VULNERABLE: Missing Function Level Access Control
203+
204+
Problem: Admin endpoint accessible without authentication.
205+
206+
Attack: Anyone can access /admin/users to see all user data.
207+
"""
208+
statement = select(User)
209+
users = session.exec(statement).all()
210+
return {
211+
"count": len(users),
212+
"users": [
213+
{"id": str(u.id), "email": u.email, "is_superuser": u.is_superuser}
214+
for u in users
215+
],
216+
}
217+
218+
219+
@router.get("/secure/admin/users")
220+
def list_users_secure(
221+
session: SessionDep,
222+
current_user: CurrentUser,
223+
) -> dict[str, Any]:
224+
"""
225+
SECURE: Proper admin access control with RBAC
226+
227+
Fix: Require authentication AND appropriate RBAC permission (users:read).
228+
Demonstrates proper function-level access control using role-based permissions.
229+
"""
230+
if not current_user.is_superuser:
231+
user_role_service = UserRoleService(session)
232+
if not user_role_service.user_has_permission(current_user.id, "users:read"):
233+
raise HTTPException(status_code=403, detail="Access denied")
234+
235+
statement = select(User)
236+
users = session.exec(statement).all()
237+
return {
238+
"count": len(users),
239+
"users": [
240+
{"id": str(u.id), "email": u.email, "is_superuser": u.is_superuser}
241+
for u in users
242+
],
243+
}

0 commit comments

Comments
 (0)