-
Notifications
You must be signed in to change notification settings - Fork 179
Add strict TOTP validation to block invalid authentication requests #578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
f5a2d64
67c50e7
d9ac515
65f5699
a8c0e4b
fb1f0e8
50600fb
ea83763
2ff4cdf
fc79ca2
8441867
c1ed4ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -313,6 +313,31 @@ def __init__(self, options: Optional[Dict[str, Any]] = None) -> None: | |
| if self.totp is not None: | ||
| if not isinstance(self.totp, str): | ||
| raise TypeError('The value of connection option "totp" should be a string') | ||
| # Use shared TOTP validator for normalization and precedence checks | ||
| try: | ||
| from .totp_validation import validate_totp_code, INVALID_TOTP_MSG | ||
| except Exception: | ||
| validate_totp_code = None | ||
| INVALID_TOTP_MSG = 'Invalid TOTP: Please enter a valid 6-digit numeric code.' | ||
|
|
||
| if validate_totp_code is not None: | ||
| result = validate_totp_code(self.totp, totp_is_valid=None) | ||
| if not result.ok: | ||
| msg = result.message or INVALID_TOTP_MSG | ||
| self._logger.error(f'Authentication failed: {msg}') | ||
| raise errors.ConnectionError(f'Authentication failed: {msg}') | ||
| # normalized digits-only code | ||
| self.totp = result.code | ||
| else: | ||
| # Fallback minimal validation | ||
| s = self.totp.strip() | ||
| if not s.isdigit(): | ||
| self._logger.error(INVALID_TOTP_MSG) | ||
| raise errors.ConnectionError(INVALID_TOTP_MSG) | ||
| if len(s) != 6: | ||
| self._logger.error(INVALID_TOTP_MSG) | ||
| raise errors.ConnectionError(INVALID_TOTP_MSG) | ||
| self.totp = s | ||
| self._logger.info('TOTP received in connection options') | ||
|
|
||
| # OAuth authentication setup | ||
|
|
@@ -974,10 +999,14 @@ def send_startup(totp_value=None): | |
| short_msg = match.group(1).strip() if match else error_msg.strip() | ||
|
|
||
| if "Invalid TOTP" in short_msg: | ||
| print("Authentication failed: Invalid TOTP token.") | ||
| self._logger.error("Authentication failed: Invalid TOTP token.") | ||
| try: | ||
| from .totp_validation import INVALID_TOTP_MSG | ||
| except Exception: | ||
| INVALID_TOTP_MSG = "Invalid TOTP: Please enter a valid 6-digit numeric code." | ||
| print(f"Authentication failed: {INVALID_TOTP_MSG}") | ||
| self._logger.error(f"Authentication failed: {INVALID_TOTP_MSG}") | ||
| self.close_socket() | ||
| raise errors.ConnectionError("Authentication failed: Invalid TOTP token.") | ||
| raise errors.ConnectionError(f"Authentication failed: {INVALID_TOTP_MSG}") | ||
|
|
||
| # Generic error fallback | ||
| print(f"Authentication failed: {short_msg}") | ||
|
|
@@ -1000,16 +1029,28 @@ def send_startup(totp_value=None): | |
| if ready: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we wait 5mins for the user to enter TOTP?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Siva, if the user does not provide the TOTP within 5 minutes, the session should expire—meaning the connection between the driver and the server should be terminated. |
||
| totp_input = sys.stdin.readline().strip() | ||
|
|
||
| # ❌ Blank TOTP entered | ||
| if not totp_input: | ||
| self._logger.error("Invalid TOTP: Cannot be empty.") | ||
| raise errors.ConnectionError("Invalid TOTP: Cannot be empty.") | ||
|
|
||
| # ❌ Validate TOTP format (must be 6 digits) | ||
| if not totp_input.isdigit() or len(totp_input) != 6: | ||
| print("Invalid TOTP format. Please enter a 6-digit code.") | ||
| self._logger.error("Invalid TOTP format entered.") | ||
| raise errors.ConnectionError("Invalid TOTP format: Must be a 6-digit number.") | ||
| # Validate using shared precedence | ||
| try: | ||
| from .totp_validation import validate_totp_code, INVALID_TOTP_MSG | ||
| except Exception: | ||
| validate_totp_code = None | ||
| INVALID_TOTP_MSG = "Invalid TOTP: Please enter a valid 6-digit numeric code." | ||
|
|
||
| if validate_totp_code is not None: | ||
| result = validate_totp_code(totp_input, totp_is_valid=None) | ||
| if not result.ok: | ||
| msg = result.message or INVALID_TOTP_MSG | ||
| print(msg) | ||
|
||
| self._logger.error(msg) | ||
| raise errors.ConnectionError(msg) | ||
| totp_input = result.code | ||
| else: | ||
| s = totp_input.strip() | ||
| if not s.isdigit() or len(s) != 6: | ||
| print(INVALID_TOTP_MSG) | ||
| self._logger.error(INVALID_TOTP_MSG) | ||
| raise errors.ConnectionError(INVALID_TOTP_MSG) | ||
| totp_input = s | ||
| # ✅ Valid TOTP — retry connection | ||
| totp = totp_input | ||
| self.close_socket() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate_totp_code function definition is missing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Siva , Added the definition for validate_totp_code.