Skip to content

Commit 714938e

Browse files
committed
Unit Test added
1 parent 1224710 commit 714938e

File tree

56 files changed

+1835
-70
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1835
-70
lines changed

backend/models/scan_history.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import pytz
33
from utils.db import db
44
from sqlalchemy.dialects.postgresql import JSONB
5+
from models.user import User
6+
from models.selected_repo import SelectedRepo
57

68
class ScanHistory(db.Model):
79
__tablename__ = "scan_history"
@@ -17,5 +19,5 @@ class ScanHistory(db.Model):
1719
input_type = db.Column(db.String(50))
1820
scan_type = db.Column(db.String(50))
1921

20-
user = db.relationship("User", backref="scan_history")
21-
repo = db.relationship("SelectedRepo", backref="scan_history")
22+
user = db.relationship(User, backref="scan_history")
23+
repo = db.relationship(SelectedRepo, backref="scan_history")

backend/models/user.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class User(db.Model):
99
email = db.Column(db.String(100), unique=True, nullable=False)
1010
password = db.Column(db.Text)
1111
role = db.Column(db.String(20), nullable=False, default="user")
12+
email_notifications_enabled = db.Column(db.Boolean, default=True)
1213
created_at = db.Column(db.DateTime, default=datetime.now(pytz.UTC))
1314

1415
__table_args__ = (

backend/schemas/admin_dto.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from pydantic import BaseModel
1+
from pydantic import BaseModel, field_validator
22
from typing import List
33

44
class UserStatsResponse(BaseModel):
@@ -9,7 +9,22 @@ class UserStatsResponse(BaseModel):
99
created_at: str
1010
scan_count: int
1111

12+
@field_validator('scan_count')
13+
@classmethod
14+
def validate_scan_count(cls, v):
15+
if v < 0:
16+
raise ValueError('Scan count must be greater than 0')
17+
return v
18+
19+
1220
class AdminStatsResponse(BaseModel):
1321
total_users: int
1422
total_scans: int
15-
users: List[UserStatsResponse]
23+
users: List[UserStatsResponse]
24+
25+
@field_validator('total_users', 'total_scans')
26+
@classmethod
27+
def validate_positive(cls, v):
28+
if v < 0:
29+
raise ValueError('Value must be greater than or equal to 0')
30+
return v

backend/schemas/checkov_dto.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from pydantic import BaseModel
1+
from pydantic import BaseModel, Field, field_validator
22
from typing import List, Optional
33

44
class CheckovCheck(BaseModel):
@@ -28,15 +28,22 @@ class CheckovResult(BaseModel):
2828
message: Optional[str] = None
2929

3030
class CheckovResponse(BaseModel):
31-
scan_id: int
31+
scan_id: int = Field(...,description="The ID of the checkov Response")
3232
results: CheckovResult
3333

34+
@field_validator("scan_id")
35+
@classmethod
36+
def validate(cls, v) -> int :
37+
if v <= 0 :
38+
raise ValueError("Checkov scan id must be greater than 0")
39+
return v
40+
3441
class CheckovContentRequest(BaseModel):
35-
content: str
42+
content: str = Field(...,min_length=1,description="The content of the checkov scanned file")
3643
framework: Optional[str] = "terraform"
3744

3845
class CheckovRepoRequest(BaseModel):
39-
repo_url: str
46+
repo_url: str = Field(...,min_length=1, description="The URL of the repository")
4047

4148
class CheckovErrorResponse(BaseModel):
4249
error: str

backend/schemas/semgrep_dto.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from pydantic import BaseModel
2-
from typing import List, Dict, Any, Optional
1+
from pydantic import BaseModel, Field, field_validator
2+
from typing import List, Dict, Optional
33

44
class SemgrepFinding(BaseModel):
55
check_id: Optional[str]
66
message: Optional[str]
77
file_path: Optional[str]
88
file_line_range: List[int]
9-
severity: str
9+
severity: str = Field(..., min_length=1)
1010
code: Optional[str]
1111
suggestion: Optional[str]
1212

@@ -16,9 +16,16 @@ class SemgrepResult(BaseModel):
1616

1717
class SemgrepResponse(BaseModel):
1818
scan_id: int
19-
status: str
19+
status: str = Field(...,min_length=1)
2020
results: SemgrepResult
2121

22+
@field_validator('scan_id')
23+
@classmethod
24+
def validate(cls, v:int) -> int:
25+
if v <= 0 :
26+
raise ValueError("Scan ID must be greater than 0")
27+
return v
28+
2229
class SemgrepErrorResponse(BaseModel):
2330
error: str
2431
details: Optional[str] = None

backend/schemas/t5_dto.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
from pydantic import BaseModel
1+
from pydantic import BaseModel, Field
22
from typing import Optional
33

44
class T5Request(BaseModel):
5-
dockerfile: str
5+
dockerfile: str = Field(...,min_length=1,description='Dockerfile')
66

77
class T5Response(BaseModel):
8-
scan_id: int
8+
scan_id: int = Field(...,gt=0,description='scan id')
99
correction: str
1010
explanation: str
1111

backend/schemas/user_dto.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,32 @@
1-
from pydantic import BaseModel
1+
from pydantic import BaseModel, Field, EmailStr, field_validator
22
from typing import Optional
33

4+
45
class RegisterRequest(BaseModel):
5-
name: str
6-
email: str
7-
password: str
6+
name: str = Field(..., min_length=1, description="Full name, cannot be empty")
7+
email: EmailStr = Field(..., description="Valid email address")
8+
password: str = Field(..., min_length=5, description="Password at least 5 characters")
9+
@field_validator('name')
10+
@classmethod
11+
def validate_name(cls, v: str) -> str:
12+
if not v.strip(): # Extra check for whitespace-only
13+
raise ValueError("Name cannot be empty or whitespace-only")
14+
return v.strip()
815

916
class VerifyCodeRequest(BaseModel):
10-
email: str
11-
code: str
17+
email: EmailStr = Field(..., description="Valid email address")
18+
code: str = Field(...,min_length=6,max_length=6, description="Valid code")
1219

1320
class LoginRequest(BaseModel):
14-
email: str
15-
password: str
16-
21+
email: EmailStr = Field(..., description="Valid email address")
22+
password: str = Field(..., min_length=5, description="Password at least 5 characters")
1723
class SetPasswordRequest(BaseModel):
18-
password: str
24+
password: str = Field(..., min_length=5, description="Password at least 5 characters")
1925

2026
class UserResponse(BaseModel):
2127
id: int
2228
name: str
23-
email: str
29+
email: EmailStr = Field(..., description="Valid email address")
2430
role: str
2531

2632
class RegisterResponse(BaseModel):

backend/services/checkov_service.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import json
1010
import logging
1111
import time
12-
from datetime import datetime
12+
from datetime import datetime, timezone
1313

1414
import google.generativeai as genai
1515
from google.api_core import exceptions
@@ -449,7 +449,7 @@ def run_checkov_scan(user_id, input_type, content=None, file_path=None, repo_url
449449

450450
result = run_checkov_on_single_file(temp_file_path)
451451
result.setdefault("meta", {})
452-
result["meta"]["executed_at"] = datetime.utcnow().isoformat() + "Z"
452+
result["meta"]["executed_at"] = datetime.now(timezone.utc).isoformat() + "Z"
453453

454454
files_to_save = [(clean_path(temp_file_path), content)]
455455
scan_id = save_scan_history(user_id, result, input_type, files_to_save=files_to_save)
@@ -464,7 +464,7 @@ def run_checkov_scan(user_id, input_type, content=None, file_path=None, repo_url
464464

465465
result = run_checkov_on_dir(file_path, is_file=True)
466466
result.setdefault("meta", {})
467-
result["meta"]["executed_at"] = datetime.utcnow().isoformat() + "Z"
467+
result["meta"]["executed_at"] = datetime.now(timezone.utc).isoformat() + "Z"
468468

469469
files_to_save = [(clean_path(file_path), file_content)]
470470
scan_id = save_scan_history(user_id, result, input_type, files_to_save=files_to_save)
@@ -496,7 +496,7 @@ def run_checkov_scan(user_id, input_type, content=None, file_path=None, repo_url
496496

497497
result = run_checkov_on_dir(temp_dir, is_file=False)
498498
result.setdefault("meta", {})
499-
result["meta"]["executed_at"] = datetime.utcnow().isoformat() + "Z"
499+
result["meta"]["executed_at"] = datetime.now(timezone.utc).isoformat() + "Z"
500500

501501
scan_id = save_scan_history(user_id, result, input_type, files_to_save=files_found)
502502
return {"scan_id": scan_id, "results": result}
@@ -531,7 +531,7 @@ def remove_readonly(func, path, _):
531531

532532
result = run_checkov_on_dir(temp_dir, is_file=False)
533533
result.setdefault("meta", {})
534-
result["meta"]["executed_at"] = datetime.utcnow().isoformat() + "Z"
534+
result["meta"]["executed_at"] = datetime.now(timezone.utc).isoformat() + "Z"
535535

536536
scan_id = save_scan_history(user_id, result, input_type, repo_url=repo_url, files_to_save=files_found)
537537
return {"scan_id": scan_id, "results": result}

backend/services/t5_service.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
import logging
33
import torch
4-
from datetime import datetime
4+
from datetime import datetime ,UTC
55
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
66
import google.generativeai as genai
77
from utils.db import db
@@ -87,7 +87,7 @@ def _send_finish_email_t5(to_email: str, user_name: str, executed_at: str):
8787
<p>Thank you for using <strong>SafeOps</strong>!</p>
8888
</div>
8989
<div class="footer">
90-
&copy; {datetime.utcnow().year} SafeOps — Automated Security Platform
90+
&copy; {datetime.now(UTC).year} SafeOps — Automated Security Platform
9191
</div>
9292
</div>
9393
</body>
@@ -148,7 +148,7 @@ def correct_dockerfile(user_id, dockerfile):
148148
explanation = f"[Erreur Gemini] Impossible de générer les explications: {str(e)}"
149149

150150
# Save in DB
151-
executed_at = datetime.utcnow().isoformat() + "Z"
151+
executed_at = datetime.now(UTC).isoformat() + "Z"
152152
result = {
153153
"status": "success",
154154
"correction": fixed_code,

backend/services/user_service.py

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
from flask_bcrypt import Bcrypt
77
from flask_jwt_extended import create_access_token, create_refresh_token
8+
from sqlalchemy.exc import SQLAlchemyError
89
from utils.db import db
910
from models.user import User
1011
from models.pending_user import PendingUser
@@ -48,18 +49,15 @@ def register_user(name, email, password):
4849

4950
name = name.strip()
5051
hashed_password = bcrypt.generate_password_hash(password).decode("utf-8")
51-
52+
# Move duplicate checks OUTSIDE try to avoid catching
53+
if User.query.filter((User.email == email) | (User.name == name)).first():
54+
logger.error(f"User with email {email} or name {name} already exists")
55+
raise ValueError("A user with this email or name already exists")
56+
57+
if PendingUser.query.filter_by(email=email).first():
58+
logger.error(f"Verification request pending for email {email}")
59+
raise ValueError("A verification request for this email is already pending")
5260
try:
53-
# Check if email or name already exists in users
54-
if User.query.filter((User.email == email) | (User.name == name)).first():
55-
logger.error(f"User with email {email} or name {name} already exists")
56-
raise ValueError("A user with this email or name already exists")
57-
58-
# Check if email is pending
59-
if PendingUser.query.filter_by(email=email).first():
60-
logger.error(f"Verification request pending for email {email}")
61-
raise ValueError("A verification request for this email is already pending")
62-
6361
# Store in pending_users
6462
verification_code = generate_verification_code()
6563
expires_at = datetime.now(timezone.utc) + CODE_EXPIRATION
@@ -86,31 +84,30 @@ def register_user(name, email, password):
8684
except Exception as e:
8785
logger.error(f"Registration failed for email {email}: {str(e)}")
8886
db.session.rollback()
89-
raise RuntimeError("Registration failed")
87+
raise
9088

9189
def verify_code(email, code):
9290
"""Verify the code and create a user account."""
9391
if not email or not code:
9492
logger.error("Missing email or verification code")
9593
raise ValueError("Email and verification code are required")
9694

97-
try:
98-
pending_user = PendingUser.query.filter_by(email=email).first()
99-
if not pending_user:
100-
logger.error(f"No pending registration for email {email}")
101-
raise ValueError("No pending registration found for this email")
102-
103-
expires_at_aware = pytz.utc.localize(pending_user.expires_at) if pending_user.expires_at.tzinfo is None else pending_user.expires_at
104-
if datetime.now(timezone.utc) > expires_at_aware:
105-
db.session.delete(pending_user)
106-
db.session.commit()
107-
logger.error(f"Verification code expired for email {email}")
108-
raise ValueError("Verification code has expired")
109-
110-
if code != pending_user.verification_code:
111-
logger.error(f"Invalid verification code for email {email}")
112-
raise ValueError("Invalid verification code")
95+
pending_user = PendingUser.query.filter_by(email=email).first()
96+
if not pending_user:
97+
logger.error(f"No pending registration for email {email}")
98+
raise ValueError("No pending registration found for this email")
99+
if code != pending_user.verification_code:
100+
logger.error(f"Invalid verification code for email {email}")
101+
raise ValueError("Invalid verification code")
102+
expires_at_aware = pytz.utc.localize(
103+
pending_user.expires_at) if pending_user.expires_at.tzinfo is None else pending_user.expires_at
104+
if datetime.now(timezone.utc) > expires_at_aware:
105+
db.session.delete(pending_user)
106+
db.session.commit()
107+
logger.error(f"Verification code expired for email {email}")
108+
raise ValueError("Verification code has expired")
113109

110+
try:
114111
# Move to users
115112
user = User(
116113
name=pending_user.name,
@@ -189,7 +186,7 @@ def login_user(ip, email, password):
189186
logger.error(f"Invalid credentials for email {email}")
190187
raise ValueError("Invalid credentials")
191188

192-
except Exception as e:
189+
except (SQLAlchemyError, OSError) as e:
193190
logger.error(f"Login failed for email {email}: {str(e)}")
194191
db.session.rollback()
195192
raise RuntimeError("Internal server error")
@@ -205,12 +202,11 @@ def set_password(user_id, password):
205202
raise ValueError("Password must be at least 5 characters long")
206203

207204
hashed_password = bcrypt.generate_password_hash(password).decode("utf-8")
208-
205+
user = User.query.get(user_id)
206+
if not user:
207+
logger.error(f"User not found: {user_id}")
208+
raise ValueError("User not found")
209209
try:
210-
user = User.query.get(user_id)
211-
if not user:
212-
logger.error(f"User not found: {user_id}")
213-
raise ValueError("User not found")
214210
user.password = hashed_password
215211
db.session.commit()
216212
logger.info(f"Password set successfully for user_id {user_id}")

0 commit comments

Comments
 (0)