4949import signal
5050import select
5151import sys
52+ import unicodedata
5253from collections import deque
5354from struct import unpack
5455
8889 warnings .warn (f"Cannot get the login user name: { str (e )} " )
8990
9091
92+ # TOTP validation utilities (client-side)
93+ class TotpValidationResult (NamedTuple ):
94+ ok : bool
95+ code : str
96+ message : str
97+
98+
99+ INVALID_TOTP_MSG = 'Invalid TOTP: Please enter a valid 6-digit numeric code.'
100+
101+
102+ def validate_totp_code (raw_code : str , totp_is_valid = None ) -> TotpValidationResult :
103+ """Validate and normalize a user-supplied TOTP value.
104+
105+ Precedence:
106+ 1) Trim & normalize input (strip spaces and separators; normalize full-width digits)
107+ 2) Empty check -> "Enter your 6-digit code"
108+ 3) Length check -> "Code must be 6 digits"
109+ 4) Numeric-only check -> "Code can contain digits only"
110+
111+ Returns TotpValidationResult(ok, code, message). On success, `code` is a 6-digit ASCII string.
112+ `totp_is_valid` is reserved for optional server-side checks and ignored here.
113+ """
114+ try :
115+ s = raw_code if raw_code is not None else ''
116+ # Normalize Unicode (convert full-width digits etc. to ASCII)
117+ s = unicodedata .normalize ('NFKC' , s )
118+ # Strip leading/trailing whitespace
119+ s = s .strip ()
120+ # Remove common separators inside the code
121+ # Spaces, hyphens, underscores, dots, and common dash-like characters
122+ separators = {' ' , '\t ' , '\n ' , '\r ' , '\f ' , '\v ' , '-' , '_' , '.' ,
123+ '\u2012 ' , '\u2013 ' , '\u2014 ' , '\u2212 ' , '\u00B7 ' , '\u2027 ' , '\u30FB ' }
124+ # Replace all occurrences of separators
125+ for sep in list (separators ):
126+ s = s .replace (sep , '' )
127+
128+ # Empty check
129+ if s == '' :
130+ return TotpValidationResult (False , '' , 'Enter your 6-digit code' )
131+
132+ # Length check
133+ if len (s ) != 6 :
134+ return TotpValidationResult (False , '' , 'Code must be 6 digits' )
135+
136+ # Numeric-only check
137+ if not s .isdigit ():
138+ return TotpValidationResult (False , '' , 'Code can contain digits only' )
139+
140+ # All good
141+ return TotpValidationResult (True , s , '' )
142+ except Exception :
143+ # Fallback generic error
144+ return TotpValidationResult (False , '' , INVALID_TOTP_MSG )
145+
146+
91147def connect (** kwargs : Any ) -> Connection :
92148 """Opens a new connection to a Vertica database."""
93149 return Connection (kwargs )
@@ -313,31 +369,14 @@ def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
313369 if self .totp is not None :
314370 if not isinstance (self .totp , str ):
315371 raise TypeError ('The value of connection option "totp" should be a string' )
316- # Use shared TOTP validator for normalization and precedence checks
317- try :
318- from .totp_validation import validate_totp_code , INVALID_TOTP_MSG
319- except Exception :
320- validate_totp_code = None
321- INVALID_TOTP_MSG = 'Invalid TOTP: Please enter a valid 6-digit numeric code.'
322-
323- if validate_totp_code is not None :
324- result = validate_totp_code (self .totp , totp_is_valid = None )
325- if not result .ok :
326- msg = result .message or INVALID_TOTP_MSG
327- self ._logger .error (f'Authentication failed: { msg } ' )
328- raise errors .ConnectionError (f'Authentication failed: { msg } ' )
329- # normalized digits-only code
330- self .totp = result .code
331- else :
332- # Fallback minimal validation
333- s = self .totp .strip ()
334- if not s .isdigit ():
335- self ._logger .error (INVALID_TOTP_MSG )
336- raise errors .ConnectionError (INVALID_TOTP_MSG )
337- if len (s ) != 6 :
338- self ._logger .error (INVALID_TOTP_MSG )
339- raise errors .ConnectionError (INVALID_TOTP_MSG )
340- self .totp = s
372+ # Validate using local validator
373+ result = validate_totp_code (self .totp , totp_is_valid = None )
374+ if not result .ok :
375+ msg = result .message or INVALID_TOTP_MSG
376+ self ._logger .error (f'Authentication failed: { msg } ' )
377+ raise errors .ConnectionError (f'Authentication failed: { msg } ' )
378+ # normalized digits-only code
379+ self .totp = result .code
341380 self ._logger .info ('TOTP received in connection options' )
342381
343382 # OAuth authentication setup
@@ -999,10 +1038,6 @@ def send_startup(totp_value=None):
9991038 short_msg = match .group (1 ).strip () if match else error_msg .strip ()
10001039
10011040 if "Invalid TOTP" in short_msg :
1002- try :
1003- from .totp_validation import INVALID_TOTP_MSG
1004- except Exception :
1005- INVALID_TOTP_MSG = "Invalid TOTP: Please enter a valid 6-digit numeric code."
10061041 print (f"Authentication failed: { INVALID_TOTP_MSG } " )
10071042 self ._logger .error (f"Authentication failed: { INVALID_TOTP_MSG } " )
10081043 self .close_socket ()
@@ -1022,35 +1057,20 @@ def send_startup(totp_value=None):
10221057
10231058 # ✅ If TOTP not provided initially, prompt only once
10241059 if not totp :
1025- timeout_seconds = 30 # 5 minutes timeout
1060+ timeout_seconds = 300 # 5 minutes timeout
10261061 try :
10271062 print ("Enter TOTP: " , end = "" , flush = True )
10281063 ready , _ , _ = select .select ([sys .stdin ], [], [], timeout_seconds )
10291064 if ready :
10301065 totp_input = sys .stdin .readline ().strip ()
10311066
1032- # Validate using shared precedence
1033- try :
1034- from .totp_validation import validate_totp_code , INVALID_TOTP_MSG
1035- except Exception :
1036- validate_totp_code = None
1037- INVALID_TOTP_MSG = "Invalid TOTP: Please enter a valid 6-digit numeric code."
1038-
1039- if validate_totp_code is not None :
1040- result = validate_totp_code (totp_input , totp_is_valid = None )
1041- if not result .ok :
1042- msg = result .message or INVALID_TOTP_MSG
1043- print (msg )
1044- self ._logger .error (msg )
1045- raise errors .ConnectionError (msg )
1046- totp_input = result .code
1047- else :
1048- s = totp_input .strip ()
1049- if not s .isdigit () or len (s ) != 6 :
1050- print (INVALID_TOTP_MSG )
1051- self ._logger .error (INVALID_TOTP_MSG )
1052- raise errors .ConnectionError (INVALID_TOTP_MSG )
1053- totp_input = s
1067+ # Validate using local precedence-based validator
1068+ result = validate_totp_code (totp_input , totp_is_valid = None )
1069+ if not result .ok :
1070+ msg = result .message or INVALID_TOTP_MSG
1071+ self ._logger .error (msg )
1072+ raise errors .ConnectionError (msg )
1073+ totp_input = result .code
10541074 # ✅ Valid TOTP — retry connection
10551075 totp = totp_input
10561076 self .close_socket ()
0 commit comments