Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions app/userservice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import hashlib
import hmac
import logging
import secrets
import time
from typing import Optional, Protocol

class User:
def __init__(self, username: str, password_hash: str):
self.username = username
self.password_hash = password_hash
Comment on lines +8 to +11
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The User class only contains username and password_hash, lacking important user metadata like creation date, last login, account status (active/locked), etc. Consider enhancing the User model with these attributes.

Suggested change
class User:
def __init__(self, username: str, password_hash: str):
self.username = username
self.password_hash = password_hash
class User:
def __init__(self, username: str, password_hash: str):
self.username = username
self.password_hash = password_hash
self.created_at = time.time()
self.last_login = None
self.login_attempts = 0
self.status = "active" # active, locked, disabled
self.roles = ["user"] # default role

Did we get this right? 👍 / 👎 to inform future reviews.


class UserDB(Protocol):
def get_user(self, username: str) -> Optional[User]: ...
def save_user(self, user: User) -> None: ...
Comment on lines +12 to +15
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UserDB protocol doesn't define error handling behavior for database operations. Consider defining how implementers should handle database errors.

Suggested change
class UserDB(Protocol):
def get_user(self, username: str) -> Optional[User]: ...
def save_user(self, user: User) -> None: ...
class UserDB(Protocol):
def get_user(self, username: str) -> Optional[User]:
"""Retrieve a user by username.
Returns None if user doesn't exist.
Raises DatabaseError for connection or query errors.
"""
...
def save_user(self, user: User) -> None:
"""Save a user to the database.
Raises DatabaseError for connection or query errors.
"""
...
def delete_user(self, username: str) -> None:
"""Delete a user from the database.
Raises DatabaseError for connection or query errors.
"""
...

Did we get this right? 👍 / 👎 to inform future reviews.

def delete_user(self, username: str) -> None: ...

class UserService:
def __init__(self, db: UserDB):
self.db = db
self.logger = logging.getLogger(__name__)

def register(self, username: str, password: str) -> None:
Comment on lines +18 to +23
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UserService class doesn't implement any rate limiting for login attempts. This makes it vulnerable to brute force attacks. Consider implementing rate limiting for failed login attempts.

Suggested change
class UserService:
def __init__(self, db: UserDB):
self.db = db
self.logger = logging.getLogger(__name__)
def register(self, username: str, password: str) -> None:
class UserService:
def __init__(self, db: UserDB):
self.db = db
self.logger = logging.getLogger(__name__)
self.failed_attempts = {}
self.max_attempts = 5
self.lockout_time = 300 # seconds
def _check_rate_limit(self, username: str) -> bool:
"""Check if user is rate limited."""
current_time = time.time()
if username in self.failed_attempts:
attempts, lockout_until = self.failed_attempts[username]
if current_time < lockout_until:
return False # User is locked out
if current_time - lockout_until > self.lockout_time:
# Reset if lockout period has passed
del self.failed_attempts[username]
return True # Not rate limited

Did we get this right? 👍 / 👎 to inform future reviews.

if self.db.get_user(username):
raise ValueError("Username already taken")
hashed = self._hash_password(password)
self.db.save_user(User(username, hashed))
self.logger.info("Registered user: %s", username)
Comment on lines +23 to +28
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sensitive operations like user registration and deletion are only logged but don't require additional verification or authorization. Consider implementing a more robust authorization mechanism for these operations.

Suggested change
def register(self, username: str, password: str) -> None:
if self.db.get_user(username):
raise ValueError("Username already taken")
hashed = self._hash_password(password)
self.db.save_user(User(username, hashed))
self.logger.info("Registered user: %s", username)
def register(self, username: str, password: str, admin_token: Optional[str] = None) -> None:
"""Register a new user.
Args:
username: The username for the new user
password: The password for the new user
admin_token: Optional admin token for privileged operations
Raises:
ValueError: If username is already taken
PermissionError: If admin_token is required but invalid
"""
# Check if this operation requires admin privileges in your system
if self._requires_admin_privileges("register") and not self._verify_admin_token(admin_token):
raise PermissionError("Admin privileges required for user registration")
if self.db.get_user(username):
raise ValueError("Username already taken")
hashed = self._hash_password(password)
self.db.save_user(User(username, hashed))
self.logger.info("Registered user: %s", username)

Did we get this right? 👍 / 👎 to inform future reviews.


def login(self, username: str, password: str) -> Optional[str]:
user = self.db.get_user(username)
if not user:
return None
if self._verify_password(password, user.password_hash):
token = self._generate_token(username)
return token
return None
Comment on lines +29 to +37
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The login method returns None for both failed password verification and non-existent users. This can leak information about whether a username exists in the system. Consider using a consistent response time and error message to prevent username enumeration attacks.

Suggested change
def login(self, username: str, password: str) -> Optional[str]:
user = self.db.get_user(username)
if not user:
return None
if self._verify_password(password, user.password_hash):
token = self._generate_token(username)
return token
return None
def login(self, username: str, password: str) -> Optional[str]:
user = self.db.get_user(username)
# Always verify against a hash even if user doesn't exist to prevent timing attacks
if not user:
# Use a dummy verification with consistent timing
self._verify_password(password, self._hash_password('dummy'))
return None
if self._verify_password(password, user.password_hash):
token = self._generate_token(username)
return token
return None

Did we get this right? 👍 / 👎 to inform future reviews.


def delete_user(self, username: str) -> None:
self.db.delete_user(username)
self.logger.info("Deleted user: %s", username)

def _hash_password(self, password: str) -> str:
salt = secrets.token_bytes(16)
digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 100_000)
return salt.hex() + ":" + digest.hex()
Comment on lines +42 to +46
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The password hashing iteration count is hardcoded as 100_000. As per the repository context, this is a critical security parameter that should be configurable and should not be decreased in future updates. Consider making this configurable but with a secure default.

Suggested change
def _hash_password(self, password: str) -> str:
salt = secrets.token_bytes(16)
digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 100_000)
return salt.hex() + ":" + digest.hex()
def __init__(self, db: UserDB, password_iterations: int = 100_000):
self.db = db
self.logger = logging.getLogger(__name__)
# Ensure iterations is never decreased below the secure default
self.password_iterations = max(password_iterations, 100_000)
def _hash_password(self, password: str) -> str:
salt = secrets.token_bytes(16)
digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, self.password_iterations)
return salt.hex() + ":" + digest.hex()

Did we get this right? 👍 / 👎 to inform future reviews.


def _verify_password(self, password: str, stored: str) -> bool:
try:
salt_hex, hash_hex = stored.split(":")
salt = bytes.fromhex(salt_hex)
expected = bytes.fromhex(hash_hex)
actual = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 100_000)
return hmac.compare_digest(actual, expected)
except Exception:
return False

def _generate_token(self, username: str) -> str:
raw = f"{username}:{time.time()}:{secrets.token_hex(16)}"
return hashlib.sha256(raw.encode()).hexdigest()
Comment on lines +57 to +60
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _generate_token() method uses an insecure approach as mentioned in the repository context. It generates a simple SHA256 hash without proper cryptographic signing, expiration validation, or revocation mechanisms. This makes it vulnerable to tampering and replay attacks. Consider using a standard like JWT with proper signing and expiration.

Suggested change
def _generate_token(self, username: str) -> str:
raw = f"{username}:{time.time()}:{secrets.token_hex(16)}"
return hashlib.sha256(raw.encode()).hexdigest()
def _generate_token(self, username: str) -> str:
# Use a proper JWT library instead of custom token implementation
import jwt
import datetime
payload = {
'sub': username,
'iat': datetime.datetime.utcnow(),
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1),
'jti': secrets.token_hex(16)
}
return jwt.encode(payload, secrets.token_bytes(32), algorithm='HS256')

Did we get this right? 👍 / 👎 to inform future reviews.