44Demonstrates missing audit logging vulnerabilities.
55"""
66
7+ import logging
78from datetime import datetime
9+ from typing import Any
810
9- from fastapi import APIRouter , HTTPException , Request
11+ from fastapi import APIRouter , Request
12+ from sqlmodel import select
1013
11- from app .modules .owasp_demo .models import AuditLog
12- from app .modules .rbac .service import UserRoleService
14+ from app .modules .owasp_demo .models import AuditLog , AuditLogPublic , AuditLogsPublic
1315from app .modules .shared import CurrentUser , SessionDep
1416
17+ # Configure security audit logger
18+ # In production, this would output JSON to a SIEM system (Splunk, ELK, Datadog)
19+ audit_logger = logging .getLogger ("security.audit" )
20+ audit_logger .setLevel (logging .INFO )
21+
22+ # Add console handler if not already configured
23+ if not audit_logger .handlers :
24+ handler = logging .StreamHandler ()
25+ handler .setFormatter (
26+ logging .Formatter (
27+ "%(asctime)s | %(levelname)s | %(name)s | %(message)s | %(extra_data)s"
28+ )
29+ )
30+ audit_logger .addHandler (handler )
31+
1532router = APIRouter (
1633 prefix = "/a09" ,
1734 tags = ["A09 - Logging Failures" ],
@@ -35,40 +52,156 @@ def no_audit_logging(
3552 return {"status" : "Action completed" , "action" : action }
3653
3754
55+ class LogExtraAdapter (logging .LoggerAdapter ):
56+ """Adapter to format extra data for logging."""
57+
58+ def process (
59+ self , msg : str , kwargs : dict [str , Any ]
60+ ) -> tuple [str , dict [str , Any ]]:
61+ extra = kwargs .get ("extra" , {})
62+ # Format extra data as key=value pairs for structured logging
63+ extra_str = " " .join (f"{ k } ={ v } " for k , v in extra .items ())
64+ kwargs ["extra" ] = {"extra_data" : extra_str }
65+ return msg , kwargs
66+
67+
68+ # Create adapter for structured logging
69+ security_logger = LogExtraAdapter (audit_logger , {})
70+
71+
3872@router .post ("/secure/action" )
3973def with_audit_logging (
4074 session : SessionDep ,
4175 request : Request ,
4276 current_user : CurrentUser ,
4377 action : str ,
4478 target_id : str ,
45- ) -> dict [str , str ]:
79+ ) -> dict [str , Any ]:
4680 """
47- SECURE: Comprehensive audit logging with RBAC
81+ SECURE: Comprehensive audit logging
4882
4983 Fix: Log all security-relevant events with context.
50- Requires 'audit:write' permission to perform auditable actions.
84+ - Console/SIEM logging for real-time monitoring
85+ - Database audit trail for compliance and queries
5186 """
52- if not current_user .is_superuser :
53- user_role_service = UserRoleService (session )
54- if not user_role_service .user_has_permission (current_user .id , "audit:write" ):
55- raise HTTPException (
56- status_code = 403 , detail = "Permission 'audit:write' required"
57- )
58-
5987 client_ip = request .client .host if request .client else "unknown"
88+ user_agent = request .headers .get ("User-Agent" , "unknown" )
89+ timestamp = datetime .utcnow ()
90+
91+ # Structured log context for all log entries
92+ log_context = {
93+ "event_type" : "security_action" ,
94+ "action" : action ,
95+ "target_id" : target_id ,
96+ "user_id" : str (current_user .id ),
97+ "user_email" : current_user .email ,
98+ "ip_address" : client_ip ,
99+ "user_agent" : user_agent [:50 ] if user_agent else "unknown" ,
100+ "timestamp" : timestamp .isoformat (),
101+ }
60102
103+ # Log action attempt
104+ security_logger .info ("Action attempted" , extra = log_context )
105+
106+ # Store in database for audit trail (queryable, compliance)
107+ # Using JSON for rich, structured details that can be queried
61108 log_entry = AuditLog (
62109 action = action ,
63110 user_id = current_user .id ,
64- details = f"Target: { target_id } " ,
111+ details = {
112+ "target_id" : target_id ,
113+ "user_agent" : user_agent [:100 ] if user_agent else "unknown" ,
114+ "request_path" : str (request .url .path ),
115+ "request_method" : request .method ,
116+ },
65117 ip_address = client_ip ,
66- timestamp = datetime . utcnow () ,
118+ timestamp = timestamp ,
67119 )
68120 session .add (log_entry )
69121 session .commit ()
70122
123+ # Log successful completion
124+ security_logger .info (
125+ "Action completed" ,
126+ extra = {** log_context , "status" : "success" , "log_id" : str (log_entry .id )},
127+ )
128+
71129 return {
72130 "status" : "Action completed and logged" ,
73131 "log_id" : str (log_entry .id ),
132+ "audit_log" : log_context ,
133+ "logging_destinations" : {
134+ "console" : "Real-time monitoring (check server terminal)" ,
135+ "database" : f"Audit trail stored (ID: { log_entry .id } )" ,
136+ },
74137 }
138+
139+
140+ @router .delete ("/audit-logs" )
141+ def clear_audit_logs (
142+ session : SessionDep ,
143+ current_user : CurrentUser , # noqa: ARG001
144+ ) -> dict [str , str ]:
145+ """
146+ Clear all audit logs from the database.
147+
148+ This is for demo purposes to reset the audit trail.
149+ In production, audit logs should be immutable and archived, not deleted.
150+ """
151+ statement = select (AuditLog )
152+ logs = session .exec (statement ).all ()
153+ for log in logs :
154+ session .delete (log )
155+ session .commit ()
156+ return {"status" : "Audit logs cleared" , "deleted_count" : str (len (logs ))}
157+
158+
159+ @router .get ("/audit-logs" )
160+ def get_audit_logs (
161+ session : SessionDep ,
162+ current_user : CurrentUser , # noqa: ARG001
163+ skip : int = 0 ,
164+ limit : int = 10 ,
165+ ) -> AuditLogsPublic :
166+ """
167+ Fetch recent audit logs from the database with pagination.
168+
169+ Returns the most recent audit log entries for display.
170+ Requires authentication.
171+ """
172+ from app .modules .users .models import User
173+
174+ # Get total count
175+ count_statement = select (AuditLog )
176+ total_count = len (session .exec (count_statement ).all ())
177+
178+ # Get paginated logs
179+ statement = (
180+ select (AuditLog )
181+ .order_by (AuditLog .timestamp .desc ())
182+ .offset (skip )
183+ .limit (limit )
184+ )
185+ logs = session .exec (statement ).all ()
186+
187+ # Collect unique user IDs and fetch users in batch
188+ user_ids = {log .user_id for log in logs if log .user_id }
189+ users_map : dict [str , str ] = {}
190+ if user_ids :
191+ users = session .exec (select (User ).where (User .id .in_ (user_ids ))).all () # type: ignore[attr-defined]
192+ users_map = {str (user .id ): user .email for user in users }
193+
194+ data = [
195+ AuditLogPublic (
196+ id = str (log .id ),
197+ action = log .action ,
198+ user_id = str (log .user_id ) if log .user_id else None ,
199+ user_email = users_map .get (str (log .user_id )) if log .user_id else None ,
200+ details = log .details ,
201+ ip_address = log .ip_address ,
202+ timestamp = log .timestamp .isoformat () if log .timestamp else None ,
203+ )
204+ for log in logs
205+ ]
206+
207+ return AuditLogsPublic (data = data , count = total_count )
0 commit comments