-
Notifications
You must be signed in to change notification settings - Fork 0
Create userservice class #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
|
|
||
| def _generate_token(self, username: str) -> str: | ||
| raw = f"{username}:{time.time()}:{secrets.token_hex(16)}" | ||
| return hashlib.sha256(raw.encode()).hexdigest() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _generate_token() method uses an insecure approach as mentioned in the repository context. It generates a simple SHA256 hash without proper cryptographic signing, expiration validation, or revocation mechanisms. This makes it vulnerable to tampering and replay attacks. Consider using a standard like JWT with proper signing and expiration.
| def _generate_token(self, username: str) -> str: | |
| raw = f"{username}:{time.time()}:{secrets.token_hex(16)}" | |
| return hashlib.sha256(raw.encode()).hexdigest() | |
| def _generate_token(self, username: str) -> str: | |
| # Use a proper JWT library instead of custom token implementation | |
| import jwt | |
| import datetime | |
| payload = { | |
| 'sub': username, | |
| 'iat': datetime.datetime.utcnow(), | |
| 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1), | |
| 'jti': secrets.token_hex(16) | |
| } | |
| return jwt.encode(payload, secrets.token_bytes(32), algorithm='HS256') |
Did we get this right? 👍 / 👎 to inform future reviews.
|
|
||
| def login(self, username: str, password: str) -> Optional[str]: | ||
| user = self.db.get_user(username) | ||
| if not user: | ||
| return None | ||
| if self._verify_password(password, user.password_hash): | ||
| token = self._generate_token(username) | ||
| return token | ||
| return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The login method returns None for both failed password verification and non-existent users. This can leak information about whether a username exists in the system. Consider using a consistent response time and error message to prevent username enumeration attacks.
| def login(self, username: str, password: str) -> Optional[str]: | |
| user = self.db.get_user(username) | |
| if not user: | |
| return None | |
| if self._verify_password(password, user.password_hash): | |
| token = self._generate_token(username) | |
| return token | |
| return None | |
| def login(self, username: str, password: str) -> Optional[str]: | |
| user = self.db.get_user(username) | |
| # Always verify against a hash even if user doesn't exist to prevent timing attacks | |
| if not user: | |
| # Use a dummy verification with consistent timing | |
| self._verify_password(password, self._hash_password('dummy')) | |
| return None | |
| if self._verify_password(password, user.password_hash): | |
| token = self._generate_token(username) | |
| return token | |
| return None |
Did we get this right? 👍 / 👎 to inform future reviews.
| class UserService: | ||
| def __init__(self, db: UserDB): | ||
| self.db = db | ||
| self.logger = logging.getLogger(__name__) | ||
|
|
||
| def register(self, username: str, password: str) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The UserService class doesn't implement any rate limiting for login attempts. This makes it vulnerable to brute force attacks. Consider implementing rate limiting for failed login attempts.
| class UserService: | |
| def __init__(self, db: UserDB): | |
| self.db = db | |
| self.logger = logging.getLogger(__name__) | |
| def register(self, username: str, password: str) -> None: | |
| class UserService: | |
| def __init__(self, db: UserDB): | |
| self.db = db | |
| self.logger = logging.getLogger(__name__) | |
| self.failed_attempts = {} | |
| self.max_attempts = 5 | |
| self.lockout_time = 300 # seconds | |
| def _check_rate_limit(self, username: str) -> bool: | |
| """Check if user is rate limited.""" | |
| current_time = time.time() | |
| if username in self.failed_attempts: | |
| attempts, lockout_until = self.failed_attempts[username] | |
| if current_time < lockout_until: | |
| return False # User is locked out | |
| if current_time - lockout_until > self.lockout_time: | |
| # Reset if lockout period has passed | |
| del self.failed_attempts[username] | |
| return True # Not rate limited |
Did we get this right? 👍 / 👎 to inform future reviews.
|
|
||
| class UserDB(Protocol): | ||
| def get_user(self, username: str) -> Optional[User]: ... | ||
| def save_user(self, user: User) -> None: ... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The UserDB protocol doesn't define error handling behavior for database operations. Consider defining how implementers should handle database errors.
| class UserDB(Protocol): | |
| def get_user(self, username: str) -> Optional[User]: ... | |
| def save_user(self, user: User) -> None: ... | |
| class UserDB(Protocol): | |
| def get_user(self, username: str) -> Optional[User]: | |
| """Retrieve a user by username. | |
| Returns None if user doesn't exist. | |
| Raises DatabaseError for connection or query errors. | |
| """ | |
| ... | |
| def save_user(self, user: User) -> None: | |
| """Save a user to the database. | |
| Raises DatabaseError for connection or query errors. | |
| """ | |
| ... | |
| def delete_user(self, username: str) -> None: | |
| """Delete a user from the database. | |
| Raises DatabaseError for connection or query errors. | |
| """ | |
| ... |
Did we get this right? 👍 / 👎 to inform future reviews.
| def register(self, username: str, password: str) -> None: | ||
| if self.db.get_user(username): | ||
| raise ValueError("Username already taken") | ||
| hashed = self._hash_password(password) | ||
| self.db.save_user(User(username, hashed)) | ||
| self.logger.info("Registered user: %s", username) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sensitive operations like user registration and deletion are only logged but don't require additional verification or authorization. Consider implementing a more robust authorization mechanism for these operations.
| def register(self, username: str, password: str) -> None: | |
| if self.db.get_user(username): | |
| raise ValueError("Username already taken") | |
| hashed = self._hash_password(password) | |
| self.db.save_user(User(username, hashed)) | |
| self.logger.info("Registered user: %s", username) | |
| def register(self, username: str, password: str, admin_token: Optional[str] = None) -> None: | |
| """Register a new user. | |
| Args: | |
| username: The username for the new user | |
| password: The password for the new user | |
| admin_token: Optional admin token for privileged operations | |
| Raises: | |
| ValueError: If username is already taken | |
| PermissionError: If admin_token is required but invalid | |
| """ | |
| # Check if this operation requires admin privileges in your system | |
| if self._requires_admin_privileges("register") and not self._verify_admin_token(admin_token): | |
| raise PermissionError("Admin privileges required for user registration") | |
| if self.db.get_user(username): | |
| raise ValueError("Username already taken") | |
| hashed = self._hash_password(password) | |
| self.db.save_user(User(username, hashed)) | |
| self.logger.info("Registered user: %s", username) |
Did we get this right? 👍 / 👎 to inform future reviews.
| class User: | ||
| def __init__(self, username: str, password_hash: str): | ||
| self.username = username | ||
| self.password_hash = password_hash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The User class only contains username and password_hash, lacking important user metadata like creation date, last login, account status (active/locked), etc. Consider enhancing the User model with these attributes.
| class User: | |
| def __init__(self, username: str, password_hash: str): | |
| self.username = username | |
| self.password_hash = password_hash | |
| class User: | |
| def __init__(self, username: str, password_hash: str): | |
| self.username = username | |
| self.password_hash = password_hash | |
| self.created_at = time.time() | |
| self.last_login = None | |
| self.login_attempts = 0 | |
| self.status = "active" # active, locked, disabled | |
| self.roles = ["user"] # default role |
Did we get this right? 👍 / 👎 to inform future reviews.
|
|
||
| def _hash_password(self, password: str) -> str: | ||
| salt = secrets.token_bytes(16) | ||
| digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 100_000) | ||
| return salt.hex() + ":" + digest.hex() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The password hashing iteration count is hardcoded as 100_000. As per the repository context, this is a critical security parameter that should be configurable and should not be decreased in future updates. Consider making this configurable but with a secure default.
| def _hash_password(self, password: str) -> str: | |
| salt = secrets.token_bytes(16) | |
| digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 100_000) | |
| return salt.hex() + ":" + digest.hex() | |
| def __init__(self, db: UserDB, password_iterations: int = 100_000): | |
| self.db = db | |
| self.logger = logging.getLogger(__name__) | |
| # Ensure iterations is never decreased below the secure default | |
| self.password_iterations = max(password_iterations, 100_000) | |
| def _hash_password(self, password: str) -> str: | |
| salt = secrets.token_bytes(16) | |
| digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, self.password_iterations) | |
| return salt.hex() + ":" + digest.hex() |
Did we get this right? 👍 / 👎 to inform future reviews.
No description provided.